### 目錄 *1:什麼是AQS?* *2:AQS都有那些用途?* *3:我們如何使用AQS* *4:AQS的實現原理* *5:對AQS的設計與實現的一些思考* ### 1:什麼是AQS 隨著電腦的算力越來越強大,各種各樣的並行編程模型也隨即踴躍而來,但當我們要在並行計算中使用共用資源的時候, ...
目錄
1:什麼是AQS?
2:AQS都有那些用途?
3:我們如何使用AQS
4:AQS的實現原理
5:對AQS的設計與實現的一些思考
1:什麼是AQS
隨著電腦的算力越來越強大,各種各樣的並行編程模型也隨即踴躍而來,但當我們要在並行計算中使用共用資源的時候,就需要利用一種手段控制對共用資源的訪問和修改來保證我們程式的正確的運行。而Java中除了在語言級別實現的synchronized鎖之外,還有另一種對共用資源訪問控制的實現,而這些實現都依賴於一個叫做抽象隊列同步器(AbstractQueuedSynchronizer)的模板框架類,簡稱AQS。所以我們想要更深刻的理解Java中對共用資源的訪問控制,就不可避免的要對AQS深入的瞭解和探討。
我們首先看一下官方對於AQS的描述:提供一個依賴先進先出(FIFO)等待隊列的掛起鎖和相關同步器(信號量、事件等)框架。此類被設計為大多數類型的同步器的基類,這些同步器依賴於單個原子int值來表示狀態。子類必須定義更改該狀態的受保護方法,以及定義該狀態在獲取或釋放該對象方面的含義。考慮到這些,類中的其他方法實現排隊和掛起機制。子類可以維護狀態欄位,但只有使用方法getState、setState和compareAndSetState方法才可以更改狀態。
2:AQS有哪些用途
AQS的用途有很多,幾乎Java中所有的共用資源控制的實現都離不開它,比如我們常用的鎖ReentrantLock、是基於AQS實現了一套可重入的公平和非公平互斥鎖的實現。上圖中我列舉了我們常用的一些對於共用資源訪問控制的一些工具,也都是基於AQS實現的。
3:如何使用AQS
我們要是用AQS實現我們自己的鎖,都離不開AQS中一個叫做int類型state的變數,而這個變數的具體意義是由使用者自已定義的,比如我們要基於AQS實現一個不可重入的互斥鎖,我們可以定義state為1代表有線程已經獲取了鎖,state為0為空閑狀態。
下麵是AQS文檔中的一段使用AQS自定義互斥鎖的一段示例代碼,我把它放到此處,方便大家查閱。
class Mutex implements Lock, java.io.Serializable {
// Our internal helper class
private static class Sync extends AbstractQueuedSynchronizer {
//Reports whether in locked state
protected boolean isHeldExclusively() {
return getState() == 1;
}
// Acquires the lock if state is zero
public boolean tryAcquire(int acquires) {
assert acquires == 1; // Otherwise unused
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
// Releases the lock by setting state to zero
protected boolean tryRelease(int releases) {
assert releases == 1; // Otherwise unused
if (getState() == 0) throw new IllegalMonitorStateException();
setExclusiveOwnerThread(null);
setState(0);
return true;
}
// Provides a Condition
Condition newCondition() {
return new ConditionObject();
}
// Deserializes properly
private void readObject(ObjectInputStream s)
throws IOException, ClassNotFoundException {
s.defaultReadObject();
setState(0); // reset to unlocked state
}
}
// The sync object does all the hard work. We just forward to it.
private final Sync sync = new Sync();
public void lock() {
sync.acquire(1);
}
public boolean tryLock() {
return sync.tryAcquire(1);
}
public void unlock() {
sync.release(1);
}
public Condition newCondition() {
return sync.newCondition();
}
public boolean isLocked() {
return sync.isHeldExclusively();
}
public boolean hasQueuedThreads() {
return sync.hasQueuedThreads();
}
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
public boolean tryLock(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}
}
4:AQS的實現原理
AQS實現的核心思想是,如果被請求的共用資源空閑,那麼就將當前請求資源的線程設置為有效的工作線程,將共用資源設置為鎖定狀態;如果共用資源被占用,就需要將此線程放入一個叫做CLH(三個人名Craig、Landin and Hagersten)的等待隊列中,然後通過掛起和喚醒機制來保證鎖的分配。而將資源設置為鎖定狀態主要是通過說到的一個叫做int類型的state的變數來控制的,隊列的FIFO操作則是利用CLH隊列來實現。
等待隊列是“CLH”(Craig、Landin和Hagersten)鎖隊列的變體。我們使用它們來作為同步器,在其節點中存儲有關線程的一些控制信息。每個節點中的status欄位跟蹤線程是否應該掛起。當節點的前驅節點釋放時,節點會發出信號。隊列的每個節點在其他方面都充當一個特定的通知樣式監視器,其中包含一個等待線程。要進入CLH鎖的隊列,可以將其原子性地拼接在隊列的尾部。要退出隊列,只需將其放在隊列頭部,也就是將此節點設置為head欄位,原理圖如下。
4.1:AQS的數據結構
首先我們先看一下AQS中最基本的數據結構,也就是CLH隊列中的節點,是一個名為Node的靜態內部類
static final class Node {
/** 標記此節點是一個以共用模式等待的鎖 */
static final Node SHARED = new Node();
/** 標記此節點是一個以互斥模式等待的鎖 */
static final Node EXCLUSIVE = null;
/** 表示線程獲取鎖的線程已經取消了*/
static final int CANCELLED = 1;
/** 原文註釋:waitStatus value to indicate successor's thread needs unparking
表示此線程的後繼線程需要通過park()方法掛起。
我的理解是此線程的後繼節點需要被掛起,
但當前節點必須是釋放鎖或者取消獲取鎖,後繼線程等待被喚醒獲取鎖,後續可以通過源碼解釋。
*/
static final int SIGNAL = -1;
/** 表示節點在等待隊列中,節點線程等待喚醒,在使用Conditional的時候會有此狀態 */
static final int CONDITION = -2;
/**
* 當前線程處在SHARED情況下,該欄位才會使用
*/
static final int PROPAGATE = -3;
/**
這些值以數字形式排列,以簡化使用。非負值表示節點不需要信號。因此,大多數代碼不需要檢查特定的值,僅用檢查符號就可以 了。對於正常同步節點,該欄位初始化為0,並且條件節點的CONDITION。使用CAS進行修改。
*/
volatile int waitStatus;
/**
前驅界定
*/
volatile Node prev;
/**
後繼節點
*/
volatile Node next;
/**
* 此節點的線程
*/
volatile Thread thread;
/**
指向下一個處於CONDITION狀態的節點,使用Conditional模式才會用到此節點,
篇幅原因,本片不講述Condition Queue隊列,故對此欄位不多作介紹。
*/
Node nextWaiter;
/**
* 是否是共用模式
*/
final boolean isShared() {
return nextWaiter == SHARED;
}
/**
* 返回當前節點的前驅節點,前驅節點為null,則拋出NPE
*/
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
Node() { // Used to establish initial head or SHARED marker
}
Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}
Node(Thread thread, int waitStatus) { // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
}
}
AQS中其他重要欄位
/**
* 隊列的頭節點(虛節點,不存儲數據),懶初始化。除了初始化之外,僅能通過setHead方法修改。
* 註:假如頭節點存在,一定會保證waitStatus變數的值不是CANCELLED
*/
private transient volatile Node head;
/**
* 隊列的尾節點(虛節點,不存儲數據),懶初始化,僅僅可以通過enq方法初始化
*/
private transient volatile Node tail;
/**
* 同步狀態
*/
private volatile int state;
由靜態內部類Node中的prev和next欄位我們可以知道CLH變體隊列是一個由Node組成的雙向隊列,由欄位head和tail可以知道AQS中保存了頭節點和尾節點。state欄位則是用來作為同步的重要欄位,AQS中提供了三個final類型的方法來訪問該欄位。
protected final int getState() {
return state;
}
protected final void setState(int newState) {
state = newState;
}
protected final boolean compareAndSetState(int expect, int update) {
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
4.2:模板方法
/**
嘗試以獨占模式獲取鎖,此方法此方法應查詢對象的狀態是否允許以獨占模式獲取該對象,若允許的話則可以獲取它。
此方法總是由執行acquire方法的線程調用。如果此方法返回false且線程尚未排隊,則acquire方法可以對線程進行排隊,直到某 個其他線程發出釋放信號,則該線程停止掛起。
*/
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
/**
嘗試以獨占模型釋放鎖
*/
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}
/**
嘗試以共用模式獲取鎖,此方法此方法應查詢對象的狀態是否允許以獨占模式獲取該對象,若允許的話則可以獲取它。
此方法總是由執行acquire方法的線程調用。如果此方法返回false且線程尚未排隊,則acquire方法可以對線程進行排隊,直到某 個其他線程發出釋放信號,則該線程停止掛起。
*/
protected int tryAcquireShared(int arg) {
throw new UnsupportedOperationException();
}
/**
嘗試以共用模式釋放鎖
*/
protected boolean tryReleaseShared(int arg) {
throw new UnsupportedOperationException();
}
/**
返回是否以獨占模式持有鎖
*/
protected boolean isHeldExclusively() {
throw new UnsupportedOperationException();
}
思考:AQS作為一個模板框架類,為什麼 tryLock和tryRelease等方法,那麼為什麼這些方法不定義成abstract方法,而要定義成普通方法,且在方法中拋出異常呢?我的理解是AQS作為一個模板框架提供了加鎖和釋放鎖的通用邏輯,但是又不僅僅是提供了獨占鎖或共用鎖的邏輯,而是作為一個底層的通用執行模板類,如何定義成抽象的模板方法,那麼所有的子類都要實現所有的模板方法,但是有些子類僅僅需要獨占鎖,比如ReentrantLock,那麼就會要多先實現共用鎖的邏輯(即使是空方法也要實現),所以我猜想是為了子類不必要實現多餘的方法,所以才定義成普通的方法併在方法內部拋出異常。
4.3:獲取鎖源碼及流程分析
由於篇幅原因,本文將以獨占且忽略中斷的模式的方法未入口分析,首先看一下獲取鎖的方法,獲取鎖的總體流程圖如下:
獲取鎖的入口方法
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
//acquireQueued方法會返回線程在掛起過程中是否被中斷,然後返回線程中斷的狀態
selfInterrupt();
}
可以看到獲取鎖的方法是一個由final修飾的不可被子類重寫的方法,首先調用了上面的模板方法(必須由子類重寫獲取邏輯)獲取鎖
如果獲取成功則獲取鎖流程執行結束。否則執行addWaiter方法執行入隊邏輯。
線程入隊方法
private Node addWaiter(Node mode) {
//mode線程獲取鎖的模式(獨占或者共用)
Node node = new Node(Thread.currentThread(), mode);
//嘗試快速入隊,失敗則執行完整入隊邏輯
Node pred = tail;
if (pred != null) {
node.prev = pred;
//如果尾節點不為null,則以原子方式把當前節點設置為尾節點並返回
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
//如果尾節點不存(說明頭節點和尾節點未初始化)在或者由於競爭導致一次設置尾節點失敗,
//則執行完整入隊邏輯(會進行頭節點和尾節點初始化的工作)
enq(node);
return node;
}
addWaiter方法會先進行快速入隊操作,如果快速入隊失敗(由於競爭或者頭、尾節點未初始化),則進行完整入隊操作(如果頭、尾節點未初始化的話會先進行初始化操作)
完整入隊邏輯
private Node enq(final Node node) {
//自旋把當前節點鏈接到尾節點之後
for (;;) {
Node t = tail;
if (t == null) {
//尾節為空,說明隊列為空,則需要進行頭節點和尾節點的初始化
//這裡直接new Node(),一個虛節點作為頭節點,然後將頭節點和尾節點相同
//這裡說明頭節點和尾節點不存儲數據
if (compareAndSetHead(new Node()))
tail = head;
} else {
//尾節點不為空,使用cas把當前節點設置為尾節點
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
enq方法會利用自旋先檢查頭節點和尾節點是否初始化,如果未初始化的話則先進行初始化。初始化完成之後以原子的方式插入node到隊尾並且插入成功之後返回此節點。
掛起線程並等待獲取鎖
final boolean acquireQueued(final Node node, int arg) {
//是否失敗,此線程被中斷可能失敗
boolean failed = true;
try {
//線程是否被中斷
boolean interrupted = false;
//自旋一直獲取鎖
for (;;) {
//獲取當前節點的前驅節點
final Node p = node.predecessor();
//如果當前節點的前驅節點是頭節點(因為頭節點是虛節點,所以當前節點可以獲取鎖),
//且當前節點獲取所成功
if (p == head && tryAcquire(arg)) {
//設置node為頭節點,因為當前節點已經獲取鎖成功了,當前節點需要作為頭節點
setHead(node);
p.next = null; // help GC
//設置失敗標誌為false
failed = false;
//返回中斷狀態
return interrupted;
}
//shouldParkAfterFailedAcquire方法檢查並更新獲取失敗的節點的狀態,如果線程應該掛起則返回true
//parkAndCheckInterrupt則掛起線程並返回是否中斷
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
//失敗,則取消獲取鎖
if (failed)
cancelAcquire(node);
}
}
通過上面流程分析可知,獲取鎖失敗,會調用addWaiter方法把當前節點放到隊尾,那麼線程入隊之後什麼時候掛起線程,什麼時候出隊,我們一點一點分析acquireQueued方法這些問題就會逐漸顯露出來。
首先該方法會一直自旋獲取鎖(中間可能會被掛起,防止無效自旋),判斷當前節點的前驅節點是否是頭節點來判斷當前是否有獲取鎖的資格,如果有的話則設置當前節點為頭節點並返回中斷狀態。否則調用shouldParkAfterFailedAcquire判斷獲取鎖失敗後是否可以掛起,可以的話則調用parkAndCheckInterrupt進行線程掛起操作。
設置頭節點
private void setHead(Node node) {
head = node;
node.thread = null;
node.prev = null;
}
註:setHead方法是把當前節點置為虛節點,但並沒有修改waitStatus,因為其他地方要用到。
檢查並更新獲取失敗的節點的狀態
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*
* node的狀態已經是SIGNAL可以安全的掛起,直接返回true
*/
return true;
if (ws > 0) {
/*
* 由之前的waitStatus變數枚舉值可知,waitStatus大於0為取消狀態,直接跳過此節點
*/
do {
//重新鏈接prev指針,至於為什麼沒有操作next指針後面會通過代碼解釋
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* 原子方式設置waitStatus的值為SIGNAL
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
掛起並檢查線程的中斷狀態
private final boolean parkAndCheckInterrupt() {
//使用LockSupport掛起此線程
LockSupport.park(this);
//返回並清除中斷狀態
return Thread.interrupted();
}
取消獲取鎖
private void cancelAcquire(Node node) {
// 忽略不存在的節點
if (node == null)
return;
//設置當前節點不持有線程
node.thread = null;
// 跳過取消的前驅節點
Node pred = node.prev;
//waitStatus>0未已取消的節點
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
// 未取消的節點的後繼節點
Node predNext = pred.next;
//設置狀態未取消狀態
node.waitStatus = Node.CANCELLED;
// 如果node為尾節點,設置pred為尾節點,然後設置尾節點的下一個節點為null
if (node == tail && compareAndSetTail(node, pred)) {
compareAndSetNext(pred, predNext, null);
} else {
int ws;
// 如果當前節點不是head的後繼節點,
//1:判斷當前節點前驅節點的是否為SIGNAL,
//2:如果不是,則把前驅節點設置為SINGAL看是否成功
// 如果1和2中有一個為true,再判斷當前節點的線程是否為null
// 如果上述條件都滿足,把當前節點的前驅節點的後繼指針指向當前節點的後繼節點
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {
Node next = node.next;
if (next != null && next.waitStatus <= 0)
//為什麼沒有自旋,如果此處設置失敗,下次仍然會丟掉predNext到next中間節點,所以不會出現問題
compareAndSetNext(pred, predNext, next);
} else {
//當前節點是頭節點的後繼節點或者上述條件都不滿足
unparkSuccessor(node);
}
//為什麼此處能help GC,不得不多說Doug Lea大神心思之縝密,考慮之周全。
//解釋參考http://www.codebaoku.com/question/question-sd-1010000043795300.html
node.next = node; // help GC
}
}
當node==tail時,節點變化情況如下圖
當pred==head時,節點的變化情況如下圖
當pred!=head時,且上述條件滿足時節點的變化情況如下圖
通過上面的流程,我們對於取消獲取鎖的cancelAcquire方法對節點操作狀態的產生和變化已經有了一定的瞭解,但是為什麼所有的變化都是對next指針進行了操作,而沒有對Prev指針進行操作呢?帶著這個問題我們回顧一下shouldParkAfterFailedAcquire方法。
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
原因:執行cancelAcquire的時候,當前節點的前置節點可能已經從隊列中出去了(已經執行過Try代碼塊中的shouldParkAfterFailedAcquire方法了),如果此時修改Prev指針,有可能會導致Prev指向另一個已經移除隊列的Node,因此這塊變化Prev指針不安全。 shouldParkAfterFailedAcquire方法中,會執行下麵的代碼,其實就是在處理Prev指針。shouldParkAfterFailedAcquire是獲取鎖失敗的情況下才會執行,進入該方法後,說明共用資源已被獲取,當前節點之前的節點都不會出現變化,因此這個時候變更Prev指針比較安全。
喚醒後繼節點
private void unparkSuccessor(Node node) {
/*
* node一般為head節點,如果waitStatus為負,則嘗試清除信號,設置為0
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
* 正常情況下我們是要喚醒頭節點的後繼節點,但是如果後繼節點為空或者已被取消,
* 則需要從尾節點開始,找到離頭節點最近的未被取消的後繼節點。
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
// 如果當前節點的下個節點不為空,而且狀態小於等於0,就把當前節點喚醒
if (s != null)
LockSupport.unpark(s.thread);
}
為什麼需要從後往前找呢?從美團技術團隊中的一片文章中(https://tech.meituan.com/2019/12/05/aqs-theory-and-apply.html)找到了答案,我把大佬的解釋放到下麵,供大家參考!
4.4:釋放鎖源碼及流程分析
釋放鎖流程圖如下:
public final boolean release(int arg) {
//嘗試釋放鎖成功
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
//嘗試釋放鎖失敗
return false;
}
釋放鎖的流程就比較簡單了,先嘗試釋放鎖,如果釋放鎖成功,(如果頭節點不為null且頭節點的狀態不等於0則釋放頭節點的後繼節點)返回true,否則返回false。
5:對AQS的設計與實現的一些思考
1:設計方面,AQS作為底層一個通用的模板框架類,就要考慮到一些易用性和擴展性,比如作者模板方法使用的拋出異常,而不是作為抽象方法強制使用者實現所有的模板方法,而是使用者可以自由選擇要使用的方法和特性選擇性實現模板方法,當然也犧牲掉了一些其他東西,比如設計原則的最小職責性。這也就體現了一些折衷的思想,任何方案都不是完美的,我們只有權衡利弊之後達到一個相對完美的方案。
2:實現方面,AQS的實現不得不令人驚嘆,每一次讀都會想到書讀百遍,其意自現這句名言,每次讀有不一樣的收穫,看似一行不經意的代碼,體現了作者對每一行代碼細緻又獨到的思考。在讀AQS代碼的時候參考下麵的鏈接,看大牛對AQS的的理解時見解,不僅加深了我對AQS核心思想的理解,也讓我從另一方面看到了AQS的優秀之處(由於個人水平有限,理解不到位或者錯誤還請各位道友不吝賜教)。路漫漫其修遠兮,吾將上下而求索。
參考:
https://tech.meituan.com/2019/12/05/aqs-theory-and-apply.html
//help GC 相關的解釋
http://www.codebaoku.com/question/question-sd-1010000043795300.html