死磕 java集合之SynchronousQueue源碼分析

来源:https://www.cnblogs.com/tong-yuan/archive/2019/04/25/SynchronousQueue.html
-Advertisement-
Play Games

SynchronousQueue的實現方式? SynchronousQueue真的是無緩衝的嗎? SynchronousQueue在高併發情景下會有什麼問題? ...


問題

(1)SynchronousQueue的實現方式?

(2)SynchronousQueue真的是無緩衝的嗎?

(3)SynchronousQueue在高併發情景下會有什麼問題?

簡介

SynchronousQueue是java併發包下無緩衝阻塞隊列,它用來在兩個線程之間移交元素,但是它有個很大的問題,你知道是什麼嗎?請看下麵的分析。

源碼分析

主要屬性

// CPU的數量
static final int NCPUS = Runtime.getRuntime().availableProcessors();
// 有超時的情況自旋多少次,當CPU數量小於2的時候不自旋
static final int maxTimedSpins = (NCPUS < 2) ? 0 : 32;
// 沒有超時的情況自旋多少次
static final int maxUntimedSpins = maxTimedSpins * 16;
// 針對有超時的情況,自旋了多少次後,如果剩餘時間大於1000納秒就使用帶時間的LockSupport.parkNanos()這個方法
static final long spinForTimeoutThreshold = 1000L;
// 傳輸器,即兩個線程交換元素使用的東西
private transient volatile Transferer<E> transferer;

通過屬性我們可以Get到兩個點:

(1)這個阻塞隊列裡面是會自旋的;

(2)它使用了一個叫做transferer的東西來交換元素;

主要內部類

// Transferer抽象類,主要定義了一個transfer方法用來傳輸元素
abstract static class Transferer<E> {
    abstract E transfer(E e, boolean timed, long nanos);
}
// 以棧方式實現的Transferer
static final class TransferStack<E> extends Transferer<E> {
    // 棧中節點的幾種類型:
    // 1. 消費者(請求數據的)
    static final int REQUEST    = 0;
    // 2. 生產者(提供數據的)
    static final int DATA       = 1;
    // 3. 二者正在匹配中
    static final int FULFILLING = 2;

    // 棧中的節點
    static final class SNode {
        // 下一個節點
        volatile SNode next;        // next node in stack
        // 匹配者
        volatile SNode match;       // the node matched to this
        // 等待著的線程
        volatile Thread waiter;     // to control park/unpark
        // 元素
        Object item;                // data; or null for REQUESTs
        // 模式,也就是節點的類型,是消費者,是生產者,還是正在匹配中
        int mode;
    }
    // 棧的頭節點
    volatile SNode head;
}
// 以隊列方式實現的Transferer
static final class TransferQueue<E> extends Transferer<E> {
    // 隊列中的節點
    static final class QNode {
        // 下一個節點
        volatile QNode next;          // next node in queue
        // 存儲的元素
        volatile Object item;         // CAS'ed to or from null
        // 等待著的線程
        volatile Thread waiter;       // to control park/unpark
        // 是否是數據節點
        final boolean isData;
    }

    // 隊列的頭節點
    transient volatile QNode head;
    // 隊列的尾節點
    transient volatile QNode tail;
}

(1)定義了一個抽象類Transferer,裡面定義了一個傳輸元素的方法;

(2)有兩種傳輸元素的方法,一種是棧,一種是隊列;

(3)棧的特點是後進先出,隊列的特點是先進行出;

(4)棧只需要保存一個頭節點就可以了,因為存取元素都是操作頭節點;

(5)隊列需要保存一個頭節點一個尾節點,因為存元素操作尾節點,取元素操作頭節點;

(6)每個節點中保存著存儲的元素、等待著的線程,以及下一個節點;

(7)棧和隊列兩種方式有什麼不同呢?請看下麵的分析。

主要構造方法

public SynchronousQueue() {
    // 預設非公平模式
    this(false);
}

public SynchronousQueue(boolean fair) {
    // 如果是公平模式就使用隊列,如果是非公平模式就使用棧
    transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
}

(1)預設使用非公平模式,也就是棧結構;

(2)公平模式使用隊列,非公平模式使用棧;

入隊

我們這裡主要介紹以棧方式實現的傳輸模式,以put(E e)方法為例。

public void put(E e) throws InterruptedException {
    // 元素不可為空
    if (e == null) throw new NullPointerException();
    // 直接調用傳輸器的transfer()方法
    // 三個參數分別是:傳輸的元素,是否需要超時,超時的時間
    if (transferer.transfer(e, false, 0) == null) {
        // 如果傳輸失敗,直接讓線程中斷並拋出中斷異常
        Thread.interrupted();
        throw new InterruptedException();
    }
}

調用transferer的transfer()方法,傳入元素e,說明是生產者

出隊

我們這裡主要介紹以棧方式實現的傳輸模式,以take()方法為例。

public E take() throws InterruptedException {
    // 直接調用傳輸器的transfer()方法
    // 三個參數分別是:null,是否需要超時,超時的時間
    // 第一個參數為null表示是消費者,要取元素
    E e = transferer.transfer(null, false, 0);
    // 如果取到了元素就返回
    if (e != null)
        return e;
    // 否則讓線程中斷並拋出中斷異常
    Thread.interrupted();
    throw new InterruptedException();
}

調用transferer的transfer()方法,傳入null,說明是消費者。

transfer()方法

transfer()方法同時實現了取元素和放元素的功能,下麵我再來看看這個transfer()方法里究竟幹了什麼。

// TransferStack.transfer()方法
E transfer(E e, boolean timed, long nanos) {
    SNode s = null; // constructed/reused as needed
    // 根據e是否為null決定是生產者還是消費者
    int mode = (e == null) ? REQUEST : DATA;
    // 自旋+CAS,熟悉的套路,熟悉的味道
    for (;;) {
        // 棧頂元素
        SNode h = head;
        // 棧頂沒有元素,或者棧頂元素跟當前元素是一個模式的
        // 也就是都是生產者節點或者都是消費者節點
        if (h == null || h.mode == mode) {  // empty or same-mode
            // 如果有超時而且已到期
            if (timed && nanos <= 0) {      // can't wait
                // 如果頭節點不為空且是取消狀態
                if (h != null && h.isCancelled())
                    // 就把頭節點彈出,併進入下一次迴圈
                    casHead(h, h.next);     // pop cancelled node
                else
                    // 否則,直接返回null(超時返回null)
                    return null;
            } else if (casHead(h, s = snode(s, e, h, mode))) {
                // 入棧成功(因為是模式相同的,所以只能入棧)
                // 調用awaitFulfill()方法自旋+阻塞當前入棧的線程並等待被匹配到
                SNode m = awaitFulfill(s, timed, nanos);
                // 如果m等於s,說明取消了,那麼就把它清除掉,並返回null
                if (m == s) {               // wait was cancelled
                    clean(s);
                    // 被取消了返回null
                    return null;
                }
                
                // 到這裡說明匹配到元素了
                // 因為從awaitFulfill()裡面出來要不被取消了要不就匹配到了
                
                // 如果頭節點不為空,並且頭節點的下一個節點是s
                // 就把頭節點換成s的下一個節點
                // 也就是把h和s都彈出了
                // 也就是把棧頂兩個元素都彈出了
                if ((h = head) != null && h.next == s)
                    casHead(h, s.next);     // help s's fulfiller
                // 根據當前節點的模式判斷返回m還是s中的值
                return (E) ((mode == REQUEST) ? m.item : s.item);
            }
        } else if (!isFulfilling(h.mode)) { // try to fulfill
            // 到這裡說明頭節點和當前節點模式不一樣
            // 如果頭節點不是正在匹配中
            
            // 如果頭節點已經取消了,就把它彈出棧
            if (h.isCancelled())            // already cancelled
                casHead(h, h.next);         // pop and retry
            else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) {
                // 頭節點沒有在匹配中,就讓當前節點先入隊,再讓他們嘗試匹配
                // 且s成為了新的頭節點,它的狀態是正在匹配中
                for (;;) { // loop until matched or waiters disappear
                    SNode m = s.next;       // m is s's match
                    // 如果m為null,說明除了s節點外的節點都被其它線程先一步匹配掉了
                    // 就清空棧並跳出內部迴圈,到外部迴圈再重新入棧判斷
                    if (m == null) {        // all waiters are gone
                        casHead(s, null);   // pop fulfill node
                        s = null;           // use new node next time
                        break;              // restart main loop
                    }
                    SNode mn = m.next;
                    // 如果m和s嘗試匹配成功,就彈出棧頂的兩個元素m和s
                    if (m.tryMatch(s)) {
                        casHead(s, mn);     // pop both s and m
                        // 返回匹配結果
                        return (E) ((mode == REQUEST) ? m.item : s.item);
                    } else                  // lost match
                        // 嘗試匹配失敗,說明m已經先一步被其它線程匹配了
                        // 就協助清除它
                        s.casNext(m, mn);   // help unlink
                }
            }
        } else {                            // help a fulfiller
            // 到這裡說明當前節點和頭節點模式不一樣
            // 且頭節點是正在匹配中
            
            SNode m = h.next;               // m is h's match
            if (m == null)                  // waiter is gone
                // 如果m為null,說明m已經被其它線程先一步匹配了
                casHead(h, null);           // pop fulfilling node
            else {
                SNode mn = m.next;
                // 協助匹配,如果m和s嘗試匹配成功,就彈出棧頂的兩個元素m和s
                if (m.tryMatch(h))          // help match
                    // 將棧頂的兩個元素彈出後,再讓s重新入棧
                    casHead(h, mn);         // pop both h and m
                else                        // lost match
                    // 嘗試匹配失敗,說明m已經先一步被其它線程匹配了
                    // 就協助清除它
                    h.casNext(m, mn);       // help unlink
            }
        }
    }
}

// 三個參數:需要等待的節點,是否需要超時,超時時間
SNode awaitFulfill(SNode s, boolean timed, long nanos) {
    // 到期時間
    final long deadline = timed ? System.nanoTime() + nanos : 0L;
    // 當前線程
    Thread w = Thread.currentThread();
    // 自旋次數
    int spins = (shouldSpin(s) ?
                 (timed ? maxTimedSpins : maxUntimedSpins) : 0);
    for (;;) {
        // 當前線程中斷了,嘗試清除s
        if (w.isInterrupted())
            s.tryCancel();
        
        // 檢查s是否匹配到了元素m(有可能是其它線程的m匹配到當前線程的s)
        SNode m = s.match;
        // 如果匹配到了,直接返回m
        if (m != null)
            return m;
        
        // 如果需要超時
        if (timed) {
            // 檢查超時時間如果小於0了,嘗試清除s
            nanos = deadline - System.nanoTime();
            if (nanos <= 0L) {
                s.tryCancel();
                continue;
            }
        }
        if (spins > 0)
            // 如果還有自旋次數,自旋次數減一,併進入下一次自旋
            spins = shouldSpin(s) ? (spins-1) : 0;
        
        // 後面的elseif都是自旋次數沒有了
        else if (s.waiter == null)
            // 如果s的waiter為null,把當前線程註入進去,併進入下一次自旋
            s.waiter = w; // establish waiter so can park next iter
        else if (!timed)
            // 如果不允許超時,直接阻塞,並等待被其它線程喚醒,喚醒後繼續自旋並查看是否匹配到了元素
            LockSupport.park(this);
        else if (nanos > spinForTimeoutThreshold)
            // 如果允許超時且還有剩餘時間,就阻塞相應時間
            LockSupport.parkNanos(this, nanos);
    }
}

    // SNode裡面的方向,調用者m是s的下一個節點
    // 這時候m節點的線程應該是阻塞狀態的
    boolean tryMatch(SNode s) {
        // 如果m還沒有匹配者,就把s作為它的匹配者
        if (match == null &&
            UNSAFE.compareAndSwapObject(this, matchOffset, null, s)) {
            Thread w = waiter;
            if (w != null) {    // waiters need at most one unpark
                waiter = null;
                // 喚醒m中的線程,兩者匹配完畢
                LockSupport.unpark(w);
            }
            // 匹配到了返回true
            return true;
        }
        // 可能其它線程先一步匹配了m,返回其是否是s
        return match == s;
    }

整個邏輯比較複雜,這裡為了簡單起見,屏蔽掉多線程處理的細節,只描述正常業務場景下的邏輯:

(1)如果棧中沒有元素,或者棧頂元素跟將要入棧的元素模式一樣,就入棧;

(2)入棧後自旋等待一會看有沒有其它線程匹配到它,自旋完了還沒匹配到元素就阻塞等待;

(3)阻塞等待被喚醒了說明其它線程匹配到了當前的元素,就返回匹配到的元素;

(4)如果兩者模式不一樣,且頭節點沒有在匹配中,就拿當前節點跟它匹配,匹配成功了就返回匹配到的元素;

(5)如果兩者模式不一樣,且頭節點正在匹配中,當前線程就協助去匹配,匹配完成了再讓當前節點重新入棧重新匹配;

如果直接閱讀這部分代碼還是比較困難的,建議寫個測試用例,打個斷點一步一步跟蹤調試。

下麵是我的測試用例,可以參考下,在IDEA中可以讓斷點只阻塞線程:

public class TestSynchronousQueue {
    public static void main(String[] args) throws InterruptedException {
        SynchronousQueue<Integer> queue = new SynchronousQueue<>(false);

        new Thread(()->{
            try {
                queue.put(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();


        Thread.sleep(500);
        System.out.println(queue.take());
    }
}

修改斷點只阻塞線程的方法,右擊斷點,選擇Thread:

thread

交給你了

上面的源碼分析都是基於Stack的方式來分析的,那麼隊列是怎麼運行的呢?很簡單哦,測試用例中的false改成true就可以了,這就交給你了。

總結

(1)SynchronousQueue是java里的無緩衝隊列,用於在兩個線程之間直接移交元素;

(2)SynchronousQueue有兩種實現方式,一種是公平(隊列)方式,一種是非公平(棧)方式;

(3)棧方式中的節點有三種模式:生產者、消費者、正在匹配中;

(4)棧方式的大致思路是如果棧頂元素跟自己一樣的模式就入棧並等待被匹配,否則就匹配,匹配到了就返回;

(5)隊列方式的大致思路是……不告訴你^^(兩者的邏輯差別還是挺大的)

彩蛋

(1)SynchronousQueue真的是無緩衝的隊列嗎?

通過源碼分析,我們可以發現其實SynchronousQueue內部或者使用棧或者使用隊列來存儲包含線程和元素值的節點,如果同一個模式的節點過多的話,它們都會存儲進來,且都會阻塞著,所以,嚴格上來說,SynchronousQueue並不能算是一個無緩衝隊列。

(2)SynchronousQueue有什麼缺點呢?

試想一下,如果有多個生產者,但只有一個消費者,如果消費者處理不過來,是不是生產者都會阻塞起來?反之亦然。

這是一件很危險的事,所以,SynchronousQueue一般用於生產、消費的速度大致相當的情況,這樣才不會導致系統中過多的線程處於阻塞狀態。


歡迎關註我的公眾號“彤哥讀源碼”,查看更多源碼系列文章, 與彤哥一起暢游源碼的海洋。

qrcode


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 下麵是golang實現的簡單優先隊列,參考信息可以查看https://golang.org/pkg/container/heap/或者https://golang.google.cn/pkg/container/heap/,後面這個網址也是官方提供的網址,關於這個網頁的說明,可以參考https:// ...
  • 01、 來看網路上對介面的一番解釋: 介面(英文:Interface),在 Java 編程語言中是一個抽象類型,是抽象方法的集合。一個類通過繼承介面的方式,從而來繼承介面的抽象方法。 兄弟們,你們怎麼看,這段解釋把我繞得暈乎乎的,好像喝過一斤二鍋頭。到底是解釋抽象類呢還是介面呢?傻傻分不清楚。 搞不 ...
  • 昨天在星球的【從零單排】系列分享了一篇【字典存儲結構的實現方式】,我覺得這篇文章寫的還是蠻好的,就分享給大家了。 ...
  • php的會話控制,什麼是會話控制,http等。 什麼是會話控制思想,http協議。 cookie 和 session http是超文本傳輸協議,是網路上最廣泛的一種網路協議。 http最大特點是無連接無狀態,clinet到http request到server,server到http respons ...
  • 一、什麼是MVC MVC即Model-View-Controller(模型-視圖-控制器)是一種軟體設計模式,最早出現在Smalltalk語言中,後被Sun公司推薦為Java EE平臺的設計模式。 MVC把應用程式分成了上面3個核心模塊,這3個模塊又可被稱為業務層-視圖層-控制層。顧名思義,它們三者 ...
  • NumPy 數組操作: 1、修改數組形狀 a、numpy.reshape(arr, newshape, order='C') 在不改變數據的條件下修改形狀 b、numpy.ndarray.flat 是一個數組元素迭代器 c、numpy.ndarray.flatten(self, order) 返回一 ...
  • Python基礎之函數的基本使用,函數的參數與返回值,函數的嵌套調用,模塊的介紹;其中,函數的基本使用包括 函數定義的語法,函數的命名規則,函數的調用,函數的註意事項,函數的文檔註釋等;函數的參數與返回值包括 函數參數的使用,形參和實參,函數返回值;函數的嵌套調用包括 函數嵌套調用示例,給函數增加文... ...
  • 廢話不多說,直接開始 1.首先,導入所需的模塊: 關閉tensorflow輸出的一大堆硬體信息 2.寫一個函數generate_data(),用來生成我們所需要的數據,這裡使用的線性函數是y = 0.1*x + 0.3,具體解釋見註釋 說一下上面8,9兩行的操作,其實 與 等價,只是這樣寫比較方便。 ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...