最近一直在研究AQS的源碼,希望可以更深刻的理解AQS的實現原理。雖然網上有很多關於AQS的源碼分析,但是看完以後感覺還是一知半解。於是,我將自己的整個理解過程記錄下來了,希望對大家有所幫助。 基本原理 AQS是Java中鎖的基礎,主要由兩個隊列組成。一個隊列是同步隊列,另一個是條件隊列。 同步隊列 ...
最近一直在研究AQS的源碼,希望可以更深刻的理解AQS的實現原理。雖然網上有很多關於AQS的源碼分析,但是看完以後感覺還是一知半解。於是,我將自己的整個理解過程記錄下來了,希望對大家有所幫助。
基本原理
AQS是Java中鎖的基礎,主要由兩個隊列組成。一個隊列是同步隊列,另一個是條件隊列。
同步隊列的原理
- 同步隊列的隊列頭部是
head
,隊列尾部是tail
節點,head
節點是一個空節點,同步隊列是一個雙向鏈表,通過next
和prev
連接所有節點 - 所有的線程在競爭鎖的時候都會創建一個
Node
節點,線程與節點綁定在一起,(如果是同步鎖和排他鎖不同之處是通過nextWaiter
來區分的)並且添加到同步隊列的尾部 head
的第一個節點獲取鎖,其餘節點都需要等待被喚醒- 同步隊列中的節點會存在取消和
null
的情況(如:線程超時中斷、線程更新節點的中間態),被取消和null
的節點不能被喚醒,將會被視為無效節點 - 一個線程只能被有效的前驅節點(取消和
null
的節點除外)喚醒 - 持有鎖的線程只能是有一個,其他有效節點對應的線程都會被掛起
條件隊列的原理
- 一個同步隊列可以對應多個條件隊列
- 條件隊列是一個單向鏈表,通過
nextWaiter
來連接起來,條件隊列的頭節點是firstWaiter
,尾節點是lastWaiter
- 某個條件隊列中滿足條件的節點(被
signal
或signalAll
方法喚醒的節點)才會被轉移到同步隊列 - 條件隊列中的被轉移到同步隊列的節點是從頭節點開始,條件隊列中被阻塞的線程會添加到隊列的尾部
同步隊列的實現
首先,瞭解以下同步隊列中隊列的節點Node
的數據結構
static final class Node {
/** 共用鎖的標識 */
static final Node SHARED = new Node();
/** 排他鎖的標識 */
static final Node EXCLUSIVE = null;
/** 線程取消 */
static final int CANCELLED = 1;
/** 持有鎖的線程的後繼線程被掛起 */
static final int SIGNAL = -1;
/** 條件隊列標識 */
static final int CONDITION = -2;
/**
* 共用鎖情況下,通知所有其他節點
*/
static final int PROPAGATE = -3;
/**
* waitStatus的取值如下:
* SIGNAL(-1): 當前節點的後繼節點應該被掛起
* CANCELLED(1): 當前節點被取消
* CONDITION(-2): 當前節點在條件隊列
* PROPAGATE(-3): 釋放共用鎖時需要通知所有節點
* 0: 初始值
*
*/
volatile int waitStatus;
/**
* 前驅節點
*/
volatile Node prev;
/**
* 後繼節點
*/
volatile Node next;
/**
* 節點對應的線程
*/
volatile Thread thread;
/**
* 在共用鎖的情況下,該節點的值為SHARED
* 在排他鎖的情況下,該節點的值為EXCLUSIVE
* 在條件隊列的情況下,鏈接的是下一個等待條件的線程
*/
Node nextWaiter;
}
其次,我們來看一下同步隊列的鏈表結構
接著,我們根據同步隊列的原理來分析以下acquire
和release
需要做哪些事情:
實現acquire功能需要做的事情
- 創建一個Node節點
node
(該節點可能是排他鎖,也可以能是共用鎖) - 將
node
添加到同步隊列尾部,如果同步隊列為空(初始情況下),需要先創建一個空的頭節點,然後再添加到隊列的尾部 - 如果
node
的前驅節點是head
,說明node
是第一個節點,能夠獲取鎖,需要將head
修改成node
,釋放前驅節點的資源 - 如果
node
的前驅節點不是head
,說明獲取鎖失敗,需要檢測是否需要將node
綁定的線程掛起,分以下幾種情況:- 如果
node
的waitStatus
已經被設置為SIGNAL
表示需要被掛起 - 如果
node
的waitStatus
設置為CANCEL
表示該節點已經被取消,需要被去掉,並修改node
的prev
,直到鏈接上一個有效的節點為止 - 否則將
node
的waitStatus
設置為SIGNAL
,表示即將要被掛起
- 如果
- 如果需要將
node
綁定的線程掛起,則讓出CPU,直到當前驅節點來喚起node
才會開始繼續從步驟3
開始執行
與acquire功能相關的代碼
- acquire方法:獲取排他鎖
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
-
tryAcquire(arg)
:對外提供的一個擴展方法,常用的鎖都要實現這個方法,具體實現與鎖相關 -
addWaiter(Node.EXCLUSIVE)
: 創建一個排他鎖節點,並將該節點添加到同步隊列尾部,代碼如下:
private Node addWaiter(Node mode) {
// 創建一個node,EXCLUSIVE類型
Node node = new Node(mode);
for (;;) {
// 獲取尾節點
Node oldTail = tail;
if (oldTail != null) {
// 設置即將成為尾節點的前驅
node.setPrevRelaxed(oldTail);
// CAS操作設置尾節點
if (compareAndSetTail(oldTail, node)) {
// 將新尾節點的前驅節點與新的尾節點關聯起來
oldTail.next = node;
// 返回添加的節點
// 這個節點現在不一定是尾節點,因為如果有多個線程調用這個方法時,
// 可能還有節點添加在這個節點後面
return node;
}
} else {
// 如果隊列為空,初始化頭節點
initializeSyncQueue();
}
}
}
acquireQueued
:同步隊列中的節點獲取排他鎖
final boolean acquireQueued(final Node node, int arg) {
try {
// 線程是否中斷
boolean interrupted = false;
for (;;) {
// 獲取前驅節點
final Node p = node.predecessor();
// 如果前驅節點是頭節點,獲取鎖
if (p == head && tryAcquire(arg)) {
// 修改頭節點
setHead(node);
// 釋放頭節點的資源
p.next = null; // help GC
// 返回線程中斷的狀態
// 這也是該方法唯一的返回值
// 沒有獲取鎖的線程會一直執行該方法直到獲取鎖以後再返回
return interrupted;
}
// 獲取鎖失敗後是否需要將線程掛起
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt()) // 線程掛起並返回是否被中斷
interrupted = true;
}
} catch (Throwable t) {
// 取消該節點
cancelAcquire(node);
throw t;
}
}
shouldParkAfterFailedAcquire
:檢測線程獲取鎖失敗以後是否需要被掛起
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
// 前驅節點的狀態
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*
* 狀態已經設置成SIGNAL,可以直接掛起該節點
*/
return true;
// 節點被取消
if (ws > 0) {
/*
* 找到pred第一個有效的前驅節點
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
// pred可能是一個新的節點,需要將pred的next重寫設置為node
pred.next = node;
} else {
/*
* CAS操作將pred節點的狀態設置為SIGNAL
*/
pred.compareAndSetWaitStatus(ws, Node.SIGNAL);
}
// 只有當pred節點的waitStatus已經是SIGNAL狀態時,才可以安全的掛起線程
// 否則需要不能被掛起
return false;
}
parkAndCheckInterrupt
:將當前線程掛起,並檢測當前線程是否中斷
private final boolean parkAndCheckInterrupt() {
// 線程掛起
LockSupport.park(this);
// 檢測線程是否中斷
return Thread.interrupted();
}
cancelAcquire
:取消節點
private void cancelAcquire(Node node) {
// 如果節點為空,什麼都不做
if (node == null)
return;
// 釋放線程
node.thread = null;
// 從後往前過濾掉所有的被取消的節點
Node pred = node.prev;
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
// 有效前驅節點的nex節點
Node predNext = pred.next;
// 將node設置為CANCELLED
node.waitStatus = Node.CANCELLED;
// 如果是尾節點,設置新的尾節點
if (node == tail && compareAndSetTail(node, pred)) {
// 將新的尾節點的後續設置為null
pred.compareAndSetNext(predNext, null);
} else {
// If successor needs signal, try to set pred's next-link
// so it will get one. Otherwise wake it up to propagate.
int ws;
// 如果前驅節點的線程不為null並且waitStatus為SIGNAL
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && pred.compareAndSetWaitStatus(ws, Node.SIGNAL))) &&
pred.thread != null) {
Node next = node.next;
// 將node設置成pred的後繼節點
if (next != null && next.waitStatus <= 0)
pred.compareAndSetNext(predNext, next);
} else {
// 喚起node節點的後繼節點
// 因為node節點已經釋放鎖了
unparkSuccessor(node);
}
node.next = node; // help GC
}
}
unparkSuccessor
:喚醒後繼節點
private void unparkSuccessor(Node node) {
/*
* 獲取node節點的waitStatus
*/
int ws = node.waitStatus;
// 用CSA操作將waitStatus設置成初始狀態
// 不管設置是否成功,都無所謂,因為該節點即將被銷毀
if (ws < 0)
node.compareAndSetWaitStatus(ws, 0);
/*
* 獲取node的後繼節點
*/
Node s = node.next;
// 如果後繼節點為null或者被取消,
// 通過從同步隊列的尾節點開始一直往前找到一個有效的後繼節點
if (s == null || s.waitStatus > 0) {
s = null;
for (Node p = tail; p != node && p != null; p = p.prev)
if (p.waitStatus <= 0)
s = p;
}
// 如果後繼節點不為空
if (s != null)
LockSupport.unpark(s.thread);// 喚醒後繼節點的線程
}
與acquire
方法類似的還有acquireInterruptibly
、tryAcquireNanos
、acquireShared
、acquireSharedInterruptibly
和tryAcquireSharedNanos
,我們都一一分析以下
- acquireInterruptibly方法:獲取可中斷的排他鎖
public final void acquireInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted()) // 如果線程中斷,直接返回
throw new InterruptedException();
if (!tryAcquire(arg))
doAcquireInterruptibly(arg); // 中斷式的獲取鎖
}
doAcquireInterruptibly
:可中斷式的獲取鎖
private void doAcquireInterruptibly(int arg)
throws InterruptedException {
// 創建一個排他節點加入同步隊列
final Node node = addWaiter(Node.EXCLUSIVE);
try {
for (;;) {
// 獲取前驅節點
final Node p = node.predecessor();
// 如果前驅節點是頭節點,說明已經獲取的鎖
if (p == head && tryAcquire(arg)) {
// 修改頭節點
setHead(node);
p.next = null; // help GC
return;
}
// 如果沒有獲取鎖,檢測是否需要掛起
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException(); // 如果發現線程已經被中斷,需要拋出異常
}
} catch (Throwable t) {
// 發生異常取消節點
cancelAcquire(node);
throw t;
}
}
- tryAcquireNanos方法:超時中斷獲取排他鎖
public final boolean tryAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException(); // 線程中斷直接返回
return tryAcquire(arg) ||
doAcquireNanos(arg, nanosTimeout); // 超時獲取排他鎖
}
doAcquireNanos
:超時獲取排他鎖
private boolean doAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
// 如果超時直接返回
if (nanosTimeout <= 0L)
return false;
// 獲取超時時長
final long deadline = System.nanoTime() + nanosTimeout;
// 添加一個排他節點到同步隊列尾部
final Node node = addWaiter(Node.EXCLUSIVE);
try {
for (;;) {
// 獲取前驅節點
final Node p = node.predecessor();
// 已經獲取鎖
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
return true;
}
nanosTimeout = deadline - System.nanoTime();
// 如果超時了就取消
if (nanosTimeout <= 0L) {
cancelAcquire(node);
return false;
}
// 檢測節點是否需要被掛起
if (shouldParkAfterFailedAcquire(p, node) &&
nanosTimeout > SPIN_FOR_TIMEOUT_THRESHOLD)
// 如果需要掛起,且超時時長大於SPIN_FOR_TIMEOUT_THRESHOLD
// 線程掛起nanosTimeout時間
LockSupport.parkNanos(this, nanosTimeout);
if (Thread.interrupted())
throw new InterruptedException();
}
} catch (Throwable t) {
// 發生異常取消節點
cancelAcquire(node);
throw t;
}
}
- acquireShared方法:獲取共用鎖
public final void acquireShared(int arg) {
// 對外提供的一個擴展方法,常用的鎖都要實現這個方法,
// 該方法的實現與鎖的用途有關
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg); // 獲取共用鎖
}
doAcquireShared
:獲取共用鎖
private void doAcquireShared(int arg) {
// 添加一個共用節點到同步隊列尾部
final Node node = addWaiter(Node.SHARED);
try {
boolean interrupted = false;
for (;;) {
// 獲取前驅節點
final Node p = node.predecessor();
if (p == head) {
// 返回結果大於等於0表示獲取共用鎖
int r = tryAcquireShared(arg);
if (r >= 0) {
// 設置頭節點並廣播通知其他獲取共用鎖的節點
setHeadAndPropagate(node, r);
p.next = null; // help GC
// 如果線程被中斷,將該線程中斷
// 共用鎖會被多個線程獲取,如果需要中斷
// 所有獲取共用鎖的線程都要被中斷
if (interrupted)
selfInterrupt();
return;
}
}
// 檢測是否需要掛起
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt()) // 掛起並中斷
interrupted = true;
}
} catch (Throwable t) {
// 發生異常取消節點
cancelAcquire(node);
throw t;
}
}
setHeadAndPropagate
:設置頭節點並廣播其他節點來獲取鎖
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // 記錄舊的頭節點
setHead(node);// 設置新的頭節點
/*
* 如果頭節點為null或者是不是取消狀態,嘗試喚醒後繼節點
*/
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
// node節點的next是SHARED,即共用鎖
if (s == null || s.isShared())
// 喚起獲取共用鎖的線程
doReleaseShared();
}
}
doReleaseShared
:喚醒等待共用鎖的節點
private void doReleaseShared() {
/*
* 喚醒時是從頭節點開始先喚醒第一個共用節點,
* 第一個共用節點被喚醒後會在doAcquireShared方法里繼續執行(之前就是在這個方法里被掛起的)
* 第一個共用節點如果獲取鎖會調用setHeadAndPropagate方法修改頭節點,然後再調用doReleaseShared方法
* 喚醒第二個共用節點,以此類推,最後把所有的共用節點都喚醒
*/
for (;;) {
Node h = head;
if (h != null && h != tail) {
// 獲取頭節點的狀態
int ws = h.waitStatus;
// 如果頭節點是SIGNAL,需要將狀態設置為0,表示已經即將被喚醒
if (ws == Node.SIGNAL) {
if (!h.compareAndSetWaitStatus(Node.SIGNAL, 0))
continue; // 如果失敗了說明有其他線程在修改頭節點,需要繼續重試
unparkSuccessor(h); // 喚醒頭節點的後繼節點
}
else if (ws == 0 &&
!h.compareAndSetWaitStatus(0, Node.PROPAGATE))
continue; // 將頭節點狀態從0設置成PROPAGATE,如果失敗了繼續,因為也有其他獲取共用鎖的線程在更改頭節點
}
// 如果頭節點未改變(因為沒有後繼節點需要等待共用鎖),跳出迴圈
if (h == head)
break;
}
}
selfInterrupt
:中斷當前線程
static void selfInterrupt() {
Thread.currentThread().interrupt();
}
- acquireSharedInterruptibly方法:可中斷的獲取共用鎖
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException(); // 如果線程被中斷拋出異常
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg); // 可中斷的方式獲取共用鎖
}
doAcquireSharedInterruptibly
:可中斷的方式後去共用鎖
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
// 添加共用鎖節點到同步隊列尾部
final Node node = addWaiter(Node.SHARED);
try {
for (;;) {
// 獲取前驅節點
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
// 獲取共用鎖以後修改頭節點,通知其他等待共用鎖的節點
setHeadAndPropagate(node, r);
p.next = null; // help GC
return;
}
}
// 線程獲取共用鎖失敗後需要掛起,並且發現線程被中斷,所以拋出異常
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} catch (Throwable t) {
// 發生異常取消節點
cancelAcquire(node);
throw t;
}
}
- tryAcquireSharedNanos方法:超時中斷獲取共用鎖
public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (Thread.interrupted()) // 線程如果中斷了,直接拋出異常
throw new InterruptedException();
return tryAcquireShared(arg) >= 0 ||
doAcquireSharedNanos(arg, nanosTimeout); // 超時獲取共用鎖
}
doAcquireSharedNanos
:超時的方式獲取中斷鎖
private boolean doAcquireSharedNanos(int arg, long nanosTimeout)
throws InterruptedException {
// 超時直接返回
if (nanosTimeout <= 0L)
return false;
final long deadline = System.nanoTime() + nanosTimeout;
// 添加共用節點到同步隊列尾部
final Node node = addWaiter(Node.SHARED);
try {
for (;;) {
// 獲取前驅節點
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
// 獲取鎖,修改頭節點,通知所有其他等待共用鎖的節點
setHeadAndPropagate(node, r);
p.next = null; // help GC
return true;
}
}
nanosTimeout = deadline - System.nanoTime();
if (nanosTimeout <= 0L) {
// 超時取消節點
cancelAcquire(node);
return false;
}
if (shouldParkAfterFailedAcquire(p, node) &&
nanosTimeout > SPIN_FOR_TIMEOUT_THRESHOLD)
// 如果需要掛起,且超時時長大於SPIN_FOR_TIMEOUT_THRESHOLD
// 線程掛起nanosTimeout時間
LockSupport.parkNanos(this, nanosTimeout);
if (Thread.interrupted())
throw new InterruptedException(); // 中斷了拋出異常
}
} catch (Throwable t) {
// 發生異常取消節點
cancelAcquire(node);
throw t;
}
}
實現release功能需要做的事情
- 釋放當前獲取鎖的線程持有的資源
- 喚醒有效的一個後繼節點
與release功能相關的代碼
- release方法:釋放排他鎖
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
// 頭節點不能是一個中間態
if (h != null && h.waitStatus != 0)
// 喚醒後繼節點
unparkSuccessor(h);
return true;
}
return false;
}
- release方法:釋放共用鎖
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
// 釋放共用鎖,從頭節點開始一個一個的釋放
// 如果存在多個共用節點在同步隊列時,doReleaseShared方式其實是遞歸調用
doReleaseShared();
return true;
}
return false;
}
至此,將所有獲取鎖和釋放鎖的方法相關的源碼全部分析完
條件隊列的實現
我們來看一下條件隊列的鏈表結構
實現await功能需要做的事情
- 創建一個
CONDITION
類型的節點,將該節點添加到條件隊列 - 釋放已經獲取的鎖(因為只有當前線程先獲取了鎖才可能再調用
Condition.await()
方法) - 如果無法獲取鎖,線程掛起
與await功能相關的代碼
- await方法:等待條件
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException(); // 如果線程中斷,直接拋出異常
// 創建一個CONDITION類型的節點,將該節點添加到條件隊列尾部
Node node = addConditionWaiter();
// 釋放鎖
// 在調用await方法之前都會調用lock方法,這個時候已經獲取鎖了
// 有時候鎖還是可重入的,所以需要將所有的資源都釋放掉
int savedState = fullyRelease(node);
int interruptMode = 0;
// 如果節點不再同步隊列,全部都要掛起
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
// 如果在等待期間發生過中斷(不管是調用signal之前還是之後),直接退出
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
// 讓線程嘗試去獲取鎖,如果無法獲取鎖就掛起
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
// 清除所有在條件隊列中是取消狀態的節點
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
// 發生中斷,上報中斷情況
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
addConditionWaiter
:在條件隊列中添加一個節點
private Node addConditionWaiter() {
Node t = lastWaiter;
// 清除條件隊列中無效的節點
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;
}
// 創建一個節點
Node node = new Node(Node.CONDITION);
// 添加到條件隊列尾部
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}
unlinkCancelledWaiters
:清除在條件隊列中被取消的節點
private void unlinkCancelledWaiters() {
Node t = firstWaiter;
Node trail = null;
// 遍歷條件隊列將所有不是CONDITION狀態的節點全部清除掉
// 這些節點都是取消狀態的節點
while (t != null) {
Node next = t.nextWaiter;
if (t.waitStatus != Node.CONDITION) {
t.nextWaiter = null;
if (trail == null)
firstWaiter = next;
else
trail.nextWaiter = next;
if (next == null)
lastWaiter = trail;
}
else
trail = t;
t = next;
}
}
fullyRelease
:釋放線程持有的所有鎖資源
final int fullyRelease(Node node) {
try {
int savedState = getState();
// 釋放所有的資源
// 如果是可重入鎖,savedState就是重入的次數
if (release(savedState))
return savedState;
throw new IllegalMonitorStateException();
} catch (Throwable t) {
// 發生異常就取消該節點
node.waitStatus = Node.CANCELLED;
throw t;
}
}
isOnSyncQueue
:判斷節點是否在同步隊列
final boolean isOnSyncQueue(Node node) {
// waitStatus是CONDITION或者node沒有前驅節點,說明node不在同步隊列
if (node.waitStatus == Node.CONDITION || node.prev == null)
return false;
if (node.next != null) // 有後繼節點一定在同步隊列
return true;
/*
* 在同步隊列中查找node,看是否在同步隊列中
*/
return findNodeFromTail(node);
}
findNodeFromTail
:在同步隊列中查找節點
private boolean findNodeFromTail(Node node) {
// 從尾節點開始查找
for (Node p = tail;;) {
if (p == node) // 找到了
return true;
if (p == null) // 找到頭了還沒找到
return false;
p = p.prev;
}
}
checkInterruptWhileWaiting
:檢測中斷的情況
private int checkInterruptWhileWaiting(Node node) {
// 沒有發生中斷返回0
// 調用signal之前發生中斷返回THROW_IE
// 調用signal之後發生中斷返回REINTERRUPT
return Thread.interrupted() ?
(transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
0;
}
transferAfterCancelledWait
:清除在條件隊列中被取消的節點
// 只有線程處於中斷狀態,才會調用此方法
// 如果需要的話,將這個已經取消等待的節點轉移到阻塞隊列
// 返回 true,如果此線程在 signal 之前被取消,否則返回false
final boolean transferAfterCancelledWait(Node node) {
// 用 CAS 將節點狀態設置為 0
// 如果這步 CAS 成功,說明是 signal 方法之前發生的中斷,
// 因為如果 signal 先發生的話,signal 中會將 waitStatus 設置為 0
if (node.compareAndSetWaitStatus(Node.CONDITION, 0)) {
enq(node); // 將節點放入阻塞隊列
return true;
}
// 到這裡是因為 CAS 失敗,肯定是因為 signal 方法已經將 waitStatus 設置為了 0
// signal 方法會將節點轉移到阻塞隊列,但是可能還沒完成,這邊自旋等待其完成
// 當然,這種事情還是比較少的吧:signal 調用之後,沒完成轉移之前,發生了中斷
while (!isOnSyncQueue(node))
Thread.yield();
return false;
}
enq
:把節點添加到同步隊列
private Node enq(Node node) {
// 無限迴圈,將節點添加到同步隊列尾部
for (;;) {
Node oldTail = tail;
if (oldTail != null) {
node.setPrevRelaxed(oldTail);
if (compareAndSetTail(oldTail, node)) {
oldTail.next = node;
return oldTail;
}
} else {
// 如果同步隊列為空,初始化
initializeSyncQueue();
}
}
}
reportInterruptAfterWait
:中斷處理
private void reportInterruptAfterWait(int interruptMode)
throws InterruptedException {
// 如果是THROW_IE狀態,拋異常
if (interruptMode == THROW_IE)
throw new InterruptedException();
else if (interruptMode == REINTERRUPT) // 再次中斷,因為中斷狀態被使用過一次
selfInterrupt();
}
awaitNanos
、awaitUntil
和await(long time, TimeUnit unit)
這幾個方法的整體邏輯是一樣的,就不再分析了
實現signal功能需要做的事情
- 將條件隊列中的節點加入同步隊列
- 喚醒線程
與signal功能相關的代碼
- signal方法:喚醒等待條件的節點
public final void signal() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
// 獲取條件隊列中的第一個節點
Node first = firstWaiter;
if (first != null)
// 喚醒等待條件的節點
doSignal(first);
}
doSignal
:喚醒等待條件的節點
private void doSignal(Node first) {
do {
// 去掉無效的節點
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) && // 將節點轉移到同步隊列
(first = firstWaiter) != null);
}
transferForSignal
:將節點轉移到同步隊列
final boolean transferForSignal(Node node) {
/*
* 取消的節點不需要轉移
*/
if (!node.compareAndSetWaitStatus(Node.CONDITION, 0))
return false;
/*
* 將節點加入同步隊列尾部
*/
Node p = enq(node);
int ws = p.waitStatus;
// ws > 0 說明 node 在阻塞隊列中的前驅節點取消了等待鎖,直接喚醒 node 對應的線程
// 如果 ws <= 0, 那麼 compareAndSetWaitStatus 將會被調用
// 節點入隊後,需要把前驅節點的狀態設為SIGNAL
if (ws > 0 || !p.compareAndSetWaitStatus(ws, Node.SIGNAL))
// 如果前驅節點取消或者 CAS 失敗,會進到這裡喚醒線程
LockSupport.unpark(node.thread);
return true;
}
- signalAlll方法:喚醒所有等待條件的節點
public final void signalAll() {
// 如果是當前線程
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
// 喚醒所有等待條件的節點
doSignalAll(first);
}
doSignalAll
:喚醒所有等待條件的節點
// 將所有的節點都轉移到同步隊列
private void doSignalAll(Node first) {
lastWaiter = firstWaiter = null;
do {
Node next = first.nextWaiter;
first.nextWaiter = null;
transferForSignal(first);
first = next;
} while (first != null);
}
現在將與AQS相關的核心代碼都整理了一遍,裡面如果有描述不清晰或者不准確的地方希望大家可以幫忙指出!