Parallel Programming-實現並行操作的流水線(生產者、消費者) ...
本文介紹如何使用C#實現並行執行的流水線(生產者消費者):
1.流水線示意圖
2.實現並行流水線
一、流水線示意圖
上圖演示了流水線,action1接收input,然後產生結果保存在buffer1中,action2讀取buffer1中由action1產生的數據,以此類推指導action4完成產生Output。
以上也是典型的生產者消費者模式。
上面的模式如果使用普通常規的串列執行是很簡單的,按部就班按照流程圖一步一步執行即可。如果為了提高效率,想使用並行執行,也就是說生產者和消費者同時並行執行,該怎麼辦麽?
二、實現並行流水線
2.1 代碼
class PiplelineDemo { private int seed; public PiplelineDemo() { seed = 10; } public void Action1(BlockingCollection<string> output) { try { for (var i = 0; i < seed; i++) { output.Add(i.ToString());//initialize data to buffer1 } } finally { output.CompleteAdding(); } } public void Action2(BlockingCollection<string> input, BlockingCollection<string> output) { try { foreach (var item in input.GetConsumingEnumerable()) { var itemToInt = int.Parse(item); output.Add((itemToInt * itemToInt).ToString());// add new data to buffer2 } } finally { output.CompleteAdding(); } } public void Action3(BlockingCollection<string> input, BlockingCollection<string> output) { try { foreach (var item in input.GetConsumingEnumerable()) { output.Add(item);//set data into buffer3 } } finally { output.CompleteAdding(); } } public void Pipeline() { var buffer1 = new BlockingCollection<string>(seed); var buffer2 = new BlockingCollection<string>(seed); var buffer3 = new BlockingCollection<string>(seed); var taskFactory = new TaskFactory(TaskCreationOptions.LongRunning, TaskContinuationOptions.None); var stage1 = taskFactory.StartNew(() => Action1(buffer1)); var stage2 = taskFactory.StartNew(() => Action2(buffer1, buffer2)); var stage3 = taskFactory.StartNew(() => Action3(buffer2, buffer3)); Task.WaitAll(stage1, stage2, stage3); foreach(var item in buffer3.GetConsumingEnumerable())//print data in buffer3 { Console.WriteLine(item); } } } class Program { static void Main(string[] args) { new PiplelineDemo().Pipeline(); Console.Read(); } }
2.2 運行結果
預期列印出了0-9自我相乘的結果。
2.3 代碼解釋
代碼本身的邏輯和本文開始的流程圖是一一對應的。
BlockingCollection<T>是.Net裡面的一個線程安全集合。實現了IProducerConsumerCollection<T>.
- Add方法:將元素加入集合
- CompleteAdding方法:告訴消費者,在當調用該方法之前的元素處理完之後就不要再等待處理了,可以結束處理了。這個非常重要,一定要執行,所以放在finally中(就算exception也要執行)
- GetConsumingEnumberable,給消費者返回一個可以便利的集合