深入理解阻塞隊列

来源:http://www.cnblogs.com/zhiyong-ITNote/archive/2017/12/03/7965661.html
-Advertisement-
Play Games

阻塞隊列(BlockingQueue)是一個支持兩個附加操作的隊列。這兩個附加的操作是:在隊列為空時,獲取元素的線程會等待隊列變為非空。當隊列滿時,存儲元素的線程會等待隊列可用。阻塞隊列常用於生產者和消費者的場景,生產者是往隊列里添加元素的線程,消費者是從隊列里拿元素的線程。阻塞隊列就是生產者存放元 ...


阻塞隊列(BlockingQueue)是一個支持兩個附加操作的隊列。這兩個附加的操作是:在隊列為空時,獲取元素的線程會等待隊列變為非空。當隊列滿時,存儲元素的線程會等待隊列可用。阻塞隊列常用於生產者和消費者的場景,生產者是往隊列里添加元素的線程,消費者是從隊列里拿元素的線程。阻塞隊列就是生產者存放元素的容器,而消費者也只從容器里拿元素。

先放張圖:


根據前面的描述, 我們來考慮下阻塞隊列在程式中會出現的問題:
阻塞隊列 需要實現兩個功能: 使線程等待與喚醒線程. 具體介紹如下:
在極端條件下, 需要掛起線程, 等待隊列滿足條件後,再去執行添加或提取 操作
待隊列滿足了條件之後, 通知線程去繼續其掛起之前的操作....
涉及到的技術:
線程同步 與 線程間通信
可能產生死鎖的分析:
在某個時刻,隊列為空或者是已滿, 此時生產者未能存入數據或者還在存入數據到隊列中, 這就會產生使得隊列出錯
如果此時, 消費者對隊列在進行操作就會產生死鎖...由於之前的生產者的操作使得隊列出了問題並沒有釋放鎖, 此時就會造成死鎖
這是從預防死鎖的角度來解決死鎖問題
首先就是同步資源-隊列的鎖定,既然有鎖那麼就要考慮死鎖問題,最後就是線程間的通信。

也就是說,實現阻塞隊列需要考慮這三個點。

查了下資料,大多都是java的封裝好的類庫,不過沒事,反正思想,理論都是一樣的,不同的就是實現不同。但還是有個不錯的C#實現----<< http://www.cnblogs.com/samgk/p/4772806.html C# 實現生產者消費者隊列 >>。該文其實也道出了阻塞隊列在除去生產者-消費者模型外的應用,昨天查資料的時候,阿裡程式員寫了篇文章關於郵件接收下載的,就是使用阻塞隊列,但是我忘了原文在哪了。當時看的時候,想起來當初看<<C#高級編程>>第十章的管道。書上介紹的是:開一個task去讀取文件名,放到阻塞隊列中,然後開一個隊列根據文件名讀取內容,這個應用於郵件接收下載是一樣的。暫時先不說這個了,有興趣的可以自己去看看那本書。
那麼我們如何自己實現阻塞隊列呢?正如上面說到的考慮點,同步,線程通信,防止死鎖。看看代碼:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace SuiBao.Utility
{
    ///阻塞隊列(BlockingQueue)是一個支持兩個附加操作的隊列。這兩個附加的操作是:在隊列為空時,獲取元素的線程會等待隊列變為非空。當隊列滿時,存儲元素的線程會等待隊列可用。阻塞隊列常用於生產者和消費者的場景,生產者是往隊列里添加元素的線程,消費者是從隊列里拿元素的線程。阻塞隊列就是生產者存放元素的容器,而消費者也只從容器里拿元素。
    
    //阻塞隊列 需要實現兩個功能: 使線程等待與喚醒線程. 具體介紹如下:
    // 在極端條件下, 需要掛起線程, 等待隊列滿足條件後,再去執行添加或提取 操作
    // 待隊列滿足了條件之後, 通知線程去繼續其掛起之前的操作....
    //涉及到的技術:
    //線程同步(此實例用到了lock) 與 線程間通信(此示例用到了event)
    //
    
    // 可能產生死鎖的分析:
    // 在某個時刻,隊列為空或者是已滿, 此時生產者未能存入數據或者還在存入數據到隊列中, 這就會產生使得隊列出錯
    // 如果此時, 消費者對隊列在進行操作就會產生死鎖...由於之前的生產者的操作使得隊列出了問題並沒有釋放鎖, 此時就會造成死鎖
    // 這是從預防死鎖的角度來解決死鎖問題
    
    public class BlockQueue<T>
    {
        private Queue<T> _inner_queue = null;

        private ManualResetEvent _dequeue_wait = null;

        public int Count
        {
            get { return _inner_queue.Count; }
        }

        public BlockQueue(int capacity = -1)
        {
            this._inner_queue = capacity == -1 ? new Queue<T>() : new Queue<T>(capacity);
            this._dequeue_wait = new ManualResetEvent(false);
        }

        // 入隊加鎖
        public void EnQueue(T item)
        {
            if (this._IsShutdown == true) throw new InvalidOperationException("服務未開啟.[EnQueue]");

            lock (this._inner_queue)
            {
                this._inner_queue.Enqueue(item);
                this._dequeue_wait.Set();
            }
        }

        // 出隊加鎖
        public T DeQueue(int waitTime)
        {
            bool _queueEmpty = false;
            T item = default(T);
            while (true)
            {
                lock (this._inner_queue)
                {
                    // 判斷隊列中是否有元素....
                    if (this._inner_queue.Count > 0)
                    {
                        item = this._inner_queue.Dequeue();
                        this._dequeue_wait.Reset();
                        //break;
                    }
                    else
                    {
                        if (this._IsShutdown == true)
                        {
                            throw new InvalidOperationException("服務未開啟[DeQueue].");
                        }
                        else
                        {
                            _queueEmpty = true;
                        }
                    }
                }
                if (item != null)
                {
                    return item;
                }
                if (_queueEmpty)
                {
                    this._dequeue_wait.WaitOne(waitTime);
                }
            }

        }

        private bool _IsShutdown = false;

        public void Shutdown()
        {
            this._IsShutdown = true;
            this._dequeue_wait.Set();
        }

        public void Clear()
        {
            this._inner_queue.Clear();
        }
    }
}

那麼.net中有沒有封裝好的阻塞隊列?有啊!BlockingCollection<>類,其實我之前寫的好些關於線程的文章都說到了這個類庫,用到的地方也多。該類預設的容器是ConcurrentQueue,因此,同步就做好了,而且該類還實現了阻塞的功能:
多個線程或任務可同時向集合添加項,如果集合達到其指定最大容量,則製造線程將發生阻塞,直到移除集合中的某個項。 多個使用者可以同時移除項,如果集合變空,則使用線程將發生阻塞,直到製造者添加某個項。 製造線程可調用 CompleteAdding 來指示不再添加項。 使用者將監視 IsCompleted 屬性以瞭解集合何時為空且不再添加項。
原文說明 BlockingCollection 概述 (https://docs.microsoft.com/zh-cn/dotnet/standard/collections/thread-safe/blockingcollection-overview )
感慨一句,微軟的好東西是真多,為什麼不能像java那樣輕易地被人發現使用呢?
沒錯,我們使用這個類就可以輕易地實現阻塞隊列了,而且是完美的實現.

更多的關於多線程與併發的博客,請移步我的博客


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 簡單寫一個 linux查看文件夾占用空間大小--du命令的一種用法 ...
  • yum(Yellowdog Update Modifier)rpm的前端程式,可解決軟體包相關依賴性 在搭建前先開啟掛載,並設置為開機啟動 將光碟掛載到/misc/cd yum的配置文件 在/etc/yum.repos.d目錄下把系統自帶的配置文件刪除或備份 vim新建base.repo配置文件並編 ...
  • 在整型陣列中,我們需要從中獲取陣列元素的最大值和最小值: 方法一:先是使用Array進行排序,然後從排序後數組中,最一個元素為最小,最後一個元素為最大。 public static int FindMaxNumber(params int[] stringValue) { Array.Sort(st ...
  • 有一個函數,從來沒有被其他任何類用到。將這個函數設為private。 ...
  • 資料庫中有一個City表 初始時數據: 實體類與Fluent Api配置映射 public class City { public int Id { get; set; } public string Name { get; set; } public int? ParentId { get; se ...
  • 獲取任意 VS 和 SQLServer 的 磁碟安裝目錄。 背景需求:如果磁碟電腦安裝了 VS 或者 SQLServer 則 認定這台電腦 的使用者 是一名 軟體研發人員,則讓程式 以最高許可權運行。 代碼如下:(基於註冊表讀取、exe版權信息校驗) static void Main(string[ ...
  • 安裝方式:使用vs自帶的nuget管理工具,搜索AutoMapper ,選擇第一個安裝到你的項目即可。 我從網上找了一些資料, 參考網址:http://blog.csdn.net/csethcrm/article/details/52934325 下載了個demo,然後自己又寫了一遍,我把AutoM ...
  • S#語言演變到如今,其實已經形成比較強的個性了,也有很多利弊。語言的個性是由其語法確定的,S#的基因就是公式系統,其設計、表達和核心庫在各種語言比較中絕對算的上是一流的,已支持近3800個函數或屬性或方法調用。 ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...