Java多線程之---用 CountDownLatch 說明 AQS 的實現原理

来源:https://www.cnblogs.com/fengzheng/archive/2018/06/08/9153720.html
-Advertisement-
Play Games

本文基於 jdk 1.8 。 CountDownLatch 的使用 "前面的文章" 中說到了 volatile 以及用 volatile 來實現自旋鎖,例如 java.util.concurrent.atomic 包下的工具類。但是 volatile 的使用場景畢竟有限,很多的情況下並不是適用,這個 ...


本文基於 jdk 1.8 。

CountDownLatch 的使用

前面的文章中說到了 volatile 以及用 volatile 來實現自旋鎖,例如 java.util.concurrent.atomic 包下的工具類。但是 volatile 的使用場景畢竟有限,很多的情況下並不是適用,這個時候就需要 synchronized 或者各種鎖實現了。今天就來說一下幾種鎖的實現原理。

先來看一個最簡單的 CountDownLatch 使用方法,例子很簡單,可以運行看一下效果。CountDownLatch 的作用是:當一個線程需要另外一個或多個線程完成後,再開始執行。比如主線程要等待一個子線程完成環境相關配置的載入工作,主線程才繼續執行,就可以利用 CountDownLatch 來實現。

例如下麵這個例子,首先實例化一個 CountDownLatch ,參數可以理解為一個計數器,這裡為 1,然後主線程執行,調用 worker 子線程,接著調用 CountDownLatch 的 await() 方法,表示阻塞主線程。當子線程執行完成後,在 finnaly 塊調用 countDown() 方法,表示一個等待已經完成,把計數器減一,直到減為 0,主線程又開始執行。

private static CountDownLatch latch = new CountDownLatch(1);

    public static void main(String[] args) throws InterruptedException{
        System.out.println("主線程開始......");
        Thread thread = new Thread(new Worker());
        thread.start();
        System.out.println("主線程等待......");
        System.out.println(latch.toString());
        latch.await();
        System.out.println(latch.toString());
        System.out.println("主線程繼續.......");
    }

    public static class Worker implements Runnable {

        @Override
        public void run() {
            System.out.println("子線程任務正在執行");
            try {
                Thread.sleep(2000);
            }catch (InterruptedException e){

            }finally {
                latch.countDown();
            }
        }
    }

執行結果如下:

主線程開始......
子線程任務正在執行
主線程等待......
java.util.concurrent.CountDownLatch@1d44bcfa[Count = 1]
java.util.concurrent.CountDownLatch@1d44bcfa[Count = 0]
主線程繼續.......

AQS 的原理

這麼好用的功能是怎麼實現的呢,下麵就來說一說實現它的核心技術原理 AQS。 AQS 全稱 AbstractQueuedSynchronizer,是 java.util.concurrent 中提供的一種高效且可擴展的同步機制。它可以用來實現可以依賴 int 狀態的同步器,獲取和釋放參數以及一個內部FIFO等待隊列,除了CountDownLatchReentrantLockSemaphore 等功能實現都使用了它。

接下來用 CountDownLatch 來分析一下 AQS 的實現。建議看文章的時候先大致看一下源碼,有助於理解下麵所說的內容。

在我們的方法中調用 awit()countDown()的時候,發生了幾個關鍵的調用關係,我畫了一個方法調用圖。

首先在 CountDownLatch 類內部定義了一個 Sync 內部類,這個內部類就是繼承自 AbstractQueuedSynchronizer 的。並且重寫了方法 tryAcquireSharedtryReleaseShared。例如當調用 awit()方法時,CountDownLatch 會調用內部類Sync 的 acquireSharedInterruptibly() 方法,然後在這個方法中會調用 tryAcquireShared 方法,這個方法就是 CountDownLatch 的內部類 Sync 里重寫的 AbstractQueuedSynchronizer 的方法。調用 countDown() 方法同理。

這種方式是使用 AbstractQueuedSynchronizer 的標準化方式,大致分為兩步:

1、內部持有繼承自 AbstractQueuedSynchronizer 的對象 Sync;

2、併在 Sync 內重寫 AbstractQueuedSynchronizer protected 的部分或全部方法,這些方法包括如下幾個:

之所以要求子類重寫這些方法,是為了讓使用者(這裡的使用者指 CountDownLatch 等)可以在其中加入自己的判斷邏輯,例如 CountDownLatch 在 tryAcquireShared中加入了判斷,判斷 state 是否不為0,如果不為0,才符合調用條件。

tryAcquiretryRelease是對應的,前者是獨占模式獲取,後者是獨占模式釋放。

tryAcquireSharedtryReleaseShared是對應的,前者是共用模式獲取,後者是共用模式釋放。

我們看到 CountDownLatch 重寫的方法 tryAcquireShared 實現如下:

protected int tryAcquireShared(int acquires) {
            return (getState() == 0) ? 1 : -1;
        }

判斷 state 值是否為0,為0 返回1,否則返回 -1。state 值是 AbstractQueuedSynchronizer 類中的一個 volatile 變數。

private volatile int state;

在 CountDownLatch 中這個 state 值就是計數器,在調用 await 方法的時候,將值賦給 state 。

等待線程入隊

根據上面的邏輯,調用 await() 方法時,先去獲取 state 的值,當計數器不為0的時候,說明還有需要等待的線程在運行,則調用 doAcquireSharedInterruptibly 方法,進來執行的第一個動作就是嘗試加入等待隊列 ,即調用 addWaiter()方法, 源碼如下:

到這裡就走到了 AQS 的核心部分,AQS 用內部的一個 Node 類維護一個 CHL Node FIFO 隊列。將當前線程加入等待隊列,並通過 parkAndCheckInterrupt()方法實現當前線程的阻塞。下麵一大部分都是在說明 CHL 隊列的實現,裡面用 CAS 實現隊列出入不會發生阻塞。

private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
        //加入等待隊列                      
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        // 進入 CAS 迴圈
        try {
            for (;;) {
                //當一個節點(關聯一個線程)進入等待隊列後, 獲取此節點的 prev 節點 
                final Node p = node.predecessor();
                // 如果獲取到的 prev 是 head,也就是隊列中第一個等待線程
                if (p == head) {
                    // 再次嘗試申請 反應到 CountDownLatch 就是查看是否還有線程需要等待(state是否為0)
                    int r = tryAcquireShared(arg);
                    // 如果 r >=0 說明 沒有線程需要等待了 state==0
                    if (r >= 0) {
                        //嘗試將第一個線程關聯的節點設置為 head 
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        failed = false;
                        return;
                    }
                }
                //經過自旋tryAcquireShared後,state還不為0,就會到這裡,第一次的時候,waitStatus是0,那麼node的waitStatus就會被置為SIGNAL,第二次再走到這裡,就會用LockSupport的park方法把當前線程阻塞住
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

我看看到上面先執行了 addWaiter() 方法,就是將當前線程加入等待隊列,源碼如下:

/** Marker to indicate a node is waiting in shared mode */
 static final Node SHARED = new Node();
 /** Marker to indicate a node is waiting in exclusive mode */
 static final Node EXCLUSIVE = null;

private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // 嘗試快速入隊操作,因為大多數時候尾節點不為 null
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        //如果尾節點為空(也就是隊列為空) 或者嘗試CAS入隊失敗(由於併發原因),進入enq方法
        enq(node);
        return node;
    }

上面是向等待隊列中添加等待者(waiter)的方法。首先構造一個 Node 實體,參數為當前線程和一個mode,這個mode有兩種形式,一個是 SHARED ,一個是 EXCLUSIVE,請看上面的代碼。然後執行下麵的入隊操作 addWaiter,和 enq() 方法的 else 分支操作是一樣的,這裡的操作如果成功了,就不用再進到 enq() 方法的迴圈中去了,可以提高性能。如果沒有成功,再調用 enq() 方法。

private Node enq(final Node node) {
        // 死迴圈+CAS保證所有節點都入隊
        for (;;) {
            Node t = tail;
            // 如果隊列為空 設置一個空節點作為 head
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                //加入隊尾
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

說明:迴圈加 CAS 操作是實現樂觀鎖的標準方式,CAS 是為了實現原子操作而出現的,所謂的原子操作指操作執行期間,不會受其他線程的干擾。Java 實現的 CAS 是調用 unsafe 類提供的方法,底層是調用 c++ 方法,直接操作記憶體,在 cpu 層面加鎖,直接對記憶體進行操作。

上面是 AQS 等待隊列入隊方法,操作在無限迴圈中進行,如果入隊成功則返回新的隊尾節點,否則一直自旋,直到入隊成功。假設入隊的節點為 node ,上來直接進入迴圈,在迴圈中,先拿到尾節點。

1、if 分支,如果尾節點為 null,說明現在隊列中還沒有等待線程,則嘗試 CAS 操作將頭節點初始化,然後將尾節點也設置為頭節點,因為初始化的時候頭尾是同一個,這和 AQS 的設計實現有關, AQS 預設要有一個虛擬節點。此時,尾節點不在為空,迴圈繼續,進入 else 分支;

2、else 分支,如果尾節點不為 null, node.prev = t ,也就是將當前尾節點設置為待入隊節點的前置節點。然後又是利用 CAS 操作,將待入隊的節點設置為隊列的尾節點,如果 CAS 返回 false,表示未設置成功,繼續迴圈設置,直到設置成功,接著將之前的尾節點(也就是倒數第二個節點)的 next 屬性設置為當前尾節點,對應 t.next = node 語句,然後返回當前尾節點,退出迴圈。

setHeadAndPropagate 方法負責將自旋等待或被 LockSupport 阻塞的線程喚醒。

private void setHeadAndPropagate(Node node, int propagate) {
        //備份現在的 head
        Node h = head;  
        //搶到鎖的線程被喚醒 將這個節點設置為head
        setHead(node)
        // propagate 一般都會大於0 或者存在可被喚醒的線程
        if (propagate > 0 || h == null || h.waitStatus < 0 ||
            (h = head) == null || h.waitStatus < 0) {
            Node s = node.next;
            // 只有一個節點 或者是共用模式 釋放所有等待線程 各自嘗試搶占鎖
            if (s == null || s.isShared())
                doReleaseShared();
        }
    }

Node 對象中有一個屬性是 waitStatus ,它有四種狀態,分別是:

//線程已被 cancelled ,這種狀態的節點將會被忽略,並移出隊列
static final int CANCELLED =  1;
// 表示當前線程已被掛起,並且後繼節點可以嘗試搶占鎖
static final int SIGNAL    = -1;
//線程正在等待某些條件
static final int CONDITION = -2;
//共用模式下 無條件所有等待線程嘗試搶占鎖
static final int PROPAGATE = -3;

等待線程被喚醒

當執行 CountDownLatch 的 countDown()方法,將計數器減一,也就是state減一,當減到0的時候,等待隊列中的線程被釋放。是調用 AQS 的 releaseShared 方法來實現的,下麵代碼中的方法是按順序調用的,摘到了一起,方便查看:

// AQS類
public final boolean releaseShared(int arg) {
        // arg 為固定值 1
        // 如果計數器state 為0 返回true,前提是調用 countDown() 之前不能已經為0
        if (tryReleaseShared(arg)) {
            // 喚醒等待隊列的線程
            doReleaseShared();
            return true;
        }
        return false;
    }

// CountDownLatch 重寫的方法
protected boolean tryReleaseShared(int releases) {
            // Decrement count; signal when transition to zero
            // 依然是迴圈+CAS配合 實現計數器減1
            for (;;) {
                int c = getState();
                if (c == 0)
                    return false;
                int nextc = c-1;
                if (compareAndSetState(c, nextc))
                    return nextc == 0;
            }
        }

/// AQS類
 private void doReleaseShared() {
        for (;;) {
            Node h = head;
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                // 如果節點狀態為SIGNAL,則他的next節點也可以嘗試被喚醒
                if (ws == Node.SIGNAL) {
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;            // loop to recheck cases
                    unparkSuccessor(h);
                }
                // 將節點狀態設置為PROPAGATE,表示要向下傳播,依次喚醒
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;                // loop on failed CAS
            }
            if (h == head)                   // loop if head changed
                break;
        }
    }

因為這是共用型的,當計數器為 0 後,會喚醒等待隊列里的所有線程,所有調用了 await() 方法的線程都被喚醒,併發執行。這種情況對應到的場景是,有多個線程需要等待一些動作完成,比如一個線程完成初始化動作,其他5個線程都需要用到初始化的結果,那麼在初始化線程調用 countDown 之前,其他5個線程都處在等待狀態。一旦初始化線程調用了 countDown ,其他5個線程都被喚醒,開始執行。

總結

1、AQS 分為獨占模式和共用模式,CountDownLatch 使用了它的共用模式。

2、AQS 當第一個等待線程(被包裝為 Node)要入隊的時候,要保證存在一個 head 節點,這個 head 節點不關聯線程,也就是一個虛節點。

3、當隊列中的等待節點(關聯線程的,非 head 節點)搶到鎖,將這個節點設置為 head 節點。

4、第一次自旋搶鎖失敗後,waitStatus 會被設置為 -1(SIGNAL),第二次再失敗,就會被 LockSupport 阻塞掛起。

5、如果一個節點的前置節點為 SIGNAL 狀態,則這個節點可以嘗試搶占鎖。

不妨到我的公眾號里互動一下 :古時的風箏

掃碼關註


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 請求報文:請求行+請求頭+空行+請求體 響應報文:響應行+響應頭+響應體 如何打開谷歌的開發者工具? ...
  • 什麼是伺服器? 伺服器也是電腦,只不過是比我們的電腦配置更高的電腦,並且24小時不斷電,不關機的電腦 伺服器是專門用於存儲數據的電腦,訪問者可以訪問伺服器獲得伺服器上存儲的數據 伺服器一旦關機,訪問者就無法訪問 3.瀏覽器訪問網頁原理(理解) 所以綜合我們的第二點和第三點,我們得出一個結論,這些系 ...
  • body{ font family: SimSun,PingFang SC; } ubuntu搭建nodejs生產環境——快速部署手冊 為什麼不用CentOS而用Ubuntu作為生產環境的運行平臺?這個我也比較好奇,公司訂的只能沿用傳統,從使用成本的角度來說,此舉也是值得肯定的。 測試環境 騰訊雲 ...
  • 本書是Eric Evans對他自己寫的《領域驅動設計-軟體核心複雜性應對之道》的一本字典式的參考書,可用於快速查找《領域驅動設計》中的諸多概念及其簡明解釋。 其它本系列其它文章地址: [譯文]Domain Driven Design Reference(一)—— 前言 [譯文]Domain Driv ...
  • 約定 還記得上版本的第二十四篇的約定嘛?現在出來履行啦~ 為什麼要重製? 之前寫的專欄都是按照心情寫的,在最初的時候筆者什麼都不懂,而且文章的發佈是按照很隨性的一個順序。結果就是說,大家都看完了,都還對框架沒有一個感覺,感覺很亂。而現在,經過兩年多的摸索,筆者已經對框架的體系有了一個瞭解,所以希望再 ...
  • Eclipse作為軟體開發的常用工具,被很多的人所歡迎,尤其是豐富的快捷鍵,可以極大的提高編碼的效率,下麵將常用的快捷鍵做了整理,便於大家學習和使用。 Eclipse常用快捷鍵 1代碼提示 Alt+/; 2代碼複製 ctrl+alt+方向鍵(上下) 複製快捷鍵 3單行註釋 方式一:ctr+/ 方式二 ...
  • 轉自:https://my.oschina.net/editorial-story/blog/1808757 本文是學習大型分散式網站架構的技術總結。對架構一個高性能、高可用、可伸縮及可擴展的分散式網站進行了概要性描述,並給出一個架構參考。文中一部分為讀書筆記,一部分是個人經驗總結,對大型分散式網站 ...
  • Java開源生鮮電商平臺-定時器,定時任務quartz的設計與架構(源碼可下載) 說明:任何業務有時候需要系統在某個定點的時刻執行某些任務,比如:凌晨2點統計昨天的報表,早上6點抽取用戶下單的佣金。 對於Java開源生鮮電商平臺而言,有定時推送客戶備貨,定時計算賣家今日的收益,定時提醒每日的提現金額 ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...