談到java中的併發,我們就避不開線程之間的同步和協作問題,談到線程同步和協作我們就不能不談談jdk中提供的AbstractQueuedSynchronizer(翻譯過來就是抽象的隊列同步器)機制; (一)、AQS中的state和Node含義: AQS中提供了一個int volatile state ...
談到java中的併發,我們就避不開線程之間的同步和協作問題,談到線程同步和協作我們就不能不談談jdk中提供的AbstractQueuedSynchronizer(翻譯過來就是抽象的隊列同步器)機制;
(一)、AQS中的state和Node含義:
AQS中提供了一個int volatile state狀態的變數用來標識共用資源,AQS定義了兩種資源的占用方式:
1、獨占模式(EXCLUSIVE):表示同一個資源,在同一時刻只能被一個線程持有,例如ReentrantLock等; 2、共用模式(SHARED):表示同一個資源,在同一時刻可以被多個線程同時持有,例如Semaphore,CountDownLatch等;同時也提供了一個LCH隊列,用來存放獲取共用資源時候發生阻塞的Node節點,這個節點是對需要獲取資源線程的一個封裝,包含了線程本身和Node節點的狀態waitStatus,一共分為五種:
/**表示當前節點中線程已經被取消調度,當timeout或者interrupt(假如會響應中斷的話)會觸發節點變更為此狀態,此節點的狀態再不會發生變化*/
static final int CANCELLED = 1;
/**表示當前節點中線程釋放資源後需要喚醒後繼節點線程,在採用尾插法將新結點加入到同步隊列的時候,會將新結點的前繼節點設置為SIGNAL */
static final int SIGNAL = -1;
/**表示當前節點中的線程在等待一個Condition喚醒,在其他線程中調用了這個Condition的signal()會將此Node從等待隊列的隊頭轉移到同步隊列的隊尾,嘗試競爭共用資源 */
static final int CONDITION = -2;
/**共用模式下,當前節點中的線程不僅需要喚醒後繼節點,還需要喚醒後繼節點的後繼節點*/
static final int PROPAGATE = -3;
除了上面這四種還有一個0,表示節點初始狀態,可以看出waitStatus<0才代表該節點是一個有效節點(即結點中的線程可以正常調度)。
AQS的設計其實是採用了模版方法的設計思想,在AbstractQueuedSynchronizer中這個頂層類中只提供了一些公共的方法實現如:同步隊列的維護等,而共用資源的獲取和釋放只提供了方法的定義,並不提供具體的實現(只是拋出了 unsupportedOperationException異常),通過這種方式就達到讓自定義的隊列同步器去強制實現的目的。
上面我們提到,AQS定義了資源的兩種占用方式:獨占和共用,主要也就對應tryAcquire()-tryRelease(),tryAcquireShared()-tryReleaseShared()兩組方法需要我們自己去實現了:
1、tryAcquire()獨占模式,嘗試獲取資源,成功為true,失敗為false;
2、tryRelease()獨占模式,嘗試釋放資源,成功為true,失敗為false;
3、tryAcquireShared()共用模式,嘗試獲取資源,負數為失敗,等於0為成功獲取,沒有剩餘資源,大於0為成功獲取,有剩餘共用資源;
4、tryReleaseShared()共用模式,嘗試釋放資源,釋放之後需要喚醒等待節點為true,否則為false;
(二)、代碼剖析:
1、acquire(int)方法:
此方法是獲取共用資源的入口方法,代碼如下:
public final void acquire(int arg) { /*嘗試獲取共用資源,嘗試成功直接返回*/ if (!tryAcquire(arg) /* 1、搶占共用資源失敗,則將當前節點放入到同步隊列的尾部,並標記為獨占模式; * 2、使線程阻塞在同步隊列中獲取資源,直到獲取成功才返回;如果整個過程中被中斷過就返回true,否則就返回false;*/ && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) { /*阻塞獲取資源的過程中是不響應線程中斷的,內部進行了中斷檢測,檢測到了中斷請求,所以這塊進行線程中斷*/ selfInterrupt(); } }
上面代碼的大概執行流程是:
1、加塞搶占共用資源(因為同步隊列中可能還有其它節點等待),獲取成功則直接返回;
2、當前線程搶占共用資源失敗,調用addWaiter()將當前線程包裝成Node節點,加入同步隊列的尾部,並將當前節點標記為獨占模式(EXCLUSIVE),並返回這個節點;
3、當前線程調用acquireQueued()方法阻塞在同步隊列上獲取共用資源,該方法是一個同步方法,直到成功獲取了共用資源才會返回,在這個阻塞獲取資源的過程中,如果檢測到發生了線程的中斷會返回true,否則會返回false;
4、在第3步中阻塞獲取資源的過程中也不會響應中斷,所以在上個獲取共用過程中,檢測到了線程中斷標記會在acquireQueued()方法返回為true時候,再調用selfInterrupt()中斷一次線程;
2、addWaiter(Node mode)方法:
private Node addWaiter(Node mode) { /*以給定的模式將當前線程包裝成node節點*/ Node node = new Node(Thread.currentThread(), mode); /*快速採用尾插法,將當前節點插入到同步隊列的隊尾*/ Node predNode = tail; if (predNode != null) { /*preNode <-- node*/ node.prev = predNode; /*採用CAS將node設置阻塞隊列的尾節點,設置成功,說明沒有併發*/ if (compareAndSetTail(tail, node)) { predNode.next = node; /*尾插法插入成功,則直接返回當前節點*/ return node; } } /*"自旋"將節點加入到隊列的尾部,直到成功為止*/ enq(node); return node; }
此方法是當前線程首次搶占共用資源不成功,將當前線程以指定的模式(獨占或者共用)包裝成Node插入到同步隊列的尾節點的方法,其執行邏輯也在代碼中詳細註釋了,再概括下:
1、將當前線程包裝為一個新的Node節點,並標記為獨占模式;
2、如果當前同步隊列的尾節點不為空(表示當前隊列不為空),採用尾插法,將新的Node節點插入到同步隊列的尾部,考慮到多線程併發的安全問題採用了CAS方式設置同步隊列的尾節點,設置成功則就直接返回這個新結點;
3、如果快速插入不成功(可能有兩種情況:1、tail為空,即當前同步隊列為空;2、tail不為空,但是在使用CAS設置尾節點的時候出現了線程併發安全問題),則就調用enq(Node),採用“自旋”,直到
將新結點成功插入到同步隊列的尾部;
3、enq(Node) 方法:
private Node enq(final Node node) { /*"自旋",將給定的節點插入到同步隊列的尾部*/ for (;;) { Node t = tail; /*同步隊列為空,則創建一個thread為null的Node節點作為同步隊列的頭結點,並且將尾節點也設置為頭結點*/ if (t == null) { /*CAS操作,設置同步隊列的頭結點*/ if (compareAndSetHead(new Node())) { /*將尾節點設置為頭結點,進入下次"自旋"*/ tail = head; } }else { /*尾部節點不為空,則進行正常添加動作*/ node.prev = t; /*CAS操作,設置同步隊列的頭結點*/ if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
此方法採用了“自旋”操作,並結合CAS操作,在保證多線程併發線程安全的前提下,一定可以安全地將這個新結點插入到同步隊列的尾部。
此段代碼的執行流程是:
1、判斷這個同步隊列是否為空(判斷tail為空即可),為空則創建一個空節點(不包含任何線程),並向尾節點也指向頭結點,
2、然後進行下一次的自旋嘗試CAS操作,將方法傳入的Node節點嘗試加入到隊列的尾部,也通過一定次數的自旋操作,一定會加入到同步隊列的尾部,然後退出;
3、如果一開始進入這個方法,隊列不為空,則就執行第2步驟不斷嘗試,直到成功;
到此為止addWaiter()方法中的邏輯就分析完了,執行完畢之後返回,就行調用acquireQueued(Node)方法阻塞此線程,等待獲取資源了。
acquireQueued(Node,int)方法:
final boolean acquireQueued(final Node node, int arg) { /*標記阻塞獲取資源的過程中是否發生了異常*/ boolean failed = true; try { /*標記線程阻塞的過程中是否發生了中斷*/ boolean interrupted = false; /*線程自旋阻塞*/ for(;;){
/*獲取當前節點的前驅結點*/ final Node p = node.predecessor(); /*1、前驅結點是頭結點,則說明當前線程有資格獲取共用資源,嘗試獲取,獲取成功,將當前節點設置為頭結點*/ if (p == head && tryAcquire(arg)) { /*將當前節點設置為頭結點*/ setHead(node); p.next = null; //help GC failed = false;
/*返回線程在阻塞的過程中是否接受到了中斷請求*/ return interrupted; } /*2、3當前節點的前驅結點不是頭結點,判斷當前線程是否可以掛起*/ if (shouldParkAfterFailedAcquire(p, node) /*4、當前線程可以掛起,則掛起線程,並且線程被unpark()或者interrupt()喚醒,檢查線程的狀態*/ && parkAndCheckInterrupt()) { interrupted = true; } } }finally{
if (failed) { /*5、阻塞獲取同步資源的時候,發生了異常,將當前Node節點從同步隊列中出隊*/ cancelAcquire(node); } }
acquireQueued(Node)這個方法的目的就是使得當前線程阻塞,等待獲取資源,獲取成功之後才返回。那麼如何阻塞呢?看了上面的代碼,讀到這裡我想大家心裡都有了答案:要麼自旋,要麼主動park(),此方法中採用了這兩種方式結合的方法,其大致的執行流程如下:
1、自旋,首先當前節點的前繼節點是頭結點(走到這裡來了,說明前繼節點正在占用共用資源,有可能在這個過程中正好釋放了),那麼我們當前線程就有資格去嘗試獲取共用資源,如果獲取共用資源成功,則結束自旋阻塞;
2、如果前繼節點不是頭結點,或者爭搶共用資源失敗,那麼我們調用shouldParkAfterFailedAcquire(Node,Node)方法,判斷是否可以將此線程暫時掛起(不能無限制地自旋,會造成CPU占用率飆升,安全的做法是自旋找到一個合適的點,將當前線程park()阻塞掛起);
3、當前掛起之前,要向前尋找能將它喚醒的前繼節點,待前繼節點釋放資源之後,unpark()喚醒當前被阻塞的節點;
4、線程已經到達安全點,可以調用parkAndCheckInterrupt()阻塞並等待喚醒,喚醒之後再檢查下,阻塞的這個過程中是否發生了線程中斷請求,由於這個阻塞過程是不響應線程中斷的,所以需要將這個中斷請求的狀態傳播出去;
5、阻塞獲取同步資源的時候,發生了異常,取消當前線程的在同步隊列中的排隊;
shouldParkAfterFailedAcquire(Node, Node )方法:
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; /*判斷前驅結點的狀態,只有前驅結點的狀態為SIGNAL,後繼節點才能被喚醒,所以其可以安心地掛起來了*/ if (ws == node.waitStatus) { return true; } /*ws>0表示前驅結點中的線程已經被取消調度了,則認為其是無效節點,繼續向前查找,直至找到有效狀態的節點*/ if (ws > 0) { do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; }else { /*前驅結點狀態正常,將前驅結點狀態設置為SIGNAL,則前驅結點釋放資源的時候,就可以嘗試喚醒當前這個後繼節點了*/ compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; }
這個方法是在自旋中用來判斷是否可以將線程安全地park(),阻塞自旋的,那麼何時才能將當前線程安全地掛起呢?回想下,我們之前提到的節點的五種狀態中有一種SIGNAL,表示當前節點的線程釋放資源,可以喚醒後繼節點,所以我們就線上程掛起之前找到它的有效的前繼結點,將它的waitStatus狀態設置為SIGNAL,就可以保證前繼節點釋放資源之後,當前節點中的線程就可以被及時地喚醒,結束阻塞了,當前線程掛起之前的準備工作都做完了,那麼接下來就需要調用parkAndCheckInterrupt()方法,進行線程的掛起了。
parkAndCheckInterrupt()方法:
private final boolean parkAndCheckInterrupt() { LockSupport.park(this); return Thread.interrupted(); }
執行到這裡了,說明線程可以阻塞,調用park()方法阻塞線程,等待其他線程中unpark()或者interrupt()喚醒次線程,喚醒之後執行Thread.interrupted()方法,檢測阻塞過程中的線程請求中斷,進入下次自旋,嘗試獲取共用資源。如果在阻塞獲取資源的過程中,發生了異常,failed = true,則執行finally中cancelAcquire(Node)方法,取消當前節點中線程的調度。
上面說了分析了這麼多,只說了線程間的獲取資源時候的同步問題,那麼線程間的協作在哪裡體現呢?答案就是在acquireQueued(Node,int)方法中的finally塊中,線程在阻塞獲取共用資源的時候發生了異常,就會執行此方法將此節點從同步隊列中出隊,下麵我們來分析下cancelAcquire(Node)方法:
cancelAcquire(Node)方法:
1 private void cancelAcquire(Node node) { 2 //當前節點為空,則說明當前線程永遠不會被調度到了,所以直接返回 3 if (node == null) { 4 return; 5 } 6 7 /** 8 * 接下來將點前Node節點從同步隊列出隊,主要做以下幾件事: 9 * 1、將當前節點不與任何線程綁定,設置當前節點為Node.CANCELLED狀態; 10 * 2、將當前取消節點的前置非取消節點和後置非取消節點"鏈接"起來; 11 * 3、如果前置節點釋放了鎖,那麼當前取消節點承擔起後續節點的喚醒職責。 12 */ 13 14 //1、取消當前節點與線程的綁定 15 node.thread = null; 16 17 //2、找到當前節點的有效前繼節點pred 18 Node pred = node.prev; 19 while (pred.waitStatus > 0) { 20 //為什麼雙向鏈表從後往前遍歷呢?而不是從前往後遍歷呢? 21 node.prev = pred = pred.prev; 22 } 23 //用作CAS操作時候的條件判斷需要使用的值 24 Node predNext = pred.next; 25 26 //3、將當前節點設置為取消狀態 27 node.waitStatus = Node.CANCELLED; 28 29 /** 30 * 接下來就需要將當前取消節點的前後兩個有效節點"鏈接"起來了,"達成讓當前node節點出隊的目的"。 31 * 這裡按照node節點在同步隊列中的不同位置分了三種情況: 32 * 1、node節點是同步隊列的尾節點tail; 33 * 2、node節點既不是同步隊列頭結點head的後繼節點,也不是尾節點tail; 34 * 3、node節點是同步隊列頭結點head的後繼節點; 35 */ 36 37 //1、node是尾節點,並且執行過程中沒有併發,直接將pred設置為同步隊列的tail 38 if (node == tail && compareAndSetTail(node, pred)) { 39 /* 40 * 此時pred已經設置為同步隊列的tail,需要通過CAS操作,將pred的next指向null,沒有節點再引用node,就完成了node節點的出隊42 */ 43 compareAndSetNext(pred, predNext, null); 44 }else { 45 /* 46 * 2、node不是尾節點,也不是頭結點head的後繼節點,那麼當前節點node出隊以後,node的有效前繼結點pred, 47 * 就有義務在它自身釋放資源的時候,喚醒node的有效後繼節點successor,即將pred的狀態設置為Node.SIGNAL; 48 */ 49 int ws; 50 //能執行到這裡,說明當前node節點不是head的後繼節點,也不是同步隊列tail節點 51 if (pred != head && 52 ((ws = pred.waitStatus) == Node.SIGNAL || 53 //前繼節點狀態雖然有效但不是SIGNAL,採用CAS操作設置為SIGNAL確保後繼有效節點可以被喚醒 54 (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) && 55 pred.thread != null) { 56 Node next = node.next; 57 //只負責喚醒有效後繼節點 58 if (next != null && next.waitStatus <= 0) { 59 /** 60 * 下麵這段代碼相當於將pred-->next,我們提到這個同步隊列是個雙向隊列,那麼pred<--next這是誰執行的呢? 61 * 答案是其他線程:其它線程在後序的獲取共用資源在同步隊列中阻塞的時候,調用shouldParkAfterFailedAcquire()方法, 62 * 從後向前遍歷隊列,尋找能喚醒它的有效前繼節點,當找到node的時候,因為它的狀態已經是Node.CANCELLED,所以會忽略node節點, 63 * 直到遍歷到有效前繼節點pred,將next.prev執行pred,即next--->pred,沒有節點再引用node節點,所以node節點至此才完成出隊。 64 */ 65 compareAndSetNext(pred, predNext, next); 66 } 67 }else { 68 //3、說明node節點是同步隊列head的後繼節點,調用unparkSuccessor(Node)喚醒其他線程,達到讓當前node"出隊"。 69 unparkSuccessor(node); 70 } 71 72 node.next = node;//help GC 73 } 74 }
這個方法的中的註釋寫的很詳細了,線程間的協作主要體現在第65行的代碼中,原因也在註釋中寫明瞭,就不再贅述了。
還有一個非常關鍵的問題就是:為什麼我們在遍歷同步隊列的時候是從尾部向前遍歷,而不是從頭部向尾部遍歷呢?我們可以回過頭去看看入隊時候的enq(Node)方法:
關鍵在11行-14行這部分代碼中,11行保證了多線程環境下,採用自旋可以將當前線程順利地加入到同步隊列的尾部。
假如有線程A執行了滿足了if條件,成功將線程A放入了tail節點,還未執行到12行,t.next = null,此時發生了線程切換執行B線程,B線程也執行了此方法,並且執行完畢,尾插法會將B線程節點追加到A線程節點之後,這時候又有個C線程執行了遍歷操作,假設從隊頭向隊尾遍歷,遍歷到A節點時,那麼可能會出現t.next = null這種情況,停止遍歷,漏掉B線程節點的情況,而採用從同步隊列的尾部向頭部遍歷則可以避免這個問題,其最根本的原因在於:node.prev = t;
先於CAS執行,也就是說,你在將當前節點置為尾部之前,就已經把前驅節點賦值了,自然不會出現node.prev==null的情況。
下個問題就是unparkSuccessor(node)方法的原理是什麼呢?
unparkSuccessor(node)方法:
1 private void unparkSuccessor(Node node) { 2 /* 3 * If status is negative (i.e., possibly needing signal) try 4 * to clear in anticipation of signalling. It is OK if this 5 * fails or if status is changed by waiting thread. 6 */ 7 //在這裡,這個節點其實是同步隊列的頭結點,頭結點喚醒後繼節點之後,使命就完成了,所以應該將其狀態置為0 8 int ws = node.waitStatus; 9 if (ws < 0) 10 compareAndSetWaitStatus(node, ws, 0); 11 12 /* 13 * Thread to unpark is held in successor, which is normally 14 * just the next node. But if cancelled or apparently null, 15 * traverse backwards from tail to find the actual 16 * non-cancelled successor. 17 */ 18 Node s = node.next; 19 //因為s.next相當於從同步隊列的頭部遍歷所以可能會出現s == null的情況,上面分析過原因,不再贅述了。 20 if (s == null || s.waitStatus > 0) { 21 s = null; 22 //從同步隊列的尾部向前遍歷,找到當前node節點(頭結點)的最近的有效後繼節點 23 for (Node t = tail; t != null && t != node; t = t.prev) 24 if (t.waitStatus <= 0) 25 s = t; 26 } 27 28 /** 29 * 找到最近的有效後繼節點,則喚醒後繼節點中的線程在parkAndCheckInterrupt()方法上的阻塞,去嘗試競爭共用資源, 30 * 這就體現了線程之間的協作,而在這個競爭的過程中也會忽略這個Node.CANCELLED狀態的節點,這當前node節點也就放棄了競爭共用資源的機會,相當於出隊了。 31 */ 32 if (s != null) 33 LockSupport.unpark(s.thread); 34 }
以上就是對unparksuccessor(Node)方法的簡單分析了。再回過頭來,我們在cancelAcquire(Node)將當前要取消Node按照位置關係分為了三種,為什麼我們會忽略head位置呢?
從setHead()的實現以及所有調用的地方可以看出,head指向的節點必定是拿到鎖(或是競爭資源)的節點,而head的後繼節點則是有資格爭奪鎖的節點,我們不在需要甚至喚醒條件了。再後續的節點,就是阻塞著的了。head指向的節點,曾經關聯的線程必定已經獲取到資源,在執行了,所以head無需再關聯到該線程了。head所指向的節點,也無需再參與任何的競爭操作了。現在再來看node出隊時的分類,就好理解了。head既然不會參與任何資源競爭了,自然也就和cancelAquire()無關了。
仔細分析這個acquire()方法流程非常複雜,找了個一張網上一個博主畫的流程圖非常棒,這裡借鑒一下:
好了文章就寫到這裡了,鑒於水平有限,有說的不對的地方歡迎大家批評指正。
參考文章地址:
1、https://blog.csdn.net/weixin_38106322/article/details/107121149
2、https://www.jianshu.com/p/01f2046aab64
3、https://blog.csdn.net/foxException/article/details/108917338
本文來自博客園,作者:一隻烤鴨朝北走,僅用於技術學習,所有資源都來源於網路,部分是轉發,部分是個人總結。歡迎共同學習和轉載,轉載請在醒目位置標明原文。如有侵權,請留言告知,及時撤除。轉載請註明原文鏈接:https://www.cnblogs.com/wha6239/p/17122948.html