ConcurrentLinkedQueue詳解(圖文並茂)

来源:https://www.cnblogs.com/seven97-top/p/18425757
-Advertisement-
Play Games

前言 ConcurrentLinkedQueue是基於鏈接節點的無界線程安全隊列。此隊列按照FIFO(先進先出)原則對元素進行排序。隊列的頭部是隊列中存在時間最長的元素,而隊列的尾部則是最近添加的元素。新的元素總是被插入到隊列的尾部,而隊列的獲取操作(例如poll或peek)則是從隊列頭部開始。 與 ...


前言

ConcurrentLinkedQueue是基於鏈接節點的無界線程安全隊列。此隊列按照FIFO(先進先出)原則對元素進行排序。隊列的頭部是隊列中存在時間最長的元素,而隊列的尾部則是最近添加的元素。新的元素總是被插入到隊列的尾部,而隊列的獲取操作(例如pollpeek)則是從隊列頭部開始。

與傳統的LinkedList不同,ConcurrentLinkedQueue使用了一種高效的非阻塞演算法,被稱為無鎖編程(Lock-Free programming),它通過原子變數和CAS(Compare-And-Swap)操作來保證線程安全,而不是通過傳統的鎖機制。這使得它在高併發場景下具有出色的性能表現。

可以看做一個線程安全的LinkedList,是一個線程安全的無界隊列,但LinkedList是一個雙向鏈表,而ConcurrentLinkedQueue是單向鏈表。

ConcurrentLinkedQueue線程安全在於設置head、tail以及next指針時都用的cas操作,而且node里的item和next變數都是用volatile修飾,保證了多線程下變數的可見性。而ConcurrentLinkedQueue的所有讀操作都是無鎖的,所以可能讀會存在不一致性。

應用場景

如果對隊列加鎖的成本較高則適合使用無鎖的 ConcurrentLinkedQueue 來替代。適合在對性能要求相對較高,同時有多個線程對隊列進行讀寫的場景。

ConcurrentLinkedQueue通過無鎖來做到了更高的併發量,是個高性能的隊列,但是使用場景相對不如阻塞隊列常見,畢竟取數據也要不停的去迴圈,不如阻塞的設計,但是在併發量特別大的情況下,是個不錯的選擇,性能上好很多,而且這個隊列的設計也是特別費力,尤其的使用的改良演算法和對哨兵的處理。

主要方法

ConcurrentLinkedQueue提供了豐富的方法來操作隊列,包括:

  • offer(E e):將指定的元素插入此隊列的尾部。
  • add(E e):將指定的元素插入此隊列的尾部(與offer方法功能相同,但在失敗時拋出異常)。
  • poll():獲取並移除此隊列的頭部,如果此隊列為空,則返回null
  • peek():獲取但不移除此隊列的頭部,如果此隊列為空,則返回null
  • size():返回此隊列中的元素數量。需要註意的是,由於併發的原因,這個方法返回的結果可能並不准確。如果需要在併發環境下獲取準確的元素數量,建議使用java.util.concurrent.atomic包中的原子變數進行計數。
  • isEmpty():檢查此隊列是否為空。與size()方法類似,由於併發的原因,這個方法返回的結果也可能不准確。

需要註意的是,在併發環境下使用size()isEmpty()方法時需要特別小心,因為它們的結果可能並不准確。如果需要精確的元素數量或空隊列檢測,建議使用額外的同步機制或原子變數來實現。

底層源碼

類的內部類

private static class Node<E> {
    // 元素
    volatile E item;
    // next域
    volatile Node<E> next;

    /**
    * Constructs a new node.  Uses relaxed write because item can
    * only be seen after publication via casNext.
    */
    // 構造函數
    Node(E item) {
        // 設置item的值
        UNSAFE.putObject(this, itemOffset, item);
    }
    // 比較並替換item值
    boolean casItem(E cmp, E val) {
        return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
    }
    
    void lazySetNext(Node<E> val) {
        // 設置next域的值,並不會保證修改對其他線程立即可見
        UNSAFE.putOrderedObject(this, nextOffset, val);
    }
    // 比較並替換next域的值
    boolean casNext(Node<E> cmp, Node<E> val) {
        return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
    }

    // Unsafe mechanics
    // 反射機制
    private static final sun.misc.Unsafe UNSAFE;
    // item域的偏移量
    private static final long itemOffset;
    // next域的偏移量
    private static final long nextOffset;

    static {
        try {
            UNSAFE = sun.misc.Unsafe.getUnsafe();
            Class<?> k = Node.class;
            itemOffset = UNSAFE.objectFieldOffset
                (k.getDeclaredField("item"));
            nextOffset = UNSAFE.objectFieldOffset
                (k.getDeclaredField("next"));
        } catch (Exception e) {
            throw new Error(e);
        }
    }
}

說明: Node類表示鏈表結點,用於存放元素,包含item域和next域,item域表示元素,next域表示下一個結點,其利用反射機制和CAS機制來更新item域和next域,保證原子性。

類的屬性

public class ConcurrentLinkedQueue<E> extends AbstractQueue<E>
        implements Queue<E>, java.io.Serializable {
    // 版本序列號        
    private static final long serialVersionUID = 196745693267521676L;
    // 反射機制
    private static final sun.misc.Unsafe UNSAFE;
    // head域的偏移量
    private static final long headOffset;
    // tail域的偏移量
    private static final long tailOffset;
    static {
        try {
            UNSAFE = sun.misc.Unsafe.getUnsafe();
            Class<?> k = ConcurrentLinkedQueue.class;
            headOffset = UNSAFE.objectFieldOffset
                (k.getDeclaredField("head"));
            tailOffset = UNSAFE.objectFieldOffset
                (k.getDeclaredField("tail"));
        } catch (Exception e) {
            throw new Error(e);
        }
    }
    
    // 頭節點
    private transient volatile Node<E> head;
    // 尾結點
    private transient volatile Node<E> tail;
}

說明: 屬性中包含了head域和tail域,表示鏈表的頭節點和尾結點,同時,ConcurrentLinkedQueue也使用了反射機制和CAS機制來更新頭節點和尾結點,保證原子性。

類的構造函數

  • ConcurrentLinkedQueue()
public ConcurrentLinkedQueue() {
    // 初始化頭節點與尾結點
    head = tail = new Node<E>(null);
}

說明: 該構造函數用於創建一個最初為空的 ConcurrentLinkedQueue,頭節點與尾結點指向同一個結點,該結點的item域為null,next域也為null。

  • ConcurrentLinkedQueue(Collection<? extends E>)
public ConcurrentLinkedQueue(Collection<? extends E> c) {
    Node<E> h = null, t = null;
    for (E e : c) { // 遍歷c集合
        // 保證元素不為空
        checkNotNull(e);
        // 新生一個結點
        Node<E> newNode = new Node<E>(e);
        if (h == null) // 頭節點為null
            // 賦值頭節點與尾結點
            h = t = newNode;
        else {
            // 直接頭節點的next域
            t.lazySetNext(newNode);
            // 重新賦值頭節點
            t = newNode;
        }
    }
    if (h == null) // 頭節點為null
        // 新生頭節點與尾結點
        h = t = new Node<E>(null);
    // 賦值頭節點
    head = h;
    // 賦值尾結點
    tail = t;
}

說明: 該構造函數用於創建一個最初包含給定 collection 元素的 ConcurrentLinkedQueue,按照此 collection 迭代器的遍歷順序來添加元素。

核心函數分析

offer函數

public boolean offer(E e) {
    // 檢查e是不是null,是的話拋出NullPointerException異常。
    checkNotNull(e);
    // 創建新的節點
    final Node<E> newNode = new Node<E>(e);

    // 將“新的節點”添加到鏈表的末尾。
    for (Node<E> t = tail, p = t;;) {//這個for迴圈是個死迴圈,增加了兩個指針p,t。
        Node<E> q = p.next;
        // 情況1:q為空,p就是尾節點,新節點插入
        if (q == null) {
            // CAS操作:如果“p的下一個節點為null”(即p為尾節點),則設置p的下一個節點為newNode。
            // 如果該CAS操作成功的話,則比較“p和t”(若p不等於t,則設置newNode為新的尾節點),然後返回true。
            // 如果該CAS操作失敗,這意味著“其它線程對尾節點進行了修改”,則重新迴圈。
            if (p.casNext(null, newNode)) {
                if (p != t) // hop two nodes at a time
                    casTail(t, newNode);  // Failure is OK.
                return true;
            }
        }
        // 情況2:p和q相等
        else if (p == q)
            p = (t != (t = tail)) ? t : head;
        // 情況3:其它\\
        //這裡是移動p指針,意思就是此時如果p不是最後一個元素則把p指針指向tail,否則指向q,也就是指向p.next元素。
        else
            p = (p != t && t != (t = tail)) ? t : q;
    }
}

說明: offer函數用於將指定元素插入此隊列的尾部。下麵模擬offer函數的操作,隊列狀態的變化(假設單線程添加元素,連續添加10、20兩個元素)。

若ConcurrentLinkedQueue的初始狀態如上圖所示,即隊列為空。單線程添加元素,此時,添加元素10,則狀態如下所示

如上圖所示,添加元素10後,tail沒有變化,還是指向之前的結點,繼續添加元素20,則狀態如下所示

如上圖所示,添加元素20後,tail指向了最新添加的結點。

poll函數

public E poll() {
    restartFromHead:
    for (;;) { // 無限迴圈
        for (Node<E> h = head, p = h, q;;) { // 保存頭節點
            // item項
            E item = p.item;

            if (item != null && p.casItem(item, null)) { // item不為null並且比較並替換item成功
                // Successful CAS is the linearization point
                // for item to be removed from this queue.
                if (p != h) // p不等於h    // hop two nodes at a time
                    // 更新頭節點
                    updateHead(h, ((q = p.next) != null) ? q : p); 
                // 返回item
                return item;
            }
            else if ((q = p.next) == null) { // q結點為null
                // 更新頭節點
                updateHead(h, p);
                return null;
            }
            else if (p == q) // p等於q
                // 繼續迴圈
                continue restartFromHead;
            else
                // p賦值為q
                p = q;
        }
    }
}

說明: 此函數用於獲取並移除此隊列的頭,如果此隊列為空,則返回null。下麵模擬poll函數的操作,隊列狀態的變化(假設單線程操作,狀態為之前offer10、20後的狀態,poll兩次)。

隊列初始狀態如上圖所示,在poll操作後,隊列的狀態如下圖所示

如上圖可知,poll操作後,head改變了,並且head所指向的結點的item變為了null。再進行一次poll操作,隊列的狀態如下圖所示。

如上圖可知,poll操作後,head結點沒有變化,只是指示的結點的item域變成了null。

remove函數

public boolean remove(Object o) {
    // 元素為null,返回
    if (o == null) return false;
    Node<E> pred = null;
    for (Node<E> p = first(); p != null; p = succ(p)) { // 獲取第一個存活的結點
        // 第一個存活結點的item值
        E item = p.item;
        if (item != null &&
            o.equals(item) &&
            p.casItem(item, null)) { // 找到item相等的結點,並且將該結點的item設置為null
            // p的後繼結點
            Node<E> next = succ(p);
            if (pred != null && next != null) // pred不為null並且next不為null
                // 比較並替換next域
                pred.casNext(p, next);
            return true;
        }
        // pred賦值為p
        pred = p;
    }
    return false;
}

說明: 此函數用於從隊列中移除指定元素的單個實例(如果存在)。其中,會調用到first函數和succ函數,first函數的源碼如下

Node<E> first() {
    restartFromHead:
    for (;;) { // 無限迴圈,確保成功
        for (Node<E> h = head, p = h, q;;) {
            // p結點的item域是否為null
            boolean hasItem = (p.item != null);
            if (hasItem || (q = p.next) == null) { // item不為null或者next域為null
                // 更新頭節點
                updateHead(h, p);
                // 返回結點
                return hasItem ? p : null;
            }
            else if (p == q) // p等於q
                // 繼續從頭節點開始
                continue restartFromHead;
            else
                // p賦值為q
                p = q;
        }
    }
}

說明: first函數用於找到鏈表中第一個存活的結點。succ函數源碼如下

final Node<E> succ(Node<E> p) {
    // p結點的next域
    Node<E> next = p.next;
    // 如果next域為自身,則返回頭節點,否則,返回next
    return (p == next) ? head : next;
}

說明: succ用於獲取結點的下一個結點。如果結點的next域指向自身,則返回head頭節點,否則,返回next結點。下麵模擬remove函數的操作,隊列狀態的變化(假設單線程操作,狀態為之前offer10、20後的狀態,執行remove(10)、remove(20)操作)。

如上圖所示,為ConcurrentLinkedQueue的初始狀態,remove(10)後的狀態如下圖所示

如上圖所示,當執行remove(10)後,head指向了head結點之前指向的結點的下一個結點,並且head結點的item域置為null。繼續執行remove(20),狀態如下圖所示

如上圖所示,執行remove(20)後,head與tail指向同一個結點,item域為null。

size函數

public int size() {
    // 計數
    int count = 0;
    for (Node<E> p = first(); p != null; p = succ(p)) // 從第一個存活的結點開始往後遍歷
        if (p.item != null) // 結點的item域不為null
            // Collection.size() spec says to max out
            if (++count == Integer.MAX_VALUE) // 增加計數,若達到最大值,則跳出迴圈
                break;
    // 返回大小
    return count;
}

說明: 此函數用於返回ConcurrenLinkedQueue的大小,從第一個存活的結點(first)開始,往後遍歷鏈表,當結點的item域不為null時,增加計數,之後返回大小。

HOPS(延遲更新的策略)的設計

通過上面對offer和poll方法的分析,我們發現tail和head是延遲更新的,兩者更新觸發時機為:

  • tail更新觸發時機:當tail指向的節點的下一個節點不為null的時候,會執行定位隊列真正的隊尾節點的操作,找到隊尾節點後完成插入之後才會通過casTail進行tail更新;當tail指向的節點的下一個節點為null的時候,只插入節點不更新tail。

  • head更新觸發時機:當head指向的節點的item域為null的時候,會執行定位隊列真正的隊頭節點的操作,找到隊頭節點後完成刪除之後才會通過updateHead進行head更新;當head指向的節點的item域不為null的時候,只刪除節點不更新head。

從上面更新時的狀態圖可以看出,head和tail的更新是“跳著的”即中間總是間隔了一個。那麼這樣設計的意圖是什麼呢?

如果讓tail永遠作為隊列的隊尾節點,實現的代碼量會更少,而且邏輯更易懂。但是,這樣做有一個缺點,如果大量的入隊操作,每次都要執行CAS進行tail的更新,彙總起來對性能也會是大大的損耗。如果能減少CAS更新的操作,無疑可以大大提升入隊的操作效率,所以doug lea大師每間隔1次(tail和隊尾節點的距離為1)進行才利用CAS更新tail。對head的更新也是同樣的道理,雖然,這樣設計會多出在迴圈中定位隊尾節點,但總體來說讀的操作效率要遠遠高於寫的性能,因此,多出來的在迴圈中定位尾節點的操作的性能損耗相對而言是很小的。

關於作者

來自一線程式員Seven的探索與實踐,持續學習迭代中~

本文已收錄於我的個人博客:https://www.seven97.top

公眾號:seven97,歡迎關註~

本文來自線上網站:seven的菜鳥成長之路,作者:seven,轉載請註明原文鏈接:www.seven97.top


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 正文 跟靈分別後,我在椅子上發了很久的呆。可能有五分鐘吧。就這樣感受著微風吹過身體,聽著身邊熙熙攘攘的聲音。 我不知道為什麼要發呆。自然而然就這樣做了。 過了很久,我意識到可能必須要吃飯了,才起身找飯館。 今天情緒不太好。我不知道是不是因為沒睡夠。不過,並沒有從一開始就很糟糕,到晚上才開始。 先是發 ...
  • UGit 是一款為騰訊內部研發環境特點而定製的 Git 客戶端,旨在提高開發效率和團隊協作的流暢性。UGit 支持 macOS 10.11+、Apple Silicon 和 Win64 位系統。 官方地址:https://ugit.qq.com/zh/index.html 主要特性 Ugit 提供便 ...
  • 類型轉換 類型轉換是將一個值從一種類型更改為另一種類型的過程。例如, 可以將String類型的數據“457”轉換為數值型,也可以將任意類型的 數據轉換為String類型。 如果從低精度數據類型向高精度數據類型轉換,則永遠不會溢出, 並且總是成功的;而把高精度數據類型向低精度數據類型轉換時,則會 有信 ...
  • 本文梳理總結了一些 Java 互聯網項目中常見的 Redis 緩存應用場景,例如常見的 String 類型 Key-Value、對時效性要求高的場景、Hash 結構的場景以及對實時性要求高的場景等,全面涵蓋了 Redis 中所有的 5 種基本類型。 ...
  • 在 Python 中,Zope 提供了一種機制來定義和實現介面。Zope 的介面模塊通常用於創建可重用的組件,並確保組件遵循特定的介面規範。 ...
  • Delete、Drop 和 Truncate delete、truncate 僅僅刪除表裡面的數據,drop會把表的結構也刪除 delete 是 DML 語句,操作完成後,可以回滾,truncate 和 drop 是 DDL 語句,刪除之後立即生效,不能回滾 執行效率:drop > truncate ...
  • python基礎 軟體下載 1.python下載安裝 點擊此鏈接進入官網windows下載地址 點擊箭頭處鏈接下載最新版本,進入頁面後下拉 根據你的機器下載對應版本,一般人使用的是X86架構windos系統,下載箭頭所指即可 若是不知道CPU架構,可見查看cpu架構,x86還是arm 下載後根據指引 ...
  • 實踐環境 Python 3.9.13 paho-mqtt 2.1.0 簡介 Eclipse Paho MQTT Python客戶端類庫實現了MQTT 協議版本 5.0, 3.1.1, 和3.1。 該類庫提供一個客戶端類,允許應用連接到MQTT代理併發布消息,訂閱主題並檢索發佈的消息。同時還提供了一個 ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...