CyclicBarrier是什麼? CyclicBarrier具有什麼特性? CyclicBarrier與CountDownLatch的對比? ...
問題
(1)CyclicBarrier是什麼?
(2)CyclicBarrier具有什麼特性?
(3)CyclicBarrier與CountDownLatch的對比?
簡介
CyclicBarrier,迴環柵欄,它會阻塞一組線程直到這些線程同時達到某個條件才繼續執行。它與CountDownLatch很類似,但又不同,CountDownLatch需要調用countDown()方法觸發事件,而CyclicBarrier不需要,它就像一個柵欄一樣,當一組線程都到達了柵欄處才繼續往下走。
使用方法
public class CyclicBarrierTest {
public static void main(String[] args) {
CyclicBarrier cyclicBarrier = new CyclicBarrier(3);
for (int i = 0; i < 3; i++) {
new Thread(()->{
System.out.println("before");
try {
cyclicBarrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
System.out.println("after");
}).start();
}
}
}
這段方法很簡單,使用一個CyclicBarrier使得三個線程保持同步,當三個線程同時到達cyclicBarrier.await();
處大家再一起往下運行。
源碼分析
主要內部類
private static class Generation {
boolean broken = false;
}
Generation,中文翻譯為代,一代人的代,用於控制CyclicBarrier的迴圈使用。
比如,上面示例中的三個線程完成後進入下一代,繼續等待三個線程達到柵欄處再一起執行,而CountDownLatch則做不到這一點,CountDownLatch是一次性的,無法重置其次數。
主要屬性
// 重入鎖
private final ReentrantLock lock = new ReentrantLock();
// 條件鎖,名稱為trip,絆倒的意思,可能是指線程來了先絆倒,等達到一定數量了再喚醒
private final Condition trip = lock.newCondition();
// 需要等待的線程數量
private final int parties;
// 當喚醒的時候執行的命令
private final Runnable barrierCommand;
// 代
private Generation generation = new Generation();
// 當前這一代還需要等待的線程數
private int count;
通過屬性可以看到,CyclicBarrier內部是通過重入鎖的條件鎖來實現的,那麼你可以腦補一下這個場景嗎?
彤哥來腦補一下:假如初始時count = parties = 3
,當第一個線程到達柵欄處,count減1,然後把它加入到Condition的隊列中,第二個線程到達柵欄處也是如此,第三個線程到達柵欄處,count減為0,調用Condition的signalAll()通知另外兩個線程,然後把它們加入到AQS的隊列中,等待當前線程運行完畢,調用lock.unlock()的時候依次從AQS的隊列中喚醒一個線程繼續運行,也就是說實際上三個線程先依次(排隊)到達柵欄處,再依次往下運行。
以上純屬彤哥腦補的內容,真實情況是不是如此呢,且往後看。
構造方法
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
// 初始化parties
this.parties = parties;
// 初始化count等於parties
this.count = parties;
// 初始化都到達柵欄處執行的命令
this.barrierCommand = barrierAction;
}
public CyclicBarrier(int parties) {
this(parties, null);
}
構造方法需要傳入一個parties變數,也就是需要等待的線程數。
await()方法
每個需要在柵欄處等待的線程都需要顯式地調用await()方法等待其它線程的到來。
public int await() throws InterruptedException, BrokenBarrierException {
try {
// 調用dowait方法,不需要超時
return dowait(false, 0L);
} catch (TimeoutException toe) {
throw new Error(toe); // cannot happen
}
}
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
final ReentrantLock lock = this.lock;
// 加鎖
lock.lock();
try {
// 當前代
final Generation g = generation;
// 檢查
if (g.broken)
throw new BrokenBarrierException();
// 中斷檢查
if (Thread.interrupted()) {
breakBarrier();
throw new InterruptedException();
}
// count的值減1
int index = --count;
// 如果數量減到0了,走這段邏輯(最後一個線程走這裡)
if (index == 0) { // tripped
boolean ranAction = false;
try {
// 如果初始化的時候傳了命令,這裡執行
final Runnable command = barrierCommand;
if (command != null)
command.run();
ranAction = true;
// 調用下一代方法
nextGeneration();
return 0;
} finally {
if (!ranAction)
breakBarrier();
}
}
// 這個迴圈只有非最後一個線程可以走到
for (;;) {
try {
if (!timed)
// 調用condition的await()方法
trip.await();
else if (nanos > 0L)
// 超時等待方法
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
if (g == generation && ! g.broken) {
breakBarrier();
throw ie;
} else {
// We're about to finish waiting even if we had not
// been interrupted, so this interrupt is deemed to
// "belong" to subsequent execution.
Thread.currentThread().interrupt();
}
}
// 檢查
if (g.broken)
throw new BrokenBarrierException();
// 正常來說這裡肯定不相等
// 因為上面打破柵欄的時候調用nextGeneration()方法時generation的引用已經變化了
if (g != generation)
return index;
// 超時檢查
if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
lock.unlock();
}
}
private void nextGeneration() {
// 調用condition的signalAll()將其隊列中的等待者全部轉移到AQS的隊列中
trip.signalAll();
// 重置count
count = parties;
// 進入下一代
generation = new Generation();
}
dowait()方法里的整個邏輯分成兩部分:
(1)最後一個線程走上面的邏輯,當count減為0的時候,打破柵欄,它調用nextGeneration()方法通知條件隊列中的等待線程轉移到AQS的隊列中等待被喚醒,併進入下一代。
(2)非最後一個線程走下麵的for迴圈邏輯,這些線程會阻塞在condition的await()方法處,它們會加入到條件隊列中,等待被通知,當它們喚醒的時候已經更新換“代”了,這時候返回。
圖解
學習過前面的章節,看這個圖很簡單了,看不懂的同學還需要把推薦的內容好好看看哦^^
總結
(1)CyclicBarrier會使一組線程阻塞在await()處,當最後一個線程到達時喚醒(只是從條件隊列轉移到AQS隊列中)前面的線程大家再繼續往下走;
(2)CyclicBarrier不是直接使用AQS實現的一個同步器;
(3)CyclicBarrier基於ReentrantLock及其Condition實現整個同步邏輯;
彩蛋
CyclicBarrier與CountDownLatch的異同?
(1)兩者都能實現阻塞一組線程等待被喚醒;
(2)前者是最後一個線程到達時自動喚醒;
(3)後者是通過顯式地調用countDown()實現的;
(4)前者是通過重入鎖及其條件鎖實現的,後者是直接基於AQS實現的;
(5)前者具有“代”的概念,可以重覆使用,後者只能使用一次;
(6)前者只能實現多個線程到達柵欄處一起運行;
(7)後者不僅可以實現多個線程等待一個線程條件成立,還能實現一個線程等待多個線程條件成立(詳見CountDownLatch那章使用案例);
推薦閱讀
3、死磕 java同步系列之JMM(Java Memory Model)
8、死磕 java同步系列之ReentrantLock源碼解析(一)——公平鎖、非公平鎖
9、死磕 java同步系列之ReentrantLock源碼解析(二)——條件鎖
10、死磕 java同步系列之ReentrantLock VS synchronized
11、死磕 java同步系列之ReentrantReadWriteLock源碼解析
13、死磕 java同步系列之CountDownLatch源碼解析
15、死磕 java同步系列之StampedLock源碼解析
歡迎關註我的公眾號“彤哥讀源碼”,查看更多源碼系列文章, 與彤哥一起暢游源碼的海洋。