如果程式中有大量的計算任務,並且這些任務能分割成幾個互相獨立的任務塊,那就應該使用並行編程。 並行編程用於分解計算密集型的任務片段,並將它們分配給多個線程。這些並行處理方法只適用於計算密集型的任務。 一 數據的並行處理 如果有一批數據,需要對每個數據進行相同的操作,其操作是計算密集型的,需要耗費一定 ...
如果程式中有大量的計算任務,並且這些任務能分割成幾個互相獨立的任務塊,那就應該使用並行編程。
並行編程用於分解計算密集型的任務片段,並將它們分配給多個線程。這些並行處理方法只適用於計算密集型的任務。
一 數據的並行處理
如果有一批數據,需要對每個數據進行相同的操作,其操作是計算密集型的,需要耗費一定的時間。
Parallel 類型有 ForEach 方法可以解決上述問題。
下例使用了一批矩陣,對每一個矩陣都進行旋轉,Matrix類的Rotate方法是計算密集型的任務。
void RotateMatrices(IEnumerable<Matrix> matrices, float degrees) { Parallel.ForEach(matrices, matrix => matrix.Rotate(degrees)); }
在某些情況下需要儘早結束這個迴圈,例如發現了無效值時。下例反轉每一個矩陣,但是如果發現有無效的矩陣,則中斷迴圈:
void InvertMatrices(IEnumerable<Matrix> matrices) { Parallel.ForEach(matrices, (matrix, state) => { if (!matrix.IsInvertible) state.Stop(); else matrix.Invert(); }); }
更常見的情況是可以取消並行迴圈,這與結束迴圈不同。結束(stop)迴圈是在迴圈內部進行,
而取消(cancel)迴圈是在迴圈外部進行的。例如,點擊“取消”按鈕可以取消一個 CancellationTokenSource,以取消並行迴圈,如下:
void RotateMatrices(IEnumerable<Matrix> matrices, float degrees,CancellationToken token) { Parallel.ForEach(matrices,new ParallelOptions { CancellationToken = token }, matrix => matrix.Rotate(degrees)); }
註意,每個並行任務可能都在不同的線程中運行,因此必須保護對共用的狀態。
二 並行聚合
使用Parallel,在並行操作結束時,可以根據需要聚合結果,包括累加和、平均值等。Parallel 類通過局部值(local value)的概念來實現聚合,局部值就是只在並行迴圈內部存在的變數。
這意味著迴圈體中的代碼可以直接訪問值,不需要擔心同步問題。
迴圈中的代碼使用 LocalFinally 委托來對每個局部值進行聚合。
需要註意的是,localFinally 委托需要以同步的方式對存放結果的變數進行訪問。
下麵是一個並行求累加和的例子:
//註意,這不是最高效的實現方式,只是舉個例子,說明用鎖來保護共用狀態。 static int ParallelSum(IEnumerable<int> values) { object mutex = new object(); int result = 0; Parallel.ForEach( source: values, localInit: () => 0, body: (item, state, localValue) => localValue + item, localFinally: localValue => { lock (mutex) result += localValue; } ); return result; }
並行 LINQ 對聚合的支持,比 Parallel 類更加易用:
static int ParallelSum(IEnumerable<int> values) { return values.AsParallel().Sum(); }
PLINQ 本身支持很多常規操作(例如求累加和)。大多數情況下PLINQ 對聚合的支持更有表現力,代碼也更少。
PLINQ也可通過 Aggregate 實現通用的聚合功能:
static int ParallelSum(IEnumerable<int> values) { return values.AsParallel().Aggregate( seed: 0, func: (sum, item) => sum + item ); }
三 並行調用
如果需要並行調用一批方法,並且這些方法(大部分)是互相獨立的。
Parallel 類有一個簡單的成員 Invoke,可用於這種場合。下麵的例子將一個數組分為兩半,並且分別獨立處理:
static void ProcessArray(double[] array) { Parallel.Invoke( () => ProcessPartialArray(array, 0, array.Length / 2), () => ProcessPartialArray(array, array.Length / 2, array.Length) ); } static void ProcessPartialArray(double[] array, int begin, int end) { // 計算密集型的處理過程 ... }
如果在運行之前都無法確定調用的方法數量,就可以在 Parallel.Invoke 函數中輸入一個委托數組,Parallel.Invoke 也支持取消操作:
static void DoAction20Times(Action action, CancellationToken token) { Action[] actions = Enumerable.Repeat(action, 20).ToArray(); Parallel.Invoke(new ParallelOptions { CancellationToken = token }, actions); }
對於簡單的並行調用,Parallel.Invoke 是一個非常不錯的解決方案。
但在以下兩種情況中使用 Parallel.Invoke 並不是很合適:
要對每一個輸入的數據調用一個操作(改用Parallel.Foreach),或者每一個操作產生了一些輸出(改用並行 LINQ)。
四 並行LINQ
LINQ 可以實現在序列上”拉取“數據的運算。並行LINQ(PLINQ)擴展了 LINQ,以支持並行處理。
PLINQ 非常適用於數據流的操作,一個數據隊列作為輸入,一個數據隊列作為輸出。
下麵簡單的例子將序列中的每個元素都乘以2:
static IEnumerable<int> MultiplyBy2(IEnumerable<int> values) { return values.AsParallel().Select(item => item * 2); //實際應用中,計算工作量要大得多 }
按照並行 LINQ 的預設方式,這個例子中輸出數據隊列的次序是不固定的。
我們可以指明要求保持原來的次序。下麵的例子也是並行執行的,但保留了數據的原有次序:
static IEnumerable<int> MultiplyBy2(IEnumerable<int> values) { return values.AsParallel().AsOrdered().Select(item => item * 2); }
Parallel 類可適用於很多場合,但是在做聚合或進行數據序列的轉換時,PLINQ 的代碼更加簡潔。
PLINQ 為各種各樣的操作提供了並行的版本,包括過濾(Where)、投影(Select)以及各種聚合運算,
例如 Sum、Average 和更通用的 Aggregate。一般來說,對常規 LINQ 的所有操作都可以通過並行方式對 PLINQ 執行。
以上。