閱讀本文前,需要儲備的知識點如下,點擊鏈接直接跳轉。 [java線程詳解](https://www.cnblogs.com/star95/p/17583193.html) [Java不能操作記憶體?Unsafe瞭解一下](https://www.cnblogs.com/star95/p/1761943 ...
閱讀本文前,需要儲備的知識點如下,點擊鏈接直接跳轉。
java線程詳解
Java不能操作記憶體?Unsafe瞭解一下
一文讀懂LockSupport
AQS簡介
AQS即AbstractQueuedSynchronizer
的簡稱,翻譯過來就是抽象隊列同步器的意思,由Doug Lea大神開發的。說他抽象是因為它提供的是一個基於隊列的同步器框架,定義了一些基礎功能方法(控制狀態變數,獲取和釋放同步狀態方法以及入隊出隊操作等),具體場景使用只需要根據需要實現對應的方法即可。我們在鎖(比如ReentrantLock)、併發工具類(比如CountDownLatch)都可以看到內部類繼承了AbstractQueuedSynchronizer
,也就是說AQS才是這些類的基石。說了這麼多,感覺把抽象說的越抽象了,下麵我們從幾個慄子入手吧。
註意:本文使用的JDK版本為JDK8,AQS的代碼非常巧妙和經典,很多細節和模塊都可以單獨拉出來寫一篇文章,很多細節問題建議自行閱讀和思考。
本篇文章主要講獨占模式的應用和原理分析,關於共用模式不再這裡展開細講。
應用舉例
ReentrantLock的使用
3個線程獲取同一個鎖,獲得後休眠1秒結束,所以3個線程間隔1秒列印輸出。
public class ReentrantLockTest {
public static void main(String[] args) {
lockTest();
}
public static void lockTest() {
ReentrantLock lock = new ReentrantLock();
PrintThread t1 = new PrintThread(lock, "t1");
PrintThread t2 = new PrintThread(lock, "t2");
PrintThread t3 = new PrintThread(lock, "t3");
t1.start();
t2.start();
t3.start();
}
}
class PrintThread extends Thread {
private Lock lock;
public PrintThread(Lock lock, String threadName) {
this.lock = lock;
this.setName(threadName);
}
@Override
public void run() {
lock.lock();
try {
System.out.println(String.format("time:%s,thread:%s,result:%s",
DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss").format(LocalDateTime.now()),
Thread.currentThread().getName(), "get lock success"));
Thread.sleep(1000);
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
列印結果如下
time:2021-04-13 13:53:55,thread:t1,result:get lock success
time:2021-04-13 13:53:56,thread:t2,result:get lock success
time:2021-04-13 13:53:57,thread:t3,result:get lock success
是因為這3個線程執行時都要先獲取鎖執行完邏輯後再釋放鎖,而ReentrantLock
是獨占鎖,相當於這3個線程間是串列執行的,相互間隔1秒(註意,線程的先後執行順序不一定是固定的,但線程內有休眠1秒的操作,所以至少相隔1秒)
CountDownLatch的使用
main線程創建一個CountDownLatch latch = new CountDownLatch(1),3個線程持有該CountDownLatch
並調用CountDownLatch
的await()
方法,直到main線程休眠2秒後執行CountDownLatch
的countDown()
方法,釋放一個同步狀態使得數量值為0,喚醒等待在await()
的線程繼續執行。
public class CountDownLatchTest {
public static void main(String[] args) throws InterruptedException {
CountDownLatch latch = new CountDownLatch(1);
ConcurrentThread concurrentThread1 = new ConcurrentThread(latch, "t1");
ConcurrentThread concurrentThread2 = new ConcurrentThread(latch, "t2");
ConcurrentThread concurrentThread3 = new ConcurrentThread(latch, "t3");
concurrentThread1.start();
concurrentThread2.start();
concurrentThread3.start();
Thread.sleep(2000);
System.out.println(Thread.currentThread().getName() + " countDown...");
latch.countDown();
}
}
class ConcurrentThread extends Thread {
private CountDownLatch latch;
public ConcurrentThread(CountDownLatch latch, String threadName) {
this.latch = latch;
this.setName(threadName);
}
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + " is ready...");
try {
latch.await();
System.out.println(Thread.currentThread().getName() + " is executing...");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
列印結果如下(註意,線程的先後執行順序不一定是固定的)
t1 is ready...
t3 is ready...
t2 is ready...
main countDown...
t1 is executing...
t3 is executing...
t2 is executing...
這三個線程在執行時先列印“...ready”後,然後等待在await()方法上,由於CountDownLatch
是共用鎖,而初始的state是1,main線程休眠2秒後調用了countDown()方法會將state置成0,會喚起等待隊列里的所有後繼線程,所以會相繼列印“executing...”。
這裡就兩個簡單的使用慄子,不過可以看出,均是在多線程場景中使用,而且代碼里並沒有AQS相關的影子,那是因為在這些類的內部有內部類去繼承了AbstractQueuedSynchronizer
,由這些內部類處理業務邏輯,底層核心邏輯是由AQS框架提供的(線程排隊、線程等待、線程喚醒、超時處理、中斷處理等),子類調用API實現核心邏輯,AQS在多線程中使用發揮真正的作用。下麵我們一步步來分析AQS。
AQS原理分析
類UML圖
圖中紅色連接的線表示內部類,藍色線表示繼承
我們首先來看看AQS相關的URL類圖吧,從JDK的源碼中我們發現,AQS真正出現的在兩個地方,第一個就是lock鎖(比如ReentrantLock等),第二個就是併發工具類(比如CountDownLatch、Semaphore等),由這些內部類繼承了AQS去實現相關的方法輔助主類實現相關控制,但是我們在JDK的源碼中可以看先這些lock鎖和併發工具類應在了很多的地方,比如隊列、線程池及併發類相關的一些地方。
上圖把各類的方法展示出來了,我們可以看到繼承了AQS類的那些Sync內部類都只用覆蓋實現一小部分方法即可完成特定的功能。因為在AQS類中已經實現了大部分底層通用的邏輯,對於其子類來說只用實現部分對外暴露的方法即可,同樣我們也可以繼承AQS實現自定義的鎖或者工具類。
類及方法介紹
AbstractOwnableSynchronizer
public abstract class AbstractOwnableSynchronizer
implements java.io.Serializable {
private transient Thread exclusiveOwnerThread;
protected final void setExclusiveOwnerThread(Thread thread) {
exclusiveOwnerThread = thread;
}
protected final Thread getExclusiveOwnerThread() {
return exclusiveOwnerThread;
}
}
AbstractOwnableSynchronizer
類里包含一個Thread
的屬性並提供了get、set方法,這個Thread對象就是當前持有鎖的線程。線程能否支持重入功能就是判斷當前線程和持有鎖的線程是不是同一個對象,只是同步狀態state值增加而已,等線程主動釋放鎖後該同步狀態state值數量值減少。
該類使用了abstract
修飾,但是類中並沒有抽象方法,目的就是這個類不對外直接使用,而get、set方法使用了protected final修飾,說明方法可被子類使用但不能被子類重寫。
另外,exclusiveOwnerThread是用了transient
修飾,說明這個屬性不參與序列化,因為Thread沒有實現Serializable
介面,不能進行序列化處理,另外進程是系統資源分配的最小單位,線程是進程執行的最小單位,線程是由操作系統分配和調度的,所以不能將線程進行序列化。
AbstractQueuedSynchronizer
AbstractQueuedSynchronizer
類也是一個抽象類,繼承自AbstractOwnableSynchronizer
,也就擁有了設置持有鎖線程的能力,同樣該類使用了abstract
修飾,目的就是這個類不對外直接使用,需要具體子類去繼承後使用。雖然他實現了序列化介面,但是其內部類Node
並未實現序列化介面,所以在AbstractQueuedSynchronizer
類的屬性head、tail都是Node類型並且加了transient
關鍵字不參與序列化,從以上我們大概就能猜到如果將AQS序列化它只保存一些基本屬性的值,並不包含線程以及隊列,基本在使用過程中也不會對其進行序列化,具體的屬性和隊列後續會詳細介紹,下麵列舉一些AQS類里重要的方法和屬性。
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {
/**
* 獨占模式,嘗試獲取同步狀態,立即返回獲取成功或失敗,需要子類實現
*/
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
/**
* 獨占模式,嘗試釋放同步狀態,立即返回獲取成功或失敗,需要子類實現
*/
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}
/**
* 共用模式,嘗試獲取共用鎖,需要子類實現,
* 立即返回獲取的數量值
* 0:獲取鎖成功,沒有剩餘資源
* > 0:獲取鎖成功,並且有剩餘資源
* < 0:獲取失敗
*/
protected int tryAcquireShared(int arg) {
throw new UnsupportedOperationException();
}
/**
* 共用模式,嘗試釋放共用鎖,需要子類實現,釋放成功返回true
*/
protected boolean tryReleaseShared(int arg) {
throw new UnsupportedOperationException();
}
/**
* 當前線程是否獨占資源,需要子類實現,true:是,false:否
*/
protected boolean isHeldExclusively() {
throw new UnsupportedOperationException();
}
/**
* 入隊
*/
private Node enq(final Node node) {...}
/**
* 將當前線程封裝成Node邏輯里也有調入隊enq方法的邏輯
*/
private Node addWaiter(Node mode){...}
/**
* 【重要】對外提供的獲取鎖的方法,子類調用此方法執行獲取鎖的動作,
* 內部調用包含了獲取鎖、排隊、阻塞、中斷等操作
*/
public final void acquire(int arg) {...}
/**
* 【重要】對外提供的釋放鎖方法,子類調用此方法執行釋放鎖的動作,
* 內部包含更新state、喚醒等待隊列的第一個等待節點
*/
public final boolean release(int arg) {...}
/**
* 【重要】雙向隊列頭結點
*/
private transient volatile Node head;
/**
* 【重要】雙向隊列尾結點
*/
private transient volatile Node tail;
/**
* 【重要】同步狀態,控制線程是否可獲取資源,是用一個整型的變數表示,
* 加了volatile,保證了該變數在多線程間的可見性
*/
private volatile int state;
/**
* 靜態內部類,將等待鎖的線程封裝成Node進行排隊
*/
static final class Node {
...
}
// 其他方法、屬性、內部類未列出
...
}
該類中沒有抽象方法,但是上面提到的幾個方法都是拋了UnsupportedOperationException
異常,說明需要具體子類實現時去覆寫,這也正是獨占模式和共用模式要對應實現的方法。
head、tail兩個Node
類型的屬性分別表示了雙向鏈表的隊頭和隊尾,如果線程不能獲取到鎖則進入隊列排隊並且等待喚醒或者超時中斷,後續細講。
整型的state屬性比較核心,表示同步狀態,就是用它來控制線程是否需要阻塞。上面的代碼沒有列出其他方法,部分方法源碼後文會詳細分析。
Node類
AQS類中有一個非常重要的內部類Node
,我們稱作它為節點,這個內部類是AQS框架線程排隊的基石,非常核心,按照註釋上所說Node類是CLH
隊列的一種變種(CLH隊列是一種單向隊列,這裡不做介紹,感興趣可自行搜索),Node類是一種雙向隊列,內部有Node prev,Node next屬性,分別表示前驅節點和後繼節點,還有一個Thread
屬性,表示封裝的當前線程,所以AQS的隊列其實就是以Node節點形成的一個雙向鏈表,結構如下:
我們看下Node類的屬性和方法類圖。
- 節點模式:
Node SHARED = new Node()來表示共用模式,Node EXCLUSIVE = null表示獨占模式。 - 節點等待狀態waitStatus:
這個屬性欄位比較重要,因為它是AQS控制線程執行的關鍵欄位,這個值的改變是採用CAS操作的。他的取值只有以下幾種。
(1)1:CANCELLED,取消狀態,可能情況有節點等待超時被取消或者被中斷,那麼代表這個Node節點中包含的線程未獲取到鎖,由具體業務判斷是否需要執行後續邏輯。
(2)0:初始化值,創建節點的時候預設會初始化,0也就是他的預設值。
(3)-1:SIGNAL,表明該節點以後的線程需要等待喚醒,後續節點的線程可以阻塞。
(4)-2:CONDITION,表明該節點的線程需要等待,由ConditionObject
實現條件隊列會用到。
(5)-3:PROPAGATE,一般在共用模式下會有該狀態,表明頭節點獲取到了共用資源,可向後傳播,等待隊列里的其他節點也都可以獲取共用資源。 - Thread thread屬性對象
AQS框架將當前正在獲取同步狀態的線程包裝成Node節點的一個屬性,根據Node節點的waitStatus狀態來控制當前線程是被喚醒繼續嘗試獲取鎖還是線程取消。
隊列
AQS內部的兩個變數head代表隊列的頭結點,tail代表隊列的尾節點,是一個雙向隊列,如Node類所介紹,head和tail指向如下圖所示。
註意:head節點比較特殊,隊列里需要喚醒的線程是從head節點的next節點開始, 在隊列初始化時放的是一個new Node()對象,屬性thread並沒有賦值,後續排隊的線程被喚醒時會把他自己設置成head並且將thread屬性設置成null。所以head節點可以這麼理解,head節點初始化時是一個虛擬節點,沒有用處,只是充當一個隊頭標識,當隊列中有線程排隊時,說明head節點已經是獲取到鎖的線程的節點了,等這個線程執行完需要喚醒head.next之後的線程繼續執行,這就是排隊和喚醒的邏輯。
同步狀態
在AQS類中,有一個state屬性,描述如下
/**
* The synchronization state.
*/
private volatile int state;
state是整型變數,叫同步狀態,也可叫加鎖的次數,使用了volatile修飾,保證了線程間的可見性,所有的線程是否可獲取到鎖資源都是基於對這個欄位值的操作來確定。對於獨占鎖來說,初始情況下state=0,表示當前資源空閑,可被線程獲取到鎖,如果state>0,表示已經有線程占用資源,後續的線程(非持有鎖的線程)需要進入隊列,不會存在<0的情況,因為如果釋放鎖的過程中到state=0時就已將exclusiveOwnerThread置成null了,所以多次調用釋放鎖的方法時,如果exclusiveOwnerThread不是當前線程的話,則會拋出IllegalMonitorStateException
異常。
公平鎖&非公平鎖
- 公平鎖:
多個線程獲取鎖時按照請求的先後順序排隊,不存在插隊的情況。
常用的實現方式如下:
final void lock() {
acquire(1);
}
acquire方法是AQS的獲取鎖方法,多線程競爭獲取鎖時會排隊。
- 非公平鎖:
多個線程獲取鎖時,首先不是按照請求的先後順序排隊,而且先嘗試去獲取鎖,也就是搶占式獲取,如果獲取到了那麼該線程就是持有鎖的線程可以執行他的邏輯,如果沒有獲取到鎖,那麼就會走入隊排隊流程,所以有可能會出現後到的線程可能比等待隊列里的線程先獲取到鎖。
常用的實現方式如下:
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
通過代碼可以看到非公平的情況下,線程會先嘗試使用cas方式設置state,如果設置成功則獲取到鎖,設置失敗則走入隊排隊等待獲取鎖流程。
所以,這兩個的區別在於是否會搶占獲取鎖。 設置成公平鎖時,每個線程獲取鎖的概率是一樣的,每個線程會先看等待隊列是否為空,若為空,直接獲取鎖,若不為空,自動排隊等候獲取鎖;設置成非公平鎖時,所有的線程都會優先去嘗試爭搶鎖,不會按順序等待,若搶不到鎖,再用類似公平鎖的方式獲取鎖。
那為什麼會這樣設計呢,這兩種分別使用在什麼場景下呢。
- 恢復掛起的線程到真正鎖的獲取還是有時間差的,從開發人員來看這個時間微乎其微,但是從CPU的角度來看,這個時間差存在的還是很明顯的。所以非公平鎖能更充分的利用CPU的時間片,儘量減少CPU空閑狀態時間
- 使用多線程很重要的考量點是線程切換的開銷,當採用非公平鎖時,當1個線程請求鎖獲取同步狀態,然後釋放同步狀態,因為不需要考慮是否還有前驅節點,所以剛釋放鎖的線程在此刻再次獲取同步狀態的概率就變得非常大,所以就減少了線程的開銷
貌似上面說的兩點都是非公平鎖比較好,但是非公平鎖也有他的問題,有可能導致排隊的線程長時間排隊也沒有機會獲取到鎖,這就是傳說中的“鎖饑餓”,如果使用的是帶有超時時間的方式獲取鎖,則可能導致排隊中的線程大面積超時獲取鎖失敗。
那什麼時候用公平鎖,什麼時候用非公平鎖?
如果為了更高的吞吐量,非公平鎖是比較合適的,因為節省很多線程切換時間,吞吐量自然就上去了; 否則那就用公平鎖,大家按請求先後順序排隊使用。
獨占鎖加鎖流程
以ReentrantLock公平鎖方式不帶超時不可中斷獲取鎖為例。
整體流程如下,先瞭解整體流程有助於我們理解,會涉及到子流程,流程圖單獨給出。
主要獲取鎖代碼如下,這也是調用獲取鎖的入口,邏輯看代碼註釋:
public final void acquire(int arg) {
/*
(1)tryAcquire方法由子類實現嘗試獲取鎖的邏輯,
返回true就不走後面的判斷,表示獲取到了鎖,返回false表示未獲取到鎖,走後續入隊等待流程
(2)addWaiter方法是將當前線程封裝成Node對象返回,裡面也有關於入隊的操作
(3)acquireQueued方法主要是先再嘗試獲取一次鎖,
獲取到了就返回是否被中斷標識,獲取不到則需要確認線程是否需要阻塞以及阻塞操作,
最終返回釋放被中斷標識
(4)selfInterrupt是將當前線程中斷,因為LockSupport.park阻塞線程時是不會響應中斷的,
但是通過Thread.interrupted()這個方法可以獲取到當前線程是否被中斷標識
*/
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
這裡tryAcquire(arg)嘗試獲取鎖的方法由AQS子類實現,其餘三個方法(acquireQueued、addWaiter、selfInterrupt)都是AQS來實現的,這也是個模板方法設計模式。
tryAcquire(arg)流程,嘗試獲取鎖的具體實現邏輯。
代碼如下:
protected final boolean tryAcquire(int acquires) {
// 獲取當前線程
final Thread current = Thread.currentThread();
// 獲取AQS的同步狀態值state
int c = getState();
// state是0則表示沒有線程持有鎖,可以嘗試去獲取鎖
if (c == 0) {
/*
(1)hasQueuedPredecessors方法判斷隊列里當前線程的Node之前是否還有其他Node,
返回true說明有其他線程也在等待,嘗試獲取鎖失敗,返回false說明前面沒有線程等待,
可以繼續執行邏輯,這裡先判斷了state=0沒有直接cas操作而是再判斷隊列里是否有等待的線程,
充分體現了公平性
(2)如果compareAndSetState(0, acquires)也設置成功,則說明加鎖成功,
將exclusiveOwnerThread設置成當前線程,返回true表示獲取鎖成功
*/
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
/*
這個else if邏輯主要就是可重入的判斷和處理,
如果持有鎖的線程是當前線程則state= state + acquires
*/
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
addWaiter(Node.EXCLUSIVE)流程,將線程包裝成Node節點的邏輯,有入隊排隊的邏輯,返回包裝的Node節點。
代碼如下:
private Node addWaiter(Node mode) {
// 將當前節點封裝成Node對象
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
/*
(1)隊列不為空的情況下,先嘗試將node插入到隊尾,
compareAndSetTail返回成功則說明node變成隊列成功,直接返回,否則需要走入隊流程
(2)主要是將當前node的prev指向原tail,原tail節點的next指向當前node上,
這樣就完成了node的入隊
*/
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
// 嘗試直接插入隊尾失敗了就走入隊邏輯
enq(node);
// 返回當前線程封裝成的Node對象
return node;
}
private Node enq(final Node node) {
// 入隊使用的for無限迴圈,是一個自旋的過程,直到成功
for (;;) {
Node t = tail;
/*
如果隊尾tail為空,則說明隊列還未初始化,先初始化head節點,然後tail也指向head,
完成初始化隊列,雖然只有一個節點,但head和tail都有了指向
*/
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
/*
如果隊尾tail不為空,則採用cas方式將當前node插入隊尾,
成功則返回,否則一直自旋嘗試直到成功
*/
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
線程阻塞邏輯,acquireQueued(final Node node, int arg)具體實現流程
代碼如下:
final boolean acquireQueued(final Node node, int arg) {
/*
failed變數表示獲取鎖是否失敗,初始化為true表示失敗,只有在獲取到鎖時failed為false,
為true時表示獲取鎖過程中異常,finally塊里的判斷是否需要取消當前這個線程獲取鎖的相關邏輯,
包括隊列的調整以及後繼Node里線程的喚醒
*/
boolean failed = true;
try {
/*
interrupted變數表示當前線程是否被中斷的標識,true:線程被中斷,false:線程未被中斷,
這個方法整體返回的就是這個值,用來確定後續是否要調用selfInterrupt()方法中斷當前線程
*/
boolean interrupted = false;
// for無限迴圈,自旋處理
for (;;) {
// 取當前節點的前一個節點
final Node p = node.predecessor();
// 如果前一個節點是head並且tryAcquire嘗試獲取到鎖了,則將當前線程設置成head
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
/*
這裡就是線程阻塞等待的核心了,嘗試獲取鎖失敗時,判斷是否需要阻塞,
需要阻塞的話就調用LockSupport.park方法阻塞當前線程
*/
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
/*
在不可中斷模式下,failed的值始終會是false,因為雖然被中斷了,
但是當前線程還是獲取到鎖了,走正常的後續處理邏輯,finally這裡的邏輯就不會走了
*/
if (failed)
cancelAcquire(node);
}
}
嘗試獲取鎖失敗時是否需要阻塞當前線程判斷流程,shouldParkAfterFailedAcquire(Node pred, Node node)邏輯
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
/*
當前線程的前一個節點的waitStatus狀態是Node.SIGNAL,
則說明前一個線程如果獲取到鎖並且執行完成後釋放了鎖需要喚醒後續節點,
從另一個角度來說當前線程自然要阻塞等待了
*/
if (ws == Node.SIGNAL)
return true;
if (ws > 0) {
/*
當前線程的前一個節點的waitStatus狀態是Node.CANCELLED時,說明前驅節點已經取消獲取鎖了
需要從當前節點一直向前查找知道節點沒有被取消,
然後把找到的第一個沒有被取消的節點的next指向當前節點,這樣就把當前節點前取消狀態的都刪掉
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
前一個節點的waitStatus狀態還是0,或者是共用鎖的傳播狀態PROPAGATE時,
則會把前一個節點的waitStatus狀態改成Node.SIGNAL
所以是後一個節點排隊時把前一個節點waitStatus改成Node.SIGNAL,
表示前一個節點執行完釋放鎖了要走喚醒後續節點的邏輯,
依次類推,隊列里只有最後一個Node節點的waitStatus是0,因為它沒有後續節點,
也不需要執行喚醒操作,其餘在沒有被中斷狀態下應該都是Node.SIGNAL
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
private final boolean parkAndCheckInterrupt() {
/*
阻塞當前線程調的就是LockSupport.park,原理之前文章有講過,這就是線程阻塞等待的核心實現了
線程被LockSupport.park了不會響應中斷,
如果線程被中斷了需要用Thread.interrupted()獲取當前線程的中斷標識
*/
LockSupport.park(this);
return Thread.interrupted();
}
獨占鎖釋放鎖流程
以ReentrantLock釋放鎖為例,釋放鎖不區分公平鎖還是非公平鎖,釋放的邏輯是一樣的,整體流程如下。
release(int arg)這是AQS里定義的模板方法,主要釋放鎖代碼如下,這也是調用釋放鎖的入口,邏輯看代碼註釋:
public final boolean release(int arg) {
// 嘗試釋放鎖,由子類實現具體邏輯
if (tryRelease(arg)) {
Node h = head;
// 頭節點不為null,並且waitStatus!=0,說明要喚醒後續節點
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
// 返回鎖是否空閑標識,其實就是tryRelease(arg)的返回結果
return false;
}
tryRelease(int releases)是嘗試釋放鎖的邏輯,AQS定義的方法,預設是拋異常,子類根據具體場景實現邏輯。以下是ReentrantLock的內部類Sync的具體實現,返回true表示現在鎖空閑了,返回false表示鎖現在還被占用。
protected final boolean tryRelease(int releases) {
// 計算釋放releases後,新的state值
int c = getState() - releases;
// 如果當前釋放鎖的線程不是持有鎖的線程直接拋異常,只有持有鎖的線程才能釋放鎖
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
/*
如果釋放releases後,新的state是0,那麼說明鎖就空閑了,將free標識賦值為true,
然後將exclusiveOwnerThread賦值為null
*/
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
// 設置state新值,只有持有鎖的線程才可操作,無需cas
setState(c);
return free;
}
unparkSuccessor(Node node) 這個方法就是關鍵的喚醒後續等待隊列里的線程關鍵方法。通過調用LockSupport.unpark方法將阻塞的線程喚醒繼續執行。
private void unparkSuccessor(Node node) {
// node是當前釋放鎖的線程,它的waitStatus如果<0就把他置成0
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
如果node的next節點是null或者取消了,則從隊尾往前查找,一直找到node節點,
獲得第一個未被取消的節點
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
// 找到第一個未被取消的節點,並喚醒線程,使其繼續執行
if (s != null)
LockSupport.unpark(s.thread);
}
這裡有一個比較關鍵的地方,如果node的next節點是null或者取消狀態,則從隊尾往前查找,一直找到node節點,為什麼會從後往前遍歷?
這裡考慮了併發的場景,從後往前不會導致node丟失,具體我們可以從addWaiter方法看。
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
這裡的第6、7、8行就是關鍵了,先設置prev節點,這樣就保證了所有的節點都有前驅節點,第7、8這兩行沒有保證原子操作,如果cas成功了,但是剛好cpu時間片切換,第8行未執行,那麼pred的next就是空了,所以從前往後可能會漏節點,從後往前是完整的隊列,舉個慄子:
(1)假如釋放鎖的線程是tail尾節點,剛好unparkSuccessor時,執行到node.next為空的判斷之前,cpu時間片切換了。
(2)有個線程調用了addWaiter方法,把新node的prev指向了tail,cas設置尾節點也成功了,就在這兒cpu又切換了,那麼原tail節點的next還沒有設置。
(3)cpu再切回到unparkSuccessor的node.next為空判斷時,這時候他的next是null(因為next指針還沒有指向新node節點),實際上後面還有一個node節點,這樣就會漏掉節點數據了。
如果從後往前的話,每一個node的前驅肯定是有值的,但是高併發情況下不能保證每一個node的後繼節點也能及時連接上。所以從後往前就確保了能遍歷到每一個節點。
也就是從等待隊列里阻塞的方法恢復執行,返回線程是否中斷標識,然後再繼續嘗試獲取鎖。
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
到這裡,基本上已經把獨占鎖的獲取鎖和釋放鎖的流程和邏輯都講完了,AQS基本已經把大部分的核心功能幫我們寫好了,我們只用去寫或利用他已有的方法,實現我們自己的邏輯即可,就比如以上講到的獨占鎖的獲取和釋放,其實我們自己僅僅具體實現了tryAcquire(int acquires)、tryRelease(int releases)這兩個方法,花了大篇幅講的都是AQS的流程和邏輯,由此,真正的感受到了AQS的巧妙設計。
超時&中斷處理
理解了上面的獨占鎖的加鎖流程,對於超時和中斷處理的理解就很容易了,這兩種其實都有線程中斷拋出異常邏輯,另外將帶超時時間獲取鎖和可響應中斷獲取鎖這兩種方式關於獲取結果交給開發人員自行處理,既體現了設計的靈活性也可讓開發人員根據具體業務場景具體處理,還是以ReentrantLock來講解。
超時
關於超時,就是在指定的時間內未獲取到鎖就返回獲取失敗,在指定的時間內獲取到了鎖返回成功,有兩種,一個是嘗試獲取,例如:tryLock(),不管有沒有獲取到立即返回,相當於超時是0,另一種是指定超時時間,如果指定時間未獲取到鎖就返回false,例如:tryLock(long timeout, TimeUnit unit),下麵詳細講解下。
- tryLock()
public boolean tryLock() {
// 入口方法,是以非公平方式嘗試獲取鎖,返回true:獲取成功,false:獲取失敗
return sync.nonfairTryAcquire(1);
}
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
// state值是0時,表示暫時鎖空閑,嘗試cas賦值,也可以理解成嘗試加鎖
if (c == 0) {
// cas成功,則說明加鎖成功,設置當前線程為持有鎖的線程,返回true:獲取成功
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
// 當前線程如果是持有鎖的線程,可重入,判斷並設置state=state+acquires,返回true:獲取成功
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
// 嘗試沒有獲取到鎖,當前線程也不是持有鎖的線程,直接返回false:獲取失敗
return false;
}
tryLock()的實現邏輯還是挺簡單了,不帶超時相關設置,相當於超時時間是0,要麼立即成功,要麼立即失敗,不涉及複雜的入隊、阻塞、喚醒、取消相關邏輯。單純的看state=0說明空閑cas成功則立即獲取鎖,或者持有鎖的線程是當前線程,這樣就可重入,獲取鎖成功,其他情況均嘗試獲取鎖失敗,直接返回。
- tryLock(long timeout, TimeUnit unit)
public boolean tryLock(long timeout, TimeUnit unit)
throws InterruptedException {
/*
主入口方法,帶超時時間嘗試獲取鎖,獲取到返回true,未獲取到返回false,
註意還有可能拋出被中斷異常InterruptedException
*/
return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}
public final boolean tryAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
// 判斷如果線程被中斷,則拋異常
if (Thread.interrupted())
throw new InterruptedException();
//還是先嘗試獲取鎖,獲取成功則返回true,獲取失敗執行後面的doAcquireNanos方法,帶超時等待
return tryAcquire(arg) ||
doAcquireNanos(arg, nanosTimeout);
}
/**
* 這個方法就是帶超時等待獲取鎖的核心實現,
* 大體流程上跟acquireQueued(final Node node, int arg)這個方法差不多
* 邏輯里調用了相同的方法的就不再詳細闡述了,只說不同的核心關鍵邏輯
*/
private boolean doAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (nanosTimeout <= 0L)
return false;
final long deadline = System.nanoTime() + nanosTimeout;
// 先入隊列
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
// 如果頭節點是head並且嘗試獲取鎖成功則返回true
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return true;
}
nanosTimeout = deadline - System.nanoTime();
// 方法執行到這裡已經超時了,直接返回false
if (nanosTimeout <= 0L)
return false;
/*
以下的邏輯是關鍵實現超時返回的邏輯
先判斷是否需要阻塞,再判斷超時時間是否大於1000納秒即0.001 毫秒,
這個時間可以說非常短了,但對於高速CPU來說還是需要一定的時間,
如果這兩個條件都成功,則阻塞,否則自旋
阻塞調用的是LockSupport.parkNanos(this, nanosTimeout);精確到納秒級的阻塞,
並且第一個參數是this,表明瞭這個線程具體阻塞在哪個對象上,通過jstat可查看到
*/
if (shouldParkAfterFailedAcquire(p, node) &&
nanosTimeout > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
// 判斷如果線程被中斷,則拋異常
if (Thread.interrupted())
throw new InterruptedException();
}
} finally {
/*
這裡可能會走,雖然LockSupport.parkNanos不響應中斷,
但是最後的邏輯判斷了當前線程是否中斷的標識,如果中斷了則會拋InterruptedException異常,
那麼failed變數的值還是true,需要走取消的邏輯,將當前線程的Node從隊列去掉相關邏輯處理
*/
if (failed)
cancelAcquire(node);
}
}
中斷
上文已經說過了,如果線程進入等待隊列並且阻塞了,那麼它是不會響應中斷的,雖然阻塞隊列不響應中斷,但是被喚醒後,線程的中斷標識是可以獲取到的,所以可以通過該標識來處理是否需要主動拋異常中斷處理。
需要註意中斷並不是實時感知的,雖然被中斷瞭如果沒有被喚醒,還是需要繼續等待,直到被喚醒後,獲取中斷標識來做處理。
我們還是以ReentrantLock為例,lockInterruptibly()這個就是可以響應中斷的方法。
public void lockInterruptibly() throws InterruptedException {
// sync這個對象繼承了AbstractQueuedSynchronizer,這裡直接調用的是AQS的方法了。
sync.acquireInterruptibly(1);
}
public final void acquireInterruptibly(int arg)
throws InterruptedException {
// 先判斷下如果線程已經被中斷了,直接拋出InterruptedException異常
if (Thread.interrupted())
throw new InterruptedException();
if (!tryAcquire(arg))
// 嘗試獲取鎖沒有成功時,才進入可響應中斷獲取鎖的方法里
doAcquireInterruptibly(arg);
}
/**
* 這個方法就是獲取鎖時可響應中斷核心實現,
* 大體流程上跟tryLock(long timeout, TimeUnit unit)這個方法差不多
* 邏輯里調用了相同的方法的就不再詳細闡述了,只說不同的核心關鍵邏輯
*/
private void doAcquireInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
/*
主要的處理就在這裡了,判斷需要阻塞並且阻塞被喚醒後,
如果中斷標識為true則拋出InterruptedException異常
*/
throw new InterruptedException();
}
} finally {
/*
這裡可能會走,如果線程被中斷了,拋出InterruptedException異常後,failed變數還是true
需要走取消的邏輯,將當前線程的Node從隊列去掉相關邏輯處理
*/
if (failed)
cancelAcquire(node);
}
}
AQS的使用
AQS是一個抽象隊列同步框架,支持獨占模式和共用模式,由於AQS是一個抽象類,僅僅需要子類去實現具體的獲取鎖釋放鎖方法,鎖的獲取和釋放入口統一由AQS提供,如下所示。
獨占模式
- 獲取鎖入口
(1)不響應中斷
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
(2)響應中斷
public final void acquireInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (!tryAcquire(arg))
doAcquireInterruptibly(arg);
}
獨占模式下,不管是否響應中斷,獲取鎖時子類僅需要實現tryAcquire(arg)方法,嘗試獲取資源,成功則返回true,失敗則返回false,其他都由AQS提供。
- 釋放鎖入口
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
獨占模式下,釋放鎖時子類僅需要實現tryRelease(arg)方法,嘗試釋放資源,成功則返回true,失敗則返回false,其他都由AQS提供。
共用模式
- 獲取鎖入口
(1) 不響應中斷
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
(2) 響應中斷
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
共用模式下,不管是否響應中斷,獲取鎖時子類僅需要實現tryAcquireShared(arg)方法,嘗試獲取資源,返回值<0表示失敗;=0表示成功,但沒有剩餘可用資源;>0表示成功,且有剩餘資源,其他都由AQS提供。
- 釋放鎖入口
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
共用模式下,釋放鎖時子類僅需要實現tryReleaseShared(arg)方法,嘗試釋放資源,如果釋放後允許喚醒後續等待結點返回true,否則返回false,其他都由AQS提供。
自定義鎖的實現
使用AQS自定義鎖時,子類可以實現Lock介面(因為Lock定義了獲取鎖和釋放鎖的方法,也可以不實現這個介面,自己定義方法),然後實現嘗試獲取鎖和釋放鎖的方法即可。
需求
實現一個獨占不響應中斷不可重入的公平鎖。
分析
獨占鎖需要實現tryAcquire(arg)、tryRelease(arg)這兩個方法。不可重入,則要判斷只要有線程占用鎖,不管是不是當前線程都返回獲取失敗,公平鎖說明嘗試獲取鎖時要先看隊列里是否有等待獲取鎖的Node。
實現
其實也就是ReentrantLock的另一個版本
- 定義一個實現需求的MyLock類。
- 定義MyLock類的加鎖方法lock()和釋放鎖方法unLock()。
- 在MyLock類內部定義一個Sync類繼承AbstractQueuedSynchronizer類,實現tryAcquire(int arg)和tryRelease(int arg)方法。
- MyLock類中定義一個Sync的變數,構造函數中實例化Sync類,在lock方法調用sync.acquire(1),在unlock方法中調用sync.release(1)
這樣鎖的定義和實現都完成了,代碼如下。
public class MyLock {
private Sync sync;
public MyLock() {
sync = new Sync();
}
private class Sync extends AbstractQueuedSynchronizer {
@Override
protected boolean tryAcquire(int arg) {
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, arg)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
}
return false;
}
@Override
protected boolean tryRelease(int arg) {
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (getState() == 1) {
free = true;
setExclusiveOwnerThread(null);
setState(0);
}
return free;
}
}
public final void lock() {
sync.acquire(1);
}
public void unLock() {
sync.release(1);
}
}
測試
- 多個線程獲取鎖
class Test {
public static void main(String[] args) {
MyLock myLock = new MyLock();
List<Thread> list = new ArrayList<>();
for (int i = 0; i < 5; i++) {
list.add(new Thread(() -> {
System.out.println(LocalDateTime.now() + ":" + Thread.currentThread().getName() + "將要加鎖");
myLock.lock();
System.out.println(LocalDateTime.now() + ":" + Thread.currentThread().getName() + "加鎖成功");
try {
System.out.println(LocalDateTime.now() + ":" + Thread.currentThread().getName() + "執行業務邏輯");
Thread.sleep(new Random().nextInt(10));
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println(LocalDateTime.now() + ":" + Thread.currentThread().getName() + "解鎖成功");
myLock.unLock();
}
}, "t" + i));
}
list.forEach(Thread::start);
}
}
結果輸出:
2023-06-08T11:35:27.822:t0將要加鎖
2023-06-08T11:35:27.822:t4將要加鎖
2023-06-08T11:35:27.822:t3將要加鎖
2023-06-08T11:35:27.822:t1將要加鎖
2023-06-08T11:35:27.822:t2將要加鎖
2023-06-08T11:35:27.823:t0加鎖成功
2023-06-08T11:35:27.823:t0執行業務邏輯
2023-06-08T11:35:27.828:t0解鎖成功
2023-06-08T11:35:27.828:t4加鎖成功
2023-06-08T11:35:27.828:t4執行業務邏輯
2023-06-08T11:35:27.831:t4解鎖成功
2023-06-08T11:35:27.831:t3加鎖成功
2023-06-08T11:35:27.831:t3執行業務邏輯
2023-06-08T11:35:27.836:t3解鎖成功
2023-06-08T11:35:27.836:t1加鎖成功
2023-06-08T11:35:27.836:t1執行業務邏輯
2023-06-08T11:35:27.837:t1解鎖成功
2023-06-08T11:35:27.837:t2加鎖成功
2023-06-08T11:35:27.837:t2執行業務邏輯
2023-06-08T11:35:27.845:t2解鎖成功
- 線程是否可重入
class Test {
public static void main(String[] args) {
MyLock myLock = new MyLock();
new Thread(() -> {
System.out.println(LocalDateTime.now() + ":" + Thread.currentThread().getName() + "將要加鎖");
myLock.lock();
System.out.println(LocalDateTime.now() + ":" + Thread.currentThread().getName() + "加鎖成功");
try {
myLock.lock();
System.out.println(LocalDateTime.now() + ":" + Thread.currentThread().getName() + "再次加鎖成功");
System.out.println(LocalDateTime.now() + ":" + Thread.currentThread().getName() + "執行業務邏試");
Thread.sleep(new Random().nextInt(10));
myLock.unLock();
}
catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println(LocalDateTime.now() + ":" + Thread.currentThread().getName() + "解鎖成功");
myLock.unLock();
}
},"t1").start();
new Thread(() -> {
System.out.println(LocalDateTime.now() + ":" + Thread.currentThread().getName() + "將要加鎖");
myLock.lock();
System.out.println(LocalDateTime.now() + ":" + Thread.currentThread().getName() + "加鎖成功");
try {
System.out.println(LocalDateTime.now() + ":" + Thread.currentThread().getName() + "執行業務邏試");
Thread.sleep(new Random().nextInt(10));
myLock.unLock();
System.out.println(LocalDateTime.now() + ":" + Thread.currentThread().getName() + "解鎖成功");
myLock.lock();
System.out.println(LocalDateTime.now() + ":" + Thread.currentThread().getName() + "再次加鎖成功");
myLock.unLock();
System.out.println(LocalDateTime.now() + ":" + Thread.currentThread().getName() + "再次解鎖成功");
}
catch (InterruptedException e) {
e.printStackTrace();
}
},"t2").start();
}
}
有兩種可能的輸出:
- t1先獲取鎖成功
這種情況輸出如下,t1先加鎖成功,t2等待,實現了多線程間的加鎖互斥,另外t1加鎖成功後有再次加鎖,發現還是等待,這說明鎖不可重入,功能實現,這兩個線程都將一直等下去。
2023-06-08T11:47:57.016:t1將要加鎖
2023-06-08T11:47:57.017:t1加鎖成功
2023-06-08T11:47:57.016:t2將要加鎖
- t2先獲取鎖成功
這種情況輸出如下,t2先加鎖成功,正常執行業務邏輯後釋放鎖,t2釋放鎖後線程可正常結束。t2釋放了鎖,則t1加鎖成功,當t1想第二次再加鎖時,發現需要等待,鎖不可重入。
2023-06-08T11:49:28.492:t2將要加鎖
2023-06-08T11:49:28.492:t1將要加鎖
2023-06-08T11:49:28.493:t2加鎖成功
2023-06-08T11:49:28.493:t2執行業務邏試
2023-06-08T11:49:28.501:t2解鎖成功
2023-06-08T11:49:28.501:t1加鎖成功
通過這兩個例子,我們可以看出,這種獨占鎖、不可重入的情況下,lock()和unlock()方法必須配對使用,不能連續加鎖和釋放鎖。
JUC包下AQS子類鎖的實現
java.util.concurrent包下有幾個基於AQS實現的鎖,如下所示,有了以上知識基礎,再理解這些鎖是很容易的,瞭解詳細可參考具體源碼實現。
鎖 | 類型 | 描述 |
---|---|---|
ReentrantLock | 獨享鎖 | 可重入鎖 |
ReentrantReadWriteLock | 獨享鎖、共用鎖兼備 | ReadLock是共用鎖,WriteLock是獨享鎖 |
CountDownLatch | 共用鎖 | 不可重覆使用 |
Semaphore | 共用鎖 | 可重覆使用 |
CyclicBarrier | 共用鎖 | 使用ReentrantLock實現的共用鎖,可重覆使用 |
總結
主要講解了AQS的獨占模式,提到了一些共用模式相關的知識,有了獨享模式的基礎,理解共用模式並不難,還有關於Condition相關的知識沒有講,所以關於共用模式和Condition相關的大家可以自行去閱讀源碼,後續有機會也會出相關的文章。
還有另外一個類AbstractQueuedLongSynchronizer
,這個類是AbstractQueuedSynchronizer
的一個變種,只是把state的類型從int變成long了,所有涉及跟這個state相關的操作參數和返回都改成long類型了,理論上使用這個類實現的鎖可以超過Integer.MAX_VALUE的限制,最大的可獲取鎖的次數就變成Long.MAX_VALUE,這個在如多級鎖和需要64位狀態時會非常有用,目前在JDK里並沒有發現使用的地方,而在HikariCP連接池com.zaxxer.hikari.util.QueuedSequenceSynchronizer
這個類內部使用到了這個類,感興趣的可自行閱讀。
AQS的設計確實相當巧妙、邏輯非常嚴謹,在多線程下使用,已儘可能最大限度支持高併發操作,通過對源碼的學習,我們瞭解了鎖的設計,大部分的工作都由AQS完成(包括線程的包裝排隊、阻塞、喚醒、超時處理、中斷處理等),剩下的小部分代碼由開發者根據業務場景具體實現(嘗試獲取鎖,釋放鎖),不得不佩服如此精美巧妙的設計和實現,Doug Lea,我永遠的神!