一、AQS概念 1、隊列同步器是用來構建鎖或者其他同步組件的基礎框架,使用一個int型變數代表同步狀態,通過內置的隊列來完成線程的排隊工作。 2、下麵是JDK8文檔中對於AQS的部分介紹 總結來說就是: ①子類通過繼承AQS並實現其抽象方法來管理同步狀態,對於同步狀態的更改通過提供的getState ...
一、AQS概念
1、隊列同步器是用來構建鎖或者其他同步組件的基礎框架,使用一個int型變數代表同步狀態,通過內置的隊列來完成線程的排隊工作。
2、下麵是JDK8文檔中對於AQS的部分介紹
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements Serializable 提供一個框架,用於實現依賴先進先出(FIFO)等待隊列的阻塞鎖和相關同步器(信號量,事件等)。 該類被設計為大多數類型的同步器的有用依據,這些同步器依賴於單個原子int值來表示狀
態。子類必須定義改變此狀態的protected方法,以及根據該對象被獲取或釋放來定義該狀態的含義。給定這些,這個類中的其他方法執行所有排隊和阻塞機制。 子類可以保持其他狀態欄位,但只以
原子方式更新int使用方法操縱值getState() , setState(int)和compareAndSetState(int, int)被跟蹤相對於同步。 此類支持預設獨占模式和共用模式。 當以獨占模式獲取時,嘗試通過其他線程獲取不能成功。 多線程獲取的共用模式可能(但不需要)成功。 除了在機械意義上,這個類不理解這些差異,當共用
模式獲取成功時,下一個等待線程(如果存在)也必須確定它是否也可以獲取。 在不同模式下等待的線程共用相同的FIFO隊列。 通常,實現子類只支持這些模式之一,但是兩者都可以在
ReadWriteLock中發揮作用。僅支持獨占或僅共用模式的子類不需要定義支持未使用模式的方法。
總結來說就是:
①子類通過繼承AQS並實現其抽象方法來管理同步狀態,對於同步狀態的更改通過提供的getState()、setState(int state)、compareAndSetState(int expect, int update)來進行操作,因為使用CAS操作保證同步狀態的改變是原子的。
②子類被推薦定義為自定義同步組件的靜態內部類,同步器本身並沒有實現任何的同步介面,僅僅是定義了若幹狀態獲取和釋放的方法來提供自定義同步組件的使用。
③同步器既可以支持獨占式的獲取同步狀態,也可以支持共用式的獲取同步狀態(ReentrantLock、ReentrantReadWriteLock、CountDownLatch等不同類型的同步組件)
3、同步器是實現鎖的關鍵,在鎖的實現中聚合同步器,利用同步器實現鎖的語義;
二、AQS的介面和實例
1、同步器的設計實現原理
繼承同步器並且重寫指定的方法,然後將同步器組合在自定義同步組件的實現中,並且調用同步器提供的模板方法(這些模板方法會調用重寫的方法);而重寫指定的方法的時候,需要使用getState()、setState(int state)、compareAndSetState(int expect, int update)來訪問或者更新同步狀態。下麵是源碼中state變數和三個方法的定義聲明實現
1 /** 2 * .(同步狀態) 3 */ 4 private volatile int state; 5 6 /** 7 * (返回當前的同步狀態) 8 * 此操作的記憶體語義為@code volatile read 9 */ 10 protected final int getState() { 11 return state; 12 } 13 14 /** 15 * (設置新的同步狀態) 16 * 此操作的記憶體語義為@code volatile read 17 */ 18 protected final void setState(int newState) { 19 state = newState; 20 } 21 22 /** 23 * (如果要更新的狀態和期望的狀態相同,那就通過原子的方式更新狀態) 24 * ( 此操作的記憶體語義為@code volatile read 和 write) 25 * (如果更新的狀態和期望的狀態不同就返回false) 26 */ 27 protected final boolean compareAndSetState(int expect, int update) { 28 return unsafe.compareAndSwapInt(this, stateOffset, expect, update); 29 }
2、下麵介紹AQS提供可被重寫的方法
1 /** 2 * 獨占式的獲取同步狀態,實現該方法需要查詢當前狀態並判斷同步狀態是否符合預期,然後再進行CAS設置同步狀態 3 * 4 */ 5 protected boolean tryAcquire(int arg) { 6 throw new UnsupportedOperationException(); 7 } 8 9 /** 10 * 獨占式的釋放同步狀態,等待獲取同步狀態的線程可以有機會獲取同步狀態 11 * 12 */ 13 protected boolean tryRelease(int arg) { 14 throw new UnsupportedOperationException(); 15 } 16 17 /** 18 * 嘗試以共用模式獲取。 該方法應該查詢對象的狀態是否允許在共用模式下獲取該對象,如果是這樣,就可以獲取它。 該方法總是由執行獲取的線程調用。 19 * 如果此方法報告失敗,則獲取方法可能將線程排隊(如果尚未排隊),直到被其他線程釋放為止。 獲取失敗時返回負值,如果在獲取成共用模式下功但沒 20 * 有後續共用模式獲取可以成功,則為零; 並且如果以共用模式獲取成功並且隨後的共用模式獲取可能成功,則為正值,在這種情況下,後續等待線程必須檢查可用性。 21 */ 22 protected int tryAcquireShared(int arg) { 23 throw new UnsupportedOperationException(); //如果不支持共用模式 ,會拋出該異常 24 } 25 26 /** 27 * 嘗試將狀態設置為以共用模式釋放同步狀態。 該方法總是由執行釋放的線程調用。 28 */ 29 protected int tryReleaseShared(int arg) { 30 throw new UnsupportedOperationException(); //如果不支持共用模式 ,會拋出該異常 31 } 32 33 /** 34 * 當前同步器是否在獨占模式下被線程占用,一般該方法表示是否被當前線程所獨占 35 */ 36 protected int isHeldExclusively(int arg) { 37 throw new UnsupportedOperationException(); //如果不支持共用模式 ,會拋出該異常 38 }
3、同步器提供的模板方法
在實現自定義同步組件的時候,需要重寫上面的方法,而下麵的模板方法會調用上面重寫的方法。下麵介紹同步器提供的模板方法
1 /** 2 * 以獨占模式獲取,忽略中斷。 通過調用至少一次tryAcquire(int)實現,成功返回。 否則線 3 * 程排隊,可能會重覆阻塞和解除阻塞,直到成功才調用tryAcquire(int) 4 */ 5 public final void acquire(int arg) {...} 6 7 /** 8 * 以獨占方式獲得,如果中斷,中止。 通過首先檢查中斷狀態,然後調用至少一次 9 * tryAcquire(int) ,成功返回。 否則線程排隊,可能會重覆阻塞和解除阻塞,調用 10 * tryAcquire(int)直到成功或線程中斷。 11 */ 12 public final void acquireInterruptibly(int arg) throws InterruptedException {...} 13 14 /** 15 * 嘗試以獨占模式獲取,如果中斷則中止,如果給定的超時時間失敗。 首先檢查中斷狀態,然 16 * 後調用至少一次tryAcquire(int) ,成功返回。 否則,線程排隊,可能會重覆阻塞和解除阻 17 * 塞,調用tryAcquire(int)直到成功或線程中斷或超時 18 */ 19 public final boolean tryAcquireNanos(int arg, long nanosTimeout) throws InterruptedException {...} 20 21 /** 22 * 以共用模式獲取,忽略中斷。 通過首次調用至少一次執行 tryAcquireShared(int),成功返 23 * 回。 否則線程排隊,可能會重覆阻塞和解除阻塞,直到成功調用tryAcquireShared(int) 。 24 */ 25 public final void acquireShared(int arg){...} 26 27 /** 28 * 以共用方式獲取,如果中斷,中止。 首先檢查中斷狀態,然後調用至少一次 29 * tryAcquireShared(int) ,成功返回。 否則線程排隊,可能會重覆阻塞和解除阻塞,調用 30 * tryAcquireShared(int)直到成功或線程中斷。 31 */ 32 public final void acquireSharedInterruptibly(int arg) throws InterruptedException{...} 33 34 /** 35 * 嘗試以共用模式獲取,如果中斷則中止,如果給定的時間超過,則失敗。 通過首先檢查中斷 36 * 狀態,然後調用至少一次tryAcquireShared(int) ,成功返回。 否則,線程排隊,可能會重 37 * 復阻塞和解除阻塞,調用tryAcquireShared(int)直到成功或線程中斷或超時。 38 */ 39 public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout) throws InterruptedException{...} 40 41 /** 42 * 獨占式的釋放同步狀態,該方法會在釋放同步狀態之後,將同步隊列中的第一個節點包含的線程喚醒 43 */ 44 public final boolean release(int arg){...} 45 46 /** 47 * 共用式的釋放同步狀態 48 */ 49 public final boolean releaseShared(int arg){...} 50 51 /** 52 * 獲取在等待隊列上的線程集合 53 */ 54 public final Collection<Thread> getQueuedThreads(){...}
三、隊列同步器的實現分析
1、同步隊列
a)t同步隊列的實現原理
AQS內部維護一個同步隊列來完成同步狀態的管理,當前線程獲取同步狀態失敗的時候,AQS會將當前線程以及等待狀態信息構造成一個結點Node並將其加入同步隊列中,同時阻塞當前線程,當同步狀態由持有線程釋放的時候,會將同步隊列中的首節點喚醒使其再次嘗試獲取同步狀態。同步隊列中的結點用來保存獲取同步狀態失敗的線程的線程引用、等待狀態以及前驅結點和後繼結點。下麵是Node的屬性分析
1 static final class Node { 2 /** 共用模式下構造結點 */ 3 static final Node SHARED = new Node(); 4 /** 獨占模式下構造結點 */ 5 static final Node EXCLUSIVE = null; 6 7 /** 用於指示線程已經取消的waitStatus值(由於在同步隊列中等待的線程等待超時或者發生中斷,需要從同步隊列中取消等待,結點進入該狀態將不會發生變化)*/ 8 static final int CANCELLED = 1; 9 /** waitstatus值指示後續線程需要取消等待(後繼結點的線程處於等待狀態,而當前結點的線程如果釋放了同步狀態或者CANCELL,將會通知後繼結點的線程以運行) */ 10 static final int SIGNAL = -1; 11 /**waitStatus值表示線程正在等待條件(原本結點在等待隊列中,結點線程等待在Condition上,當其他線程對Condition調用了signal()方法之後)該結點會從
等待隊列中轉移到同步隊列中,進行同步狀態的獲取 */ 12 static final int CONDITION = -2; 13 /** 14 * waitStatus值表示下一個共用式同步狀態的獲取應該無條件傳播下去 15 */ 16 static final int PROPAGATE = -3; 17 18 /** 19 * 不同的等到狀態的int值 20 */ 21 volatile int waitStatus; 22 23 /** 24 * 前驅結點,當結點加入同步隊列將會被設置前驅結點信息 25 */ 26 volatile Node prev; 27 28 /** 29 * 後繼結點 30 */ 31 volatile Node next; 32 33 /** 34 * 當前獲取到同步狀態的線程 35 */ 36 volatile Thread thread; 37 38 /** 39 * 等待隊列中的後繼結點,如果當前結點是共用的,那麼這個欄位是一個SHARED常量;也就是說結點類型(獨占和共用)和等待隊列中的後繼結點公用一個欄位 40 */ 41 Node nextWaiter; 42 43 /** 44 * 如果是共用模式下等待,那麼返回true(因為上面的Node nextWaiter欄位在共用模式下是一個SHARED常量) 45 */ 46 final boolean isShared() { 47 return nextWaiter == SHARED; 48 } 49 50 final Node predecessor() throws NullPointerException { 51 Node p = prev; 52 if (p == null) 53 throw new NullPointerException(); 54 else 55 return p; 56 } 57 58 Node() { // 用於建立初始頭結點或SHARED標記 59 } 60 61 Node(Thread thread, Node mode) { // 用於添加到等待隊列 62 this.nextWaiter = mode; 63 this.thread = thread; 64 } 65 66 Node(Thread thread, int waitStatus) { // Used by Condition 67 this.waitStatus = waitStatus; 68 this.thread = thread; 69 } 70 }
b)同步隊列示意圖和簡單分析
①同步隊列示意圖:當一個線程獲取了同步狀態後,其他線程不能獲取到該同步狀態,就會被構造稱為Node然後添加到同步隊列之中,這個添加的過程基於CAS保證線程安全性。
②同步隊列遵循先進先出(FIFO),首節點是獲取到同步狀態的結點,首節點的線程在釋放同步狀態的時候將會喚醒後繼結點(然後後繼結點就會變成新的首節點等待獲取同步狀態)
2、獨占式同步狀態的獲取和釋放
①前面說過,同步器的acquire()方法會獲取同步狀態,這個方法對不會響應中斷,也就是說當線程獲取通同步狀態失敗後會被構造成結點加入到同步隊列中,當線程被中斷時不會從同步隊列中移除。
1 /** 2 * ①首先調用tryAcquire方法嘗試獲取同步狀態,如果獲取同步狀態失敗,就進行下麵的操作 3 * ②獲取失敗:按照獨占式的模式構造同步結點並通過addWaiter方法將結點添加到同步隊列的尾部 4 * ③通過acquireQueue方法自旋獲取同步狀態。 5 * ④如果獲取不到同步狀態,就阻塞結點中的線程,而結點中的線程喚醒主要是通過前驅結點的出隊或者被中斷來實現 6 */ 7 public final void acquire(int arg) { 8 if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) 9 selfInterrupt(); 10 }
②下麵是addWaiter、enq和自旋獲取同步狀態acquireQueue方法的實現(該方法的主要作用就是將獲取同步狀態失敗的線程構造成結點然後添加到同步隊列的隊尾)
1 private Node addWaiter(Node mode) { 2 Node node = new Node(Thread.currentThread(), mode); 3 //嘗試直接放在隊尾 4 Node pred = tail; //直接獲取同步器的tail結點 5 if (pred != null) { 6 node.prev = pred; 7 if (compareAndSetTail(pred, node)) { 8 //隊尾結點不為空通過原子操作將構造的結點置為隊尾結點 9 pred.next = node; 10 return node; 11 } 12 } 13 //採用自旋方式保證構造的結點添加到同步隊列中 14 enq(node); 15 return node; 16 } 17 private Node enq(final Node node) { 18 for (;;) { //死迴圈知道添加成功 19 Node t = tail; 20 if (t == null) { // Must initialize 21 if (compareAndSetHead(new Node())) 22 tail = head; 23 } else { 24 node.prev = t; 25 //通過CAS方式將結點添加到同步隊列之後才會返回,否則就會不斷嘗試添加(這樣實際上就是在併發情況下,把向同步隊列添加Node變得串列化了) 26 if (compareAndSetTail(t, node)) { 27 t.next = node; 28 return t; 29 } 30 } 31 } 32 } 33 /** 34 * 通過tryAcquire()和addWaiter(),表示該線程獲取同步狀態已經失敗,被放入同步 35 * 隊列尾部了。線程阻塞等待直到其他線程(前驅結點獲得同步裝填或者被中斷)釋放同步狀 36 * 態後喚醒自己,自己才能獲得。 37 */ 38 final boolean acquireQueued(final Node node, int arg) { 39 boolean failed = true; 40 try { 41 boolean interrupted = false; 42 //線程在死迴圈的方式中嘗試獲取同步狀態 43 for (;;) { 44 final Node p = node.predecessor(); //獲取前驅結點 45 //只有前驅接待是頭結點的時候才能嘗試獲取同步狀態 46 if (p == head && tryAcquire(arg)) { 47 setHead(node); //獲取到同步狀態之後,就將自己設置為頭結點 48 p.next = null; //前驅結點已經獲得同步狀態去執行自己的程式了,所以需要釋放掉占用的同步隊列的資源,由JVM回收 49 failed = false; 50 return interrupted; 51 } 52 //如果獲取同步狀態失敗,應該自旋等待繼續獲取並且校驗自己的中斷標誌位信息 53 if (shouldParkAfterFailedAcquire(p, node) && 54 parkAndCheckInterrupt()) 55 interrupted = true; //如果被中斷,就改變自己的中斷標誌位狀態信息 56 } 57 } finally { 58 if (failed) 59 cancelAcquire(node); 60 } 61 }
③獨占式獲取同步狀態的整個流程
④獨占式同步器的釋放:release方法執行時,會喚醒頭結點的後繼結點線程
public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head;//頭結點 //喚醒頭結點的後繼結點線程 if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; }
3、共用式同步狀態的獲取和釋放
①共用式獲取和獨占式獲取最主要的區別是能否有多個線程同時獲取到同步狀態。如圖所示簡易描述二者的區別(共用式訪問的時候,可以允許多個線程訪問資源,但是存在獨占式訪問的時候,同一時刻其他的不管是共用還是獨占都會被阻塞)
②關於共用式獲取同步狀態的方法
1 /** 2 * 此方法是共用模式下線程獲取共用同步狀態的頂層入口。它會嘗試去獲取同步狀態,獲取成功則直接返回, 3 * 獲取失敗則進入等待隊列一直嘗試獲取(執行doAcquireShared方法體中的內容),直到獲取到資源為止(條件就是tryAcquireShared方法返回值大於等於0),整個過程忽略中斷 4 */ 5 public final void acquireShared(int arg) { 6 if (tryAcquireShared(arg) < 0) 7 doAcquireShared(arg); 8 } 9 /** 10 * "自旋"嘗試獲取同步狀態 11 */ 12 private void doAcquireShared(int arg) { 13 //首先將該線程包括線程引用、等待狀態、前驅結點和後繼結點的信息封裝台Node中,然後添加到等待隊列裡面(一共用模式添加) 14 final Node node = addWaiter(Node.SHARED); 15 boolean failed = true; 16 try { 17 boolean interrupted = false; //當前線程的中斷標誌 18 for (;;) { 19 final Node p = node.predecessor(); //獲取前驅結點 20 if (p == head) { 21 //當前驅結點是頭結點的時候就會以共用的方式去嘗試獲取同步狀態 22 int r = tryAcquireShared(arg); 23 //判斷tryAcquireShared的返回值 24 if (r >= 0) { 25 //如果返回值大於等於0,表示獲取同步狀態成功,就修改當前的頭結點並將信息傳播都後續的結點隊列中 26 setHeadAndPropagate(node, r); 27 p.next = null; // 釋放掉已經獲取到同步狀態的前驅結點的資源 28 if (interrupted) 29 selfInterrupt(); //檢查中斷標誌 30 failed = false; 31 return; 32 } 33 } 34 if (shouldParkAfterFailedAcquire(p, node) && 35 parkAndCheckInterrupt()) 36 interrupted = true; 37 } 38 } finally { 39 if (failed) 40 cancelAcquire(node); 41 } 42 }
根據源代碼我們可以瞭解共用式獲取同步狀態的整個過程
首先同步器會調用tryAcquireShared方法來嘗試獲取同步狀態,然後根據這個返回值來判斷是否獲取到同步狀態(當返回值大於等於0可視為獲取到同步狀態);如果第一次獲取失敗的話,就進入'自旋'狀態(執行doAcquireShared方法)一直嘗試去獲取同步狀態;在自旋獲取中,如果檢查到當前前驅結點是頭結點的話,就會嘗試獲取同步狀態,而一旦獲取成功(tryAcquireShared方法返回值大於等於0)就可以從自旋狀態退出。
另外,還有一點就是上面說到的一個處於等待隊列的線程要想開始嘗試去獲取同步狀態,需要滿足的條件就是前驅結點是頭結點,那麼它本身就是整個隊列中的第二個結點。當頭結點釋放掉所有的臨界資源之後,我們考慮每個線程運行所需資源的不同數量問題,如下圖所示
③共用式同步狀態的釋放
對於支持共用式的同步組件(即多個線程同同時訪問),它們和獨占式的主要區別就是tryReleaseShared方法必須確保同步狀態的釋放是線程安全的(CAS的模式來釋放同步狀態,因為既然是多個線程能夠訪問,那麼釋放的時候也會是多個線程的,就需要保證釋放時候的線程安全)
1 /** 2 * 該方法是共用模式下線程釋放共用資源的頂層入口。它會釋放指定量的資源,如果成功釋放且允許喚醒等待線程,它會喚醒等待隊列里的其他線程來獲取資源。 3 */ 4 public final boolean releaseShared(int arg) { 5 if (tryReleaseShared(arg)) { 6 doReleaseShared(); // 7 return true; 8 } 9 return false; 10 }
四、自定義同步組件的實現
1、共用式鎖的實現
①、自定義一個同步組件,可以允許兩個線程訪問(共用式同步組件),超過兩個線程就會被阻塞。
②、既然是共用式同步組件,按照前面所說的,組件本身需要使用AQS提供的共用式模板方法acquireShared等;組件的內部類需要實現AQS,並且重寫關於共用式獲取同步狀態的方法(tryAcquireShared()、tryReleaseShared()等共用模式下的方法)。
③、既然是兩個線程能夠同時訪問的話,那麼狀態數的取值範圍就是0、1、2了,每當一個線程獲取到同步狀態的時候state值減1,反之就會增加1;當state值為0的時候就會阻塞其他想要獲取同步狀態的線程。對於同步狀態的更改需要使用CAS來進行保證原子性。
1 package cn.source.concurrent; 2 3 import java.util.concurrent.TimeUnit; 4 import java.util.concurrent.locks.AbstractQueuedSynchronizer; 5 import java.util.concurrent.locks.Condition; 6 import java.util.concurrent.locks.Lock; 7 8 public class TestAQS implements Lock{ 9 10 private Sync sync = new Sync(2); 11 12 private static class Sync extends AbstractQueuedSynchronizer { 13 14 Sync(int num) { 15 if(num <= 0) { 16 throw new RuntimeException("num需要大於0"); 17 } 18 setState(num); 19 } 20 21 @Override 22 protected int tryAcquireShared(int arg) { 23 for(; ;) { 24 int currentState = getState(); 25 int newState = currentState - arg; 26 if(newState < 0 || compareAndSetState(currentState, newState)) { 27 return newState; 28 } 29 } 30 } 31 32 @Override 33 protected boolean tryReleaseShared(int arg) { 34 for(; ;) { 35 int currentState = getState(); 36 int newState = currentState + arg; 37 if(compareAndSetState(currentState, newState)) { 38 return true; 39 } 40 } 41 } 42 43 44 } 45 @Override 46 public void lock() { 47 sync.acquireShared(1); 48 } 49 50 @Override 51 public void unlock() { 52 sync.releaseShared(1); 53 } 54 55 //...... 56 }共用式鎖
1 /** 2 * 測試結果:輸出的線程名稱是成對的,保證同一時刻只有兩個線程能夠獲取到鎖 3 * 4 */ 5 public class TestLockShare { 6 @Test 7 public void test() { 8