10分鐘從源碼級別搞懂AQS(AbstractQueuedSynchronizer) ### 前言 上篇文章[15000字、6個代碼案例、5個原理圖讓你徹底搞懂Synchronized](https://juejin.cn/post/7272015112819556412)有說到synchroniz ...
10分鐘從源碼級別搞懂AQS(AbstractQueuedSynchronizer)
前言
上篇文章15000字、6個代碼案例、5個原理圖讓你徹底搞懂Synchronized有說到synchronized由object monitor實現的
object monitor中由cxq棧和entry list來實現阻塞隊列,wait set實現等待隊列,從而實現synchronized的等待/通知模式
而JDK中的JUC併發包也通過類似的阻塞隊列和等待隊列實現等待/通知模式
這篇文章就來講講JUC的基石AQS(AbstractQueuedSynchronizer)
需要瞭解的前置知識:CAS、volatile
如果不瞭解CAS可以看上篇講述synchronized的文章(鏈接在上面)
如果不瞭解volatile可以看這篇文章 5個案例和流程圖讓你從0到1搞懂volatile關鍵字
本篇文章以AQS為中心,深入淺出描述AQS中的數據結構、設計以及獲取、釋放同步狀態的源碼流程、Condition等
觀看本文大約需要10分鐘,可以帶著幾個問題去觀看
- 什麼是AQS,它是幹啥用的?
- AQS是使用什麼數據結構實現的?
- AQS獲取/釋放同步狀態是如何實現的?
- AQS除了具有synchronized的功能還擁有什麼其他特性?
- AQS如何去實現非公平鎖、公平鎖?
- 什麼是Condition?它跟AQS是什麼關係?
AQS數據結構
什麼是AQS呢?
AQS是一個同步隊列(阻塞隊列),是併發包中的基礎,很多併發包中的同步組件底層都使用AQS來實現,比如:ReentrantLock、讀寫鎖、信號量等等...
AQS有三個重要的欄位,分別是: head 頭節點、tail 尾節點、state 同步狀態
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {
//頭節點
private transient volatile Node head;
//尾節點
private transient volatile Node tail;
//同步狀態
private volatile int state;
}
頭尾節點很好理解,因為AQS本身就是個雙向鏈表,那麼state同步狀態是什麼?
AQS中使用同步狀態表示資源,然後使用CAS來獲取/釋放資源,比如設置資源為1,一個線程來嘗試獲取資源,由於同步狀態目前為1,於是該線程CAS替換同步狀態為0,成功後表示獲取到資源,之後其他線程再來獲取資源就無法獲取了(狀態為0),直到獲取資源的線程來釋放資源
上述獲取/釋放資源也可以理解成獲取/釋放鎖
同時三個欄位都被volatile修飾,用volatile來保證記憶體可見性,防止其他線程修改這些數據時當前線程無法感知
通過上面的描述,我們可以知道AQS大概長這樣
當某個線程獲取資源失敗時,會被構建成節點加入AQS中
節點Node是AQS中的內部類,Node中有些重要的欄位一起來看看
static final class Node {
//節點狀態
volatile int waitStatus;
//前驅節點
volatile Node prev;
//後繼節點
volatile Node next;
//當前節點所代表的線程
volatile Thread thread;
//等待隊列使用時的後繼節點指針
Node nextWaiter;
}
prev、next、thread應該都好理解
AQS同步隊列和等待隊列都使用這種節點,當等待隊列節點被喚醒出隊時,方便加入同步隊列
nextWaiter就是用於節點在等待隊列中指向下一個節點
waitStatus表示節點的狀態
狀態 | 說明 |
---|---|
INITIAL | 0 初始狀態 |
CANCELLED | 1 該節點對應的線程取消調度 |
SIGNAL | -1 該節點對應的線程阻塞,等待喚醒競爭資源 |
CONDITION | -2 該節點在等待(條件)隊列中,等待喚醒後從等待隊列出隊進入同步隊列競爭 |
PROPAGATE | -3 共用情況下,會喚醒後續所有共用節點 |
不太理解狀態不要緊,我們後文遇到再說
經過上面的描述,節點大概是長成這樣的
AQS中還有另外一個內部類ConditionObject
用於實現等待隊列/條件隊列,我們後文再來說說
AQS中可以分為獨占、共用模式,其中這兩種模式下還可以支持響應中斷、納秒級別超時
獨占模式可以理解為同一時間只有一個線程能夠獲取同步狀態
共用模式可以理解為可以有多個線程能夠獲取同步狀態,方法中常用shared
標識
方法中常用acquire
標識獲取同步狀態,release
標識釋放同步狀態
這些方法都是模板方法,規定流程,將具體的實現留給實現類去做(比如獲取同步狀態,該如何獲取交給實現類去實現)
獨占式
獨占式實際就是時刻上只允許一個線程獨占該資源,多線程競爭情況下也只能有一個線程獲取同步狀態成功
獲取同步狀態
不響應中斷的獨占獲取和響應中斷、超時的類似,我們以acquire
為例查看源碼
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
tryAcquire
方法用於嘗試獲取同步狀態,參數arg表示獲取多少同步狀態,獲取成功返回true 則會退出方法,留給實現類去實現
addWaiter
addWaiter(Node.EXCLUSIVE) 構建獨占式節點,並用CAS+失敗重試的方式加入AQS的末尾
private Node addWaiter(Node mode) {
//構建節點
Node node = new Node(Thread.currentThread(), mode);
//尾節點不為空則CAS替換尾節點
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
//尾節點為空或則CAS失敗執行enq
enq(node);
return node;
}
private Node enq(final Node node) {
//失敗重試
for (;;) {
Node t = tail;
//沒有尾節點 則CAS設置頭節點(頭尾節點為一個節點),否則CAS設置尾節點
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
enq
方法主要以自旋(中途不會進入等待模式)去CAS設置尾節點,如果AQS中沒有節點則頭尾節點為同一節點
由於添加到尾節點存在競爭,因此需要用CAS去替換尾節點
acquireQueued
acquireQueued
方法主要用於AQS隊列中的節點來自旋獲取同步狀態,在這個自旋中並不是一直執行的,而是會被park進入等待
final boolean acquireQueued(final Node node, int arg) {
//記錄是否失敗
boolean failed = true;
try {
//記錄是否中斷過
boolean interrupted = false;
//失敗重試
for (;;) {
//p 前驅節點
final Node p = node.predecessor();
//如果前驅節點為頭節點,並嘗試獲取同步狀態成功則返回
if (p == head && tryAcquire(arg)) {
//設置頭節點
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
//失敗則設置下標記然後進入等待檢查中斷
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
//如果失敗則取消獲取
if (failed)
cancelAcquire(node);
}
}
在嘗試獲取同步狀態前有個條件p == head && tryAcquire(arg)
:前驅節點是頭節點
因此AQS中的節點獲取狀態是FIFO的
但即使滿足前驅節點是頭節點,並不一定就能獲取同步狀態成功,因為還未加入AQS的線程也可能嘗試獲取同步狀態,以此來實現非公平鎖
那如何實現公平鎖呢?
在嘗試獲取同步狀態前都加上這個條件就行了唄!
再來看看shouldParkAfterFailedAcquire
獲取同步狀態失敗後應該停放
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
//前驅節點狀態
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
//前驅節點狀態是SIGNAL 說明前驅釋放同步狀態回來喚醒 直接返回
return true;
if (ws > 0) {
//如果前驅狀態大於0 說明被取消了,就一直往前找,找到沒被取消的節點
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
//排在沒被取消的節點後面
pred.next = node;
} else {
//前驅沒被取消,而且狀態不是SIGNAL CAS將狀態更新為SIGNAL,釋放同步狀態要來喚醒
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
實際上是park前的一些準備
再來看看 parkAndCheckInterrupt
,用工具類進入等待狀態,被喚醒後檢查是否中斷
private final boolean parkAndCheckInterrupt() {
//線程進入等待狀態...
LockSupport.park(this);
//檢查是否中斷 (會清除中斷標記位)
return Thread.interrupted();
}
在acquireQueued
的中如果未獲取同步狀態並且拋出異常,最終會執行cancelAcquire
取消
當感知到中斷時返回true回去,來到第一層acquire
方法執行selfInterrupt
方法,自己中斷線程
acquire流程圖:
- 先嘗試獲取同步狀態失敗則CAS+失敗重試添加到AQS末尾
- 前驅節點為頭節點且獲取同步狀態成功則返回,否則進入等待狀態等待喚醒,喚醒後重試
- 在2期間發生異常取消當前節點
釋放同步狀態
先進行釋放同步狀態,成功後頭節點狀態不為0 喚醒下一個狀態不是被取消的節點
public final boolean release(int arg) {
//釋放同步狀態
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
//喚醒下一個狀態不大於0(大於0就是取消)的節點
unparkSuccessor(h);
return true;
}
return false;
}
響應中斷
acquireInterruptibly
用於響應中斷的獲取同步狀態
public final void acquireInterruptibly(int arg)
throws InterruptedException {
//查看是否被中斷,中斷拋出異常
if (Thread.interrupted())
throw new InterruptedException();
if (!tryAcquire(arg))
doAcquireInterruptibly(arg);
}
doAcquireInterruptibly
與原過程類似,就是在被喚醒後檢查到被中斷時拋出中斷異常
private void doAcquireInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
//被喚醒後檢查到被中斷時拋出中斷異常
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
響應中斷的獲取同步狀態被中斷時會直接拋出中斷異常,而不響應的是自己中斷
響應超時
響應超時的獲取同步狀態使用tryAcquireNanos
超時時間為納秒級別
public final boolean tryAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
return tryAcquire(arg) ||
doAcquireNanos(arg, nanosTimeout);
}
可以看出響應超時同時也會響應中斷
doAcquireNanos
也與原過程類似
private boolean doAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (nanosTimeout <= 0L)
return false;
final long deadline = System.nanoTime() + nanosTimeout;
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return true;
}
//還有多久超時
nanosTimeout = deadline - System.nanoTime();
if (nanosTimeout <= 0L)
//已超時
return false;
if (shouldParkAfterFailedAcquire(p, node) &&
//大於1ms
nanosTimeout > spinForTimeoutThreshold)
//超時等待
LockSupport.parkNanos(this, nanosTimeout);
//響應中斷
if (Thread.interrupted())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
響應超時在自旋期間會計算還有多久超時,如果大於1ms就等待對應的時間,否則就繼續自旋,同時響應中斷
共用
共用式就是允許多個線程同時獲取一定的資源,比如信號量、讀鎖就是用共用式實現的
其實共用式與獨占式流程類似,只是嘗試獲取同步狀態的實現不同
我們用個獲取同步狀態的方法來說明
共用式獲取同步狀態使用acquireShared
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
tryAcquireShared
嘗試獲取同步狀態,參數arg表示獲取多少同步狀態,返回剩餘可獲取同步狀態的數量
如果剩餘可獲取同步狀態數量小於0 說明 未獲取成功進入doAcquireShared
private void doAcquireShared(int arg) {
//添加共用式節點
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
//獲取前驅節點
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
//如果前驅節點為頭節點 並且 獲取同步狀態成功 設置頭節點
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
//獲取失敗進入會等待的自旋
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
響應中斷、超時等方法也與獨占式類似,只是有些設置細節不同
Condition
上文曾說過AQS充當阻塞(同步)隊列,Condition來充當等待隊列
AQS的內部類ConditionObject就是Condition的實現,它充當等待隊列,用欄位記錄頭尾節點
public class ConditionObject implements Condition{
//頭節點
private transient Node firstWaiter;
//尾節點
private transient Node lastWaiter;
}
節點之間使用nextWait指向下一個節點,形成單向鏈表
同時提供await
系列方法來讓當前線程進入等待,signal
系列方法來喚醒
public final void await() throws InterruptedException {
//響應中斷
if (Thread.interrupted())
throw new InterruptedException();
//添加到末尾 不需要保證原子性,因為能指向await一定是獲取到同步資源的
Node node = addConditionWaiter();
//釋放獲取的同步狀態
int savedState = fullyRelease(node);
int interruptMode = 0;
//不在同步隊列就park進入等待
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
//被喚醒後自旋獲取同步狀態
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
//取消後清理
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
await主要將節點添加到condition object末尾,釋放獲取的同步狀態,進入等待,喚醒後自旋獲取同步狀態
signal的主要邏輯在transferForSignal中
final boolean transferForSignal(Node node) {
//CAS修改節點狀態 失敗返回 變成取消
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
//加入AQS末尾
Node p = enq(node);
int ws = p.waitStatus;
//CAS將節點狀態修改為SIGNAL 成功則喚醒節點
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}
signal 主要把狀態從-2condition 修改為 0(失敗則取消節點), 然後加入AQS的末尾,最後再將狀態該為-1 signal,成功則喚醒節點
為什麼加入AQS末尾還是使用enq去CAS+失敗重試操作保證原子性呢?
因為ConditionObject允許有多個,也就一個AQS同步隊列可能對應多個Condition等待(條件)隊列
總結
本篇文章以AQS為核心,深入淺出的描述AQS實現的數據結構、設計思想、獲取/釋放同步資源的源碼級流程、Condition等
AQS使用頭尾節點來實現雙向隊列,提供同步狀態和獲取/釋放同步狀態的模板方法來實現阻塞(同步)隊列,並且這些欄位使用volatile修飾,保證可見性與讀取的場景配合,不需要保證原子性,在寫的場景下常用CAS保證原子性
AQS與Condition使用相同類型的節點,在AQS中節點維護成雙向鏈表,在Condition中節點維護成單向鏈表,節點除了維護指向關係,還需要記錄對應線程和節點狀態
AQS分為獨占式和共用式,使用獨占式時只允許一個線程獲取同步狀態,使用共用式時則允許多個線程獲取同步狀態;其中還提供響應中斷、等待超時的類似方法
獲取同步狀態:先嘗試獲取同步狀態,如果失敗則CAS+失敗重試的方式將節點添加到AQS末尾,等待被前驅節點喚醒;只有當前驅節點為頭節點並且獲取同步狀態成功才返回,否則進入等待,被喚醒後繼續嘗試(自旋);在此期間如果發生異常,在拋出異常前會取消該節點
釋放同步狀態:嘗試釋放同步狀態,成功後喚醒後繼未被取消的節點
在獲取同步狀態時,被喚醒後會檢查中斷標識,如果是響應中斷的則會直接拋出中斷異常,不響應的則是在最外層自己中斷
響應超時時,在自旋獲取同步狀態期間會計時,如果距離超時小於1ms就不進入等待的自旋,大於則再等待對應時間
AQS充當阻塞隊列,Condition充當它的等待隊列來實現等待/通知模式,AQS的內部類ConditionObject在await時會加入Condition末尾並釋放同步狀態進入等待隊列,在被喚醒後自旋(失敗會進入等待)獲取同步狀態;在single時會CAS的將condition頭節點並加入AQS尾部再去喚醒(因為一個AQS可能對應多個Condition因此要CAS保證原子性)
最後(不要白嫖,一鍵三連求求拉~)
本篇文章被收入專欄 由點到線,由線到面,深入淺出構建Java併發編程知識體系,感興趣的同學可以持續關註喔
本篇文章筆記以及案例被收入 gitee-StudyJava、 github-StudyJava 感興趣的同學可以stat下持續關註喔~
有什麼問題可以在評論區交流,如果覺得菜菜寫的不錯,可以點贊、關註、收藏支持一下~
關註菜菜,分享更多乾貨,公眾號:菜菜的後端私房菜
本文由博客一文多發平臺 OpenWrite 發佈!