LinkedBlockingQueue詳解

来源:https://www.cnblogs.com/chafry/archive/2022/10/11/16782730.html
-Advertisement-
Play Games

LinkedBlockingQueue介紹 【1】LinkedBlockingQueue是一個基於鏈表實現的阻塞隊列,預設情況下,該阻塞隊列的大小為Integer.MAX_VALUE,由於這個數值特別大,所以 LinkedBlockingQueue 也被稱作無界隊列,代表它幾乎沒有界限,隊列可以隨著 ...


LinkedBlockingQueue介紹

  【1】LinkedBlockingQueue是一個基於鏈表實現的阻塞隊列,預設情況下,該阻塞隊列的大小為Integer.MAX_VALUE,由於這個數值特別大,所以 LinkedBlockingQueue 也被稱作無界隊列,代表它幾乎沒有界限,隊列可以隨著元素的添加而動態增長,但是如果沒有剩餘記憶體,則隊列將拋出OOM錯誤。所以為了避免隊列過大造成機器負載或者記憶體爆滿的情況出現,我們在使用的時候建議手動傳一個隊列的大小

  【2】LinkedBlockingQueue內部由單鏈表實現,只能從head取元素,從tail添加元素。LinkedBlockingQueue採用兩把鎖的鎖分離技術實現入隊出隊互不阻塞,添加元素和獲取元素都有獨立的鎖,也就是說LinkedBlockingQueue是讀寫分離的,讀寫操作可以並行執行。

 

LinkedBlockingQueue使用

//指定隊列的大小創建有界隊列
BlockingQueue<Integer> boundedQueue = new LinkedBlockingQueue<>(100);
//無界隊列
BlockingQueue<Integer> unboundedQueue = new LinkedBlockingQueue<>();

 

LinkedBlockingQueue的源碼分析

  【1】屬性值

// 容量,指定容量就是有界隊列
private final int capacity;
// 元素數量,用原子操作類的原因在於有兩個線程都會操作需要保證可見性
private final AtomicInteger count = new AtomicInteger();
// 鏈表頭  本身是不存儲任何元素的,初始化時item指向null
transient Node<E> head;
// 鏈表尾
private transient Node<E> last;
// take鎖   鎖分離,提高效率
private final ReentrantLock takeLock = new ReentrantLock();
// notEmpty條件
// 當隊列無元素時,take鎖會阻塞在notEmpty條件上,等待其它線程喚醒
private final Condition notEmpty = takeLock.newCondition();
// put鎖
private final ReentrantLock putLock = new ReentrantLock();
// notFull條件
// 當隊列滿了時,put鎖會會阻塞在notFull上,等待其它線程喚醒
private final Condition notFull = putLock.newCondition();

//典型的單鏈表結構
static class Node<E> {
    E item;  //存儲元素
    Node<E> next;  //後繼節點    單鏈表結構
    Node(E x) { item = x; }
}

 

  【2】構造函數

public LinkedBlockingQueue() {
    // 如果沒傳容量,就使用最大int值初始化其容量
    this(Integer.MAX_VALUE);
}

public LinkedBlockingQueue(int capacity) {
    if (capacity <= 0) throw new IllegalArgumentException();
    this.capacity = capacity;
    // 初始化head和last指針為空值節點
    last = head = new Node<E>(null);
}

public LinkedBlockingQueue(Collection<? extends E> c) {
    this(Integer.MAX_VALUE);
    final ReentrantLock putLock = this.putLock;
    putLock.lock(); // 為保證可見性而加的鎖
    try {
        int n = 0;
        for (E e : c) {
            if (e == null)
                throw new NullPointerException();
            if (n == capacity)
                throw new IllegalStateException("Queue full");
            enqueue(new Node<E>(e));
            ++n;
        }
        count.set(n);
    } finally {
        putLock.unlock();
    }
}

 

  【3】核心方法分析

    1)入隊put方法

public void put(E e) throws InterruptedException {    
    // 不允許null元素
    if (e == null) throw new NullPointerException();
    int c = -1;
    // 新建一個節點
    Node<E> node = new Node<E>(e);
    final ReentrantLock putLock = this.putLock;
    final AtomicInteger count = this.count;
    // 使用put鎖加鎖
    putLock.lockInterruptibly();
    try {
        // 如果隊列滿了,就阻塞在notFull上等待被其它線程喚醒(阻塞生產者線程)
        while (count.get() == capacity) {
            notFull.await();
        }  
        // 隊列不滿,就入隊
        enqueue(node);
        c = count.getAndIncrement();// 隊列長度加1,返回原值
        // 如果現隊列長度小於容量,notFull條件隊列轉同步隊列,準備喚醒一個阻塞在notFull條件上的線程(可以繼續入隊) 
        // 這裡為啥要喚醒一下呢?因為存在情況是,沒人獲取時,隊列滿了而且還不斷有人塞數據,此時會一大批線程被阻塞,現在有空餘位置了,應該被喚醒
        if (c + 1 < capacity)
            notFull.signal();
    } finally {
        putLock.unlock(); // 真正喚醒生產者線程
    }  
    // 如果原隊列長度為0,現在加了一個元素後立即喚醒阻塞在notEmpty上的線程
    if (c == 0)
        signalNotEmpty();
}
private void enqueue(Node<E> node) { 
    // 直接加到last後面,last指向入隊元素
    last = last.next = node;
}    
private void signalNotEmpty() {
    final ReentrantLock takeLock = this.takeLock; 
    takeLock.lock();// 加take鎖
    try {  
        notEmpty.signal();// notEmpty條件隊列轉同步隊列,準備喚醒阻塞在notEmpty上的線程
    } finally {
        takeLock.unlock();  // 真正喚醒消費者線程
    }
}

 

    2)出隊take方法

public E take() throws InterruptedException {
    E x;
    int c = -1;
    final AtomicInteger count = this.count;
    final ReentrantLock takeLock = this.takeLock;
    // 使用takeLock加鎖
    takeLock.lockInterruptibly();
    try {
        // 如果隊列無元素,則阻塞在notEmpty條件上(消費者線程阻塞)
        while (count.get() == 0) {
            notEmpty.await();
        }
        // 否則,出隊
        x = dequeue();
        c = count.getAndDecrement();//長度-1,返回原值
        if (c > 1)// 如果取之前隊列長度大於1,notEmpty條件隊列轉同步隊列,準備喚醒阻塞在notEmpty上的線程,原因與入隊同理
            notEmpty.signal();
    } finally {
        takeLock.unlock(); // 真正喚醒消費者線程
    }
    // 為什麼隊列是滿的才喚醒阻塞在notFull上的線程呢?
    // 因為喚醒是需要加putLock的,這是為了減少鎖的次數,所以,這裡索性在放完元素就檢測一下,未滿就喚醒其它notFull上的線程,
    // 這也是鎖分離帶來的代價
    // 如果取之前隊列長度等於容量(已滿),則喚醒阻塞在notFull的線程
    if (c == capacity)
        signalNotFull();
    return x;
}
private E dequeue() {
     // head節點本身是不存儲任何元素的
    // 這裡把head刪除,並把head下一個節點作為新的值
    // 並把其值置空,返回原來的值
    Node<E> h = head;
    Node<E> first = h.next;
    h.next = h; // 方便GC
    head = first;
    E x = first.item;
    first.item = null;
    return x;
}
private void signalNotFull() {
    final ReentrantLock putLock = this.putLock;
    putLock.lock();
    try {
        notFull.signal();// notFull條件隊列轉同步隊列,準備喚醒阻塞在notFull上的線程
    } finally {
        putLock.unlock(); // 解鎖,這才會真正的喚醒生產者線程
    }
}

 

LinkedBlockingQueue總結

  【1】無界阻塞隊列,可以指定容量,預設為 Integer.MAX_VALUE,先進先出,存取互不幹擾

  【2】數據結構:鏈表(可以指定容量,預設為 Integer.MAX_VALUE,內部類Node存儲元素)

  【3】鎖分離:存取互不幹擾,存取操作的是不同的Node對象(takeLock【取Node節點保證前驅後繼不亂】,putLock【存Node節點保證前驅後繼不亂】,刪除時則兩個鎖一起加)【這是最大的亮點

  【4】阻塞對象(notEmpty【出隊:隊列count=0,無元素可取時,阻塞在該對象上】,notFull【入隊:隊列count=capacity,放不進元素時,阻塞在該對象上】)

  【5】入隊,從隊尾入隊,由last指針記錄。

  【6】出隊,從隊首出隊,由head指針記錄。

  【7】線程池中採用LinkedBlockingQueue而不採用ArrayBlockingQueue的原因便是因為鎖分離帶來了性能的提升,大大提高隊列的吞吐量


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 下午我被叫去參加“合作服務商資金安全解決方案”項目的codereview。對程式實現邏輯上存疑。簡單聽他們瞭解了一下需求邏輯。然後,果然發現邏輯有疏漏。為了表達清楚我的意思,上草圖。邊“畫”邊講。然後,大家點頭默許:還是戰哥想的全面! 在我們日常開發討論過程中,總會出現這樣的情況,你在講的頭頭是道, ...
  • 決策引擎是風控的大腦,而決策樹的編排能力和體驗是構建大腦的手段,如何構建高效、絲滑、穩定可靠的決策樹編排能力,是對風控決策引擎的一大挑戰,本篇文章和大家分享一下過往構建心得。 ...
  • 我們都知道Spring中IOC是使用的工廠模式,但是對於實現細節就一知半解了,今天這篇文章就帶大家解讀Spring中是如何使用工廠模式的。 ...
  • 組合設計模式(Composite Design Pattern)其應用場景非常特殊,主要用於處理樹形結構數據,它可以讓葉子對象和容器對象的使用具有一致性。 ...
  • Optional 類是一個可以為null的容器對象。如果值存在則isPresent()方法會返回true,調用get()方法會返回該對象。 ...
  • 什麼是阻塞隊列 【1】阻塞隊列:從定義上來說是隊列的一種,那麼肯定是一個先進先出(FIFO)的數據結構。與普通隊列不同的是,它支持兩個附加操作,即阻塞添加和阻塞刪除方法。 【2】阻塞添加:當阻塞隊列是滿時,往隊列里添加元素的操作將被阻塞。 【3】阻塞移除:當阻塞隊列是空時,從隊列中獲取元素/刪除元素 ...
  • LinkedBlockingDeque介紹 【1】LinkedBlockingDeque是一個基於鏈表實現的雙向阻塞隊列,預設情況下,該阻塞隊列的大小為Integer.MAX_VALUE,可以看做無界隊列,但也可以設置容量限制,作為有界隊列。 【2】相比於其他阻塞隊列,LinkedBlockingD ...
  • 羅馬數字包含以下七種字元: I, V, X, L,C,D 和 M。 字元 數值 I 1 V 5 X 10 L 50 C 100 D 500 M 1000 例如, 羅馬數字 2 寫做 II ,即為兩個併列的 1 。12 寫做 XII ,即為 X + II 。 27 寫做 XXVII, 即為 XX + ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...