在上一篇《Java併發系列[1] AbstractQueuedSynchronizer源碼分析之概要分析》中我們介紹了AbstractQueuedSynchronizer基本的一些概念,主要講了AQS的排隊區是怎樣實現的,什麼是獨占模式和共用模式以及如何理解結點的等待狀態。理解並掌握這些內容是後續閱 ...
在上一篇《Java併發系列[1]----AbstractQueuedSynchronizer源碼分析之概要分析》中我們介紹了AbstractQueuedSynchronizer基本的一些概念,主要講了AQS的排隊區是怎樣實現的,什麼是獨占模式和共用模式以及如何理解結點的等待狀態。理解並掌握這些內容是後續閱讀AQS源碼的關鍵,所以建議讀者先看完我的上一篇文章再回過頭來看這篇就比較容易理解。在本篇中會介紹在獨占模式下結點是怎樣進入同步隊列排隊的,以及離開同步隊列之前會進行哪些操作。AQS為在獨占模式和共用模式下獲取鎖分別提供三種獲取方式:不響應線程中斷獲取,響應線程中斷獲取,設置超時時間獲取。這三種方式整體步驟大致是相同的,只有少部分不同的地方,所以理解了一種方式再看其他方式的實現都是大同小異。在本篇中我會著重講不響應線程中斷的獲取方式,其他兩種方式也會順帶講一下不一致的地方。
1. 怎樣以不響應線程中斷獲取鎖?
1 //不響應中斷方式獲取(獨占模式) 2 public final void acquire(int arg) { 3 if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) { 4 selfInterrupt(); 5 } 6 }
上面代碼中雖然看起來簡單,但是它按照順序執行了下圖所示的4個步驟。下麵我們會逐個步驟進行演示分析。
第一步:!tryAcquire(arg)
1 //嘗試去獲取鎖(獨占模式) 2 protected boolean tryAcquire(int arg) { 3 throw new UnsupportedOperationException(); 4 }
這時候來了一個人,他首先嘗試著去敲了敲門,如果發現門沒鎖(tryAcquire(arg)=true),那就直接進去了。如果發現門鎖了(tryAcquire(arg)=false),就執行下一步。這個tryAcquire方法決定了什麼時候鎖是開著的,什麼時候鎖是關閉的。這個方法必須要讓子類去覆蓋,重寫裡面的判斷邏輯。
第二步:addWaiter(Node.EXCLUSIVE)
1 //將當前線程包裝成結點並添加到同步隊列尾部 2 private Node addWaiter(Node mode) { 3 //指定持有鎖的模式 4 Node node = new Node(Thread.currentThread(), mode); 5 //獲取同步隊列尾結點引用 6 Node pred = tail; 7 //如果尾結點不為空, 表明同步隊列已存在結點 8 if (pred != null) { 9 //1.指向當前尾結點 10 node.prev = pred; 11 //2.設置當前結點為尾結點 12 if (compareAndSetTail(pred, node)) { 13 //3.將舊的尾結點的後繼指向新的尾結點 14 pred.next = node; 15 return node; 16 } 17 } 18 //否則表明同步隊列還沒有進行初始化 19 enq(node); 20 return node; 21 } 22 23 //結點入隊操作 24 private Node enq(final Node node) { 25 for (;;) { 26 //獲取同步隊列尾結點引用 27 Node t = tail; 28 //如果尾結點為空說明同步隊列還沒有初始化 29 if (t == null) { 30 //初始化同步隊列 31 if (compareAndSetHead(new Node())) { 32 tail = head; 33 } 34 } else { 35 //1.指向當前尾結點 36 node.prev = t; 37 //2.設置當前結點為尾結點 38 if (compareAndSetTail(t, node)) { 39 //3.將舊的尾結點的後繼指向新的尾結點 40 t.next = node; 41 return t; 42 } 43 } 44 } 45 }
執行到這一步表明第一次獲取鎖失敗,那麼這個人就給自己領了塊號碼牌進入排隊區去排隊了,在領號碼牌的時候會聲明自己想要以什麼樣的方式來占用房間(獨占模式or共用模式)。註意,這時候他並沒有坐下來休息(將自己掛起)哦。
第三步:acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
1 //以不可中斷方式獲取鎖(獨占模式) 2 final boolean acquireQueued(final Node node, int arg) { 3 boolean failed = true; 4 try { 5 boolean interrupted = false; 6 for (;;) { 7 //獲取給定結點的前繼結點的引用 8 final Node p = node.predecessor(); 9 //如果當前結點是同步隊列的第一個結點, 就嘗試去獲取鎖 10 if (p == head && tryAcquire(arg)) { 11 //將給定結點設置為head結點 12 setHead(node); 13 //為了幫助垃圾收集, 將上一個head結點的後繼清空 14 p.next = null; 15 //設置獲取成功狀態 16 failed = false; 17 //返回中斷的狀態, 整個迴圈執行到這裡才是出口 18 return interrupted; 19 } 20 //否則說明鎖的狀態還是不可獲取, 這時判斷是否可以掛起當前線程 21 //如果判斷結果為真則掛起當前線程, 否則繼續迴圈, 在這期間線程不響應中斷 22 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) { 23 interrupted = true; 24 } 25 } 26 } finally { 27 //在最後確保如果獲取失敗就取消獲取 28 if (failed) { 29 cancelAcquire(node); 30 } 31 } 32 } 33 34 //判斷是否可以將當前結點掛起 35 private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { 36 //獲取前繼結點的等待狀態 37 int ws = pred.waitStatus; 38 //如果前繼結點狀態為SIGNAL, 表明前繼結點會喚醒當前結點, 所以當前結點可以安心的掛起了 39 if (ws == Node.SIGNAL) { 40 return true; 41 } 42 43 if (ws > 0) { 44 //下麵的操作是清理同步隊列中所有已取消的前繼結點 45 do { 46 node.prev = pred = pred.prev; 47 } while (pred.waitStatus > 0); 48 pred.next = node; 49 } else { 50 //到這裡表示前繼結點狀態不是SIGNAL, 很可能還是等於0, 這樣的話前繼結點就不會去喚醒當前結點了 51 //所以當前結點必須要確保前繼結點的狀態為SIGNAL才能安心的掛起自己 52 compareAndSetWaitStatus(pred, ws, Node.SIGNAL); 53 } 54 return false; 55 } 56 57 //掛起當前線程 58 private final boolean parkAndCheckInterrupt() { 59 LockSupport.park(this); 60 return Thread.interrupted(); 61 }
領完號碼牌進入排隊區後就會立馬執行這個方法,當一個結點首次進入排隊區後有兩種情況,一種是發現他前面的那個人已經離開座位進入房間了,那他就不坐下來休息了,會再次去敲一敲門看看那小子有沒有完事。如果裡面的人剛好完事出來了,都不用他叫自己就直接衝進去了。否則,就要考慮坐下來休息一會兒了,但是他還是不放心,如果他坐下來睡著後沒人提醒他怎麼辦?他就在前面那人的座位上留一個小紙條,好讓從裡面出來的人看到紙條後能夠喚醒他。還有一種情況是,當他進入排隊區後發現前面還有好幾個人在座位上排隊呢,那他就可以安心的坐下來咪一會兒了,但在此之前他還是會在前面那人(此時已經睡著了)的座位上留一個紙條,好讓這個人在走之前能夠去喚醒自己。當一切事情辦妥了之後,他就安安心心的睡覺了,註意,我們看到整個for迴圈就只有一個出口,那就是等線程成功的獲取到鎖之後才能出去,在沒有獲取到鎖之前就一直是掛在for迴圈的parkAndCheckInterrupt()方法裡頭。線程被喚醒後也是從這個地方繼續執行for迴圈。
第四步:selfInterrupt()
1 //當前線程將自己中斷 2 private static void selfInterrupt() { 3 Thread.currentThread().interrupt(); 4 }
由於上面整個線程一直是掛在for迴圈的parkAndCheckInterrupt()方法裡頭,沒有成功獲取到鎖之前不響應任何形式的線程中斷,只有當線程成功獲取到鎖並從for迴圈出來後,他才會查看在這期間是否有人要求中斷線程,如果是的話再去調用selfInterrupt()方法將自己掛起。
2. 怎樣以響應線程中斷獲取鎖?
1 //以可中斷模式獲取鎖(獨占模式) 2 private void doAcquireInterruptibly(int arg) throws InterruptedException { 3 //將當前線程包裝成結點添加到同步隊列中 4 final Node node = addWaiter(Node.EXCLUSIVE); 5 boolean failed = true; 6 try { 7 for (;;) { 8 //獲取當前結點的前繼結點 9 final Node p = node.predecessor(); 10 //如果p是head結點, 那麼當前線程就再次嘗試獲取鎖 11 if (p == head && tryAcquire(arg)) { 12 setHead(node); 13 p.next = null; // help GC 14 failed = false; 15 //獲取鎖成功後返回 16 return; 17 } 18 //如果滿足條件就掛起當前線程, 此時響應中斷並拋出異常 19 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) { 20 //線程被喚醒後如果發現中斷請求就拋出異常 21 throw new InterruptedException(); 22 } 23 } 24 } finally { 25 if (failed) { 26 cancelAcquire(node); 27 } 28 } 29 }
響應線程中斷方式和不響應線程中斷方式獲取鎖流程上大致上是相同的。唯一的一點區別就是線程從parkAndCheckInterrupt方法中醒來後會檢查線程是否中斷,如果是的話就拋出InterruptedException異常,而不響應線程中斷獲取鎖是在收到中斷請求後只是設置一下中斷狀態,並不會立馬結束當前獲取鎖的方法,一直到結點成功獲取到鎖之後才會根據中斷狀態決定是否將自己掛起。
3. 怎樣設置超時時間獲取鎖?
1 //以限定超時時間獲取鎖(獨占模式) 2 private boolean doAcquireNanos(int arg, long nanosTimeout) throws InterruptedException { 3 //獲取系統當前時間 4 long lastTime = System.nanoTime(); 5 //將當前線程包裝成結點添加到同步隊列中 6 final Node node = addWaiter(Node.EXCLUSIVE); 7 boolean failed = true; 8 try { 9 for (;;) { 10 //獲取當前結點的前繼結點 11 final Node p = node.predecessor(); 12 //如果前繼是head結點, 那麼當前線程就再次嘗試獲取鎖 13 if (p == head && tryAcquire(arg)) { 14 //更新head結點 15 setHead(node); 16 p.next = null; 17 failed = false; 18 return true; 19 } 20 //超時時間用完了就直接退出迴圈 21 if (nanosTimeout <= 0) { 22 return false; 23 } 24 //如果超時時間大於自旋時間, 那麼等判斷可以掛起線程之後就會將線程掛起一段時間 25 if (shouldParkAfterFailedAcquire(p, node) && nanosTimeout > spinForTimeoutThreshold) { 26 //將當前線程掛起一段時間, 之後再自己醒來 27 LockSupport.parkNanos(this, nanosTimeout); 28 } 29 //獲取系統當前時間 30 long now = System.nanoTime(); 31 //超時時間每次都減去獲取鎖的時間間隔 32 nanosTimeout -= now - lastTime; 33 //再次更新lastTime 34 lastTime = now; 35 //在獲取鎖的期間收到中斷請求就拋出異常 36 if (Thread.interrupted()) { 37 throw new InterruptedException(); 38 } 39 } 40 } finally { 41 if (failed) { 42 cancelAcquire(node); 43 } 44 } 45 }
設置超時時間獲取首先會去獲取一下鎖,第一次獲取鎖失敗後會根據情況,如果傳入的超時時間大於自旋時間那麼就會將線程掛起一段時間,否則的話就會進行自旋,每次獲取鎖之後都會將超時時間減去獲取一次鎖所用的時間。一直到超時時間小於0也就說明超時時間用完了,那麼這時就會結束獲取鎖的操作然後返回獲取失敗標誌。註意在以超時時間獲取鎖的過程中是可以響應線程中斷請求的。
4. 線程釋放鎖並離開同步隊列是怎樣進行的?
1 //釋放鎖的操作(獨占模式) 2 public final boolean release(int arg) { 3 //撥動密碼鎖, 看看是否能夠開鎖 4 if (tryRelease(arg)) { 5 //獲取head結點 6 Node h = head; 7 //如果head結點不為空並且等待狀態不等於0就去喚醒後繼結點 8 if (h != null && h.waitStatus != 0) { 9 //喚醒後繼結點 10 unparkSuccessor(h); 11 } 12 return true; 13 } 14 return false; 15 } 16 17 //喚醒後繼結點 18 private void unparkSuccessor(Node node) { 19 //獲取給定結點的等待狀態 20 int ws = node.waitStatus; 21 //將等待狀態更新為0 22 if (ws < 0) { 23 compareAndSetWaitStatus(node, ws, 0); 24 } 25 //獲取給定結點的後繼結點 26 Node s = node.next; 27 //後繼結點為空或者等待狀態為取消狀態 28 if (s == null || s.waitStatus > 0) { 29 s = null; 30 //從後向前遍歷隊列找到第一個不是取消狀態的結點 31 for (Node t = tail; t != null && t != node; t = t.prev) { 32 if (t.waitStatus <= 0) { 33 s = t; 34 } 35 } 36 } 37 //喚醒給定結點後面首個不是取消狀態的結點 38 if (s != null) { 39 LockSupport.unpark(s.thread); 40 } 41 }
線程持有鎖進入房間後就會去辦自己的事情,等事情辦完後它就會釋放鎖並離開房間。通過tryRelease方法可以撥動密碼鎖進行解鎖,我們知道tryRelease方法是需要讓子類去覆蓋的,不同的子類實現的規則不一樣,也就是說不同的子類設置的密碼不一樣。像在ReentrantLock當中,房間裡面的人每調用tryRelease方法一次,state就減1,直到state減到0的時候密碼鎖就開了。大家想想這個過程像不像我們在不停的轉動密碼鎖的轉輪,而每次轉動轉輪數字只是減少1。CountDownLatch和這個也有點類似,只不過它不是一個人在轉,而是多個人每人都去轉一下,集中大家的力量把鎖給開了。線程出了房間後它會找到自己原先的座位,也就是找到head結點。看看座位上有沒有人給它留了小紙條,如果有的話它就知道有人睡著了需要讓它幫忙喚醒,那麼它就會去喚醒那個線程。如果沒有的話就表明同步隊列中暫時還沒有人在等待,也沒有人需要它喚醒,所以它就可以安心的離去了。以上過程就是在獨占模式下釋放鎖的過程。
註:以上全部分析基於JDK1.7,不同版本間會有差異,讀者需要註意