並行編程(Parallel Framework)

来源:https://www.cnblogs.com/jonins/archive/2018/09/22/9558276.html
-Advertisement-
Play Games

前言 並行編程:通過編碼方式利用多核或多處理器稱為並行編程,多線程概念的一個子集。 並行處理:把正在執行的大量的任務分割成小塊,分配給多個同時運行的線程。多線程的一種。 並行編程分為如下幾個結構: 1.並行的LINQ或PLINQ 2.Parallel類 3.任務並行結構 4.併發集合 5.SpinL ...


前言

並行編程:通過編碼方式利用多核或多處理器稱為並行編程,多線程概念的一個子集。

並行處理:把正在執行的大量的任務分割成小塊,分配給多個同時運行的線程。多線程的一種。

並行編程分為如下幾個結構

1.並行的LINQPLINQ

2.Parallel

3.任務並行結構

4.併發集合

5.SpinLockSpinWait

這些是.NET 4.0引入的功能,一般被稱為PFX(Parallel Framework,並行框架)

Parallel類和任務並行結構稱為TPL(Task Parallel Library,任務並行庫)

 

並行框架(PFX)

1.並行框架基礎

當前CPU技術達到瓶頸,而製造商將關註重點轉移到提高內核技術上,而標準單線程代碼並不會因此而自動提高運行速度。
利用多核提升程式性能通常需要對計算密集型代碼進行一些處理:
1.將代碼劃分成塊。
2.通過多線程並行執行這些代碼塊。
3.結果變為可用後,以線程安全和高性能的方式整合這些結果。
傳統多線程結構雖然實現功能,但難度頗高且不方便,特別是劃分和整理的步驟(本質問題是:多線程同時使用相同數據時,出於線程安全考慮進行鎖定的常用策略會引發大量競爭)。
並行框架(Parallel Framework)專門用於在這些應用場景中提供幫助。

2.並行框架組成

PFX:高層由兩個數據並行API組成:PLINQ或Parallel類。底層包含任務並行類和一組另外的結構為並行編程提供幫助。

 

基礎並行語言集成查詢(PLINQ)

語言集成查詢(Language Integrated Query,LINQ)提供了一個簡捷的語法來查詢數據集合。而這種由一個線程順序處理數據集合的方式我們稱為順序查詢(sequential query)

並行語言集成查詢(Parallel LINQ)LINQ並行版。它將順序查詢轉換為並行查詢,在內部使用任務,將集合中數據項的處理工作分散到多個CPU上,以併發處理多個數據項。

PLINQ將自動並行化本地的LINQ查詢System.Linq.ParallelEnumerable類(它定義在System.Core.dll中,需要引用System.Linq)公開了所有標準LINQ操作符的並行版本。這些所有方法是依據System.Linq.ParallelQuery<T>擴展而來。

1.LINQ to PLINQ

要讓LINQ查詢調用並行版本,必須將自己的順序查詢(基於IEnumerable或IEnumerable<T>)轉換成並行查詢(基於ParallelQuery或ParallelQuery<T>),使用ParallelEnumerableAsParallel方法實現,如示例:

 1     class Program
 2     {
 3         static void Main(string[] args)
 4         {
 5             IEnumerable<int> numbers = Enumerable.Range(1, 1000);
 6             ParallelQuery parallelQuery =
 7                 from n in numbers.AsParallel()//轉換為並行
 8                 where n > 3
 9                 select n;
10             foreach (var item in parallelQuery)
11             {
12                 Console.WriteLine(item);
13             }
14             Console.ReadKey();
15         }
16     }

結果如下:使用Enumerable.Range生成的集合是順序的,但是經過並行查詢後順序被打亂。

2.PLINQ to LINQ

 將執行並行查詢的操作切換回執行順序查詢(並不常用),通過ParalleIEnumerableAsSequential實現。此時操作只由一個線程執行。

 1     class Program
 2     {
 3         static void Main(string[] args)
 4         {
 5             IEnumerable<int> numbers = Enumerable.Range(1, 1000);
 6             IEnumerable<int> enumerable = numbers.AsParallel().AsSequential().Where(c => c > 3);
 7             foreach (var item in enumerable)
 8             {
 9                 Console.WriteLine(item);
10             }
11             Console.ReadKey();
12         }
13     }

3.整合結果集(ForAll)

通常,一個LINQ查詢的結果數據是讓某個線程執行一個foreach來處理,此時只有一個線程遍歷查詢的所有結果,如果希望以並行方式處理查詢結果,通過ParalleIEnumerableForAll方法處理查詢,如示例:

 1     class Program
 2     {
 3         static void Main(string[] args)
 4         {
 5             IEnumerable<int> numbers = Enumerable.Range(1, 1000);
 6             (from n in numbers.AsParallel() where n > 3 select n).ForAll((d) =>
 7              {
 8                  d = d + 1000;
 9                  Console.WriteLine(d);//Console在此回損害性能,因為內部回對線程進行同步,此處因演示所以暫且一用
10              });
11             Console.ReadKey();
12         }
13     }

 執行結果如下:

 

解析PLINQ

1.PLINQ執行模型

如圖所示:

2.異常處理

PLINQ的報錯將以AggregateException形式被重拋,其中InnerExceeptions屬性包含一個或多個真正異常,示例可看 非同步編程(async&await)內的異常處理部分。

3.PLINQ結果的排序

 並行化查詢當整理結果時不能保持初始化數據的原始次序。如果要保持序列的原始序列,可以通過在AsParallel之後調用AsOrdered來強制實現:

1             IEnumerable<int> numbers = Enumerable.Range(1, 10000);
2             var enumerable = numbers.AsParallel().Where(c => c > 3);

調用AsOrdered時,因為PLINQ要保持跟蹤每個元素的原始位置,會導致性能損失。

調用AsUnordered,可以在後續的查詢中低效AsOrdered產生的副作用,允許查詢從調用AsUnordered時起更高效的執行。

4.PLINQ存在的局限與限制

1.若要使PLINQ發揮作用,必須具有一定數量的計算密集型工作可分配給工作者線程。大多數的LINQ to Objects查詢執行速度很快,不僅沒有必要並行化,而且劃分、整理和協調額外線程的開銷實際上會降低執行速度。而且查詢若調用了非線程安全的方法,PLINQ的結果有可能不正確。

2.PLINQ能夠並行化的內容還有些限制,以下查詢運算符防止查詢被並行化,除非源元素位於他們的元素索引位置:Take、TakeWhile、Skip和SkipWhileSelect、SelectMany和ElementAt的索引版本。

3.以下查詢運算符是並行化的,但所使用的複雜劃分策略有時可能比順序處理的速度還要低:Join、GroupBy、GroupJonin、Distinct、Union、Intersect和Except。

5.PLINQ的結果

和普通LINQ查詢一樣,PLINQ查詢也是延遲求值的。意味著執行只在開始使用時觸發。但是列舉結果集時和普通順序查詢有區別:

順序查詢:完全由使用者從輸入序列中“拉取”每個元素。

並行查詢:通常使用獨立線程來獲取序列中的元素,時間上比使用者需要它們時要提前,再通過查詢鏈並行處理元素後將結果保存在一塊緩存中,以便使用者按需取用。

註意:過早暫停結果列舉,查詢處理器也會暫停或結束,目的是不浪費CPU的時間或記憶體。在調用AsParallel之後調用WithMergeOptions可以調節PLINQ的緩衝行為。

6.如何使用PLINQ

為何優化將LINQ都並行化是不可取的,因為LINQ能解決大多數問題,執行速度也很快,因此無法從並行化中收益。

一種更好的方式是找出CPU密集的瓶頸,然後考慮通過LINQ的形式表達(這類重構,LINQ往往會使代碼量變少,而且增強可讀性)。

PLINQ十分適用於易並行問題。他還可以很好地處理結構化的阻塞任務。

PLINQ不適於鏡像製作,因為將數百萬元素整理為一個輸出序列將帶來瓶頸,相反將元素寫入一個數組或托管記憶體塊中,然後使用Parallel類或任務並行管理多線程是更好的選擇。

 

Parallel類

Parallel類是對線程的一個很好的抽象。該類位於System.Threading.Tasks命名空間中,提供了數據和任務並行性

PFX通過Parallel類中的三個靜態方法,提供了一種基本形式的結構化並行機制:

1.Parallel.Invoke

Parallel.Invoke用於並行執行一組委托,示例如下:

1         static void Main(string[] args)
2         {
3             Parallel.Invoke(
4                 () => Console.WriteLine($"當前線程Id:{Thread.CurrentThread.ManagedThreadId}"),
5                 () => Console.WriteLine($"當前線程Id:{Thread.CurrentThread.ManagedThreadId}")
6                 );
7             Console.ReadKey();
8         }

執行結果

Parallel.Invoke方法並行執行一組Action委托,然後等待它們完成。

1 public static void Invoke(params Action[] actions);

示例看起來像是創建和等待兩個Task對象的一種捷徑。但兩者存在重要的區別:
如果傳入一個包含數據量非常大的委托數組時,Parallel.Invoke方法仍然能高效工作,這是因為在底層,Parallel.Invoke方法是將大量元素劃分成較小的塊,分配給底層的Task執行,而不是每個委托創建一個獨立Task

2.Parallel.For

Parallel.For執行C# for迴圈的並行化等價迴圈,示例如下:

 1     class Program
 2     {
 3         static void Main(string[] args)
 4         {
 5             //順序迴圈
 6             {
 7                 for (int i = 0; i < 10; i++)
 8                 {
 9                     Test(i);
10                 }
11             }
12             Console.WriteLine("並行化for開始");
13             //順序執行轉換為並行化
14             {
15                 Parallel.For(0, 10, i => Test(i));
16             }
17             //順序執行轉換為並行化(更簡單的方式)
18             {
19                 Parallel.For(0, 10, Test);
20             }
21             Console.ReadKey();
22         }
23         static void Test(int i)
24         {
25             Console.WriteLine($"當前線程Id:{Thread.CurrentThread.ManagedThreadId},輸出結果為:{i}");
26         }
27     }

結果如下:

3.Parallel.ForEach

Parallel.ForEach執行C# foreach迴圈的並行化等價迴圈,示例如下:

 1     class Program
 2     {
 3         static void Main(string[] args)
 4         {
 5             string[] data = { "zero", "one", "two", "three", "four", "five", "six", "seven", "eight", "nine" };
 6             //順序迴圈
 7             {
 8                 foreach (string num in data)
 9                 {
10                     Test(num);
11                 }
12             }
13             Console.WriteLine("並行化foreach開始");
14             //順序執行轉換為並行化
15             {
16                 Parallel.ForEach(data, num => Test(num));
17             }
18             Console.ReadKey();
19         }
20         static void Test(string str)
21         {
22             Console.WriteLine($"當前線程Id:{Thread.CurrentThread.ManagedThreadId},輸出結果為:{str}");
23         }
24     }

執行結果:

註意:以上三個方法都會引發阻塞直到所有工作完成為止。和PLINQ一樣,在出現未處理的異常之後,餘下的工作者在它們當前的迭代之後停止,而一場將被拋回給調用者,並封裝在一個AggregateException中。

4.索引&跳出(ParallelLoopState)

有時迭代索引很有用處,但是切忌不可像順序迴圈的用法使用共用變數(迴圈內i++)的方式使用,因為共用變數值在並行上下文中是線程不安全的

同樣的,因為並行ForForEach中的迴圈體是一個委托,所以無法使用break語句提前退出迴圈,必須調用ParallelLoopState對象上的BreakStop方法。

ForEach為例,ForEach重載的其中之一如下,它包含Acton的其中有三個參數(TSourec=子元素,ParallelLoopState=並行迴圈狀態,long=索引):

1 public static ParallelLoopResult ForEach<TSource>(IEnumerable<TSource> source, Action<TSource, ParallelLoopState, long> body)

所以,想要得到索引和提前跳出的正確方式如示例:

 1     class Program
 2     {
 3         static void Main(string[] args)
 4         {
 5             string[] data = { "zero", "one", "two", "three", "four", "five", "six", "seven", "eight", "nine" };
 6             Parallel.ForEach(data, (num, state, i) =>
 7             {
 8                 Console.WriteLine($"當前索引為:{i},狀態為:{state}");
 9                 Test(num);
10                 if (num == "six")
11                     state.Break();
12             });
13             Console.ReadKey();
14         }
15         static void Test(string str)
16         {
17             Console.WriteLine($"當前線程Id:{Thread.CurrentThread.ManagedThreadId},輸出結果為:{str}");
18         }
19     }

結果如下:

For的版本如下:

 1     class Program
 2     {
 3         static void Main(string[] args)
 4         {
 5             string[] data = { "zero", "one", "two", "three", "four", "five", "six", "seven", "eight", "nine" };
 6             Parallel.For(0, data.Length, (i, state) =>
 7             {
 8                 Console.WriteLine($"當前索引為:{i},狀態為:{state}");
 9                 Test(data[i]);
10                 if (data[i] == "six")
11                     state.Break();
12             });
13             Console.ReadKey();
14         }
15         static void Test(string str)
16         {
17             Console.WriteLine($"當前線程Id:{Thread.CurrentThread.ManagedThreadId},輸出結果為:{str}");
18         }
19     }

 

任務並行

對於任務並行的內容,請戳 任務(Task)非同步編程(async&await)

 

併發集合概述

.NET 4.0在System.Collections.Concurrent命名空間中提供了一組新的集合。所有這些集合都完全是線程安全的:

這些集合不僅是為使用帶鎖的普通集合提供了快捷方式,而且可以在一般的多線程中使用併發集合,但需要註意
1.併發集合針對並行編程進行了調整。只有在高度併發的應用場景中,傳統集合的性能才能勝過它們

2.線程安全的集合不能確保使用它的代碼也是安全的

3.如果枚舉一個併發集合的同時,另一個線程要修改它,不會拋出任何異常,相反,得到舊內容與新內容的混合。

4.不存在任何List<T>的併發版本。

5.它們的記憶體利用率沒有非併發的Stack和Queue類高效,但對於併發訪問的效果更好。

1.結構概述

這些併發集合與傳統集合的區別是:它們公開了特殊方法來執行原子測試和行動操作,而這些方法都是通過IProducerConsumerCollection<T>介面提供的。

IProducerConsumerCollection<T>介面代表一個線程安全的生產者/消費者集合,這三個類繼承並實現了IProducerConsumerCollection<T>介面:

ConcurrentStack<T>ConcurrentQueue<T>ConcurrentBag<T>

它們實現的TryAddTryTake方法用於測試一個添加/刪除操作能否執行,如果可以,則執行添加/刪除操作。測試與行動不需要對傳統集合上鎖。

ConcurrentBag<T>用於保存對象的無需集合,適用於調用Take或TryTake時不關心獲取那個元素的額情況。

相對於併發隊列或堆棧,在多線程同時調用一個ConcurrentBag的Add時,不存在競爭,但隊列或堆棧並行調用Add會引起一些競爭,所以ConcurrentBag上調用Take方法非常高效。

BlockingCollection<T>類似阻塞集合,適用於等待新元素的出現,可以把它看作一個容器,使用一個阻塞集合封裝所有實現IProducerConsumerCollection<T>的集合,並且允許從封裝的集合中去除元素,若沒有元素,操作會阻塞

2.基礎方法

常用的一些方法,整理自 zy__ :

ConcurrentQueue:完全無鎖,但面臨資源競爭失敗時可能會陷入自旋並重試操作。

Enqueue:在隊尾插入元素

TryDequeue:嘗試刪除隊頭元素,並通過out參數返回

TryPeek:嘗試將對頭元素通過out參數返回,但不刪除該元素。

ConcurrentStack:完全無鎖,但面臨資源競爭失敗時可能會陷入自旋並重試操作。

Push:向棧頂插入元素

TryPop:從棧頂彈出元素,並且通過out 參數返回

TryPeek:返回棧頂元素,但不彈出。

ConcurrentBag:一個無序的集合,程式可以向其中插入元素,或刪除元素。在同一個線程中向集合插入,刪除元素的效率很高。

Add:向集合中插入元素 

TryTake:從集合中取出元素並刪除 

TryPeek:從集合中取出元素,但不刪除該元素。

BlockingCollection:一個支持界限和阻塞的容器

Add :向容器中插入元素

TryTake:從容器中取出元素並刪除

TryPeek:從容器中取出元素,但不刪除。

CompleteAdding:告訴容器,添加元素完成。此時如果還想繼續添加會發生異常。

IsCompleted:告訴消費線程,生產者線程還在繼續運行中,任務還未完成。

ConcurrentDictionary對於讀操作是完全無鎖的,當很多線程要修改數據時,它會使用細粒度的鎖。

AddOrUpdate:如果鍵不存在,方法會在容器中添加新的鍵和值,如果存在,則更新現有的鍵和值。

GetOrAdd:如果鍵不存在,方法會向容器中添加新的鍵和值,如果存在則返回現有的值,並不添加新值。
TryAdd:嘗試在容器中添加新的鍵和值。

TryGetValue:嘗試根據指定的鍵獲得值。

TryRemove:嘗試刪除指定的鍵。

TryUpdate:有條件的更新當前鍵所對應的值。

GetEnumerator:返回一個能夠遍歷整個容器的枚舉器。

 

結語

根據ConcurrentBag編寫線程安全的生產者消費者請戳:這裡 。

說實在的寫這篇文章挺煩的,主要涉及的知識點太多講的太細篇幅會很長況且我自己有些也還沒用過,所以是概述性文章,對PFX有個基本的認識,當需要具體深入使用某些知識時再查詢相關文檔。

關於 併發編程(Concurrent programming)更新到這裡基本已經完結,謝謝大家的支持

因個人的興趣,所以準備沉澱下來專攻 數據結構和演算法,然後研究 人工智慧(Microsoft的人工智慧平臺Windows ML不會涉及,選擇研究Google的第二代人工智慧學習系統TensorFlow )。

接下來會對LinuxPython進行基礎的學習並更新文章。

但是最核心的還是數據結構&演算法使用那種編程語言並不重要

感興趣的朋友可以關註。

 

參考文獻

CLR via C#(第4版) Jeffrey Richter

C#高級編程(第10版) C# 6 & .NET Core 1.0   Christian Nagel  

果殼中的C# C#5.0權威指南  Joseph Albahari

...


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 前言:項目中經常要用到Maven,從來也沒有配置過,直到當人問到Maven是乾什麼的,是怎麼管理項目的?一頭霧水,所以寫了這篇博客,首先附上百度百科的詞條: Maven項目對象模型(POM),可以通過一小段描述信息來管理項目的構建,報告和文檔的軟體項目管理工具。 一、Maven的下載環境變數配置 下 ...
  • 題意 題目鏈接 Sol 如果給出的樹是鏈的話顯然就是LIS 不是鏈的時候直接當鏈做,每個節點維護一個multiset表示計算LIS過程中的單調棧 啟髮式合併即可 時間複雜度:$O(nlog^2n)$ ...
  • 明白生產環境中的jvm參數 寫代碼的時候,程式寫完了,發到線上去運行,跑一段時間後,程式變慢了,cpu負載高了……一堆問題出來了,所以瞭解一下生產環境的機器上的jvm配置是有必要的。比如說: JDK版本是多少?採用何種垃圾回收器? 程式啟動的時候預設分配堆記憶體空間是多少?隨著程式的運行,程式最多能使 ...
  • 每個程式員、或者說每個工作者都應該有自己的職業規劃,如果你不是富二代,不是官二代,也沒有職業規劃,希望你可以思考一下自己的將來。今天給大家分享的是一篇來自阿裡Java架構師對普通程式員的職業建議,希望對你有啟發。 普通程式員,三年成為年薪70w架構師,只因做到了這些 ...
  • 1.分析 上傳文件的過程:客服端選擇一個文件後,寫入到伺服器端,伺服器端使用一個目錄來存儲該文件--底層IO流操作 2.jsp文件上的表單設計 表單傳輸格式用multipart/form-data,要上傳的文件input標簽name屬性最好用同樣的首碼或者尾碼好獲取 3後臺Servlet處理 1.S ...
  • 題意 題目鏈接 題意:給出一張無向圖,每次詢問兩點之間的最短路,滿足$m - n <= 20$ $n, m, q \leqslant 10^5$ Sol 非常好的一道題。 首先建出一個dfs樹。 因為邊數-點數非常少,所以我們可以對於某些非樹邊特殊考慮。 具體做法是:對於非樹邊連接的兩個點,暴力求出 ...
  • 分散式架構有以下幾點普適性的共性需求: 1. 提供集群的集中化的配置管理功能,可以不重啟就讓新的配置參數生效,類似與配置中心 2. 簡單可靠的集群節點動態發現機制,便於動態發現服務,動態擴展節點 3. 簡單可靠的leader選舉機制 4. 提供分散式鎖 zookeeper的數據結構整體上可以看作一顆 ...
  • 在前面大致預覽了常用變數的結構之後,我們今天來仔細的剖析一下字元串的具體實現。 一、字元串的結構 zend_refcounted_h對應的結構體: 下麵我們來瞭解一下具體每個成員的作用: gc:就是_zend_refcounted_h結構體,主要作用是引用計數以及標記變數的類別。 h:字元串的哈希值 ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...