生產環境中,存在需要等待多個線程都達到某種狀態後,才繼續運行的情景。併發工具CyclicBarrier就能夠完成這種功能。本篇從源碼方面,簡要分析CyclicBarrier的實現原理。 使用示例 執行結果如下: 可以看到線程1,2,3在同一個時間結束。 源碼分析 主要成員: CyclicBarrie ...
生產環境中,存在需要等待多個線程都達到某種狀態後,才繼續運行的情景。併發工具CyclicBarrier就能夠完成這種功能。本篇從源碼方面,簡要分析CyclicBarrier的實現原理。
使用示例
public class CyclicBarrierTest { public static void main(String[] args) { //屏障,阻攔3個線程 CyclicBarrier cyclicBarrier = new CyclicBarrier(3); new Thread(new Runnable() { @Override public void run() { System.out.println("線程1正在執行"); try { // 等待 cyclicBarrier.await(); } catch (Exception e) { e.printStackTrace(); } System.out.println("線程1運行結束,時間: " + System.currentTimeMillis()); } }).start(); new Thread(new Runnable() { @Override public void run() { System.out.println("線程2正在執行"); try { // 等待 cyclicBarrier.await(); } catch (Exception e) { e.printStackTrace(); } System.out.println("線程2運行結束,時間: " + System.currentTimeMillis()); } }).start(); new Thread(new Runnable() { @Override public void run() { System.out.println("線程3正在執行"); try { //線程3阻塞2秒,測試效果 Thread.sleep(2000); // 等待 cyclicBarrier.await(); } catch (Exception e) { e.printStackTrace(); } System.out.println("線程3運行結束,時間: " + System.currentTimeMillis()); } }).start(); } }
執行結果如下:
線程1正在執行 線程2正在執行 線程3正在執行 線程1運行結束,時間: 1550324116837 線程3運行結束,時間: 1550324116837 線程2運行結束,時間: 1550324116837
可以看到線程1,2,3在同一個時間結束。
源碼分析
主要成員:
private final ReentrantLock lock = new ReentrantLock(); private final Condition trip = lock.newCondition(); private int count;
CyclicBarrier主要藉助重入鎖ReentrantLock和Condition實現。count初始值等於CyclicBarrier實例化指明的等待線程數量,用於等待線程計數。
主要方法await()
public int await() throws InterruptedException, BrokenBarrierException { try { return dowait(false, 0L); } catch (TimeoutException toe) { throw new Error(toe); // cannot happen } } private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException { final ReentrantLock lock = this.lock; lock.lock(); // 1 try { final Generation g = generation; if (g.broken) throw new BrokenBarrierException(); if (Thread.interrupted()) { breakBarrier(); throw new InterruptedException(); } int index = --count; // 2 if (index == 0) { // 3 boolean ranAction = false; try { final Runnable command = barrierCommand; if (command != null) command.run(); ranAction = true; nextGeneration(); // 4 return 0; } finally { if (!ranAction) breakBarrier(); // 5 } } // loop until tripped, broken, interrupted, or timed out for (;;) { try { if (!timed) trip.await(); // 6 else if (nanos > 0L) nanos = trip.awaitNanos(nanos); } catch (InterruptedException ie) { if (g == generation && ! g.broken) { breakBarrier(); throw ie; } else { // We're about to finish waiting even if we had not // been interrupted, so this interrupt is deemed to // "belong" to subsequent execution. Thread.currentThread().interrupt(); } } if (g.broken) throw new BrokenBarrierException(); if (g != generation) return index; if (timed && nanos <= 0L) { breakBarrier(); throw new TimeoutException(); } } } finally { lock.unlock(); // 7 } }
- 對當前對象加鎖
- 每個線程獲得鎖,執行這部分代碼時,都把count - 1,記做index
- 如果index為0,執行第4步,代表CyclicBarrier屏障已經攔截了足夠數量(count)的線程,線程可以接著往下執行了。不為0,說明當前線程還沒有達到屏障CyclicBarrier攔截的數量,執行第6步
- 調用nextGeneration()方法,喚醒所有等待線程
- breakBarrier()確保一定能執行喚醒動作
- 調用Condition的await()方法,將當前線程放入等待隊列,釋放鎖
- 一定執行的釋放鎖動作。
nextGeneration()的代碼如下:
private void nextGeneration() { // signal completion of last generation trip.signalAll(); // set up next generation count = parties; generation = new Generation(); }
使用Condition的signalAll()方法,喚醒全部等待線程
說完CyclicBarrier的原理之後,再對本篇的使用示例做一下描述:
- 線程1開始執行,調用await()方法,獲得鎖。此時count為3,count--,故count為2,index為2,調用Condition.await()方法,線程1進入等待隊列,釋放鎖
- 線程2開始執行,過程與第一步相同,只是count減為1
- 線程3開始執行,獲得鎖,count減為0,達到攔截數量,調用nextGeneration()方法喚醒全部線程,釋放自己持有的鎖
- 線程1,2都被喚醒,根據鎖競爭結果,依次執行完await()方法,最後釋放鎖
- 3個線程再往下執行自己的run()方法