前言 CountDownLatch是一個閉鎖實現,它可以使一個或者多個線程等待一組事件發生。它包含一個計數器,用來表示需要等待的事件數量,coutDown方法用於表示一個事件發生,計數器隨之遞減,而await方法等待計數器為0之前一直阻塞。它是基於AQS的共用鎖來實現的,其中使用了較多的AQS的方法 ...
前言
CountDownLatch是一個閉鎖實現,它可以使一個或者多個線程等待一組事件發生。它包含一個計數器,用來表示需要等待的事件數量,coutDown方法用於表示一個事件發生,計數器隨之遞減,而await方法等待計數器為0之前一直阻塞。它是基於AQS的共用鎖來實現的,其中使用了較多的AQS的方法,所以在這之前最好閱讀過AQS的源碼,不嫌棄也可以查看本人之前AQS的源碼分析,有些AQS方法沒有在之前分析過的這裡涉及到了會進行分析。
源碼
我們先看它的屬性和構造器,
// Sync為其內部類 private final Sync sync; // 唯一的一個構造器 // 構造參數count就是需要等待事件的數量 public CountDownLatch(int count) { // 為了保證count >= 0 if (count < 0) throw new IllegalArgumentException("count < 0"); // 構造sync this.sync = new Sync(count); }
現在來看內部類Sync,它繼承了AQS,實現了共用鎖方法,下麵來看其源碼,代碼行數不多很好理解
private static final class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = 4982264981922014374L; Sync(int count) { // setState 為AQS更改其state變數的方法 // 將AQS state變數設置成count setState(count); } int getCount() { // AQS的獲取state鎖狀態值 return getState(); } // 嘗試獲取共用鎖 protected int tryAcquireShared(int acquires) { // 返回1表示此時鎖狀態值為0表示鎖已釋放 // -1表示此時鎖狀態值大於0,表示出於鎖定狀態 return (getState() == 0) ? 1 : -1; } // 嘗試釋放共用鎖(計數器遞減releases次) protected boolean tryReleaseShared(int releases) { // Decrement count; signal when transition to zero // 等待鎖狀態值為0或者更改鎖狀態值成功 for (;;) { // 將state賦值給變數c int c = getState(); if (c == 0) // 此時鎖已清除 return false; // 遞減 int nextc = c-1; // 比較state的狀態值是否等於C,等於將state狀態值改為nextc if (compareAndSetState(c, nextc)) // 更改成功後,如果nextc為0則返回true return nextc == 0; } } }
await方法
await方法就是當state狀態值不為0時將當前線程阻塞,然後等待喚醒
public void await() throws InterruptedException { //調用的AQS獲取共用鎖可中斷方法 sync.acquireSharedInterruptibly(1); }
我們來看看AQS的acquireSharedInterruptibly方法
public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); // 此方法調用的是CountDownLatch內部類Sync的方法 // 如果鎖狀態不為0,則執行doAcquireSharedInterruptibly方法 if (tryAcquireShared(arg) < 0) doAcquireSharedInterruptibly(arg); }
doAcquireSharedInterruptibly方法也是由AQS實現的
private void doAcquireSharedInterruptibly(int arg) throws InterruptedException { // 添加一個共用鎖節點到隊列 final Node node = addWaiter(Node.SHARED); boolean failed = true; try { // 直到線程被喚醒或者線程被中斷時跳出迴圈 for (;;) { // node節點的前驅節點 final Node p = node.predecessor(); if (p == head) { // 調用CountDownLatch內部類Sync的方法 // 如果鎖狀態值為0,則返回值大於0 int r = tryAcquireShared(arg); if (r >= 0) { // 當鎖狀態值為0,開始將note節點設置為頭節點並喚醒後繼節點 // 也就是隊列不斷的出列,然後喚醒後繼節點,後繼節點被喚醒後由於前驅節點被設置成頭節點,又會調用該方法進行後繼節點的喚醒 setHeadAndPropagate(node, r); p.next = null; // help GC failed = false; return; } } /* shouldParkAfterFailedAcquire用於清除已中斷/或者取消的線程以及判斷此次迴圈是否需要掛起線程 parkAndCheckInterrupt 掛機當前線程 shouldParkAfterFailedAcquire 和 parkAndCheckInterrupt 在AQS之前博文里分析過這裡就不再分析了 */ if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { if (failed) // 表示當前線程中斷,取消獲取鎖 // 之前分析過,略過源碼分析 cancelAcquire(node); } }
setHeadAndPropagate方法,主要作用是喚醒後繼節點線程
private void setHeadAndPropagate(Node node, int propagate) { Node h = head; // 當前節點設置為頭節點,節點關聯的線程設置為空 setHead(node) if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) { Node s = node.next; if (s == null || s.isShared()) // 節點等待狀態為signal時,喚醒後繼節點線程 doReleaseShared(); } }
doReleaseShared很巧妙,噹噹前節點等待狀態為signal時,喚醒後繼節點線程
private void doReleaseShared() { for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; // 當前線程等待狀態為signal時表示後繼節點需要喚醒 if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) // 表示h節點的狀態替換失敗,會再次迴圈判斷h節點的狀態 continue; // loop to recheck cases // 喚醒後繼節點 unparkSuccessor(h); } // 狀態為0時,將其改成PROPAGATE,更改失敗會再次迴圈判斷h節點的狀態
// 這種情況發生在一個線程調用await方法,節點的等待狀態還是初始值0未來得及被修改,剛好state被置為0然後調用了doReleaseShared方法
else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS } if (h == head) // loop if head changed break; } }
countDown方法
countDown方法遞減state值,當值為0時,依次喚醒等待的線程
public void countDown() { // 遞減一次state值,知道state為0時喚醒等待中的線程 sync.releaseShared(1); }
public final boolean releaseShared(int arg) { // 嘗試將state減去arg if (tryReleaseShared(arg)) { // state為0時喚醒線程 doReleaseShared(); return true; } return false; }
到此分析完畢。
總結
- 通過源碼知道CountDownLatch 不能像CyclicBarrier那樣使用完畢後還可以復用;
- CountDownLatch 是通過共用鎖來實現的,它的構造參數就是AQS state的值;
- 由於內部類繼承了AQS,所以它內部也是FIFO隊列,同時也一樣是前驅節點喚醒後繼節點。