數據流(任務並行庫 TPL)

来源:http://www.cnblogs.com/yswenli/archive/2017/12/15/8042594.html
-Advertisement-
Play Games

TPL 數據流庫向具有高吞吐量和低滯後時間的占用大量 CPU 和 I/O 操作的應用程式的並行化和消息傳遞提供了基礎。 它還能顯式控制緩存數據的方式以及在系統中移動的方式。 為了更好地瞭解數據流編程模型,請考慮一個以非同步方式從磁碟載入圖像並創建複合圖像的應用程式。 傳統編程模型通常需要使用回調和同步 ...


        TPL 數據流庫向具有高吞吐量和低滯後時間的占用大量 CPU 和 I/O 操作的應用程式的並行化和消息傳遞提供了基礎。 它還能顯式控制緩存數據的方式以及在系統中移動的方式。 為了更好地瞭解數據流編程模型,請考慮一個以非同步方式從磁碟載入圖像並創建複合圖像的應用程式。 傳統編程模型通常需要使用回調和同步對象(例如鎖)來協調任務和訪問共用數據。 通過使用數據流編程模型,您可以從磁碟讀取時創建處理圖像的數據流對象。 在數據流模型下,您可以聲明當數據可用時的處理方式,以及數據之間的所有依賴項。 由於運行時管理數據之間的依賴項,因此通常可以避免這種要求來同步訪問共用數據。 此外,因為運行時計劃基於數據的非同步到達,所以數據流可以通過有效管理基礎線程提高響應能力和吞吐量。

        System.Threading.Tasks.Dataflow 命名空間提供基於角色的編程模型,用以支持粗粒度數據流和流水線操作任務的進程內消息傳遞。TDP的主要作用就是Buffering Data和Processing Data,在TDF中,有兩個非常重要的介面,ISourceBlock<T> 和ITargetBlock<T>介面。繼承於ISourceBlock<T>的對象時作為提供數據的數據源對象-生產者,而繼承於ITargetBlock<T>介面類主要是扮演目標對象-消費者。在這個類庫中,System.Threading.Tasks.Dataflow名稱空間下,提供了很多以Block名字結尾的類,ActionBlock,BufferBlock,TransformBlock,BroadcastBlock等9個Block,我們在開發中通常使用單個或多個Block組合的方式來實現一些功能,以下逐個來簡單介紹一下。

BufferBlock

        BufferBlock是TDF中最基礎的Block。BufferBlock提供了一個有界限或沒有界限的Buffer,該Buffer中存儲T。該Block很像BlockingCollection<T>。可以用過Post往裡面添加數據,也可以通過Receive方法阻塞或非同步的的獲取數據,數據處理的順序是FIFO的。它也可以通過Link向其他Block輸出數據。

image

 

 

簡單的同步的生產者消費者代碼示例:

複製代碼
private static BufferBlock<int> m_buffer = new BufferBlock<int>();

// Producer
private static void Producer()
{
    while(true)
    {
        int item = Produce();
        m_buffer.Post(item);
    }
}

// Consumer
private static void Consumer()
{
    while(true)
    {
        int item = m_buffer.Receive();
        Process(item);
    }
}

// Main
public static void Main()
{
    var p = Task.Factory.StartNew(Producer);
    var c = Task.Factory.StartNew(Consumer);
    Task.WaitAll(p,c);
}
複製代碼

 

ActionBlock

 

        ActionBlock實現ITargetBlock,說明它是消費數據的,也就是對輸入的一些數據進行處理。它在構造函數中,允許輸入一個委托,來對每一個進來的數據進行一些操作。如果使用Action(T)委托,那說明每一個數據的處理完成需要等待這個委托方法結束,如果使用了Func<TInput, Task>)來構造的話,那麼數據的結束將不是委托的返回,而是Task的結束。預設情況下,ActionBlock會FIFO的處理每一個數據,而且一次只能處理一個數據,一個處理完了再處理第二個,但也可以通過配置來並行的執行多個數據。

 

image

先看一個例子:

複製代碼
public ActionBlock<int> abSync = new ActionBlock<int>((i) =>
            {
                Thread.Sleep(1000);
                Console.WriteLine(i + " ThreadId:" + Thread.CurrentThread.ManagedThreadId + " Execute Time:" + DateTime.Now);
            }
        );

        public void TestSync()
        {
            for (int i = 0; i < 10; i++)
            {
                abSync.Post(i);
            }

            Console.WriteLine("Post finished");
        }
複製代碼

 

6{%7WGLQU90CW4[(OF)H6DC

 

可見,ActionBlock是順序處理數據的,這也是ActionBlock一大特性之一。主線程在往ActionBlock中Post數據以後馬上返回,具體數據的處理是另外一個線程來做的。數據是非同步處理的,但處理本身是同步的,這樣在一定程度上保證數據處理的準確性。下麵的例子是使用async和await。

public ActionBlock<int> abSync2 = new ActionBlock<int>(async (i) =>
        {
            await Task.Delay(1000);
            Console.WriteLine(i + " ThreadId:" + Thread.CurrentThread.ManagedThreadId + " Execute Time:" + DateTime.Now);
        }

U55C4LS4`0SY0O)}[5W]{%C

雖然還是1秒鐘處理一個數據,但是處理數據的線程會有不同。

 

如果你想非同步處理多個消息的話,ActionBlock也提供了一些介面,讓你輕鬆實現。在ActionBlock的構造函數中,可以提供一個ExecutionDataflowBlockOptions的類型,讓你定義ActionBlock的執行選項,在下麵了例子中,我們定義了MaxDegreeOfParallelism選項,設置為3。目的的讓ActionBlock中的Item最多可以3個並行處理。

複製代碼
public ActionBlock<int> abAsync = new ActionBlock<int>((i) =>
        {
            Thread.Sleep(1000);
            Console.WriteLine(i + " ThreadId:" + Thread.CurrentThread.ManagedThreadId + " Execute Time:" + DateTime.Now);
        }
        , new ExecutionDataflowBlockOptions() { MaxDegreeOfParallelism = 3 });

public void TestAsync()
        {
            for (int i = 0; i < 10; i++)
            {
                abAsync.Post(i);
            }
            Console.WriteLine("Post finished");
        }
複製代碼

XVGW}JJK7YY7(%E}])11J7V

 

 

運行程式,我們看見,每3個數據幾乎同時處理,並且他們的線程ID也是不一樣的。

 

 

ActionBlock也有自己的生命周期,所有繼承IDataflowBlock的類型都有Completion屬性和Complete方法。調用Complete方法是讓ActionBlock停止接收數據,而Completion屬性則是一個Task,是在ActionBlock處理完所有數據時候會執行的任務,我們可以使用Completion.Wait()方法來等待ActionBlock完成所有的任務,Completion屬性只有在設置了Complete方法後才會有效。

 

 

複製代碼
public void TestAsync()
        {
            for (int i = 0; i < 10; i++)
            {
                abAsync.Post(i);
            }
            abAsync.Complete();
            Console.WriteLine("Post finished");
            abAsync.Completion.Wait();
            Console.WriteLine("Process finished");
        }
複製代碼

 

 

$WSKZ$6M1`[J7T_W@~Y~WZ3

 

TransformBlock

TransformBlock是TDF提供的另一種Block,顧名思義它常常在數據流中充當數據轉換處理的功能。在TransformBlock內部維護了2個Queue,一個InputQueue,一個OutputQueue。InputQueue存儲輸入的數據,而通過Transform處理以後的數據則放在OutputQueue,OutputQueue就好像是一個BufferBlock。最終我們可以通過Receive方法來阻塞的一個一個獲取OutputQueue中的數據。TransformBlock的Completion.Wait()方法只有在OutputQueue中的數據為0的時候才會返回。

image

舉個例子,我們有一組網址的URL,我們需要對每個URL下載它的HTML數據並存儲。那我們通過如下的代碼來完成:

複製代碼
public TransformBlock<string, string> tbUrl = new TransformBlock<string, string>((url) =>
        {
            WebClient webClient = new WebClient();
            return webClient.DownloadString(new Uri(url));
        }

        public void TestDownloadHTML()
        {
            tbUrl.Post("www.baidu.com");
            tbUrl.Post("www.sina.com.cn");

            string baiduHTML = tbUrl.Receive();
            string sinaHTML = tbUrl.Receive();
        }
複製代碼

當然,Post操作和Receive操作可以在不同的線程中進行,Receive操作同樣也是阻塞操作,在OutputQueue中有可用的數據時,才會返回。

 

TransformManyBlock

TransformManyBlock和TransformBlock非常類似,關鍵的不同點是,TransformBlock對應於一個輸入數據只有一個輸出數據,而TransformManyBlock可以有多個,及可以從InputQueue中取一個數據出來,然後放多個數據放入到OutputQueue中。

image

 

複製代碼
TransformManyBlock<int, int> tmb = new TransformManyBlock<int, int>((i) => { return new int[] { i, i + 1 }; });

        ActionBlock<int> ab = new ActionBlock<int>((i) => Console.WriteLine(i));

        public void TestSync()
        {
            tmb.LinkTo(ab);

            for (int i = 0; i < 4; i++)
            {
                tmb.Post(i);
            }

            Console.WriteLine("Finished post");
        }
複製代碼

GC(K]J4DB4UKP$S@8C9ZVMV

 

BroadcastBlock

BroadcastBlock的作用不像BufferBlock,它是使命是讓所有和它相聯的目標Block都收到數據的副本,這點從它的命名上面就可以看出來了。還有一點不同的是,BroadcastBlock並不保存數據,在每一個數據被髮送到所有接收者以後,這條數據就會被後面最新的一條數據所覆蓋。如沒有目標Block和BroadcastBlock相連的話,數據將被丟棄。但BroadcastBlock總會保存最後一個數據,不管這個數據是不是被髮出去過,如果有一個新的目標Block連上來,那麼這個Block將收到這個最後一個數據。

image

複製代碼
        BroadcastBlock<int> bb = new BroadcastBlock<int>((i) => { return i; });

        ActionBlock<int> displayBlock = new ActionBlock<int>((i) => Console.WriteLine("Displayed " + i));

        ActionBlock<int> saveBlock = new ActionBlock<int>((i) => Console.WriteLine("Saved " + i));

        ActionBlock<int> sendBlock = new ActionBlock<int>((i) => Console.WriteLine("Sent " + i));

        public void TestSync()
        {
            bb.LinkTo(displayBlock);
            bb.LinkTo(saveBlock);
            bb.LinkTo(sendBlock);

            for (int i = 0; i < 4; i++)
            {
                bb.Post(i);
            }

            Console.WriteLine("Post finished");
        }
複製代碼

A][PVWN1@4UMGZ[YTEV$[E9

 

如果我們在Post以後再添加連接Block的話,那些Block就只會收到最後一個數據了。

複製代碼
public void TestSync()
        {
            for (int i = 0; i < 4; i++)
            {
                bb.Post(i);
            }

            Thread.Sleep(5000);

            bb.LinkTo(displayBlock);
            bb.LinkTo(saveBlock);
            bb.LinkTo(sendBlock);
            Console.WriteLine("Post finished");
        }
複製代碼

 

AC}VT(NM__HO1@UJ948)$@W

 

WriteOnceBlock

如果說BufferBlock是最基本的Block,那麼WriteOnceBock則是最最簡單的Block。它最多只能存儲一個數據,一旦這個數據被髮送出去以後,這個數據還是會留在Block中,但不會被刪除或被新來的數據替換,同樣所有的接收者都會收到這個數據的備份。

image

和BroadcastBlock同樣的代碼,但是結果不一樣:

複製代碼
WriteOnceBlock<int> bb = new WriteOnceBlock<int>((i) => { return i; });

        ActionBlock<int> displayBlock = new ActionBlock<int>((i) => Console.WriteLine("Displayed " + i));

        ActionBlock<int> saveBlock = new ActionBlock<int>((i) => Console.WriteLine("Saved " + i));

        ActionBlock<int> sendBlock = new ActionBlock<int>((i) => Console.WriteLine("Sent " + i));

        public void TestSync()
        {
            bb.LinkTo(displayBlock);
            bb.LinkTo(saveBlock);
            bb.LinkTo(sendBlock);
            for (int i = 0; i < 4; i++)
            {
                bb.Post(i);
            }

            Console.WriteLine("Post finished");
        }
複製代碼

@2[203}OL`G6VH2K}}9}DNE

WriteOnceBock只會接收一次數據。而且始終保留那個數據。

同樣使用Receive方法來獲取數據也是一樣的結果,獲取到的都是第一個數據:

複製代碼
public void TestReceive()
        {
            for (int i = 0; i < 4; i++)
            {
                bb.Post(i);
            }
            Console.WriteLine("Post finished");

            Console.WriteLine("1st Receive:" + bb.Receive());
            Console.WriteLine("2nd Receive:" + bb.Receive());
            Console.WriteLine("3rd Receive:" + bb.Receive());
        }
複製代碼

7M5Q]MH5K82OVQ}N]E(J8MV

 

 BatchBlock

 

 image

BatchBlock提供了能夠把多個單個的數據組合起來處理的功能,如上圖。應對有些需求需要固定多個數據才能處理的問題。在構造函數中需要制定多少個為一個Batch,一旦它收到了那個數量的數據後,會打包放在它的OutputQueue中。當BatchBlock被調用Complete告知Post數據結束的時候,會把InputQueue中餘下的數據打包放入OutputQueue中等待處理,而不管InputQueue中的數據量是不是滿足構造函數的數量。

複製代碼
        BatchBlock<int> bb = new BatchBlock<int>(3);

        ActionBlock<int[]> ab = new ActionBlock<int[]>((i) => 
            {
                string s = string.Empty;

                foreach (int m in i)
                {
                    s += m + " ";
                }
                Console.WriteLine(s);
            });

        public void TestSync()
        {
            bb.LinkTo(ab);

            for (int i = 0; i < 10; i++)
            {
                bb.Post(i);
            }
            bb.Complete();

            Console.WriteLine("Finished post");
        }
複製代碼

@D__1{B5V72~T7`AGM74D_0

BatchBlock執行數據有兩種模式:貪婪模式和非貪婪模式。貪婪模式是預設的。貪婪模式是指任何Post到BatchBlock,BatchBlock都接收,並等待個數滿了以後處理。非貪婪模式是指BatchBlock需要等到構造函數中設置的BatchSize個數的Source都向BatchBlock發數據,Post數據的時候才會處理。不然都會留在Source的Queue中。也就是說BatchBlock可以使用在每次從N個Source那個收一個數據打包處理或從1個Source那裡收N個數據打包處理。這裡的Source是指其他的繼承ISourceBlock的,用LinkTo連接到這個BatchBlock的Block。

在另一個構造參數中GroupingDataflowBlockOptions,可以通過設置Greedy屬性來選擇是否貪婪模式和MaxNumberOfGroups來設置最大產生Batch的數量,如果到達了這個數量,BatchBlock將不會再接收數據。

 

JoinBlock

image

JoinBlock一看名字就知道是需要和兩個或兩個以上的Source Block相連接的。它的作用就是等待一個數據組合,這個組合需要的數據都到達了,它才會處理數據,並把這個組合作為一個Tuple傳遞給目標Block。舉個例子,如果定義了JoinBlock<int, string>類型,那麼JoinBlock內部會有兩個ITargetBlock,一個接收int類型的數據,一個接收string類型的數據。那隻有當兩個ITargetBlock都收到各自的數據後,才會放到JoinBlock的OutputQueue中,輸出。

 

 

 

複製代碼
JoinBlock<int, string> jb = new JoinBlock<int, string>();
        ActionBlock<Tuple<int, string>> ab = new ActionBlock<Tuple<int, string>>((i) =>
            {
                Console.WriteLine(i.Item1 + " " + i.Item2);
            });
            
        public void TestSync()
        {
            jb.LinkTo(ab);

            for (int i = 0; i < 5; i++)
            {
                jb.Target1.Post(i);
            }

            for (int i = 5; i > 0; i--)
            {
                Thread.Sleep(1000);
                jb.Target2.Post(i.ToString());
            }

            Console.WriteLine("Finished post");
        }
複製代碼

)DNMCJE%H41G[2YBPD%W4%B

 

BatchedJoinBlock

image

BatchedJoinBlock一看就是BacthBlock和JoinBlick的組合。JoinBlick是組合目標隊列的一個數據,而BatchedJoinBlock是組合目標隊列的N個數據,當然這個N可以在構造函數中配置。如果我們定義的是BatchedJoinBlock<int, string>, 那麼在最後的OutputQueue中存儲的是Tuple<IList<int>, IList<string>>,也就是說最後得到的數據是Tuple<IList<int>, IList<string>>。它的行為是這樣的,還是假設上文的定義,BatchedJoinBlock<int, string>, 構造BatchSize輸入為3。那麼在這個BatchedJoinBlock種會有兩個ITargetBlock,會接收Post的數據。那什麼時候會生成一個Tuple<IList<int>,IList<string>>到OutputQueue中呢,測試下來並不是我們想的需要有3個int數據和3個string數據,而是只要2個ITargetBlock中的數據個數加起來等於3就可以了。3和0,2和1,1和2或0和3的組合都會生成Tuple<IList<int>,IList<string>>到OutputQueue中。可以參看下麵的例子:

複製代碼
BatchedJoinBlock<int, string> bjb = new BatchedJoinBlock<int, string>(3);

        ActionBlock<Tuple<IList<int>, IList<string>>> ab = new ActionBlock<Tuple<IList<int>, IList<string>>>((i) =>
            {
                Console.WriteLine("-----------------------------");

                foreach (int m in i.Item1)
                {
                    Console.WriteLine(m);
                };

                foreach (string s in i.Item2)
                {
                    Console.WriteLine(s);
                };
            });

        public void TestSync()
        {
            bjb.LinkTo(ab);

            for (int i = 0; i < 5; i++)
            {
                bjb.Target1.Post(i);
            }

            for (int i = 5; i > 0; i--)
            {
                bjb.Target2.Post(i.ToString());
            }

            Console.WriteLine("Finished post");
        }
複製代碼

GZ}X_[]DM}42_()PXL05A(T

最後剩下的一個數據1,由於沒有滿3個,所以一直被保留在Target2中。

 

TDF中最有用的功能之一就是多個Block之間可以組合應用。ISourceBlock可以連接ITargetBlock,一對一,一對多,或多對多。下麵的例子就是一個TransformBlock和一個ActionBlock的組合。TransformBlock用來把數據*2,並轉換成字元串,然後把數據扔到ActionBlock中,而ActionBlock則用來最後的處理數據列印結果。

 

 

 

 

複製代碼
public ActionBlock<string> abSync = new ActionBlock<string>((i) =>
        {
            Thread.Sleep(1000);
            Console.WriteLine(i + " ThreadId:" + Thread.CurrentThread.ManagedThreadId + " Execute Time:" + DateTime.Now);
        }
);

        public TransformBlock<int, string> tbSync = new TransformBlock<int, string>((i) =>
            {
                i = i * 2;
                return i.ToString();
            }
        );

        public void TestSync()
        {
            tbSync.LinkTo(abSync);

            for (int i = 0; i < 10; i++)
            {
                tbSync.Post(i);
            }
            tbSync.Complete();
            Console.WriteLine("Post finished");

            tbSync.Completion.Wait();
            Console.WriteLine("TransformBlock process finished");
        }
複製代碼

7S`N)T79TI4~0X8${XF8[PB

TDF提供的一些Block,通過對這些Block配置和組合,可以滿足很多的數據處理的場景。這一篇將繼續介紹與這些Block配置的相關類,和挖掘一些高級功能。

 

在一些Block的構造函數中,我們常常可以看見需要你輸入DataflowBlockOptions 類型或者它的兩個派生類型ExecutionDataflowBlockOptions 和 GroupingDataflowBlockOptions。

DataflowBlockOptions

DataflowBlockOptions有五個屬性:BoundedCapacity,CancellationToken,MaxMessagesPerTask,NameFormat和TaskScheduler。

用BoundedCapacity來限定容量

這個屬性用來限制一個Block中最多可以緩存數據項的數量,大多數Block都支持這個屬性,這個值預設是DataflowBlockOptions.Unbounded = -1,也就是說沒有限制。開發人員可以制定這個屬性設置數量的上限。那後面的新數據將會延遲。比如說用一個BufferBlock連接一個ActionBlock,如果在ActionBlock上面設置了上限,ActionBlock處理的操作速度比較慢,留在ActionBlock中的數據到達了上限,那麼餘下的數據將留在BufferBlock中,直到ActionBlock中的數據量低於上限。這種情況常常會發生在生產者生產的速度大於消費者速度的時候,導致的問題是記憶體越來越大,數據操作越來越延遲。我們可以通過一個BufferBlock連接多個ActionBlock來解決這樣的問題,也就是負載均衡。一個ActionBlock滿了,就會放到另外一個ActionBlock中去了。

 

用CancellationToken來取消操作

TPL中常用的類型。在Block的構造函數中放入CancellationToken,Block將在它的整個生命周期中全程監控這個對象,只要在這個Block結束運行(調用Complete方法)前,用CancellationToken發送取消請求,該Block將會停止運行,如果Block中還有沒有處理的數據,那麼將不會再被處理。

用MaxMessagesPerTask控制公平性

每一個Block內部都是非同步處理,都是使用TPL的Task。TDF的設計是在保證性能的情況下,儘量使用最少的任務對象來完成數據的操作,這樣效率會高一些,一個任務執行完成一個數據以後,任務對象並不會銷毀,而是會保留著去處理下一個數據,直到沒有數據處理的時候,Block才會回收掉這個任務對象。但是如果數據來自於多個Source,公平性就很難保證。從其他Source來的數據必須要等到早前的那些Source的數據都處理完了才能被處理。這時我們就可以通過MaxMessagesPerTask來控制。這個屬性的預設值還是DataflowBlockOptions.Unbounded=-1,表示沒有上限。假如這個數值被設置為1的話,那麼單個任務只會處理一個數據。這樣就會帶來極致的公平性,但是將帶來更多的任務對象消耗。

用NameFormat來定義Block名稱

MSDN上說屬性NameFormat用來獲取或設置查詢塊的名稱時要使用的格式字元串。

Block的名字Name=string.format(NameFormat, block.GetType ().Name, block.Completion.Id)。所以當我們輸入”{0}”的時候,名字就是block.GetType ().Name,如果我們數據的是”{1}”,那麼名字就是block.Completion.Id。如果是“{2}”,那麼就會拋出異常。

用TaskScheduler來調度Block行為

TaskScheduler是非常重要的屬性。同樣這個類型來源於TPL。每個Block裡面都使用TaskScheduler來調度行為,無論是源Block和目標Block之間的數據傳遞,還是用戶自定義的執行數據方法委托,都是使用的TaskScheduler。如果沒有特別設置的話,將使用TaskScheduler.Default(System.Threading.Tasks.ThreadPoolTaskScheduler)來調度。我們可以使用其他的一些繼承於TaskScheduler的類型來設置這個調度器,一旦設置了以後,Block中的所有行為都會使用這個調度器來執行。.Net Framework 4中內建了兩個Scheduler,一個是預設的ThreadPoolTaskScheduler,另一個是用於UI線程切換的SynchronizationContextTaskScheduler。如果你使用的Block設計到UI的話,那可以使用後者,這樣在UI線程切換上面將更加方便。

.Net Framework 4.5 中,還有一個類型被加入到System.Threading.Tasks名稱空間下:ConcurrentExclusiveSchedulerPair。這個類是兩個TaskScheduler的組合。它提供兩個TaskScheduler:ConcurrentScheduler和ExclusiveScheduler;我們可以把這兩個TaskScheduler構造進要使用的Block中。他們保證了在沒有排他任務的時候(使用ExclusiveScheduler的任務),其他任務(使用ConcurrentScheduler)可以同步進行,當有排他任務在運行的時候,其他任務都不能運行。其實它裡面就是一個讀寫鎖。這在多個Block操作共用資源的問題上是一個很方便的解決方案。

複製代碼
public ActionBlock<int> readerAB1;

        public ActionBlock<int> readerAB2;

        public ActionBlock<int> readerAB3;

        public ActionBlock<int> writerAB1;

        public BroadcastBlock<int> bb = new BroadcastBlock<int>((i) => { return i; });

        public void Test()
        {
            ConcurrentExclusiveSchedulerPair pair = new ConcurrentExclusiveSchedulerPair();

            readerAB1 = new ActionBlock<int>((i) =>
                {
                    Console.WriteLine("ReaderAB1 begin handling." + " Execute Time:" + DateTime.Now);
                    Thread.Sleep(500);
                }
            , new ExecutionDataflowBlockOptions() { TaskScheduler = pair.ConcurrentScheduler });

            readerAB2 = new ActionBlock<int>((i) =>
                {
                    Console.WriteLine("ReaderAB2 begin handling." + " Execute Time:" + DateTime.Now);
                    Thread.Sleep(500);
                }
            , new ExecutionDataflowBlockOptions() { TaskScheduler = pair.ConcurrentScheduler });

            readerAB3 = new ActionBlock<int>((i) =>
                {
                    Console.WriteLine("ReaderAB3 begin handling." + " Execute Time:" + DateTime.Now);
                    Thread.Sleep(500);
                }
            , new ExecutionDataflowBlockOptions() { TaskScheduler = pair.ConcurrentScheduler });

            writerAB1 = new ActionBlock<int>((i) =>
            {
                Console.ForegroundColor = ConsoleColor.Red;
                Console.WriteLine("WriterAB1 begin handling." + " Execute Time:" + DateTime.Now);
                Console.ResetColor();
                Thread.Sleep(3000);
            }
            , new ExecutionDataflowBlockOptions() { TaskScheduler = pair.ExclusiveScheduler });

            bb.LinkTo(readerAB1);
            bb.LinkTo(readerAB2);
            bb.LinkTo(readerAB3);


            Task.Run(() =>
                {
                    while (true)
                    {
                        bb.Post(1);
                        Thread.Sleep(1000);
                    }
                });

            Task.Run(() =>
            {
                while (true)
                {
                    Thread.Sleep(6000);
                    writerAB1.Post(1);
                }
            });

        }
複製代碼

 

FXE@)NN3Q($(J$AXHGWOP~N

 

用MaxDegreeOfParallelism來並行處理

通常,Block中處理數據都是單線程的,一次只能處理一個數據,比如說ActionBlock中自定義的代理。使用MaxDegreeOfParallelism可以讓你並行處理這些數據。屬性的定義是最大的並行處理個數。如果定義成-1的話,那就是沒有限制。用戶需要在實際情況中選擇這個值的大小,並不是越大越好。如果是平行處理的話,還應該考慮是否有共用資源。

 

 

TDF中的負載均衡

我們可以使用Block很方便的構成一個生產者消費者的模式來處理數據。當生產者產生數據的速度快於消費者的時候,消費者Block的Buffer中的數據會越來越多,消耗大量的記憶體,數據處理也會延時。這時,我們可以用一個生產者Block連接多個消費者Block來解決這個問題。由於多個消費者Block一定是並行處理,所以對共用資源的處理一定要做同步處理。

使用BoundedCapacity屬性來實現

當連接多個ActionBlock的時候,可以通過設置ActionBlock的BoundedCapacity屬性。當第一個滿了,就會放到第二個,第二個滿了就會放到第三個。

複製代碼
public BufferBlock<int> bb = new BufferBlock<int>();

        public ActionBlock<int> ab1 = new ActionBlock<int>((i) =>
            {
                Thread.Sleep(1000);
                Console.WriteLine("ab1 handle data" + i + " Execute Time:" + DateTime.Now);
            }
        , new ExecutionDataflowBlockOptions() { BoundedCapacity = 2 });

        public ActionBlock<int> ab2 = new ActionBlock<int>((i) =>
        {
            Thread.Sleep(1000);
            Console.WriteLine("ab2 handle data" + i + " Execute Time:" + DateTime.Now);
        }
        , new ExecutionDataflowBlockOptions() { BoundedCapacity = 2 });

        public ActionBlock<int> ab3 = new ActionBlock<int>((i) =>
        {
            Thread.Sleep(1000);
            Console.WriteLine("ab3 handle data:" + i + " Execute Time:" + DateTime.Now);
        }
        , new ExecutionDataflowBlockOptions() { BoundedCapacity = 2 });

        public void Test()
        {
            bb.LinkTo(ab1);
            bb.LinkTo(ab2);
            bb.LinkTo(ab3);

            for (int i = 0; i < 9; i++)
            {
                bb.Post(i);
            }
        }
複製代碼

PNQFKIK2OK)}SOWA]RF(~$M

 


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 關於這三個類在字元串處理中的位置不言而喻,那麼他們到底有什麼優缺點,到底什麼時候該用誰呢?下麵我們從以下幾點說明一下 1、三者在執行速度方面的比較: StringBuilder > StringBuffer > String 為什麼String的執行速度最慢? String:字元串常量 String ...
  • 前一篇有學習過《把List<T>轉換為DataTable》http://www.cnblogs.com/insus/p/8043173.html 那此篇,將是學習反向,把DataTable轉換為List<T>。這個方法使用的較多。很多情況之後,從數據讀出來的數據是DataSet或是DataTable ...
  • .net小白一枚,經過了幾個小時的研究,由於錯誤的寫法導致後臺始終接受不到前臺傳遞過來的參數。首先看看控制器的參數 本人實在是偷懶才使用dynamic關鍵字,ajax使用如下寫法,會一直出現 不能綁定null或者是account沒有之類的錯誤。 設置了content-type後臺也不能接收到。這比較 ...
  • 下麵這個學習,把List<T>轉換為Datatable。 下麵先創建一個對象T: class Ay { private int _ID; public int ID { get { return _ID; } set { _ID = value; } } private string _Accoun ...
  • 一、代碼結構: 二、數據實體類: using System; using System.Collections.Generic; using System.Linq; using System.Runtime.Serialization; using System.Text; using Syste ...
  • using System.IO;using UnityEngine;using UnityEditor; public class TestSaveSprite{ [MenuItem("LLWH/SpriteSplit")] public static void SpriteSlice() { Te ...
  • REmote DIctionary Server(Redis) 是一個由Salvatore Sanfilippo寫的key-value存儲系統。 Redis是一個開源的使用ANSI C語言編寫、遵守BSD協議、支持網路、可基於記憶體亦可持久化的日誌型、Key-Value資料庫,並提供多種語言的API。 ...
  • 使用一個小例子來演示:創建一個普通類別: class Ax { private int _ID; public int ID { get { return _ID; } set { _ID = value; } } private string _Name; public string Name { ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...