SynchronousQueue的實現方式? SynchronousQueue真的是無緩衝的嗎? SynchronousQueue在高併發情景下會有什麼問題? ...
問題
(1)SynchronousQueue的實現方式?
(2)SynchronousQueue真的是無緩衝的嗎?
(3)SynchronousQueue在高併發情景下會有什麼問題?
簡介
SynchronousQueue是java併發包下無緩衝阻塞隊列,它用來在兩個線程之間移交元素,但是它有個很大的問題,你知道是什麼嗎?請看下麵的分析。
源碼分析
主要屬性
// CPU的數量
static final int NCPUS = Runtime.getRuntime().availableProcessors();
// 有超時的情況自旋多少次,當CPU數量小於2的時候不自旋
static final int maxTimedSpins = (NCPUS < 2) ? 0 : 32;
// 沒有超時的情況自旋多少次
static final int maxUntimedSpins = maxTimedSpins * 16;
// 針對有超時的情況,自旋了多少次後,如果剩餘時間大於1000納秒就使用帶時間的LockSupport.parkNanos()這個方法
static final long spinForTimeoutThreshold = 1000L;
// 傳輸器,即兩個線程交換元素使用的東西
private transient volatile Transferer<E> transferer;
通過屬性我們可以Get到兩個點:
(1)這個阻塞隊列裡面是會自旋的;
(2)它使用了一個叫做transferer的東西來交換元素;
主要內部類
// Transferer抽象類,主要定義了一個transfer方法用來傳輸元素
abstract static class Transferer<E> {
abstract E transfer(E e, boolean timed, long nanos);
}
// 以棧方式實現的Transferer
static final class TransferStack<E> extends Transferer<E> {
// 棧中節點的幾種類型:
// 1. 消費者(請求數據的)
static final int REQUEST = 0;
// 2. 生產者(提供數據的)
static final int DATA = 1;
// 3. 二者正在匹配中
static final int FULFILLING = 2;
// 棧中的節點
static final class SNode {
// 下一個節點
volatile SNode next; // next node in stack
// 匹配者
volatile SNode match; // the node matched to this
// 等待著的線程
volatile Thread waiter; // to control park/unpark
// 元素
Object item; // data; or null for REQUESTs
// 模式,也就是節點的類型,是消費者,是生產者,還是正在匹配中
int mode;
}
// 棧的頭節點
volatile SNode head;
}
// 以隊列方式實現的Transferer
static final class TransferQueue<E> extends Transferer<E> {
// 隊列中的節點
static final class QNode {
// 下一個節點
volatile QNode next; // next node in queue
// 存儲的元素
volatile Object item; // CAS'ed to or from null
// 等待著的線程
volatile Thread waiter; // to control park/unpark
// 是否是數據節點
final boolean isData;
}
// 隊列的頭節點
transient volatile QNode head;
// 隊列的尾節點
transient volatile QNode tail;
}
(1)定義了一個抽象類Transferer,裡面定義了一個傳輸元素的方法;
(2)有兩種傳輸元素的方法,一種是棧,一種是隊列;
(3)棧的特點是後進先出,隊列的特點是先進行出;
(4)棧只需要保存一個頭節點就可以了,因為存取元素都是操作頭節點;
(5)隊列需要保存一個頭節點一個尾節點,因為存元素操作尾節點,取元素操作頭節點;
(6)每個節點中保存著存儲的元素、等待著的線程,以及下一個節點;
(7)棧和隊列兩種方式有什麼不同呢?請看下麵的分析。
主要構造方法
public SynchronousQueue() {
// 預設非公平模式
this(false);
}
public SynchronousQueue(boolean fair) {
// 如果是公平模式就使用隊列,如果是非公平模式就使用棧
transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
}
(1)預設使用非公平模式,也就是棧結構;
(2)公平模式使用隊列,非公平模式使用棧;
入隊
我們這裡主要介紹以棧方式實現的傳輸模式,以put(E e)方法為例。
public void put(E e) throws InterruptedException {
// 元素不可為空
if (e == null) throw new NullPointerException();
// 直接調用傳輸器的transfer()方法
// 三個參數分別是:傳輸的元素,是否需要超時,超時的時間
if (transferer.transfer(e, false, 0) == null) {
// 如果傳輸失敗,直接讓線程中斷並拋出中斷異常
Thread.interrupted();
throw new InterruptedException();
}
}
調用transferer的transfer()方法,傳入元素e,說明是生產者
出隊
我們這裡主要介紹以棧方式實現的傳輸模式,以take()方法為例。
public E take() throws InterruptedException {
// 直接調用傳輸器的transfer()方法
// 三個參數分別是:null,是否需要超時,超時的時間
// 第一個參數為null表示是消費者,要取元素
E e = transferer.transfer(null, false, 0);
// 如果取到了元素就返回
if (e != null)
return e;
// 否則讓線程中斷並拋出中斷異常
Thread.interrupted();
throw new InterruptedException();
}
調用transferer的transfer()方法,傳入null,說明是消費者。
transfer()方法
transfer()方法同時實現了取元素和放元素的功能,下麵我再來看看這個transfer()方法里究竟幹了什麼。
// TransferStack.transfer()方法
E transfer(E e, boolean timed, long nanos) {
SNode s = null; // constructed/reused as needed
// 根據e是否為null決定是生產者還是消費者
int mode = (e == null) ? REQUEST : DATA;
// 自旋+CAS,熟悉的套路,熟悉的味道
for (;;) {
// 棧頂元素
SNode h = head;
// 棧頂沒有元素,或者棧頂元素跟當前元素是一個模式的
// 也就是都是生產者節點或者都是消費者節點
if (h == null || h.mode == mode) { // empty or same-mode
// 如果有超時而且已到期
if (timed && nanos <= 0) { // can't wait
// 如果頭節點不為空且是取消狀態
if (h != null && h.isCancelled())
// 就把頭節點彈出,併進入下一次迴圈
casHead(h, h.next); // pop cancelled node
else
// 否則,直接返回null(超時返回null)
return null;
} else if (casHead(h, s = snode(s, e, h, mode))) {
// 入棧成功(因為是模式相同的,所以只能入棧)
// 調用awaitFulfill()方法自旋+阻塞當前入棧的線程並等待被匹配到
SNode m = awaitFulfill(s, timed, nanos);
// 如果m等於s,說明取消了,那麼就把它清除掉,並返回null
if (m == s) { // wait was cancelled
clean(s);
// 被取消了返回null
return null;
}
// 到這裡說明匹配到元素了
// 因為從awaitFulfill()裡面出來要不被取消了要不就匹配到了
// 如果頭節點不為空,並且頭節點的下一個節點是s
// 就把頭節點換成s的下一個節點
// 也就是把h和s都彈出了
// 也就是把棧頂兩個元素都彈出了
if ((h = head) != null && h.next == s)
casHead(h, s.next); // help s's fulfiller
// 根據當前節點的模式判斷返回m還是s中的值
return (E) ((mode == REQUEST) ? m.item : s.item);
}
} else if (!isFulfilling(h.mode)) { // try to fulfill
// 到這裡說明頭節點和當前節點模式不一樣
// 如果頭節點不是正在匹配中
// 如果頭節點已經取消了,就把它彈出棧
if (h.isCancelled()) // already cancelled
casHead(h, h.next); // pop and retry
else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) {
// 頭節點沒有在匹配中,就讓當前節點先入隊,再讓他們嘗試匹配
// 且s成為了新的頭節點,它的狀態是正在匹配中
for (;;) { // loop until matched or waiters disappear
SNode m = s.next; // m is s's match
// 如果m為null,說明除了s節點外的節點都被其它線程先一步匹配掉了
// 就清空棧並跳出內部迴圈,到外部迴圈再重新入棧判斷
if (m == null) { // all waiters are gone
casHead(s, null); // pop fulfill node
s = null; // use new node next time
break; // restart main loop
}
SNode mn = m.next;
// 如果m和s嘗試匹配成功,就彈出棧頂的兩個元素m和s
if (m.tryMatch(s)) {
casHead(s, mn); // pop both s and m
// 返回匹配結果
return (E) ((mode == REQUEST) ? m.item : s.item);
} else // lost match
// 嘗試匹配失敗,說明m已經先一步被其它線程匹配了
// 就協助清除它
s.casNext(m, mn); // help unlink
}
}
} else { // help a fulfiller
// 到這裡說明當前節點和頭節點模式不一樣
// 且頭節點是正在匹配中
SNode m = h.next; // m is h's match
if (m == null) // waiter is gone
// 如果m為null,說明m已經被其它線程先一步匹配了
casHead(h, null); // pop fulfilling node
else {
SNode mn = m.next;
// 協助匹配,如果m和s嘗試匹配成功,就彈出棧頂的兩個元素m和s
if (m.tryMatch(h)) // help match
// 將棧頂的兩個元素彈出後,再讓s重新入棧
casHead(h, mn); // pop both h and m
else // lost match
// 嘗試匹配失敗,說明m已經先一步被其它線程匹配了
// 就協助清除它
h.casNext(m, mn); // help unlink
}
}
}
}
// 三個參數:需要等待的節點,是否需要超時,超時時間
SNode awaitFulfill(SNode s, boolean timed, long nanos) {
// 到期時間
final long deadline = timed ? System.nanoTime() + nanos : 0L;
// 當前線程
Thread w = Thread.currentThread();
// 自旋次數
int spins = (shouldSpin(s) ?
(timed ? maxTimedSpins : maxUntimedSpins) : 0);
for (;;) {
// 當前線程中斷了,嘗試清除s
if (w.isInterrupted())
s.tryCancel();
// 檢查s是否匹配到了元素m(有可能是其它線程的m匹配到當前線程的s)
SNode m = s.match;
// 如果匹配到了,直接返回m
if (m != null)
return m;
// 如果需要超時
if (timed) {
// 檢查超時時間如果小於0了,嘗試清除s
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
s.tryCancel();
continue;
}
}
if (spins > 0)
// 如果還有自旋次數,自旋次數減一,併進入下一次自旋
spins = shouldSpin(s) ? (spins-1) : 0;
// 後面的elseif都是自旋次數沒有了
else if (s.waiter == null)
// 如果s的waiter為null,把當前線程註入進去,併進入下一次自旋
s.waiter = w; // establish waiter so can park next iter
else if (!timed)
// 如果不允許超時,直接阻塞,並等待被其它線程喚醒,喚醒後繼續自旋並查看是否匹配到了元素
LockSupport.park(this);
else if (nanos > spinForTimeoutThreshold)
// 如果允許超時且還有剩餘時間,就阻塞相應時間
LockSupport.parkNanos(this, nanos);
}
}
// SNode裡面的方向,調用者m是s的下一個節點
// 這時候m節點的線程應該是阻塞狀態的
boolean tryMatch(SNode s) {
// 如果m還沒有匹配者,就把s作為它的匹配者
if (match == null &&
UNSAFE.compareAndSwapObject(this, matchOffset, null, s)) {
Thread w = waiter;
if (w != null) { // waiters need at most one unpark
waiter = null;
// 喚醒m中的線程,兩者匹配完畢
LockSupport.unpark(w);
}
// 匹配到了返回true
return true;
}
// 可能其它線程先一步匹配了m,返回其是否是s
return match == s;
}
整個邏輯比較複雜,這裡為了簡單起見,屏蔽掉多線程處理的細節,只描述正常業務場景下的邏輯:
(1)如果棧中沒有元素,或者棧頂元素跟將要入棧的元素模式一樣,就入棧;
(2)入棧後自旋等待一會看有沒有其它線程匹配到它,自旋完了還沒匹配到元素就阻塞等待;
(3)阻塞等待被喚醒了說明其它線程匹配到了當前的元素,就返回匹配到的元素;
(4)如果兩者模式不一樣,且頭節點沒有在匹配中,就拿當前節點跟它匹配,匹配成功了就返回匹配到的元素;
(5)如果兩者模式不一樣,且頭節點正在匹配中,當前線程就協助去匹配,匹配完成了再讓當前節點重新入棧重新匹配;
如果直接閱讀這部分代碼還是比較困難的,建議寫個測試用例,打個斷點一步一步跟蹤調試。
下麵是我的測試用例,可以參考下,在IDEA中可以讓斷點只阻塞線程:
public class TestSynchronousQueue {
public static void main(String[] args) throws InterruptedException {
SynchronousQueue<Integer> queue = new SynchronousQueue<>(false);
new Thread(()->{
try {
queue.put(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
Thread.sleep(500);
System.out.println(queue.take());
}
}
修改斷點只阻塞線程的方法,右擊斷點,選擇Thread:
交給你了
上面的源碼分析都是基於Stack的方式來分析的,那麼隊列是怎麼運行的呢?很簡單哦,測試用例中的false改成true就可以了,這就交給你了。
總結
(1)SynchronousQueue是java里的無緩衝隊列,用於在兩個線程之間直接移交元素;
(2)SynchronousQueue有兩種實現方式,一種是公平(隊列)方式,一種是非公平(棧)方式;
(3)棧方式中的節點有三種模式:生產者、消費者、正在匹配中;
(4)棧方式的大致思路是如果棧頂元素跟自己一樣的模式就入棧並等待被匹配,否則就匹配,匹配到了就返回;
(5)隊列方式的大致思路是……不告訴你^^(兩者的邏輯差別還是挺大的)
彩蛋
(1)SynchronousQueue真的是無緩衝的隊列嗎?
通過源碼分析,我們可以發現其實SynchronousQueue內部或者使用棧或者使用隊列來存儲包含線程和元素值的節點,如果同一個模式的節點過多的話,它們都會存儲進來,且都會阻塞著,所以,嚴格上來說,SynchronousQueue並不能算是一個無緩衝隊列。
(2)SynchronousQueue有什麼缺點呢?
試想一下,如果有多個生產者,但只有一個消費者,如果消費者處理不過來,是不是生產者都會阻塞起來?反之亦然。
這是一件很危險的事,所以,SynchronousQueue一般用於生產、消費的速度大致相當的情況,這樣才不會導致系統中過多的線程處於阻塞狀態。
歡迎關註我的公眾號“彤哥讀源碼”,查看更多源碼系列文章, 與彤哥一起暢游源碼的海洋。