1、Parallel.Invoke 主要用於任務的並行 這個函數的功能和Task有些相似,就是併發執行一系列任務,然後等待所有完成。和Task比起來,省略了Task.WaitAll這一步,自然也缺少了Task的相關管理功能。它有兩種形式: Parallel.Invoke( params Action ...
1、Parallel.Invoke 主要用於任務的並行
這個函數的功能和Task有些相似,就是併發執行一系列任務,然後等待所有完成。和Task比起來,省略了Task.WaitAll這一步,自然也缺少了Task的相關管理功能。它有兩種形式:
Parallel.Invoke( params Action[] actions);
Parallel.Invoke(Action[] actions,TaskManager manager,TaskCreationOptions options);
using System; using System.Threading; using System.Threading.Tasks; namespace ConsoleApp1 { class Program { static void Main(string[] args) { var actions = new Action[] { () => ActionTest("test 1"), () => ActionTest("test 2"), () => ActionTest("test 3"), () => ActionTest("test 4") }; Console.WriteLine("Parallel.Invoke 1 Test"); Parallel.Invoke(actions); Console.WriteLine("結束!"); } static void ActionTest(object value) { Console.WriteLine(">>> thread:{0}, value:{1}", Thread.CurrentThread.ManagedThreadId, value); } } }Program
2、For方法,主要用於處理針對數組元素的並行操作(數據的並行)
using System; using System.Threading; using System.Threading.Tasks; namespace ConsoleApp1 { class Program { static void Main(string[] args) { int[] nums = new int[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12 }; Parallel.For(0, nums.Length, (i) => { Console.WriteLine("針對數組索引{0}對應的那個元素{1}的一些工作代碼……ThreadId={2}", i, nums[i], Thread.CurrentThread.ManagedThreadId); }); Console.ReadKey(); } } }Program
3、Foreach方法,主要用於處理泛型集合元素的並行操作(數據的並行)
using System; using System.Collections.Generic; using System.Threading; using System.Threading.Tasks; namespace ConsoleApp1 { class Program { static void Main(string[] args) { List<int> nums = new List<int> { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12 }; Parallel.ForEach(nums, (item) => { Console.WriteLine("針對集合元素{0}的一些工作代碼……ThreadId={1}", item, Thread.CurrentThread.ManagedThreadId); }); Console.ReadKey(); } } }Program
數據的並行的方式二(AsParallel()):
using System; using System.Collections.Generic; using System.Linq; using System.Threading; namespace ConsoleApp1 { class Program { static void Main(string[] args) { List<int> nums = new List<int> { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12 }; var evenNumbers = nums.AsParallel().Select(item => Calculate(item)); //註意這裡是個延遲載入,也就是不用集合的時候 這個Calculate裡面的演算法 是不會去運行 可以屏蔽下麵的代碼看效果; Console.WriteLine(evenNumbers.Count()); //foreach (int item in evenNumbers) // Console.WriteLine(item); Console.ReadKey(); } static int Calculate(int number) { Console.WriteLine("針對集合元素{0}的一些工作代碼……ThreadId={1}", number, Thread.CurrentThread.ManagedThreadId); return number * 2; } } }Program
.AsOrdered() 對結果進行排序:
using System; using System.Collections.Generic; using System.Linq; using System.Threading; using System.Threading.Tasks; namespace ConsoleApp { class Program { static void Main(string[] args) { List<int> nums = new List<int> { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12 }; var evenNumbers = nums.AsParallel().AsOrdered().Select(item => Calculate(item)); //註意這裡是個延遲載入,也就是不用集合的時候 這個Calculate裡面的演算法 是不會去運行 可以屏蔽下麵的代碼看效果; //Console.WriteLine(evenNumbers.Count()); foreach (int item in evenNumbers) Console.WriteLine(item); Console.ReadKey(); } static int Calculate(int number) { Console.WriteLine("針對集合元素{0}的一些工作代碼……ThreadId={1}", number, Thread.CurrentThread.ManagedThreadId); return number * 2; } } }Program
ForEach的獨到之處就是可以將數據進行分區,每一個小區內實現串列計算,分區採用Partitioner.Create實現。
using System; using System.Collections.Concurrent; using System.Diagnostics; using System.Threading.Tasks; namespace ConsoleApp1 { class Program { static void Main(string[] args) { for (int j = 1; j < 4; j++) { ConcurrentBag<int> bag = new ConcurrentBag<int>(); var watch = Stopwatch.StartNew(); watch.Start(); Parallel.ForEach(Partitioner.Create(0, 3000000), i => { for (int m = i.Item1; m < i.Item2; m++) { bag.Add(m); } }); Console.WriteLine("並行計算:集合有:{0},總共耗時:{1}", bag.Count, watch.ElapsedMilliseconds); GC.Collect(); } } } }Program
ParallelOptions類
ParallelOptions options = new ParallelOptions();
//指定使用的硬體線程數為4
options.MaxDegreeOfParallelism = 4;
有時候我們的線程可能會跑遍所有的內核,為了提高其他應用程式的穩定性,就要限制參與的內核,正好ParallelOptions提供了MaxDegreeOfParallelism屬性。
using System; using System.Collections.Concurrent; using System.Diagnostics; using System.Linq; using System.Threading.Tasks; namespace ConsoleApp1 { public class Student { public int ID { get; set; } public string Name { get; set; } public int Age { get; set; } public DateTime CreateTime { get; set; } } class Program { static void Main(string[] args) { var dic = LoadData(); Stopwatch watch = new Stopwatch(); watch.Start(); var query2 = (from n in dic.Values.AsParallel() where n.Age > 20 && n.Age < 25 select n).ToList(); watch.Stop(); Console.WriteLine("並行計算耗費時間:{0}", watch.ElapsedMilliseconds); Console.Read(); } public static ConcurrentDictionary<int, Student> LoadData() { ConcurrentDictionary<int, Student> dic = new ConcurrentDictionary<int, Student>(); ParallelOptions options = new ParallelOptions(); //指定使用的硬體線程數為4 options.MaxDegreeOfParallelism = 4; //預載入1500w條記錄 Parallel.For(0, 15000000, options, (i) => { var single = new Student() { ID = i, Name = "hxc" + i, Age = i % 151, CreateTime = DateTime.Now.AddSeconds(i) }; dic.TryAdd(i, single); }); return dic; } } }Program
常見問題的處理
<1> 如何中途退出並行迴圈?
是的,在串列代碼中我們break一下就搞定了,但是並行就不是這麼簡單了,不過沒關係,在並行迴圈的委托參數中提供了一個ParallelLoopState,該實例提供了Break和Stop方法來幫我們實現。
Break: 當然這個是通知並行計算儘快的退出迴圈,比如並行計算正在迭代100,那麼break後程式還會迭代所有小於100的。
Stop:這個就不一樣了,比如正在迭代100突然遇到stop,那它啥也不管了,直接退出。
using System; using System.Collections.Concurrent; using System.Threading.Tasks; namespace ConsoleApp1 { class Program { static void Main(string[] args) { ConcurrentBag<int> bag = new ConcurrentBag<int>(); Parallel.For(0, 20000000, (i, state) => { if (bag.Count == 1000) { //state.Break(); state.Stop(); return; } bag.Add(i); }); Console.WriteLine("當前集合有{0}個元素。", bag.Count); } } }Program
取消(cancel)
using System; using System.Threading; using System.Threading.Tasks; namespace ConsoleApp1 { class Program { public static void Main() { var cts = new CancellationTokenSource(); var ct = cts.Token; Task.Factory.StartNew(() => fun(ct)); Console.ReadKey(); //Thread.Sleep(3000); cts.Cancel(); Console.WriteLine("任務取消了!"); } static void fun(CancellationToken token) { Parallel.For(0, 100000, new ParallelOptions { CancellationToken = token }, (i) => { Console.WriteLine("針對數組索引{0}的一些工作代碼……ThreadId={1}", i, Thread.CurrentThread.ManagedThreadId); }); } } }Program
<2> 並行計算中拋出異常怎麼處理?
首先任務是並行計算的,處理過程中可能會產生n多的異常,那麼如何來獲取到這些異常呢?普通的Exception並不能獲取到異常,然而為並行誕生的AggregateExcepation就可以獲取到一組異常。
using System; using System.Threading; using System.Threading.Tasks; namespace ConsoleApp1 { class Program { static void Main(string[] args) { try { Parallel.Invoke(Run1, Run2); } catch (AggregateException ex) { foreach (var single in ex.InnerExceptions) { Console.WriteLine(single.Message); } } Console.WriteLine("結束了!"); //Console.Read(); } static void Run1() { Thread.Sleep(3000); throw new Exception("我是任務1拋出的異常"); } static void Run2() { Thread.Sleep(5000); throw new Exception("我是任務2拋出的異常"); } } }Program
註意Parallel裡面 不建議拋出異常 因為在極端的情況下比如進去的第一批線程先都拋異常了 此時AggregateExcepation就只能捕獲到這一批的錯誤,然後程式就結束了
using System; using System.Collections.Generic; using System.Threading.Tasks; namespace ConsoleApp1 { public class TestClass { public static List<int> NumberList = null; private static readonly object locker = new object(); public void Test(int Number) { throw new Exception("1111"); //lock (locker) //{ // if (NumberList == null) // { // Console.WriteLine("執行添加"); // NumberList = new List<int>(); // NumberList.Add(1); // //Thread.Sleep(1000); // } //} //if (Number == 5 || Number == 7) throw new Exception(string.Format("NUmber{0}Boom!", Number)); //Console.WriteLine(Number); } } class Program { private static readonly object locker = new object(); static void Main(string[] args) { List<string> errList = new List<string>(); try { Parallel.For(0, 10, (i) => { try { TestClass a = new TestClass(); a.Test(i); } catch (Exception ex) { lock (locker) { errList.Add(ex.Message); throw ex; } } }); } catch (AggregateException ex) { foreach (var single in ex.InnerExceptions) { Console.WriteLine(single.Message); } } int Index = 1; foreach (string err in errList) { Console.WriteLine("{0}、的錯誤:{1}", Index++, err); } } } }Program
可以向下麵這樣來處理一下
不在AggregateExcepation中來處理 而是在Parallel裡面的try catch來記錄錯誤,或處理錯誤
using System; using System.Collections.Generic; using System.Threading.Tasks; namespace ConsoleApp1 { public class TestClass { public static List<int> NumberList = null; private static readonly object locker = new object(); public void Test(int Number) { throw new Exception("1111"); //lock (locker) //{ // if (NumberList == null) // { // Console.WriteLine("執行添加"); // NumberList = new List<int>(); // NumberList.Add(1); // //Thread.Sleep(1000); // } //} //if (Number == 5 || Number == 7) throw new Exception(string.Format("NUmber{0}Boom!", Number)); //Console.WriteLine(Number); } } class Program { private static readonly object locker = new object(); static void Main(string[] args) { List<string> errList = new List<string>(); Parallel.For(0, 10, (i) => { try { TestClass a = new TestClass(); a.Test(i); } catch (Exception ex) { lock (locker) { errList.Add(ex.Message); } //Console.WriteLine(ex.Message); //註:這裡不再將錯誤拋出..... //throw ex; } }); int Index = 1; foreach (string err in errList) { Console.WriteLine("{0}、的錯誤:{1}", Index++, err); } } } }Program