摘要:當你使用java實現一個線程同步的對象時,一定會包含一個問題:你該如何保證多個線程訪問該對象時,正確地進行阻塞等待,正確地被喚醒? 本文分享自華為雲社區《JUC中的AQS底層詳細超詳解,剖析AQS設計中所需要考慮的各種問題!》,作者: breakDawn 。 java中AQS究竟是做什麼的? ...
摘要:當你使用java實現一個線程同步的對象時,一定會包含一個問題:你該如何保證多個線程訪問該對象時,正確地進行阻塞等待,正確地被喚醒?
本文分享自華為雲社區《JUC中的AQS底層詳細超詳解,剖析AQS設計中所需要考慮的各種問題!》,作者: breakDawn 。
java中AQS究竟是做什麼的?
當你使用java實現一個線程同步的對象時,一定會包含一個問題:
你該如何保證多個線程訪問該對象時,正確地進行阻塞等待,正確地被喚醒?
關於這個問題,java的設計者認為應該是一套通用的機制
因此將一套線程阻塞等待以及被喚醒時鎖分配的機制稱之為AQS
全稱 AbstractQuenedSynchronizer
中文名即抽象的隊列式同步器 。
基於AQS,實現了例如ReentenLock之類的經典JUC類。
AQS簡要步驟
- 線程訪問資源,如果資源足夠,則把線程封裝成一個Node,設置為活躍線程進入CLH隊列,並扣去資源
- 資源不足,則變成等待線程Node,也進入CLH隊列
- CLH是一個雙向鏈式隊列, head節點是實際占用鎖的線程,後面的節點則都是等待線程所對應對應的節點
AQS的資源state
state定義
AQS中的資源是一個int值,而且是volatile的,並提供了3個方法給子類使用:
private volatile int state; protected final int getState() { return state; } protected final void setState(int newState) { state = newState; } // cas方法 compareAndSetState(int oldState, int newState);
如果state上限只有1,那麼就是獨占模式Exclusive,例如 ReentrantLock
如果state上限大於1,那就是共用模式Share,例如 Semaphore、CountDownLatch、ReadWriteLock,CyclicBarrier
已經有CAS方法了,為什麼資源state還要定義成volatile的?
對外暴露的getter/setter方法,是走不了CAS的。而且setter/getter沒有被synchronized修飾。所以必須要volatile,保證可見性
這樣基於AQS的實現可以直接通過getter/setter操作state變數,並且保證可見性,也避免重排序帶來的影響。比如CountDownLatch,ReentrantReadWriteLock,Semaphore都有體現(各種getState、setState)
對資源的操作什麼時候用CAS,什麼使用setState?
volatile的state成員有一個問題,就是如果是複合操作的話不能保證複合操作的原子性
因此涉及 state增減的情況,採用CAS
如果是state設置成某個固定值,則使用setState
AQS的CLH隊列
為什麼需要一個CLH隊列
這個隊列的目的是為了公平鎖的實現
即為了保證先到先得,要求每個線程封裝後的Node按順序拼接起來。
CLH本質?是一個Queue容器嗎
不是的,本質上是一個鏈表式的隊列
因此核心在於鏈表節點Node的定義
除了比較容易想到的prev和next指針外
還包含了該節點內的線程
以及 waitStatus 等待狀態
4種等待狀態如下:
- CANCELLED(1): 因為超時或者中斷,節點會被設置為取消狀態,被取消的節點時不會參與到競爭中的,他會一直保持取消狀態不會轉變為其他狀態;
- SIGNAL(-1):後繼節點的線程處於等待狀態,而當前節點的線程如果釋放了同步狀態或者被取消,將會通知後繼節點,使後繼節點的線程得以運行
- CONDITION(-2) : 點在等待隊列中,節點線程等待在Condition上,當其他線程對Condition調用了signal()後,改節點將會從等待隊列中轉移到同步隊列中,加入到同步狀態的獲取中
- PROPAGATE(-3) : 表示下一次共用式同步狀態獲取將會無條件地傳播下去
- INIT( 0):
入隊是怎麼保證安全的?
入隊過程可能引發衝突
因此會用CAS保障入隊安全。
private Node enq(final Node node) { //多次嘗試,直到成功為止 for (;;) { Node t = tail; //tail不存在,設置為首節點 if (t == null) { if (compareAndSetHead(new Node())) tail = head; } else { //設置為尾節點 node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
出隊過程會發生什麼?
一旦有節點出隊,說明有線程釋放資源了,隊頭的等待線程可以開始嘗試獲取了。
於是首節點的線程釋放同步狀態後,將會喚醒它的後繼節點(next)
而後繼節點將會在獲取同步狀態成功時將自己設置為首節點
註意在這個過程是不需要使用CAS來保證的,因為只有一個線程能夠成功獲取到同步狀態
AQS詳細資源獲取流程
1. tryAcquire嘗試獲取資源
AQS使用的設計模式是模板方法模式。
具體代碼如下:
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) // 發現中斷過,則觸發中斷異常 selfInterrupt(); }
即AQS抽象基類AbstractQueuedSynchronizer給外部調用時,都是調的acquire(int arg)方法。這個方法的內容是寫死的。
而acquire中,需要調用tryAcquire(arg), 這個方法是需要子類實現的,作用是判斷資源是否足夠獲取arg個
(下麵部分代碼註釋選自: (2條消息) AQS子類的tryAcquire和tryRelease的實現_Mutou_ren的博客-CSDN博客_aqs tryacquire )
ReentrantLock中的tryAcquire實現
這裡暫時只談論一種容易理解的tryAcuire實現,其他附加特性的tryAcquire先不提。
裡面主要就做這幾件事:
- 獲取當前鎖的資源數
- 資源數為0,說明可以搶, 確認是前置節點是頭節點,進行CAS試圖爭搶,搶成功就返回true,並設置當前線程
- 沒搶成功,返回false
- 如果是重入的,則直接set設置增加後的狀態值,狀態值此時不一定為0和1了
protected final boolean tryAcquire(int acquires){ final Thread current = Thread.currentThread(); int c = getState(); // state==0代表當前沒有鎖,可以進行獲取 if (c == 0) { // 非公平才有的判斷,會判斷是否還有前驅節點,直接自己為頭節點了或者同步隊列空了才會繼續後面的鎖的獲取操作 if (!hasQueuedPredecessors() //CAS設置state為acquires,成功後標記exclusiveOwnerThread為當前線程 && compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } // 當前占有線程等於自己,代表重入 else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; // 出現負數,說明溢出了 if (nextc < 0) // throw new Error("Maximum lock count exceeded"); // 因為是重入操作,可以直接進行state的增加,所以不需要CAS setState(nextc); return true; } return false; }
2.addWaiter 添加到等待隊列
當獲取資源失敗,會進行addWaiter(Node.EXCLUSIVE), arg)。
目的是創建一個等待節點Node,並添加到等待隊列
private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure Node pred = tail; if (pred != null) { node.prev = pred; // 通過CAS競爭隊尾 if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } // 競爭隊尾失敗,於是進行CAS頻繁迴圈競爭隊尾 enq(node); return node; } private Node enq(final Node node) { for (;;) { Node t = tail; if (t == null) { // Must initialize if (compareAndSetHead(new Node())) tail = head; } else { node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
3. acquireQueued迴圈阻塞-競爭
併在 "處於頭節點時嘗試獲取資源->睡眠->喚醒“中迴圈。
當已經跑完任務的線程釋放資源時,會喚醒之前阻塞的線程。
當被喚醒後,就會檢查自己是不是頭節點,如果不是,且認為可以阻塞,那就繼續睡覺去了
(下麵代碼註釋部分選自AQS(acquireQueued(Node, int) 3)–隊列同步器 - 小窩蝸 - 博客園 (http://cnblogs.com) )
final boolean acquireQueued(final Node node, int arg) { // 標識是否獲取資源失敗 boolean failed = true; try { // 標識當前線程是否被中斷過 boolean interrupted = false; // 自旋操作 for (;;) { // 獲取當前節點的前繼節點 final Node p = node.predecessor(); // 如果前繼節點為頭結點,說明排隊馬上排到自己了,可以嘗試獲取資源,若獲取資源成功,則執行下述操作 if (p == head && tryAcquire(arg)) { // 將當前節點設置為頭結點 setHead(node); // 說明前繼節點已經釋放掉資源了,將其next置空,好讓虛擬機提前回收掉前繼節點 p.next = null; // help GC // 獲取資源成功,修改標記位 failed = false; // 返回中斷標記 return interrupted; } // 若前繼節點不是頭結點,或者獲取資源失敗, // 則需要判斷是否需要阻塞該節點持有的線程 // 若可以阻塞,則繼續執行parkAndCheckInterrupt()函數, // 將該線程阻塞直至被喚醒 // 喚醒後會檢查是否已經被中斷,若返回true,則將interrupted標誌置於true if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { // 最終獲取資源失敗,則當前節點放棄獲取資源 if (failed) cancelAcquire(node); } }
4.shouldParkAfterFailedAcquire 檢查是否可以阻塞
該方法不會直接阻塞線程,因為一旦線程掛起,後續就只能通過喚醒機制,中間還發生了內核態用戶態切換,消耗很大。
因此會先不斷確認前繼節點的實際狀態,在只能阻塞的情況下才會去阻塞。
並且會過濾掉cancel的線程節點
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { // 獲取前繼節點的等待狀態 int ws = pred.waitStatus; // 如果等待狀態為Node.SIGNAL(-1),則直接返回true即可以阻塞 // 因為這說明前繼節點完成資源的釋放或者中斷後,會主動喚醒後繼節點的(這也即是signal信號的含義),因此方法外面不用再反覆CAS了,直接阻塞吧 if (ws == Node.SIGNAL) return true; // 如果前繼節點的等待值大於0即CANCELLED(1),說明前繼節點的線程發生過cancel動作 // 那就繼續往前遍歷,直到當前節點的前繼節點的狀態不為cancel if (ws > 0) { do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { // 前繼節點的等待狀態不為SIGNAL(-1),也不為Cancel(1) // 那麼只能是PROPAGATE(-3)或者CONDITION(-2)或者INITIAL(0) // 直接設置成SIGNAL,下一次還沒CAS成功,就直接睡覺了 // 因此在前面所有節點沒辯護的情況下, 最多一次之後就會返回true讓外面阻塞 compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; }
5.parkAndCheckInterrupt() 阻塞線程
使用LockSupport.park來阻塞當前這個對象所在的線程
private final boolean parkAndCheckInterrupt() { LockSupport.park(this); // 確認是否是中斷導致的park結束,並清除中斷標記 return Thread.interrupted(); } public static void park(Object blocker) { Thread t = Thread.currentThread(); setBlocker(t, blocker); UNSAFE.park(false, 0L); setBlocker(t, null); }
lockSupport.park()和普通的wait|notify都有啥區別?
- 面向的主體不一樣。LockSuport主要是針對Thread進進行阻塞處理,可以指定阻塞隊列的目標對象,每次可以指定具體的線程喚醒。Object.wait()是以對象為緯度,阻塞當前的線程和喚醒單個(隨機)或者所有線程。
- 實現機制不同。雖然LockSuport可以指定monitor的object對象,但和object.wait(),兩者的阻塞隊列並不交叉。可以看下測試例子。object.notifyAll()不能喚醒LockSupport的阻塞Thread.
如果還要深挖底層實現原理,可以詳細見該鏈接
簡而言之,是用mutex和condition保護了一個_counter的變數,當park時,這個變數置為了0,當unpark時,這個變數置為1。
底層用的C語言的pthread_mutex_unlock、pthread_cond_wait 、pthread_cond_signal ,但是針對了mutex和_cond兩個變數進行加鎖。
6.總體流程圖
代碼中頻繁出現的interruptd中斷標記是做什麼用的?
對線程調用 t1.interrupt();時
會導致 LockSupport.park() 阻塞的線程重新被喚醒
即有兩種喚醒情況: 被前置節點喚醒,或者被外部中斷喚醒
這時候要根據調用的acuire類型決定是否在中斷發生時結束鎖的獲取。
上面介紹的是不可中斷鎖。
在parkAndCheckInterrupt中,當park結束阻塞時時,使用的是 Thread.interrupted() 而不是 .isInterrupted() 來返回中斷狀態
因為前者會返回線程當前的中斷標記狀態同時清除中斷標誌位(置為false)
外層CAS迴圈時, 就不會讓線程受中斷標記影響,只是記錄一下是否發生過中斷
當獲取鎖成功後,如果發現有過線程中斷,則會觸發中斷異常,
之後便由獲取鎖的調用者自己決定是否要處理線程中斷。像下麵這樣:
reentrantLock.lock(); try { System.out.println("t1"); TimeUnit.SECONDS.sleep(30); } catch (InterruptedException e) { e.printStackTrace(); } finally { reentrantLock.unlock(); }
那麼另一種情況就是可中斷鎖了。
ReentranLock有一個lockInterruptibly()方法就是這種情況
線程被喚醒時,如果發現自己被中斷過,就會直接拋異常而不是繼續獲取鎖
因此如果你的線程對中斷很敏感,那麼就是用可中斷鎖,及時響應。
如果不敏感,也要註意處理中斷異常。
AQS的詳細資源釋放流程
首先AQS提供的模板方法為release方法。
核心邏輯就是對資源進行嘗試性釋放
如果成功,就喚醒等待隊列中的第一個頭節點
public final boolean release(int arg) { // 是否釋放成功,tryRelease是子類要實現的方法 if (tryRelease(arg)) { Node h = head; // 判斷頭節點是否正在阻塞中,是的話喚醒 if (h != null && h.waitStatus != 0) // 喚醒頭節點 unparkSuccessor(h); return true; } return false; }
看一下ReteenLock中的tryRelease實現
就是減一下資源值。
當資源值清零,則說明可以解除了對當前點的占用
protected final boolean tryRelease(int releases) { int c = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; if (c == 0) { free = true; // 設置當前占用線程為null setExclusiveOwnerThread(null); } // 不需要CAS,因為只有持有鎖的人才能做釋放,不擔心競爭 setState(c); return free; }
AQS如何實現公平和非公平?
以ReteenLock為例,它內部tryAcquire有兩種同步器的實現
- 非公平同步器NonfairSync
- 公平同步器FairSync
公平同步器和非公平同步器都是ReentrantLock中定義的一個static內部類
ReentrantLock根據配置的不同,使用這2個同步器做資源的獲取和同步操作
他們二者的提供的lock操作,本質上就是AQS的acquire(1)
static final class FairSync extends Sync { private static final long serialVersionUID = -3000897897090466540L; final void lock() { acquire(1); }
二者在公平和非公平的實現區別上,就是喚醒線程後,只有等待隊列的隊頭節點才會嘗試競爭。
而非公平鎖是只要喚醒了就可以嘗試競爭。
因此核心區別在於hasQueuedPredecessors方法!
公平和非公平鎖的優點和缺點
- 饑餓問題
非公平鎖可能引發“饑餓”,即一個線程反覆搶占獲取,而其他線程一直拿不到。
而公平鎖不存在饑餓,只要排上隊了就一定能拿到
- 性能問題
非公平鎖的平均性能比公平鎖要高, 因為非公平鎖中所有人都可以CAS搶占,如果同步塊的時間非常短,那麼可能所有人都不需要阻塞,減少CPU喚醒線程的開銷,整體的吞吐效率會高點,CPU也不必取喚醒所有線程,會減少喚起線程的數量。
性能測試中公平鎖的耗時是非公平鎖的94.3倍, 總切換次數是133倍
Lock類是預設公平還是非公平?
預設是非公平的,原因就是上文考慮的性能差距過大問題, 因此公平鎖只能用於特定對性能要求不高且饑餓發生概率不大的場景中。
獨占模式和共用模式的AQS區別
- 名字上, 共用模式都會帶一個shard
- 返回值上,獨占模式相關acuire方法放回的是boolean類型, 而共用模式返回的是int值
- 核心概念上, 區別在於同一時刻能否有多個線程可以獲取到其同步狀態
- 釋放時,共用模式需要用CAS進行釋放, 而獨占模式的release方法則不需要,直接setState即可。
- 共用模式應用:信號量、讀寫鎖
共用模式信號量Semaphore的Sync同步器
先實現了一個靜態內部類Sync
和上面的RLock類一個區別在於需要state初始化值,不一定為1
Sync(int permits) { setState(permits); }
再繼承實現了FairSync和NoFairSync
使用CAS實現值的增加或者減少
公平/非公平的區別同樣是hasQueuedPredecessors的判斷
protected int tryAcquireShared(int acquires) { for (;;) { // 隊頭判斷,公平鎖核心 if (hasQueuedPredecessors()) return -1; int available = getState(); int remaining = available - acquires; // 信號量不足,直接返回負數 if (remaining < 0 || // 能搶成功,返回修改後的值,搶失敗則for迴圈繼續 compareAndSetState(available, remaining)) return remaining; } }
AQS如何處理重入
通過current == getExclusiveOwnerThread()來判斷併進行非CAS的setState操作
if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; // 出現負數,說明溢出了 if (nextc < 0) // throw new Error("Maximum lock count exceeded"); // 因為是重入操作,可以直接進行state的增加,所以不需要CAS setState(nextc); return true; }
註意處理重入問題時,如果是獨占鎖,是可以直接setState而不需要CAS的,因為不會競爭式地重入!
ReentrantLock釋放時,也會處理重入,關鍵點就是對getState() - release後的處理,是否返回true或者false
protected final boolean tryRelease(int releases) { int c = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; if (c == 0) { // 只有資源數為0才會解鎖 // 才算釋放成功,否則這鎖還是占住了 free = true; setExclusiveOwnerThread(null); } setState(c); return free; }
AQS如何響應超時
AQS提供的方法中帶有Nanos尾碼的方法就是支持超時中斷的方法。
核心邏輯就是每次阻塞前,確認nanosTimeout是否已經超時了。
每次喚醒時,將nanosTimeout減去阻塞所花的時間,重新確認,並修改lastTime
關鍵部分見下圖
spinForTimeoutThreshold是什麼?
首先這個值是寫死的1000L即1000納秒
1000納秒是個非常小的數字,而小於等於1000納秒的超時等待,無法做到十分的精確,那麼就不要使用這麼短的一個超時時間去影響超時計算的精確性,所以這時線程不做超時等待,直接做自旋就好了。