详细的.Net并行编程高级教程--Parallel

一直觉得自己对并发了解不够深入,特别是看了《代码整洁之道》觉得自己有必要好好学学并发编程,因为性能也是衡量代码整洁的一大标准。而且在《失控》这本书中也多次提到并发,不管是计算机还是生物都并发处理着各种事物。人真是奇怪,当你关注一个事情的时候,你会发现周围的事物中就常出现那个事情。所以好奇心驱使下学习并发。便有了此文。

一、理解硬件线程和软件线程

 
 多核处理器带有一个以上的物理内核--物理内核是真正的独立处理单元,多个物理内核使得多条指令能够同时并行运行。硬件线程也称为逻辑内核,一个物理内

核可以使用超线程技术提供多个硬件线程。所以一个硬件线程并不代表一个物理内核;Windows中每个运行的程序都是一个进程,每一个进程都会创建并运行
一个或多个线程,这些线程称为软件线程。硬件线程就像是一条泳道,而软件线程就是在其中游泳的人。

二、并行场合

  .Net Framework4 引入了新的Task Parallel Library(任务并行库,TPL),它支持数据并行、任务并行和流水线。让开发人员应付不同的并行场合。

  • 数据并行:有大量数据需要处理,并且必须对每一份数据执行同样的操作。比如通过256bit的密钥对100个Unicode字符串进行AES算法加密。
  • 任务并行:通过任务并发运行不同的操作。例如生成文件散列码,加密字符串,创建缩略图。
  • 流水线:这是任务并行和数据并行的结合体。

  TPL引入了System.Threading.Tasks
,主类是Task,这个类表示一个异步的并发的操作,然而我们不一定要使用Task类的实例,可以使用Parallel静态类。它提供了
Parallel.Invoke, Parallel.For Parallel.Forecah 三个方法。

三、Parallel.Invoke

   试图让很多方法并行运行的最简单的方法就是使用Parallel类的Invoke方法。例如有四个方法:

  • WatchMovie
  • HaveDinner
  • ReadBook
  • WriteBlog

  通过下面的代码就可以使用并行。

System.Threading.Tasks.Parallel.Invoke(WatchMovie, HaveDinner, ReadBook, WriteBlog);

这段代码会创建指向每一个方法的委托。Invoke方法接受一个Action的参数组。


1

public static void Invoke(params Action[] actions);

用lambda表达式或匿名委托可以达到同样的效果。

System.Threading.Tasks.Parallel.Invoke(() => WatchMovie(), () => HaveDinner(), () => ReadBook(), delegate() { WriteBlog(); });

 1.没有特定的执行顺序。

 Parallel.Invoke方法只有在4个方法全部完成之后才会返回。它至少需要4个硬件线程才足以让这4个方法并发运行。但并不保证这4个方法能够同时启动运行,如果一个或者多个内核处于繁忙状态,那么底层的调度逻辑可能会延迟某些方法的初始化执行。

给方法加上延时,就可以看到必须等待最长的方法执行完成才回到主方法。


  1. static void Main(string[] args) 
  2.         { 
  3.             System.Threading.Tasks.Parallel.Invoke(WatchMovie, HaveDinner, ReadBook, 
  4.                 WriteBlog); 
  5.             Console.WriteLine("执行完成"); 
  6.             Console.ReadKey(); 
  7.         } 
  8.  
  9.         static void WatchMovie() 
  10.         { 
  11.             Thread.Sleep(5000); 
  12.             Console.WriteLine("看电影"); 
  13.         } 
  14.         static void HaveDinner() 
  15.         { 
  16.             Thread.Sleep(1000); 
  17.             Console.WriteLine("吃晚饭"); 
  18.         } 
  19.         static void ReadBook() 
  20.         { 
  21.             Thread.Sleep(2000); 
  22.             Console.WriteLine("读书"); 
  23.         } 
  24.         static void WriteBlog() 
  25.         { 
  26.             Thread.Sleep(3000); 
  27.             Console.WriteLine("写博客"); 
  28.         } 

这样会造成很多逻辑内核处于长时间闲置状态。

四、Parallel.For

Parallel.For为固定数目的独立For循环迭代提供了负载均衡 (即将工作分发到不同的任务中执行,这样所有的任务在大部分时间都可以保持繁忙) 的并行执行。从而能尽可能地充分利用所有的可用的内核。

我们比较下下面两个方法,一个使用For循环,一个使用Parallel.For  都是生成密钥在转换为十六进制字符串。


  1. private static void GenerateAESKeys() 
  2.         { 
  3.             var sw = Stopwatch.StartNew(); 
  4.             for (int i = 0; i < NUM_AES_KEYS; i++) 
  5.             { 
  6.                 var aesM = new AesManaged(); 
  7.                 aesM.GenerateKey(); 
  8.                 byte[] result = aesM.Key; 
  9.                 string hexStr = ConverToHexString(result); 
  10.             } 
  11.             Console.WriteLine("AES:"+sw.Elapsed.ToString()); 
  12.         } 
  13.  
  14.  private static void ParallelGenerateAESKeys() 
  15.         { 
  16.             var sw = Stopwatch.StartNew(); 
  17.             System.Threading.Tasks.Parallel.For(1, NUM_AES_KEYS + 1, (int i) => 
  18.             { 
  19.                 var aesM = new AesManaged(); 
  20.                 aesM.GenerateKey(); 
  21.                 byte[] result = aesM.Key; 
  22.                 string hexStr = ConverToHexString(result); 
  23.             }); 
  24.  
  25.             Console.WriteLine("Parallel_AES:" + sw.Elapsed.ToString()); 
  26.         } 

private static int NUM_AES_KEYS = 100000;
        static void Main(string[] args)
        {
            Console.WriteLine("执行"+NUM_AES_KEYS+"次:"); GenerateAESKeys();
            ParallelGenerateAESKeys();
            Console.ReadKey();
        }

执行1000000次

这里并行的时间是串行的一半。

五、Parallel.ForEach

在Parallel.For中,有时候对既有循环进行优化可能会是一个非常复杂的任务。Parallel.ForEach为固定数目的独立For

Each循环迭代提供了负载均衡的并行执行,且支持自定义分区器,让使用者可以完全掌握数据分发。实质就是将所有要处理的数据区分为多个部分,然后并行运
行这些串行循环。

修改上面的代码:


  1. System.Threading.Tasks.Parallel.ForEach(Partitioner.Create(1, NUM_AES_KEYS + 1), range => 
  2.             { 
  3.                 var aesM = new AesManaged(); 
  4.                 Console.WriteLine("AES Range({0},{1} 循环开始时间:{2})",range.Item1,range.Item2,DateTime.Now.TimeOfDay); 
  5.  
  6.                 for (int i = range.Item1; i < range.Item2; i++) 
  7.                 { 
  8.                     aesM.GenerateKey(); 
  9.                     byte[] result = aesM.Key; 
  10.                     string hexStr = ConverToHexString(result); 
  11.                 } 
  12.                 Console.WriteLine("AES:"+sw.Elapsed.ToString()); 
  13.             }); 

从执行结果可以看出,分了13个段执行的。

第二次执行还是13个段。速度上稍微有差异。开始没有指定分区数,Partitioner.Create使用的是内置默认值。

而且我们发现这些分区并不是同时执行的,大致是分了三个时间段执行。而且执行顺序是不同的。总的时间和Parallel.For的方法差不多。

public static ParallelLoopResult ForEach<TSource>(Partitioner<TSource> source, Action<TSource> body)

Parallel.ForEach方法定义了source和Body两个参数。source是指分区器。提供了分解为多个分区的数据源。body是

要调用的委托。它接受每一个已定义的分区作为参数。一共有20多个重载,在上面的例子中,分区的类型为Tuple<int,int>,是一个
二元组类型。此外,返回一个ParallelLoopResult的值。

Partitioner.Create 创建分区是根据逻辑内核数及其他因素决定。


  1. public static OrderablePartitioner<Tuple<int, int>> Create(int fromInclusive, int toExclusive) 
  2.     { 
  3.       int num = 3; 
  4.       if (toExclusive <= fromInclusive) 
  5.         throw new ArgumentOutOfRangeException("toExclusive"); 
  6.       int rangeSize = (toExclusive - fromInclusive) / (PlatformHelper.ProcessorCount * num); 
  7.       if (rangeSize == 0) 
  8.         rangeSize = 1; 
  9.       return Partitioner.Create<Tuple<int, int>>(Partitioner.CreateRanges(fromInclusive, toExclusive, rangeSize), EnumerablePartitionerOptions.NoBuffering); 
  10.     } 

因此我们可以修改分区数目,rangesize大致为250000左右。也就是说我的逻辑内核是4.

var rangesize = (int) (NUM_AES_KEYS/Environment.ProcessorCount) + 1;
   System.Threading.Tasks.Parallel.ForEach(Partitioner.Create(1, NUM_AES_KEYS + 1,rangesize), range =>

再次执行:

分区变成了四个,时间上没有多大差别(第一个时间是串行时间)。我们看见这四个分区几乎是同时执行的。大部分情况下,TPL在幕后使用的负载均衡机制都是非常高效的,然而对分区的控制便于使用者对自己的工作负载进行分析,来改进整体的性能。

Parallel.ForEach也能对IEnumerable<int>集合进行重构。Enumerable.Range生产了序列化的数目。但这样就没有上面的分区效果。


  1. private static void ParallelForEachGenerateMD5HasHes() 
  2.         { 
  3.             var sw = Stopwatch.StartNew(); 
  4.             System.Threading.Tasks.Parallel.ForEach(Enumerable.Range(1, NUM_AES_KEYS), number => 
  5.             { 
  6.                 var md5M = MD5.Create(); 
  7.                 byte[] data = Encoding.Unicode.GetBytes(Environment.UserName + number); 
  8.                 byte[] result = md5M.ComputeHash(data); 
  9.                 string hexString = ConverToHexString(result); 
  10.             }); 
  11.             Console.WriteLine("MD5:"+sw.Elapsed.ToString()); 
  12.         } 

六、从循环中退出

和串行运行中的break不同,ParallelLoopState 提供了两个方法用于停止Parallel.For 和 Parallel.ForEach的执行。

  • Break:让循环在执行了当前迭代后尽快停止执行。比如执行到100了,那么循环会处理掉所有小于100的迭代。
  • Stop:让循环尽快停止执行。如果执行到了100的迭代,那不能保证处理完所有小于100的迭代。

修改上面的方法:执行3秒后退出。


  1. private static void ParallelLoopResult(ParallelLoopResult loopResult) 
  2.         { 
  3.             string text; 
  4.             if (loopResult.IsCompleted) 
  5.             { 
  6.                 text = "循环完成"; 
  7.             } 
  8.             else 
  9.             { 
  10.                 if (loopResult.LowestBreakIteration.HasValue) 
  11.                 { 
  12.                     text = "Break终止"; 
  13.                 } 
  14.                 else 
  15.                 { 
  16.                     text = "Stop 终止"; 
  17.                 } 
  18.             } 
  19.             Console.WriteLine(text); 
  20.         } 
  21.  
  22.  
  23.         private static void ParallelForEachGenerateMD5HasHesBreak() 
  24.         { 
  25.             var sw = Stopwatch.StartNew(); 
  26.             var loopresult= System.Threading.Tasks.Parallel.ForEach(Enumerable.Range(1, NUM_AES_KEYS), (int number,ParallelLoopState loopState) => 
  27.             { 
  28.                 var md5M = MD5.Create(); 
  29.                 byte[] data = Encoding.Unicode.GetBytes(Environment.UserName + number); 
  30.                 byte[] result = md5M.ComputeHash(data); 
  31.                 string hexString = ConverToHexString(result); 
  32.                 if (sw.Elapsed.Seconds > 3) 
  33.                 { 
  34.                     loopState.Stop(); 
  35.                 } 
  36.             }); 
  37.             ParallelLoopResult(loopresult); 
  38.             Console.WriteLine("MD5:" + sw.Elapsed); 
  39.         } 

七、捕捉并行循环中发生的异常。

当并行迭代中调用的委托抛出异常,这个异常没有在委托中被捕获到时,就会变成一组异常,新的System.AggregateException负责处理这一组异常。


  1. private static void ParallelForEachGenerateMD5HasHesException() 
  2.         { 
  3.             var sw = Stopwatch.StartNew(); 
  4.             var loopresult = new ParallelLoopResult(); 
  5.             try 
  6.             { 
  7.                 loopresult = System.Threading.Tasks.Parallel.ForEach(Enumerable.Range(1, NUM_AES_KEYS), (number, loopState) => 
  8.                 { 
  9.                     var md5M = MD5.Create(); 
  10.                     byte[] data = Encoding.Unicode.GetBytes(Environment.UserName + number); 
  11.                     byte[] result = md5M.ComputeHash(data); 
  12.                     string hexString = ConverToHexString(result); 
  13.                     if (sw.Elapsed.Seconds > 3) 
  14.                     { 
  15.                         throw new TimeoutException("执行超过三秒"); 
  16.                     } 
  17.                 }); 
  18.             } 
  19.             catch (AggregateException ex) 
  20.             { 
  21.                 foreach (var innerEx in  ex.InnerExceptions) 
  22.                 { 
  23.                     Console.WriteLine(innerEx.ToString()); 
  24.                 } 
  25.             } 
  26.             
  27.             ParallelLoopResult(loopresult); 
  28.             Console.WriteLine("MD5:" + sw.Elapsed); 
  29.         } 

结果:

异常出现了好几次。

 八、指定并行度。

TPL的方法总会试图利用所有可用的逻辑内核来实现最好的结果,但有时候你并不希望在并行循环中使用所有的内核。比如你需要留出一个不参与并行计算
的内核,来创建能够响应用户的应用程序,而且这个内核需要帮助你运行代码中的其他部分。这个时候一种好的解决方法就是指定最大并行度。

这需要创建一个ParallelOptions的实例,设置MaxDegreeOfParallelism的值。


  1. private static void ParallelMaxDegree(int maxDegree) 
  2.         { 
  3.             var parallelOptions = new ParallelOptions(); 
  4.             parallelOptions.MaxDegreeOfParallelism = maxDegree; 
  5.  
  6.             var sw = Stopwatch.StartNew(); 
  7.             System.Threading.Tasks.Parallel.For(1, NUM_AES_KEYS + 1, parallelOptions, (int i) => 
  8.             { 
  9.                 var aesM = new AesManaged(); 
  10.                 aesM.GenerateKey(); 
  11.                 byte[] result = aesM.Key; 
  12.                 string hexStr = ConverToHexString(result); 
  13.             }); 
  14.             Console.WriteLine("AES:" + sw.Elapsed.ToString()); 
  15.         } 

调用:如果在四核微处理器上运行,那么将使用3个内核。

ParallelMaxDegree(Environment.ProcessorCount - 1);

时间上大致慢了点(第一次Parallel.For 3.18s),但可以腾出一个内核来处理其他的事情。

小结:这次学习了Parallel相关方法以及如何退出并行循环和捕获异常、设置并行度,还有并行相关的知识。园子里也有类似的博客。但作为自己知识的管理,在这里梳理一遍。

作者:Stoneniqiu

来源:51CTO

时间: 2024-10-18 17:22:05

详细的.Net并行编程高级教程--Parallel的相关文章

C#并行编程-Parallel

原文:C#并行编程-Parallel 菜鸟学习并行编程,参考<C#并行编程高级教程.PDF>,如有错误,欢迎指正.   TPL中引入了一个新命名空间System.Threading.Tasks,在该命名空间下Task是主类,表示一个类的异步的并发的操作,创建并行代码的时候不一定要直接使用Task类,在某些情况下可以直接使用Parallel静态类(System.Threading.Tasks.Parallel)下所提供的方法,而不用底层的Task实例. Parallel.Invoke  试图将很

C#并行编程-并发集合

原文:C#并行编程-并发集合 菜鸟学习并行编程,参考<C#并行编程高级教程.PDF>,如有错误,欢迎指正. 背景 基于任务的程序设计.命令式数据并行和任务并行都要求能够支持并发更新的数组.列表和集合. 在.NET Framework 4 以前,为了让共享的数组.列表和集合能够被多个线程更新,需要添加复杂的代码来同步这些更新操作. 如您需要编写一个并行循环,这个循环以无序的方式向一个共享集合中添加元素,那么必须加入一个同步机制来保证这是一个线程安全的集合. System.Collenctions

C#并行编程-线程同步原语

原文:C#并行编程-线程同步原语 菜鸟学习并行编程,参考<C#并行编程高级教程.PDF>,如有错误,欢迎指正. 背景 有时候必须访问变量.实例.方法.属性或者结构体,而这些并没有准备好用于并发访问,或者有时候需要执行部分代码,而这些代码必须单独运行,这是不得不通过将任务分解的方式让它们独立运行. 当任务和线程要访问共享的数据和资源的时候,您必须添加显示的同步,或者使用原子操作或锁. 之前的.NET Framework提供了昂贵的锁机制以及遗留的多线程模型,新的数据结构允许细粒度的并发和并行化,

C#并行编程-Task

原文:C#并行编程-Task 菜鸟学习并行编程,参考<C#并行编程高级教程.PDF>,如有错误,欢迎指正. 任务简介 TPL引入新的基于任务的编程模型,通过这种编程模型可以发挥多核的功效,提升应用程序的性能,不需要编写底层复杂且重量级的线程代码. 但需要注意:任务并不是线程(任务运行的时候需要使用线程,但并不是说任务取代了线程,任务代码是使用底层的线程(软件线程,调度在特定的硬件线程或逻辑内核上)运行的,任务与线程之间并没有一对一的关系.) 创建一个新的任务时,调度器(调度器依赖于底层的线程池

C#并行编程-相关概念

原文:C#并行编程-相关概念 菜鸟初步学习,不对的地方请大神指教,参考<C#并行编程高级教程.pdf> 背景 当今计算机至少都有一颗双核的微处理器,带有四核.八核的计算机非常常见,在单个处理器上具有多个内核的时代正在来临,现代微处理器提供了新型的多核架构,因此软件设计和编码能够充分发挥这些架构的功能是非常重要的事情,也要与时俱进. 多核微处理器 多核微处理器有很多种不同的复杂微架构,意在提供更强的并行执行能力,提升吞吐量,减少潜在的性能瓶颈,缩减电源消耗,并减少发热量,因此,现代很多的微处理器

JavaScript高级教程5.6之基本包装类型(详细)_javascript技巧

为了便于操作基本类型值,ECMAScript还提供了3个特殊的引用类型:Boolean,Number,String. 实际上,每当读取一个基本类型值的时候,后台应付创建一个对应的基本包装类型的对象,从而让我们能够调用一些方法来操作这些数据. var s1="some text"; var s2=s1.substring(2); console.log(s2);//me text 这个例子中s1包含了一个字符串,字符串是基本类型值.第二行调用了s1的subsstring()方法,并将返回

《OpenACC并行编程实战》—— 导读

前 言     2010年以来,中国超级计算机建设突飞猛进,欣欣向荣.一个原因是国力强盛,大力投资高新科技:另一个原因是整体科技水平提高,需求旺盛.天气预报.石油物探.工程仿真.基因测序等传统应用对计算资源的需求持续增长,以深度学习为代表的人工智能大爆发,资金雄厚的互联网公司对计算能力极度渴求.超级计算机的建设.应用主战场正在从教育科研单位转向科技企业. 本书特色     笔者学习超算技术时有过苦泪:教材一上来就讲技术细节,只能机械地学习,不清楚这些算法.语法要解决什么问题,花费巨大精力后却发现

DOS批处理高级教程 第一章 批处理基础_DOS/BAT

第一节 常用批处理内部命令简介 批处理定义:顾名思义,批处理文件是将一系列命令按一定的顺序集合为一个可执行的文本文件,其扩展名为BAT或者CMD.这些命令统称批处理命令.小知识:可以在键盘上按下Ctrl+C组合键来强行终止一个批处理的执行过程. 了解了大概意思后,我们正式开始学习.先看一个简单的例子! @echo off echo "欢迎来到非常BAT!" pause 把上面的3条命令保存为文件test.bat或者test.cmd然后执行,他就会在屏幕上显示以下二行话: 复制代码 代码

HTML高级教程:学习HTML 4.0事件属性

高级|教程 HTML 4.0的新特性之一是使HTML事件触发浏览器中的行为,比方说当用户点击一个HTML元素时启动一段JavaScript.以下就是可被插入HTML标签以定义事件行为的一系列属性. 假如你希望学习如何使用这些事件进行编程,那么你应该学习我们的JavaScript教程和DHTML教程. 窗口事件 (Window Events) 仅在body和frameset元素中有效. 属性 值 描述 onload 脚本 当文档被载入时执行脚本 onunload 脚本 当文档被卸下时执行脚本 表单