ConcurrentLinkedQueue是阻塞隊列嗎? ConcurrentLinkedQueue如何保證併發安全? ConcurrentLinkedQueue能用於線程池嗎? ...
問題
(1)ConcurrentLinkedQueue是阻塞隊列嗎?
(2)ConcurrentLinkedQueue如何保證併發安全?
(3)ConcurrentLinkedQueue能用於線程池嗎?
簡介
ConcurrentLinkedQueue只實現了Queue介面,並沒有實現BlockingQueue介面,所以它不是阻塞隊列,也不能用於線程池中,但是它是線程安全的,可用於多線程環境中。
那麼,它的線程安全又是如何實現的呢?讓我們一起來瞧一瞧。
源碼分析
主要屬性
// 鏈表頭節點
private transient volatile Node<E> head;
// 鏈表尾節點
private transient volatile Node<E> tail;
就這兩個主要屬性,一個頭節點,一個尾節點。
主要內部類
private static class Node<E> {
volatile E item;
volatile Node<E> next;
}
典型的單鏈表結構,非常純粹。
主要構造方法
public ConcurrentLinkedQueue() {
// 初始化頭尾節點
head = tail = new Node<E>(null);
}
public ConcurrentLinkedQueue(Collection<? extends E> c) {
Node<E> h = null, t = null;
// 遍歷c,並把它元素全部添加到單鏈表中
for (E e : c) {
checkNotNull(e);
Node<E> newNode = new Node<E>(e);
if (h == null)
h = t = newNode;
else {
t.lazySetNext(newNode);
t = newNode;
}
}
if (h == null)
h = t = new Node<E>(null);
head = h;
tail = t;
}
這兩個構造方法也很簡單,可以看到這是一個無界的單鏈表實現的隊列。
入隊
因為它不是阻塞隊列,所以只有兩個入隊的方法,add(e)和offer(e)。
因為是無界隊列,所以add(e)方法也不用拋出異常了。
public boolean add(E e) {
return offer(e);
}
public boolean offer(E e) {
// 不能添加空元素
checkNotNull(e);
// 新節點
final Node<E> newNode = new Node<E>(e);
// 入隊到鏈表尾
for (Node<E> t = tail, p = t;;) {
Node<E> q = p.next;
// 如果沒有next,說明到鏈表尾部了,就入隊
if (q == null) {
// CAS更新p的next為新節點
// 如果成功了,就返回true
// 如果不成功就重新取next重新嘗試
if (p.casNext(null, newNode)) {
// 如果p不等於t,說明有其它線程先一步更新tail
// 也就不會走到q==null這個分支了
// p取到的可能是t後面的值
// 把tail原子更新為新節點
if (p != t) // hop two nodes at a time
casTail(t, newNode); // Failure is OK.
// 返回入隊成功
return true;
}
}
else if (p == q)
// 如果p的next等於p,說明p已經被刪除了(已經出隊了)
// 重新設置p的值
p = (t != (t = tail)) ? t : head;
else
// t後面還有值,重新設置p的值
p = (p != t && t != (t = tail)) ? t : q;
}
}
入隊整個流程還是比較清晰的,這裡有個前提是出隊時會把出隊的那個節點的next設置為節點本身。
(1)定位到鏈表尾部,嘗試把新節點放到後面;
(2)如果尾部變化了,則重新獲取尾部,再重試;
出隊
因為它不是阻塞隊列,所以只有兩個出隊的方法,remove()和poll()。
public E remove() {
E x = poll();
if (x != null)
return x;
else
throw new NoSuchElementException();
}
public E poll() {
restartFromHead:
for (;;) {
// 嘗試彈出鏈表的頭節點
for (Node<E> h = head, p = h, q;;) {
E item = p.item;
// 如果節點的值不為空,並且將其更新為null成功了
if (item != null && p.casItem(item, null)) {
// 如果頭節點變了,則不會走到這個分支
// 會先走下麵的分支拿到新的頭節點
// 這時候p就不等於h了,就更新頭節點
// 在updateHead()中會把head更新為新節點
// 並讓head的next指向其自己
if (p != h) // hop two nodes at a time
updateHead(h, ((q = p.next) != null) ? q : p);
// 上面的casItem()成功,就可以返回出隊的元素了
return item;
}
// 下麵三個分支說明頭節點變了
// 且p的item肯定為null
else if ((q = p.next) == null) {
// 如果p的next為空,說明隊列中沒有元素了
// 更新h為p,也就是空元素的節點
updateHead(h, p);
// 返回null
return null;
}
else if (p == q)
// 如果p等於p的next,說明p已經出隊了,重試
continue restartFromHead;
else
// 將p設置為p的next
p = q;
}
}
}
// 更新頭節點的方法
final void updateHead(Node<E> h, Node<E> p) {
// 原子更新h為p成功後,延遲更新h的next為它自己
// 這裡用延遲更新是安全的,因為head節點已經變了
// 只要入隊出隊的時候檢查head有沒有變化就行了,跟它的next關係不大
if (h != p && casHead(h, p))
h.lazySetNext(h);
}
出隊的整個邏輯也是比較清晰的:
(1)定位到頭節點,嘗試更新其值為null;
(2)如果成功了,就成功出隊;
(3)如果失敗或者頭節點變化了,就重新尋找頭節點,並重試;
(4)整個出隊過程沒有一點阻塞相關的代碼,所以出隊的時候不會阻塞線程,沒找到元素就返回null;
總結
(1)ConcurrentLinkedQueue不是阻塞隊列;
(2)ConcurrentLinkedQueue不能用線上程池中;
(3)ConcurrentLinkedQueue使用(CAS+自旋)更新頭尾節點控制出隊入隊操作;
彩蛋
ConcurrentLinkedQueue與LinkedBlockingQueue對比?
(1)兩者都是線程安全的隊列;
(2)兩者都可以實現取元素時隊列為空直接返回null,後者的poll()方法可以實現此功能;
(3)前者全程無鎖,後者全部都是使用重入鎖控制的;
(4)前者效率較高,後者效率較低;
(5)前者無法實現如果隊列為空等待元素到來的操作;
(6)前者是非阻塞隊列,後者是阻塞隊列;
(7)前者無法用線上程池中,後者可以;
歡迎關註我的公眾號“彤哥讀源碼”,查看更多源碼系列文章, 與彤哥一起暢游源碼的海洋。