一、簡介 JDK1.5之前都是通過synchronized關鍵字實現併發同步,而JDK1.5以後Doug Lea大師開發了current包下的類,通過JAVA代碼實現了synchronized關鍵的語義。然而在current包下的這些類的實現大部分都不離不開一個基礎組件 AQS(AbstractQu ...
一、簡介
JDK1.5之前都是通過synchronized關鍵字實現併發同步,而JDK1.5以後Doug Lea大師開發了current包下的類,通過JAVA代碼實現了synchronized關鍵字的語義。
然而在current包下的這些類的實現大部分都離不開一個基礎組件----AQS(AbstractQueuedSynchronizer)也就是同步隊列器。
AQS定義了一套多線程訪問共用資源的同步框架,比如ReentrantLock、CountDownLatch等都是依賴這個基礎組件實現的。深入瞭解AQS有助於對Lock機制實現的原理理解,對併發有更加深入的認知。
二、簡單使用示例
在使用AQS基礎組件前,先瞭解一下內部的基本介面。
tryAcquire(int arg):獨占式的獲取同步狀態,實現該方法需要查詢當前狀態並判斷同步狀態是否符合預期,然後通過CAS操作更改狀態。
tryRelease(int arg):釋放同步狀態,等待獲取同步狀態的線程將有機會獲取釋放的同步狀態。
tryAcquireShared(int arg):共用式的獲取同步狀態,返回大於0代表獲取成功,否則就是獲取失敗。
tryReleaseShared(int arg):共用式的釋放同步狀態。
isHeldExclusively():判斷當前的線程是否已經獲取到了同步狀態。
在AQS的源碼中,這些方法都是沒有實現的,都是通過子類自己去實現。這也是AQS的設計核心模版模式的設計方式。通過介面也可以發現,AQS主要提供的就是相關於獨占鎖的獲取釋放以及共用鎖的獲取釋放。
通過AQS自定義實現鎖的示例
1 class MyLock implements Lock{ 3 private static class Sync extends AbstractQueuedSynchronizer{ 5 //是否處於占用狀態,子類自定義方法 6 protected boolean isHeldExclusively(){ 7 //當狀態為1的時候代表為占用狀態 8 return getState() == 1; 9 } 10 //獲取鎖操作,子類自定義方法 11 public boolean tryAcquire(int acquire){
//通過CAS方式獲取更改同步狀態值 12 if(compareAndSetState(0,1)){ 13 setExclusiveOwnerThread(Thread.currentThread());//設置為當前線程擁有 14 return true; 15 } 16 return false; 17 } 18 //釋放鎖操作,子類自定義方法 19 protected boolean tryRelease(int releases){ 20 if(getState() == 0){ 21 throw new IllegalMonitorStateException(); 22 } 23 setExclusiveOwnerThread(null);//當前獲取同步狀態線程為null 24 setState(0);//將狀態設置為0 25 return true; 26 }
實現AQS中定義的模版方法,通過CAS方法獲取同步狀態,如果記憶體中的同步狀態與預期的狀態值相同就用新值替換老值。當獲取到了鎖就將這個線程賦予這個鎖。而tryRelease釋放鎖就是將當前鎖線程賦予null,並將鎖同步狀態設置為0代表已經釋放了這個鎖。
上面的方法都是需要子類自己去實現的一些模版方法,而下麵的這些方法就是實現自定義同步組件時將會調用的AQS中的實現方法。
acquire(int arg):獨占式的獲取鎖,如果當前線程獲取同步狀態成功,則由該方法返回,否則將會進入同步隊列中等待
acquireInterrupted(int arg):響應中斷的獲取鎖,當前線程未獲取同步狀態而進入同步隊列中,如果當前線程被中斷,就會拋出InterruptedException並返回
tryAcquireNanos(int arg,long nanos):在響應獲取鎖的基礎上增加了超時獲取鎖的功能,如果在超時時間內獲取到了鎖就返回true,否則就返回false
acquireShared(int arg):共用式的獲取同步狀態,如果當前線程未獲取到同步狀態將會進入同步隊列等待,在同一時刻可以有多個線程可以獲取鎖狀態
acquireSharedInterrupted(int arg):響應中斷的共用獲取鎖
tryAcquiredSharedNanos(int arg,long nanos):超時獲取共用鎖
release(int arg):獨占式的獲取同步狀態,該方法會喚醒後繼節點
releaseShared(int arg):共用式的釋放同步狀態
1 private final Sync sync = new Sync();
2 public void lock(){
3 sync.acquire(1);//調用AQS中的acquire方法
4 }
5 public boolean tryLocK(){
6 return sync.tryAcquire(1);//調用AQS中的tryLock方法
7 }
8 public void unlock(){
9 sync.release();//調用AQS中的釋放鎖操作
10 }
怎麼樣評判是否已經獲取鎖是交由子類自己實現的,而鎖獲取以後的操作以及鎖未被獲取的操作都是AQS自己實現的。所以開發者關心的就是怎麼樣去評判線程是否獲取到了鎖。
三、源碼分析以及原理
AQS類結構
從圖中可以看出來,AbstractQueuedSynchronizer繼承自AbstractOwnableSynchronizer,那麼AbstrcatOwnableSynchronizer是幹嘛的呢?這不是這篇文章研究的重點,當然這個類也很簡單就是提供了一些簡單的操作比如設置獨占鎖擁有線程等方法。Node是AbstractQueuedSynchronizer的一個內部類,有時候我會自己去考慮當眾多的線程去訪問一個鎖而獲取鎖的線程卻只有一個(針對獨占鎖而言)、當然對於共用式鎖也是可以有多個線程同時獲取鎖的。那麼肯定會有些線程無法獲取鎖,那麼它們存放在哪呢。Node就是它們的歸宿也就是第一個關鍵點同步隊列。那麼可以看一下Node類里實現了什麼,以及同步隊列又是怎麼組成的。
node類結構
同步隊列
同步隊列的頭節點也就是當前持有鎖的線程對象。waitStatus【volatile修飾、當前線程節點的狀態,volatile修飾是為了記憶體可見性】
SIGNAL 值為-1、後繼節點的線程處於等待的狀態、當前節點的線程如果釋放了同步狀態或者被取消、會通知後繼節點、後繼節點會獲取鎖並執行【當一個節點的狀態為SIGNAL時就意味著在等待獲取同步狀態,前節點是頭節點也就是獲取同步狀態的節點】
CANCELLED 值為1、因為超時或者中斷,結點會被設置為取消狀態,被取消狀態的結點不應該去競爭鎖,只能保持取消狀態不變,不能轉換為其他狀態。處於這種狀態的結點會被踢出隊列,被GC回收【一旦一個節點進入這個狀態也就會從同步隊列中消失】
CONDITION 值為-2、節點在等待隊列中、節點線程等待在Condition、當其它線程對Condition調用了singal()方法該節點會從等待隊列中移到同步隊列中【從等待隊列到同步隊列】
PROPAGATE 值為-3、表示下一次共用式同步狀態獲取將會被無條件的被傳播下去【與共用模式相關代表線程可以執行】
initial 值為0、表示當前沒有線程獲取鎖【初始狀態】
瞭解了節點等待的狀態以及同步隊列的作用,AQS中還通過了一個volatile關鍵字修飾的status對象用來管理鎖的狀態並提供了getState()、setState()、compareAndSetStatus()三個方法改變status的狀態。知道了這些就可以開始真正看AQS是如何處理沒有獲取鎖的線程的。在真正瞭解底層實現AQS之前還要介紹一下獨占鎖和共用鎖:
獨占鎖:在同一個時刻只能有一個線程獲得同步狀態,一旦這個線程獲取同步狀態,其它線程就無法再獲取將會進入阻塞的狀態。
共用鎖:在同一個時刻可以存在多個線程獲取到同步狀態。
接下來就看看JDK底層又是怎麼去實現AQS的
acquire(int arg):獨占式的獲取鎖,此方法不響應中斷,在這過程中中斷,線程不會從同步隊列中移除
1 public final void acquire(int arg){
2 if(!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE))){
3 selfInterrupt();//如果這個過程中出現中斷,在整個過程結束後再自我中斷
4 }
5 }
acquire方法代碼很少,但是它做了很多事,首先前面介紹過tryAcquire()方法是子類實現的具體獲取鎖的方法,當鎖獲取到了就會立刻退出if條件也就代表獲取鎖具體的就是啥也不幹。那麼看鎖獲取失敗具體幹了啥呢。首先是addWaiter(Node.EXCLUSIVE)方法
addWaiter(Node mode):往同步隊列中添加元素
1 private Node addWaiter(Node mode){ 2 //通過當前線程和鎖模式創建了一個Node節點 3 Node node = new Node(Thread.currentThread(),mode); 4 //獲取尾節點 5 Node pred = tail; 6 if(pred != null){ 7 node.prev = pred;//新增的節點每次都是加在同步隊列的尾部 8 //通過CAS操作設置尾節點防止線程不安全 9 if(compareAndSetTail(pred,node)){ 10 pred.next = node; 11 return node; 12 } 13 } 14 enq(node);//防止CAS操作失敗,再次處理 15 return node; 16 }
addWaiter方法主要做的就是創建一個節點,如果通過CAS操作成功就直接將節點加入同步隊列的尾部,否則需要enq方法的幫忙再次進行處理。
enq(Node node):在addWaiter方法處理失敗的時候進一步進行處理
1 private Node enq(final Node node){
2 //死迴圈【發現很多的底層死迴圈都是這麼寫不知道是不是有什麼優化點】
3 for(;;){
4 Node t = tail;
5 if(t == null){//如果尾節點為null
6 if(compareAndSetHead(new Node())){//創建一個新的節點並添加到隊列中初始化
7 tail = head;
8 }else{
9 node.prev = t;
10 //還是通過CAS操作添加到尾部
11 if(compareAndSetTail(t,node)){
12 t.next = node;
13 return t;
14 }
15 }
16 }
17 }
18 }
enq方法就是通過死迴圈,不斷的通過CAS操作設置尾節點,直到添加成功才返回。比較狠,不到黃河不死心哈哈。
acquireQueued(final Node node,int arg):當線程獲取鎖失敗並加入同步隊列以後,就進入了一個自旋的狀態,如果獲取到了這個狀態就退出阻塞狀態否則就一直阻塞
1 final boolean acquireQueued(final Node node,int arg){
2 boolean failed = true;//用來判斷是否獲取了同步狀態
3 try{
4 boolean interrupted = false;//判斷自旋過程中是否被中斷過
5 for(;;){
6 final Node p = node.predecessor();//獲取前繼節點
7 if(p == head && tryAcquire(arg)){//如果當前的這個節點的前繼節點是頭節點就去嘗試獲取了同步狀態
8 setHead(node);//設為頭節點
9 p.next = null;
10 failed = false;//代表獲取了同步狀態
11 return interrupted;
12 }
13 //判斷自己是否已經阻塞了檢查這個過程中是否被中斷過
14 if(shouldParkAfterFailedAcquire(p,node) && parkAndCheckInterrupt() ){
15 interrupted = true;
16 }
17 }finally{
18 if(failed){
19 cancelAcquired(node);//取消加入同步隊列中
20 }
21 }
22 }
23 }
acquireQueued方法主要是讓線程通過自旋的方式去獲取同步狀態,當然也不是每個節點都有獲取的資格,因為是FIFO先進顯出隊列,acquireQueued方法保證了只有頭節點的後繼節點才有資格去獲取同步狀態,如果線程可以休息了就讓該線程休息然後記錄下這個過程中是否被中斷過,當線程獲取了同步狀態就會從這個同步隊列中移除這個節點。同時還會設置獲取同步狀態的線程為頭節點,在設置頭節點的過程中不需要任何的同步操作,因為獨占式鎖中能獲取同步狀態的必定是同一個線程。
設置頭節點操作
同步隊列中節點自旋操作
shouldParkAfterFailedAcquire(Node node,Node node):判斷一個線程是否阻塞
1 private static boolean shouldPArkAfterFailedAcquire(Node pred,Node node){
2 int ws = pred.waitStatus;//獲取節點的等待狀態
3 if(ws == Node.SIGNAL){//如果是SIGNAL就代表當頭節點釋放後,這個節點就會去嘗試獲取狀態
4 return true;//代表阻塞中
5 }
6 if(ws > 0){//代表前繼節點放棄了
7 do {
8 node.prev = pred = pred.prev;//迴圈不停的往前找知道找到節點的狀態是正常的
9 }while(pred.waitStatus > 0 );
10 pred.next = node;
11 }else{
12 compareAndSetWaitStatus(pred,ws,Node.SIGNAL);//通過CAS操作設置狀態為SIGNAL
13 }
14 return false;
15 }
整個流程中,如果前驅結點的狀態不是SIGNAL,那麼自己就不能安心去休息,也就是只有當前驅節點為SIGNAL時這個線程才可以進入等待狀態。
parkAndCheckInterrupt():前面的方法是判斷是否阻塞,而這個方法就是真正的執行阻塞的方法同時返回中斷狀態
1 private final boolean parkAndCheckInterupt(){
3 LockSupport.park(this);//阻塞當前線程
5 return Thread.interrupted();//返回中斷狀態
7 }
經過了上面的這麼多方法,再次回頭看acquire方法的時候。會發現其實整個流程也沒有想象中的那麼難以理解。acquire流程
首先通過子類判斷是否獲取了鎖,如果獲取了就什麼也不幹。
如果沒有獲取鎖、通過線程創建節點加入同步隊列的隊尾。
當線程在同步隊列中不斷的通過自旋去獲取同步狀態,如果獲取了鎖,就從同步隊列中移除。
如果在獲取同步狀態的過程中被中斷過最後自行調用interrupted方法進行中斷操作。
這裡可以看一下acquire也就是獨占式獲取鎖的整個流程
AQS之aquire獨占式獲取鎖流程
release(int arg):獨占式的釋放鎖
1 public final boolean release(int arg){
3 if(tryRelease(arg)){//子類自定義實現
4 Node h = head;
5 if(h != null && h.waitStatus != 0){
6 unparkSuccessor(h);//喚醒下一個節點
7 }
8 return true;
9 }
10 return false;
11 }
釋放鎖的流程很簡單,首先子類自定義的方法如果釋放了同步狀態,如果頭節點不為空並且頭節點的等待狀態不為0就喚醒其後繼節點。主要依賴的就是子類自定義實現的釋放操作。
unparkSuccessor(Node node):喚醒後繼節點獲取同步狀態
1 private void unparkSuccessor(Node node){
2 //獲取頭節點的狀態
3 int ws = node.waitStatus;
4 if(ws < 0){
5 compareAndSetWaitStatus(node,ws,0);//通過CAS將頭節點的狀態設置為初始狀態
6 }
7 Node s = node.next;//後繼節點
8 if(s == null || s.waitStatus >0){//不存在或者已經取消
9 s = null;
10 for(Node t = tail;t != null && t != node;t = t.prev){//從尾節點開始往前遍歷,尋找離頭節點最近的等待狀態正常的節點
11 if(t.waitStatus <= 0){
12 s = t;
13 }
14 }
15 }
16 if(s != null){
17 LockSupport.unpark(s.thread);//真正的喚醒操作
18 }
19 }
喚醒操作,通過判斷後繼節點是否存在,如果不存在就尋找等待時間最長的適合的節點將其喚醒喚醒操作通過LockSupport中的unpark方法喚醒底層也就是unsafe類的操作。
以上就是獨占式的獲取鎖以及釋放鎖的過程總結的來說:線程獲取鎖,如果獲取了鎖就啥也不幹,如果沒獲取就創造一個節點通過compareAndSetTail(CAS操作)操作的方式將創建的節點加入同步隊列的尾部,在同步隊列中的節點通過自旋的操作不斷去獲取同步狀態【當然由於FIFO先進先出的特性】等待時間越長就越先被喚醒。當頭節點釋放同步狀態的時候,首先查看是否存在後繼節點,如果存在就喚醒自己的後繼節點,如果不存在就獲取等待時間最長的符合條件的線程。
acquireShared(int arg):共用式的獲取鎖
1 public final void acquireShared(int arg){
2 //子類自定義實現的獲取狀態【也就是當返回為>=0的時候就代表獲取鎖】
3 if(tryAcquireShared(arg) < 0){
4 doAcquiredShared(arg);//具體的處理沒有獲取鎖的線程的方法
5 }
6 }
doAcquiredShared(int arg):處理未獲取同步狀態的線程
1 private void doAcquire(int arg){ 2 final Node node = addWaiter(Node.SHARED);//創建一個節點加入同步隊列尾部 3 boolean failed = true;//判斷獲取狀態 4 try{ 5 boolean interrupted = false;//是否被中斷過 6 for(;;){ 7 final Node p =node.predecessor();//獲取前驅節點 8 if(p == head){ 9 int r = tryAcquireShared(arg);//獲取同步狀態 10 if(r >= 0 ){//大於0代表獲取到了 11 setHeadAndPropagate(node,r);//設置為頭節點並且如果有多餘資源一併喚醒 12 p.next = null; 13 if(interrupted){ 14 selfInterrupted();//自我中斷 15 } 16 failed = false; 17 return; 18 } 19 } 20 //判斷線程是否可以進行休息如果可以休息就調用park方法 21 if(shouldParkAfterFailedAcquire(p,node) && parkAndCheckInterrupt()){ 22 interrupted = true; 23 }
} 24 }finally{ 25 if(failed){ 26 cancelAcquire(node);//從同步隊列中刪除 27 } 28 } 29 }
共用式獲取鎖和獨占式唯一的區別在於setHeadAndPropagate這個方法,獨占式的鎖會去判斷是否為後繼節點,只有後繼節點才有資格在頭節點釋放了同步狀態以後獲取到同步狀態而共用式的實現依靠著setHeadAndPropagate這個方法
setHeadAndPorpagate(Node node,int arg):獲取共用同步狀態以後的操作
1 private void setHeadAndPropaGate(Node node,int propagate){
2 Node h = head;
3 setHead(node);//設置為頭節點
4 if(propagate >0 || h == null || h.waitStatus < 0){//大於0代表還有其他資源一併可以喚醒
5 Node s = node.next;//下一個節點
6 if(s == null || s.isShared()){
7 doReleaseShared();
8 }
9 }
10 }
這個方法主要的目的就是將獲取到同步狀態的節點設置為頭節點、如果存在多個資源就將多個資源一併喚醒
doReleaseShared():喚醒後繼節點
1 private void doReleaseShared(int arg){
2 for(;;){
3 Node h = head;
4 if(h != null && h != tail){
5 int ws = h.waitStatus;//獲取頭節點的等待狀態
6 if(!compareAndSetWaitStatus(h,Node.SIGNAL,0)){//設置不成功就一直進行設置
7 continue;
8 }
9 unparkSuccessor(h);//喚醒後繼節點
10 }else if (ws == 0 &&!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
11 continue;
12 }
13 if (h == head)
14 break;
15 }
OK,至此,共用式的獲取鎖也研究過了。讓我們再梳理一下它的流程
- tryAcquireShared()嘗試獲取資源,成功則直接返回;
- 失敗則通過doAcquireShared()進入等待隊列park(),直到被unpark()/interrupt()併成功獲取到資源才返回。整個等待過程也是忽略中斷的。
其實跟acquire()的流程大同小異,只不過多了個自己拿到資源後,還會去喚醒後繼隊友的操作(這才是共用嘛)
releaseShared():釋放共用同步狀態
1 public final boolean releaseShared(int arg){
2 //子類自定義釋放鎖操作true代表釋放
3 if(tryReleaseShared(arg)){
4 doReleaseShared();//處理釋放的操作
5 return true;
6 }
7 }
通過子類自定義實現的釋放鎖操作判斷,如果未釋放就什麼也不幹,而doReleased方法就是去喚醒當前的後繼節點
四、總結
AQS在併發中是一個非常重要的基礎類,它定義了很多同步組件需要的方法。通過這些方法開發者可以簡單的實現一個相關的鎖。我們詳解了獨占和共用兩種模式下獲取-釋放資源(acquire-release、acquireShared-releaseShared)的源碼,相信大家都有一定認識了。值得註意的是,acquire()和acquireSahred()兩種方法下,線程在等待隊列中都是忽略中斷的。AQS也支持響應中斷的,acquireInterruptibly()/acquireSharedInterruptibly()即是,這裡相應的源碼跟acquire()和acquireSahred()差不多,這裡就不再詳解了。
==================================================================================
不管歲月里經歷多少辛酸和艱難,告訴自己風雨本身就是一種內涵,努力的面對,不過就是一場命運的漂流,既然在路上,那麼目的地必然也就是前方。
==================================================================================