本文將介紹常用的線程間通信工具CountDownLatch、CyclicBarrier和Phaser的用法,並結合實例介紹它們各自的適用場景及相同點和不同點。 ...
原創文章,同步發自作者個人博客,轉載請在文章開頭處以超鏈接註明出處 http://www.jasongj.com/java/thread_communication/
CountDownLatch
CountDownLatch適用場景
Java多線程編程中經常會碰到這樣一種場景——某個線程需要等待一個或多個線程操作結束(或達到某種狀態)才開始執行。比如開發一個關發測試工具時,主線程需要等到所有測試線程均執行完成再開始統計總共耗費的時間,此時可以通過CountDownLatch輕鬆實現。
CountDownLatch實例
package com.test.thread;
import java.util.Date;
import java.util.concurrent.CountDownLatch;
public class CountDownLatchDemo {
public static void main(String[] args) throws InterruptedException {
int totalThread = 3;
long start = System.currentTimeMillis();
CountDownLatch countDown = new CountDownLatch(totalThread);
for(int i = 0; i < totalThread; i++) {
final String threadName = "Thread " + i;
new Thread(() -> {
System.out.println(String.format("%s\t%s %s", new Date(), threadName, "started"));
try {
Thread.sleep(1000);
} catch (Exception ex) {
ex.printStackTrace();
}
countDown.countDown();
System.out.println(String.format("%s\t%s %s", new Date(), threadName, "ended"));
}).start();;
}
countDown.await();
long stop = System.currentTimeMillis();
System.out.println(String.format("Total time : %sms", (stop - start)));
}
}
執行結果
Sun Jun 19 20:34:31 CST 2016 Thread 1 started
Sun Jun 19 20:34:31 CST 2016 Thread 0 started
Sun Jun 19 20:34:31 CST 2016 Thread 2 started
Sun Jun 19 20:34:32 CST 2016 Thread 2 ended
Sun Jun 19 20:34:32 CST 2016 Thread 1 ended
Sun Jun 19 20:34:32 CST 2016 Thread 0 ended
Total time : 1072ms
可以看到,主線程等待所有3個線程都執行結束後才開始執行。
CountDownLatch主要介面分析
CountDownLatch工作原理相對簡單,可以簡單看成一個倒計時器,在構造方法中指定初始值,每次調用countDown()方法時講計數器減1,而await()會等待計數器變為0。CountDownLatch關鍵介面如下
- countDown() 如果當前計數器的值大於1,則將其減1;若當前值為1,則將其置為0並喚醒所有通過await等待的線程;若當前值為0,則什麼也不做直接返回。
- await() 等待計數器的值為0,若計數器的值為0則該方法返回;若等待期間該線程被中斷,則拋出InterruptedException並清除該線程的中斷狀態。
- await(long timeout, TimeUnit unit) 在指定的時間內等待計數器的值為0,若在指定時間內計數器的值變為0,則該方法返回true;若指定時間內計數器的值仍未變為0,則返回false;若指定時間內計數器的值變為0之前當前線程被中斷,則拋出InterruptedException並清除該線程的中斷狀態。
- getCount() 讀取當前計數器的值,一般用於調試或者測試。
CyclicBarrier
CyclicBarrier適用場景
在《當我們說線程安全時,到底在說什麼》一文中講過記憶體屏障,它能保證屏障之前的代碼一定在屏障之後的代碼之前被執行。CyclicBarrier可以譯為迴圈屏障,也有類似的功能。CyclicBarrier可以在構造時指定需要在屏障前執行await的個數,所有對await的調用都會等待,只到調用await的次數達到預定指,所有等待都會立即被喚醒。
從使用場景上來說,CyclicBarrier是讓多個線程互相等待某一事件的發生,然後同時被喚醒。而上文講的CountDownLatch是讓某一線程等待多個線程的狀態,然後該線程被喚醒。
CyclicBarrier實例
package com.test.thread;
import java.util.Date;
import java.util.concurrent.CyclicBarrier;
public class CyclicBarrierDemo {
public static void main(String[] args) {
int totalThread = 5;
CyclicBarrier barrier = new CyclicBarrier(totalThread);
for(int i = 0; i < totalThread; i++) {
String threadName = "Thread " + i;
new Thread(() -> {
System.out.println(String.format("%s\t%s %s", new Date(), threadName, " is waiting"));
try {
barrier.await();
} catch (Exception ex) {
ex.printStackTrace();
}
System.out.println(String.format("%s\t%s %s", new Date(), threadName, "ended"));
}).start();
}
}
}
執行結果如下
Sun Jun 19 21:04:49 CST 2016 Thread 1 is waiting
Sun Jun 19 21:04:49 CST 2016 Thread 0 is waiting
Sun Jun 19 21:04:49 CST 2016 Thread 3 is waiting
Sun Jun 19 21:04:49 CST 2016 Thread 2 is waiting
Sun Jun 19 21:04:49 CST 2016 Thread 4 is waiting
Sun Jun 19 21:04:49 CST 2016 Thread 4 ended
Sun Jun 19 21:04:49 CST 2016 Thread 0 ended
Sun Jun 19 21:04:49 CST 2016 Thread 2 ended
Sun Jun 19 21:04:49 CST 2016 Thread 1 ended
Sun Jun 19 21:04:49 CST 2016 Thread 3 ended
從執行結果可以看到,每個線程都不會在其它所有線程執行await()方法前繼續執行,而等所有線程都執行await()方法後所有線程的等待都被喚醒從而繼續執行。
CyclicBarrier主要介面分析
CyclicBarrier提供的關鍵方法如下
- await() 等待其它參與方的到來(調用await())。如果當前調用是最後一個調用,則喚醒所有其它的線程的等待並且如果在構造CyclicBarrier時指定了action,當前線程會去執行該action,然後該方法返回該線程調用await的次序(getParties()-1說明該線程是第一個調用await的,0說明該線程是最後一個執行await的),接著該線程繼續執行await後的代碼;如果該調用不是最後一個調用,則阻塞等待;如果等待過程中,當前線程被中斷,則拋出InterruptedException;如果等待過程中,其它等待的線程被中斷,或者其它線程等待超時,或者該barrier被reset,或者當前線程在執行barrier構造時註冊的action時因為拋出異常而失敗,則拋出BrokenBarrierException。
- await(long timeout, TimeUnit unit) 與await()唯一的不同點在於設置了等待超時時間,等待超時時會拋出TimeoutException。
- reset() 該方法會將該barrier重置為它的初始狀態,並使得所有對該barrier的await調用拋出BrokenBarrierException。
Phaser
Phaser適用場景
CountDownLatch和CyclicBarrier都是JDK 1.5引入的,而Phaser是JDK 1.7引入的。Phaser的功能與r
CountDownLatch和CyclicBarrier有部分重疊,同時也提供了更豐富的語義和更靈活的用法。
Phaser顧名思義,與階段相關。Phaser比較適合這樣一種場景,一種任務可以分為多個階段,現希望多個線程去處理該批任務,對於每個階段,多個線程可以併發進行,但是希望保證只有前面一個階段的任務完成之後才能開始後面的任務。這種場景可以使用多個CyclicBarrier來實現,每個CyclicBarrier負責等待一個階段的任務全部完成。但是使用CyclicBarrier的缺點在於,需要明確知道總共有多少個階段,同時並行的任務數需要提前預定義好,且無法動態修改。而Phaser可同時解決這兩個問題。
Phaser實例
public class PhaserDemo {
public static void main(String[] args) throws IOException {
int parties = 3;
int phases = 4;
final Phaser phaser = new Phaser(parties) {
@Override
protected boolean onAdvance(int phase, int registeredParties) {
System.out.println("====== Phase : " + phase + " ======");
return registeredParties == 0;
}
};
for(int i = 0; i < parties; i++) {
int threadId = i;
Thread thread = new Thread(() -> {
for(int phase = 0; phase < phases; phase++) {
System.out.println(String.format("Thread %s, phase %s", threadId, phase));
phaser.arriveAndAwaitAdvance();
}
});
thread.start();
}
}
}
執行結果如下
Thread 0, phase 0
Thread 1, phase 0
Thread 2, phase 0
====== Phase : 0 ======
Thread 2, phase 1
Thread 0, phase 1
Thread 1, phase 1
====== Phase : 1 ======
Thread 1, phase 2
Thread 2, phase 2
Thread 0, phase 2
====== Phase : 2 ======
Thread 0, phase 3
Thread 1, phase 3
Thread 2, phase 3
====== Phase : 3 ======
從上面的結果可以看到,多個線程必須等到其它線程的同一階段的任務全部完成才能進行到下一個階段,並且每當完成某一階段任務時,Phaser都會執行其onAdvance方法。
Phaser主要介面分析
Phaser主要介面如下
- arriveAndAwaitAdvance() 當前線程當前階段執行完畢,等待其它線程完成當前階段。如果當前線程是該階段最後一個未到達的,則該方法直接返回下一個階段的序號(階段序號從0開始),同時其它線程的該方法也返回下一個階段的序號。
- arriveAndDeregister() 該方法立即返回下一階段的序號,並且其它線程需要等待的個數減一,並且把當前線程從之後需要等待的成員中移除。如果該Phaser是另外一個Phaser的子Phaser(層次化Phaser會在後文中講到),並且該操作導致當前Phaser的成員數為0,則該操作也會將當前Phaser從其父Phaser中移除。
- arrive() 該方法不作任何等待,直接返回下一階段的序號。
- awaitAdvance(int phase) 該方法等待某一階段執行完畢。如果當前階段不等於指定的階段或者該Phaser已經被終止,則立即返回。該階段數一般由arrive()方法或者arriveAndDeregister()方法返回。返回下一階段的序號,或者返回參數指定的值(如果該參數為負數),或者直接返回當前階段序號(如果當前Phaser已經被終止)。
- awaitAdvanceInterruptibly(int phase) 效果與awaitAdvance(int phase)相當,唯一的不同在於若該線程在該方法等待時被中斷,則該方法拋出InterruptedException。
- awaitAdvanceInterruptibly(int phase, long timeout, TimeUnit unit) 效果與awaitAdvanceInterruptibly(int phase)相當,區別在於如果超時則拋出TimeoutException。
- bulkRegister(int parties) 註冊多個party。如果當前phaser已經被終止,則該方法無效,並返回負數。如果調用該方法時,onAdvance方法正在執行,則該方法等待其執行完畢。如果該Phaser有父Phaser則指定的party數大於0,且之前該Phaser的party數為0,那麼該Phaser會被註冊到其父Phaser中。
- forceTermination() 強制讓該Phaser進入終止狀態。已經註冊的party數不受影響。如果該Phaser有子Phaser,則其所有的子Phaser均進入終止狀態。如果該Phaser已經處於終止狀態,該方法調用不造成任何影響。