ArrayBlockingQueue詳解

来源:https://www.cnblogs.com/chafry/archive/2022/10/11/16780051.html
-Advertisement-
Play Games

ArrayBlockingQueue介紹 ArrayBlockingQueue是最典型的有界阻塞隊列,其內部是用數組存儲元素的,初始化時需要指定容量大小,利用 ReentrantLock 實現線程安全。 在生產者-消費者模型中使用時,如果生產速度和消費速度基本匹配的情況下,使用ArrayBlocki ...


ArrayBlockingQueue介紹

  ArrayBlockingQueue是最典型的有界阻塞隊列,其內部是用數組存儲元素的,初始化時需要指定容量大小,利用 ReentrantLock 實現線程安全

  在生產者-消費者模型中使用時,如果生產速度和消費速度基本匹配的情況下,使用ArrayBlockingQueue是個不錯選擇;當如果生產速度遠遠大於消費速度,則會導致隊列填滿,大量生產線程被阻塞。

  使用獨占鎖ReentrantLock實現線程安全,入隊和出隊操作使用同一個鎖對象,也就是只能有一個線程可以進行入隊或者出隊操作;這也就意味著生產者和消費者無法並行操作,在高併發場景下會成為性能瓶頸。

 

ArrayBlockingQueue的源碼分析

  【1】屬性值

/** 隊列元素數組 */
final Object[] items;
/** 下一個被take,poll,peek,remove的元素位置 */
int takeIndex;
/** 插入位置包含put,offer,add */
int putIndex;
/** 隊列元素的數量 */
int count;
/** 重入鎖 */
final ReentrantLock lock;
/** 等待獲取的條件隊列 */
private final Condition notEmpty;
/** 等待插入的條件隊列 */
private final Condition notFull;
//迭代器的共用狀態
transient Itrs itrs = null;

 

  【2】構造函數

//預設採用非公平鎖
public ArrayBlockingQueue(int capacity) {
    this(capacity, false);
}


public ArrayBlockingQueue(int capacity, boolean fair) {
    if (capacity <= 0)
        throw new IllegalArgumentException();
    this.items = new Object[capacity];
    lock = new ReentrantLock(fair);
    notEmpty = lock.newCondition();
    notFull =  lock.newCondition();
}

public ArrayBlockingQueue(int capacity, boolean fair,Collection<? extends E> c) {
    //初始化阻塞隊列
    this(capacity, fair);
    //加鎖將數組元素填入阻塞隊列(主要是考慮到重排序和可見性問題,因為Object[] items 並沒有加上 volatile 屬性)
    final ReentrantLock lock = this.lock;
    lock.lock(); // Lock only for visibility, not mutual exclusion
    try {
        int i = 0;
        try {
            for (E e : c) {
                checkNotNull(e);
                items[i++] = e;
            }
        } catch (ArrayIndexOutOfBoundsException ex) {
            throw new IllegalArgumentException();
        }
        count = i;
        //將插入位置下變更
        putIndex = (i == capacity) ? 0 : i;
    } finally {
        lock.unlock();
    }
}

 

  【3】核心方法分析

    1)入隊put方法

public void put(E e) throws InterruptedException {
    //檢查是否為空
    checkNotNull(e);
    final ReentrantLock lock = this.lock;
    //加鎖,如果線程中斷拋出異常 
    lock.lockInterruptibly();
    try {
       //阻塞隊列已滿,則將生產者掛起,等待消費者喚醒
       //設計註意點: 用while不用if是為了防止虛假喚醒
        while (count == items.length)
            notFull.await(); //隊列滿了,使用notFull等待(生產者阻塞)
        // 入隊
        enqueue(e);
    } finally {
        lock.unlock(); // 喚醒消費者線程
    }
}
    
private void enqueue(E x) {
    final Object[] items = this.items;
    //入隊   使用的putIndex
    items[putIndex] = x;
    if (++putIndex == items.length) 
        putIndex = 0;  //設計的精髓: 環形數組,putIndex指針到數組盡頭了,返回頭部
    count++;
    //notEmpty條件隊列轉同步隊列,準備喚醒消費者線程,因為入隊了一個元素,肯定不為空了
    notEmpty.signal();
}

 

    2)出隊take方法

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    //加鎖,如果線程中斷拋出異常 
    lock.lockInterruptibly();
    try {
       //如果隊列為空,則消費者掛起
        while (count == 0)
            notEmpty.await();
        //出隊
        return dequeue();
    } finally {
        lock.unlock();// 喚醒生產者線程
    }
}
private E dequeue() {
    final Object[] items = this.items;
    @SuppressWarnings("unchecked")
    E x = (E) items[takeIndex]; //取出takeIndex位置的元素
    items[takeIndex] = null;
    if (++takeIndex == items.length)
        takeIndex = 0; //設計的精髓: 環形數組,takeIndex 指針到數組盡頭了,返回頭部
    count--;
    if (itrs != null)
        itrs.elementDequeued();
    //notFull條件隊列轉同步隊列,準備喚醒生產者線程,此時隊列有空位
    notFull.signal();
    return x;
}

 

    3)其餘offer&poll&peek&remove方法

public boolean offer(E e) {
    checkNotNull(e);
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        if (count == items.length)
            return false;
        else {
            enqueue(e);
            return true;
        }
    } finally {
        lock.unlock();
    }
}

//本質區別在於設置了超時時間,超時後選擇不加入,返回false
public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {

    checkNotNull(e);
    long nanos = unit.toNanos(timeout);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == items.length) {
            if (nanos <= 0)
                return false;
            //生產線程堵塞nanos時間,也有可能被喚醒,如果超過nanos時間還未被喚醒,則nanos=0,再次迴圈,就會返回false
            nanos = notFull.awaitNanos(nanos);
        }
        enqueue(e);
        return true;
    } finally {
        lock.unlock();
    }
}

public E poll() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        return (count == 0) ? null : dequeue();
    } finally {
        lock.unlock();
    }
}

//本質區別在於設置了超時時間,超時後選擇不獲取,返回null
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
    long nanos = unit.toNanos(timeout);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == 0) {
            if (nanos <= 0)
                return null;
            nanos = notEmpty.awaitNanos(nanos);
        }
        return dequeue();
    } finally {
        lock.unlock();
    }
}

public E peek() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        //通過下標查找直接返回
        return itemAt(takeIndex); // null when queue is empty
    } finally {
        lock.unlock();
    }
}

final E itemAt(int i) {
    return (E) items[i];
}

public boolean remove(Object o) {
    if (o == null) return false;
    final Object[] items = this.items;
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        if (count > 0) {
            final int putIndex = this.putIndex;
            int i = takeIndex;
            do {
                if (o.equals(items[i])) {
                    removeAt(i);
                    return true;
                }
                if (++i == items.length)
                    i = 0;
            } while (i != putIndex);
        }
        return false;
    } finally {
        lock.unlock();
    }
}

void removeAt(final int removeIndex) {
    final Object[] items = this.items;
    if (removeIndex == takeIndex) {
        // removing front item; just advance
        items[takeIndex] = null;
        if (++takeIndex == items.length)
            takeIndex = 0;
        count--;
        if (itrs != null)
            itrs.elementDequeued();
    } else {
        final int putIndex = this.putIndex;
        for (int i = removeIndex;;) {
            int next = i + 1;
            if (next == items.length)
                next = 0;
            if (next != putIndex) {
                items[i] = items[next];
                i = next;
            } else {
                items[i] = null;
                this.putIndex = i;
                break;
            }
        }
        count--;
        if (itrs != null)
            itrs.removedAt(removeIndex);
    }
    notFull.signal();
}

 

ArrayBlockingQueue總結

  【1】有界阻塞隊列,先進先出,存取相互排斥

  【2】數據結構:靜態數組(容量固定須指定長度,沒有擴容機制,沒有元素的位置也占用空間,被null占位)

  【3】ReentrantLock鎖保證互斥性:存取都是同一把鎖,操作的是同一個數組對象,存取相互排斥

  【4】阻塞對象(notEmpty【出隊:隊列count=0,無元素可取時,阻塞在該對象上】,notFull【入隊:隊列count=length,放不進元素時,阻塞在該對象上】)

  【5】入隊,從隊首開始添加元素,記錄putIndex(到隊尾時設置為0),喚醒notEmpty

  【6】出隊,從隊首開始添加元素,記錄takeIndex(到隊尾時設置為0),喚醒notFull

  【7】兩個指針都是從隊首向隊尾移動,保證隊列的先進先出原則(亮點:利用指針和數組,形成環狀結構,重覆利用記憶體空間

 


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 我們都知道Spring中IOC是使用的工廠模式,但是對於實現細節就一知半解了,今天這篇文章就帶大家解讀Spring中是如何使用工廠模式的。 ...
  • 組合設計模式(Composite Design Pattern)其應用場景非常特殊,主要用於處理樹形結構數據,它可以讓葉子對象和容器對象的使用具有一致性。 ...
  • Optional 類是一個可以為null的容器對象。如果值存在則isPresent()方法會返回true,調用get()方法會返回該對象。 ...
  • 什麼是阻塞隊列 【1】阻塞隊列:從定義上來說是隊列的一種,那麼肯定是一個先進先出(FIFO)的數據結構。與普通隊列不同的是,它支持兩個附加操作,即阻塞添加和阻塞刪除方法。 【2】阻塞添加:當阻塞隊列是滿時,往隊列里添加元素的操作將被阻塞。 【3】阻塞移除:當阻塞隊列是空時,從隊列中獲取元素/刪除元素 ...
  • LinkedBlockingDeque介紹 【1】LinkedBlockingDeque是一個基於鏈表實現的雙向阻塞隊列,預設情況下,該阻塞隊列的大小為Integer.MAX_VALUE,可以看做無界隊列,但也可以設置容量限制,作為有界隊列。 【2】相比於其他阻塞隊列,LinkedBlockingD ...
  • 羅馬數字包含以下七種字元: I, V, X, L,C,D 和 M。 字元 數值 I 1 V 5 X 10 L 50 C 100 D 500 M 1000 例如, 羅馬數字 2 寫做 II ,即為兩個併列的 1 。12 寫做 XII ,即為 X + II 。 27 寫做 XXVII, 即為 XX + ...
  • LinkedBlockingQueue介紹 【1】LinkedBlockingQueue是一個基於鏈表實現的阻塞隊列,預設情況下,該阻塞隊列的大小為Integer.MAX_VALUE,由於這個數值特別大,所以 LinkedBlockingQueue 也被稱作無界隊列,代表它幾乎沒有界限,隊列可以隨著 ...
  • 演算法步驟 遍歷整個數組,找到最小(大)的元素,放到數組的起始位置。 再遍歷剩下的數組,找到剩下元素中的最小(大)元素,放到數組的第二個位置。 重覆以上步驟,直到排序完成。 一共需要遍曆數組元素個數-1次,當找到第二大(小)的元素時,可以停止。這時最後一個元素必是最大(小)元素。 代碼 import ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...