Java CountDownLatch解析(上)

来源:http://www.cnblogs.com/yanphet/archive/2016/08/25/5788260.html
-Advertisement-
Play Games

寫在前面的話 最近一直在邊工作邊學習分散式的東西,看到了構建Java中間件的基礎知識,裡面有提到Java多線程併發的工具類,例如ReentrantLock、CyclicBarrier、CountDownLatch... 以前在工作中也有用到過這些實用的工具類,但是瞭解不是特別深入,藉此機會打個卡,好 ...


  • 寫在前面的話

最近一直在邊工作邊學習分散式的東西,看到了構建Java中間件的基礎知識,裡面有提到Java多線程併發的工具類,例如ReentrantLock、CyclicBarrier、CountDownLatch...

以前在工作中也有用到過這些實用的工具類,但是瞭解不是特別深入,藉此機會打個卡,好記性不如爛博客,哈哈哈...

  • CountDownLatch簡介

CountDownLatch顧名思義,count + down + latch = 計數 + 減 + 門閂(這麼拆分也是便於記憶=_=) 可以理解這個東西就是個計數器,只能減不能加,同時它還有個門閂的作用,當計數器不為0時,門閂是鎖著的;當計數器減到0時,門閂就打開了。

如果你感到懵比的話,可以類比考生考試交卷,考生交一份試卷,計數器就減一。直到考生都交了試卷(計數器為0),監考老師(一個或多個)才能離開考場。至於考生是否做完試卷,監考老師並不關註。只要都交了試卷,他就可以做接下來的工作了。

  • CountDownLatch實用場景

既然知道了它的定義,那什麼時候使用它呢?筆者能想到的場景是:

有任務A和任務B,任務B必須在任務A完成之後再做。而任務A還能被分為n部分,並且這n部分之間的任務互不影響。為了加快任務完成進度,把這n部分任務分給不同的線程,當A任務完成了,然後通知做B任務的線程接著完成任務,至於完成B任務的線程,可以是一個,也可以是多個。

上圖:

 

  • CountDownLatch實現原理

接下來就跟筆者來扒一扒CountDownLatch的源碼,它到底是怎麼實現這個牛逼功能的。(源碼版本JDK1.8)

public class CountDownLatch {

    // 內部類 繼承AQS類
    private static final class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 4982264981922014374L;

        Sync(int count) {
            setState(count);
        }

        int getCount() {
            return getState();
        }

        protected int tryAcquireShared(int acquires) {
            return (getState() == 0) ? 1 : -1;
        }

        protected boolean tryReleaseShared(int releases) {
            for (;;) {
                int c = getState();
                if (c == 0)
                    return false;
                int nextc = c-1;
                if (compareAndSetState(c, nextc))
                    return nextc == 0;
            }
        }
    }

    //AQS子類的實例對象
    private final Sync sync;

    // 有參構造器
    public CountDownLatch(int count) {
        if (count < 0) throw new IllegalArgumentException("count < 0");
        this.sync = new Sync(count);
    }

    // 等待
    public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

    // 超時等待
    public boolean await(long timeout, TimeUnit unit)
        throws InterruptedException {
        return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
    }

    // 計數減1
    public void countDown() {
        sync.releaseShared(1);
    }

    // 獲取計數器當前計數
    public long getCount() {
        return sync.getCount();
    }

    // 吐司就不多說了吧
    public String toString() {
        return super.toString() + "[Count = " + sync.getCount() + "]";
    }
}

熟悉ReentrantLock的讀者應該知道這種結構,它也是採用這種結構來完成功能的,只是ReentrantLock在Sync這個內部類下,它還分了NonfairSync(非公平鎖)和FairSync(公平鎖)這兩個類來繼承Sync這個父類。這種結構的好處在於我們不必關心AbstractQueuedSynchronizer(以下簡稱AQS)的同步狀態管理、線程排隊、等待與喚醒等底層操作,我們只需重寫我們想要的方法(例如:tryAcquireShared tryReleaseShared)然後調用繼承AQS的方法(例如:getState(),setState())來改變同步狀態,即可生成我們特定的併發業務工具類。

扯遠了哈,接下來筆者準備分兩條線來分析CountDownLatch,一是CountDownLatch.await()阻塞當前線程,二是CountDownLatch.countDown()當前線程把計數器減一

一、CountDownLatch.await()

猜想一下:

 提問:實現這個功能你能想到的方法有哪些?

 回答:第一 你可能會使用線程的join(),讓當前線程等待join線程執行結束。其原理是不停檢查join線程是否存活,如果join線程存活則當前線程永遠等待。

    第二 你可能會使用線程間wait/notify,進入synchronized同步塊或方法中,檢查計數器值不為0,然後調用Object.wait();直到值為0則調用notifyAll()喚醒等待線程。

 分析:方法一 如果只有兩三個線程還好,如果數量過多,那得寫多少join啊,而且提前結束任務還得捕獲InterruptException異常,繁瑣...

    方法二 大量synchronized同步塊,還可能存在假喚醒...

 結論:上面提到的方法或多或少都存在這樣那樣的弊端,那我們就猜想一下思路解決這些弊端

    其一 我們可能需要一個volatile變數來實時感知計數器的值,一旦計數器值為0則喚醒阻塞在該條件上的線程

    其二 因為volatile只有數據實時透明性,它並不能保證線程的順序執行,所以我們可能需要一個同步隊列來放置這些阻塞隊列,當計數器值為0時,從隊列中挨著一個個喚醒線程

下麵開始我們的驗證:

public CountDownLatch(int count) {
    if (count < 0) throw new IllegalArgumentException("count < 0");
    this.sync = new Sync(count);
}

Sync(int count) {
    setState(count);
}

構造方法傳入了一個int變數,而我們跟進去發現,這個int變數是AQS中的state,類型是volatile的,它就是用來表示計數器值的。由此證明我們的猜想。註意:count值需要大於等於0

public void await() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}

當我們調用await()的方法後,會預設調用sync這個實例的acquireSharedInterruptibly這個方法,並且參數為1,需要註意的是,這個方法聲明瞭一個InterruptedException異常,表示調用該方法的線程支持打斷操作。

public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    if (tryAcquireShared(arg) < 0)
        doAcquireSharedInterruptibly(arg);
}

我們跟進源碼發現,acquireSharedInterruptibly這個方法是sync繼承AQS而來的,這個方法的調用是響應線程的打斷的,所以在前兩行會檢查線程是否被打斷。接著調用tryAcquireShared()方法來判斷返回值,根據值的大小決定是否執行doAcquireSharedInterruptibly()。

// AQS中的方法
protected
int tryAcquireShared(int arg) { throw new UnsupportedOperationException(); } // Sync中的方法 protected int tryAcquireShared(int acquires) { return (getState() == 0) ? 1 : -1; } // AQS中的方法 protected final int getState() { return state;// state是volatile }

我們看到AQS把這個方法留給子類去實現,在子類sync的tryAcquireShared中它只驗證了計數器的值是否為0,如果為0則返回1,反之返回-1,根據上面可以看出,整數就不會執行doAcquireSharedInterruptibly(),該線程就結束方法,繼續執行自己的代碼去了。

private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
    final Node node = addWaiter(Node.SHARED);// 往同步隊列中添加節點
    boolean failed = true;
    try {
        for (;;) {// 一個死迴圈 跳出迴圈只有下麵兩個途徑
            final Node p = node.predecessor();// 當前線程的前一個節點
            if (p == head) {// 如果是首節點
                int r = tryAcquireShared(arg);// 這個是不是似曾相識 見上面
                if (r >= 0) {
                    setHeadAndPropagate(node, r);// 處理後續節點
                    p.next = null; // help GC 這個可以借鑒
                    failed = false;
                    return;// 計數值為0 並且為頭節點 跳出迴圈
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                throw new InterruptedException();// 響應打斷 跳出迴圈
        }
    } finally {
        if (failed)
            cancelAcquire(node);// 如果是打斷退出的 則移除同步隊列節點
    }
}

接著我們來看doAcquireSharedInterruptibly這個方法,因為計數器值不為0需要阻塞線程,所以在進入方法時,將該線程包裝成節點並加入到同步隊列尾部(如何添加源碼稍後展示),我們看到這個方法退出去的途徑直有兩個,一個是return,一個是throw InterruptedException。註意最後的finally的處理。

return退出方法有兩個條件,首先計數值為0,接著必須是同步節點首節點。

throw InterruptedException是響應打斷操作的,線程在阻塞期間,如果你不想在等待了,可以打斷線程讓它繼續運行後面的任務(註意異常處理)

接著我們看看添加節點的源碼:

private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);// 包裝節點
    Node pred = tail;// 同步隊列尾節點
    if (pred != null) {// 同步隊列有尾節點 將我們的節點通過cas方式添加到隊列後面
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {// 以cas原子方式添加尾節點
            pred.next = node;
            return node;// 退出該方法
        }
    }
    enq(node);// 兩種情況執行這個代碼 1.隊列尾節點為null 2.隊列尾節點不為null,但是我們原子添加尾節點失敗
    return node;
}

private Node enq(final Node node) {
    for (;;) {// 又是一個死迴圈
        Node t = tail;
        if (t == null) { // Must initialize
            if (compareAndSetHead(new Node()))// cas形式添加頭節點  註意 是頭節點
                tail = head;
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {// cas形式添加尾節點
                t.next = node;
                return t;// 結束這個方法的唯一齣口 添加尾節點成功
            }
        }
    }
}

至此,CountDownLatch.await()阻塞當前線程的基本功能已經梳理出來了,CountDownLatch.countDown()計數器減一功能以及CountDownLatch示例和它的優缺點將留在下部分梳理。

然後,限於篇幅更多的compareAndSetHead()和compareAndSetTail()這些末節方法未詳細列出,希望讀者能自行查看api瞭解。

最後,由於筆者水平有限,難免有不足之處,有不對之處,請不吝惜指教。謝謝!


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 轉義字元 意義 ASCII碼值(十進位) \a 響鈴(BEL) 007 \b 退格(BS) ,將當前位置移到前一列 008 \f 換頁(FF),將當前位置移到下頁開頭 012 \n 換行(LF) ,將當前位置移到下一行開頭 010 \r 回車(CR) ,將當前位置移到本行開頭 013 \t 水平製表 ...
  • C數據類型 基本類型 數值類型 整型 短整型short 整型int 長整形long 浮點型 單精度型float 雙精度型double 字元類型char 構造類型 數組 結構體struct 共用體union 枚舉類型enum 指針類型 空類型void 符號屬性 長度屬性 基本型 位長(位元組) 取值範圍 ...
  • 首先盲寫的一個傳輸文件的方法,但測試發現了一個非常不容易發現的問題,這裡先說明一下。 錯誤的代碼如下: 有看出來問題麽,沒有,看著一點問題也沒有,但執行的時候就是報錯,而且報錯的位置著實很頭痛,在那附近找了很久也沒找到...... 下麵是所報Error的信息 指定的位置是35行寫入的問題,但是找了很 ...
  • 映射是鍵值對偶的集合。Scala有一個通用的叫法——元組:n個對象的聚集,並不一定要相同的類型。 構造映射 鍵A -> 值B scala> val scores = Map("wcc100"->100)//不可變映射 scores: scala.collection.immutable.Map[St ...
  • 以前在acm課上也講過一些關於背包的題,不過那些比較簡單,就是簡單的貪心問題,先排個序再處理就完了,而01背包,感覺就是比那個上了一個難度的問題,這個需要遍歷然後找其中合適的,簡單原理就是這樣。 例如:現在有容量為m的背包,還有重量為w,價值為v的k個不同的商品,問怎樣買才能使價值最大化? 思路:如 ...
  • 1、錯誤原因:系統函數與pcl中的max函數衝突導致的 2、兩種解決辦法: 1)錯誤中max和min函數用括弧括起來,例如"std::Max"修改為“(std::Max)”. 2) 在vs工程屬性中進行修改,“屬性-c++-預處理器-預處理器定義”中加入NOMINMAX ...
  • 概述 Tornado 是 FriendFeed 使用的可擴展的非阻塞式 web 伺服器及其相關工具的開源版本。這個 Web 框架看起來有些像web.py 或者 Google 的 webapp,不過為了能有效利用非阻塞式伺服器環境,這個 Web 框架還包含了一些相關的有用工具 和優化。 Tornado ...
  • 學習記錄 O(∩_∩)O 、 如果你恰巧路過,希望你能停下腳步瞧一瞧,不足之處望指出,感激不盡~ 使用工具: 1、eclipse 2、hibernate壓縮包(hibernate_4.3.11) 3、mysql 準備工作: 創建工程——>導入hibernate.jar包——>編寫 hibernate ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...