詳解Condition的await和signal等待/通知機制

来源:https://www.cnblogs.com/yuxiang1/archive/2019/11/29/11960691.html
-Advertisement-
Play Games

任何一個java對象都天然繼承於Object類,線上程間實現通信的往往會應用到Object的幾個方法,比如wait(),wait(long timeout),wait(long timeout, int nanos)與notify(),notifyAll()幾個方法實現等待/通知機制,同樣的, 在j... ...


本人免費整理了Java高級資料,涵蓋了Java、Redis、MongoDB、MySQL、Zookeeper、Spring Cloud、Dubbo高併發分散式等教程,一共30G,需要自己領取。
傳送門:https://mp.weixin.qq.com/s/JzddfH-7yNudmkjT0IRL8Q


1.Condition簡介

任何一個java對象都天然繼承於Object類,線上程間實現通信的往往會應用到Object的幾個方法,比如wait(),wait(long timeout),wait(long timeout, int nanos)與notify(),notifyAll()幾個方法實現等待/通知機制,同樣的, 在java Lock體系下依然會有同樣的方法實現等待/通知機制。從整體上來看Object的wait和notify/notify是與對象監視器配合完成線程間的等待/通知機制,而Condition與Lock配合完成等待通知機制,前者是java底層級別的,後者是語言級別的,具有更高的可控制性和擴展性。兩者除了在使用方式上不同外,在功能特性上還是有很多的不同:

  1. Condition能夠支持不響應中斷,而通過使用Object方式不支持;
  2. Condition能夠支持多個等待隊列(new 多個Condition對象),而Object方式只能支持一個;
  3. Condition能夠支持超時時間的設置,而Object不支持

參照Object的wait和notify/notifyAll方法,Condition也提供了同樣的方法:

針對Object的wait方法
  1. void await() throws InterruptedException:當前線程進入等待狀態,如果其他線程調用condition的signal或者signalAll方法並且當前線程獲取Lock從await方法返回,如果在等待狀態中被中斷會拋出被中斷異常;
  2. long awaitNanos(long nanosTimeout):當前線程進入等待狀態直到被通知,中斷或者超時;
  3. boolean await(long time, TimeUnit unit)throws InterruptedException:同第二種,支持自定義時間單位
  4. boolean awaitUntil(Date deadline) throws InterruptedException:當前線程進入等待狀態直到被通知,中斷或者到了某個時間
針對Object的notify/notifyAll方法
  1. void signal():喚醒一個等待在condition上的線程,將該線程從等待隊列中轉移到同步隊列中,如果在同步隊列中能夠競爭到Lock則可以從等待方法中返回。
  2. void signalAll():與1的區別在於能夠喚醒所有等待在condition上的線程

2.Condition實現原理分析

2.1 等待隊列

要想能夠深入的掌握condition還是應該知道它的實現原理,現在我們一起來看看condiiton的源碼。創建一個condition對象是通過lock.newCondition(),而這個方法實際上是會new出一個ConditionObject對象,該類是AQS的一個內部類,有興趣可以去看看。前面我們說過,condition是要和lock配合使用的也就是condition和Lock是綁定在一起的,而lock的實現原理又依賴於AQS,自然而然ConditionObject作為AQS的一個內部類無可厚非。

我們知道在鎖機制的實現上,AQS內部維護了一個同步隊列,如果是獨占式鎖的話,所有獲取鎖失敗的線程的尾插入到同步隊列,同樣的,condition內部也是使用同樣的方式,內部維護了一個 等待隊列,所有調用condition.await方法的線程會加入到等待隊列中,並且線程狀態轉換為等待狀態。另外註意到ConditionObject中有兩個成員變數:

/** First node of condition queue. */
private transient Node firstWaiter;
/** Last node of condition queue. */
private transient Node lastWaiter;

 

這樣我們就可以看出來ConditionObject通過持有等待隊列的頭尾指針來管理等待隊列。主要註意的是Node類復用了在AQS中的Node類,其節點狀態和相關屬性可以去看,如果您仔細看完這篇文章對condition的理解易如反掌,對lock體系的實現也會有一個質的提升。Node類有這樣一個屬性:

//後繼節點
Node nextWaiter;

 

進一步說明,等待隊列是一個單向隊列,而在之前說AQS時知道同步隊列是一個雙向隊列。接下來我們用一個demo,通過debug進去看是不是符合我們的猜想:

public static void main(String[] args) {
    for (int i = 0; i < 10; i++) {
        Thread thread = new Thread(() -> {
            lock.lock();
            try {
                condition.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }finally {
                lock.unlock();
            }
        });
        thread.start();
    }
}

 

這段代碼沒有任何實際意義,甚至很臭,只是想說明下我們剛纔所想的。新建了10個線程,沒有線程先獲取鎖,然後調用condition.await方法釋放鎖將當前線程加入到等待隊列中,通過debug控制當走到第10個線程的時候查看firstWaiter即等待隊列中的頭結點,debug模式下情景圖如下:

 

 

從這個圖我們可以很清楚的看到這樣幾點:
1. 調用condition.await方法後線程依次尾插入到等待隊列中,如圖隊列中的線程引用依次為Thread-0,Thread-1,Thread-2....Thread-8;
2. 等待隊列是一個單向隊列。通過我們的猜想然後進行實驗驗證,我們可以得出等待隊列的示意圖如下圖所示:

 

 

同時還有一點需要註意的是:我們可以多次調用lock.newCondition()方法創建多個condition對象,也就是一個lock可以持有多個等待隊列。而在之前利用Object的方式實際上是指在對象Object對象監視器上只能擁有一個同步隊列和一個等待隊列,而併發包中的Lock擁有一個同步隊列和多個等待隊列。示意圖如下:

 

 

如圖所示,ConditionObject是AQS的內部類,因此每個ConditionObject能夠訪問到AQS提供的方法,相當於每個Condition都擁有所屬同步器的引用。

2.2 await實現原理

當調用condition.await()方法後會使得當前獲取lock的線程進入到等待隊列,如果該線程能夠從await()方法返回的話一定是該線程獲取了與condition相關聯的lock。接下來,我們還是從源碼的角度去看,只有熟悉了源碼的邏輯我們的理解才是最深的。await()方法源碼為:

public final void await() throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    // 1\. 將當前線程包裝成Node,尾插入到等待隊列中
    Node node = addConditionWaiter();
    // 2\. 釋放當前線程所占用的lock,在釋放的過程中會喚醒同步隊列中的下一個節點
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    while (!isOnSyncQueue(node)) {
        // 3\. 當前線程進入到等待狀態
        LockSupport.park(this);
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    // 4\. 自旋等待獲取到同步狀態(即獲取到lock)
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
    // 5\. 處理被中斷的情況
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}

 

代碼的主要邏輯請看註釋,我們都知道噹噹前線程調用condition.await()方法後,會使得當前線程釋放lock然後加入到等待隊列中,直至被signal/signalAll後會使得當前線程從等待隊列中移至到同步隊列中去,直到獲得了lock後才會從await方法返回,或者在等待時被中斷會做中斷處理。那麼關於這個實現過程我們會有這樣幾個問題:1. 是怎樣將當前線程添加到等待隊列中去的?2.釋放鎖的過程?3.怎樣才能從await方法退出?而這段代碼的邏輯就是告訴我們這三個問題的答案。具體請看註釋,在第1步中調用addConditionWaiter將當前線程添加到等待隊列中,該方法源碼為:

private Node addConditionWaiter() {
    Node t = lastWaiter;
    // If lastWaiter is cancelled, clean out.
    if (t != null && t.waitStatus != Node.CONDITION) {
        unlinkCancelledWaiters();
        t = lastWaiter;
    }
    //將當前線程包裝成Node
    Node node = new Node(Thread.currentThread(), Node.CONDITION);
    if (t == null)
        firstWaiter = node;
    else
        //尾插入
        t.nextWaiter = node;
    //更新lastWaiter
    lastWaiter = node;
    return node;
}

 

這段代碼就很容易理解了,將當前節點包裝成Node,如果等待隊列的firstWaiter為null的話(等待隊列為空隊列),則將firstWaiter指向當前的Node,否則,更新lastWaiter(尾節點)即可。就是通過尾插入的方式將當前線程封裝的Node插入到等待隊列中即可,同時可以看出等待隊列是一個不帶頭結點的鏈式隊列,之前我們學習AQS時知道同步隊列是一個帶頭結點的鏈式隊列,這是兩者的一個區別。將當前節點插入到等待對列之後,會使當前線程釋放lock,由fullyRelease方法實現,fullyRelease源碼為:

final int fullyRelease(Node node) {
    boolean failed = true;
    try {
        int savedState = getState();
        if (release(savedState)) {
            //成功釋放同步狀態
            failed = false;
            return savedState;
        } else {
            //不成功釋放同步狀態拋出異常
            throw new IllegalMonitorStateException();
        }
    } finally {
        if (failed)
            node.waitStatus = Node.CANCELLED;
    }
}

 

這段代碼就很容易理解了,調用AQS的模板方法release方法釋放AQS的同步狀態並且喚醒在同步隊列中頭結點的後繼節點引用的線程,如果釋放成功則正常返回,若失敗的話就拋出異常。到目前為止,這兩段代碼已經解決了前面的兩個問題的答案了,還剩下第三個問題,怎樣從await方法退出?現在回過頭再來看await方法有這樣一段邏輯:

while (!isOnSyncQueue(node)) {
    // 3\. 當前線程進入到等待狀態
    LockSupport.park(this);
    if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
        break;
}

 

很顯然,當線程第一次調用condition.await()方法時,會進入到這個while()迴圈中,然後通過LockSupport.park(this)方法使得當前線程進入等待狀態,那麼要想退出這個await方法第一個前提條件自然而然的是要先退出這個while迴圈,出口就只剩下兩個地方:1. 邏輯走到break退出while迴圈;2. while迴圈中的邏輯判斷為false。

再看代碼出現第1種情況的條件是當前等待的線程被中斷後代碼會走到break退出,第二種情況是當前節點被移動到了同步隊列中(即另外線程調用的condition的signal或者signalAll方法),while中邏輯判斷為false後結束while迴圈。總結下,就是當前線程被中斷或者調用condition.signal/condition.signalAll方法當前節點移動到了同步隊列後 ,這是當前線程退出await方法的前提條件。

當退出while迴圈後就會調用acquireQueued(node, savedState),這個方法在介紹AQS的底層實現時說過了,若感興趣的話可以去,該方法的作用是在自旋過程中線程不斷嘗試獲取同步狀態,直至成功(線程獲取到lock)。這樣也說明瞭退出await方法必須是已經獲得了condition引用(關聯)的lock。到目前為止,開頭的三個問題我們通過閱讀源碼的方式已經完全找到了答案,也對await方法的理解加深。await方法示意圖如下圖:

 

 

如圖,調用condition.await方法的線程必須是已經獲得了lock,也就是當前線程是同步隊列中的頭結點。調用該方法後會使得當前線程所封裝的Node尾插入到等待隊列中。

超時機制的支持

condition還額外支持了超時機制,使用者可調用方法awaitNanos,awaitUtil。這兩個方法的實現原理,基本上與AQS中的tryAcquire方法如出一轍,關於tryAcquire可以仔細閱讀。

不響應中斷的支持

要想不響應中斷可以調用condition.awaitUninterruptibly()方法,該方法的源碼為:

public final void awaitUninterruptibly() {
    Node node = addConditionWaiter();
    int savedState = fullyRelease(node);
    boolean interrupted = false;
    while (!isOnSyncQueue(node)) {
        LockSupport.park(this);
        if (Thread.interrupted())
            interrupted = true;
    }
    if (acquireQueued(node, savedState) || interrupted)
        selfInterrupt();
}

 

這段方法與上面的await方法基本一致,只不過減少了對中斷的處理,並省略了reportInterruptAfterWait方法拋被中斷的異常。

2.3 signal/signalAll實現原理

調用condition的signal或者signalAll方法可以將等待隊列中等待時間最長的節點移動到同步隊列中,使得該節點能夠有機會獲得lock。按照等待隊列是先進先出(FIFO)的,所以等待隊列的頭節點必然會是等待時間最長的節點,也就是每次調用condition的signal方法是將頭節點移動到同步隊列中。我們來通過看源碼的方式來看這樣的猜想是不是對的,signal方法源碼為:

public final void signal() {
    //1\. 先檢測當前線程是否已經獲取lock
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    //2\. 獲取等待隊列中第一個節點,之後的操作都是針對這個節點
    Node first = firstWaiter;
    if (first != null)
        doSignal(first);
}

 

signal方法首先會檢測當前線程是否已經獲取lock,如果沒有獲取lock會直接拋出異常,如果獲取的話再得到等待隊列的頭指針引用的節點,之後的操作的doSignal方法也是基於該節點。下麵我們來看看doSignal方法做了些什麼事情,doSignal方法源碼為:

private void doSignal(Node first) {
    do {
        if ( (firstWaiter = first.nextWaiter) == null)
            lastWaiter = null;
        //1\. 將頭結點從等待隊列中移除
        first.nextWaiter = null;
        //2\. while中transferForSignal方法對頭結點做真正的處理
    } while (!transferForSignal(first) &&
             (first = firstWaiter) != null);
}

 

具體邏輯請看註釋,真正對頭節點做處理的邏輯在transferForSignal放,該方法源碼為:

final boolean transferForSignal(Node node) {
    /*
     * If cannot change waitStatus, the node has been cancelled.
     */
    //1\. 更新狀態為0
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
        return false;

    /*
     * Splice onto queue and try to set waitStatus of predecessor to
     * indicate that thread is (probably) waiting. If cancelled or
     * attempt to set waitStatus fails, wake up to resync (in which
     * case the waitStatus can be transiently and harmlessly wrong).
     */
    //2.將該節點移入到同步隊列中去
    Node p = enq(node);
    int ws = p.waitStatus;
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
        LockSupport.unpark(node.thread);
    return true;
}

 

關鍵邏輯請看註釋,這段代碼主要做了兩件事情1.將頭結點的狀態更改為CONDITION;2.調用enq方法,將該節點尾插入到同步隊列中,關於enq方法請看AQS的底層實現這篇文章。現在我們可以得出結論:調用condition的signal的前提條件是當前線程已經獲取了lock,該方法會使得等待隊列中的頭節點即等待時間最長的那個節點移入到同步隊列,而移入到同步隊列後才有機會使得等待線程被喚醒,即從await方法中的LockSupport.park(this)方法中返回,從而才有機會使得調用await方法的線程成功退出。signal執行示意圖如下圖:

 

 

signalAll

sigllAll與sigal方法的區別體現在doSignalAll方法上,前面我們已經知道doSignal方法只會對等待隊列的頭節點進行操作,,而doSignalAll的源碼為:

private void doSignalAll(Node first) {
    lastWaiter = firstWaiter = null;
    do {
        Node next = first.nextWaiter;
        first.nextWaiter = null;
        transferForSignal(first);
        first = next;
    } while (first != null);
}

 

該方法只不過時間等待隊列中的每一個節點都移入到同步隊列中,即“通知”當前調用condition.await()方法的每一個線程。

3. await與signal/signalAll的結合思考

文章開篇提到等待/通知機制,通過使用condition提供的await和signal/signalAll方法就可以實現這種機制,而這種機制能夠解決最經典的問題就是“生產者與消費者問題”,關於“生產者消費者問題”之後會用單獨的一篇文章進行講解,這也是面試的高頻考點。await和signal和signalAll方法就像一個開關控制著線程A(等待方)和線程B(通知方)。它們之間的關係可以用下麵一個圖來表現得更加貼切:

 

 

如圖,線程awaitThread先通過lock.lock()方法獲取鎖成功後調用了condition.await方法進入等待隊列,而另一個線程signalThread通過lock.lock()方法獲取鎖成功後調用了condition.signal或者signalAll方法,使得線程awaitThread能夠有機會移入到同步隊列中,當其他線程釋放lock後使得線程awaitThread能夠有機會獲取lock,從而使得線程awaitThread能夠從await方法中退出執行後續操作。如果awaitThread獲取lock失敗會直接進入到同步隊列。

3. 一個例子

我們用一個很簡單的例子說說condition的用法:

public class AwaitSignal {
    private static ReentrantLock lock = new ReentrantLock();
    private static Condition condition = lock.newCondition();
    private static volatile boolean flag = false;

    public static void main(String[] args) {
        Thread waiter = new Thread(new waiter());
        waiter.start();
        Thread signaler = new Thread(new signaler());
        signaler.start();
    }

    static class waiter implements Runnable {

        @Override
        public void run() {
            lock.lock();
            try {
                while (!flag) {
                    System.out.println(Thread.currentThread().getName() + "當前條件不滿足等待");
                    try {
                        condition.await();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                System.out.println(Thread.currentThread().getName() + "接收到通知條件滿足");
            } finally {
                lock.unlock();
            }
        }
    }

    static class signaler implements Runnable {

        @Override
        public void run() {
            lock.lock();
            try {
                flag = true;
                condition.signalAll();
            } finally {
                lock.unlock();
            }
        }
    }
}

 

輸出結果為:

Thread-0當前條件不滿足等待
Thread-0接收到通知,條件滿足

開啟了兩個線程waiter和signaler,waiter線程開始執行的時候由於條件不滿足,執行condition.await方法使該線程進入等待狀態同時釋放鎖,signaler線程獲取到鎖之後更改條件,並通知所有的等待線程後釋放鎖。這時,waiter線程獲取到鎖,並由於signaler線程更改了條件此時相對於waiter來說條件滿足,繼續執行。


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 最近我在做angularjs程式時遇到了一個問題 1.頁面有很多選擇框,一個選擇框裡面有眾多的選擇項,和一個預設選定的項,像下麵這樣(很多選擇框,不只一個): 2.眾多的選項要從後臺介面得到,預設項從另一個後臺介面得到,這就需要$promise.then()操作 3.而多個$promise.then ...
  • Helm是一款非常流行的k8s包管理工具。以前就一直想用它,但看到它產生的文件比k8s要複雜許多,就一直猶豫,不知道它的好處能不能抵消掉它的複雜度。但如果不用,而是用Kubectl來進行調式真的很麻煩。正好最近Helm3正式版出來了,比原來的Helm2簡單了不少,就決定還是試用一下。結果證明確實很復 ...
  • 慕課網 實戰班 就業班 2019年11月30號 更新資料整理 300套 只讀模式打開 百度網盤資料鏈接: 鏈接:https://pan.baidu.com/s/1qORPsgM6ukDPOSjU5ck5yA 提取碼:qnlu 複製這段內容後打開百度網盤手機App,操作更方便哦 微雲鏈接: https ...
  • 一、介面的作用 1.可以使項目分層,所有層都面向介面開發,開發效率提高了。 2.介面使代碼和代碼之間的耦合度降低,就像記憶體條和主板的關係,變得“可插拔”,可以隨意切換。 ​總結:介面和抽象類能夠完成某個功能,優先選擇介面。因為介面可以多實現、​多繼承。並且一個類除了實現介面之外,還可以去繼承其他類( ...
  • 整合思路 將工程的三層結構中的JavaBean分別使用Spring容器(通過XML方式)進行管理。 整合持久層mapper,包括數據源、會話工程及mapper代理對象的整合; 整合業務層Service,包括事務及service的bean的配置; 整合表現層Controller,直接使用springm ...
  • AQS是併發編程中非常重要的概念,它是juc包下的許多併發工具類,如CountdownLatch,CyclicBarrier,Semaphore 和鎖, 如ReentrantLock, ReaderWriterLock的實現基礎,提供了一個基於int狀態碼和隊列來實現的併發框架。本文將對AQS框架的 ...
  • ``` # 迴文單詞是從左到右和從右到左讀相同的單詞。 # 例如:“detartrated”和“evitative”是迴文 str_in = input('Input:') # 方法一 count = 0 for i in range(len(str_in)): if str_in[i] == st... ...
  • 一、效果圖 二、具體效果實現代碼 1 public static void main(String[] args) { 2 int[][] array = new int[10][10]; 3 int num = 1; 4 for(int i=0;i<array.length;i++){ 5 6 i ...
一周排行
    -Advertisement-
    Play Games
  • 基於.NET Framework 4.8 開發的深度學習模型部署測試平臺,提供了YOLO框架的主流系列模型,包括YOLOv8~v9,以及其系列下的Det、Seg、Pose、Obb、Cls等應用場景,同時支持圖像與視頻檢測。模型部署引擎使用的是OpenVINO™、TensorRT、ONNX runti... ...
  • 十年沉澱,重啟開發之路 十年前,我沉浸在開發的海洋中,每日與代碼為伍,與演算法共舞。那時的我,滿懷激情,對技術的追求近乎狂熱。然而,隨著歲月的流逝,生活的忙碌逐漸占據了我的大部分時間,讓我無暇顧及技術的沉澱與積累。 十年間,我經歷了職業生涯的起伏和變遷。從初出茅廬的菜鳥到逐漸嶄露頭角的開發者,我見證了 ...
  • C# 是一種簡單、現代、面向對象和類型安全的編程語言。.NET 是由 Microsoft 創建的開發平臺,平臺包含了語言規範、工具、運行,支持開發各種應用,如Web、移動、桌面等。.NET框架有多個實現,如.NET Framework、.NET Core(及後續的.NET 5+版本),以及社區版本M... ...
  • 前言 本文介紹瞭如何使用三菱提供的MX Component插件實現對三菱PLC軟元件數據的讀寫,記錄了使用電腦模擬,模擬PLC,直至完成測試的詳細流程,並重點介紹了在這個過程中的易錯點,供參考。 用到的軟體: 1. PLC開發編程環境GX Works2,GX Works2下載鏈接 https:// ...
  • 前言 整理這個官方翻譯的系列,原因是網上大部分的 tomcat 版本比較舊,此版本為 v11 最新的版本。 開源項目 從零手寫實現 tomcat minicat 別稱【嗅虎】心有猛虎,輕嗅薔薇。 系列文章 web server apache tomcat11-01-官方文檔入門介紹 web serv ...
  • 1、jQuery介紹 jQuery是什麼 jQuery是一個快速、簡潔的JavaScript框架,是繼Prototype之後又一個優秀的JavaScript代碼庫(或JavaScript框架)。jQuery設計的宗旨是“write Less,Do More”,即倡導寫更少的代碼,做更多的事情。它封裝 ...
  • 前言 之前的文章把js引擎(aardio封裝庫) 微軟開源的js引擎(ChakraCore))寫好了,這篇文章整點js代碼來測一下bug。測試網站:https://fanyi.youdao.com/index.html#/ 逆向思路 逆向思路可以看有道翻譯js逆向(MD5加密,AES加密)附完整源碼 ...
  • 引言 現代的操作系統(Windows,Linux,Mac OS)等都可以同時打開多個軟體(任務),這些軟體在我們的感知上是同時運行的,例如我們可以一邊瀏覽網頁,一邊聽音樂。而CPU執行代碼同一時間只能執行一條,但即使我們的電腦是單核CPU也可以同時運行多個任務,如下圖所示,這是因為我們的 CPU 的 ...
  • 掌握使用Python進行文本英文統計的基本方法,並瞭解如何進一步優化和擴展這些方法,以應對更複雜的文本分析任務。 ...
  • 背景 Redis多數據源常見的場景: 分區數據處理:當數據量增長時,單個Redis實例可能無法處理所有的數據。通過使用多個Redis數據源,可以將數據分區存儲在不同的實例中,使得數據處理更加高效。 多租戶應用程式:對於多租戶應用程式,每個租戶可以擁有自己的Redis數據源,以確保數據隔離和安全性。 ...