阻塞隊列的原理及應用

来源:https://www.cnblogs.com/star95/archive/2023/09/06/17681669.html
-Advertisement-
Play Games

阻塞隊列是一種常用的併發編程工具,它能夠在多線程環境下提供一種安全而高效的數據傳輸機制。本文將介紹阻塞隊列的原理和使用場景,並通過實例演示其在多線程編程中的應用。 # 一、什麼是阻塞隊列 阻塞隊列是一種特殊的隊列,它具有以下幾個特點: 1. 阻塞特性:當隊列為空時,從隊列中獲取元素的操作將會被阻塞, ...


阻塞隊列是一種常用的併發編程工具,它能夠在多線程環境下提供一種安全而高效的數據傳輸機制。本文將介紹阻塞隊列的原理和使用場景,並通過實例演示其在多線程編程中的應用。

一、什麼是阻塞隊列

阻塞隊列是一種特殊的隊列,它具有以下幾個特點:

  1. 阻塞特性:當隊列為空時,從隊列中獲取元素的操作將會被阻塞,直到隊列中有新的元素被添加;當隊列已滿時,向隊列中添加元素的操作將會被阻塞,直到隊列中有空的位置,這就是等待喚醒機制。
  2. 線程安全:阻塞隊列內部通過鎖或其他同步機制來保證多線程環境下的數據一致性。
  3. 有界性:阻塞隊列可以設置容量上限,當隊列滿時,後續的元素將無法添加。
  4. 公平性:阻塞隊列可以選擇公平或非公平的策略來決定線程的獲取順序。公平隊列會按照線程的請求順序進行處理(線程按先來後到順序排隊獲取元素),而非公平隊列則允許新的線程插隊執行(線程競爭)。比如:SynchronousQueue。

阻塞隊列常用於解決生產者-消費者問題,它能夠有效地銜接生產者和消費者之間的速度差異,提供一種協調和安全的數據交互方式。
阻塞隊列底層一般採用數組和鏈表這兩種數據結構存儲元素,ArrayBlockingQueue和PriorityBlockingQueue底層都是採用數組存儲的,但是ArrayBlockingQueue是必須指定數組大小,不能擴容,而PriorityBlockingQueue可以進行動態擴容(擴容的最大長度也是Integer.MAX_VALUE),LinkedBlockingQueue底層是鏈表結構存儲,雖然是鏈表,但是也有長度限制,預設是Integer.MAX_VALUE,一般認為的無界阻塞隊列,其實最大的隊列長度也就是Integer.MAX_VALUE。

二、阻塞隊列的核心方法

  • 添加
方法 描述 是否阻塞
add方法 往隊列尾部添加元素,內部是調用offer方法
put方法 往隊列尾部添加元素,如果隊列已滿,則阻塞等待
offer方法 往隊列尾部添加元素,如果隊列已滿,則返回false,不會阻塞
  • 獲取
方法 描述 是否阻塞
take方法 take方法:移除並返回隊列頭部的元素,如果隊列為空,則阻塞等待
poll方法 移除並返回隊列頭部的元素,如果隊列為空,則返回null,不會阻塞
peek方法 返回隊列頭部的元素(不移除),如果隊列為空,則返回null,不會阻塞

三、常見的阻塞隊列實現


通過圖中可以看到,BlockingQueue集成了Queue介面的功能,有多種子類實現,常用的如下:

  1. ArrayBlockingQueue:基於數組實現的有界阻塞隊列,它的容量在創建時指定,並且不能動態擴展。
  2. LinkedBlockingQueue:基於鏈表實現的有界阻塞隊列,鏈表的長度可以通過構造函數顯式指定,如果使用預設的構造函數,則預設大小是Integer.MAX_VALUE。
  3. PriorityBlockingQueue:基於優先順序堆排序實現的阻塞隊列(可擴容),元素按照優先順序順序進行排序。
  4. SynchronousQueue:不存儲元素的阻塞隊列,每個插入操作都必須等待一個相應的刪除操作,反之亦然。

四、阻塞隊列的原理

常用的阻塞隊列,比如:ArrayBlockingQueue、LinkedBlockingQueue、PriorityBlockingQueue底層都是採用ReentrantLock鎖來實現線程的互斥,而ReentrantLock底層採用了AQS框架實現線程隊列的同步,線程的阻塞是調用LockSupport.park實現,喚醒是調用LockSupport.unpark實現,具體可以看我之前的文章,SynchronousQueue底層雖然沒有用AQS框架,但也用的是LockSupport實現線程的阻塞與喚醒。
一文讀懂LockSupport
AQS源碼分析
阻塞隊列的原理可以通過兩個關鍵組件來解釋:鎖和條件變數。

阻塞隊列使用鎖來保護共用資源,控制線程的互斥訪問。在隊列為空或已滿時,線程需要等待相應的條件滿足才能繼續執行。

  • 條件變數

條件變數是鎖的一個補充,在某些特定的條件下,線程會進入等待狀態。當條件滿足時,其他線程會通過調用條件變數的喚醒方法(比如signal()或signalAll())來通知等待的線程進行下一步操作。
當一個線程試圖從空的阻塞隊列中獲取元素時,它會獲取隊列的鎖,並檢查隊列是否為空。如果為空,這個線程將進入等待狀態,直到其他線程向隊列中插入元素並通過條件變數喚醒它。當一個線程試圖向已滿的阻塞隊列插入元素時,它會獲取隊列的鎖,並檢查隊列是否已滿。如果已滿,這個線程將進入等待狀態,直到其他線程從隊列中獲取元素並通過條件變數喚醒它。
接下來我們看下阻塞隊列的獲取元素和插入元素的核心代碼:
ArrayBlockingQueue、LinkedBlockingQueue、PriorityBlockingQueue的帶阻塞的插入和獲取方法都是基於ReentrantLock鎖+條件變數的等待和通知來實現的。
主要看看ArrayBlockingQueue帶阻塞的插入和獲取元素的主要方法吧。

/**
 * 插入元素,帶阻塞
 */
public void put(E e) throws InterruptedException {
    checkNotNull(e);
    // 這裡使用的是ReentrantLock鎖
    final ReentrantLock lock = this.lock;
    // 獲取鎖並支持響應中斷,註意:獲取鎖的過程中不響應中斷,是在獲取到鎖後根據當前線程的中斷標識來處理。
    lock.lockInterruptibly();
    try {
        // 元素大小等於數組長度時阻塞,說明放滿了,生產者需要暫停,阻塞在條件變數上,等待被喚醒
        while (count == items.length)
            notFull.await();
        // 放入元素到數組指定的下標處
        enqueue(e);
    } finally {
        // 釋放鎖
        lock.unlock();
    }
}

/**
 * 插入元素,喚醒等待獲取元素的線程
 */
private void enqueue(E x) {
    // assert lock.getHoldCount() == 1;
    // assert items[putIndex] == null;
    final Object[] items = this.items;
    items[putIndex] = x;
    if (++putIndex == items.length)
        putIndex = 0;
    count++;
	// 放入元素後,通知消費線程繼續獲取元素
    notEmpty.signal();
}

/**
 * 獲取元素,帶阻塞
 */
public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        // 數組無元素時阻塞,阻塞在條件變數上,等待被喚醒
        // 元素大小等於0時阻塞,說明數組被取空了,消費者需要暫停,阻塞在條件變數上,等待被喚醒
        while (count == 0)
            notEmpty.await();
        // 移除元素並返回
        return dequeue();
    } finally {
        lock.unlock();
    }
}



/**
 * 移除元素並返回
 */
private E dequeue() {
    // assert lock.getHoldCount() == 1;
    // assert items[takeIndex] != null;
    final Object[] items = this.items;
    @SuppressWarnings("unchecked")
    E x = (E) items[takeIndex];
    items[takeIndex] = null;
    // 數組時迴圈使用的,取元素的index到達數組長度時,下次需要從第0個位置
    if (++takeIndex == items.length)
        takeIndex = 0;
    count--;
    if (itrs != null)
        itrs.elementDequeued();
    // 移除元素後,通知消費者線程可以繼續放入元素
    notFull.signal();
    return x;
}

SynchronousQueue不存儲元素,插入和刪除是配套使用的,它的插入和刪除有公平和非公平之分,公平是通過內部類TransferQueue實現的,非公平是通過TransferStack實現的,具體可以看transfer方法,最終會調用LockSupport.park實現線程阻塞,LockSupport.unpark實現線程繼續執行,這個就不貼代碼了。

五、阻塞隊列的使用場景

  1. 生產者-消費者模型:阻塞隊列能夠很好地平衡生產者和消費者之間的速度差異,既能保護消費者不會消費到空數據,也能保護生產者不會造成隊列溢出,能夠有效地解耦生產者和消費者,提高系統的穩定性和吞吐量。
  2. 線程池:線上程池中,阻塞隊列可以作為任務緩衝區,將待執行的任務放入隊列中,由線程池中的工作線程按照一定的策略進行執行。
  3. 同步工具:阻塞隊列還可以作為一種同步工具,在多線程環境下實現線程之間的協作。
  4. 數據緩衝:阻塞隊列可以用作數據緩衝區,當生產者的速度大於消費者的速度時,數據可以先存儲在隊列中,等待消費者處理
  5. 事件驅動編程:阻塞隊列可以用於事件驅動的編程模型,當事件發生時,將事件對象放入隊列中,由消費者進行處理

六、阻塞隊列的使用

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.PriorityBlockingQueue;

public class BlockingQueueExample {
    public static void main(String[] args) {
        // 創建一個容量為10的ArrayBlockingQueue
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(10);
//        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(10);
//        BlockingQueue<Integer> queue = new PriorityBlockingQueue<>(10);
        // 創建生產者線程
        Thread producerThread = new Thread(() -> {
            try {
                for (int i = 0; i <= 5; i++) {
                    // 將數據放入隊列
                    queue.put(i);
                    System.out.println(Thread.currentThread().getName() + "Produced: " + i);
                    Thread.sleep(500);
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        // 創建消費者線程
        Thread consumerThread = new Thread(() -> {
            try {
                for (int i = 0; i <= 5; i++) {
                    // 從隊列中取出數據
                    int num = queue.take();
                    System.out.println(Thread.currentThread().getName() + "Consumed: " + num);
                    Thread.sleep(1000);
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        // 啟動生產者和消費者線程
        producerThread.start();
        consumerThread.start();
        // 等待線程執行完畢
        try {
            producerThread.join();
            consumerThread.join();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

執行輸出:

Thread-0Produced: 0
Thread-1Consumed: 0
Thread-0Produced: 1
Thread-1Consumed: 1
Thread-0Produced: 2
Thread-0Produced: 3
Thread-1Consumed: 2
Thread-0Produced: 4
Thread-0Produced: 5
Thread-1Consumed: 3
Thread-1Consumed: 4
Thread-1Consumed: 5

阻塞隊列的使用比較簡單,這裡是個簡單的使用例子,可設置合適的隊列大小和生產者消費者休眠時間來調試阻塞等待和喚醒通知。使用阻塞隊列可解決多線程併發訪問數據安全問題,也能方便的實現線程間的協調工作。

總結

通過瞭解阻塞隊列的原理和使用場景,我們可以更好地應對多線程編程中的併發問題,提高代碼的可維護性和可擴展性。阻塞隊列作為一種常見的併發編程工具,能夠幫助我們實現高效的數據傳輸和線程協作,為我們的應用程式提供更好的性能和可靠性保障。希望本文能夠為讀者對阻塞隊列的理解和應用提供一些幫助。


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • ## Synchronized 本篇文章將圍繞synchronized關鍵字,使用大量圖片、案例深入淺出的描述CAS、synchronized Java層面和C++層面的實現、鎖升級的原理、源碼等 大概觀看時間17分鐘 可以帶著幾個問題去查看本文,如果認真看完,問題都會迎刃而解: 1、synchro ...
  • 小魚和魔法師繼續深入魔法森林。不久,他們來到了一個巨大的魔法石圈旁邊。石圈中心有一個閃閃發光的魔法水晶,周圍則是一些神秘的符號。但令人意外的是,水晶的旁邊還有一個巨大的石像怪,它的眼睛散髮著紅色的光芒,似乎正在守護著這片區域。 小魚好奇地問:“這是什麼地方?這些符號又是什麼意思?那個石像怪又是怎麼回 ...
  • 大家好,我是棧長。 昨天有粉絲反饋棧長《[Spring Boot 核心技術課](https://mp.weixin.qq.com/s/hJwHvmalVWJObyVkytfdsA)》中的一個問題: ![](https://img2023.cnblogs.com/other/1218593/20230 ...
  • 最近又是一輪代碼review , 發現了一些實現去重的代碼,在使用 list.contain ...... ![](https://img2023.cnblogs.com/other/1218593/202309/1218593-20230906151256318-1035375358.png) 我 ...
  • 在日常開發中,當需要給一個現有類添加附加職責,而又不能採用生成子類的方法進行擴充時。例如,該類被隱藏或者該類是終極類或者採用繼承方式會產生大量的子類。這時候,我們該怎麼辦呢?我們可以使用裝飾器器模式來解決這個問題,**本文將從以下四個方面講解裝飾器器模式**。 - 簡介 - 優缺點 - 應用場景 - ...
  • 掩碼是一串二進位代碼對目標欄位進行位與運算 # 掩碼 掩碼通常是一個用於屏蔽或隱藏某些位的值,以便在計算中只關註感興趣的位。掩碼通常是一個由二進位位組成的數,用於按位與操作,以清除或保留某些位的值。 常見的用途包括: 1. **位操作和位掩碼**:在電腦編程中,位掩碼用於執行位操作,如按位與(AN ...
  • > 關註TechLeadCloud,分享互聯網架構、雲服務技術的全維度知識。作者擁有10+年互聯網服務架構、AI產品研發經驗、團隊管理經驗,同濟本復旦碩,復旦機器人智能實驗室成員,阿裡雲認證的資深架構師,項目管理專業人士,上億營收AI產品研發負責人。 ![file](https://img2023. ...
  • [TOC](多線程) # 介紹: C++ 是一種支持多線程編程的編程語言,它提供了豐富的多線程支持來充分利用現代多核處理器的性能。 C++ 多線程編程通常使用標準庫中的 頭文件以及其他相關的標準庫組件來實現。 # 理論: 1. 常用的類: std::thread 類,用於創建和管理線程等等 std: ...
一周排行
    -Advertisement-
    Play Games
  • 示例項目結構 在 Visual Studio 中創建一個 WinForms 應用程式後,項目結構如下所示: MyWinFormsApp/ │ ├───Properties/ │ └───Settings.settings │ ├───bin/ │ ├───Debug/ │ └───Release/ ...
  • [STAThread] 特性用於需要與 COM 組件交互的應用程式,尤其是依賴單線程模型(如 Windows Forms 應用程式)的組件。在 STA 模式下,線程擁有自己的消息迴圈,這對於處理用戶界面和某些 COM 組件是必要的。 [STAThread] static void Main(stri ...
  • 在WinForm中使用全局異常捕獲處理 在WinForm應用程式中,全局異常捕獲是確保程式穩定性的關鍵。通過在Program類的Main方法中設置全局異常處理,可以有效地捕獲並處理未預見的異常,從而避免程式崩潰。 註冊全局異常事件 [STAThread] static void Main() { / ...
  • 前言 給大家推薦一款開源的 Winform 控制項庫,可以幫助我們開發更加美觀、漂亮的 WinForm 界面。 項目介紹 SunnyUI.NET 是一個基於 .NET Framework 4.0+、.NET 6、.NET 7 和 .NET 8 的 WinForm 開源控制項庫,同時也提供了工具類庫、擴展 ...
  • 說明 該文章是屬於OverallAuth2.0系列文章,每周更新一篇該系列文章(從0到1完成系統開發)。 該系統文章,我會儘量說的非常詳細,做到不管新手、老手都能看懂。 說明:OverallAuth2.0 是一個簡單、易懂、功能強大的許可權+可視化流程管理系統。 有興趣的朋友,請關註我吧(*^▽^*) ...
  • 一、下載安裝 1.下載git 必須先下載並安裝git,再TortoiseGit下載安裝 git安裝參考教程:https://blog.csdn.net/mukes/article/details/115693833 2.TortoiseGit下載與安裝 TortoiseGit,Git客戶端,32/6 ...
  • 前言 在項目開發過程中,理解數據結構和演算法如同掌握蓋房子的秘訣。演算法不僅能幫助我們編寫高效、優質的代碼,還能解決項目中遇到的各種難題。 給大家推薦一個支持C#的開源免費、新手友好的數據結構與演算法入門教程:Hello演算法。 項目介紹 《Hello Algo》是一本開源免費、新手友好的數據結構與演算法入門 ...
  • 1.生成單個Proto.bat內容 @rem Copyright 2016, Google Inc. @rem All rights reserved. @rem @rem Redistribution and use in source and binary forms, with or with ...
  • 一:背景 1. 講故事 前段時間有位朋友找到我,說他的窗體程式在客戶這邊出現了卡死,讓我幫忙看下怎麼回事?dump也生成了,既然有dump了那就上 windbg 分析吧。 二:WinDbg 分析 1. 為什麼會卡死 窗體程式的卡死,入口門檻很低,後續往下分析就不一定了,不管怎麼說先用 !clrsta ...
  • 前言 人工智慧時代,人臉識別技術已成為安全驗證、身份識別和用戶交互的關鍵工具。 給大家推薦一款.NET 開源提供了強大的人臉識別 API,工具不僅易於集成,還具備高效處理能力。 本文將介紹一款如何利用這些API,為我們的項目添加智能識別的亮點。 項目介紹 GitHub 上擁有 1.2k 星標的 C# ...