# 前言 在面試這一篇我們介紹過[CountDownLatch和CyclicBarrier](https://github.com/jmilktea/jtea/blob/master/%E9%9D%A2%E8%AF%95/CountDownLatch%E5%92%8CCyclicBarrier.md ...
前言
在面試這一篇我們介紹過CountDownLatch和CyclicBarrier,它們都是jdk1.5提供的多線程併發控制類,內部都是用AQS這個同步框架實現。
在我們的實際項目中,有很多場景是需要從資料庫查詢一批數據,多線池執行某些操作,並且要統計結果,我們對這個過程做了一些封裝,由於要統計結果,所以需要等所有任務都處理完成,我們用到了CountDownLatch實現同步。偽代碼如下:
ExecuteInstance ei = ExecuteInstance.build(myExecutor); //線程池
//迴圈
LoopShutdown.build("myTask").loop(() -> {
//不斷從數據獲取數據
List<Task> list = getFromDb();
//設置countdownlatch
ei.setCountDownSize(list.size());
list.forEach(item -> ei.execute(() -> {
//提交到線程池執行,並且統計
}));
//等待這一批做完
ei.await();
});
//內部使用了CountDownLatch await()
return ei.awaitResult();
代碼很簡單,容易理解。不過後來有同學提到每次都要setCountDownSize() + await() 這套組合太麻煩,能不能省略這兩步呢。另外也不夠靈活,有些場景不能提前知道要處理的數據總數,例如從迭代器遍曆數據,Iterator介面並沒有size方法可以獲取到總數。
那怎麼實現這個功能呢?就是本篇要介紹的Phaser。
Phaser原理
Phaser類是jdk7提供的,可重用的,同步的,在功能上和CountDownLatch,CyclicBarrier類似,但更加靈活的類。
"phaser" google翻譯一下是:"移相器"的意思,完全不知道是什麼~,不過"phase"是階段的意思,還是能從名字瞭解到一些信息。
Phaser運行機制:
-
Registration(註冊)
跟其他barrier不同,在phaser上註冊的parties會隨著時間的變化而變化。任務可以隨時註冊(使用方法register,bulkRegister註冊,或者由構造器確定初始parties),並且在任何抵達點可以隨意地撤銷註冊(方法arriveAndDeregister)。就像大多數基本的同步結構一樣,註冊和撤銷隻影響內部計數;不會創建更深的內部記錄,所以任務不能查詢他們是否已經註冊。(不過,可以通過繼承來實現類似的記錄)
可以動態的註冊是它的特點之一,我們知道CountDownLatch之類的在開始就需要指定一個計數,並且不能更改,而Phaser可以開始指定,也可以運行時更改。 -
Synchronization(同步機制)
和CyclicBarrier一樣,Phaser也可以重覆await。方法arriveAndAwaitAdvance的效果類似CyclicBarrier.await。phaser的每一代都有一個相關的phase number,初始值為0,當所有註冊的任務都到達phaser時phase+1,到達最大值(Integer.MAX_VALUE)之後清零。使用phase number可以獨立控制到達phaser和等待其他線程的動作,通過下麵兩種類型的方法:Arrival(到達機制) arrive和arriveAndDeregister方法記錄到達狀態。這些方法不會阻塞,但是會返回一個相關的arrival phase number;也就是說,phase number用來確定到達狀態。當所有任務都到達給定phase時,可以執行一個可選的函數,這個函數通過重寫onAdvance方法實現,通常可以用來控制終止狀態。重寫此方法類似於為CyclicBarrier提供一個barrierAction,但比它更靈活。
Waiting(等待機制) awaitAdvance方法需要一個表示arrival phase number的參數,並且在phaser前進到與給定phase不同的phase時返回。和CyclicBarrier不同,即使等待線程已經被中斷,awaitAdvance方法也會一直等待。中斷狀態和超時時間同樣可用,但是當任務等待中斷或超時後未改變phaser的狀態時會遭遇異常。如果有必要,在方法forceTermination之後可以執行這些異常的相關的handler進行恢復操作,Phaser也可能被ForkJoinPool中的任務使用,這樣在其他任務阻塞等待一個phase時可以保證足夠的並行度來執行任務。
-
Termination(終止機制)
可以用isTerminated方法檢查phaser的終止狀態。在終止時,所有同步方法立刻返回一個負值。在終止時嘗試註冊也沒有效果。當調用onAdvance返回true時Termination被觸發。當deregistration操作使已註冊的parties變為0時,onAdvance的預設實現就會返回true。也可以重寫onAdvance方法來定義終止動作。forceTermination方法也可以釋放等待線程並且允許它們終止。 -
Tiering(分層結構)
Phaser支持分層結構(樹狀構造)來減少競爭。註冊了大量parties的Phaser可能會因為同步競爭消耗很高的成本, 因此可以設置一些子Phaser來共用一個通用的parent。這樣的話即使每個操作消耗了更多的開銷,但是會提高整體吞吐量。在一個分層結構的phaser里,子節點phaser的註冊和取消註冊都通過父節點管理。子節點phaser通過構造或方法register、bulkRegister進行首次註冊時,在其父節點上註冊。子節點phaser通過調用arriveAndDeregister進行最後一次取消註冊時,也在其父節點上取消註冊。
這也是它的主要亮點之一,這一點很像ConcurrentHashMap(對HashTable)和LongAdder(對AtomicLong),通過分散熱點來降低資源競爭,提升併發效率。
-
Monitoring(狀態監控)
由於同步方法可能只被已註冊的parties調用,所以phaser的當前狀態也可能被任何調用者監控。在任何時候,可以通過getRegisteredParties獲取parties數,其中getArrivedParties方法返回已經到達當前phase的parties數。當剩餘的parties(通過方法getUnarrivedParties獲取)到達時,phase進入下一代。這些方法返回的值可能只表示短暫的狀態,所以一般來說在同步結構里並沒有啥卵用。
CountDownLatch和CyclicBarrier都非常簡單,從Phaser提供的api數量就可以看出為什麼說它更加靈活,show me the code,接下來我們通過幾個例子感受一下。
Phaser例子
例子1:子線程會等全部子線程達到後才開始執行,實現類似CyclicBarrier的效果。
@Test
public void test1() throws InterruptedException {
List<Runnable> list = Lists.newArrayList();
for (int i = 0; i < 10; i++) {
final int j = i;
list.add(() -> System.out.println(j));
}
final Phaser phaser = new Phaser(); // "1" to register self
// create and start threads
int i = 0;
for (final Runnable task : list) {
i++;
final int j = i;
phaser.register();
new Thread(() -> {
try {
Thread.sleep(j * 1000);
} catch (InterruptedException e) {
}
//全部子線程到達後才開始執行
phaser.arriveAndAwaitAdvance(); // await all creation
task.run();
}).start();
}
Thread.sleep(15000);
}
例子2:task會迴圈做3次,通過重寫onAdvance可以控制phaser結束的條件。
@Test
public void test2() throws InterruptedException {
//重覆做3次
int iterations = 3;
List<Runnable> list = Lists.newArrayList();
for (int i = 0; i < 2; i++) {
final int j = i;
list.add(() -> System.out.println(j));
}
final Phaser phaser = new Phaser() {
//每做一次,phase+1,該方法返回true,就會結束
protected boolean onAdvance(int phase, int registeredParties) {
return phase > iterations || registeredParties == 0;
}
};
phaser.register();
for (final Runnable task : list) {
phaser.register();
new Thread(() -> {
do {
task.run();
phaser.arriveAndAwaitAdvance();
} while (!phaser.isTerminated());
}).start();
}
phaser.arriveAndDeregister(); // deregister self, don't wait
Thread.sleep(5000);
}
例子3:創建多個phaser,並關聯到父phaser上,就是上面提到的分層結構。
@Test
public void test3() {
Phaser parent = new Phaser(1);
Phaser phaser1 = new Phaser(parent);
Phaser phaser2 = new Phaser(parent);
for (int i = 0; i < 20; i++) {
final int j = i;
if (i < 10) {
phaser1.register();
new Thread(() -> {
try {
Thread.sleep(1000);
phaser1.arriveAndAwaitAdvance(); // await all creation
System.out.println(j);
} catch (InterruptedException e) {
}
}).start();
} else if (i < 20) {
phaser2.register();
new Thread(() -> {
try {
Thread.sleep(10000);
phaser2.arriveAndAwaitAdvance(); // await all creation
System.out.println(j);
} catch (InterruptedException e) {
}
}).start();
}
}
parent.arriveAndAwaitAdvance();
System.out.println("done");
}
例子4:使用Phaser改寫我們的代碼,如下:
//維護一個Phaser
public static ExecuteInstance buildWithPhaser(Executor executor) {
ExecuteInstance ei = new ExecuteInstance();
ei.executor = executor;
ei.phaser = new Phaser(1);
return ei;
}
//提交線程池前註冊一下
public void executeRR(Callable<ReturnResult> task, Consumer<Exception> exceptionHandler, int batch) {
phaser.register();
executor.execute(() -> executeStatistics(task, exceptionHandler, batch));
}
//執行後deregister一下
private void executeStatistics(Callable<ReturnResult> task, Consumer<Exception> exceptionHandler, int batch) {
ReturnResult result = ReturnResult.NONE;
try {
//任務處理
result = task.call();
} catch (Exception e) {
if (statistics) {
counter.incrException(batch);
}
if (exceptionHandler != null) {
//自定義異常處理
try {
exceptionHandler.accept(e);
} catch (Exception he) {
}
}
} finally {
phaser.arriveAndDeregister(); //deregister
if (statistics) {
if (ReturnResult.SUCCESS.equals(result)) {
counter.incrSuccess(batch);
} else if (ReturnResult.FAIL.equals(result)) {
counter.incrFail(batch);
} else if (ReturnResult.FILTER.equals(result)) {
counter.incrFilter(batch);
}
}
}
}
//等待結果
public ExecuteResult awaitResult() {
phaser.arriveAndAwaitAdvance();
return getExecuteResult();
}
使用就非常簡單了
ExecuteInstance ei = ExecuteInstance.buildWithPhaser(myExecutor); //線程池
//迴圈
LoopShutdown.build("myTask").loop(() -> {
//不斷從數據獲取數據
List<Task> list = getFromDb();
list.forEach(item -> ei.execute(() -> {
//提交到線程池執行,並且統計
}));
});
return ei.awaitResult();
總結
Phaser是jkd7後提供的同步工具類,它底層並沒有使用AQS同步工具。相比CountDownLatch等它提供了更豐富的功能,但也意味著它更複雜,需要更多的資源,一些簡單的場景CountDownLatch等工具類能滿足的就使用它們即可,考慮性能,還有靈活性時才考慮使用Phaser,如筆者的場景使用Phaser就更加適合。
更多分享,歡迎關註我的github:https://github.com/jmilktea/jtea