前言 由於AQS的源碼太過凝練,而且有很多分支比如取消排隊、等待條件等,如果把所有的分支在一篇文章的寫完可能會看懵,所以這篇文章主要是從正常流程先走一遍,重點不在取消排隊等分支,之後會專門寫一篇取消排隊和等待條件的分支邏輯。讀源碼千萬別在每個代碼分支中來回游走,先按一個正常的分支把流程看明白,之後再 ...
前言
由於AQS的源碼太過凝練,而且有很多分支比如取消排隊、等待條件等,如果把所有的分支在一篇文章的寫完可能會看懵,所以這篇文章主要是從正常流程先走一遍,重點不在取消排隊等分支,之後會專門寫一篇取消排隊和等待條件的分支邏輯。讀源碼千萬別在每個代碼分支中來回游走,先按一個正常的分支把流程看明白,之後再去重點關註其他分支,各個擊破。我相信看完正常流程,你再去分析其他分支會更加得心應手。本篇將主要方法名都做了目錄索引,查看時可通過目錄快速跳到指定方法的邏輯。
執行流程
AQS的執行流程大體為當線程獲取鎖失敗時,會加入到等待隊列中,在等待隊列中的線程會按照從頭至尾的順序依次再去嘗試獲取鎖執行。
當線程獲取鎖後如果還需要等待特定的條件才能執行,那麼線程就加入到條件隊列排隊,當等待的條件到來時再從條件隊列中按照從頭至尾的順序加入到等待隊列中,然後再按照等待隊列的執行流程去獲取鎖。所以AQS最核心的數據結構其實就兩個隊列,等待隊列和條件隊列,然後再加上一個獲取鎖的同步狀態。
AQS數據結構
AQS最核心的數據結構就三個
-
等待隊列
源碼中head和tail為等待隊列的頭尾節點,在通過前後指向則構成了等待隊列,為雙向鏈表,學名為CLH隊列。
-
條件隊列
ConditionObject中的firstWaiter和lastWaiter為等待隊列的頭尾節點,然後通過next指向構成了條件隊列,是個單向鏈表。
-
同步狀態
state為同步狀態,通過CAS操作來實現獲取鎖的操作。
public abstract class AbstractQueuedSynchronizer{
/**
* 等待隊列的頭節點
*/
private transient volatile Node head;
/**
* 等待隊列的尾節點
*/
private transient volatile Node tail;
/**
* 同步狀態
*/
private volatile int state;
public class ConditionObject implements Condition, java.io.Serializable {
/** 條件隊列的頭節點 */
private transient Node firstWaiter;
/** 條件隊列的尾節點 */
private transient Node lastWaiter;
}
}
Node節點
兩個隊列中的節點都是通過AQS中內部類Node來實現的。主要欄位:
-
waitStatus
當前節點的狀態,具體看源碼列出的註釋。很重要,之後會在源碼中講解。
-
Node prev
等待隊列節點指向的前置節點
-
Node next
待隊列節點指向的後置節點
-
Node nextWaiter
條件隊列中節點指向的後置節點
-
Thread thread
當前節點持有的線程
static final class Node {
/** */
static final Node SHARED = new Node();
/** */
static final Node EXCLUSIVE = null;
/** 標明當前節點線程取消排隊 */
static final int CANCELLED = 1;
/** 標明該節點的後置節點需要自己去喚醒 */
static final int SIGNAL = -1;
/** 標明當前節點在等待某個條件,此時節點在條件隊列中 */
static final int CONDITION = -2;
/**
* waitStatus value to indicate the next acquireShared should
* unconditionally propagate
*/
static final int PROPAGATE = -3;
/**
* 等待狀態,值對於上面的四個常量
*/
volatile int waitStatus;
/**
* 等待隊列節點指向的前置節點
*/
volatile Node prev;
/**
* 等待隊列節點指向的後置節點
*/
volatile Node next;
/**
* 當前節點持有的線程
*/
volatile Thread thread;
/**
* 條件隊列中節點指向的後置節點
*/
Node nextWaiter;
加鎖
上面說明的數據結構我們先大致有個印象,現在通過加鎖來一步步說明下具體的流程,上篇文章JUC併發編程基石AQS之結構篇,我們知道了AQS加鎖代碼執行的是acquire方法,那麼我們從這個方法說起,從源碼中看出執行流程為:tryAcquire——>addWaiter——>acquireQueued
tryAcquire為自己實現的具體加鎖邏輯,當加鎖失敗時返回false,則會執行addWaiter,將線程加入到等待隊列中,Node.EXCLUSIVE為獨占鎖的模式,即同時只能有一個線程獲取鎖去執行。
例子說明
首先假設有四個線程t0-t4調用tryAcquire獲取鎖,t0線程為天選之子獲取到了鎖,則t1-t4線程接著去執行addWaiter。
acquire
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
addWaiter分支1
addWaiter方法,首先會初始化一個node節點,將當前線程設置到node節點中。然後判斷head和tail節點是否為空,head和tail節點是懶載入的,當AQS初始化時為null,則第一次進來時if (pred != null) 條件不成立,執行enq方法。
例子說明
假如t1和t2線程同時執行到該方法,head節點未初始化則執行enq。
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
enq
此時可能多個線程會同時調用enq方法,所以該方法中也使用CAS操作。for (;;)是個死迴圈,首先會CAS操作初始化head節點,且head節點是個空節點,沒有設置線程。然後第二次迴圈時通過CAS操作將該節點設置我尾部節點,並將前置節點指向head,之後會跳出迴圈,返回生成的Node節點到addWaiter,從源碼可以看到addWaiter方法後面沒有邏輯,之後會調用acquireQueued。
例子說明
t1和t2線程同時執行,t1線程上天眷顧CAS成功,則流程為
- 初始化head
- t1線程的node節點加入等待隊列
- t2線程執行,node節點加入等待隊列
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
addWaiter分支2
現在在來說t3和t4,t3和t4線程這時終於獲取到了cpu的執行權,此時head節點已經初始化,則進入條件中的代碼,其實也是通過CAS操作將節點加入到等待隊列尾部,之後會調用acquireQueued。
例子說明
假如t3線程先CAS成功,之後t4成功,此時的數據結構為
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
acquireQueued
這個方法有兩個邏輯,首先如果該節點的前置節點是head會走第一個if,再次去嘗試獲取鎖???
獲取鎖成功,則將頭節點設置為自己,並返回到acquire方法,此時acquire方法執行完,代表獲取鎖成功,線程可以執行自己的邏輯了。這裡有下麵幾個註意點
- p.next = null; // help GC 設置舊的head節點的後置節點為null
- setHead方法 將t1節點設置為頭節點,因為頭節點是個空節點,所以設置t1線程節點線程為null,設置t1前置節點為null,此時舊的head節點已經沒有任何指向和關聯,可以被gc回收,所以上面那一步會寫個help GC 的註釋。
例子說明
現在t1線程的前置節點為頭結點,如果t1執行tryAcquire成功則結果為
當獲取鎖失敗或者前置節點不是頭節點都會走第二個if邏輯,首先會判斷當前線程是否需要掛起,如果需要則執行線程掛起。
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
private void setHead(Node node) {
head = node;
node.thread = null;
node.prev = null;
}
shouldParkAfterFailedAcquire
判斷線程是否需要掛起,首先需要註意的是這個方法的參數是當前節點的前置節點。當線程需要掛起的時候,它需要把身後事安排明白,掛起後讓誰來把我喚醒。這個方法就主要做這個操作。我們再來看Node節點中的waitStatus狀態,這個狀態有一個Node.SIGNAL=-1,代表了當前節點需要將後置節點喚醒。這個理解可能有點繞。首先我們要理解一點,如果我需要被喚醒,那麼我就要設置我們的前置節點的狀態為Node.SIGNAL,這樣當我的前置節點發現waitStatus=Node.SIGNAL時,它才知道,我執行完後需要去喚醒後置節點讓後置節點去執行。所以這個方法是當前節點去設置自己的前置節點的狀態為Node.SIGNAL。
waitStatus初始化後是0,
第一次進入該方法,發現自己的前置節點不是Node.SIGNAL,需要先設置為Node.SIGNAL狀態
第二次進入時發現前置節點已經是Node.SIGNAL狀態,那麼我就可以安心的掛起了,有人會喚醒我的。
所以這個方法其實是兩個邏輯,先設置前置節點狀態,再判斷是否可以掛起。因為前面acquireQueued方法中for (;