" 返回《C 併發編程》" "1. 簡介" "2. 鏈接數據流塊" "3. 傳遞出錯信息" "4. 斷開鏈接" "5. 限制流量" "6. 數據流塊的並行處理" "7. 創建自定義數據流塊" 1. 簡介 TPL 數據流(dataflow)庫的功能很強大,可用來創建 網格 (mesh)和 管道 (pi ...
1. 簡介
TPL 數據流(dataflow)庫的功能很強大,可用來創建網格(mesh)和管道(pipleline), 並通過它們以非同步方式發送數據。
主要命名空間: System.Threading.Tasks.Dataflow
2. 鏈接數據流塊
創建網格時,需要把數據流塊互相連接起來。
public static void LinkBlockRun()
{
System.Console.WriteLine("Building Block link.");
TransformBlock<int, int> multiplyBlock = new TransformBlock<int, int>(item =>
{
System.Console.WriteLine("first block.");
Thread.Sleep(500);
return item * 2;
});
var subtractBlock = new TransformBlock<int, int>(item =>
{
System.Console.WriteLine("last block.");
Thread.Sleep(500);
return item - 2;
});
var options = new DataflowLinkOptions
{
PropagateCompletion = true
};
multiplyBlock.LinkTo(subtractBlock, options);
System.Console.WriteLine("Builded Block link.");
var task = Task.Run(async () =>
{
System.Console.WriteLine("Posting");
for (int i = 0; i < 3; i++)
{
multiplyBlock.Post(i);
}
System.Console.WriteLine("Posted");
// 第一個塊的完成情況自動傳遞給第二個塊。
// Complete 後,再進行 Post 是無效的
multiplyBlock.Complete();
await multiplyBlock.Completion;
// 鏈接使用完了
System.Console.WriteLine("Block link Ended.");
});
task.Wait();
}
輸出為:
Building Block link.
Builded Block link.
Posting
Posted
first block.
first block.
last block.
first block.
last block.
last block.
Block link Ended.
3. 傳遞出錯信息
public static void BlockErrorRun()
{
Task.Run(async () =>
{
try
{
//單個塊異常類型
var block = new TransformBlock<int, int>(item =>
{
if (item == 1)
throw new InvalidOperationException("Blech.");
return item * 2;
});
block.Post(1);
await block.Completion;
}
catch (InvalidOperationException ex)
{
System.Console.WriteLine(ex.GetType().Name);
}
try
{
//被連接的塊異常類型
var multiplyBlock = new TransformBlock<int, int>(item =>
{
if (item == 1)
throw new InvalidOperationException("Blech.");
return item * 2;
});
var subtractBlock = new TransformBlock<int, int>(item => item - 2);
multiplyBlock.LinkTo(subtractBlock, new DataflowLinkOptions { PropagateCompletion = true });
multiplyBlock.Post(1);
await subtractBlock.Completion;
}
catch (AggregateException ex)
{
System.Console.WriteLine(ex.GetType().Name);
}
}).Wait();
}
輸出為:
InvalidOperationException
AggregateException
- 對於最簡單的情況,最好是把錯誤傳遞下去,等到最後再作一次性處理。
- 對於更複雜的網格,在數據流完成後需要檢查每一個數據流塊。
4. 斷開鏈接
public static void BlockDisposeRun()
{
var multiplyBlock = new TransformBlock<int, int>(item =>
{
System.Console.WriteLine("first block.");
Thread.Sleep(500);
return item * 2;
});
var subtractBlock = new TransformBlock<int, int>(item =>
{
System.Console.WriteLine("last block.");
Thread.Sleep(500);
return item - 2;
});
IDisposable link = multiplyBlock.LinkTo(subtractBlock);
multiplyBlock.Post(1);
multiplyBlock.Post(2);
// 斷開數據流塊的鏈接。
// 前面的代碼中,數據可能已經通過鏈接傳遞過去,也可能還沒有。
// 在實際應用中,考慮使用代碼塊,而不是調用 Dispose。
link.Dispose();
Thread.Sleep(1200);
}
輸出為:
first block.
first block.
5. 限制流量
用數據流塊 的 BoundedCapacity
屬性,來限制目標塊的流量(throttling)。 BoundedCapacity
的預設設置是 DataflowBlockOptions.Unbounded
解決的問題:
- 防止數據的數據太多太快,導致第一個目標塊在還來不及處理數據時就得對所有數據進行了緩衝
public static void BlockBoundedCapacityRun()
{
var sourceBlock = new BufferBlock<int>();
var options = new DataflowBlockOptions
{
BoundedCapacity = 10
//BoundedCapacity = DataflowBlockOptions.Unbounded
};
var targetBlockA = new BufferBlock<int>(options);
var targetBlockB = new BufferBlock<int>(options);
sourceBlock.LinkTo(targetBlockA);
sourceBlock.LinkTo(targetBlockB);
for (int i = 0; i < 31; i++)
{
System.Console.WriteLine($"{DateTime.Now.ToString("mm:ss.fff")} Post:{i % 10}");
sourceBlock.Post(i % 10);
}
//向水管中註入31個水滴
//由於分支的限流, targetBlockA 和 targetBlockB 各得到了10各水滴
var task = Task.Run(() =>
{
int i = 0;
System.Console.WriteLine("先處理 targetBlockA 的水滴,此處迴圈接收會將水滴接乾,但是接不到存在 targetBlockB 中的水滴");
do
{
IList<int> res;
if (targetBlockA.TryReceiveAll(out res))
{
i += res.Count;
System.Console.WriteLine($"{DateTime.Now.ToString("mm:ss.fff")} RevcA:{string.Join(",", res)} {i}");
}
else
{
break;
}
Thread.Sleep(100);
} while (true);
i = 0;
System.Console.WriteLine("處理 targetBlockB 的水滴,只剩下緩衝的水滴");
do
{
IList<int> res;
if (targetBlockB.TryReceiveAll(out res))
{
i += res.Count;
System.Console.WriteLine($"{DateTime.Now.ToString("mm:ss.fff")} RevcB:{string.Join(",", res)} {i}");
}
else
{
break;
}
Thread.Sleep(100);
} while (true);
});
task.Wait();
}
輸出為:
40:28.026 Post:0
40:28.038 Post:1
40:28.038 Post:2
40:28.038 Post:3
40:28.038 Post:4
40:28.038 Post:5
40:28.038 Post:6
40:28.038 Post:7
40:28.038 Post:8
40:28.038 Post:9
40:28.038 Post:0
40:28.038 Post:1
40:28.038 Post:2
40:28.038 Post:3
40:28.038 Post:4
40:28.038 Post:5
40:28.038 Post:6
40:28.038 Post:7
40:28.038 Post:8
40:28.038 Post:9
40:28.038 Post:0
40:28.038 Post:1
40:28.038 Post:2
40:28.038 Post:3
40:28.038 Post:4
40:28.038 Post:5
40:28.038 Post:6
40:28.038 Post:7
40:28.038 Post:8
40:28.038 Post:9
40:28.038 Post:0
先處理 targetBlockA 的水滴,此處迴圈接收會將水滴接乾,但是接不到存在 targetBlockB 中的水滴
40:28.043 RevcA:0,1,2,3,4,5,6,7,8,9 10
40:28.149 RevcA:0,1,2,3,4,5,6,7,8,9 20
40:28.249 RevcA:0 21
處理 targetBlockB 的水滴,只剩下緩衝的水滴
40:28.350 RevcB:0,1,2,3,4,5,6,7,8,9 10
限流例子: 在用 I/O 操作的數據填充數據流網格時,可以設置數據流塊的 BoundedCapacity
屬性。這樣,在網格來不及處理數據時,就不會讀取過多的 I/O 數據,網格也不會緩存所有數據。
6. 數據流塊的並行處理
public static void BlockParalleRun()
{
var multiplyBlock = new TransformBlock<int, int>(
item =>
{
System.Console.WriteLine($"{DateTime.Now.ToString("mm:ss.fff")} first block.");
Thread.Sleep(100);
return item * 2;
},
new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded
}
);
var subtractBlock = new TransformBlock<int, int>(item =>
{
System.Console.WriteLine($"{DateTime.Now.ToString("mm:ss.fff")} last block.");
Thread.Sleep(100);
return item - 2;
});
multiplyBlock.LinkTo(subtractBlock, new DataflowLinkOptions { PropagateCompletion = true });
var task = Task.Run(async () =>
{
for (int i = 0; i < 7; i++)
{
multiplyBlock.Post(i);
}
multiplyBlock.Complete();
await multiplyBlock.Completion;
var tk = Task.Run(() =>
{
IList<int> recvResList;
//此處延時為了TryReceiveAll獲取所有數據,防止 subtractBlock 還有數據未接收
Thread.Sleep(1500);
if (subtractBlock.TryReceiveAll(out recvResList))
{
System.Console.WriteLine($"{DateTime.Now.ToString("mm:ss.fff")} Revc {string.Join(",", recvResList)}.");
}
else
{
System.Console.WriteLine($"{DateTime.Now.ToString("mm:ss.fff")} Revc null.");
}
});
await tk;
// multiplyBlock 已經調用完成,subtractBlock 的完成狀態依賴於 Link 參數 PropagateCompletion
await subtractBlock.Completion;
});
task.Wait();
}
輸出為:
44:16.023 first block.
44:16.023 first block.
44:16.023 first block.
44:16.023 first block.
44:16.023 first block.
44:16.023 first block.
44:16.023 first block.
44:16.146 last block.
44:16.250 last block.
44:16.351 last block.
44:16.452 last block.
44:16.552 last block.
44:16.652 last block.
44:16.753 last block.
44:17.656 Revc -2,0,2,4,6,8,10.
真正的難點: 找出哪些數據流塊需要並行處理
7. 創建自定義數據流塊
public static void BlockCustomRun()
{
var block = CreateMyCustomBlock();
for (int i = 0; i < 7; i++)
{
block.Post(i);//target
}
var task = Task.Run(async () =>
{
var tk = Task.Run(() =>
{
List<int> recvResList = new List<int>();
//此處延時為了TryReceiveAll獲取所有數據,防止 subtractBlock 還有數據未接收
while (true)
{
try
{
var recvRes = block.Receive();//source
recvResList.Add(recvRes);
}
catch (System.InvalidOperationException)
{
break;
}
}
Console.WriteLine($"{DateTime.Now.ToString("mm:ss.fff")} Revc {string.Join(",", recvResList)}.");
});
block.Complete();//target
await block.Completion;//source
await tk;
});
task.Wait();
}
static IPropagatorBlock<int, int> CreateMyCustomBlock()
{
var multiplyBlock = new TransformBlock<int, int>(item =>
{
int res = item * 2;
System.Console.WriteLine($"{DateTime.Now.ToString("mm:ss.fff")} first block {res}.");
Thread.Sleep(100);
return res;
});
var addBlock = new TransformBlock<int, int>(item =>
{
int res = item + 2;
System.Console.WriteLine($"{DateTime.Now.ToString("mm:ss.fff")} next block {res}.");
Thread.Sleep(100);
return res;
});
var divideBlock = new TransformBlock<int, int>(item =>
{
int res = item / 2;
System.Console.WriteLine($"{DateTime.Now.ToString("mm:ss.fff")} last block {res}.");
Thread.Sleep(100);
return res;
});
var flowCompletion = new DataflowLinkOptions { PropagateCompletion = true };
multiplyBlock.LinkTo(addBlock, flowCompletion);
addBlock.LinkTo(divideBlock, flowCompletion);
return DataflowBlock.Encapsulate(multiplyBlock, divideBlock);
}
輸出為:
45:00.528 first block 0.
45:00.639 first block 2.
45:00.641 next block 2.
45:00.739 first block 4.
45:00.746 next block 4.
45:00.747 last block 1.
45:00.844 first block 6.
45:00.847 next block 6.
45:00.848 last block 2.
45:00.947 first block 8.
45:00.951 next block 8.
45:00.951 last block 3.
45:01.049 first block 10.
45:01.055 next block 10.
45:01.056 last block 4.
45:01.152 first block 12.
45:01.159 next block 12.
45:01.160 last block 5.
45:01.264 next block 14.
45:01.265 last block 6.
45:01.365 last block 7.
45:01.472 Revc 1,2,3,4,5,6,7.
DataflowBlock.Encapsulate
只會封裝只有一個輸入塊和一個輸出塊的網格。如果一個可重用的網格帶有多個輸入或輸出,就應該把它封裝進一個自定義對象