阻塞隊列(BlockingQueue)是一個支持兩個附加操作的隊列。這兩個附加的操作是:在隊列為空時,獲取元素的線程會等待隊列變為非空。當隊列滿時,存儲元素的線程會等待隊列可用。阻塞隊列常用於生產者和消費者的場景,生產者是往隊列里添加元素的線程,消費者是從隊列里拿元素的線程。阻塞隊列就是生產者存放元 ...
阻塞隊列(BlockingQueue)是一個支持兩個附加操作的隊列。這兩個附加的操作是:在隊列為空時,獲取元素的線程會等待隊列變為非空。當隊列滿時,存儲元素的線程會等待隊列可用。阻塞隊列常用於生產者和消費者的場景,生產者是往隊列里添加元素的線程,消費者是從隊列里拿元素的線程。阻塞隊列就是生產者存放元素的容器,而消費者也只從容器里拿元素。
先放張圖:
根據前面的描述, 我們來考慮下阻塞隊列在程式中會出現的問題:
阻塞隊列 需要實現兩個功能: 使線程等待與喚醒線程. 具體介紹如下:
在極端條件下, 需要掛起線程, 等待隊列滿足條件後,再去執行添加或提取 操作
待隊列滿足了條件之後, 通知線程去繼續其掛起之前的操作....
涉及到的技術:
線程同步 與 線程間通信
可能產生死鎖的分析:
在某個時刻,隊列為空或者是已滿, 此時生產者未能存入數據或者還在存入數據到隊列中, 這就會產生使得隊列出錯
如果此時, 消費者對隊列在進行操作就會產生死鎖...由於之前的生產者的操作使得隊列出了問題並沒有釋放鎖, 此時就會造成死鎖
這是從預防死鎖的角度來解決死鎖問題
首先就是同步資源-隊列的鎖定,既然有鎖那麼就要考慮死鎖問題,最後就是線程間的通信。
也就是說,實現阻塞隊列需要考慮這三個點。
查了下資料,大多都是java的封裝好的類庫,不過沒事,反正思想,理論都是一樣的,不同的就是實現不同。但還是有個不錯的C#實現----<< http://www.cnblogs.com/samgk/p/4772806.html C# 實現生產者消費者隊列 >>。該文其實也道出了阻塞隊列在除去生產者-消費者模型外的應用,昨天查資料的時候,阿裡程式員寫了篇文章關於郵件接收下載的,就是使用阻塞隊列,但是我忘了原文在哪了。當時看的時候,想起來當初看<<C#高級編程>>第十章的管道。書上介紹的是:開一個task去讀取文件名,放到阻塞隊列中,然後開一個隊列根據文件名讀取內容,這個應用於郵件接收下載是一樣的。暫時先不說這個了,有興趣的可以自己去看看那本書。
那麼我們如何自己實現阻塞隊列呢?正如上面說到的考慮點,同步,線程通信,防止死鎖。看看代碼:
using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading; using System.Threading.Tasks; namespace SuiBao.Utility { ///阻塞隊列(BlockingQueue)是一個支持兩個附加操作的隊列。這兩個附加的操作是:在隊列為空時,獲取元素的線程會等待隊列變為非空。當隊列滿時,存儲元素的線程會等待隊列可用。阻塞隊列常用於生產者和消費者的場景,生產者是往隊列里添加元素的線程,消費者是從隊列里拿元素的線程。阻塞隊列就是生產者存放元素的容器,而消費者也只從容器里拿元素。 //阻塞隊列 需要實現兩個功能: 使線程等待與喚醒線程. 具體介紹如下: // 在極端條件下, 需要掛起線程, 等待隊列滿足條件後,再去執行添加或提取 操作 // 待隊列滿足了條件之後, 通知線程去繼續其掛起之前的操作.... //涉及到的技術: //線程同步(此實例用到了lock) 與 線程間通信(此示例用到了event) // // 可能產生死鎖的分析: // 在某個時刻,隊列為空或者是已滿, 此時生產者未能存入數據或者還在存入數據到隊列中, 這就會產生使得隊列出錯 // 如果此時, 消費者對隊列在進行操作就會產生死鎖...由於之前的生產者的操作使得隊列出了問題並沒有釋放鎖, 此時就會造成死鎖 // 這是從預防死鎖的角度來解決死鎖問題 public class BlockQueue<T> { private Queue<T> _inner_queue = null; private ManualResetEvent _dequeue_wait = null; public int Count { get { return _inner_queue.Count; } } public BlockQueue(int capacity = -1) { this._inner_queue = capacity == -1 ? new Queue<T>() : new Queue<T>(capacity); this._dequeue_wait = new ManualResetEvent(false); } // 入隊加鎖 public void EnQueue(T item) { if (this._IsShutdown == true) throw new InvalidOperationException("服務未開啟.[EnQueue]"); lock (this._inner_queue) { this._inner_queue.Enqueue(item); this._dequeue_wait.Set(); } } // 出隊加鎖 public T DeQueue(int waitTime) { bool _queueEmpty = false; T item = default(T); while (true) { lock (this._inner_queue) { // 判斷隊列中是否有元素.... if (this._inner_queue.Count > 0) { item = this._inner_queue.Dequeue(); this._dequeue_wait.Reset(); //break; } else { if (this._IsShutdown == true) { throw new InvalidOperationException("服務未開啟[DeQueue]."); } else { _queueEmpty = true; } } } if (item != null) { return item; } if (_queueEmpty) { this._dequeue_wait.WaitOne(waitTime); } } } private bool _IsShutdown = false; public void Shutdown() { this._IsShutdown = true; this._dequeue_wait.Set(); } public void Clear() { this._inner_queue.Clear(); } } }
那麼.net中有沒有封裝好的阻塞隊列?有啊!BlockingCollection<>類,其實我之前寫的好些關於線程的文章都說到了這個類庫,用到的地方也多。該類預設的容器是ConcurrentQueue,因此,同步就做好了,而且該類還實現了阻塞的功能:
多個線程或任務可同時向集合添加項,如果集合達到其指定最大容量,則製造線程將發生阻塞,直到移除集合中的某個項。 多個使用者可以同時移除項,如果集合變空,則使用線程將發生阻塞,直到製造者添加某個項。 製造線程可調用 CompleteAdding 來指示不再添加項。 使用者將監視 IsCompleted 屬性以瞭解集合何時為空且不再添加項。
原文說明 BlockingCollection 概述 (https://docs.microsoft.com/zh-cn/dotnet/standard/collections/thread-safe/blockingcollection-overview )
感慨一句,微軟的好東西是真多,為什麼不能像java那樣輕易地被人發現使用呢?
沒錯,我們使用這個類就可以輕易地實現阻塞隊列了,而且是完美的實現.
更多的關於多線程與併發的博客,請移步我的博客