" 返回《C 併發編程》" "1. 調度到線程池" "2. 任務調度器" "2.1. Default 調度器" "2.2. 捕獲當前同步上下文 調度器" "2.3. ConcurrentExclusiveSchedulerPair 調度器" "3. 調度並行代碼" "4. 用調度器實現數據流的同步" ...
1. 調度到線程池
Task task = Task.Run(() =>
{
Thread.Sleep(TimeSpan.FromSeconds(2));
});
Task.Run
也能正常地返回結果,能使用非同步 Lambda 表達式。下麵代碼中 Task.Run
返回的 task 會在 2 秒後完成,並返回結果 13:
Task<int> task = Task.Run(async () =>
{
await Task.Delay(TimeSpan.FromSeconds(2));
return 13;
});
Task.Run
返回一個 Task
(或 Task<T>
)對象,該對象可以被非同步或響應式代碼正常使用。
註意: 但不要在 ASP.NET 中使用
Task.Run
,除非你有絕對的把握。在 ASP.NET 中, 處理請求的代碼本來就是在 ASP.NET 線程池線程中運行的,強行把它放到另一個線程池線程通常會適得其反。
但UI程式,使用Task.Run可以執行耗時操作,有效的防止頁面卡住問題。
在進行動態並行開發時, 一定要用 Task.Factory.StartNew
來代替 Task.Run
- 因為根據預設配置,
Task.Run
返回的Task
對象適合被非同步調用(即被非同步代碼或響應式代碼使用)。 Task.Run
也不支持動態並行代碼中普遍使用的高級概念,例如 父/子任務。
2. 任務調度器
需要讓多個代碼段按照指定的方式運行。例如
- 讓所有代碼段在 UI 線程中運行
- 只允許特定數量的代碼段同時運行。
2.1. Default 調度器
TaskScheduler.Default
,它的作用是讓任務在線程池中排隊, Task.Run
、並行、數據流的代碼用的都是 TaskScheduler.Default
。
2.2. 捕獲當前同步上下文 調度器
可以捕獲一個特定的上下文,用 TaskScheduler.FromCurrentSynchronizationContext
調度任務,讓它回到該上下文:
TaskScheduler scheduler = TaskScheduler.FromCurrentSynchronizationContext();
這條語句創建了一個捕獲當前 同步上下文 的 TaskScheduler
對象,並將代碼調度到這個上下文中
SynchronizationContext
類表示一個通用的調度上下文。- 大多數 UI 框架有一個表示 UI 線程的 同步上下文
- ASP.NET 有一個表示 HTTP 請求的 同步上下文
建議:
在 UI 線程上執行代碼時,永遠不要使用針對特定平臺的類型。
+ WPF、IOS、Android 都有 Dispatcher
類
- Windows 應用商店平臺使用
CoreDispatcher
- WinForms 有
ISynchronizeInvoke
介面(即Control.Invoke
)
不要在新寫的代碼中使用這些類型,就當它們不存在吧。使用這些類型會使代碼無謂地綁定在某個特定平臺上。
同步上下文 是通用的、基於上述類型的抽象類。
2.3. ConcurrentExclusiveSchedulerPair 調度器
它實際上是兩個互相關聯的調度器。 只要 ExclusiveScheduler
上沒有運行任務, ConcurrentScheduler
就可以讓多個任務同時執行。只有當 ConcurrentScheduler
沒有執行任務時, ExclusiveScheduler
才可以執行任務,並且每次只允許運行一個任務:
public static void ConcurrentExclusiveSchedulerPairRun()
{
var schedulerPair = new ConcurrentExclusiveSchedulerPair(TaskScheduler.Default, maxConcurrencyLevel: 2);
//由於並行被限流,所以ConcurrentScheduler 會兩個兩個輸出,然後執行完這兩個開啟的8個串列任務
TaskScheduler concurrent = schedulerPair.ConcurrentScheduler;
TaskScheduler exclusive = schedulerPair.ExclusiveScheduler;
//Default 由於沒有限制,所以第一層會先輸出,全部隨機
// TaskScheduler concurrent = TaskScheduler.Default;
// TaskScheduler exclusive =TaskScheduler.Default;
var list = new List<List<int>>();
for (int i = 0; i < 4; i++)
{
var actionList = new List<int>();
list.Add(actionList);
for (int j = 0; j < 4; j++)
{
actionList.Add(i * 10 + j);
}
}
var tasks = list.Select(u => Task.Factory.StartNew(state =>
{
System.Console.WriteLine($"ConcurrentScheduler");
((List<int>)state).Select(i => Task.Factory.StartNew(state2 => System.Console.WriteLine($"ExclusiveScheduler:{state2}"), i, CancellationToken.None, TaskCreationOptions.None, exclusive)).ToArray();
}, u, CancellationToken.None, TaskCreationOptions.None, concurrent));
Task.WaitAll(tasks.ToArray());
}
輸出:
ConcurrentScheduler
ConcurrentScheduler
ExclusiveScheduler:0
ExclusiveScheduler:1
ExclusiveScheduler:2
ExclusiveScheduler:3
ExclusiveScheduler:10
ExclusiveScheduler:11
ExclusiveScheduler:12
ExclusiveScheduler:13
ConcurrentScheduler
ConcurrentScheduler
ExclusiveScheduler:20
ExclusiveScheduler:21
ExclusiveScheduler:22
ExclusiveScheduler:23
ExclusiveScheduler:30
ExclusiveScheduler:31
ExclusiveScheduler:32
ExclusiveScheduler:33
ConcurrentExclusiveSchedulerPair
的常見用法是
- 用
ExclusiveScheduler
來確保每次只運行一個任務。 ExclusiveScheduler
執行的代碼會線上程池中運行,但是使用了同一個ExclusiveScheduler
對象的其他代碼不能同時運行。
ConcurrentExclusiveSchedulerPair
的另一個用法是作為限流調度器。
- 創建的
ConcurrentExclusiveSchedulerPair
對象可以限制自身的併發數量。 - 這時通常不使用 ExclusiveScheduler
var schedulerPair = new ConcurrentExclusiveSchedulerPair(TaskScheduler.Default,maxConcurrencyLevel: 8);
TaskScheduler scheduler = schedulerPair.ConcurrentScheduler;
3. 調度並行代碼
public static void RotateMatricesRun()
{
List<List<Action<float>>> actionLists = new List<List<Action<float>>>();
for (int i = 0; i < 15; i++)
{
var actionList = new List<Action<float>>();
actionLists.Add(actionList);
for (int j = 0; j < 15; j++)
{
actionList.Add(new Action<float>(degrees =>
{
Thread.Sleep(200);
System.Console.WriteLine("degrees:" + degrees + " " + DateTime.Now.ToString("HHmmss.fff"));
}));
}
}
RotateMatrices(actionLists, 10);
//雖然兩個並行嵌套但是由於調度器的設置,導致任務是8個8個執行的,結果是8個後200ms再8個
}
static void RotateMatrices(IEnumerable<IEnumerable<Action<float>>> collections, float degrees)
{
var schedulerPair = new ConcurrentExclusiveSchedulerPair(TaskScheduler.Default, maxConcurrencyLevel: 8);
TaskScheduler scheduler = schedulerPair.ConcurrentScheduler;
ParallelOptions options = new ParallelOptions
{
TaskScheduler = scheduler
};
Parallel.ForEach(collections, options,
matrices =>
{
Parallel.ForEach(matrices,
options,
matrix => matrix.Invoke(degrees)
);
System.Console.WriteLine($"============");
});
}
輸出:
degrees:10 190424.120
... 118個 ...
degrees:10 190426.963
============
============
============
============
============
============
============
============
degrees:10 190427.167
... 6個 ...
degrees:10 190427.167
... 5個 ...
degrees:10 190428.589
... 6個 ...
degrees:10 190428.589
degrees:10 190428.791
degrees:10 190428.791
degrees:10 190428.791
degrees:10 190428.791
degrees:10 190428.791
degrees:10 190428.791
============
degrees:10 190428.791
degrees:10 190428.791
degrees:10 190428.994
... 6個 ...
degrees:10 190428.994
============
degrees:10 190429.194
... 6個 ...
degrees:10 190429.194
============
degrees:10 190429.395
degrees:10 190429.395
degrees:10 190429.395
degrees:10 190429.395
degrees:10 190429.395
============
degrees:10 190429.395
degrees:10 190429.395
degrees:10 190429.395
degrees:10 190429.598
degrees:10 190429.598
degrees:10 190429.598
degrees:10 190429.598
============
degrees:10 190429.598
degrees:10 190429.598
degrees:10 190429.598
degrees:10 190429.598
============
degrees:10 190429.800
============
4. 用調度器實現數據流的同步
Stopwatch sw = Stopwatch.StartNew();
// 模擬 UI同步上下文
AsyncContext.Run(() =>
{
var options = new ExecutionDataflowBlockOptions
{
//使用次調度器,則代碼會放到創建線程的同步上下文上執行(若是當前同步上下文是UI Context 或 此例的AsyncContext)
//運行和註釋下行運行觀察Creator和Executor線程Id的變化
TaskScheduler = TaskScheduler.FromCurrentSynchronizationContext(),
};
var multiplyBlock = new TransformBlock<int, int>(item => item * 2);
System.Console.WriteLine($"Creator ThreadId: {Thread.CurrentThread.ManagedThreadId}.");
var displayBlock = new ActionBlock<int>(result =>
{
// ListBox.Items.Add(result)
System.Console.WriteLine($"Executor ThreadId: {Thread.CurrentThread.ManagedThreadId} res:{result}.");
}, options);
multiplyBlock.LinkTo(displayBlock);
for (int i = 0; i < 5; i++)
{
multiplyBlock.Post(i);
System.Console.WriteLine($"Post {i}");
}
multiplyBlock.Completion.Wait(2000);
});
System.Console.WriteLine($"Cost {sw.ElapsedMilliseconds}ms.");
輸出:
Creator ThreadId: 1.
Post 0
Post 1
Post 2
Post 3
Post 4
Executor ThreadId: 1 res:0.
Executor ThreadId: 1 res:2.
Executor ThreadId: 1 res:4.
Executor ThreadId: 1 res:6.
Executor ThreadId: 1 res:8.
Cost 2062ms.