AbstractQueuedSynchronizer 隊列同步器源碼分析

来源:https://www.cnblogs.com/hxlzpnyist/archive/2019/03/14/10534220.html
-Advertisement-
Play Games

AbstractQueuedSynchronizer 隊列同步器(AQS) 隊列同步器 (AQS), 是用來構建鎖或其他同步組件的基礎框架,它通過使用 int 變數表示同步狀態,通過內置的 FIFO 的隊列完成資源獲取的排隊工作。(摘自《Java併發編程的藝術》) 我們知道獲取同步狀態有獨占和共用兩 ...


AbstractQueuedSynchronizer 隊列同步器(AQS)

隊列同步器 (AQS), 是用來構建鎖或其他同步組件的基礎框架,它通過使用 int 變數表示同步狀態,通過內置的 FIFO 的隊列完成資源獲取的排隊工作。(摘自《Java併發編程的藝術》)

我們知道獲取同步狀態有獨占和共用兩種模式,本文先針對獨占模式進行分析。

變數定義

private transient volatile Node head;

head 同步隊列頭節點

private transient volatile Node tail;

tail 同步隊列尾節點

private volatile int state;

state 同步狀態值

Node - 同步隊列節點定義

volatile int waitStatus;

waitStatus 節點的等待狀態,可取值如下 :

  • 0 : 初始狀態
  • -1 : SIGNAL 處於該狀態的節點,說明其後置節點處於等待狀態; 若當前節點釋放了鎖可喚醒後置節點
  • -2 : CONDITION 該狀態與 Condition 操作有關後續在說明
  • -3 : PROPAGATE 該狀態與共用式獲取同步狀態操作有關後續在說明
  • 1 : CANCELLED 處於該狀態的節點會取消等待,從隊列中移除
volatile Node prev;

prev 指向當前節點的前置節點

volatile Node next;

next 指向當前節點的後置節點

volatile Thread thread;

thread 節點對應的線程也是指當前獲取鎖失敗的線程

Node nextWaiter;

acquire()

獨占模式下獲取同步狀態, 既是當前只允許一個線程獲取到同步狀態

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

從 acquire 方法中我們可以大概猜測下,獲取鎖的過程如下:

  • tryAcquire 嘗試獲取同步狀態, 具體如何判定獲取到同步狀態由子類實現
  • 當獲取同步狀態失敗時,執行 addWaiter 創建獨占模式下的 Node 並將其添加到同步隊列尾部
  • 加入同步隊列之後,再次嘗試獲取同步狀態,當達到某種條件的時候將當前線程掛起等待喚醒

下麵具體看下各個階段如何實現:

private Node addWaiter(Node mode) {
    // 綁定當前線程 創建 Node 節點
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    Node pred = tail;
    // 判斷同步隊列尾節點是否為空
    if (pred != null) {
        // node 的前置節點指向隊列尾部
        node.prev = pred;
        // 將同步隊列的 tail 移動指向 node
        if (compareAndSetTail(pred, node)) {
            // 將原同步隊列的尾部後置節點指向 node
            pred.next = node;
            return node;
        }
    }
    // tail 為空說明同步隊列還未初始化
    // 此時調用 enq 完成隊列的初始化及 node 入隊
    enq(node);
    return node;
}
private Node enq(final Node node) {
    // 輪詢的方式執行
    // 成功入隊後退出
    for (;;) {
        Node t = tail;
        if (t == null) { // Must initialize
            // 創建 Node, 並將 head 指向該節點
            // 同時將 tail 指向該節點
            // 完成隊列的初始化
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            // node 的前置節點指向隊列尾部
            node.prev = t;
            // 將同步隊列的 tail 移動指向 node
            if (compareAndSetTail(t, node)) {
                // 將原同步隊列的尾部後置節點指向 node
                t.next = node;
                return t;
            }
        }
    }
}

從代碼中可以看出通過 CAS 操作保證節點入隊的有序安全,其入隊過程中如下圖所示:

AQS節點入隊過程

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        // 
        for (;;) {
            // 獲取當前節點的前置節點
            final Node p = node.predecessor();
            // 判斷前置節點是否為 head 頭節點
            // 若前置節點為 head 節點,則再次嘗試獲取同步狀態
            if (p == head && tryAcquire(arg)) {
                // 若獲取同步狀態成功
                // 則將隊列的 head 移動指向當前節點
                setHead(node);
                // 將原頭部節點的 next 指向為空,便於對象回收
                p.next = null; // help GC
                failed = false;
                // 退出輪詢過程
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)
        /*
         * This node has already set status asking a release
         * to signal it, so it can safely park.
         */
        // 若前置節點狀態為 -1 ,則說明後置節點 node 可以安全掛起了
        return true;
    if (ws > 0) {
        /*
         * Predecessor was cancelled. Skip over predecessors and
         * indicate retry.
         */
        do {
            // ws > 0 說明前置節點狀態為 CANCELLED , 也就是說前置節點為無效節點
            // 此時從前置節點開始向隊列頭節點方向尋找有效的前置節點
            // 此操作也即是將 CANCELLED 節點從隊列中移除
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        /*
         * waitStatus must be 0 or PROPAGATE.  Indicate that we
         * need a signal, but don't park yet.  Caller will need to
         * retry to make sure it cannot acquire before parking.
         */
        // 若前置節點狀態為初始狀態 則將其狀態設為 -1
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}
private final boolean parkAndCheckInterrupt() {
    // 將當前線程掛起
    LockSupport.park(this);
    // 被喚醒後檢查當前線程是否被掛起
    return Thread.interrupted();
}

從 acquireQueued 的實現可以看出,節點在入隊後會採用輪詢的方式(自旋)重覆執行以下過程:

  • 判斷前置節點是否為 head, 若為 head 節點則嘗試獲取同步狀態; 若獲取同步狀態成功則移動 head 指向當前節點並退出迴圈
  • 若前置節點非 head 節點或者獲取同步狀態失敗,則將前置節點狀態修改為 -1, 並掛起當前線程,等待被喚醒重覆執行以上過程

如下圖所示:

AQS-節點自旋活動圖

接下來我們看看同步狀態釋放的實現。

release

釋放同步狀態

public final boolean release(int arg) {
    // 嘗試釋放同步狀態
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            // 喚醒後置節點
            unparkSuccessor(h);
        return true;
    }
    return false;
}
private void unparkSuccessor(Node node) {
    /*
     * If status is negative (i.e., possibly needing signal) try
     * to clear in anticipation of signalling.  It is OK if this
     * fails or if status is changed by waiting thread.
     */
    int ws = node.waitStatus;
    if (ws < 0)
        // 將 head 節點狀態改為 0
        compareAndSetWaitStatus(node, ws, 0);

    /*
     * Thread to unpark is held in successor, which is normally
     * just the next node.  But if cancelled or apparently null,
     * traverse backwards from tail to find the actual
     * non-cancelled successor.
     */
    // 獲取後置節點
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        // 喚醒後置節點上所阻塞的線程
        LockSupport.unpark(s.thread);
}

從上述代碼,我們可以明白釋放同步狀態的過程如下:

  • 調用 tryRelease 嘗試釋放同步狀態,同樣其具體的實現由子類控制
  • 成功釋放同步狀態後,將 head 節點狀態改為 0
  • 喚醒後置節點上阻塞的線程

如下圖所示(紅色曲線表示節點自旋過程) :

AQS-釋放鎖

acquireInterruptibly()

獨占模式下獲取同步狀態,不同於 acquire 方法,該方法對中斷操作敏感; 也就是說當前線程在獲取同步狀態的過程中,若被中斷則會拋出中斷異常

public final void acquireInterruptibly(int arg)
            throws InterruptedException {
    if (Thread.interrupted())
        // 檢查線程是否被中斷
        // 中斷則拋出中斷異常由調用方處理
        throw new InterruptedException();
    if (!tryAcquire(arg))
        doAcquireInterruptibly(arg);
}
private void doAcquireInterruptibly(int arg)
        throws InterruptedException {
    final Node node = addWaiter(Node.EXCLUSIVE);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                // 不同於 acquire 的操作,此處在喚醒後檢查是否中斷,若被中斷直接拋出中斷異常
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            // 拋出中斷異常後最終執行 cancelAcquire
            cancelAcquire(node);
    }
}
private void cancelAcquire(Node node) {
        // Ignore if node doesn't exist
    if (node == null)
        return;

    node.thread = null;

    // Skip cancelled predecessors
    Node pred = node.prev;
    while (pred.waitStatus > 0)
        node.prev = pred = pred.prev;

    // predNext is the apparent node to unsplice. CASes below will
    // fail if not, in which case, we lost race vs another cancel
    // or signal, so no further action is necessary.
    Node predNext = pred.next;

    // Can use unconditional write instead of CAS here.
    // After this atomic step, other Nodes can skip past us.
    // Before, we are free of interference from other threads.
    node.waitStatus = Node.CANCELLED;

    // If we are the tail, remove ourselves.
    // 若當前節點為 tail 節點,則將 tail 移動指向 node 的前置節點
    if (node == tail && compareAndSetTail(node, pred)) {
        // 同時將node 前置節點的 next 指向 null
        compareAndSetNext(pred, predNext, null);
    } else {
        // If successor needs signal, try to set pred's next-link
        // so it will get one. Otherwise wake it up to propagate.
        int ws;
        if (pred != head &&
            ((ws = pred.waitStatus) == Node.SIGNAL ||
             (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
            pred.thread != null) {
            // 當前節點位於隊列中部    
            Node next = node.next;
            if (next != null && next.waitStatus <= 0)
                // 將前置節點的 next 指向 node 的後置節點
                compareAndSetNext(pred, predNext, next);
        } else {
            // 若 node 的前置節點為 head 節點則喚醒 node 節點的後置節點
            unparkSuccessor(node);
        }

        node.next = node; // help GC
    }
}

從 acquireInterruptibly 的實現可以看出,若線程在獲取同步狀態的過程中出現中斷操作,則會將當前線程對應的同步隊列等待節點從隊列中移除並喚醒可獲取同步狀態的線程。

tryAcquireNanos()

獨占模式超時獲取同步狀態,該操作與acquireInterruptibly一樣對中斷操作敏感,不同在於超過等待時間若未獲取到同步狀態將會返回

public final boolean tryAcquireNanos(int arg, long nanosTimeout)
            throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    return tryAcquire(arg) ||
        doAcquireNanos(arg, nanosTimeout);
}
private boolean doAcquireNanos(int arg, long nanosTimeout)
            throws InterruptedException {
    if (nanosTimeout <= 0L)
        return false;
    // 計算等待到期時間
    final long deadline = System.nanoTime() + nanosTimeout;
    final Node node = addWaiter(Node.EXCLUSIVE);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return true;
            }
            nanosTimeout = deadline - System.nanoTime();
            if (nanosTimeout <= 0L)
                // 超時時間到期直接返回
                return false;
            if (shouldParkAfterFailedAcquire(p, node) &&
                nanosTimeout > spinForTimeoutThreshold)
                // 按指定時間掛起s
                LockSupport.parkNanos(this, nanosTimeout);
            if (Thread.interrupted())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

節點的狀態

同步隊列中的節點在自旋獲取同步狀態的過程中,會將前置節點的狀態由 0 初始狀態改為 -1 SIGNAL, 若是中斷敏感的操作則會將狀態由 0 改為 1

同步隊列中的節點在釋放同步狀態的過程中會將同步隊列的 head 節點的狀態改為 0, 也即是由 -1 變為 0;

小結

本文主要分析了獨占模式獲取同步狀態的操作,其大概流程如下:

  • 在獲取同步狀態時,AQS 內部維護了一個同步隊列,獲取狀態失敗的線程會被構造一個節點加入到隊列中併進行一系列自旋操作
  • 在釋放同步狀態時,喚醒 head 的後置節點去獲取同步狀態

您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • Hystrix 說到Hystrix就得先說一下產生的背景等等,那就是雪崩效應. 在微服務中肯定存在多個服務層之間的調用,基礎服務的故障可能會導致級聯故障,進而造成整個系統不可用的情況,這種現象被稱為服務雪崩效應. 簡單的來說就是一種因"服務提供者"的不可用導致"服務消費者"的不可用,並將不可用逐漸放 ...
  • GitHub代碼練習地址:https://github.com/Neo-ML/PythonPractice/blob/master/SpiderPrac09_ProxyHandler.pyProxyHandler處理(代理伺服器) 使用代理IP,是爬蟲的常用手段提供代理伺服器的地址有很多,例如: w ...
  • Web框架 什麼是框架? 協助開發者快速開發web應程式的一套功能代碼 開發者只需要按照框架約定要求,在指定位置寫上自己的業務邏輯代碼即可 為什麼要用web框架? 使用web框架的主要目的就是避免重覆造輪子! web網站發展至今,特別是伺服器端,涉及知識內容非常廣泛!隨之,對程式員的技能要求也越來越 ...
  • GitHub代碼練習地址:https://github.com/Neo-ML/PythonPractice/blob/master/SpiderPrac08_useragent.py ...
  • (1)turtle使用pen來繪製圖形 pendown() 放下畫筆,移動到指定點後繼續繪製 penup() 提起畫筆,用於另起一個地方繪製時使用 pensize(width) 設置畫筆線條的粗細為指定大小 (2)turtle運動方法 forward() 沿著當前方向前進指定距離 backward( ...
  • 函數1 (1)定義: (2)參數傳遞: 在python中,一切都是對象,類型也屬於對象,變數是沒有類型的。 a = [1,2,3] a = "helloworld" 以上代碼中,[1,2,3]是list類型,"helloworld"是string類型,而變數a是沒有類型的,它僅僅是一個對象的引用(一 ...
  • 哈哈,今天開始我也是學車人了~ ...
  • <<<模板變數>>> (1)定義視圖函數 通過context傳遞參數來渲染模板,context要是個字典 當模板變數為可調用對象的時候,函數不傳遞參數 (2)配置模板文件 模板裡面引入模板變數用{{ }} 【"."可以用於取方法,屬性,字典的鍵值以及索引】 (3)訪問 模板變數不限於上面舉例的,有興 ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...