巧妙地使用Interlocked的各個方法,再無鎖無阻塞的情況下判斷出所有線程的運行完成狀態。 昨晚耐著性子看完了clr via c#的第29章<<基元線程同步構造>>,儘管這本書不是第一次看了,但是之前看的都是一帶而過,沒有深入理解,甚至可以說是不理解,實習了之後發現自己的知識原來這麼錶面,很多的 ...
巧妙地使用Interlocked的各個方法,再無鎖無阻塞的情況下判斷出所有線程的運行完成狀態。
昨晚耐著性子看完了clr via c#的第29章<<基元線程同步構造>>,儘管這本書不是第一次看了,但是之前看的都是一帶而過,沒有深入理解,甚至可以說是不理解,實習了之後發現自己的知識原來這麼錶面,很多的實現都不能做出來,這很大程度上打擊了我,而且,春招也快來了,更需要打扎實基礎。引起我註意的是jeffrey在第29章說的:使用Interlocked,代碼很短,絕不阻塞任何線程,二期使用線程池線程來實現自動伸縮。下載了源碼,然後分析了下書中的示例,code如下:
using System; using System.Collections.Generic; using System.Linq; using System.Net.Http; using System.Text; using System.Threading; using System.Threading.Tasks; namespace vlr_via_cs { internal static class AsyncCoordinatorDemo { public static void Go() { const Int32 timeout = 50000; // Change to desired timeout MultiWebRequests act = new MultiWebRequests(timeout); Console.WriteLine("All operations initiated (Timeout={0}). Hit <Enter> to cancel.", (timeout == Timeout.Infinite) ? "Infinite" : (timeout.ToString() + "ms")); Console.ReadLine(); act.Cancel(); Console.WriteLine(); Console.WriteLine("Hit enter to terminate."); Console.ReadLine(); } private sealed class MultiWebRequests { // This helper class coordinates all the asynchronous operations private AsyncCoordinator m_ac = new AsyncCoordinator(); // Set of Web servers we want to query & their responses (Exception or Int32) private Dictionary<String, Object> m_servers = new Dictionary<String, Object> { { "http://cjjjs.com/", null }, { "http://cnblogs.com/", null }, { "http://www.jobbole.com/", null } }; public MultiWebRequests(Int32 timeout = Timeout.Infinite) { // Asynchronously initiate all the requests all at once var httpClient = new HttpClient(); foreach (var server in m_servers.Keys) { m_ac.AboutToBegin(1); //確保先做三次加法, 若是有Sleep,在調用完這個函數後,執行 httpClient.GetByteArrayAsync(server).ContinueWith(task => ComputeResult(server, task)); } // Tell AsyncCoordinator that all operations have been initiated and to call // AllDone when all operations complete, Cancel is called, or the timeout occurs m_ac.AllBegun(AllDone, timeout); } private void ComputeResult(String server, Task<Byte[]> task) { Object result; if (task.Exception != null) { result = task.Exception.InnerException; } else { // Process I/O completion here on thread pool thread(s) // Put your own compute-intensive algorithm here... result = task.Result.Length; // This example just returns the length } // Save result (exception/sum) and indicate that 1 operation completed m_servers[server] = result; m_ac.JustEnded(); } // Calling this method indicates that the results don't matter anymore public void Cancel() { m_ac.Cancel(); } // This method is called after all Web servers respond, // Cancel is called, or the timeout occurs private void AllDone(CoordinationStatus status) { switch (status) { case CoordinationStatus.Cancel: Console.WriteLine("Operation canceled."); break; case CoordinationStatus.Timeout: Console.WriteLine("Operation timed-out."); break; case CoordinationStatus.AllDone: Console.WriteLine("Operation completed; results below:"); foreach (var server in m_servers) { Console.Write("{0} ", server.Key); Object result = server.Value; if (result is Exception) { Console.WriteLine("failed due to {0}.", result.GetType().Name); } else { Console.WriteLine("returned {0:N0} bytes.", result); } } break; } } } private enum CoordinationStatus { AllDone, Timeout, Cancel }; private sealed class AsyncCoordinator { private Int32 m_opCount = 1; // Decremented when AllBegun calls JustEnded private Int32 m_statusReported = 0; // 0=false, 1=true private Action<CoordinationStatus> m_callback; private Timer m_timer; // This method MUST be called BEFORE initiating an operation public void AboutToBegin(Int32 opsToAdd = 1) { Interlocked.Add(ref m_opCount, opsToAdd); } // This method MUST be called AFTER an operations result has been processed public void JustEnded() { if (Interlocked.Decrement(ref m_opCount) == 0) ReportStatus(CoordinationStatus.AllDone); } // This method MUST be called AFTER initiating ALL operations public void AllBegun(Action<CoordinationStatus> callback, Int32 timeout = Timeout.Infinite) { m_callback = callback; if (timeout != Timeout.Infinite) { // 在指定的時間點(dueTime) 調用回調函數,隨後在指定的時間間隔(period)調用回調函數 m_timer = new Timer(TimeExpired, null, timeout, Timeout.Infinite); } JustEnded(); } // 處理過時的線程 private void TimeExpired(Object o) { ReportStatus(CoordinationStatus.Timeout); } public void Cancel() { if (m_callback == null) throw new InvalidOperationException("Cancel cannot be called before AllBegun"); ReportStatus(CoordinationStatus.Cancel); } private void ReportStatus(CoordinationStatus status) { if (m_timer != null) { // If timer is still in play, kill it Timer timer = Interlocked.Exchange(ref m_timer, null); if (timer != null) timer.Dispose(); } // If status has never been reported, report it; else ignore it if (Interlocked.Exchange(ref m_statusReported, 1) == 0) m_callback(status); } } } class Program { static void Main(string[] args) { AsyncCoordinatorDemo.Go(); Console.Read(); } } }
的確是無鎖的操作,Interlocked方法是用戶模式下的原子操作,針對的是CPU,不是線程記憶體,而且它是自旋等待的,耗費的是CPU資源。分析了下AsyncCoordinator類,主要就是利用Interlocked的Add方法,實時計數線程的數量,隨後待一個線程運行的最後又調用Interlocked的Decrement方法自減。如果你留心的話,你會發現,目前絕大多數的併發判斷中都用到了Interlocked的這些方法,尤其是interlocked的anything模式下的compareexchange方法,在這裡提一嘴,除了compareexchange和exchange方法的返回值是返回ref類型原先的值之外,其餘的方法都是返回改變之後的值。最後我們可以通過AllBegun方法來判斷是不是所有的線程都執行完了,隨後將狀態變數m_statusReported設置為1,防止在進行狀態判斷。
這個類很好,之前寫併發的時候,老是煩惱怎麼判斷併發是否已經完事了,又不想用到阻塞,這個類很好,當然應用到具體項目中可能還需要改,但是基本的模型還是這個,不變的。
有點感慨:好東西需要我們自己去發掘,之前查生產者消費者模型的時候,java代碼一大堆,愣是沒有看到幾個C#,就算有也是簡易,儘管可以把java的改變為C#的,但有點感慨C#的技術棧和資源少