ConcurrentLinkedQueue詳解(圖文並茂)

来源:https://www.cnblogs.com/seven97-top/p/18425757
-Advertisement-
Play Games

前言 ConcurrentLinkedQueue是基於鏈接節點的無界線程安全隊列。此隊列按照FIFO(先進先出)原則對元素進行排序。隊列的頭部是隊列中存在時間最長的元素,而隊列的尾部則是最近添加的元素。新的元素總是被插入到隊列的尾部,而隊列的獲取操作(例如poll或peek)則是從隊列頭部開始。 與 ...


前言

ConcurrentLinkedQueue是基於鏈接節點的無界線程安全隊列。此隊列按照FIFO(先進先出)原則對元素進行排序。隊列的頭部是隊列中存在時間最長的元素,而隊列的尾部則是最近添加的元素。新的元素總是被插入到隊列的尾部,而隊列的獲取操作(例如pollpeek)則是從隊列頭部開始。

與傳統的LinkedList不同,ConcurrentLinkedQueue使用了一種高效的非阻塞演算法,被稱為無鎖編程(Lock-Free programming),它通過原子變數和CAS(Compare-And-Swap)操作來保證線程安全,而不是通過傳統的鎖機制。這使得它在高併發場景下具有出色的性能表現。

可以看做一個線程安全的LinkedList,是一個線程安全的無界隊列,但LinkedList是一個雙向鏈表,而ConcurrentLinkedQueue是單向鏈表。

ConcurrentLinkedQueue線程安全在於設置head、tail以及next指針時都用的cas操作,而且node里的item和next變數都是用volatile修飾,保證了多線程下變數的可見性。而ConcurrentLinkedQueue的所有讀操作都是無鎖的,所以可能讀會存在不一致性。

應用場景

如果對隊列加鎖的成本較高則適合使用無鎖的 ConcurrentLinkedQueue 來替代。適合在對性能要求相對較高,同時有多個線程對隊列進行讀寫的場景。

ConcurrentLinkedQueue通過無鎖來做到了更高的併發量,是個高性能的隊列,但是使用場景相對不如阻塞隊列常見,畢竟取數據也要不停的去迴圈,不如阻塞的設計,但是在併發量特別大的情況下,是個不錯的選擇,性能上好很多,而且這個隊列的設計也是特別費力,尤其的使用的改良演算法和對哨兵的處理。

主要方法

ConcurrentLinkedQueue提供了豐富的方法來操作隊列,包括:

  • offer(E e):將指定的元素插入此隊列的尾部。
  • add(E e):將指定的元素插入此隊列的尾部(與offer方法功能相同,但在失敗時拋出異常)。
  • poll():獲取並移除此隊列的頭部,如果此隊列為空,則返回null
  • peek():獲取但不移除此隊列的頭部,如果此隊列為空,則返回null
  • size():返回此隊列中的元素數量。需要註意的是,由於併發的原因,這個方法返回的結果可能並不准確。如果需要在併發環境下獲取準確的元素數量,建議使用java.util.concurrent.atomic包中的原子變數進行計數。
  • isEmpty():檢查此隊列是否為空。與size()方法類似,由於併發的原因,這個方法返回的結果也可能不准確。

需要註意的是,在併發環境下使用size()isEmpty()方法時需要特別小心,因為它們的結果可能並不准確。如果需要精確的元素數量或空隊列檢測,建議使用額外的同步機制或原子變數來實現。

底層源碼

類的內部類

private static class Node<E> {
    // 元素
    volatile E item;
    // next域
    volatile Node<E> next;

    /**
    * Constructs a new node.  Uses relaxed write because item can
    * only be seen after publication via casNext.
    */
    // 構造函數
    Node(E item) {
        // 設置item的值
        UNSAFE.putObject(this, itemOffset, item);
    }
    // 比較並替換item值
    boolean casItem(E cmp, E val) {
        return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
    }
    
    void lazySetNext(Node<E> val) {
        // 設置next域的值,並不會保證修改對其他線程立即可見
        UNSAFE.putOrderedObject(this, nextOffset, val);
    }
    // 比較並替換next域的值
    boolean casNext(Node<E> cmp, Node<E> val) {
        return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
    }

    // Unsafe mechanics
    // 反射機制
    private static final sun.misc.Unsafe UNSAFE;
    // item域的偏移量
    private static final long itemOffset;
    // next域的偏移量
    private static final long nextOffset;

    static {
        try {
            UNSAFE = sun.misc.Unsafe.getUnsafe();
            Class<?> k = Node.class;
            itemOffset = UNSAFE.objectFieldOffset
                (k.getDeclaredField("item"));
            nextOffset = UNSAFE.objectFieldOffset
                (k.getDeclaredField("next"));
        } catch (Exception e) {
            throw new Error(e);
        }
    }
}

說明: Node類表示鏈表結點,用於存放元素,包含item域和next域,item域表示元素,next域表示下一個結點,其利用反射機制和CAS機制來更新item域和next域,保證原子性。

類的屬性

public class ConcurrentLinkedQueue<E> extends AbstractQueue<E>
        implements Queue<E>, java.io.Serializable {
    // 版本序列號        
    private static final long serialVersionUID = 196745693267521676L;
    // 反射機制
    private static final sun.misc.Unsafe UNSAFE;
    // head域的偏移量
    private static final long headOffset;
    // tail域的偏移量
    private static final long tailOffset;
    static {
        try {
            UNSAFE = sun.misc.Unsafe.getUnsafe();
            Class<?> k = ConcurrentLinkedQueue.class;
            headOffset = UNSAFE.objectFieldOffset
                (k.getDeclaredField("head"));
            tailOffset = UNSAFE.objectFieldOffset
                (k.getDeclaredField("tail"));
        } catch (Exception e) {
            throw new Error(e);
        }
    }
    
    // 頭節點
    private transient volatile Node<E> head;
    // 尾結點
    private transient volatile Node<E> tail;
}

說明: 屬性中包含了head域和tail域,表示鏈表的頭節點和尾結點,同時,ConcurrentLinkedQueue也使用了反射機制和CAS機制來更新頭節點和尾結點,保證原子性。

類的構造函數

  • ConcurrentLinkedQueue()
public ConcurrentLinkedQueue() {
    // 初始化頭節點與尾結點
    head = tail = new Node<E>(null);
}

說明: 該構造函數用於創建一個最初為空的 ConcurrentLinkedQueue,頭節點與尾結點指向同一個結點,該結點的item域為null,next域也為null。

  • ConcurrentLinkedQueue(Collection<? extends E>)
public ConcurrentLinkedQueue(Collection<? extends E> c) {
    Node<E> h = null, t = null;
    for (E e : c) { // 遍歷c集合
        // 保證元素不為空
        checkNotNull(e);
        // 新生一個結點
        Node<E> newNode = new Node<E>(e);
        if (h == null) // 頭節點為null
            // 賦值頭節點與尾結點
            h = t = newNode;
        else {
            // 直接頭節點的next域
            t.lazySetNext(newNode);
            // 重新賦值頭節點
            t = newNode;
        }
    }
    if (h == null) // 頭節點為null
        // 新生頭節點與尾結點
        h = t = new Node<E>(null);
    // 賦值頭節點
    head = h;
    // 賦值尾結點
    tail = t;
}

說明: 該構造函數用於創建一個最初包含給定 collection 元素的 ConcurrentLinkedQueue,按照此 collection 迭代器的遍歷順序來添加元素。

核心函數分析

offer函數

public boolean offer(E e) {
    // 檢查e是不是null,是的話拋出NullPointerException異常。
    checkNotNull(e);
    // 創建新的節點
    final Node<E> newNode = new Node<E>(e);

    // 將“新的節點”添加到鏈表的末尾。
    for (Node<E> t = tail, p = t;;) {//這個for迴圈是個死迴圈,增加了兩個指針p,t。
        Node<E> q = p.next;
        // 情況1:q為空,p就是尾節點,新節點插入
        if (q == null) {
            // CAS操作:如果“p的下一個節點為null”(即p為尾節點),則設置p的下一個節點為newNode。
            // 如果該CAS操作成功的話,則比較“p和t”(若p不等於t,則設置newNode為新的尾節點),然後返回true。
            // 如果該CAS操作失敗,這意味著“其它線程對尾節點進行了修改”,則重新迴圈。
            if (p.casNext(null, newNode)) {
                if (p != t) // hop two nodes at a time
                    casTail(t, newNode);  // Failure is OK.
                return true;
            }
        }
        // 情況2:p和q相等
        else if (p == q)
            p = (t != (t = tail)) ? t : head;
        // 情況3:其它\\
        //這裡是移動p指針,意思就是此時如果p不是最後一個元素則把p指針指向tail,否則指向q,也就是指向p.next元素。
        else
            p = (p != t && t != (t = tail)) ? t : q;
    }
}

說明: offer函數用於將指定元素插入此隊列的尾部。下麵模擬offer函數的操作,隊列狀態的變化(假設單線程添加元素,連續添加10、20兩個元素)。

若ConcurrentLinkedQueue的初始狀態如上圖所示,即隊列為空。單線程添加元素,此時,添加元素10,則狀態如下所示

如上圖所示,添加元素10後,tail沒有變化,還是指向之前的結點,繼續添加元素20,則狀態如下所示

如上圖所示,添加元素20後,tail指向了最新添加的結點。

poll函數

public E poll() {
    restartFromHead:
    for (;;) { // 無限迴圈
        for (Node<E> h = head, p = h, q;;) { // 保存頭節點
            // item項
            E item = p.item;

            if (item != null && p.casItem(item, null)) { // item不為null並且比較並替換item成功
                // Successful CAS is the linearization point
                // for item to be removed from this queue.
                if (p != h) // p不等於h    // hop two nodes at a time
                    // 更新頭節點
                    updateHead(h, ((q = p.next) != null) ? q : p); 
                // 返回item
                return item;
            }
            else if ((q = p.next) == null) { // q結點為null
                // 更新頭節點
                updateHead(h, p);
                return null;
            }
            else if (p == q) // p等於q
                // 繼續迴圈
                continue restartFromHead;
            else
                // p賦值為q
                p = q;
        }
    }
}

說明: 此函數用於獲取並移除此隊列的頭,如果此隊列為空,則返回null。下麵模擬poll函數的操作,隊列狀態的變化(假設單線程操作,狀態為之前offer10、20後的狀態,poll兩次)。

隊列初始狀態如上圖所示,在poll操作後,隊列的狀態如下圖所示

如上圖可知,poll操作後,head改變了,並且head所指向的結點的item變為了null。再進行一次poll操作,隊列的狀態如下圖所示。

如上圖可知,poll操作後,head結點沒有變化,只是指示的結點的item域變成了null。

remove函數

public boolean remove(Object o) {
    // 元素為null,返回
    if (o == null) return false;
    Node<E> pred = null;
    for (Node<E> p = first(); p != null; p = succ(p)) { // 獲取第一個存活的結點
        // 第一個存活結點的item值
        E item = p.item;
        if (item != null &&
            o.equals(item) &&
            p.casItem(item, null)) { // 找到item相等的結點,並且將該結點的item設置為null
            // p的後繼結點
            Node<E> next = succ(p);
            if (pred != null && next != null) // pred不為null並且next不為null
                // 比較並替換next域
                pred.casNext(p, next);
            return true;
        }
        // pred賦值為p
        pred = p;
    }
    return false;
}

說明: 此函數用於從隊列中移除指定元素的單個實例(如果存在)。其中,會調用到first函數和succ函數,first函數的源碼如下

Node<E> first() {
    restartFromHead:
    for (;;) { // 無限迴圈,確保成功
        for (Node<E> h = head, p = h, q;;) {
            // p結點的item域是否為null
            boolean hasItem = (p.item != null);
            if (hasItem || (q = p.next) == null) { // item不為null或者next域為null
                // 更新頭節點
                updateHead(h, p);
                // 返回結點
                return hasItem ? p : null;
            }
            else if (p == q) // p等於q
                // 繼續從頭節點開始
                continue restartFromHead;
            else
                // p賦值為q
                p = q;
        }
    }
}

說明: first函數用於找到鏈表中第一個存活的結點。succ函數源碼如下

final Node<E> succ(Node<E> p) {
    // p結點的next域
    Node<E> next = p.next;
    // 如果next域為自身,則返回頭節點,否則,返回next
    return (p == next) ? head : next;
}

說明: succ用於獲取結點的下一個結點。如果結點的next域指向自身,則返回head頭節點,否則,返回next結點。下麵模擬remove函數的操作,隊列狀態的變化(假設單線程操作,狀態為之前offer10、20後的狀態,執行remove(10)、remove(20)操作)。

如上圖所示,為ConcurrentLinkedQueue的初始狀態,remove(10)後的狀態如下圖所示

如上圖所示,當執行remove(10)後,head指向了head結點之前指向的結點的下一個結點,並且head結點的item域置為null。繼續執行remove(20),狀態如下圖所示

如上圖所示,執行remove(20)後,head與tail指向同一個結點,item域為null。

size函數

public int size() {
    // 計數
    int count = 0;
    for (Node<E> p = first(); p != null; p = succ(p)) // 從第一個存活的結點開始往後遍歷
        if (p.item != null) // 結點的item域不為null
            // Collection.size() spec says to max out
            if (++count == Integer.MAX_VALUE) // 增加計數,若達到最大值,則跳出迴圈
                break;
    // 返回大小
    return count;
}

說明: 此函數用於返回ConcurrenLinkedQueue的大小,從第一個存活的結點(first)開始,往後遍歷鏈表,當結點的item域不為null時,增加計數,之後返回大小。

HOPS(延遲更新的策略)的設計

通過上面對offer和poll方法的分析,我們發現tail和head是延遲更新的,兩者更新觸發時機為:

  • tail更新觸發時機:當tail指向的節點的下一個節點不為null的時候,會執行定位隊列真正的隊尾節點的操作,找到隊尾節點後完成插入之後才會通過casTail進行tail更新;當tail指向的節點的下一個節點為null的時候,只插入節點不更新tail。

  • head更新觸發時機:當head指向的節點的item域為null的時候,會執行定位隊列真正的隊頭節點的操作,找到隊頭節點後完成刪除之後才會通過updateHead進行head更新;當head指向的節點的item域不為null的時候,只刪除節點不更新head。

從上面更新時的狀態圖可以看出,head和tail的更新是“跳著的”即中間總是間隔了一個。那麼這樣設計的意圖是什麼呢?

如果讓tail永遠作為隊列的隊尾節點,實現的代碼量會更少,而且邏輯更易懂。但是,這樣做有一個缺點,如果大量的入隊操作,每次都要執行CAS進行tail的更新,彙總起來對性能也會是大大的損耗。如果能減少CAS更新的操作,無疑可以大大提升入隊的操作效率,所以doug lea大師每間隔1次(tail和隊尾節點的距離為1)進行才利用CAS更新tail。對head的更新也是同樣的道理,雖然,這樣設計會多出在迴圈中定位隊尾節點,但總體來說讀的操作效率要遠遠高於寫的性能,因此,多出來的在迴圈中定位尾節點的操作的性能損耗相對而言是很小的。

關於作者

來自一線程式員Seven的探索與實踐,持續學習迭代中~

本文已收錄於我的個人博客:https://www.seven97.top

公眾號:seven97,歡迎關註~

本文來自線上網站:seven的菜鳥成長之路,作者:seven,轉載請註明原文鏈接:www.seven97.top


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 正文 跟靈分別後,我在椅子上發了很久的呆。可能有五分鐘吧。就這樣感受著微風吹過身體,聽著身邊熙熙攘攘的聲音。 我不知道為什麼要發呆。自然而然就這樣做了。 過了很久,我意識到可能必須要吃飯了,才起身找飯館。 今天情緒不太好。我不知道是不是因為沒睡夠。不過,並沒有從一開始就很糟糕,到晚上才開始。 先是發 ...
  • UGit 是一款為騰訊內部研發環境特點而定製的 Git 客戶端,旨在提高開發效率和團隊協作的流暢性。UGit 支持 macOS 10.11+、Apple Silicon 和 Win64 位系統。 官方地址:https://ugit.qq.com/zh/index.html 主要特性 Ugit 提供便 ...
  • 類型轉換 類型轉換是將一個值從一種類型更改為另一種類型的過程。例如, 可以將String類型的數據“457”轉換為數值型,也可以將任意類型的 數據轉換為String類型。 如果從低精度數據類型向高精度數據類型轉換,則永遠不會溢出, 並且總是成功的;而把高精度數據類型向低精度數據類型轉換時,則會 有信 ...
  • 本文梳理總結了一些 Java 互聯網項目中常見的 Redis 緩存應用場景,例如常見的 String 類型 Key-Value、對時效性要求高的場景、Hash 結構的場景以及對實時性要求高的場景等,全面涵蓋了 Redis 中所有的 5 種基本類型。 ...
  • 在 Python 中,Zope 提供了一種機制來定義和實現介面。Zope 的介面模塊通常用於創建可重用的組件,並確保組件遵循特定的介面規範。 ...
  • Delete、Drop 和 Truncate delete、truncate 僅僅刪除表裡面的數據,drop會把表的結構也刪除 delete 是 DML 語句,操作完成後,可以回滾,truncate 和 drop 是 DDL 語句,刪除之後立即生效,不能回滾 執行效率:drop > truncate ...
  • python基礎 軟體下載 1.python下載安裝 點擊此鏈接進入官網windows下載地址 點擊箭頭處鏈接下載最新版本,進入頁面後下拉 根據你的機器下載對應版本,一般人使用的是X86架構windos系統,下載箭頭所指即可 若是不知道CPU架構,可見查看cpu架構,x86還是arm 下載後根據指引 ...
  • 實踐環境 Python 3.9.13 paho-mqtt 2.1.0 簡介 Eclipse Paho MQTT Python客戶端類庫實現了MQTT 協議版本 5.0, 3.1.1, 和3.1。 該類庫提供一個客戶端類,允許應用連接到MQTT代理併發布消息,訂閱主題並檢索發佈的消息。同時還提供了一個 ...
一周排行
    -Advertisement-
    Play Games
  • 前言 本文介紹一款使用 C# 與 WPF 開發的音頻播放器,其界面簡潔大方,操作體驗流暢。該播放器支持多種音頻格式(如 MP4、WMA、OGG、FLAC 等),並具備標記、實時歌詞顯示等功能。 另外,還支持換膚及多語言(中英文)切換。核心音頻處理採用 FFmpeg 組件,獲得了廣泛認可,目前 Git ...
  • OAuth2.0授權驗證-gitee授權碼模式 本文主要介紹如何筆者自己是如何使用gitee提供的OAuth2.0協議完成授權驗證並登錄到自己的系統,完整模式如圖 1、創建應用 打開gitee個人中心->第三方應用->創建應用 創建應用後在我的應用界面,查看已創建應用的Client ID和Clien ...
  • 解決了這個問題:《winForm下,fastReport.net 從.net framework 升級到.net5遇到的錯誤“Operation is not supported on this platform.”》 本文內容轉載自:https://www.fcnsoft.com/Home/Sho ...
  • 國內文章 WPF 從裸 Win 32 的 WM_Pointer 消息獲取觸摸點繪製筆跡 https://www.cnblogs.com/lindexi/p/18390983 本文將告訴大家如何在 WPF 裡面,接收裸 Win 32 的 WM_Pointer 消息,從消息裡面獲取觸摸點信息,使用觸摸點 ...
  • 前言 給大家推薦一個專為新零售快消行業打造了一套高效的進銷存管理系統。 系統不僅具備強大的庫存管理功能,還集成了高性能的輕量級 POS 解決方案,確保頁面載入速度極快,提供良好的用戶體驗。 項目介紹 Dorisoy.POS 是一款基於 .NET 7 和 Angular 4 開發的新零售快消進銷存管理 ...
  • ABP CLI常用的代碼分享 一、確保環境配置正確 安裝.NET CLI: ABP CLI是基於.NET Core或.NET 5/6/7等更高版本構建的,因此首先需要在你的開發環境中安裝.NET CLI。這可以通過訪問Microsoft官網下載並安裝相應版本的.NET SDK來實現。 安裝ABP ...
  • 問題 問題是這樣的:第三方的webapi,需要先調用登陸介面獲取Cookie,訪問其它介面時攜帶Cookie信息。 但使用HttpClient類調用登陸介面,返回的Headers中沒有找到Cookie信息。 分析 首先,使用Postman測試該登陸介面,正常返回Cookie信息,說明是HttpCli ...
  • 國內文章 關於.NET在中國為什麼工資低的分析 https://www.cnblogs.com/thinkingmore/p/18406244 .NET在中國開發者的薪資偏低,主要因市場需求、技術棧選擇和企業文化等因素所致。歷史上,.NET曾因微軟的閉源策略發展受限,儘管後來推出了跨平臺的.NET ...
  • 在WPF開發應用中,動畫不僅可以引起用戶的註意與興趣,而且還使軟體更加便於使用。前面幾篇文章講解了畫筆(Brush),形狀(Shape),幾何圖形(Geometry),變換(Transform)等相關內容,今天繼續講解動畫相關內容和知識點,僅供學習分享使用,如有不足之處,還請指正。 ...
  • 什麼是委托? 委托可以說是把一個方法代入另一個方法執行,相當於指向函數的指針;事件就相當於保存委托的數組; 1.實例化委托的方式: 方式1:通過new創建實例: public delegate void ShowDelegate(); 或者 public delegate string ShowDe ...