一、結論 首先說結論:深拷貝出來的對象就是完完全全的新對象,不管是對象本身(id),還是對象中包含的子對象,都和原始對象不一樣; 淺拷貝出來的對象就是外新內舊的對象,對象本身(id)和原始對象完全不同,但是子對象和原始對象的子對象是一樣的。 再補充說下賦值,賦值來的對象就是完完全全的原始對象,只是叫 ...
目錄
- 1 疑點todo和解疑
- 2 AbstractQueuedSynchronizer學習總結
- 3 AQS 簡介
- 4 AbstractQueuedSynchronizer源碼分析
- 4.1 類的繼承關係
- 4.2 類的常量/成員變數
- 4.3 靜態內部類Node
- 4.4 構造函數
- 4.5 核心方法分析
- 4.5.1 核心方法概覽
- 4.5.2 acquire()方法
- 4.5.3 addWaiter()方法
- 4.5.4 enq()方法
- 4.5.5 acquireQueue()方法
- 4.5.6 shouldParkAfterFailedAcquire()方法
- 4.5.7 parkAndCheckInterrupt()方法
- 4.5.8 cancelAcquire()方法
- 4.5.9 unparkSuccessor()方法
- 4.5.10 release()方法
- 4.5.11 acquireSharedInterruptibly()方法
- 4.5.12 doAcquireSharedInterruptibly()方法
- 4.5.13 setHeadAndPropagate()方法
- 4.5.14 doReleaseShared()方法
- 4.5.15 releaseShared()方法
- 5 取消節點移出鏈表分析
- 6 在shared模式中為什麼需要PROPAGATE狀態
1 疑點todo和解疑
同步狀態變數:state就是那個共用資源(private volatile int state;) Lock類繼承AQS類並定義lock()、unLock()的方法,表示獲取鎖和釋放鎖。多線程併發訪問同一個lock實例,lock()方法會cas修改state變數,修改成功的線程獲得鎖,其他線程進入AQS隊列等待。
沒有必要!sync隊列是雙向鏈表結構,出隊時,head交替方式,只需要修改head和head後繼2個節點引用關係;固定head,就要修改head,head後繼,以及head後繼的後繼 共3個節點。顯然前者效率更高
不存在的,因為經過判斷得出此時node就是head的後繼。並且必須由這個取消節點node來喚醒後繼,要不node線程結束後,就沒有線程能夠喚醒隊列里的其他節點了。
先說結果:由搶到鎖的那個線程來喚醒!
上述的場景是存在的,例如在非公平鎖模式中,B線程被A線程喚醒,A結束,B成為head,B去執行tryAcquire(),但此時C線程搶占到鎖,B執行tryAcquire()沒有拿到鎖,再次park阻塞。C線程執行結束後將A喚醒
只有將前置節點狀態改為SIGNAL,才能確保當前節點可以被前置unPark喚醒。也就是說阻塞自己前先保證一定能夠被喚醒。因為代碼中:
獨占模式下,喚醒後繼前先限制:h.waitStatus != 0
共用模式下,喚醒後繼前先限制:h.waitStatus=SIGNAL
表示本線程在獲取資源期間,如果被其他線程中斷,本線程不會因為中斷而取消獲取資源,只是將中斷標記傳遞下去。
When acquired in exclusive mode,
* attempted acquires by other threads cannot succeed. Shared mode
* acquires by multiple threads may (but need not) succeed. This class
* does not "understand" these differences except in the
* mechanical sense that when a shared mode acquire succeeds, the next
* waiting thread (if one exists) must also determine whether it can
* acquire as well. Threads waiting in the different modes share the
* same FIFO queue.
- 共用模式:允許多個線程同時獲取資源;當一個節點的線程獲取共用資源後,需要要通知後繼共用節點的線程,也可以獲取了。共用節點具有傳播性,傳播性的目的也是儘快通知其他等待的線程儘快獲取鎖。
- 獨占模式: 只能夠一個線程占有資源,其它嘗試獲取資源的線程將會進入到隊列等待。
- 響應中斷並終止:線程只要被中斷就不會獲取資源:兩種情況的中斷:1、剛嘗試獲取、2、進入隊列中等待,前者立即停止獲取,後者執行取消邏輯,等待節點變為取消狀態
A、B先後進入隊列,狀態都是0。A獲得資源,進入setHeadAndPropagate晉升為head,A進入doReleaseShared嘗試喚醒B時,但B還沒將A改為signal,因為A還是0,A將狀態改為PROPAGATE
2 AbstractQueuedSynchronizer學習總結
2.1 AQS要點總結
對於AbstractQueuedSynchronizer的分析,最核心的就是sync queue的分析。
- 每一個節點都是由前一個節點喚醒
- 當節點發現前驅節點是head並且嘗試獲取成功,則會輪到該線程運行。
- condition queue中的節點向sync queue中轉移是通過signal操作完成的。
- SIGNAL,表示後面的節點需要運行。
- PROPAGATE:就是為了避免線程無法會喚醒的窘境。因為共用鎖會有很多線程獲取到鎖或者釋放鎖,所以有些方法是併發執行的,就會產生很多中間狀態,而PROPAGATE就是為了讓這些中間狀態不影響程式的正常運行。
2.2 細節分析
2.2.1 插入節點時先更新prev再更新前驅next
//addWaiter():
node.prev = pred; // 1 更新node節點的prev域
if (compareAndSetTail(pred, node)) {
pred.next = node; //2 更新node前驅的next域
return node;
}
//enq():
node.prev = t; // 1 更新node節點的prev域
if (compareAndSetTail(t, node)) {
t.next = node;//2 更新node前驅的next域
return t;
}
//unparkSuccessor():
Node s = node.next; //通過.next來直接獲取到節點的後繼節點,這個節點的後繼的prev一定指向節點本身
//....
if (s != null)
LockSupport.unpark(s.thread);
- addWaiter() 或者enq()插入節點時,都是先更新節點的prev域,再更新它前驅的next域。那麼通過node.next()取到的後繼,後繼的prev域一定是指向node本身。如果先更新next域,在更新prev域時出現異常,那麼通過.next取到不是完整的節點
- unparkSuccessor()喚醒後繼時,Node s = node.next; 通過.next來獲取node的後繼,後繼的prev一定指向node本身
2.2.2 為什麼unparkSuccessor()要從尾部往前遍歷
因為取消節點的next域指向了自身,所以不能從通過next來遍歷,但prev是完整的,所以通過prev來遍歷。
2.2.3 AQS的設計,儘快喚醒其他等待線程體現在3個地方
- 共用鎖的傳播性。
- doReleaseShared()中head改變,會迴圈喚醒head的後繼節點。
- 線程獲取鎖失敗後入隊列並不會立刻阻塞,而是判斷是否應該阻塞shouldParkAfterFailedAcquire,如果前繼是head,會再給一次機會獲取鎖。
3 AQS 簡介
AQS是一個用來構建鎖和同步器的框架。理論參考:JUC同步器框架
三個基本組件相互協作:
- 同步狀態的原子性管理;
- 線程的阻塞與喚醒;
- 隊列的管理;
同步器一般包含兩種方法,一種是acquire,另一種是release。acquire操作阻塞調用的線程,直到或除非同步狀態允許其繼續執行。而release操作則是通過某種方式改變同步狀態,使得一或多個被acquire阻塞的線程繼續執行。
3.1 AQS核心思想
- 如果請求的共用資源空閑,則將當前請求線程設置為有效工作線程,並且將共用資源設置為鎖狀態
- 設計一套機制:【線程如何阻塞等待以及被喚醒時鎖如何分配】?這個機制AQS是用CLH隊列鎖實現的
- CLH隊列鎖:一個虛擬的雙向隊列,AQS是將每條請求共用資源的線程封裝成一個CLH鎖隊列的一個節點(Node)來實現鎖的分配。【嚴格的FIFO隊列,框架不支持基於優先順序的同步】
- 使用一個int成員變數來表示同步狀態,使用volatile修飾保證線程可見性,並使用CAS思想進行值維護。
3.2 對資源的共用方式
兩種方式:
- Exclusive(獨占):只有一個線程能執行。又可分為公平鎖和非公平鎖:
- 公平鎖:按照線程在隊列中的排隊順序,先到者先拿到鎖
- 非公平鎖:當線程要獲取鎖時,無視隊列順序直接去搶鎖,誰搶到就是誰的
- Share(共用):多個線程可同時執行
3.3 AQS數據結構
分析類,首先就要分析底層採用了何種數據結構,抓住核心點進行分析:
- Sync queue,即同步隊列,是雙向鏈表,包括head節點和tail節點,head節點主要用作後續的調度
- Condition queue不是必須的,其是一個單向鏈表,只有當使用Condition時,才會存在此單向鏈表。並且可能會有多個Condition queue
4 AbstractQueuedSynchronizer源碼分析
4.1 類的繼承關係
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable
繼承自抽象類:AbstractOwnableSynchronizer,父類提供獨占線程的設置與獲取的方法
public abstract class AbstractOwnableSynchronizer
implements java.io.Serializable {
private static final long serialVersionUID = 3737899427754241961L;
protected AbstractOwnableSynchronizer() { }// 構造函數
private transient Thread exclusiveOwnerThread; //獨占模式下的線程
// 設置獨占線程
protected final void setExclusiveOwnerThread(Thread thread) {
exclusiveOwnerThread = thread;
}
// 獲取獨占線程
protected final Thread getExclusiveOwnerThread() {
return exclusiveOwnerThread;
}
}
4.1.1 AQS需要子類重寫的方法
protected boolean tryAcquire(int arg) {//獨占方式獲取鎖
throw new UnsupportedOperationException();
}
protected boolean tryRelease(int arg) { //釋放獨占的鎖
throw new UnsupportedOperationException();
}
protected int tryAcquireShared(int arg) { //以共用方式獲取鎖
throw new UnsupportedOperationException();
}
protected boolean tryReleaseShared(int arg) {//釋放共用鎖
throw new UnsupportedOperationException();
}
protected boolean isHeldExclusively() {//是否獨占資源
throw new UnsupportedOperationException();
}
關於重寫說明:
目的是將共用資源state的讀寫交給子類管理,AQS專註在隊列的維護以及線程的阻塞與喚醒
4.2 類的常量/成員變數
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer
implements java.io.Serializable {
private static final long serialVersionUID = 7373984972572414691L;
// 頭節點
private transient volatile Node head;
// 尾節點
private transient volatile Node tail;
//0:表示沒有線程獲取到鎖;1表示有線程獲取到鎖;大於1:表示有線程獲得了鎖,且允許重入
private volatile int state;
// 自旋時間
static final long spinForTimeoutThreshold = 1000L;
// 以下跟cas有關
private static final Unsafe unsafe = Unsafe.getUnsafe(); // Unsafe類實例
private static final long stateOffset; // state記憶體偏移地址
private static final long headOffset; // head記憶體偏移地址
private static final long tailOffset; // state記憶體偏移地址
private static final long waitStatusOffset;// tail記憶體偏移地址
private static final long nextOffset; // next記憶體偏移地址
// 靜態初始化塊
static {
try {
stateOffset = unsafe.objectFieldOffset
(AbstractQueuedSynchronizer.class.getDeclaredField("state"));
headOffset = unsafe.objectFieldOffset
(AbstractQueuedSynchronizer.class.getDeclaredField("head"));
tailOffset = unsafe.objectFieldOffset
(AbstractQueuedSynchronizer.class.getDeclaredField("tail"));
waitStatusOffset = unsafe.objectFieldOffset
(Node.class.getDeclaredField("waitStatus"));
nextOffset = unsafe.objectFieldOffset
(Node.class.getDeclaredField("next"));
} catch (Exception ex) { throw new Error(ex); }
}
}
說明:
- 屬性中包含了頭節點head,尾節點tail,狀態state、自旋時間spinForTimeoutThreshold
- AbstractQueuedSynchronizer抽象的屬性在記憶體中的偏移地址,通過該偏移地址,可以獲取和設置該屬性的值
- 同時還包括一個靜態初始化塊,用於載入記憶體偏移地址。
4.3 靜態內部類Node
線程封裝成Node並具備狀態
static final class Node
{
// 模式,分為共用與獨占
static final Node SHARED = new Node();// 共用模式
static final Node EXCLUSIVE = null; // 獨占模式
// 節點狀態
static final int CANCELLED = 1;//表示當前的線程被取消
static final int SIGNAL = -1;//表示當前節點的後繼節點包含的線程需要被運行【被unpark】,
static final int CONDITION = -2;//表示當前節點在等待condition,也就是在condition隊列中
static final int PROPAGATE = -3;//表示當前場景下後續的acquireShared能夠得以執行
volatile int waitStatus;//節點狀態;表示當前節點在sync隊列中,等待著獲取鎖
volatile Node prev; // 指向當前節點的前驅
volatile Node next;// 指向當前節點的後繼
volatile Thread thread;//節點所對應的線程
Node nextWaiter;// 下一個等待者 只跟condition有關
private transient volatile Node head; // 頭節點 懶載入
private transient volatile Node tail; //尾節點 懶載入
private volatile int state; // 同步狀態
// 節點是否在共用模式下等待
final boolean isShared() {
return nextWaiter == SHARED;
}
// 獲取前驅節點,若前驅節點為空,拋出異常
final Node predecessor() throws NullPointerException {
Node p = prev;// 保存前驅節點
if (p == null)
throw new NullPointerException();
else
return p;
}
// 無參構造函數
Node() { // Used to establish initial head or SHARED marker
}
// 構造函數
Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}
Node(Thread thread, int waitStatus) { // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
}
}
關於Node說明:
每個被阻塞的線程都會被封裝成一個Node節點,放入隊列。Node包含了一個Thread類型的引用,並且有自己的狀態:
- CANCELLED:1,表示當前的線程被取消。
- SIGNAL:-1,表示負責unPark後繼【由前一個節點unPark下一個節點】。
- CONDITION:-2,表示當前節點在等待condition,也就是在condition queue中。
- PROPAGATE:-3,表示當前場景下後續的acquireShared能夠得以執行。
- 預設值:0,發生在:1、節點加入到隊列成為tail節點,2、節點成為head,並準備喚醒後繼
4.4 構造函數
protected AbstractQueuedSynchronizer() { } //預設的無參構造
4.5 核心方法分析
4.5.1 核心方法概覽
public final void acquireShared(int arg) {...} // 獲取共用資源的入口(忽略中斷)
protected int tryAcquireShared(int arg); // 嘗試獲取共用資源
private void doAcquireShared(int arg) {...} // AQS中獲取共用資源流程整合
private Node addWaiter(Node mode){...} // 將node加入到同步隊列的尾部
protected int tryAcquireShared(int arg); // 嘗試獲取共用資源
private void setHeadAndPropagate(Node node, int propagate) {...} // 設置 同步隊列的head節點,以及觸發"傳播"操作
private void doReleaseShared() {...} // 遍歷同步隊列,調整節點狀態,喚醒待申請節點
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {...} // 如果獲取資源失敗,則整理隊中節點狀態,並判斷是否要將線程掛起
private final boolean parkAndCheckInterrupt() {...} // 將線程掛起,併在掛起被喚醒後檢查是否要中斷線程(返回是否中斷)
private void cancelAcquire(Node node) {...} // 取消當前節點獲取資源,將其從同步隊列中移除
4.5.2 acquire()方法
該函數以獨占模式獲取(資源),忽略中斷。
流程如下:
源碼如下:
public final void acquire(int arg) {
if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt(); //來到這裡,表示線程拿到鎖,並且讀取到線程的中斷標識為true,調用selfInterrupt()來恢複線程的interrupted中斷標誌(被parkAndCheckInterrupt()擦除了,所以再設置一次)。
}
static void selfInterrupt() {
Thread.currentThread().interrupt();//線程設置interrupted中斷標誌
}
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
acquire()總結
- 先調用tryAcquire(),由子類實現來嘗試加鎖,如果獲取到鎖,則線程繼續執行;反則,節點加入隊列
- 調用addWaiter(),將調用線程封裝成為一個節點並放入AQS隊列。
- 調用acquireQueued(),先park阻塞等待,直到被unPark喚醒。
- 如果線程被設置中斷,那麼acquire結束前,需要重新設置中斷。
4.5.3 addWaiter()方法
addWaiter:快速添加的方式往sync queue尾部添加節點
// 添加等待者
private Node addWaiter(Node mode) {
// 新生成一個節點
Node node = new Node(Thread.currentThread(), mode);
// 創建臨時引用pred,跟tail指向相同地址
Node pred = tail;
if (pred != null) { // 尾節點不為空,即隊列已經初始化過
// 將node的prev域連接到尾節點
node.prev = pred;
if (compareAndSetTail(pred, node)) { // cas更新tail,指向新創建的node
// 設置尾節點的next域為node
pred.next = node; // 結合 node.prev = pred; 形成雙向鏈表
return node; // 返回新生成的節點
}
}
enq(node); // 隊列還未初始化,或者是compareAndSetTail操作失敗,則進入enq
return node;
}
//關於併發情景說明:
// 從 Node pred = tail; 到 compareAndSetTail(pred, node); 期間,隊列可能插入了新的節點,pred指向的不是最新的tail,那麼compareAndSetTail(pred, node) 就會執行失敗,同時 node.prev = pred; node的前驅也不是最新的tail。
// 通過enq()來解決併發問題,enq()通過自旋+cas來保證線程安全
addWaiter()說明:
- 使用快速添加的方式(失敗不重試)創建新節點並添加到往隊列尾部,更新tail
- 如果隊列還沒有初始化或者cas失敗,則調用enq()插入隊列
4.5.4 enq()方法
// 線程安全地創建隊列、或者將節點插入隊列、
private Node enq(final Node node) {
for (;;) { // 自旋+cas,確保節點能夠成功入隊列
Node t = tail;//尾節點
if (t == null) { // 尾節點為空,即還沒被初始化
if (compareAndSetHead(new Node())) // 設置head。 !!!!註意,這裡是new node,沒有使用參數的node,因此head節點不引用任何線程
tail = head; // 頭節點與尾節點都指向同一個新生節點。迴圈繼續,進入else後,參數node將插入到隊列
} else { // 尾節點不為空,即已經被初始化過
node.prev = t; // 將node節點的prev域連接到尾節點
if (compareAndSetTail(t, node)) { // 比較更新tail,node成為新的tail
// 設置尾節點的next域為node
t.next = node; // 結合 node.prev = t; 形成雙向鏈表
return t; // 返回Node的前驅節點
}
}
}
}
//CAS head field. Used only by enq.
private final boolean compareAndSetHead(Node update) {
return unsafe.compareAndSwapObject(this, headOffset, null, update);
}
//CAS head field. Used only by enq.
private final boolean compareAndSetTail(Node expect, Node update) {
return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
}
enq()方法總結:
- 功能:cas+自旋方式將節點插入隊列
- 如果隊列未初始化,先創建頭節點head(head不指向任務線程),再將節點插入到隊列(當第一個節點被創建後,隊列實際有兩個節點:head+業務節點)。
- 如果隊列已經初始化,則直接插入隊列
4.5.5 acquireQueue()方法
作用:sync隊列中的節點在獨占且忽略中斷的模式下獲取(資源)
源碼如下:
// sync隊列中的節點在獨占且忽略中斷的模式下獲取(資源):
final boolean acquireQueued(final Node node, int arg) {
// 標誌
boolean failed = true;
try {
// 中斷標識。如果線程喚醒後,中斷標識是true,外層的acquire()將進入selfInterrupt()。
boolean interrupted = false;
// 無限迴圈 :如果前驅不是head,那線程將park阻塞,等待前面的節點依次執行,直到被unPark喚醒
for (;;) {
// 獲取node的前驅,如果前驅是head,則表明前面已經沒有線程等待了,該線程可能成為工作線程
final Node p = node.predecessor();
// 前驅為頭節點並且成功獲得鎖
if (p == head && tryAcquire(arg)) {
setHead(node); // node晉升為head
p.next = null; // 舊head的next域指向null,將會被GC,移出隊列
failed = false; // 設置標誌
return interrupted; //拿到鎖,break迴圈,並返回中斷標識
}
//執行到這裡,前驅非head 或者 前驅是head但獲取鎖失敗,那麼:1、將前驅狀態改為signal 2、當前線程unPark阻塞
//shouldParkAfterFailedAcquire():尋找非取消狀態的前驅,如果狀態為signal返回true 反則,將前驅狀態改為signal、再返回false
//前驅是signal ,執行parkAndCheckInterrupt()後,當前線程park阻塞。一直到線程被unPark喚醒,再返回線程的中斷狀態
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())//parkAndCheckInterrupt返回true表明線程中斷狀態為true
//上面if同時成立,才會執行。
interrupted = true; //那麼把中斷標識置為true
}
} finally { //(有異常,在拋出之前執行finally;沒有異常,在return之前執行finally)
if (failed)//只有try的代碼塊出現異常,failed才會是true。什麼情景會產生異常?cancelAcquire分析時有說明
cancelAcquire(node); //執行取消邏輯
}
}
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
private void setHead(Node node) {
head = node;
node.thread = null;//再次表明head的thread屬性是空的
node.prev = null;
}
acquireQueue()總結:
- 功能:節點進入AQS隊列後,先park阻塞等待,直到被unPark喚醒,或者中斷喚醒
- 找到非取消狀態的前驅(取消狀態的將會被移出隊列並GC),如果前驅是SIGNAL,那麼當前節點進入park阻塞,否則,先將前驅改為SIGNAL,再進入park阻塞。
- 被unPark喚醒後,判斷前驅是頭節點且獲取到資源(tryAcquire成功),當前節點晉升為頭節點。自此,線程獲取到鎖
- 調用shouldParkAfterFailedAcquire和parkAndCheckInterrupt函數,表明只有當該節點的前驅節點的狀態為SIGNAL時,才可以對該節點所封裝的線程進行park操作。
4.5.6 shouldParkAfterFailedAcquire()方法
// 當獲取(資源)失敗後:1、判斷能否將當前線程park;2、修改前驅節點狀態為signal
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
// 獲取前驅節點的狀態
int ws = pred.waitStatus;
if (ws == Node.SIGNAL) // 狀態為SIGNAL
// 只有當前驅節點為 signal時,才返回true ,表示當前線程可以安全地park阻塞;其它情況返回false
return true;
//跳過那些CANCELLED狀態的前驅
if (ws > 0) { // 表示狀態為CANCELLED,為1
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0); // 找到pred節點前面最近的一個狀態不為CANCELLED的節點;然後跳出迴圈並返回false
pred.next = node;
} else { // 為PROPAGATE -3 或者是0 ,(為CONDITION -2時,表示此節點在condition queue中)
// cas更新前驅的狀態為SIGNAL.如果前驅是頭節點,那麼頭節點ws=SIGNAL
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
// 不能進行park操作
return false;
}
//CAS waitStatus field of a node.
private static final boolean compareAndSetWaitStatus(Node node,int expect, int update) {
return unsafe.compareAndSwapInt(node, waitStatusOffset, expect, update);
}
shouldParkAfterFailedAcquire()總結:
- 如果前驅狀態是:SIGNAL,返回true。表示當前節點可以安全地unPark()阻塞
- 遇到取消的前驅節點,則跳過。這些被取消的節點會從隊列中移除並GC
- 如果前驅狀態不是:SIGNAL,將前驅狀態改為:SIGNAL,返回false,回到1 繼續
4.5.7 parkAndCheckInterrupt()方法
// 進行park操作並且返回該線程的中斷標識
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this); //外面的for迴圈可能會導致多次park,不過沒關係,park允許多次執行
//被喚醒之後,返回中斷標記,即如果是正常喚醒則返回false,如果是由於中斷醒來,就返回true
return Thread.interrupted(); // acquireQueued() 中聲明的interrupted 將會被更新為這裡的返回結果
}
public static boolean interrupted() {
return currentThread().isInterrupted(true);//返回當前線程interrupted中斷標記,同時會清除此interrupted標記
}
方法總結:
- 執行park操作(前提:前驅狀態是SIGNAL),在隊列中阻塞等待。
- 被unPark()喚醒後,返回線程的interrupted中斷標識,並且清除interrupted標記
4.5.8 cancelAcquire()方法
什麼時候才會執行cancelAcquire?
在lockInterruptibly()會通過拋出中斷異常來執行cancelAcquire方法,lock方法過程中則不會執行該代碼,作者這麼些的意圖在於for迴圈內部如果出現不可控的因素導致產生未知的異常,則會執行cancelAcquire,很明顯這屬於一種相對偏保守的保險代碼。
// 取消獲取鎖
private void cancelAcquire(Node node) {
// Ignore if node doesn't exist
if (node == null) // node為空,返回
return;
node.thread = null;// thread置空 備註1
// Skip cancelled predecessors
Node pred = node.prev;// pred表示:最靠近node並且狀態不等於取消的前驅節點
while (pred.waitStatus > 0)
node.prev = pred = pred.prev; //更新pred,往列頭推進
Node predNext = pred.next; //predNext表示:pred的後繼
// 設置node節點的狀態為CANCELLED
node.waitStatus = Node.CANCELLED; //備註2
if (node == tail && compareAndSetTail(node, pred)) { // 若node節點為尾節點,則pred成為尾節點 備註3
// pred的next域置為null
compareAndSetNext(pred, predNext, null);
} else { // 2、node節點不為尾節點,或者比較設置不成功
int ws;
//下麵一串判斷,最終目標:在node移除隊列前,將有效的前驅節點狀態改為signal
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {
// pred節點不為頭節點,並且
//pred節點的狀態為SIGNAL)或者
// pred節點狀態小於等於0,並且比較並設置等待狀態為SIGNAL成功,並且pred節點所封裝的線程不為空
Node next = node.next;
if (next != null && next.waitStatus <= 0) // 後繼不為空並且後繼的狀態小於等於0
compareAndSetNext(pred, predNext, next); // 比較並設置pred.next = next; 到這裡:node的前驅節點指向node的後繼節點。 備註4
} else {
// 這裡,pred==head (3、即node是head的後繼)或者pred.status=0,-2時 【前面while (pred.waitStatus > 0) 已經限制了pred一定是<=0】,執行:
unparkSuccessor(node); // 喚醒node的後繼
}
node.next = node; // help GC 後繼節點指向自身 備註5
}
}
//修改參數node的next域
private static final boolean compareAndSetNext(Node node, Node expect, Node update) {
return unsafe.compareAndSwapObject(node, nextOffset, expect, update);
}
對cancelAcquire()總結之前,先明確以下兩點:
- 基於對acquire()方法的分析,調用鏈:addWait()->enq()->acquireQueue()->cancelAcquire(node),進入到cancelAcquire()時,節點node一定已經在隊列中,而且它不會是head,並且沒有持有鎖。
- AQS通過管理這些屬性:waitStatus、thread、prev、next、head、tail、nextWaiter ,成為一個虛擬的列隊。
cancelAcquire(node)總結:
cancelAcquire()負責將node移出隊列,並保持隊列中其他節點的順序關係不變,它做了以下工作:
- waitStatus更新為cancel (備註2)
- thread更新為null(備註1)
- tail:如果node是尾節點,更新tail引用 (備註3)
- head:不需要更新(node不會是head)
- prev:沒有更新
- next: node前置的next域更新指向node後繼,並且node的next指向了自身 (備註4、備註5)
- nextWaiter:不需要更新(跟condition有關,這裡不涉及)
執行cancelAcquire後,隊列變成這樣的:
發現:
- node沒有移出隊列,因為被後繼的prev所引用。
- node.next變了,指向了自身,這就能解釋為什麼unparkSuccessor()是從後往前遍歷:因為取消節點的next域指向了自身,所以不能從通過next來遍歷,但prev是完整的,所以通過prev來遍歷。
- 取消節點,暫存在隊列中,當後繼節點被喚醒,執行shouldParkAfterFailedAcquire後,取消節點的引用鏈清空,移出隊列,最後GC回收。
4.5.9 unparkSuccessor()方法
// 喚醒node節點的後繼
private void unparkSuccessor(Node node) {
// 獲取node節點的等待狀態
int ws = node.waitStatus;
if (ws < 0) // 狀態值小於0,為SIGNAL -1 或 CONDITION -2 或 PROPAGATE -3
// cas節點狀態為0
compareAndSetWaitStatus(node, ws, 0);//如果head沒有後繼的情況下,狀態會一直=0
Node s = node.next;
//若後繼為空,或後繼已取消,則從尾部往前遍歷 找到最靠近的一個處於正常阻塞狀態的節點進行喚醒
// 什麼時候s==null ? node的後繼節點是取消狀態時,node.next為null
if (s == null || s.waitStatus > 0) {
s = null;
// 由尾節點向前倒著遍歷隊列,但不會超過node節點
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);//喚醒s節點線程
}
unparkSuccessor()總結:
- 作用:找到有效的後繼節點unPark喚醒
- 尋找有效後繼時從尾往前倒著遍歷:因為取消節點的next域指向了自身,所以不能從通過next來遍歷
- 將發起unPark喚醒的節點(只能是head)狀態改為0(意味著在head喚醒後繼,到被後繼推出隊列的期間,狀態變為0)
4.5.10 release()方法
以獨占模式釋放對象,其源碼如下:
public final boolean release(int arg) {
if (tryRelease(arg)) { //如果釋放鎖成功
Node h = head;
// 線程A調用acquire()獲取到鎖之後,A線程節點變為head,然後A調用release 釋放鎖,存在兩種情況:
// 1、 如果有新的線程B入隊,B成為後繼節點,B會將A狀態改為SIGNAL,那麼(h != null && h.waitStatus != 0 )成立,unparkSuccessor()喚醒後繼節點
// 2、如果A後面沒有節點,A狀態是預設值:0 ,那麼h.waitStatus != 0 不成立,直接返回true,不需要喚醒後繼節點。
if (h != null && h.waitStatus != 0) // 頭節點不為空並且頭節點狀態不為0
unparkSuccessor(h); //由head喚醒後繼節點
return true;
}
return false;
}
release()總結:
- 功能:釋放獨占鎖
- 先調用tryRelease()由子類實現釋放鎖
- 如果釋放鎖成功,然後unPark喚醒後繼節點(沒有後繼就不需要喚醒)
4.5.11 acquireSharedInterruptibly()方法
//獲取共用資源,響應中斷
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted()) //讀取線程中斷標記,然後擦除標記
throw new InterruptedException(); //中斷標記為true,拋出中斷異常,停止執行
if (tryAcquireShared(arg) < 0) //調用子類實現方法 獲取資源
doAcquireSharedInterruptibly(arg); //沒有獲取到,那麼再嘗試獲取(進入隊列排隊等待)
}
獲取共用資源流程圖:
acquireSharedInterruptibly()總結:
- 共用模式獲取對象,響應中斷並終止獲取
- 先調用子類實現獲取資源,沒有獲取到再加隊隊列等待。
4.5.12 doAcquireSharedInterruptibly()方法
//獲取共用資源,響應中斷
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.SHARED); //增加等待節點
boolean failed = true;
try {
for (;;) {//無限迴圈,直到r>0
final Node p = node.predecessor(); // p表示 剛插入節點的前驅
//1、如果前驅是head
if (p == head) {
int r = tryAcquireShared(arg);//調用子類實現方法 嘗試獲取共用資源
if (r >= 0) { // >0 表示 獲取到資源
// 1、如果是ReentrantReadWriteLock、CountDownLatch ,有可能r=1
// 2、如果是Semaphore,有可能r=0
// 1、2 都調用setHeadAndPropagate進行共用傳播判斷
setHeadAndPropagate(node, r);// 更新head併進行共用傳播
p.next = null; // 將隊列頭節點的next域置空,之後,這個節點將被GC回收
failed = false;
return;
}
}
// 2、前驅不是head
//線程park阻塞,直至被unPark喚醒,或者被其它線程中斷喚醒
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException(); //進入這裡表示線程中斷標記為true,那麼拋出中斷異常
}
} finally {
if (failed) //當try 代碼塊有異常:中斷異常 或 其他未知異常,failed才是true
cancelAcquire(node);//取消獲取資源
}
}
doAcquireSharedInterruptibly()總結:
- 創建節點並插入aqs隊列,將前驅狀態改為signal,park阻塞,等待unPark喚醒。
- 正常喚醒後,無限迴圈直到前驅是head並且調用子類方法獲取共用資源成功,調用setHeadAndPropagate()成為head併進行共用傳播。
- 被中斷喚醒、或者迴圈等待過程發生中斷異常,執行cancelAcquire()取消獲取資源
4.5.13 setHeadAndPropagate()方法
setHeadAndPropagate在獲取共用資源的時候被調用
// 設置 同步隊列的head節點,以及觸發"傳播"操作
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // 記錄更新前的head
setHead(node); //參數node 成為新的head
//判斷:
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next; //獲取node後繼
//後繼為空或者後繼是等待共用資源的節點
if (s == null || s.isShared())
doReleaseShared(); //釋放共用資源
}
}
滿足調用doReleaseShared的條件分析:
-
propagate > 0:
ReentrantReadWriteLock、CountDownLatch 調用tryAcquireShared()返回1進入,滿足條件;Semaphore 進入,propagate可能等於0,不滿足,繼續2 -
h == null:
h == null 表示舊head變為null,程式沒有地方設置head=null,並且這裡h引用著head意味著head不會被GC。 因此,h == null不滿足條件,繼續3 【不知道哪種情況下h==null todo】 -
h.waitStatus < 0
- h.waitStatus==1:取消,不能由取消節點喚醒後繼,不滿足條件
setHeadAndPropagate()總結:
- 方法功能:設置 同步隊列的head節點,以及觸發"傳播"操作:
- 如果head的後繼是共用類型節點或者為null,調用doReleaseShared()來喚醒後繼
4.5.14 doReleaseShared()方法
//遍歷同步隊列,調整節點狀態,喚醒待申請節點
private void doReleaseShared() {
for (;;) {
Node h = head;
//1、head 不等於 tail 且不等於 null
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) { //如果head狀態為signal ,cas修改為0
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h); //喚醒後繼
}
//如果節點的後繼還沒有將其前驅改為signal,這裡ws==0是成立的
else if (ws == 0 && //如果head狀態為0,cas修改為propagate
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // 如果在int ws = h.waitStatus; 之後,後繼將head節點改為signal,那麼cas失敗,continue繼續迴圈後, if (ws == Node.SIGNAL) 滿足,那麼將會喚醒後繼。
}
// 只有head沒有發生變化,迴圈才會結束,若head改變,繼續迴圈
if (h == head) // loop if head changed
break;
}
}
doReleaseShared()總結:
- 如果頭節點狀態為signal,那麼CAS更新頭節點狀態為0,成功則調用unparkSuccessor()喚醒後繼,失敗則重試
- 如果頭節點狀態為0,那麼將CAS更新頭節點狀態為PROPAGTATE ,失敗則重試。
- 最後如果判斷head是否發生變化,有變化則重覆1、2,沒有變化則方法結束。
- PROPAGTATE狀態的意義是,增加一個狀態判斷,當前驅獲取資源,後繼同時也有機會獲取到資源
4.5.15 releaseShared()方法
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
releaseShared()方法總結:
- 調用子類的實現方法tryReleaseShared()釋放n個共用資源,釋放成功則繼續調用doReleaseShared()來喚醒隊列中的等待節點
5 取消節點移出鏈表分析
有兩種情景,會將取消節點徹底移出鏈表:
- 頭節點unPark喚醒後繼時,後繼節點喚醒後重新進入shouldParkAfterFailedAcquire()
- 取消節點後面有新節點入列時,新節點執行shouldParkAfterFailedAcquire()
以第一個情景為例子分析:
6 在shared模式中為什麼需要PROPAGATE狀態
結論:在前驅節點獲取資源時,後繼也能夠有機會申請資源,不需要等待前驅通過releaseShare()來喚醒。
分析如下:
1:A B 先後進入隊列
2:A被喚醒,獲得資源,調用setHeadAndPropagate(),晉升為head
3、B調用shouldParkAfterFailedAcquire(),嘗試將A狀態改為signal但未執行
4、A進入doReleaseShared(),A狀態等於0(3還沒執行),進入ws == 0 分支處理。
5、此時3執行完成,B將A的狀態改為signal,然後B park阻塞
6、A執行compareAndSetWaitStatus(h, 0, Node.PROPAGATE)失敗,continue繼續
7、A進入(ws == Node.SIGNAL)分支,執行compareAndSetWaitStatus(h, Node.SIGNAL, 0)成功,然後再執行unparkSuccessor(),將B喚醒。
8、A將B喚醒後,A去執行拿到資源後的操作,B也成功拿到資源並執行。
因為步驟6的continue,B不需要等待A執行releaseShare()被喚醒,在A獲取到資源時同時B也能快速獲取到資源,A、B可以同時執行獲得資源後的任務