SynchronousQueue介紹 【1】SynchronousQueue是一個沒有數據緩衝的BlockingQueue,生產者線程對其的插入操作put必須等待消費者的移除操作take。 【2】如圖所示,SynchronousQueue 最大的不同之處在於,它的容量為 0,所以沒有一個地方來暫存元 ...
SynchronousQueue介紹
【1】SynchronousQueue是一個沒有數據緩衝的BlockingQueue,生產者線程對其的插入操作put必須等待消費者的移除操作take。
【2】如圖所示,SynchronousQueue 最大的不同之處在於,它的容量為 0,所以沒有一個地方來暫存元素,導致每次取數據都要先阻塞,直到有數據被放入;同理,每次放數據的時候也會阻塞,直到有消費者來取。
【3】需要註意的是,SynchronousQueue 的容量不是 1 而是 0,因為 SynchronousQueue 不需要去持有元素,它所做的就是直接傳遞(direct handoff)。由於每當需要傳遞的時候,SynchronousQueue 會把元素直接從生產者傳給消費者,在此期間並不需要做存儲,所以如果運用得當,它的效率是很高的。
SynchronousQueue的源碼分析
【1】構造函數
//預設採用非公平 public SynchronousQueue() { this(false); } //可以選擇模式 public SynchronousQueue(boolean fair) { transferer = fair ? new TransferQueue<E>() : new TransferStack<E>(); }
【2】核心方法分析
//這些方法本質上都是調用屬性值transferer的transfer方法 public void put(E e) throws InterruptedException { if (e == null) throw new NullPointerException(); if (transferer.transfer(e, false, 0) == null) { Thread.interrupted(); throw new InterruptedException(); } } public boolean offer(E e) { if (e == null) throw new NullPointerException(); return transferer.transfer(e, true, 0) != null; } public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException { if (e == null) throw new NullPointerException(); if (transferer.transfer(e, true, unit.toNanos(timeout)) != null) return true; if (!Thread.interrupted()) return false; throw new InterruptedException(); } public E take() throws InterruptedException { E e = transferer.transfer(null, false, 0); if (e != null) return e; Thread.interrupted(); throw new InterruptedException(); } public E poll(long timeout, TimeUnit unit) throws InterruptedException { E e = transferer.transfer(null, true, unit.toNanos(timeout)); if (e != null || !Thread.interrupted()) return e; throw new InterruptedException(); } public E poll() { return transferer.transfer(null, true, 0); }
s
Transferer分析
【1】Transferer是SynchronousQueue的內部抽象類,雙棧和雙隊列演算法共用該類。他只有一個transfer方法,用於轉移元素,從生產者轉移到消費者;或者消費者調用該方法從生產者取數據。
【2】Transferer有兩個實現類:TransferQueue和TransferStack。
【3】這兩個類的區別就在於是否公平。TransferQueue是公平的,TransferStack非公平。
【4】源碼展示
// 堆棧和隊列共同的介面,負責執行 put or take abstract static class Transferer<E> { // e 為空的,會直接返回特殊值,不為空會傳遞給消費者 // timed 為 true,說明會有超時時間 abstract E transfer(E e, boolean timed, long nanos); }
TransferQueue分析
【1】節點元素
//隊列節點元素 static final class QNode { // 當前元素的下一個元素 volatile QNode next; // 當前元素的值,如果當前元素被阻塞住了,等其他線程來喚醒自己時,其他線程會把自己 set 到 item 裡面 volatile Object item; // 可以阻塞住的當前線程 volatile Thread waiter; // 節點類型:true是 put,false是 take final boolean isData; .... }
【2】構造方法
//隊列頭結點指針 transient volatile QNode head; //隊列尾結點指針 transient volatile QNode tail; TransferQueue() { QNode h = new QNode(null, false); // initialize to dummy node. head = h; tail = h; }
【3】核心方法
@SuppressWarnings("unchecked") E transfer(E e, boolean timed, long nanos) { QNode s = null; //根據是否傳入數據 判斷是獲取還是存放 boolean isData = (e != null); for (;;) { // 隊列頭和尾的臨時變數,隊列是空的時候,t=h QNode t = tail; QNode h = head; // tail 和 head 沒有初始化時,無限迴圈,雖然這種 continue 非常耗cpu,但感覺不會碰到這種情況 // 因為 tail 和 head 在 TransferQueue 初始化的時候,就已經被賦值空節點了 if (t == null || h == null) // saw uninitialized value continue; // spin // 首尾節點相同,說明是空隊列 // 或者尾節點的操作和當前節點操作一致 if (h == t || t.isData == isData) { // empty or same-mode QNode tn = t.next; if (t != tail) //直至拿到尾節點 continue; if (tn != null) { // lagging tail advanceTail(t, tn); continue; } //超時直接返回 null if (timed && nanos <= 0) // can't wait return null; //構建新節點 if (s == null) s = new QNode(e, isData); //將新建節點塞入隊列 if (!t.casNext(null, s)) // failed to link in continue; advanceTail(t, s); // 阻塞住自己 Object x = awaitFulfill(s, e, timed, nanos); if (x == s) { // wait was cancelled clean(t, s); return null; } if (!s.isOffList()) { // not already unlinked advanceHead(t, s); // unlink if head if (x != null) // and forget fields s.item = s; s.waiter = null; } return (x != null) ? (E)x : e; } // 隊列不為空,並且當前操作和隊尾不一致,也就是說當前操作是隊尾是對應的操作 // 比如說隊尾是因為 take 被阻塞的,那麼當前操作必然是 put else { // 也就是這行代碼體現出隊列的公平,每次操作時,從頭開始按照順序進行操作 QNode m = h.next; if (t != tail || m == null || h != head) continue; // inconsistent read Object x = m.item; if (isData == (x != null) || // m already fulfilled x == m || // m cancelled !m.casItem(x, e)) { // lost CAS advanceHead(h, m); // dequeue and retry continue; } // 當前操作放到隊頭 advanceHead(h, m); // 釋放隊頭阻塞節點 LockSupport.unpark(m.waiter); return (x != null) ? (E)x : e; } } }
TransferStack分析
【1】節點元素
// 棧中節點的幾種類型: // 1. 消費者(請求數據的) static final int REQUEST = 0; // 2. 生產者(提供數據的) static final int DATA = 1; // 3. 二者正在匹配中 static final int FULFILLING = 2; // 棧中的節點 static final class SNode { // 下一個節點 volatile SNode next; volatile SNode match; // the node matched to this // 等待著的線程 volatile Thread waiter; Object item; // 模式,也就是節點的類型,是消費者,是生產者,還是正在匹配中 int mode; ... }
【2】核心方法
// TransferStack.transfer()方法 E transfer(E e, boolean timed, long nanos) { SNode s = null; // constructed/reused as needed // 根據e是否為null決定是生產者還是消費者 int mode = (e == null) ? REQUEST : DATA; // 自旋+CAS for (;;) { // 棧頂元素 SNode h = head; // 棧頂沒有元素,或者棧頂元素跟當前元素是一個模式的 // 也就是都是生產者節點或者都是消費者節點 if (h == null || h.mode == mode) { // empty or same-mode // 如果有超時而且已到期 if (timed && nanos <= 0) { // can't wait // 如果頭節點不為空且是取消狀態 if (h != null && h.isCancelled()) // 就把頭節點彈出,併進入下一次迴圈 casHead(h, h.next); // pop cancelled node else // 否則,直接返回null(超時返回null) return null; } else if (casHead(h, s = snode(s, e, h, mode))) { // 入棧成功(因為是模式相同的,所以只能入棧) // 調用awaitFulfill()方法自旋+阻塞當前入棧的線程並等待被匹配到 SNode m = awaitFulfill(s, timed, nanos); // 如果m等於s,說明取消了,那麼就把它清除掉,並返回null if (m == s) { // wait was cancelled clean(s); // 被取消了返回null return null; } // 到這裡說明匹配到元素了 // 因為從awaitFulfill()裡面出來要不被取消了要不就匹配到了 // 如果頭節點不為空,並且頭節點的下一個節點是s // 就把頭節點換成s的下一個節點 // 也就是把h和s都彈出了 // 也就是把棧頂兩個元素都彈出了 if ((h = head) != null && h.next == s) casHead(h, s.next); // help s's fulfiller // 根據當前節點的模式判斷返回m還是s中的值 return (E) ((mode == REQUEST) ? m.item : s.item); } } else if (!isFulfilling(h.mode)) { // try to fulfill // 到這裡說明頭節點和當前節點模式不一樣 // 如果頭節點不是正在匹配中 // 如果頭節點已經取消了,就把它彈出棧 if (h.isCancelled()) // already cancelled casHead(h, h.next); // pop and retry else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) { // 頭節點沒有在匹配中,就讓當前節點先入隊,再讓他們嘗試匹配 // 且s成為了新的頭節點,它的狀態是正在匹配中 for (;;) { // loop until matched or waiters disappear SNode m = s.next; // m is s's match // 如果m為null,說明除了s節點外的節點都被其它線程先一步匹配掉了 // 就清空棧並跳出內部迴圈,到外部迴圈再重新入棧判斷 if (m == null) { // all waiters are gone casHead(s, null); // pop fulfill node s = null; // use new node next time break; // restart main loop } SNode mn = m.next; // 如果m和s嘗試匹配成功,就彈出棧頂的兩個元素m和s if (m.tryMatch(s)) { casHead(s, mn); // pop both s and m // 返回匹配結果 return (E) ((mode == REQUEST) ? m.item : s.item); } else // lost match // 嘗試匹配失敗,說明m已經先一步被其它線程匹配了 // 就協助清除它 s.casNext(m, mn); // help unlink } } } else { // help a fulfiller // 到這裡說明當前節點和頭節點模式不一樣 // 且頭節點是正在匹配中 SNode m = h.next; // m is h's match if (m == null) // waiter is gone // 如果m為null,說明m已經被其它線程先一步匹配了 casHead(h, null); // pop fulfilling node else { SNode mn = m.next; // 協助匹配,如果m和s嘗試匹配成功,就彈出棧頂的兩個元素m和s if (m.tryMatch(h)) // help match // 將棧頂的兩個元素彈出後,再讓s重新入棧 casHead(h, mn); // pop both h and m else // lost match // 嘗試匹配失敗,說明m已經先一步被其它線程匹配了 // 就協助清除它 h.casNext(m, mn); // help unlink } } } } // 三個參數:需要等待的節點,是否需要超時,超時時間 SNode awaitFulfill(SNode s, boolean timed, long nanos) { // 到期時間 final long deadline = timed ? System.nanoTime() + nanos : 0L; // 當前線程 Thread w = Thread.currentThread(); // 自旋次數 int spins = (shouldSpin(s) ? (timed ? maxTimedSpins : maxUntimedSpins) : 0); for (;;) { // 當前線程中斷了,嘗試清除s if (w.isInterrupted()) s.tryCancel(); // 檢查s是否匹配到了元素m(有可能是其它線程的m匹配到當前線程的s) SNode m = s.match; // 如果匹配到了,直接返回m if (m != null) return m; // 如果需要超時 if (timed) { // 檢查超時時間如果小於0了,嘗試清除s nanos = deadline - System.nanoTime(); if (nanos <= 0L) { s.tryCancel(); continue; } } if (spins > 0) // 如果還有自旋次數,自旋次數減一,併進入下一次自旋 spins = shouldSpin(s) ? (spins-1) : 0; // 後面的elseif都是自旋次數沒有了 else if (s.waiter == null) // 如果s的waiter為null,把當前線程註入進去,併進入下一次自旋 s.waiter = w; // establish waiter so can park next iter else if (!timed) // 如果不允許超時,直接阻塞,並等待被其它線程喚醒,喚醒後繼續自旋並查看是否匹配到了元素 LockSupport.park(this); else if (nanos > spinForTimeoutThreshold) // 如果允許超時且還有剩餘時間,就阻塞相應時間 LockSupport.parkNanos(this, nanos); } } // SNode裡面的方向,調用者m是s的下一個節點 // 這時候m節點的線程應該是阻塞狀態的 boolean tryMatch(SNode s) { // 如果m還沒有匹配者,就把s作為它的匹配者 if (match == null && UNSAFE.compareAndSwapObject(this, matchOffset, null, s)) { Thread w = waiter; if (w != null) { // waiters need at most one unpark waiter = null; // 喚醒m中的線程,兩者匹配完畢 LockSupport.unpark(w); } // 匹配到了返回true return true; } // 可能其它線程先一步匹配了m,返回其是否是s return match == s; }
SynchronousQueue總結
【1】是一個沒有數據緩衝的BlockingQueue,容量為0,它不會為隊列中元素維護存儲空間,它只是多個線程之間數據交換的媒介。
【2】數據結構:鏈表,在其內部類中維護了數據
先消費(take),後生產(put);
第一個線程Thread0是消費者訪問,此時隊列為空,則入隊(創建Node結點並賦值)
第二個線程Thread1也是消費者訪問,與隊尾模式相同,繼續入隊
第三個線程Thread2是生產者,攜帶了數據e,與隊尾模式不同,不進行入隊操作。直接將該線程攜帶的數據e返回給隊首的消費者,並喚醒隊首線程Thread1(預設非公平策略是棧結構),出隊。
反之,先生產(put)後消費(take),原理一樣
【3】鎖:CAS+自旋(無鎖)【阻塞:自旋了一定次數後調用 LockSupport.park()】
【4】存取調用同一個方法:transfer()
put、offer 為生產者,攜帶了數據 e,為 Data 模式,設置到 SNode或QNode 屬性中。
take、poll 為消費者,不攜帯數據,為 Request 模式,設置到 SNode或QNode屬性中。
【5】過程
線程訪問阻塞隊列,先判斷隊尾節點或者棧頂節點的 Node 與當前入隊模式是否相同
相同則構造節點 Node 入隊,並阻塞當前線程,元素 e 和線程賦值給 Node 屬性
不同則將元素 e(不為 null) 返回給取數據線程,隊首或棧頂線程被喚醒,出隊
【6】公平模式:TransferQueue,隊尾匹配(判斷模式),隊頭出隊,先進先出
【7】非公平模式(預設策略):TransferStack,棧頂匹配,棧頂出棧,後進先出
【8】應用場景
SynchronousQueue非常適合傳遞性場景做交換工作,生產者的線程和消費者的線程同步傳遞某些信息、事件或者任務。
SynchronousQueue的一個使用場景是線上程池裡。如果我們不確定來自生產者請求數量,但是這些請求需要很快的處理掉,那麼配合SynchronousQueue為每個生產者請求分配一個消費線程是處理效率最高的辦法。Executors.newCachedThreadPool()就使用了SynchronousQueue,這個線程池根據需要(新任務到來時)創建新的線程,如果有空閑線程則會重覆使用,線程空閑了60秒後會被回收。
Transferer