SynchronousQueue詳解

来源:https://www.cnblogs.com/chafry/archive/2022/10/13/16782932.html
-Advertisement-
Play Games

SynchronousQueue介紹 【1】SynchronousQueue是一個沒有數據緩衝的BlockingQueue,生產者線程對其的插入操作put必須等待消費者的移除操作take。 【2】如圖所示,SynchronousQueue 最大的不同之處在於,它的容量為 0,所以沒有一個地方來暫存元 ...


SynchronousQueue介紹

  【1】SynchronousQueue是一個沒有數據緩衝的BlockingQueue,生產者線程對其的插入操作put必須等待消費者的移除操作take。

             

  【2】如圖所示,SynchronousQueue 最大的不同之處在於,它的容量為 0,所以沒有一個地方來暫存元素,導致每次取數據都要先阻塞,直到有數據被放入;同理,每次放數據的時候也會阻塞,直到有消費者來取

  【3】需要註意的是,SynchronousQueue 的容量不是 1 而是 0,因為 SynchronousQueue 不需要去持有元素,它所做的就是直接傳遞(direct handoff)。由於每當需要傳遞的時候,SynchronousQueue 會把元素直接從生產者傳給消費者,在此期間並不需要做存儲,所以如果運用得當,它的效率是很高的。

SynchronousQueue的源碼分析

  【1】構造函數

//預設採用非公平
public SynchronousQueue() {
    this(false);
}
//可以選擇模式
public SynchronousQueue(boolean fair) {
    transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
}

  【2】核心方法分析

//這些方法本質上都是調用屬性值transferer的transfer方法
public void put(E e) throws InterruptedException {
    if (e == null) throw new NullPointerException();
    if (transferer.transfer(e, false, 0) == null) {
        Thread.interrupted();
        throw new InterruptedException();
    }
}

public boolean offer(E e) {
    if (e == null) throw new NullPointerException();
    return transferer.transfer(e, true, 0) != null;
}

public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
    if (e == null) throw new NullPointerException();
    if (transferer.transfer(e, true, unit.toNanos(timeout)) != null)
        return true;
    if (!Thread.interrupted())
        return false;
    throw new InterruptedException();
}

public E take() throws InterruptedException {
    E e = transferer.transfer(null, false, 0);
    if (e != null)
        return e;
    Thread.interrupted();
    throw new InterruptedException();
}

public E poll(long timeout, TimeUnit unit) throws InterruptedException {
    E e = transferer.transfer(null, true, unit.toNanos(timeout));
    if (e != null || !Thread.interrupted())
        return e;
    throw new InterruptedException();
}

public E poll() {
    return transferer.transfer(null, true, 0);
}

s

Transferer分析

  【1】Transferer是SynchronousQueue的內部抽象類,雙棧和雙隊列演算法共用該類。他只有一個transfer方法,用於轉移元素,從生產者轉移到消費者;或者消費者調用該方法從生產者取數據。

  【2】Transferer有兩個實現類:TransferQueue和TransferStack。

  【3】這兩個類的區別就在於是否公平。TransferQueue是公平的,TransferStack非公平。

  【4】源碼展示

// 堆棧和隊列共同的介面,負責執行 put or take
abstract static class Transferer<E> {
    // e 為空的,會直接返回特殊值,不為空會傳遞給消費者
    // timed 為 true,說明會有超時時間
    abstract E transfer(E e, boolean timed, long nanos);
}

 

TransferQueue分析

  【1】節點元素

//隊列節點元素
static final class QNode {
    // 當前元素的下一個元素
    volatile QNode next;          
    // 當前元素的值,如果當前元素被阻塞住了,等其他線程來喚醒自己時,其他線程會把自己 set 到 item 裡面
    volatile Object item;         
    // 可以阻塞住的當前線程
    volatile Thread waiter;       
    // 節點類型:true是 put,false是 take
    final boolean isData;         

   ....
}

  【2】構造方法

//隊列頭結點指針
transient volatile QNode head;
//隊列尾結點指針
transient volatile QNode tail;

TransferQueue() {
    QNode h = new QNode(null, false); // initialize to dummy node.
    head = h;
    tail = h;
}

  【3】核心方法

@SuppressWarnings("unchecked")
E transfer(E e, boolean timed, long nanos) {

    QNode s = null; 
    //根據是否傳入數據 判斷是獲取還是存放 
    boolean isData = (e != null);

    for (;;) {
        // 隊列頭和尾的臨時變數,隊列是空的時候,t=h
        QNode t = tail;
        QNode h = head;
        // tail 和 head 沒有初始化時,無限迴圈,雖然這種 continue 非常耗cpu,但感覺不會碰到這種情況
        // 因為 tail 和 head 在 TransferQueue 初始化的時候,就已經被賦值空節點了
        if (t == null || h == null)         // saw uninitialized value
            continue;                       // spin
        // 首尾節點相同,說明是空隊列
        // 或者尾節點的操作和當前節點操作一致
        if (h == t || t.isData == isData) { // empty or same-mode
            QNode tn = t.next;
            if (t != tail)                  //直至拿到尾節點
                continue;
            if (tn != null) {               // lagging tail
                advanceTail(t, tn);
                continue;
            }
            //超時直接返回 null
            if (timed && nanos <= 0)        // can't wait
                return null;
            //構建新節點
            if (s == null)
                s = new QNode(e, isData);
            //將新建節點塞入隊列
            if (!t.casNext(null, s))        // failed to link in
                continue;

            advanceTail(t, s);             
            // 阻塞住自己
            Object x = awaitFulfill(s, e, timed, nanos);
            if (x == s) {                   // wait was cancelled
                clean(t, s);
                return null;
            }

            if (!s.isOffList()) {           // not already unlinked
                advanceHead(t, s);          // unlink if head
                if (x != null)              // and forget fields
                    s.item = s;
                s.waiter = null;
            }
            return (x != null) ? (E)x : e;

        } 
        // 隊列不為空,並且當前操作和隊尾不一致,也就是說當前操作是隊尾是對應的操作
        // 比如說隊尾是因為 take 被阻塞的,那麼當前操作必然是 put
        else {
            // 也就是這行代碼體現出隊列的公平,每次操作時,從頭開始按照順序進行操作
            QNode m = h.next;              
            if (t != tail || m == null || h != head)
                continue;                   // inconsistent read

            Object x = m.item;
            if (isData == (x != null) ||    // m already fulfilled
                x == m ||                   // m cancelled
                !m.casItem(x, e)) {         // lost CAS
                advanceHead(h, m);          // dequeue and retry
                continue;
            }
            // 當前操作放到隊頭
            advanceHead(h, m);           
            // 釋放隊頭阻塞節點
            LockSupport.unpark(m.waiter);
            return (x != null) ? (E)x : e;
        }
    }
}

 

TransferStack分析

  【1】節點元素

// 棧中節點的幾種類型:
// 1. 消費者(請求數據的)
static final int REQUEST    = 0;
// 2. 生產者(提供數據的)
static final int DATA       = 1;
// 3. 二者正在匹配中
static final int FULFILLING = 2;

// 棧中的節點
static final class SNode {
    // 下一個節點
    volatile SNode next;        
    volatile SNode match;       // the node matched to this
    // 等待著的線程
    volatile Thread waiter;    
    Object item;                
    // 模式,也就是節點的類型,是消費者,是生產者,還是正在匹配中
    int mode;
...
}

 

  【2】核心方法

// TransferStack.transfer()方法
E transfer(E e, boolean timed, long nanos) {
    SNode s = null; // constructed/reused as needed
    // 根據e是否為null決定是生產者還是消費者
    int mode = (e == null) ? REQUEST : DATA;
    // 自旋+CAS
    for (;;) {
        // 棧頂元素
        SNode h = head;
        // 棧頂沒有元素,或者棧頂元素跟當前元素是一個模式的
        // 也就是都是生產者節點或者都是消費者節點
        if (h == null || h.mode == mode) {  // empty or same-mode
            // 如果有超時而且已到期
            if (timed && nanos <= 0) {      // can't wait
                // 如果頭節點不為空且是取消狀態
                if (h != null && h.isCancelled())
                    // 就把頭節點彈出,併進入下一次迴圈
                    casHead(h, h.next);     // pop cancelled node
                else
                    // 否則,直接返回null(超時返回null)
                    return null;
            } else if (casHead(h, s = snode(s, e, h, mode))) {
                // 入棧成功(因為是模式相同的,所以只能入棧)
                // 調用awaitFulfill()方法自旋+阻塞當前入棧的線程並等待被匹配到
                SNode m = awaitFulfill(s, timed, nanos);
                // 如果m等於s,說明取消了,那麼就把它清除掉,並返回null
                if (m == s) {               // wait was cancelled
                    clean(s);
                    // 被取消了返回null
                    return null;
                }
                
                // 到這裡說明匹配到元素了
                // 因為從awaitFulfill()裡面出來要不被取消了要不就匹配到了
                // 如果頭節點不為空,並且頭節點的下一個節點是s
                // 就把頭節點換成s的下一個節點
                // 也就是把h和s都彈出了
                // 也就是把棧頂兩個元素都彈出了
                if ((h = head) != null && h.next == s)
                    casHead(h, s.next);     // help s's fulfiller
                // 根據當前節點的模式判斷返回m還是s中的值
                return (E) ((mode == REQUEST) ? m.item : s.item);
            }
        } else if (!isFulfilling(h.mode)) { // try to fulfill
            // 到這裡說明頭節點和當前節點模式不一樣
            // 如果頭節點不是正在匹配中
            
            // 如果頭節點已經取消了,就把它彈出棧
            if (h.isCancelled())            // already cancelled
                casHead(h, h.next);         // pop and retry
            else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) {
                // 頭節點沒有在匹配中,就讓當前節點先入隊,再讓他們嘗試匹配
                // 且s成為了新的頭節點,它的狀態是正在匹配中
                for (;;) { // loop until matched or waiters disappear
                    SNode m = s.next;       // m is s's match
                    // 如果m為null,說明除了s節點外的節點都被其它線程先一步匹配掉了
                    // 就清空棧並跳出內部迴圈,到外部迴圈再重新入棧判斷
                    if (m == null) {        // all waiters are gone
                        casHead(s, null);   // pop fulfill node
                        s = null;           // use new node next time
                        break;              // restart main loop
                    }
                    SNode mn = m.next;
                    // 如果m和s嘗試匹配成功,就彈出棧頂的兩個元素m和s
                    if (m.tryMatch(s)) {
                        casHead(s, mn);     // pop both s and m
                        // 返回匹配結果
                        return (E) ((mode == REQUEST) ? m.item : s.item);
                    } else                  // lost match
                        // 嘗試匹配失敗,說明m已經先一步被其它線程匹配了
                        // 就協助清除它
                        s.casNext(m, mn);   // help unlink
                }
            }
        } else {                            // help a fulfiller
            // 到這裡說明當前節點和頭節點模式不一樣
            // 且頭節點是正在匹配中
            
            SNode m = h.next;               // m is h's match
            if (m == null)                  // waiter is gone
                // 如果m為null,說明m已經被其它線程先一步匹配了
                casHead(h, null);           // pop fulfilling node
            else {
                SNode mn = m.next;
                // 協助匹配,如果m和s嘗試匹配成功,就彈出棧頂的兩個元素m和s
                if (m.tryMatch(h))          // help match
                    // 將棧頂的兩個元素彈出後,再讓s重新入棧
                    casHead(h, mn);         // pop both h and m
                else                        // lost match
                    // 嘗試匹配失敗,說明m已經先一步被其它線程匹配了
                    // 就協助清除它
                    h.casNext(m, mn);       // help unlink
            }
        }
    }
}

// 三個參數:需要等待的節點,是否需要超時,超時時間
SNode awaitFulfill(SNode s, boolean timed, long nanos) {
    // 到期時間
    final long deadline = timed ? System.nanoTime() + nanos : 0L;
    // 當前線程
    Thread w = Thread.currentThread();
    // 自旋次數
    int spins = (shouldSpin(s) ?
                 (timed ? maxTimedSpins : maxUntimedSpins) : 0);
    for (;;) {
        // 當前線程中斷了,嘗試清除s
        if (w.isInterrupted())
            s.tryCancel();
        
        // 檢查s是否匹配到了元素m(有可能是其它線程的m匹配到當前線程的s)
        SNode m = s.match;
        // 如果匹配到了,直接返回m
        if (m != null)
            return m;
        
        // 如果需要超時
        if (timed) {
            // 檢查超時時間如果小於0了,嘗試清除s
            nanos = deadline - System.nanoTime();
            if (nanos <= 0L) {
                s.tryCancel();
                continue;
            }
        }
        if (spins > 0)
            // 如果還有自旋次數,自旋次數減一,併進入下一次自旋
            spins = shouldSpin(s) ? (spins-1) : 0;
        
        // 後面的elseif都是自旋次數沒有了
        else if (s.waiter == null)
            // 如果s的waiter為null,把當前線程註入進去,併進入下一次自旋
            s.waiter = w; // establish waiter so can park next iter
        else if (!timed)
            // 如果不允許超時,直接阻塞,並等待被其它線程喚醒,喚醒後繼續自旋並查看是否匹配到了元素
            LockSupport.park(this);
        else if (nanos > spinForTimeoutThreshold)
            // 如果允許超時且還有剩餘時間,就阻塞相應時間
            LockSupport.parkNanos(this, nanos);
    }
}

// SNode裡面的方向,調用者m是s的下一個節點
// 這時候m節點的線程應該是阻塞狀態的
boolean tryMatch(SNode s) {
    // 如果m還沒有匹配者,就把s作為它的匹配者
    if (match == null &&
        UNSAFE.compareAndSwapObject(this, matchOffset, null, s)) {
        Thread w = waiter;
        if (w != null) {    // waiters need at most one unpark
            waiter = null;
            // 喚醒m中的線程,兩者匹配完畢
            LockSupport.unpark(w);
        }
        // 匹配到了返回true
        return true;
    }
    // 可能其它線程先一步匹配了m,返回其是否是s
    return match == s;
}

 

SynchronousQueue總結

  【1】是一個沒有數據緩衝的BlockingQueue,容量為0,它不會為隊列中元素維護存儲空間,它只是多個線程之間數據交換的媒介。

  【2】數據結構:鏈表,在其內部類中維護了數據

      先消費(take),後生產(put);

        第一個線程Thread0是消費者訪問,此時隊列為空,則入隊(創建Node結點並賦值)

        第二個線程Thread1也是消費者訪問,與隊尾模式相同,繼續入隊

        第三個線程Thread2是生產者,攜帶了數據e,與隊尾模式不同,不進行入隊操作。直接將該線程攜帶的數據e返回給隊首的消費者,並喚醒隊首線程Thread1(預設非公平策略是棧結構),出隊。

      反之,先生產(put)後消費(take),原理一樣

  【3】鎖:CAS+自旋(無鎖)【阻塞:自旋了一定次數後調用 LockSupport.park()

  【4】存取調用同一個方法:transfer()

      put、offer 為生產者,攜帶了數據 e,為 Data 模式,設置到 SNode或QNode 屬性中。

      take、poll 為消費者,不攜帯數據,為 Request 模式,設置到 SNode或QNode屬性中。

  【5】過程

      線程訪問阻塞隊列,先判斷隊尾節點或者棧頂節點的 Node 與當前入隊模式是否相同

      相同則構造節點 Node 入隊,並阻塞當前線程,元素 e 和線程賦值給 Node 屬性

      不同則將元素 e(不為 null) 返回給取數據線程,隊首或棧頂線程被喚醒,出隊

  【6】公平模式:TransferQueue,隊尾匹配(判斷模式),隊頭出隊,先進先出

  【7】非公平模式(預設策略):TransferStack,棧頂匹配,棧頂出棧,後進先出

  【8】應用場景

      SynchronousQueue非常適合傳遞性場景做交換工作,生產者的線程和消費者的線程同步傳遞某些信息、事件或者任務。

      SynchronousQueue的一個使用場景是線上程池裡。如果我們不確定來自生產者請求數量,但是這些請求需要很快的處理掉,那麼配合SynchronousQueue為每個生產者請求分配一個消費線程是處理效率最高的辦法。Executors.newCachedThreadPool()就使用了SynchronousQueue,這個線程池根據需要(新任務到來時)創建新的線程,如果有空閑線程則會重覆使用,線程空閑了60秒後會被回收。

Transferer

您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 快速上手iOS UIKit UIKit是蘋果官方的framework, 其中包含了各種UI組件, window和view, 事件處理, 交互, 動畫, 資源管理等基礎設施支持. 按照前面的介紹, 用UIKit寫UI可以用storyboard(Interface Builder)和代碼兩種方式. 大體 ...
  • 一、打包過程 1、創建唯一標識符 首先,申請蘋果開發者賬號。 沒有蘋果開發者賬號是無法進行 ios 打包上線的。 (2) 進入 https://developer.apple.com 這個網址,點擊“account” 並輸入蘋果開發者賬號進入用戶界面。 ​ (3) 點擊證書文件 ​ (4) 進入到這 ...
  • 概念 performance.now():返回值表示為從time origin之後到當前調用時經過的時間, time origin: 時間源, 時間源是一個可以被認定為當前文檔生命周期的開始節點的標準時間,計算方法如下: 如果腳本的 global object 是 Window, 則時間源的確定方式 ...
  • 這裡給大家分享我在網上總結出來的一些知識,希望對大家有所幫助 方法一 adb connect連接調試 前提條件: 電腦已安裝adb工具 手機和電腦連接的同一個WIFI CMD進入到adb工具所在目錄,可以使用HBuilder自帶adb,如:D:\Tools\HBuilderX\plugins\lau ...
  • 前言 從2015年左右開始,Google、Baidu、Facebook 等互聯網巨頭,不謀而合地開始大力推行 HTTPS, 國內外的大型互聯網公司很多也都已經啟用了全站 HTTPS 為鼓勵全球網站的 HTTPS 實現,一些互聯網公司都提出了自己的要求: 1)Google 已調整搜索引擎演算法,讓採用 ...
  • 就像我們經常所說的:沒有最好的架構,只有最合適的架構。一個好的架構師,可以根據具體的需求、所擁有的資源等因素綜合考慮而設計出最優的架構方案。特別是現在,業務的飛速變化、數據無處不在等這些因素的影響下,技術和框架也需要在變化的過程中不斷地打磨和提升以適應新的業務需要。可能當時是最好的架構,但是後來我們... ...
  • 關於架構師的成長之路,還存在著一個誤區,就是把架構師預設為軟體架構師。因為今天我們所遇到的架構師,大多數都是圍繞著軟體研發。事實上這個認識有一定的片面性。誠然,現今我們所構建的系統都是軟體系統,但是在實際的工作過程中,隨著信息技術在深度和廣度上的快速發展,除了軟體研發以外,測試、網路、安全、配置、系... ...
  • 享元設計模式(Flyweight Design Pattern)通過共用技術實現相同或相似對象的重用,節省記憶體,前提是享元對象是不可變對象。 ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...