### AQS的定義 隊列同步器 AbstractQueuedSynchronizer(以下簡稱同步器),是用來構建鎖或者其他同步組件的基礎框架,它使用了一個 int 成員變數表示同步狀態,通過內置的 FIFO 隊列來完成資源獲取線程的排隊工作,併發包的作者(Doug Lea)期望它能夠成為實現 ...
AQS的定義
隊列同步器 AbstractQueuedSynchronizer(以下簡稱同步器),是用來構建鎖或者其他同步組件的基礎框架,它使用了一個 int 成員變數表示同步狀態,通過內置的 FIFO 隊列來完成資源獲取線程的排隊工作,併發包的作者(Doug Lea)期望它能夠成為實現大部分同步需求的基礎。
隊列同步器的介面與示例
同步器的設計是基於模板方法模式的,也就是說,使用者需要繼承同步器並重寫指定的方法,隨後將同步器組合在自定義同步組件的實現中,並調用同步器提供的模板方法,而這些模板方法將會調用使用者重寫的方法。重寫同步器指定的方法時,需要使用同步器提供的如下 3 個方法來訪問或修改同步狀態。
-
getState():獲取當前同步狀態。
-
setState(int newState):設置當前同步狀態。
-
compareAndSetState(int expect,int update):使用 CAS 設置當前狀態,該方法能夠保證狀態設置的原子性。
同步器可重寫的方法與描述如下表所示。
方法名稱 | 描述信息 |
---|---|
protected boolean tryAcquire(int arg) | 獨占式獲取同步狀態,實現該方法需要查詢當前狀態並判斷同步狀態是否符合預期,然後再進行CAS設置同步狀態 |
protected boolean tryAcquire(int arg) | 獨占式釋放同步狀態,等待獲取同步狀態的線程將有機會獲取同步狀態。 |
protected int tryAcquireShared(int arg) | 共用式獲取同步狀態,返回大於等於0的值,表示獲取成功,反之,失敗。 |
protected boolean tryReleaseShared(int arg) | 共用式釋放鎖。 |
protected boolean isHeldExclusively() | 判斷當前線程是否在獨占模式下被線程占用,一般該方法表示是否被當前線程所獨占。 |
實現自定義同步組件的同時,將會調用同步器提供的模板方法,這些(部分)模板方法與描述下表所示。
方法名稱 | 描述 |
---|---|
public final void acquire(int arg) | 獨占式獲取同步狀態,如果當前線程獲取同步狀態成功,則由該方法返回,否則,將會進入同步隊列等待,該方法將會調用重寫的tryAcquire(int arg)方法。 |
public final void acquireInterruptibly(int arg) | 與acquire(int arg) 相同,但是該方法響應中斷,當前線程未獲取到同步狀態而進入同步隊列中,如果當前線程被中斷,則該方法會拋出InterruptedException 異常。 |
public final boolean tryAcquireNanos(int arg, long nanosTimeout) | 在acquireInterruptibly(int arg) 的基礎上增加了超時限制,如果當前線程在超時時間內沒有獲取同步狀態,那麼會返回false,如果獲取到了返回true。 |
public final void acquireShared(int arg) | 共用式的獲取同步狀態,如果當前線程未獲取到同步狀態,將會進入同步隊列等待,與獨占式獲取鎖的主要區別是在同一時刻可以有多個線程獲取到同步狀態。 |
public final void acquireSharedInterruptibly(int arg) | 與acquireInterruptibly(int arg) 方法相同,該方法響應中斷。 |
public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout) | 與acquireShared(int arg) 相同,增加了超時限制。 |
public final boolean release(int arg) | 獨占式的釋放同步狀態,該方法會在釋放同步狀態之後,將同步隊列中第一個節點包含的線程喚醒。 |
public final boolean releaseShared(int arg) | 共用式釋放同步狀態。 |
public final Collection |
獲得在同步隊列上的線程集合。 |
同步器提供的模板方法基本上分為 3 類:獨占式獲取與釋放同步狀態、共用式獲取與釋放同步狀態和查詢同步隊列中的等待線程情況。自定義同步組件將使用同步器提供的模板方法來實現自己的同步語義。
自定義獨占鎖代碼示例
通過重寫模板模式的鉤子方法實現自定義獨占鎖。
class Mutex implements Lock {
// 內部靜態類自定義同步器
private static class Sync extends AbstractQueuedSynchronizer {
// 是否處於占用狀態
@Override
protected boolean isHeldExclusively() {
return this.getState() == 1;
}
// 當狀態為0時獲取到鎖
@Override
protected boolean tryAcquire(int arg) {
if (this.compareAndSetState(0,1)){
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
//釋放鎖,將鎖狀態設置為0
@Override
protected boolean tryRelease(int arg) {
if (getState() == 0){
throw new IllegalMonitorStateException();
}
this.setExclusiveOwnerThread(null);
this.setState(0);
return true;
}
//返回一個Condition,每個condition都包含了一個condition隊列
Condition newCondition(){
return new ConditionObject();
}
}
// 僅需將操作代理到Sync上面。
private Sync sync = new Sync();
@Override
public void lock() {
sync.acquire(1);
}
@Override
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
@Override
public boolean tryLock() {
return sync.tryAcquire(1);
}
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
return sync.tryAcquireNanos(1,unit.toNanos(time));
}
@Override
public void unlock() {
sync.release(1);
}
@Override
public Condition newCondition() {
return sync.newCondition();
}
}
上述示例中,獨占鎖 Mutex 是一個自定義同步組件,它在同一時刻只允許一個線程占有鎖。Mutex 中定義了一個靜態內部類,該內部類繼承了同步器並實現了獨占式獲取和釋放同步狀態。在 tryAcquire(int acquires)方法中,如果經過 CAS 設置成功(同步狀態設置為 1),則代表獲取了同步狀態,而在 tryRelease(int releases)方法中只是將同步狀態重置為 0。用戶使用 Mutex 時並不會直接和內部同步器的實現打交道,而是調用 Mutex提供的方法,在 Mutex 的實現中,以獲取鎖的 lock()方法為例,只需要在方法實現中調用同步器的模板方法 acquire(int args)即可,當前線程調用該方法獲取同步狀態失敗後會被加入到同步隊列中等待,這樣就大大降低了實現一個可靠自定義同步組件的門檻。
隊列同步器的實現分析
接下來將從實現角度分析同步器是如何完成線程同步的,主要包括:
-
同步隊列
-
獨占式同步狀態獲取與釋放、
-
共用式同步狀態獲取與釋放
-
超時獲取同步狀態等同步器的核心數據結構與模板方法。
1 同步隊列
同步器依賴內部的同步隊列(一個 FIFO 雙向隊列)來完成同步狀態的管理,當前線程獲取同步狀態失敗時,同步器會將當前線程以及等待狀態等信息構造成為一個節點(Node)並將其加入同步隊列,同時會阻塞當前線程,當同步狀態釋放時,會把首節點中的線程喚醒,使其再次嘗試獲取同步狀態。同步隊列中的節點(Node)用來保存獲取同步狀態失敗的線程引用、等待狀態以及前驅和後繼節點,節點的屬性類型與名稱以及描述如下表所示。
屬性類型和名稱 | 描述 |
---|---|
int waitStatus | 用來表示當前節點在隊列中的狀態,包含以下狀態: 0 當一個Node被初始化的時候的預設值 CANCELLED 為1,表示線程獲取鎖的請求已經取消了 CONDITION 為-2,表示節點在等待隊列中,節點線程等待喚醒 PROPAGATE 為-3,當前線程處在SHARED情況下,該欄位才會使用 SIGNAL 為-1,表示線程已經準備好了,就等資源釋放了 |
Node prev | 前驅節點,當節點加入同步隊列時設置 |
Node next | 後續節點 |
Thread thread | 獲取同步狀態的線程 |
Node nextWaiter | 等待隊列中的後繼節點。如果當前節點是共用的,那麼這個欄位將是一個 SHARED常量,也就是說節點類型(獨占和共用)和等待隊列中的後繼節點共用同一個欄位 |
節點是構成同步隊列的基礎,同步器擁有首節點(head)和尾節點(tail),沒有成功獲取同步狀態的線程將會成為節點加入該隊列的尾部,同步隊列的基本結構如下圖所示。
在上圖中,同步器包含了兩個節點類型的引用,一個指向頭節點,而另一個指向尾節點。試想一下,當一個線程成功地獲取了同步狀態(或者鎖),其他線程將無法獲取到同步狀態,轉而被構造成為節點並加入到同步隊列中,而這個加入隊列的過程必須要保證線程安全,因此同步器提供了一個基於 CAS 的設置尾節點的方法:compareAndSetTail(Node expect,Node update),它需要傳遞當前線程“認為”的尾節點和當前節點,只有設置成功後,當前節點才正式與之前的尾節點建立關聯。
同步器將節點加入到同步隊列的過程如下圖所示。
同步隊列遵循 FIFO,首節點是獲取同步狀態成功的節點,首節點的線程在釋放同步狀態時,將會喚醒後繼節點,而後繼節點將會在獲取同步狀態成功時將自己設置為首節點,該過程如下圖所示。
在上圖 中,設置首節點是通過獲取同步狀態成功的線程來完成的,由於只有一個線程能夠成功獲取到同步狀態,因此設置頭節點的方法並不需要使用 CAS 來保證,它只需要將首節點設置成為原首節點的後繼節點並斷開原首節點的 next 引用即可。
2.通過ReentrantLock理解AQS
ReentrantLock中公平鎖和非公平鎖在底層是相同的,這裡以非公平鎖為例進行分析。
在非公平鎖中,有一段這樣的代碼:
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;
/**
* Performs lock. Try immediate barge, backing up to normal
* acquire on failure.
*/
final void lock() {
// 首先嘗試獲取同步狀態
if (compareAndSetState(0, 1))
// 獲取成功,將當前線程設置為鎖獨占線程
setExclusiveOwnerThread(Thread.currentThread());
else
// 獲取失敗,調用AQS模板方法acquire(int arg)
acquire(1);
}
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
2.1 acquire(int arg)方法
接下來看看acquire的源碼,acquire方法在上面介紹了,他的功能是獨占式獲取同步狀態,如果當前線程獲取同步狀態成功,則由該方法返回,否則,將會進入同步隊列等待,該方法將會調用重寫的tryAcquire(int arg)方法。這段代碼主要完成了同步狀態獲取、節點構造、加入同步隊列以及在同步隊列中自旋等待的相關工作,其主要邏輯是:首先調用自定義同步器實現的 tryAcquire(int arg)方法,該方法保證線程安全的獲取同步狀態,如果同步狀態獲取失敗,則構造同步節點(獨占式 Node.EXCLUSIVE,同一時刻只能有一個線程成功獲取同步狀態)並通過addWaiter(Node node) 方法將該節點加入到同步隊列的尾部,最後調用acquireQueued(Node node,int arg)方法,使得該節點以“死迴圈”的方式獲取同步狀態。如果獲取不到則阻塞節點中的線程,而被阻塞線程的喚醒主要依靠前驅節點的出隊或阻塞線程被中斷來實現。
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
2.1.1 tryAcquire(arg)方法
先來看看ReentrantLock在非公平鎖中重寫的 tryAcquire(arg)方法
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
final boolean nonfairTryAcquire(int acquires) {
// 獲取當前線程
final Thread current = Thread.currentThread();
// 獲取當前鎖的同步狀態
int c = getState();
if (c == 0) {
// 如果同步狀態為0,嘗試獲取鎖同步狀態
if (compareAndSetState(0, acquires)) {
// 獲取鎖同步狀態成功,設置當前鎖占用線程為當前線程
setExclusiveOwnerThread(current);
// 返回獲取鎖成功
return true;
}
}
// 判斷當前鎖占用線程是否是當前線程,用來實現可重入鎖邏輯
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
// 獲取鎖失敗,調用 acquireQueued(addWaiter(Node.EXCLUSIVE), arg)方法。
return false;
}
上面這段代碼中可以得知如果當前線程調用tryAcquire(arg) 方法失敗後會繼續調用acquireQueued(addWaiter(Node.EXCLUSIVE), arg)方法。
2.1.2acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
下麵我們先看addWaiter(Node.EXCLUSIVE) 源碼。
private Node addWaiter(Node mode) {
// 通過當前線程和鎖模式(這裡是Node.EXCLUSIVE 獨占模式)封裝Node節點
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
// pred 指針指向尾節點
Node pred = tail;
// 判斷指針指向的tail節點是否為null
if (pred != null) {
// 如果tail節點不為null,嘗試將node放入到同步隊列尾部。
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
// 放入成功,放回node節點
return node;
}
}
// 尾節點設置失敗或者說tail為null,調用enq方法
enq(node);
return node;
}
/**
* 同步器通過“死迴圈”來保證節點的正確添加,在“死迴圈”中只有通過 CAS 將節點設置成為尾節點之後,當前線程才能從該方法返回
* 否則,當前線程不斷地嘗試設置。
*/
private Node enq(final Node node) {
for (;;) {
// t 指向尾節點tail
Node t = tail;
// 判斷 tail是否為null
if (t == null) { // Must initialize
// 如果為null嘗試創建同步隊列第一個節點(虛擬節點)
if (compareAndSetHead(new Node()))
// 創建虛擬節點成功,將AQS的尾節點指針也指向這個虛擬節點
tail = head;
} else {
// 將node節點加入到隊列尾部
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
上述代碼通過使用 compareAndSetTail(Node expect,Node update)方法來確保節點能夠被線程安全添加。試想一下:如果使用一個普通的 LinkedList 來維護節點之間的關係,那麼當一個線程獲取了同步狀態,而其他多個線程由於調用 tryAcquire(int arg)方法獲取同步狀態失敗而併發地被添加到 LinkedList 時,LinkedList 將難以保證 Node 的正確添加,最終的結果可能是節點的數量有偏差,而且順序也是混亂的。
在 enq(final Node node)方法中,同步器通過“死迴圈”來保證節點的正確添加,在“死迴圈”中只有通過 CAS 將節點設置成為尾節點之後,當前線程才能從該方法返回,否則,當前線程不斷地嘗試設置。可以看出,enq(final Node node)方法將併發添加節點的請求通過 CAS 變得“串列化”了。
雙向鏈表中,第一個節點為虛節點,其實並不存儲任何信息,只是占位。真正的第一個有數據的節點,是在第二個節點開始的。
下麵繼續查看方法acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
節點進入同步隊列之後,就進入了一個自旋的過程,每個節點(或者說每個線程)都在自省地觀察,當條件滿足,獲取到了同步狀態,就可以從這個自旋過程中退出,否則依舊留在這個自旋過程中(並會阻塞節點的線程),代碼如下
final boolean acquireQueued(final Node node, int arg) {
// 標記是否成功拿到資源
boolean failed = true;
try {
// 標記等待過程中是否中斷過
boolean interrupted = false;
// 開始自旋,要麼獲取鎖,要麼中斷
for (;;) {
// 獲取當前節點的前驅節點
final Node p = node.predecessor();
// 如果p是頭結點,說明當前節點在真實數據隊列的首部,就嘗試獲取鎖(別忘了頭結點是虛節點)
if (p == head && tryAcquire(arg)) {
// 獲取鎖成功,頭指針移動到當前node
setHead(node);
//斷開了p節點與後繼節點之間的引用關係以便在適當的時候回收記憶體。
p.next = null; // help GC
failed = false;
return interrupted;
}
// 說明p為頭節點且當前沒有獲取到鎖(可能是非公平鎖被搶占了)或者是p不為頭結點,這個時候就要判斷當前node是否要被阻塞(被阻塞條件:前驅節點的waitStatus為-1),防止無限迴圈浪費資源。具體兩個方法下麵細細分析
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
註:setHead方法是把當前節點置為虛節點,但並沒有修改waitStatus,因為它是一直需要用的數據。
// java.util.concurrent.locks.AbstractQueuedSynchronizer
// 將當前節點設置為虛節點
private void setHead(Node node) {
head = node;
node.thread = null;
node.prev = null;
}
// java.util.concurrent.locks.AbstractQueuedSynchronizer
// 靠前驅節點判斷當前線程是否應該被阻塞
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
// 獲取頭結點的節點狀態
int ws = pred.waitStatus;
// 說明頭結點處於喚醒狀態
if (ws == Node.SIGNAL)
return true;
// 通過枚舉值我們知道waitStatus>0是取消狀態
if (ws > 0) {
do {
// 迴圈向前查找取消節點,把取消節點從隊列中剔除
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
// 設置前任節點等待狀態為SIGNAL
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
// parkAndCheckInterrupt主要用於掛起當前線程,阻塞調用棧,返回當前線程的中斷狀態。
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
2.2.3 總結:ReentrantLock非公平鎖的lock()的流程如下:
3.如何解鎖
public void unlock() {
sync.release(1);
}
調用AQS的release(int arg)方法
public final boolean release(int arg) {
// 調用ReentrantLock 重寫的tryRelease(arg)方法
if (tryRelease(arg)) {
// 釋放鎖成功,獲取頭結點
Node h = head;
if (h != null && h.waitStatus != 0)
//頭結點不為空並且頭結點的waitStatus不是初始化節點情況,解除線程掛起狀態
unparkSuccessor(h);
return true;
}
return false;
}
// java.util.concurrent.locks.ReentrantLock.Sync#tryRelease
protected final boolean tryRelease(int releases) {
// 減去可重入次數
int c = getState() - releases;
// 調用該方法線程不去當前獲取鎖線程拋異常
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
// 判斷c是否為0
if (c == 0) {
free = true;
//將獨占線程設置為null
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
參考:
書籍 《Java併發編程的藝術》
https://tech.meituan.com/2019/12/05/aqs-theory-and-apply.html