java.util.concurrent包中的大多數同步器實現都是圍繞著共同的基礎行為,比如等待隊列、條件隊列、獨占獲取、共用獲取等,而這些行為的抽象就是基於AbstractQueuedSynchronizer(簡稱AQS)實現的,AQS是一個抽象同步框架,可以用來實現一個依賴狀態的同步器 ...
什麼是AQS
1.java.util.concurrent包中的大多數同步器實現都是圍繞著共同的基礎行為,比如等待隊列、條件隊列、獨占獲取、共用獲取等,而這些行為的抽象就是基於AbstractQueuedSynchronizer(簡稱AQS)實現的,AQS是一個抽象同步框架,可以用來實現一個依賴狀態的同步器。
2.JDK中提供的大多數的同步器如Lock, Latch, Barrier等,都是基於AQS框架來實現的
【1】一般是通過一個內部類Sync繼承 AQS
【2】將同步器所有調用都映射到Sync對應的方法
AQS具備的特性:
1.阻塞等待隊列 , 2.共用/獨占 , 3.公平/非公平 , 4.可重入 , 5.允許中斷
AQS定義兩種資源共用方式
1.Exclusive-獨占,只有一個線程能執行,如ReentrantLock(詳情可查看 深入理解ReentrantLock類鎖)
2.Share-共用,多個線程可以同時執行,如Semaphore/CountDownLatch
AQS定義兩種隊列
1.同步等待隊列【主要用於維護獲取鎖失敗時入隊的線程】
【1】AQS當中的同步等待隊列也稱CLH隊列,CLH隊列是Craig、Landin、Hagersten三人發明的一種基於雙向鏈表數據結構的隊列,是FIFO先進先出線程等待隊列,Java中的CLH隊列是原CLH隊列的一個變種,線程由原自旋機制改為阻塞機制。
【2】AQS 依賴CLH同步隊列來完成同步狀態的管理:
1)當前線程如果獲取同步狀態失敗時,AQS則會將當前線程已經等待狀態等信息構造成一個節點(Node)並將其加入到CLH同步隊列,同時會阻塞當前線程
2)當同步狀態釋放時,會把首節點喚醒(公平鎖),使其再次嘗試獲取同步狀態。
3)通過signal或signalAll將條件隊列中的節點轉移到同步隊列。(由條件隊列轉化為同步隊列)
【3】圖示:
2.條件等待隊列【調用await()的時候會釋放鎖,然後線程會加入到條件隊列,調用signal()喚醒的時候會把條件隊列中的線程節點移動到同步隊列中,等待再次獲得鎖】
【1】AQS中條件隊列是使用單向列表保存的,用nextWaiter來連接:
1)調用await方法阻塞線程;
2)當前線程存在於同步隊列的頭結點,調用await方法進行阻塞(從同步隊列轉化到條件隊列)
3.AQS 定義了5個隊列中節點狀態:
1)值為0,初始化狀態,表示當前節點在sync隊列中,等待著獲取鎖。
2)CANCELLED,值為1,表示當前的線程被取消;
3)SIGNAL,值為-1,表示當前節點的後繼節點包含的線程需要運行,也就是unpark;
4)CONDITION,值為-2,表示當前節點在等待condition,也就是在condition隊列中;
5)PROPAGATE,值為-3,表示當前場景下後續的acquireShared能夠得以執行;
源碼詳解(將源碼拆分為三塊,抽象同步器AbstractQueuedSynchronizer類,節點Node類,條件對象ConditionObject類)
AbstractQueuedSynchronizer類解析
1.屬性值解析
//用鏈表來表示隊列 private transient volatile Node head; private transient volatile Node tail; private volatile int state; //可以表示鎖的加鎖狀態【獨占鎖只為1,共用鎖可以大於1】,又可以表示鎖的重入次數,0為沒有加鎖
2.方法解析
//定義了主體的大體邏輯,如入隊,如嘗試加鎖 private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure Node pred = tail; if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } enq(node); return node; } protected final boolean compareAndSetState(int expect, int update) { // See below for intrinsics setup to support this return unsafe.compareAndSwapInt(this, stateOffset, expect, update); } //模板方法的處理,如果子類沒有實現,則子類中調用的話會報錯 //提供給子類去實現的公平與非公平的邏輯 protected boolean tryAcquire(int arg) { throw new UnsupportedOperationException(); } //釋放鎖的邏輯 protected boolean tryRelease(int arg) { throw new UnsupportedOperationException(); }
Node類詳解
1.代碼展示
static final class Node { static final Node SHARED = new Node(); // 共用模式標記 static final Node EXCLUSIVE = null; // 獨占模式標記 static final int CANCELLED = 1; static final int SIGNAL = -1; static final int CONDITION = -2; static final int PROPAGATE = -3; //值為0,初始化狀態,表示當前節點在sync隊列中,等待著獲取鎖。 //CANCELLED,值為1,表示當前的線程被取消; //SIGNAL,值為-1,表示當前節點的後繼節點包含的線程需要運行,也就是unpark; //CONDITION,值為-2,表示當前節點在等待condition,也就是在condition隊列中; //PROPAGATE,值為-3,表示當前場景下後續的acquireShared能夠得以執行; volatile int waitStatus; volatile Node prev;//前驅結點 volatile Node next;//後繼結點 volatile Thread thread; //與節點綁定的線程 Node nextWaiter; // 存儲condition隊列中的後繼節點 final boolean isShared() { return nextWaiter == SHARED; } final Node predecessor() throws NullPointerException { Node p = prev; if (p == null) throw new NullPointerException(); else return p; } Node() {} Node(Thread thread, Node mode) { // Used by addWaiter this.nextWaiter = mode; this.thread = thread; } Node(Thread thread, int waitStatus) { // Used by Condition this.waitStatus = waitStatus; this.thread = thread; } }
Condition介面詳解
1.代碼展示
//Condition用來替代synchronized鎖的監視器的功能,而且更加靈活 //一個Condition實例需要與一個lock進行綁定 public interface Condition { //調用此方法的線程將加入等待隊列,阻塞直到被通知或者線程中斷 void await() throws InterruptedException; //調用此方法的線程將加入等待隊列,阻塞直到被通知(線程中斷忽略) void awaitUninterruptibly(); //調用此方法的線程將加入等待隊列,阻塞直到被通知或者線程中斷或等待超時 long awaitNanos(long nanosTimeout) throws InterruptedException; //調用此方法的線程將加入等待隊列,阻塞直到被通知或者線程中斷或等待超時 boolean await(long time, TimeUnit unit) throws InterruptedException; //調用此方法的線程將加入等待隊列,阻塞直到被通知或者線程中斷或超出指定日期 boolean awaitUntil(Date deadline) throws InterruptedException; //喚醒一個等待中的線程 void signal(); //喚醒所以等待中的線程 void signalAll(); }
2.發現說明
【1】在Condition中,用await()替換wait(),用signal()替換notify(),用signalAll()替換notifyAll(),傳統線程的通信方式,Condition都可以實現,這裡註意,Condition是被綁定到Lock上的,要創建一個Lock的Condition必須用newCondition()方法。Condition的強大之處在於,對於一個鎖,我們可以為多個線程間建立不同的Condition。如果採用Object類中的wait(), notify(), notifyAll()實現的話,當寫入數據之後需要喚醒讀線程時,不可能通過notify()或notifyAll()明確的指定喚醒讀線程,而只能通過notifyAll喚醒所有線程,但是notifyAll無法區分喚醒的線程是讀線程,還是寫線程。所以,通過Condition能夠更加精細的控制多線程的休眠與喚醒。
【2】但,condition的使用必須依賴於lock對象,通過lock對象的newCondition()方法初始化一個condition對象。
ConditionObject類詳解【Condition介面的實現類】
1.屬性值解析
//由頭尾兩個節點指針形成的鏈表來達到隊列的效果 private transient Node firstWaiter; private transient Node lastWaiter;
2.方法解析
【1】核心await方法
public final void await() throws InterruptedException { //如果線程中斷,直接拋出異常 if (Thread.interrupted()) throw new InterruptedException(); //進入等待隊列中 Node node = addConditionWaiter(); //釋放當前線程持有的鎖,並獲取當前同步器狀態 int savedState = fullyRelease(node); int interruptMode = 0; //如果不在同步隊列中,那麼直接阻塞當前線程;直到被喚醒時,加入到同步隊列中 while (!isOnSyncQueue(node)) { LockSupport.park(this); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } //此時已經被喚醒,那麼嘗試獲取鎖 if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; //如果節點中斷取消,那麼清除節點 if (node.nextWaiter != null) // clean up if cancelled unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); } //addConditionWaiter將一個節點添加到condition隊列中。在入隊時,判斷當前尾節點是不是CONDITION。如果不是則判斷當前尾節點已經被取消,將當前節點出隊。那麼也就是說在隊列中的節點狀態,要麼是CONDITION,要麼是CANCELLED private Node addConditionWaiter() { Node t = lastWaiter; // If lastWaiter is cancelled, clean out. if (t != null && t.waitStatus != Node.CONDITION) { unlinkCancelledWaiters(); t = lastWaiter; } Node node = new Node(Thread.currentThread(), Node.CONDITION); if (t == null) firstWaiter = node; else t.nextWaiter = node; lastWaiter = node; return node; } //方法的作用是移除取消的節點。方法本身只有在持有鎖的時候會被調用。方法會遍歷當前condition隊列,將所有非Condition狀態的節點移除。 private void unlinkCancelledWaiters() { Node t = firstWaiter; Node trail = null; while (t != null) { Node next = t.nextWaiter; if (t.waitStatus != Node.CONDITION) { t.nextWaiter = null; if (trail == null) firstWaiter = next; else trail.nextWaiter = next; if (next == null) lastWaiter = trail; } else trail = t; t = next; } }
【2】核心signal方法與signalAll方法
public final void signal() { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); Node first = firstWaiter; if (first != null) doSignal(first); } public final void signalAll() { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); Node first = firstWaiter; if (first != null) doSignalAll(first); } private void doSignal(Node first) { do { if ( (firstWaiter = first.nextWaiter) == null) lastWaiter = null; first.nextWaiter = null; } while (!transferForSignal(first) && (first = firstWaiter) != null); } private void doSignalAll(Node first) { lastWaiter = firstWaiter = null; do { Node next = first.nextWaiter; first.nextWaiter = null; transferForSignal(first); first = next; } while (first != null); } final boolean transferForSignal(Node node) { //如果不能更改waitStatus,則表示該節點已被取消 if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) return false; Node p = enq(node); int ws = p.waitStatus; if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) LockSupport.unpark(node.thread); return true; }