通過前面三篇的分析,我們深入瞭解了AbstractQueuedSynchronizer的內部結構和一些設計理念,知道了AbstractQueuedSynchronizer內部維護了一個同步狀態和兩個排隊區,這兩個排隊區分別是同步隊列和條件隊列。我們還是拿公共廁所做比喻,同步隊列是主要的排隊區,如果公 ...
通過前面三篇的分析,我們深入瞭解了AbstractQueuedSynchronizer的內部結構和一些設計理念,知道了AbstractQueuedSynchronizer內部維護了一個同步狀態和兩個排隊區,這兩個排隊區分別是同步隊列和條件隊列。我們還是拿公共廁所做比喻,同步隊列是主要的排隊區,如果公共廁所沒開放,所有想要進入廁所的人都得在這裡排隊。而條件隊列主要是為條件等待設置的,我們想象一下如果一個人通過排隊終於成功獲取鎖進入了廁所,但在方便之前發現自己沒帶手紙,碰到這種情況雖然很無奈,但是它也必須接受這個事實,這時它只好乖乖的出去先準備好手紙(進入條件隊列等待),當然在出去之前還得把鎖給釋放了好讓其他人能夠進來,在準備好了手紙(條件滿足)之後它又得重新回到同步隊列中去排隊。當然進入房間的人並不都是因為沒帶手紙,可能還有其他一些原因必須中斷操作先去條件隊列中去排隊,所以條件隊列可以有多個,依不同的等待條件而設置不同的條件隊列。條件隊列是一條單向鏈表,Condition介面定義了條件隊列中的所有操作,AbstractQueuedSynchronizer內部的ConditionObject類實現了Condition介面,下麵我們看看Condition介面都定義了哪些操作。
1 public interface Condition { 2 3 //響應線程中斷的條件等待 4 void await() throws InterruptedException; 5 6 //不響應線程中斷的條件等待 7 void awaitUninterruptibly(); 8 9 //設置相對時間的條件等待(不進行自旋) 10 long awaitNanos(long nanosTimeout) throws InterruptedException; 11 12 //設置相對時間的條件等待(進行自旋) 13 boolean await(long time, TimeUnit unit) throws InterruptedException; 14 15 //設置絕對時間的條件等待 16 boolean awaitUntil(Date deadline) throws InterruptedException; 17 18 //喚醒條件隊列中的頭結點 19 void signal(); 20 21 //喚醒條件隊列的所有結點 22 void signalAll(); 23 24 }
Condition介面雖然定義了這麼多方法,但總共就分為兩類,以await開頭的是線程進入條件隊列等待的方法,以signal開頭的是將條件隊列中的線程“喚醒”的方法。這裡要註意的是,調用signal方法可能喚醒線程也可能不會喚醒線程,什麼時候會喚醒線程這得看情況,後面會講到,但是調用signal方法一定會將線程從條件隊列中移到同步隊列尾部。這裡為了敘述方便,我們先暫時不糾結這麼多,統一稱signal方法為喚醒條件隊列線程的操作。大家註意看一下,await方法分為5種,分別是響應線程中斷等待,不響應線程中斷等待,設置相對時間不自旋等待,設置相對時間自旋等待,設置絕對時間等待;signal方法只有2種,分別是只喚醒條件隊列頭結點和喚醒條件隊列所有結點的操作。同一類的方法基本上是相通的,由於篇幅所限,我們不可能也不需要將這些方法全部仔細的講到,只需要將一個代表方法搞懂了再看其他方法就能夠觸類旁通。所以在本文中我只會細講await方法和signal方法,其他方法不細講但會貼出源碼來以供大家參考。
1. 響應線程中斷的條件等待
1 //響應線程中斷的條件等待 2 public final void await() throws InterruptedException { 3 //如果線程被中斷則拋出異常 4 if (Thread.interrupted()) { 5 throw new InterruptedException(); 6 } 7 //將當前線程添加到條件隊列尾部 8 Node node = addConditionWaiter(); 9 //在進入條件等待之前先完全釋放鎖 10 int savedState = fullyRelease(node); 11 int interruptMode = 0; 12 //線程一直在while迴圈里進行條件等待 13 while (!isOnSyncQueue(node)) { 14 //進行條件等待的線程都在這裡被掛起, 線程被喚醒的情況有以下幾種: 15 //1.同步隊列的前繼結點已取消 16 //2.設置同步隊列的前繼結點的狀態為SIGNAL失敗 17 //3.前繼結點釋放鎖後喚醒當前結點 18 //4.由於LockSupport.park(this)能夠響應中斷, 所以線程中斷後也會醒來 19 LockSupport.park(this); 20 //當前線程醒來後立馬檢查是否被中斷, 如果是則代表結點取消條件等待, 此時需要將結點移出條件隊列 21 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) { 22 break; 23 } 24 } 25 //線程醒來後就會以獨占模式獲取鎖 26 if (acquireQueued(node, savedState) && interruptMode != THROW_IE) { 27 interruptMode = REINTERRUPT; 28 } 29 //這步操作主要為防止線程在signal之前中斷而導致沒與條件隊列斷絕聯繫 30 if (node.nextWaiter != null) { 31 unlinkCancelledWaiters(); 32 } 33 //根據中斷模式進行響應的中斷處理 34 if (interruptMode != 0) { 35 reportInterruptAfterWait(interruptMode); 36 } 37 }
await方法是經常使用到的方法,調用該方法會將當前線程阻塞到條件隊列中,直到等待的條件滿足後才會被喚醒,在調用signal方法喚醒前,這整個過程是可以響應線程中斷的,如果線程收到中斷請求就會立刻結束while迴圈。接下來我們詳細的分析整個的流程。
第一步:添加結點到條件隊列
1 //添加結點到條件隊列尾部 2 private Node addConditionWaiter() { 3 //獲取條件隊列的尾結點的引用 4 Node t = lastWaiter; 5 //檢查條件隊列中是否存在已取消的結點 6 if (t != null && t.waitStatus != Node.CONDITION) { 7 //清除條件隊列中所有取消條件等待的結點 8 unlinkCancelledWaiters(); 9 t = lastWaiter; 10 } 11 //將當前線程包裝成結點, 等待狀態設置為CONDITION 12 Node node = new Node(Thread.currentThread(), Node.CONDITION); 13 //將新的結點添加到條件隊列尾部 14 if (t == null) { 15 firstWaiter = node; 16 }else { 17 t.nextWaiter = node; 18 } 19 //更新尾結點引用 20 lastWaiter = node; 21 return node; 22 } 23 24 //清除條件隊列中所有取消條件等待的結點 25 private void unlinkCancelledWaiters() { 26 Node t = firstWaiter; 27 Node trail = null; 28 //從前向後遍歷條件隊列 29 while (t != null) { 30 Node next = t.nextWaiter; 31 //如果等待狀態不是CONDITION 32 if (t.waitStatus != Node.CONDITION) { 33 //1.將當前結點的後繼結點引用置空 34 t.nextWaiter = null; 35 if (trail == null) { 36 //2.前繼結點的後繼結點指向當前結點的後繼結點 37 firstWaiter = next; 38 }else { 39 //2.前繼結點的後繼結點指向當前結點的後繼結點 40 trail.nextWaiter = next; 41 } 42 //這裡表明已經到達了尾結點 43 if (next == null) { 44 //更新尾結點引用 45 lastWaiter = trail; 46 } 47 }else { 48 //如果狀態為CONDITION就更新追蹤結點的引用 49 trail = t; 50 } 51 //指向下一個結點 52 t = next; 53 } 54 }
當線程調用await方法的時候,首先會將當前線程包裝成node結點放入條件隊列尾部。在addConditionWaiter方法中,如果發現條件隊列尾結點已取消就會調用unlinkCancelledWaiters方法將條件隊列所有的已取消結點清空。這步操作是插入結點的準備工作,那麼確保了尾結點的狀態也是CONDITION之後,就會新建一個node結點將當前線程包裝起來然後放入條件隊列尾部。註意,這個過程只是將結點添加到同步隊列尾部而沒有掛起線程哦。
第二步:完全將鎖釋放
1 //完全釋放鎖 2 final int fullyRelease(Node node) { 3 boolean failed = true; 4 try { 5 //獲取當前的同步狀態 6 int savedState = getState(); 7 //使用當前的同步狀態去釋放鎖 8 if (release(savedState)) { 9 failed = false; 10 //如果釋放鎖成功就返回當前同步狀態 11 return savedState; 12 } else { 13 //如果釋放鎖失敗就拋出運行時異常 14 throw new IllegalMonitorStateException(); 15 } 16 } finally { 17 //保證沒有成功釋放鎖就將該結點設置為取消狀態 18 if (failed) { 19 node.waitStatus = Node.CANCELLED; 20 } 21 } 22 }
將當前線程包裝成結點添加到條件隊列尾部後,緊接著就調用fullyRelease方法釋放鎖。註意,方法名為fullyRelease也就這步操作會完全的釋放鎖,因為鎖是可重入的,所以在進行條件等待前需要將鎖全部釋放了,不然的話別人就獲取不了鎖了。如果釋放鎖失敗的話就會拋出一個運行時異常,如果成功釋放了鎖的話就返回之前的同步狀態。
第三步:進行條件等待
1 //線程一直在while迴圈里進行條件等待 2 while (!isOnSyncQueue(node)) { 3 //進行條件等待的線程都在這裡被掛起, 線程被喚醒的情況有以下幾種: 4 //1.同步隊列的前繼結點已取消 5 //2.設置同步隊列的前繼結點的狀態為SIGNAL失敗 6 //3.前繼結點釋放鎖後喚醒當前結點 7 //4.由於LockSupport.park(this)能夠響應中斷, 所以線程中斷後也會醒來 8 LockSupport.park(this); 9 //當前線程醒來後立馬檢查是否被中斷, 如果是則代表結點取消條件等待, 此時需要將結點移出條件隊列 10 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) { 11 break; 12 } 13 } 14 15 //檢查條件等待時的線程中斷情況 16 private int checkInterruptWhileWaiting(Node node) { 17 //中斷請求在signal操作之前:THROW_IE 18 //中斷請求在signal操作之後:REINTERRUPT 19 //期間沒有收到任何中斷請求:0 20 return Thread.interrupted() ? (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) : 0; 21 } 22 23 //將取消條件等待的結點從條件隊列轉移到同步隊列中 24 final boolean transferAfterCancelledWait(Node node) { 25 //如果這步CAS操作成功的話就表明中斷發生在signal方法之前 26 if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) { 27 //狀態修改成功後就將該結點放入同步隊列尾部 28 enq(node); 29 return true; 30 } 31 //到這裡表明CAS操作失敗, 說明中斷發生在signal方法之後 32 while (!isOnSyncQueue(node)) { 33 //如果sinal方法還沒有將結點轉移到同步隊列, 就通過自旋等待一下 34 Thread.yield(); 35 } 36 return false; 37 }
在以上兩個操作完成了之後就會進入while迴圈,可以看到while迴圈裡面首先調用LockSupport.park(this)將線程掛起了,所以線程就會一直在這裡阻塞。在調用signal方法後僅僅只是將結點從條件隊列轉移到同步隊列中去,至於會不會喚醒線程需要看情況。如果轉移結點時發現同步隊列中的前繼結點已取消,或者是更新前繼結點的狀態為SIGNAL失敗,這兩種情況都會立即喚醒線程,否則的話在signal方法結束時就不會去喚醒已在同步隊列中的線程,而是等到它的前繼結點來喚醒。當然,線程阻塞在這裡除了可以調用signal方法喚醒之外,線程還可以響應中斷,如果線程在這裡收到中斷請求就會繼續往下執行。可以看到線程醒來後會馬上檢查是否是由於中斷喚醒的還是通過signal方法喚醒的,如果是因為中斷喚醒的同樣會將這個結點轉移到同步隊列中去,只不過是通過調用transferAfterCancelledWait方法來實現的。最後執行完這一步之後就會返回中斷情況並跳出while迴圈。
第四步:結點移出條件隊列後的操作
1 //線程醒來後就會以獨占模式獲取鎖 2 if (acquireQueued(node, savedState) && interruptMode != THROW_IE) { 3 interruptMode = REINTERRUPT; 4 } 5 //這步操作主要為防止線程在signal之前中斷而導致沒與條件隊列斷絕聯繫 6 if (node.nextWaiter != null) { 7 unlinkCancelledWaiters(); 8 } 9 //根據中斷模式進行響應的中斷處理 10 if (interruptMode != 0) { 11 reportInterruptAfterWait(interruptMode); 12 } 13 14 //結束條件等待後根據中斷情況做出相應處理 15 private void reportInterruptAfterWait(int interruptMode) throws InterruptedException { 16 //如果中斷模式是THROW_IE就拋出異常 17 if (interruptMode == THROW_IE) { 18 throw new InterruptedException(); 19 //如果中斷模式是REINTERRUPT就自己掛起 20 } else if (interruptMode == REINTERRUPT) { 21 selfInterrupt(); 22 } 23 }
當線程終止了while迴圈也就是條件等待後,就會回到同步隊列中。不管是因為調用signal方法回去的還是因為線程中斷導致的,結點最終都會在同步隊列中。這時就會調用acquireQueued方法執行在同步隊列中獲取鎖的操作,這個方法我們在獨占模式這一篇已經詳細的講過。也就是說,結點從條件隊列出來後又是乖乖的走獨占模式下獲取鎖的那一套,等這個結點再次獲得鎖之後,就會調用reportInterruptAfterWait方法來根據這期間的中斷情況做出相應的響應。如果中斷發生在signal方法之前,interruptMode就為THROW_IE,再次獲得鎖後就拋出異常;如果中斷發生在signal方法之後,interruptMode就為REINTERRUPT,再次獲得鎖後就重新中斷。
2.不響應線程中斷的條件等待
1 //不響應線程中斷的條件等待 2 public final void awaitUninterruptibly() { 3 //將當前線程添加到條件隊列尾部 4 Node node = addConditionWaiter(); 5 //完全釋放鎖並返回當前同步狀態 6 int savedState = fullyRelease(node); 7 boolean interrupted = false; 8 //結點一直在while迴圈里進行條件等待 9 while (!isOnSyncQueue(node)) { 10 //條件隊列中所有的線程都在這裡被掛起 11 LockSupport.park(this); 12 //線程醒來發現中斷並不會馬上去響應 13 if (Thread.interrupted()) { 14 interrupted = true; 15 } 16 } 17 if (acquireQueued(node, savedState) || interrupted) { 18 //在這裡響應所有中斷請求, 滿足以下兩個條件之一就會將自己掛起 19 //1.線程在條件等待時收到中斷請求 20 //2.線程在acquireQueued方法里收到中斷請求 21 selfInterrupt(); 22 } 23 }
3.設置相對時間的條件等待(不進行自旋)
1 //設置定時條件等待(相對時間), 不進行自旋等待 2 public final long awaitNanos(long nanosTimeout) throws InterruptedException { 3 //如果線程被中斷則拋出異常 4 if (Thread.interrupted()) { 5 throw new InterruptedException(); 6 } 7 //將當前線程添加到條件隊列尾部 8 Node node = addConditionWaiter(); 9 //在進入條件等待之前先完全釋放鎖 10 int savedState = fullyRelease(node); 11 long lastTime = System.nanoTime(); 12 int interruptMode = 0; 13 while (!isOnSyncQueue(node)) { 14 //判斷超時時間是否用完了 15 if (nanosTimeout <= 0L) { 16 //如果已超時就需要執行取消條件等待操作 17 transferAfterCancelledWait(node); 18 break; 19 } 20 //將當前線程掛起一段時間, 線程在這期間可能被喚醒, 也可能自己醒來 21 LockSupport.parkNanos(this, nanosTimeout); 22 //線程醒來後先檢查中斷信息 23 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) { 24 break; 25 } 26 long now = System.nanoTime(); 27 //超時時間每次減去條件等待的時間 28 nanosTimeout -= now - lastTime; 29 lastTime = now; 30 } 31 //線程醒來後就會以獨占模式獲取鎖 32 if (acquireQueued(node, savedState) && interruptMode != THROW_IE) { 33 interruptMode = REINTERRUPT; 34 } 35 //由於transferAfterCancelledWait方法沒有把nextWaiter置空, 所有這裡要再清理一遍 36 if (node.nextWaiter != null) { 37 unlinkCancelledWaiters(); 38 } 39 //根據中斷模式進行響應的中斷處理 40 if (interruptMode != 0) { 41 reportInterruptAfterWait(interruptMode); 42 } 43 //返回剩餘時間 44 return nanosTimeout - (System.nanoTime() - lastTime); 45 }
4.設置相對時間的條件等待(進行自旋)
1 //設置定時條件等待(相對時間), 進行自旋等待 2 public final boolean await(long time, TimeUnit unit) throws InterruptedException { 3 if (unit == null) { throw new NullPointerException(); } 4 //獲取超時時間的毫秒數 5 long nanosTimeout = unit.toNanos(time); 6 //如果線程被中斷則拋出異常 7 if (Thread.interrupted()) { throw new InterruptedException(); } 8 //將當前線程添加條件隊列尾部 9 Node node = addConditionWaiter(); 10 //在進入條件等待之前先完全釋放鎖 11 int savedState = fullyRelease(node); 12 //獲取當前時間的毫秒數 13 long lastTime = System.nanoTime(); 14 boolean timedout = false; 15 int interruptMode = 0; 16 while (!isOnSyncQueue(node)) { 17 //如果超時就需要執行取消條件等待操作 18 if (nanosTimeout <= 0L) { 19 timedout = transferAfterCancelledWait(node); 20 break; 21 } 22 //如果超時時間大於自旋時間, 就將線程掛起一段時間 23 if (nanosTimeout >= spinForTimeoutThreshold) { 24 LockSupport.parkNanos(this, nanosTimeout); 25 } 26 //線程醒來後先檢查中斷信息 27 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) { 28 break; 29 } 30 long now = System.nanoTime(); 31 //超時時間每次減去條件等待的時間 32 nanosTimeout -= now - lastTime; 33 lastTime = now; 34 } 35 //線程醒來後就會以獨占模式獲取鎖 36 if (acquireQueued(node, savedState) && interruptMode != THROW_IE) { 37 interruptMode = REINTERRUPT; 38 } 39 //由於transferAfterCancelledWait方法沒有把nextWaiter置空, 所有這裡要再清理一遍 40 if (node.nextWaiter != null) { 41 unlinkCancelledWaiters(); 42 } 43 //根據中斷模式進行響應的中斷處理 44 if (interruptMode != 0) { 45 reportInterruptAfterWait(interruptMode); 46 } 47 //返回是否超時標誌 48 return !timedout; 49 }
5.設置絕對時間的條件等待
1 //設置定時條件等待(絕對時間) 2 public final boolean awaitUntil(Date deadline) throws InterruptedException { 3 if (deadline == null) { throw new NullPointerException(); } 4 //獲取絕對時間的毫秒數 5 long abstime = deadline.getTime(); 6 //如果線程被中斷則拋出異常 7 if (Thread.interrupted()) { throw new InterruptedException(); } 8 //將當前線程添加到條件隊列尾部 9 Node node = addConditionWaiter(); 10 //在進入條件等待之前先完全釋放鎖 11 int savedState = fullyRelease(node); 12 boolean timedout = false; 13 int interruptMode = 0; 14 while (!isOnSyncQueue(node)) { 15 //如果超時就需要執行取消條件等待操作 16 if (System.currentTimeMillis() > abstime) { 17 timedout = transferAfterCancelledWait(node); 18 break; 19 } 20 //將線程掛起一段時間, 期間線程可能被喚醒, 也可能到了點自己醒來 21 LockSupport.parkUntil(this, abstime); 22 //線程醒來後先檢查中斷信息 23 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) { 24 break; 25 } 26 } 27 //線程醒來後就會以獨占模式獲取鎖 28 if (acquireQueued(node, savedState) && interruptMode != THROW_IE) { 29 interruptMode = REINTERRUPT; 30 } 31 //由於transferAfterCancelledWait方法沒有把nextWaiter置空, 所有這裡要再清理一遍 32 if (node.nextWaiter != null) { 33 unlinkCancelledWaiters(); 34 } 35 //根據中斷模式進行響應的中斷處理 36 if (interruptMode != 0) { 37 reportInterruptAfterWait(interruptMode); 38 } 39 //返回是否超時標誌 40 return !timedout; 41 }
6.喚醒條件隊列中的頭結點
1 //喚醒條件隊列中的下一個結點 2 public final void signal() { 3 //判斷當前線程是否持有鎖 4 if (!isHeldExclusively()) { 5 throw new IllegalMonitorStateException(); 6 } 7 Node first = firstWaiter; 8 //如果條件隊列中有排隊者 9 if (first != null) { 10 //喚醒條件隊列中的頭結點 11 doSignal(first); 12 } 13 } 14 15 //喚醒條件隊列中的頭結點 16 private void doSignal(Node first) { 17 do { 18 //1.將firstWaiter引用向後移動一位 19 if ( (firstWaiter = first.nextWaiter) == null) { 20 lastWaiter = null; 21 } 22 //2.將頭結點的後繼結點引用置空 23 first.nextWaiter = null; 24 //3.將頭結點轉移到同步隊列, 轉移完成後有可能喚醒線程 25 //4.如果transferForSignal操作失敗就去喚醒下一個結點 26 } while (!transferForSignal(first) && (first = firstWaiter) != null); 27 } 28 29 //將指定結點從條件隊列轉移到同步隊列中 30 final boolean transferForSignal(Node node) { 31 //將等待狀態從CONDITION設置為0 32 if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) { 33 //如果更新狀態的操作失敗就直接返回false 34 //可能是transferAfterCancelledWait方法先將狀態改變了, 導致這步CAS操作失敗 35 return false; 36 } 37 //將該結點添加到同步隊列尾部 38 Node p = enq(node); 39 int ws = p.waitStatus; 40 if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) { 41 //出現以下情況就會喚醒當前線程 42 //1.前繼結點是取消狀態 43 //2.更新前繼結點的狀態為SIGNAL操作失敗 44 LockSupport.unpark(node.thread); 45 } 46 return true; 47 }
可以看到signal方法最終的核心就是去調用transferForSignal方法,在transferForSignal方法中首先會用CAS操作將結點的狀態從CONDITION設置為0,然後再調用enq方法將該結點添加到同步隊列尾部。我們再看到接下來的if判斷語句,這個判斷語句主要是用來判斷什麼時候會去喚醒線程,出現這兩種情況就會立即喚醒線程,一種是當發現前繼結點的狀態是取消狀態時,還有一種是更新前繼結點的狀態失敗時。這兩種情況都會馬上去喚醒線程,否則的話就僅僅只是將結點從條件隊列中轉移到同步隊列中就完了,而不會立馬去喚醒結點中的線程。signalAll方法也大致類似,只不過它是去迴圈遍歷條件隊列中的所有結點,並將它們轉移到同步隊列,轉移結點的方法也還是調用transferForSignal方法。
7.喚醒條件隊列的所有結點
1 //喚醒條件隊列後面的全部結點 2 public final void signalAll() { 3 //判斷當