AQS是JUC包中許多類的實現根基,這篇文章只是個人理解的產物,不免有誤,若閱讀過程中有發現不對的,希望幫忙指出[贊]! 1 AQS內臟圖 在開始瞭解 之前,我們先從上帝視角看看 是由幾個部分組成的。 內部維護了一個 修飾的 資源變數 ,裡面的所有操作都可以說跟這個變數有關係,因為它代表的就 ...
AQS是JUC包中許多類的實現根基,這篇文章只是個人理解的產物,不免有誤,若閱讀過程中有發現不對的,希望幫忙指出[贊]!
1 AQS內臟圖
在開始瞭解AQS
之前,我們先從上帝視角看看AQS
是由幾個部分組成的。
AQS
內部維護了一個volatile
修飾的資源變數,裡面的所有操作都可以說跟這個變數有關係,因為它代表的就是資源,這是一點;另外內部還有為公平爭奪資源而準備的同步隊列,說是同步隊列,實質上存放在AQS
的也就head
節點和tail
節點;此外還有一個等待隊列,等待隊列是為了實現喚醒指定組線程爭奪資源而出現的,通過內部類ConditionObject
的firstWaiter
和lastWaiter
實現,兩個隊列的概念圖如下。
除去這些還有兩個內部類Node
和ConditionObject
,Node
是隊列的實現根基,裡面存放了許多重要的信息,如操作的線程、線程競爭的狀態等;而ConditionObject
則是Condition
介面的實現類,用來實現喚醒指定線程組的(等待隊列)。
state
:資源變數,AQS
重要組成成分,其內部的操作大多數都是對此資源的競爭。
head
和tail
節點:這兩個Node
節點其實就是AQS中
的同步隊列,而Node
是AQS
的內部類,整個資源爭奪的過程就是Node
同步隊列節點的調整和狀態變更的過程。
Node
內部類:AQS
兩個隊列的實現節點。
waitStatus
:節點狀態,取值為-3~1
(整數)。
0
:初始狀態或者不代表任何意義時的取值。
-1
:SIGNAL
狀態,個人理解是處於這個狀態的節點後方還有可用的節點,所以當其釋放資源 時要提醒後方節點參與競爭。
-2
:CONDITION
狀態,這個狀態標識當前節點處於等待隊列中,等待隊列中的節點不會參與 競爭,必須從等待隊列出來後重新加入同步隊列才能參與競爭。
-3
:PROPAGATE
,表示處於共用模式,此時不僅只是喚醒下個節點,還可能喚醒下下個節 點。
1
:CANCELLED
,廢棄節點,表示當前節點沒用了,處於該狀態的節點不會再改變,所以AQS
中經常會判斷節點狀態是否大於0
來檢查節點是否還有用。
thread
:爭奪資源的線程,存放在節點當中。prev
:同步隊列中的上一個節點。next
:同步隊列的下一個節點。nextWaiter
:下一個等待節點。AQS
中的等待隊列,可以有多個等待隊列。
ConditionObject
:AQS
內部類,實現Condition
介面,定義了兩個變數firstWaiter
和lastWaiter
,這 兩個變數 組成等待隊列。可以簡單的理解為Condition
介面的功能是能讓一定數量的線程一起等待某個條件,這個條件就是condition
,當condition
喚醒的時候,那麼這些等待的線程就會被其喚醒,反之線程則一直等待其喚醒條件。而在AQS
中,ConditionObject
可以維護多個等待隊列,當同步隊列中的節點使用了await()
方法則將其移除同步隊列放入相應的等待隊列,在等待隊列中使用signal
方法則從等待隊列中移除放入同步隊列隊尾。
2 AQS的開放方法
現在我們對AQS
的組成有了大概的瞭解,接下來看看其內部資源的競爭、獲取和釋放的實現。AQS
採用模板設計模式實現,其定義了許多頂級方法例如acquire
、release
等,這些方法子類不能重寫但是可以調用,而如果想讓其正確的調用則需要根據其規則實現開放出來的介面如tryAcquire
等(頂級方法內部調用了開放方法)。
其開放的方法有tryAcquire
、tryRelease
、tryAcquireShared
、tryReleaseShared
、isHeldExclusively
共五種,每個方法裡面沒有具體的實現,反而是直接拋出了異常,如下,所以子類需要重寫用到的方法。
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
這些方法表示嘗試去獲取資源或者釋放資源,其實現必須要跟state
資源狀態相關,舉個例子,tryAcquire
方法表示以獨占的方式嘗試獲取資源,如果獲取到了那麼其他線程不得操作其資源,其中入參的arg
則表示想要獲取到的資源數量,例如我tryAcquire(5)
成功了,那麼狀態變數state
變數則增加5
,如果tryRelease(5)
成功則state
狀態變數減少5
,等到state==0
的時候則表示資源被釋放,即可以理解為鎖被釋放。
如果只是使用AQS
的話,再加上幾個變更狀態的方法就可以了,我們不需要瞭解更多的東西,如同AQS
的文檔給出的案例一般,簡單的重寫幾個方法便可以實現一種鎖,如下,一個不可重入鎖的簡單實現。
class Mutex implements Lock, java.io.Serializable {
// 同步內部類,鎖的真正操作都是通過該類的操作
private static class Sync extends AbstractQueuedSynchronizer {
// 檢查當前是否已經處於鎖定的狀態
protected boolean isHeldExclusively() {
return getState() == 1;
}
// 如果資源變數為0,則獲取鎖(資源)
public boolean tryAcquire(int acquires) {
// acquires的值只能是1,否則的話不進入下麵代碼
assert acquires == 1;
if (compareAndSetState(0, 1)) {
// 設置持有當前鎖的線程
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
// 通過將狀態變數state設定為0來表示鎖的釋放
protected boolean tryRelease(int releases) {
// 傳入的參數只能是1,否則是無效操作
assert releases == 1;
// 如果狀態狀態等於0,說明不是鎖定狀態
if (getState() == 0) throw new IllegalMonitorStateException();
setExclusiveOwnerThread(null);
setState(0);
return true;
}
// 提供Condition,返回其AQS內部類ConditionObject
Condition newCondition() { return new ConditionObject(); }
// 反序列化
private void readObject(ObjectInputStream s)
throws IOException, ClassNotFoundException {
s.defaultReadObject();
setState(0); // reset to unlocked state
}
}
// 內部類已經實現了所有需要的方法,我們只要封裝一層就行
private final Sync sync = new Sync();
public void lock() { sync.acquire(1); }
public boolean tryLock() { return sync.tryAcquire(1); }
public void unlock() { sync.release(1); }
public Condition newCondition() { return sync.newCondition(); }
public boolean isLocked() { return sync.isHeldExclusively(); }
public boolean hasQueuedThreads() { return sync.hasQueuedThreads(); }
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
public boolean tryLock(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}
}
進行一個小測試
public static void main(String[] args) {
Lock lock = new Mutex();
new Thread(() -> {
lock.lock();
try {
System.err.println("獲得鎖線程名:" + Thread.currentThread().getName());
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
System.err.println(Thread.currentThread().getName() + "釋放鎖");
}
}).start();
new Thread(() -> {
lock.lock();
try {
System.err.println("獲得鎖線程名:" + Thread.currentThread().getName());
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
System.err.println(Thread.currentThread().getName() + "釋放鎖");
}
}).start();
}
最終的結果圖如下
這樣就實現了一個不可重入鎖,是不是看起來很簡單?那肯定啊,難的都被AQS
團隊的大佬們封裝完了。
AQS的那些頂級方法
首先來看acquire
方法:
// 代碼邏輯不複雜,首先嘗試獲取資源,如果失敗了則將封裝節點放入同步隊列中直到獲取到資源
public final void acquire(int arg) {
// 嘗試獲得鎖,如果失敗了則增加節點放入等待隊列中
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
其中addWaiter
將當前線程封裝成一個節點放入等待隊列中,而acquireQueued
方法則是在一個迴圈中嘗試獲取資源,如果獲取資源的過程中被線程被打斷不會進行任何形式的相應,只是記錄一下當前節點被打斷過,在獲取到資源後再把被打斷的邏輯補上。
我們看看addWaiter
做了什麼。
private Node addWaiter(Node mode) {
// 將當前線程封裝入一個節點之中
Node node = new Node(Thread.currentThread(), mode);
// 首先嘗試一次快速的入隊,如果失敗的話則採用正常方式入隊
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
// 入隊操作
enq(node);
return node;
}
再看下入隊操作的實現
private Node enq(final Node node) {
// 迴圈直到將節點放入同步隊列中
for (;;) {
Node t = tail;
// 如果同步隊列是空的話則進行隊列的初始化
if (t == null) {
// 這裡註意初始化的時候head是一個新增的Node,其waitStatus為0
if (compareAndSetHead(new Node()))
tail = head;
} else {
// 否則的話嘗試設置尾節點,失敗的話重新迴圈
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
到這裡我們可以知道addWaiter
方法首先將當前線程封裝為節點,然後嘗試快速的將節點放入隊列尾,如果失敗的話則進行正常的入隊操作,而入隊操作的則是不斷的迴圈將當前節點設置為尾節點,其中如果一開始隊列為空的話則進行隊列的初始化。
再回到acquire
方法中這一段代碼中
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
節點入隊之後還有一個acquireQueued
操作,這個方法就是線程不斷自旋的去獲取資源的過程,在一定嘗試後進入阻塞。我們進入此方法
final boolean acquireQueued(final Node node, int arg) {
// 預設獲取失敗
boolean failed = true;
try {
/*
* 線程打斷標識,我們知道使用interrupt()方法只是改變了線程中的打斷標識變數,
* 並不能打斷正在運行的線程,而對於這個打斷變數的處理一般有兩種方式,
* 一種是記錄下來,一種是拋出異常,這裡選擇前者,而可打斷的acquire則是選擇後者
*/
boolean interrupted = false;
// 這裡就是自旋的過程了
for (;;) {
// 拿到前一個節點
final Node p = node.predecessor();
// 如果前節點為頭節點則嘗試一次獲取
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// 沒有拿到資源,根據前節點決定線程是否進入阻塞狀態,兩個方法解釋在下方
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
// 如上方所說,記錄打斷標識
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
// 檢查是否需要阻塞線程
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
// 前節點的狀態
int ws = pred.waitStatus;
// 如果前節點狀態為SIGNAL,我對這個狀態的理解是其後續還有處於同步隊列中的節點
if (ws == Node.SIGNAL)
// 表示下方代碼已經執行過一次了,所以直接返回
return true;
if (ws > 0) {
// 狀態大於0,則表示節點已經取消作廢,那麼需要一直往前找直到找到有效的節點
// 在AQS中經常使用狀態>0來表示無效,<0表示有效
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* 剩下的狀態只能是0或者PROPAGATE(CONDITION(-2)狀態不會出現在同步隊列中)
* 這裡並沒有返回true,說明還要進行一次迴圈
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
// 在上方方法判斷需要掛起線程之後,調用parkAndCheckInterrupt方法將線程掛起
private final boolean parkAndCheckInterrupt() {
// 使用park方法將線程掛起
LockSupport.park(this);
// 在上面我們提到線程的打斷標識,interrupted()方法返回後會重置這個標識
return Thread.interrupted();
}
獨占式獲取資源的主要方法差不多就是這樣,還有可打斷的獨占式獲取方法acquireInterruptibly
,代碼如下,其實現基本相同,只是對於我們方纔說的打斷標識的處理從記錄改成了拋出異常,所以才是可打斷的,有興趣可以自己再看下,基本邏輯相同,看起來也就耗費不了多少時間。
public final void acquireInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
// 拋出異常處理
throw new InterruptedException();
if (!tryAcquire(arg))
doAcquireInterruptibly(arg);
}
瞭解完獲取資源自然知道釋放資源的過程,相對來說釋放資源要相對容易一些,大致邏輯為嘗試釋放資源,如果成功了,則改變節點的狀態並且喚醒下一個可用節點(一般是下一個,但是可能出現下一個節點已經被取消的情況)
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
// 修改線程的狀態,並且喚醒下一個節點進行資源競爭
unparkSuccessor(h);
return true;
}
return false;
}
private void unparkSuccessor(Node node) {
// 改變節點狀態
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* 喚醒下一個可用節點,一般來說是下一個節點,但是可能出現下個節點被取消
* 或者為空的情況,這個時候就要從尾結點向前遍歷直到找到有效的節點(從尾節點向前遍歷
* 是因為無論下個節點是空還是取消的節點,正向遍歷都不可能走得通了,取消的節點的next
* 就是其本身,所以只能從後面開始往前遍歷)
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
// 找到下個節點之後將其喚醒
if (s != null)
LockSupport.unpark(s.thread);
}
到這裡AQS
獨占式的獲取釋放資源的過程大致就結束了,雖然只是簡單的將其獲取和釋放過程過了一遍,但是知道這些腦子裡對AQS
也有了大概的框架模型,在這個模型中繼續去理解其他方法相信也不會太難,總之先記錄到這裡。
總結
首先從一開始我們用一張圖瞭解了AQS
的大致構造,接下來又瞭解了組成成分的作用,其中主要圍繞兩個隊列(同步隊列和等待隊列)和兩個同步類(Node
和ConditionObject
)說明瞭其實現。
接著進入資源獲取和釋放,用獨占式的acquire
和release
方法來說明整個過程,acquire
方法包含了嘗試獲取資源、入隊和休眠等操作,而release
方法相對簡易的改變了狀態,並且喚醒後方節點,從而分別表示鎖的獲取和釋放,到這裡就算過了一遍AQS
的獨占式流程。
I wish I could write better.