LinkedBlockingQueue的實現方式? LinkedBlockingQueue是有界的還是無界的隊列? LinkedBlockingQueue相比ArrayBlockingQueue有什麼改進? ...
問題
(1)LinkedBlockingQueue的實現方式?
(2)LinkedBlockingQueue是有界的還是無界的隊列?
(3)LinkedBlockingQueue相比ArrayBlockingQueue有什麼改進?
簡介
LinkedBlockingQueue是java併發包下一個以單鏈表實現的阻塞隊列,它是線程安全的,至於它是不是有界的,請看下麵的分析。
源碼分析
主要屬性
// 容量
private final int capacity;
// 元素數量
private final AtomicInteger count = new AtomicInteger();
// 鏈表頭
transient Node<E> head;
// 鏈表尾
private transient Node<E> last;
// take鎖
private final ReentrantLock takeLock = new ReentrantLock();
// notEmpty條件
// 當隊列無元素時,take鎖會阻塞在notEmpty條件上,等待其它線程喚醒
private final Condition notEmpty = takeLock.newCondition();
// 放鎖
private final ReentrantLock putLock = new ReentrantLock();
// notFull條件
// 當隊列滿了時,put鎖會會阻塞在notFull上,等待其它線程喚醒
private final Condition notFull = putLock.newCondition();
(1)capacity,有容量,可以理解為LinkedBlockingQueue是有界隊列
(2)head, last,鏈表頭、鏈表尾指針
(3)takeLock,notEmpty,take鎖及其對應的條件
(4)putLock, notFull,put鎖及其對應的條件
(5)入隊、出隊使用兩個不同的鎖控制,鎖分離,提高效率
內部類
static class Node<E> {
E item;
Node<E> next;
Node(E x) { item = x; }
}
典型的單鏈表結構。
主要構造方法
public LinkedBlockingQueue() {
// 如果沒傳容量,就使用最大int值初始化其容量
this(Integer.MAX_VALUE);
}
public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
// 初始化head和last指針為空值節點
last = head = new Node<E>(null);
}
入隊
入隊同樣有四個方法,我們這裡只分析最重要的一個,put(E e)方法:
public void put(E e) throws InterruptedException {
// 不允許null元素
if (e == null) throw new NullPointerException();
int c = -1;
// 新建一個節點
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
// 使用put鎖加鎖
putLock.lockInterruptibly();
try {
// 如果隊列滿了,就阻塞在notFull條件上
// 等待被其它線程喚醒
while (count.get() == capacity) {
notFull.await();
}
// 隊列不滿了,就入隊
enqueue(node);
// 隊列長度加1
c = count.getAndIncrement();
// 如果現隊列長度如果小於容量
// 就再喚醒一個阻塞在notFull條件上的線程
// 這裡為啥要喚醒一下呢?
// 因為可能有很多線程阻塞在notFull這個條件上的
// 而取元素時只有取之前隊列是滿的才會喚醒notFull
// 為什麼隊列滿的才喚醒notFull呢?
// 因為喚醒是需要加putLock的,這是為了減少鎖的次數
// 所以,這裡索性在放完元素就檢測一下,未滿就喚醒其它notFull上的線程
// 說白了,這也是鎖分離帶來的代價
if (c + 1 < capacity)
notFull.signal();
} finally {
// 釋放鎖
putLock.unlock();
}
// 如果原隊列長度為0,現在加了一個元素後立即喚醒notEmpty條件
if (c == 0)
signalNotEmpty();
}
private void enqueue(Node<E> node) {
// 直接加到last後面
last = last.next = node;
}
private void signalNotEmpty() {
final ReentrantLock takeLock = this.takeLock;
// 加take鎖
takeLock.lock();
try {
// 喚醒notEmpty條件
notEmpty.signal();
} finally {
// 解鎖
takeLock.unlock();
}
}
(1)使用putLock加鎖;
(2)如果隊列滿了就阻塞在notFull條件上;
(3)否則就入隊;
(4)如果入隊後元素數量小於容量,喚醒其它阻塞在notFull條件上的線程;
(5)釋放鎖;
(6)如果放元素之前隊列長度為0,就喚醒notEmpty條件;
出隊
出隊同樣也有四個方法,我們這裡只分析最重要的那一個,take()方法:
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
// 使用takeLock加鎖
takeLock.lockInterruptibly();
try {
// 如果隊列無元素,則阻塞在notEmpty條件上
while (count.get() == 0) {
notEmpty.await();
}
// 否則,出隊
x = dequeue();
// 獲取出隊前隊列的長度
c = count.getAndDecrement();
// 如果取之前隊列長度大於1,則喚醒notEmpty
if (c > 1)
notEmpty.signal();
} finally {
// 釋放鎖
takeLock.unlock();
}
// 如果取之前隊列長度等於容量
// 則喚醒notFull
if (c == capacity)
signalNotFull();
return x;
}
private E dequeue() {
// head節點本身是不存儲任何元素的
// 這裡把head刪除,並把head下一個節點作為新的值
// 並把其值置空,返回原來的值
Node<E> h = head;
Node<E> first = h.next;
h.next = h; // help GC
head = first;
E x = first.item;
first.item = null;
return x;
}
private void signalNotFull() {
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
// 喚醒notFull
notFull.signal();
} finally {
putLock.unlock();
}
}
(1)使用takeLock加鎖;
(2)如果隊列空了就阻塞在notEmpty條件上;
(3)否則就出隊;
(4)如果出隊前元素數量大於1,喚醒其它阻塞在notEmpty條件上的線程;
(5)釋放鎖;
(6)如果取元素之前隊列長度等於容量,就喚醒notFull條件;
總結
(1)LinkedBlockingQueue採用單鏈表的形式實現;
(2)LinkedBlockingQueue採用兩把鎖的鎖分離技術實現入隊出隊互不阻塞;
(3)LinkedBlockingQueue是有界隊列,不傳入容量時預設為最大int值;
彩蛋
(1)LinkedBlockingQueue與ArrayBlockingQueue對比?
a)後者入隊出隊採用一把鎖,導致入隊出隊相互阻塞,效率低下;
b)前才入隊出隊採用兩把鎖,入隊出隊互不幹擾,效率較高;
c)二者都是有界隊列,如果長度相等且出隊速度跟不上入隊速度,都會導致大量線程阻塞;
d)前者如果初始化不傳入初始容量,則使用最大int值,如果出隊速度跟不上入隊速度,會導致隊列特別長,占用大量記憶體;
歡迎關註我的公眾號“彤哥讀源碼”,查看更多源碼系列文章, 與彤哥一起暢游源碼的海洋。