本文基於 jdk 1.8 。 CountDownLatch 的使用 "前面的文章" 中說到了 volatile 以及用 volatile 來實現自旋鎖,例如 java.util.concurrent.atomic 包下的工具類。但是 volatile 的使用場景畢竟有限,很多的情況下並不是適用,這個 ...
本文基於 jdk 1.8 。
CountDownLatch 的使用
前面的文章中說到了 volatile 以及用 volatile 來實現自旋鎖,例如 java.util.concurrent.atomic 包下的工具類。但是 volatile 的使用場景畢竟有限,很多的情況下並不是適用,這個時候就需要 synchronized 或者各種鎖實現了。今天就來說一下幾種鎖的實現原理。
先來看一個最簡單的 CountDownLatch 使用方法,例子很簡單,可以運行看一下效果。CountDownLatch 的作用是:當一個線程需要另外一個或多個線程完成後,再開始執行。比如主線程要等待一個子線程完成環境相關配置的載入工作,主線程才繼續執行,就可以利用 CountDownLatch 來實現。
例如下麵這個例子,首先實例化一個 CountDownLatch ,參數可以理解為一個計數器,這裡為 1,然後主線程執行,調用 worker 子線程,接著調用 CountDownLatch 的 await() 方法,表示阻塞主線程。當子線程執行完成後,在 finnaly 塊調用 countDown() 方法,表示一個等待已經完成,把計數器減一,直到減為 0,主線程又開始執行。
private static CountDownLatch latch = new CountDownLatch(1);
public static void main(String[] args) throws InterruptedException{
System.out.println("主線程開始......");
Thread thread = new Thread(new Worker());
thread.start();
System.out.println("主線程等待......");
System.out.println(latch.toString());
latch.await();
System.out.println(latch.toString());
System.out.println("主線程繼續.......");
}
public static class Worker implements Runnable {
@Override
public void run() {
System.out.println("子線程任務正在執行");
try {
Thread.sleep(2000);
}catch (InterruptedException e){
}finally {
latch.countDown();
}
}
}
執行結果如下:
主線程開始......
子線程任務正在執行
主線程等待......
java.util.concurrent.CountDownLatch@1d44bcfa[Count = 1]
java.util.concurrent.CountDownLatch@1d44bcfa[Count = 0]
主線程繼續.......
AQS 的原理
這麼好用的功能是怎麼實現的呢,下麵就來說一說實現它的核心技術原理 AQS。 AQS 全稱 AbstractQueuedSynchronizer
,是 java.util.concurrent 中提供的一種高效且可擴展的同步機制。它可以用來實現可以依賴 int 狀態的同步器,獲取和釋放參數以及一個內部FIFO等待隊列,除了CountDownLatch
,ReentrantLock
、Semaphore
等功能實現都使用了它。
接下來用 CountDownLatch 來分析一下 AQS 的實現。建議看文章的時候先大致看一下源碼,有助於理解下麵所說的內容。
在我們的方法中調用 awit()
和countDown()
的時候,發生了幾個關鍵的調用關係,我畫了一個方法調用圖。
首先在 CountDownLatch 類內部定義了一個 Sync 內部類,這個內部類就是繼承自 AbstractQueuedSynchronizer 的。並且重寫了方法 tryAcquireShared
和tryReleaseShared
。例如當調用 awit()
方法時,CountDownLatch 會調用內部類Sync 的 acquireSharedInterruptibly()
方法,然後在這個方法中會調用 tryAcquireShared
方法,這個方法就是 CountDownLatch 的內部類 Sync 里重寫的 AbstractQueuedSynchronizer 的方法。調用 countDown()
方法同理。
這種方式是使用 AbstractQueuedSynchronizer 的標準化方式,大致分為兩步:
1、內部持有繼承自 AbstractQueuedSynchronizer 的對象 Sync;
2、併在 Sync 內重寫 AbstractQueuedSynchronizer protected 的部分或全部方法,這些方法包括如下幾個:
之所以要求子類重寫這些方法,是為了讓使用者(這裡的使用者指 CountDownLatch 等)可以在其中加入自己的判斷邏輯,例如 CountDownLatch 在 tryAcquireShared
中加入了判斷,判斷 state 是否不為0,如果不為0,才符合調用條件。
tryAcquire
和tryRelease
是對應的,前者是獨占模式獲取,後者是獨占模式釋放。
tryAcquireShared
和tryReleaseShared
是對應的,前者是共用模式獲取,後者是共用模式釋放。
我們看到 CountDownLatch 重寫的方法 tryAcquireShared 實現如下:
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
判斷 state 值是否為0,為0 返回1,否則返回 -1。state 值是 AbstractQueuedSynchronizer 類中的一個 volatile 變數。
private volatile int state;
在 CountDownLatch 中這個 state 值就是計數器,在調用 await 方法的時候,將值賦給 state 。
等待線程入隊
根據上面的邏輯,調用 await() 方法時,先去獲取 state 的值,當計數器不為0的時候,說明還有需要等待的線程在運行,則調用 doAcquireSharedInterruptibly 方法,進來執行的第一個動作就是嘗試加入等待隊列 ,即調用 addWaiter()方法, 源碼如下:
到這裡就走到了 AQS 的核心部分,AQS 用內部的一個 Node 類維護一個 CHL Node FIFO 隊列。將當前線程加入等待隊列,並通過 parkAndCheckInterrupt()方法實現當前線程的阻塞。下麵一大部分都是在說明 CHL 隊列的實現,裡面用 CAS 實現隊列出入不會發生阻塞。
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
//加入等待隊列
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
// 進入 CAS 迴圈
try {
for (;;) {
//當一個節點(關聯一個線程)進入等待隊列後, 獲取此節點的 prev 節點
final Node p = node.predecessor();
// 如果獲取到的 prev 是 head,也就是隊列中第一個等待線程
if (p == head) {
// 再次嘗試申請 反應到 CountDownLatch 就是查看是否還有線程需要等待(state是否為0)
int r = tryAcquireShared(arg);
// 如果 r >=0 說明 沒有線程需要等待了 state==0
if (r >= 0) {
//嘗試將第一個線程關聯的節點設置為 head
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
//經過自旋tryAcquireShared後,state還不為0,就會到這裡,第一次的時候,waitStatus是0,那麼node的waitStatus就會被置為SIGNAL,第二次再走到這裡,就會用LockSupport的park方法把當前線程阻塞住
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
我看看到上面先執行了 addWaiter() 方法,就是將當前線程加入等待隊列,源碼如下:
/** Marker to indicate a node is waiting in shared mode */
static final Node SHARED = new Node();
/** Marker to indicate a node is waiting in exclusive mode */
static final Node EXCLUSIVE = null;
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// 嘗試快速入隊操作,因為大多數時候尾節點不為 null
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
//如果尾節點為空(也就是隊列為空) 或者嘗試CAS入隊失敗(由於併發原因),進入enq方法
enq(node);
return node;
}
上面是向等待隊列中添加等待者(waiter)的方法。首先構造一個 Node 實體,參數為當前線程和一個mode,這個mode有兩種形式,一個是 SHARED ,一個是 EXCLUSIVE,請看上面的代碼。然後執行下麵的入隊操作 addWaiter,和 enq() 方法的 else 分支操作是一樣的,這裡的操作如果成功了,就不用再進到 enq() 方法的迴圈中去了,可以提高性能。如果沒有成功,再調用 enq() 方法。
private Node enq(final Node node) {
// 死迴圈+CAS保證所有節點都入隊
for (;;) {
Node t = tail;
// 如果隊列為空 設置一個空節點作為 head
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
//加入隊尾
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
說明:迴圈加 CAS 操作是實現樂觀鎖的標準方式,CAS 是為了實現原子操作而出現的,所謂的原子操作指操作執行期間,不會受其他線程的干擾。Java 實現的 CAS 是調用 unsafe 類提供的方法,底層是調用 c++ 方法,直接操作記憶體,在 cpu 層面加鎖,直接對記憶體進行操作。
上面是 AQS 等待隊列入隊方法,操作在無限迴圈中進行,如果入隊成功則返回新的隊尾節點,否則一直自旋,直到入隊成功。假設入隊的節點為 node ,上來直接進入迴圈,在迴圈中,先拿到尾節點。
1、if 分支,如果尾節點為 null,說明現在隊列中還沒有等待線程,則嘗試 CAS 操作將頭節點初始化,然後將尾節點也設置為頭節點,因為初始化的時候頭尾是同一個,這和 AQS 的設計實現有關, AQS 預設要有一個虛擬節點。此時,尾節點不在為空,迴圈繼續,進入 else 分支;
2、else 分支,如果尾節點不為 null, node.prev = t ,也就是將當前尾節點設置為待入隊節點的前置節點。然後又是利用 CAS 操作,將待入隊的節點設置為隊列的尾節點,如果 CAS 返回 false,表示未設置成功,繼續迴圈設置,直到設置成功,接著將之前的尾節點(也就是倒數第二個節點)的 next 屬性設置為當前尾節點,對應 t.next = node 語句,然後返回當前尾節點,退出迴圈。
setHeadAndPropagate 方法負責將自旋等待或被 LockSupport 阻塞的線程喚醒。
private void setHeadAndPropagate(Node node, int propagate) {
//備份現在的 head
Node h = head;
//搶到鎖的線程被喚醒 將這個節點設置為head
setHead(node)
// propagate 一般都會大於0 或者存在可被喚醒的線程
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
// 只有一個節點 或者是共用模式 釋放所有等待線程 各自嘗試搶占鎖
if (s == null || s.isShared())
doReleaseShared();
}
}
Node 對象中有一個屬性是 waitStatus ,它有四種狀態,分別是:
//線程已被 cancelled ,這種狀態的節點將會被忽略,並移出隊列
static final int CANCELLED = 1;
// 表示當前線程已被掛起,並且後繼節點可以嘗試搶占鎖
static final int SIGNAL = -1;
//線程正在等待某些條件
static final int CONDITION = -2;
//共用模式下 無條件所有等待線程嘗試搶占鎖
static final int PROPAGATE = -3;
等待線程被喚醒
當執行 CountDownLatch 的 countDown()方法,將計數器減一,也就是state減一,當減到0的時候,等待隊列中的線程被釋放。是調用 AQS 的 releaseShared 方法來實現的,下麵代碼中的方法是按順序調用的,摘到了一起,方便查看:
// AQS類
public final boolean releaseShared(int arg) {
// arg 為固定值 1
// 如果計數器state 為0 返回true,前提是調用 countDown() 之前不能已經為0
if (tryReleaseShared(arg)) {
// 喚醒等待隊列的線程
doReleaseShared();
return true;
}
return false;
}
// CountDownLatch 重寫的方法
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
// 依然是迴圈+CAS配合 實現計數器減1
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
/// AQS類
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
// 如果節點狀態為SIGNAL,則他的next節點也可以嘗試被喚醒
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
// 將節點狀態設置為PROPAGATE,表示要向下傳播,依次喚醒
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
因為這是共用型的,當計數器為 0 後,會喚醒等待隊列里的所有線程,所有調用了 await() 方法的線程都被喚醒,併發執行。這種情況對應到的場景是,有多個線程需要等待一些動作完成,比如一個線程完成初始化動作,其他5個線程都需要用到初始化的結果,那麼在初始化線程調用 countDown 之前,其他5個線程都處在等待狀態。一旦初始化線程調用了 countDown ,其他5個線程都被喚醒,開始執行。
總結
1、AQS 分為獨占模式和共用模式,CountDownLatch 使用了它的共用模式。
2、AQS 當第一個等待線程(被包裝為 Node)要入隊的時候,要保證存在一個 head 節點,這個 head 節點不關聯線程,也就是一個虛節點。
3、當隊列中的等待節點(關聯線程的,非 head 節點)搶到鎖,將這個節點設置為 head 節點。
4、第一次自旋搶鎖失敗後,waitStatus 會被設置為 -1(SIGNAL),第二次再失敗,就會被 LockSupport 阻塞掛起。
5、如果一個節點的前置節點為 SIGNAL 狀態,則這個節點可以嘗試搶占鎖。