簡介 AQS(AbstractQueuedSynchronizer)是併發開發中一個基礎組件。主要實現了同步狀態管理、線程隊列管理、線程等待、線程喚醒等底層操作。JDK中許多的併發類都是依賴AQS的。 ReentrantLock(可重入鎖)、Semaphore(信號量)、CountDownLatch ...
目錄
簡介
AQS(AbstractQueuedSynchronizer)是併發開發中一個基礎組件。主要實現了同步狀態管理、線程隊列管理、線程等待、線程喚醒等底層操作。JDK中許多的併發類都是依賴AQS的。 ReentrantLock(可重入鎖)、Semaphore(信號量)、CountDownLatch(計數器)。
Lock簡單實用
- 介紹原理前我們簡單來看看Lock使用。
public static void main(String[] args) {
Integer index = 0;
ReentrantLock lock = new ReentrantLock();
List<Thread> threadList = new ArrayList<>();
for (int i = 0; i < 100; i++) {
int finalI = i;
Thread thread = new Thread(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(new Random().nextInt(100));
} catch (InterruptedException e) {
e.printStackTrace();
}
lock.lock();
System.out.println(finalI);
lock.unlock();
}
});
threadList.add(thread);
}
for (Thread thread : threadList) {
thread.start();
}
}
- 就是lock 和unlock的使用。就能夠保證中間的業務是有序執行的。上面不會保證輸出數字有序,但是能保證輸出的個數是100個,因為這裡我們理解成他們會進入隊列中。但是進入的順序不確定。那麼下麵我們看看lock 、unlock 與我們今天的主角AQS有什麼關係。
主體框架
AQS提供了一個依賴FIFO(先進先出)等待隊列的阻塞鎖和同步器的框架。該類是一個抽象類。其中暴露出來的方法主要用來操作狀態和類別判斷。這些方法我們不需要考慮阻塞問題,因為在AQS中調用這些方法的地方會處理阻塞問題
方法 | 描述 |
---|---|
boolean tryAcquire(int args) | 嘗試獲取獨占鎖 |
boolean tryRelease(int args) | 嘗試釋放獨占鎖 |
int tryAcquireShared(int args) | 嘗試獲取共用鎖 |
boolean tryReleaseShared(int args) | 嘗試釋放共用鎖 |
boolean isHeldExclusively() | 當前線程是否獲得了獨占鎖 |
其他方法有AQS類實現。在AQS中實現的方法會調用到上面的抽象方法。正常子類是已內部類方式呈現的。這樣的好處可以做到封閉式的同步屬性。AQS內部實現的方法大概介紹
方法 | 描述 |
---|---|
void acquire(int args) | 獲取獨占鎖,內部調用tryAcquire方法, |
void acquireInterruptibly(int args) | 響應中斷版本的acquire |
boolean tryAcquireNanos(int args , long nanos) | 響應中斷+超時版本的acquire |
void acquireShared(int args) | 獲取共用鎖,內部調用tryAcquireShared方法 |
void acquireSharedInterruptibly(int args) | 響應中斷版本的獲取共用鎖 |
boolean tryAcquireSharedNonos(int args,long nanos) | 響應中斷+超時獲取共用鎖 |
boolean release(int args) | 釋放獨占鎖 |
boolean releaseShared(int args) | 釋放共用鎖 |
Collection getQueuedThreads() | 獲取同步隊列上的線程集合 |
原理解析
AQS內部是通過一個雙向鏈表來管理鎖的(俗稱CLH隊列)。
當前程嘗試獲取鎖失敗時,會將當前線程包裝成AQS內部類Node對象加入到CLH隊列中,並將當前線程掛起。當有線程釋放自己的鎖時AQS會嘗試喚醒CLH隊列中head後的直接後繼的線程。AQS的status我們可以根據他來做成不同的需求。這個後續再說。下麵我們已ReentrantLock來說明下AQS原理。
- 上面標註的是ReentrantLock中的lock方法。這個方法表示去上鎖。瞭解Lock的都知道這個方法會一直阻塞住知道上鎖成功才會執行完。而ReentrantLock.lock方法實際上的sync對象去上鎖的。而sync在ReentrantLock中有公平鎖和非公平鎖兩種。
- 在AQS中預設的是非公平鎖,即隨機喚醒線程。
- 通過上面繼承關係我們發現了我們今天的主角-AbstractQueueSynchronizer 。
- NonfairSync實現了兩個方法lock、tryAcquire方法。其中lock就是通過狀態位實現鎖機制的。0-未上鎖;1-已上鎖 。 lock的邏輯就是如果上鎖成功會將狀態置為1且設置獨占模式的所屬線程為當前線程。否則調用acquire嘗試獲取鎖。
獨占鎖
AQS數據結構
- AQS裡面主要是狀態位的管理。下麵我們看看包含的屬性
Class AbstractQueuedSynchronizer{
/*隊列中的頭結點,無實際意義,head的後繼節點才是隊列中的第一個節點*/
private transient volatile Node head;
/*隊列中的尾節點*/
private transient volatile Node tail;
/*隊列中的狀態,上鎖解鎖 可以擴展成不同的狀態 。 AQS實際上也是對該欄位的管理。子類中通過get set compare方法對state的管理*/
private volatile int state;
}
CLH數據結構
- 上面我們瞭解到會將線程包裝成Node對象加入到雙向鏈表(CLH)中。下麵我們看看Node的結構吧
static final class Node {
/*共用模式的標記*/
static final Node SHARED = new Node();
/*獨占模式的標記*/
static final Node EXCLUSIVE = null;
/*隊列等待狀態-取消*/
static final int CANCELLED = 1;
/*隊列等待狀態-喚醒*/
static final int SIGNAL = -1;
/*隊列等待狀態-條件等待*/
static final int CONDITION = -2;
/*隊列等待狀態-廣播*/
static final int PROPAGATE = -3;
/*隊列等待狀態,取值範圍就是上面的等待狀態之一*/
volatile int waitStatus;
/*前驅節點*/
volatile Node prev;
/*後繼節點*/
volatile Node next;
/*節點對應的線程:綁定關係*/
volatile Thread thread;
/*TODO*/
Node nextWaiter;
/*判定是否是共用模式*/
final boolean isShared() {
return nextWaiter == SHARED;
}
/*獲取當前節點的前驅節點,如果沒有前驅節點拋出NullPointerException*/
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
/*用於創建雙向鏈表中的Head節點,其實Head節點就是一個標誌並不會與線程掛鉤。相當於一個隊列的預設頭節點。或者用來創建共用模式的節點。因為共用模式的節點就是無參構造*/
Node() {
}
/*將線程包裝成Node對象加入隊列中,源碼中是用來添加Thread至隊列*/
Node(Thread thread, Node mode) {
this.nextWaiter = mode;
this.thread = thread;
}
/*常用語加入條件狀態隊列中TODO*/
Node(Thread thread, int waitStatus) {
this.waitStatus = waitStatus;
this.thread = thread;
}
}
acquire實現步驟
-
上面我們瞭解到Lock中實現lock的底層是AQS的acquire實現的。
-
通過查看源碼我們大概能瞭解到其上鎖的流程,
- 首先嘗試獲取鎖
- 獲取鎖失敗後,將當前線程包裝成Node對象添加到CLH隊列中
- 自行阻塞當前線程,等待隊列喚醒自己
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
addWaiter
/**
* 通過Node對象的構造函數構造Node對象添加到CLH隊列中
* 這個方法主要是雙向鏈表的操作。C++的同學應該會很容易理解
*/
private Node addWaiter(Node mode) {
/*當前線程加入隊列後此時是沒有後繼節點的,且已獨占模式訪問的
*所以這裡加入的Node在上一不傳入的是Node.EXCLUSIVE,這裡就表示
*是已獨占模式進行上鎖從而進行加入隊列的
*/
Node node = new Node(Thread.currentThread(), mode);
/*獲取隊列中的最後一個Node節點;這裡是進行快速插入測試。
*預設隊列已經在堆積Node節點了這個時候直接將節點追加到tail里。
*其實這裡和enq()方法是一樣的邏輯。只不過enq裡面會進行等待隊列
*正常才會加入
*/
Node pred = tail;
if (pred != null) {
/*隊列已經產生線程等待就會將當前node節點的前驅節點只為tail
*的複製節點
*/
node.prev = pred;
/*基於CAS(內部UnSafe實現)設置尾部為node節點*/
if (compareAndSetTail(pred, node)) {
/*原本的tail節點的後繼節點自然就是node節點*/
pred.next = node;
/*到這裡node節點就已經加入了CLH隊列中*/
return node;
}
}
/*邏輯同上,不在贅述*/
enq(node);
return node;
}
acquireQueued
- 這裡傳的Node是我們上一步剛剛添加到隊尾的節點。為什麼不直接用tail節點呢?我們仔細觀察發現tail的修飾
private transient volatile Node tail;
- 我們知道
volatile
是記憶體可見的。什麼叫記憶體可見。我們的屬性變數是存儲在記憶體中的。每次有線程啟動訪問這個類的時候都會複製記憶體中屬性值到自己線程中。所以在多線程情況下修改了這個屬性就會出現問題因為A線程修改了值但是B線程並無法感知還是以原先的值進行交互。這就是典型的多線程帶來的問題。而volatile
做到了的線程感知。當A線程修改了tail後立馬B線程就感知到了。但是這並不能徹底的解決多併發的問題。這裡我們簡單介紹下這個關鍵字 - 經過上面簡單闡述高併發場景,所以這裡不能直接用tail。因為這個時候tail很有可能已經不是我們的tail的。這裡直接傳遞Node節點是非常明智的選擇。而且是final修飾的。更加保證了使我們上一步驟添加到隊尾的那個節點
/**
* 再次嘗試獲取鎖,對中斷不敏感。
*/
final boolean acquireQueued(final Node node, int arg) {
/*失敗標誌位*/
boolean failed = true;
try {
/*線程是否被打斷標誌位*/
boolean interrupted = false;
/**/
for (;;) {
/*獲取當前想成包裝的Node節點的前驅節點*/
final Node p = node.predecessor();
/*如果前驅節點是head節點表示當前節點在隊首可以嘗試
*獲取下鎖,這裡為什麼是嘗試獲取呢因為這個時候可能鎖
*還被其他線程占著。這裡嘗試獲取純粹就是試試機會
*/
if (p == head && tryAcquire(arg)) {
/*成功獲取到鎖,說明我們試一試的心態成功了。
*人生也一樣,總得試一試萬一成功了呢。看源碼還
*能學到人生道理呢。劃重點
*/
/*這個時候在tryAcquire中已經被當前線程占用了鎖了。
*我們這裡不需要擔心其他線程會搶占,這個時候我們
*需要將當前線程從隊列中踢出,直接將當前線程置為
*head節點。setHead方法也很簡單,將node的前驅節
*點置為null,因為head是首位,首位之前不應該在
*有節點了,然後線程也被銷毀了
*/
setHead(node);
/*p節點是老的head節點這個時候已經不需要了。
*這裡jdk的操作是將next至為null, 這樣p節點
*就成為不可達狀態,接下來的命運就是等待被GC。
*這裡我們不是將p置為null的原因是我們p=null ,
*只是將p指向null, 但是原先的head的那個Node的
*地址任然通過Node進行指向,GC是無法回收的。好好理解下*/
p.next = null; // help GC
/*這裡我們已經獲取了。而且成功上了鎖。所以這裡就
* 無法取消獲取了,而且我們已經將Node剔除了,也
* 沒有必要再進行取消獲取操作了。所以在finnally中
* 就沒必要執行了*/
failed = false;
/*返回線程是否被中斷狀態*/
return interrupted;
}
/*如果當前線程對應的Node節點不是head的後繼節點或者
* 沒有獲取到鎖,這個時候我們開始阻塞線程*/
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
/*取消當前線程對應的Node節點在隊列中排隊。這裡可以
*理解成棄權操作。這裡取消會順便遍歷之前的節點如果
* 有棄權的這裡會一併操作掉
*/
cancelAcquire(node);
}
}
shouldParkAfterFailedAcquire
/**
* 在失敗獲取鎖的情況下判斷是否需要對線程進行阻塞並同意修改線程
* 在隊列中狀態。如果前驅節點是SIGNAL狀態那麼node節點就進入
* 準備狀態。前驅節點CANEL狀態需要剔除。如果是CONDITION或者
* PROGAGATE狀態,在ReentrantLock中我們暫時不考慮這兩者情況,
* 所以這裡就強制轉換為SIGNAL狀態
*/
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
/*獲取前驅節點的狀態*/
int ws = pred.waitStatus;
/*如果前驅節點是等待通知狀態,那麼當前節點需要等待前驅
* 結點被喚醒,所以這裡需要被阻塞
*/
if (ws == Node.SIGNAL)
return true;
/*如果前驅節點>0,即為canclled狀態*/
if (ws > 0) {
//這裡其實和cancelAcquire邏輯差不多,需要將取消的節點從隊列中剔除
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*剩下的情況,統一將節點狀態更正為等待通知狀態*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
parkAndCheckInterrupt
/**
* 阻塞當前線程,等待被喚醒
*/
private final boolean parkAndCheckInterrupt() {
/*這裡就是阻塞線程,並等待LockSupport.unpark喚醒*/
LockSupport.park(this);
/*在park之後我們需要Thread.interrupted恢復下線程的中斷狀態,
* 這樣下一次park才會生效。否則下一次的park不會生效的
*/
return Thread.interrupted();
}
cancelAcquire
/**
* 將node節點之前(包括當前node)取消狀態的全部剔除
*/
private void cancelAcquire(Node node) {
if (node == null)
return;
/*剔除操作需要解綁node和thread關係*/
node.thread = null;
/*獲取node的前驅節點*/
Node pred = node.prev;
/*大於0就是取消狀態*/
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
Node predNext = pred.next;
/*這裡直接置為取消狀態,是為了方便其他線程進行取消是的操作,
* 也是為了方便跳躍該節點
*/
node.waitStatus = Node.CANCELLED;
/*如果node是隊尾的haul,那麼將隊尾設置成node的前驅結點*/
if (node == tail && compareAndSetTail(node, pred)) {
/*將隊尾的pred節點的後繼節點置空,這是一個隊列的標準要求*/
compareAndSetNext(pred, predNext, null);
} else {
//如果是非隊尾節點
int ws;
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {
/*pred節點狀態如果是有效節點且不是head,將pred的後繼
* 節點指向node的後繼節點。這裡和C++指針指向是一個道理*/
Node next = node.next;
if (next != null && next.waitStatus <= 0)
/*node的後繼節點是有效節點且不是取消狀態,進行替換*/
compareAndSetNext(pred, predNext, next);
} else {
/*
* 這裡就是對上面提到的阻塞進行放行。裡面
* 實際上是LockSupport.unpark進行放行的。
* 這個時候我們通過上面的if知道,這個時候在以下幾種場景出現
* 1、pred==head
* 2、pred是取消狀態
* 3、pred.thread==null 即不是有效節點
* 以上這些情況都表示pred不是能進行喚醒的節點,我們
* 這裡理解為不是標準節點。這個時候為了保證隊列的活躍性,
* 我們需要喚醒後繼節點,實際上就是node的後繼節點。
*/
unparkSuccessor(node);
}
node.next = node; // help GC
}
}
- 在上面代碼中當代碼執行到
unparkSuccessor(node)
這一塊時就會去喚醒node節點。但是我們的canelAcquire方法是為了取消node節點之前取消狀態的節點的。這樣就會與我們功能違背。命名方法是為了剔除canel節點。現在確實去喚醒node節點。這裡我們上面shouldParkAfterFailedAcquire
方法中在狀態>0時回去自動剔除這些節點的。這樣就實現了canelAcquire方法的功能了。所以我們不需要糾結。
ps: 源碼終究是源碼,考慮的是非常全面的。
if (ws > 0) {
//這裡其實和cancelAcquire邏輯差不多,需要將取消的節點從隊列中剔除
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
}
unparkSuccessor
/**
* 喚醒node節點
*/
private void unparkSuccessor(Node node) {
/*獲取當前節點的狀態*/
int ws = node.waitStatus;
/*對狀態進行判斷*/
if (ws < 0)
/*若果小於0,則進行強制糾偏為0*/
compareAndSetWaitStatus(node, ws, 0);
/*獲取當前節點的後繼節點*/
Node s = node.next;
/*判斷*/
if (s == null || s.waitStatus > 0) {
/*後繼節點為有效節點且狀態>0 , 這裡即為CANCELLED狀態,
* 則將該節點在CLH中剔除,併進行斷層連接*/
s = null;
/*這裡和向前去除取消狀態的前驅節點一樣,只不過這裡是向後
*至於為什麼是從後向前呢,是為了避免高併發帶來的節點不一
* 致性。因為從node開始往後的話,很有可能後面會被其他
* 線程修改了。因為添加節點的往後添加的。所以從後往前的話這樣能保證數據一致。但是這樣就會導致其他線程添加的節點是無法訪問到的。這一點和數據一致性比較還是前者比較重要。此次獲取不到沒關係,在獲取鎖的時候jdk使用的是for迴圈。會不停的檢查隊列中節點是否可以被喚醒的。這裡我們理解是一個定時器。所以一次獲取不到節點沒關係。總有一次會被喚醒。
*/
for (Node t = tail; t != null && t != node; t = t.prev)
/*head節點狀態應該是0,所以這裡最後s就是head.所以後面釋放* 的就是head的後繼節點。*/
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
/*這裡對應的是parkAndCheckInterrupt中的
* LockSupport.lock(this)方法。unpark
* 之後parkAndCheckInterrupt方法就會執行到Thread.interrupted
* 併進行返回,這個時候回返回true*/
LockSupport.unpark(s.thread);
}
acquire
- 到這裡acquire執行步驟我們按照方法維度一一進行閱讀了。我們大概梳理下就是第一步獲取鎖,獲取失敗就會加入隊列,這個時候該線程會被阻塞,在加入隊列的過程中會進行針對隊列進行無效節點去除(取消狀態或者參數null等情況)。保證隊列里的node都是有效且活躍的節點。這個過程會保證隊列是運轉的。如果加入隊列順利的話下一步就是自行的中斷線程進行掛起
Thread.currentThread().interrupt();
,其實執行到這一步就表示這個線程已經不需要了。被取消了。後續會將這個線程作廢。
下麵貼出一個來自於博客園大神的原理圖
release
- 獲取獨占鎖的邏輯還是很複雜的,裡面涉及到操作雙向鏈表的操作,如果沒有接觸過C++應該還是很吃力的。其實在獲取的邏輯中已經牽涉了釋放的邏輯。在我們喚醒node的後繼節點其實也是釋放邏輯中的重頭戲。
public final boolean release(int arg) {
/*會調用tryRelease,這個方法是有子類實現的。我們在ReentrantLock
* 中應該是非公平鎖實現的tryRelease。這個方法後面會說。
* 這裡我們需要提一點的:當一個線程獲取到鎖時,它對應的Node是
* 不會再隊列中的。所以這裡釋放我們可以理解成喚醒Head的後繼節點。
* 這裡就和上面喚醒node的後繼節點一樣了。所以你會看到同樣的
* 方法*unparkSuccessor(h)
*/
if (tryRelease(arg)) {
/*獲取CLH隊列中的隊首節點*/
Node h = head;
if (h != null && h.waitStatus != 0)
/*喚醒head節點的後繼節點*/
unparkSuccessor(h);
return true;
}
return false;
}
- 這裡需要解釋下為什麼會對head節點進行判斷。因為AQS中head預設的null。那麼head是什麼創建的呢。是在我們上面加鎖的時候加入,在加入隊列後需要進行前驅結點判斷的時候創建head的。這個時候的head沒有設置狀態。那麼這個狀態是預設0的。所以上面判斷只需要判空就行了。但是為了嚴謹JDK進行雙重判斷了。
private transient volatile Node head;
- 所以這裡需要對head進行判空。
tryRelease
- 其實在上面acquire步驟講解中,我們漏掉了
tryAcquire
方法的閱讀。目的是為了和tryRelease
方法進行合併講解。因為這兩個方法都是交由子類實現的。放在一起講我們更加能理解設計 。 在ReentrantLock中tryAcquire是有非公平鎖的nonfairTryAcquire實現的
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
/*首先獲取獨占鎖的state*/
int c = getState();
if (c == 0) {
/*c==0表示當前獨占鎖沒有被任何線程占用。這個時候是可以加鎖的*/
if (compareAndSetState(0, acquires)) {
/*設置當前擁有次所的線程為當前線程*/
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
/*因為這個判斷,實現了可重入式的鎖,這樣一個線程可以重覆上鎖操作。*/
/*c!=0表示已有線程占用。如果是當前線程的表示被重入了。那麼這個獨占鎖state就會繼續累加。這裡的state是AQS的state和Node裡面waitStaus是兩回事。在這裡累加在釋放方法里就是遞減。這樣對比我們就容易理解了。這裡的status不同的實現有著不同的定位功能*/
int nextc = c + acquires;
if (nextc < 0) // overflow
/*這裡的判斷著實沒有看懂。希望大神指點。*/
throw new Error("Maximum lock count exceeded");
/*CAS設置state*/
setState(nextc);
return true;
}
return false;
}
protected final boolean tryRelease(int releases) {
/*看完tryAcquire中遞增的操作,我們就能理解這裡遞減的邏輯了*/
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
/*c==0表示這個線程因為可重入的上鎖方式,完全的釋放的獨占鎖。這個時候才可以被別的線程占用*/
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
共用鎖
-
共用鎖的實現主要應用場景就是在讀場景。獨占鎖應用場景就是寫場景。這個在
ReentrantReadWriteLock
類中使用了這兩種場景。上面獨占鎖我們通過ReentrantLock
閱讀了一遍。下麵我們通過ReentrantReadWriteLock
來體驗下共用鎖的邏輯吧。 -
共用鎖邏輯有所變動。但是裡面涉及到的方法在獨占鎖中都提到了。下麵我們會提及下未提到的方法。公用的方法聰明的你應該是閱讀明白了。
- 同樣
tryAcquireShared
方法這裡暫時不看。到後面和釋放方法一起閱讀。我們先來通過doAcquireShared
方法為入口進行閱讀
獲取共用鎖
doAcquireShared
/**
* 這個方法仔細看其實和獨占鎖acquire是一樣的邏輯。只不過方法全都提到方* 法內部了。
* addWaiter和獨占鎖中是一個方法
* 後面的for迴圈也是一樣的,如果是head的後繼節點則會執嘗試獲取鎖,並替* 換head。並且如果線程阻塞過就會自行中斷線程等操作。所以看完獨占鎖在* 學習共用鎖就很容易了。兩者雖有不同但是還是及其相似的
*/
private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head) {
/*如果前驅節點是head節點就會去嘗試獲取鎖,有可能會成功*/
int r = tryAcquireShared(arg);
if (r >= 0) {
/*獲取成功就會將節點剔除,從而head節點指向最新節點*/
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
setHeadAndPropagate
/**
* progagate表示當前共用鎖的容量
* node 表示當前線程對應的Node
*/
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
setHead(node);
/*
* 在方法外部已經確保了Progagate>=0
* progagate=0表示當前共用鎖已經無法被獲取了。所以這裡條件
* 之一 progagte>0
* 1、progagate>0 那麼就會查看隊列中後繼節點是否符合條件,如果符* 合的 則通過doReleaseShared方法進行喚醒隊列中head的後繼節點
* 2、head==null 表示AQS還沒有創建head這個時候出發釋放的方法是為* 了讓釋放這個過程啟動。內部實現因為是for迴圈。相當於監聽head節點
* 3、head.waitStatus<0 表示在doReleaseShared被設置成
* Node.PROGAGATE屬性了。釋放鎖的時候會設置head的狀態從
* SIGNAL置為0,也會從0置為PROGAGATE。head節點預設的狀態也是0,
* 所以這裡的head狀態小於0只可能是被另外一個線程釋放資源是
* 執行了置為PROGAGATE的代碼了。雖然progagate==0但是只是
* 獲取那會是0在高併發場景下會被改變的。既然另外一個線程釋放
* 資源那麼這裡自然就可以去喚醒隊列線程去嘗試獲取。這裡條件判斷
* 我們後面整個邏輯講完會重新梳理下這個地方
*/
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared();
}
}
doReleaseShared
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
/*獲取head*/
int ws = h.waitStatus;
/*
*head節點狀態預設是0,所以在隊列中第一次應該是進入
*下麵的if中並且設置head節點為傳播狀態;設置成傳播狀
*態的目的是為了方便對應上面我們方法中的
*判斷h.waitStatus < 0 。這樣就會去喚醒head節點
*的後繼節點了。這個時候可能會失敗但是共用就是讓他
*們儘可能的獲取。所以這裡設置傳播狀態。也有可能
*會經過shouldParkAfterFailedAcquire方法將傳播
*狀態糾偏為SIGNAL狀態,也就是後面會被糾正過來。這個
*時候需要和shouldParkAfterFailedAcquire對比,
*shouldParkAfterFailedAcquire是遇到SIGNAL狀態對
*後繼節點進行阻塞,而在這裡是遇到SIGNAL狀態就進行釋放
*/
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
/*與獨占鎖一樣*/
unparkSuccessor(h);
}
/*這裡就是設置傳播狀態,與setHeadAndPropagate方法對應*/
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
釋放共用鎖
- tryReleaseShared同理是交由子類實現的。後面我們通過ReentrantReadWriteLock類來看這兩個方法實現邏輯。最終AQS的釋放邏輯還是放在的doReleaseShared方法上。
doReleaseShared
- 在上面閱讀獲取共用鎖時,設置head節點後會檢查後繼節點,判斷是否需要喚醒的時候就是doReleaseShared 。 所以這個方法這裡也不需要說了。
tryAcquireShared
/**
* 與讀鎖不衝突的前提下獲取寫鎖,有剩餘的前提下會一直獲取直至獲取成功,
* 獲取失敗返回 -1
* 獲取成功返回 1
*/
protected final int tryAcquireShared(int unused) {
Thread current = Thread.currentThread();
int c = getState();
/*exclusiveCount就是c與獨占鎖容量的一個與運算。共用容量2^16-1
*所以只要c!=0 exclusiveCount(c)就!=0,另一個條件時判斷是否
*是當前線程。這個也是可重入式鎖的憑證
*/
/*
* 讀鎖和寫鎖是互斥的,所以這裡如果其他線程已經獲取了寫鎖,那麼
* 讀鎖就沒法獲取了。
*/
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return -1;
/*sharedCount就是獲取共用鎖的容量*/
int r = sharedCount(c);
/*readerShouldBlock就是判斷是否需要對該節點進行阻塞,只要是有
*效節點且是共用節點就不阻塞;讀鎖寫鎖是一個32位表示的,高位寫
*鎖低位讀鎖,SHARED_UNIT是低16位,所以這裡就是增加讀鎖次數*/
if (!readerShouldBlock() &&
r < MAX_COUNT &&
compareAndSetState(c, c + SHARED_UNIT)) {
if (r == 0) {
/*表示第一次讀*/
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
/*第一次讀的線程重覆讀,累計線程讀取次數*/
firstReaderHoldCount++;
} else {
/*實際上就是一個ThreadLocal管理讀的次數。和上面firstReader作用一樣。*/
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
return 1;
}
/*高併發場景下CAS疊加次數不一定會成功,這個時候需要*fullTryAcquireShared再次獲取讀鎖,這個方法邏輯和上面可以說是
*一樣的。那麼為什麼他叫full ,因為裡面用了迴圈確保在有剩餘的條件
*下一隻獲取讀鎖。不會因為CAS的問題獲取不到*/
return fullTryAcquireShared(current);
}
tryReleaseShared
/**
* 這裡只要有讀鎖存在就會返回false ,這裡有個疑問,
* 如果返回false那麼AQS的release就無法去釋放隊列。這種情況
* 是因為隊列本身是活躍的。會按順序釋放鎖的。而讀鎖的釋放
* 其實在tryReleseShared里就釋放了。讀鎖其實就是計數。
* 這裡會在ReentrantReadWriteLock章節詳細解說
*/
protected final boolean tryReleaseShared(int unused) {
Thread current = Thread.currentThread();
if (firstReader == current) {
// 當前線程是第一個獲取讀鎖的。這裡會加讀的次數一直遞減。
//當前線程全部釋放完了,就接觸當前線程的占位
if (firstReaderHoldCount == 1)
firstReader = null;
else
firstReaderHoldCount--;
} else {
//這裡針對非第一個獲取讀鎖的線程進行釋放。顯示次數的釋放
//完全釋放後就丟棄線程
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
int count = rh.count;
if (count <= 1) {
readHolds.remove();
if (count <= 0)
throw unmatchedUnlockException();
}
--rh.count;
}
//這裡和上面的fullTryAcquireShared對應。迴圈釋放一直到釋放成功為止
for (;;) {
int c = getState();
int nextc = c - SHARED_UNIT;
if (compareAndSetState(c, nextc))
// Releasing the read lock has no effect on readers,
// but it may allow waiting writers to proceed if
// both read and write locks are now free.
return nextc == 0;
}
}
總結
AQS是jdk中併發類的一個底層原理。好多jdk的併發類都是基於此實現的。AQS其實就是一個框架。簡單總結幾句話
- AQS是併發的一個基類
- 內部維護了FIFO隊列
- 擁有兩種模式: 獨占模式(寫鎖)、共用模式(讀鎖)
內部state就是表示鎖的狀態。不同的實現可以有不同的定義。
ReentrantLock : 純粹鎖的狀態 +1、-1
Semaphore : 鎖的個數
CountDownLatch: 計數器,一個標誌位