本文將主要講述 的內部結構和實現邏輯,在看本文之前最好先瞭解一下 隊列鎖, 就是根據 隊列鎖的變種實現的,因為本身 比較複雜不容易看清楚他本身的實現邏輯,所以查看 隊列鎖的實現,可以幫助我們理清楚他內部的關係;關於隊列鎖的內容可以參考 , "CLH、MCS 隊列鎖簡介" ; 一、AQS 結構概述 在 ...
本文將主要講述 AbstractQueuedSynchronizer
的內部結構和實現邏輯,在看本文之前最好先瞭解一下 CLH
隊列鎖,AbstractQueuedSynchronizer
就是根據 CLH
隊列鎖的變種實現的,因為本身 AQS
比較複雜不容易看清楚他本身的實現邏輯,所以查看 CLH
隊列鎖的實現,可以幫助我們理清楚他內部的關係;關於隊列鎖的內容可以參考 ,CLH、MCS 隊列鎖簡介 ;
一、AQS 結構概述
在 JDK 中除 synchronized
內置鎖外,其他的鎖和同步組件,基本可以分為:
- 面向用戶的邏輯部分(對於鎖而言就是 Lock interface);
- 面向底層的線程調度部分;
而 AbstractQueuedSynchronizer
即同步隊列則是 Doug Lea 大神為我們提供的底層線程調度的封裝;AQS
本身是根據 CLH
隊列鎖實現的,這一點在註釋中有詳細的介紹,CLH、MCS 隊列鎖簡介 ;
簡單來講,CLH
隊列鎖就是一個單項鏈表,想要獲取鎖的線程封裝為節點添加到尾部,然後阻塞檢查前任節點的狀態 (一定要註意是前任節點,因為這樣更容易實現取消、超時等功能,同時這也是選擇 CLH 隊列鎖的原因),而頭結點則是當前已經獲得鎖的線程,其主要作用是通知後繼節點(也就是說在沒有發生競爭的情況下,是不需要頭結點的,這一點後面會詳細分析);
而對於 AQS
的結構大致可以表述為:
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {
protected AbstractQueuedSynchronizer() { }
private transient volatile Node head; // 懶載入,只有在發生競爭的時候才會初始化;
private transient volatile Node tail; // 同樣懶載入;
private volatile int state; // 自定義的鎖狀態,可以用來表示鎖的個數,以實現互斥鎖和共用鎖;
}
這裡的可以直觀的看到鏈表結構的變化,其實next鏈表只是相當於遍歷的優化,而node節點的變化才是主要的更新;
1. Node 結構
static final class Node {
static final Node SHARED = new Node(); // 共用模式
static final Node EXCLUSIVE = null; // 互斥模式
static final int CANCELLED = 1; // 表示線程取消獲取鎖
static final int SIGNAL = -1; // 表示後繼節點需要被喚醒
static final int CONDITION = -2; // 表示線程位於條件隊列
static final int PROPAGATE = -3; // 共用模式下節點的最終狀態,確保在doReleaseShared的時候將共用狀態繼續傳播下去
/**
* 節點狀態(初始為0,使用CAS原則更新)
* 互斥模式:0,SIGNAL,CANCELLED
* 共用模式:0,SIGNAL,CANCELLED,PROPAGATE
* 條件隊列:CONDITION
*/
volatile int waitStatus;
volatile Node prev; // 前繼節點
volatile Node next; // 後繼節點
volatile Thread thread; // 取鎖線程
Node nextWaiter; // 模式標識,取值:SHARED、EXCLUSIVE
// Used by addWaiter,用於添加同隊隊列
Node(Thread thread, Node mode) {
this.nextWaiter = mode;
this.thread = thread;
}
// Used by Condition,同於添加條件隊列
Node(Thread thread, int waitStatus) {
this.waitStatus = waitStatus;
this.thread = thread;
}
}
根據上面的代碼和註釋已經可以看到 AQS
為我們提供了兩種模式,獨占模式和共用模式(彼此獨立可以同時使用);其中:
AbstractQueuedSynchronizer.state
: 表示鎖的資源狀態,是我們上面所說的面向用戶邏輯的部分;Node.waitStatus
: 表示節點在隊列中的狀態,是面向底層線程調度的部分;
這兩個變數一定要分清楚,在後面的代碼中也很容易弄混;
2. AQS 運行邏輯
AQS 的運行邏輯可以簡單表述為:
如果你熟悉 synchronized
,應該已經發現他們的運行邏輯其實是差不多的,都用同步隊列和條件隊列,值得註意的是這裡的條件隊列和 Condition
一一對應,可能有多個;根據上圖可以將 AQS
提供的功能總結為:
- 同步狀態的原子性管理;
- 線程的阻塞與解除阻塞;
- 隊列的管理;
3. 入隊
因為獨占模式和共用模式彼此獨立可以同時使用,所以在入隊的時候需要首先指定 Node
的類型,同時入隊的時候有競爭的可能,所以需要 CAS 入隊;
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode); // SHARED、EXCLUSIVE
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
代碼中註釋也說明瞭,此處快速嘗試入隊,是一種優化手段,因為就一般情況而言大多數時候是沒有競爭的;失敗後在迴圈入隊;
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node())) // 此時head和tail才初始化
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
而對於出隊則稍微複雜一點,獨占模式下直接出隊,因為沒有競爭;共用模式下,則需要 CAS 設置頭結點,因為可能對有多個節點同時出隊,同時還需要向後傳播狀態,保證後面的線程可以及時獲得鎖;此外還可能發生中斷或者異常出隊,此時則需要考慮頭尾的情況,保證不會影響隊列的結構;具體內容將會在源碼中一次講解;
二、獨占模式
1. 應用
public class Mutex implements Lock {
private final Sync sync = new Sync();
private static final int lock = 1;
private static final int unlock = 0;
@Override
public void lock() {
sync.acquire(lock);
}
@Override
public boolean tryLock() {
return sync.tryAcquire(lock);
}
@Override
public void unlock() {
sync.release(unlock);
}
private static class Sync extends AbstractQueuedSynchronizer {
@Override
protected boolean isHeldExclusively() {
return getState() == lock;
}
@Override
public boolean tryAcquire(int acquires) {
if (compareAndSetState(unlock, lock)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
@Override
protected boolean tryRelease(int releases) {
if (getState() == unlock)
throw new IllegalMonitorStateException();
setExclusiveOwnerThread(null);
setState(unlock);
return true;
}
}
}
註意代碼中特意將 AbstractQueuedSynchronizer.state
取值定為lock\unlock
,主要是便於理解 state
的含義,在互斥鎖中可以任意取值,當然也可以是負數,但是一般情況下令其表示為鎖的資源數量(也就是0、1)和共用模式對比,比較容易理解;
2. 獲取鎖
對於獨占模式取鎖而言有一共有四中方式,
- tryAcquire: 快速嘗試取鎖,成功時返回true;這是獨占模式必須要重寫的方法,其他方式獲取鎖時,也會先嘗試快速獲取鎖;同時
tryAcquire
也就決定了,這個鎖時公平鎖/非公平鎖,可重入鎖/不重沖入鎖等;(比如上面的實例就是不可重入非公平鎖,具體分析以後還會詳細講解) - acquire: 不響應中斷,阻塞獲取鎖;
- acquireInterruptibly: 響應中斷,阻塞獲取鎖;
- tryAcquireNanos: 響應中斷,超時阻塞獲取鎖;
acquire 方法
流程圖:
源碼分析:
public final void acquire(int arg) {
if (!tryAcquire(arg) && // 首先嘗試快速獲取鎖
acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) // 失敗後入隊,然後阻塞獲取
selfInterrupt(); // 最後如果取鎖的有中斷,則重新設置中斷
}
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false; // 只要取鎖過程中有一次中斷,返回時都要重新設置中斷
for (;;) {
final Node p = node.predecessor(); // 一直阻塞到前繼節點為頭結點
if (p == head && tryAcquire(arg)) { // 獲取同步狀態
setHead(node); // 設置頭結點,此時頭部不存在競爭,直接設置
// next 主要起優化作用,並且在入隊的時候next不是CAS設置
// 也就是通過next不一定可以準確取到後繼節點,所以在喚醒的時候不能依賴next,需要反向遍歷
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) && // 判斷並整理前繼節點
parkAndCheckInterrupt()) // 當迴圈最多第二次的時候,必然阻塞
interrupted = true;
}
} finally {
if (failed) // 異常時取消獲取
cancelAcquire(node);
}
}
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL) return true;
if (ws > 0) { // 大於0說明,前繼節點異常或者取消獲取,直接跳過;
do {
node.prev = pred = pred.prev; // 跳過pred並建立連接
} while (pred.waitStatus > 0);
pred.next = node;
} else {
compareAndSetWaitStatus(pred, ws, Node.SIGNAL); // 標記後繼節點需要喚醒
}
return false;
}
其中 node.prev = pred = pred.prev;
相關的記憶體分析可以查看 JAVA 連等賦值問題;
acquireInterruptibly 方法
流程圖:
源碼分析:
public final void acquireInterruptibly(int arg) throws InterruptedException {
if (Thread.interrupted()) throw new InterruptedException(); // 中斷退出
if (!tryAcquire(arg)) // 獲取同步狀態
doAcquireInterruptibly(arg); // 中斷獲取
}
private void doAcquireInterruptibly(int arg) throws InterruptedException {
final Node node = addWaiter(Node.EXCLUSIVE); // 加入隊尾
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return;
}
if (shouldParkAfterFailedAcquire(p, node) && // 判斷並整理前繼節點
parkAndCheckInterrupt()) // 等待
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
tryAcquireNanos 方法
流程圖:
源碼分析:
public final boolean tryAcquireNanos(int arg, long nanosTimeout) throws InterruptedException {
if (Thread.interrupted()) throw new InterruptedException();
return tryAcquire(arg) ||
doAcquireNanos(arg, nanosTimeout);
}
private boolean doAcquireNanos(int arg, long nanosTimeout) throws InterruptedException {
if (nanosTimeout <= 0L) return false;
final long deadline = System.nanoTime() + nanosTimeout;
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return true;
}
nanosTimeout = deadline - System.nanoTime();
if (nanosTimeout <= 0L) return false; // 超時退出
if (shouldParkAfterFailedAcquire(p, node) &&
nanosTimeout > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
if (Thread.interrupted())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
3. 釋放鎖
釋放鎖時,判斷有後繼節點需要喚醒,則喚醒後繼節點,然後退出;有喚醒的後繼節點重新設置頭結點,並標記狀態
public final boolean release(int arg) {
if (tryRelease(arg)) { // 由用戶重寫,嘗試釋放
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h); // 喚醒後繼節點
return true;
}
return false;
}
三、共用模式
1. 應用
public class ShareLock implements Lock {
private Syn sync;
public ShareLock(int count) { this.sync = new Syn(count); }
@Override
public void lock() { sync.acquireShared(1); }
@Override
public void lockInterruptibly() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
@Override
public boolean tryLock() { return sync.tryAcquireShared(1) >= 0; }
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(time));
}
@Override
public void unlock() { sync.releaseShared(1); }
@Override
public Condition newCondition() { throw new UnsupportedOperationException(); }
private static final class Syn extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 5854536238831876527L;
Syn(int count) {
if (count <= 0) {
throw new IllegalArgumentException("count must large than zero.");
}
setState(count);
}
@Override
public int tryAcquireShared(int reduceCount) {
for (; ; ) {
int current = getState();
int newCount = current - reduceCount;
//如果新的狀態小於0 則返回值,則表示沒有鎖資源,直接返回
if (newCount < 0 || compareAndSetState(current, newCount)) {
return newCount;
}
}
}
@Override
public boolean tryReleaseShared(int retrunCount) {
for (; ; ) {
int current = getState();
int newCount = current + retrunCount;
if (compareAndSetState(current, newCount)) {
return true;
}
}
}
}
}
上述代碼中的 AbstractQueuedSynchronizer.state
表示鎖的資源數,但是仍然是不可重入的;
2. 獲取鎖
同樣對於共用模式取鎖也有四中方式:
- tryAcquireShared: 快速嘗試取鎖,由用戶重寫
- acquireShared: 不響應中斷,阻塞獲取鎖;
- acquireSharedInterruptibly: 響應中斷,阻塞獲取鎖;
- tryAcquireSharedNanos: 響應中斷,超時阻塞獲取鎖;
tryAcquireShared 方法
@Override
public int tryAcquireShared(int reduceCount) {
for (; ; ) {
int current = getState();
int newCount = current - reduceCount;
//如果新的狀態小於0 則返回值,則表示沒有鎖資源,直接返回
if (newCount < 0 || compareAndSetState(current, newCount)) {
return newCount;
}
}
}
需要註意的是 tryAcquireShared
方法是快速嘗試獲取鎖,並更新鎖狀態,如果失敗則必然鎖資源不足,返回負值;
acquireShared 方法
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0) // 快速獲取失敗
doAcquireShared(arg); // 阻塞獲取鎖
}
private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r); // 設置頭結點,並是情況將信號傳播下去
p.next = null; // help GC
if (interrupted) selfInterrupt(); // 重新設置中斷狀態
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
// propagate 表示線程獲取鎖後,共用鎖剩餘的鎖資源
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
setHead(node);
// propagate > 0 :表示還有剩餘的資源
// h.waitStatus < 0 : 表示後繼節點需要被喚醒
// 其餘還做了很多保守判斷,確保後面的節點能及時那到鎖
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared(); // 喚醒後繼節點
}
}
根據上面的代碼可以看到,共用模式和獨占模式獲取鎖的主要區別:
- 共用模式可以有多個鎖
- 設置頭結點的時候,同時還要將狀態傳播下去
其餘的思路和獨占模式差不多,他家可以自己看源碼;
3. 釋放鎖
同樣 tryReleaseShared
是由用戶自己重寫的,這裡需要註意的是如果不能確保釋放成功(因為共用模式釋放鎖的時候可能有競爭,所以可能失敗),則在外層 Lock
介面使用的時候,就需要額外處理;
@Override
public boolean tryReleaseShared(int retrunCount) {
for (; ; ) {
int current = getState();
int newCount = current + retrunCount;
if (compareAndSetState(current, newCount)) {
return true;
}
}
}
releaseShared 方法
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) { // 嘗試取鎖成功,此時鎖資源已重新設置
doReleaseShared(); // 喚醒後繼節點
return true;
}
return false;
}
doReleaseShared
方法必然執行兩次,
- 第一次頭結點釋放鎖,然後喚醒後繼節點
- 第二次後繼設置頭結點
最終使得頭結點的狀態必然是 PROPAGATE
;
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
四、條件隊列
1. ConditionObject 結構
public class ConditionObject implements Condition, java.io.Serializable {
private transient Node firstWaiter;
private transient Node lastWaiter;
...
}
如代碼所示條件隊列是一個由 Node
組成的鏈表,註意這裡的鏈表不同於同步隊列,是通過 nextWaiter
連接的,在同步隊列中 nextWaiter
用來表示獨占和共用模式,所以區分條件隊列的方法就有兩個:
- Node.waitStatus = Node.CONDITION;
- Node.next = null & Node.prev= null;
2. await
public final void await() throws InterruptedException {
if (Thread.interrupted()) throw new InterruptedException();
Node node = addConditionWaiter(); // 添加節點到條件隊列
int savedState = fullyRelease(node); // 確保釋放鎖,並喚醒後繼節點
int interruptMode = 0;
while (!isOnSyncQueue(node)) { // node 不在同步隊列中
LockSupport.park(this); // 阻塞
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
3. signal
public final void signal() {
if (!isHeldExclusively()) throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first); // 從頭結點一次喚醒
}
private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) && // 將節點移動到同步節點中
(first = firstWaiter) != null);
}
因為篇幅有點長了,所以條件隊列講的也就相對簡單了一點,但是大體的思路還是講了;
總結
- AbstractQueuedSynchronizer 通過私有變數繼承方式使用
- 觀察 AbstractQueuedSynchronizer ,其實和 synchronized 的結構基本相同,但是 synchronized 還會自動根據使用情況進行鎖升級
- 此外本文的主要參考資料是《java 併發編程的藝術》,有興趣的可以自行查看;