ArrayBlockingQueue的實現方式? ArrayBlockingQueue是否需要擴容? ArrayBlockingQueue有什麼缺點? ...
問題
(1)ArrayBlockingQueue的實現方式?
(2)ArrayBlockingQueue是否需要擴容?
(3)ArrayBlockingQueue有什麼缺點?
簡介
ArrayBlockingQueue是java併發包下一個以數組實現的阻塞隊列,它是線程安全的,至於是否需要擴容,請看下麵的分析。
隊列
隊列,是一種線性表,它的特點是先進先出,又叫FIFO,就像我們平常排隊一樣,先到先得,即先進入隊列的人先出隊。
源碼分析
主要屬性
// 使用數組存儲元素
final Object[] items;
// 取元素的指針
int takeIndex;
// 放元素的指針
int putIndex;
// 元素數量
int count;
// 保證併發訪問的鎖
final ReentrantLock lock;
// 非空條件
private final Condition notEmpty;
// 非滿條件
private final Condition notFull;
通過屬性我們可以得出以下幾個重要信息:
(1)利用數組存儲元素;
(2)通過放指針和取指針來標記下一次操作的位置;
(3)利用重入鎖來保證併發安全;
主要構造方法
public ArrayBlockingQueue(int capacity) {
this(capacity, false);
}
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
// 初始化數組
this.items = new Object[capacity];
// 創建重入鎖及兩個條件
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
通過構造方法我們可以得出以下兩個結論:
(1)ArrayBlockingQueue初始化時必須傳入容量,也就是數組的大小;
(2)可以通過構造方法控制重入鎖的類型是公平鎖還是非公平鎖;
入隊
入隊有四個方法,它們分別是add(E e)、offer(E e)、put(E e)、offer(E e, long timeout, TimeUnit unit),它們有什麼區別呢?
public boolean add(E e) {
// 調用父類的add(e)方法
return super.add(e);
}
// super.add(e)
public boolean add(E e) {
// 調用offer(e)如果成功返回true,如果失敗拋出異常
if (offer(e))
return true;
else
throw new IllegalStateException("Queue full");
}
public boolean offer(E e) {
// 元素不可為空
checkNotNull(e);
final ReentrantLock lock = this.lock;
// 加鎖
lock.lock();
try {
if (count == items.length)
// 如果數組滿了就返回false
return false;
else {
// 如果數組沒滿就調用入隊方法並返回true
enqueue(e);
return true;
}
} finally {
// 解鎖
lock.unlock();
}
}
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
// 加鎖,如果線程中斷了拋出異常
lock.lockInterruptibly();
try {
// 如果數組滿了,使用notFull等待
// notFull等待的意思是說現在隊列滿了
// 只有取走一個元素後,隊列才不滿
// 然後喚醒notFull,然後繼續現在的邏輯
// 這裡之所以使用while而不是if
// 是因為有可能多個線程阻塞在lock上
// 即使喚醒了可能其它線程先一步修改了隊列又變成滿的了
// 這時候需要再次等待
while (count == items.length)
notFull.await();
// 入隊
enqueue(e);
} finally {
// 解鎖
lock.unlock();
}
}
public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException {
checkNotNull(e);
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
// 加鎖
lock.lockInterruptibly();
try {
// 如果數組滿了,就阻塞nanos納秒
// 如果喚醒這個線程時依然沒有空間且時間到了就返回false
while (count == items.length) {
if (nanos <= 0)
return false;
nanos = notFull.awaitNanos(nanos);
}
// 入隊
enqueue(e);
return true;
} finally {
// 解鎖
lock.unlock();
}
}
private void enqueue(E x) {
final Object[] items = this.items;
// 把元素直接放在放指針的位置上
items[putIndex] = x;
// 如果放指針到數組盡頭了,就返回頭部
if (++putIndex == items.length)
putIndex = 0;
// 數量加1
count++;
// 喚醒notEmpty,因為入隊了一個元素,所以肯定不為空了
notEmpty.signal();
}
(1)add(e)時如果隊列滿了則拋出異常;
(2)offer(e)時如果隊列滿了則返回false;
(3)put(e)時如果隊列滿了則使用notFull等待;
(4)offer(e, timeout, unit)時如果隊列滿了則等待一段時間後如果隊列依然滿就返回false;
(5)利用放指針迴圈使用數組來存儲元素;
出隊
出隊有四個方法,它們分別是remove()、poll()、take()、poll(long timeout, TimeUnit unit),它們有什麼區別呢?
public E remove() {
// 調用poll()方法出隊
E x = poll();
if (x != null)
// 如果有元素出隊就返回這個元素
return x;
else
// 如果沒有元素出隊就拋出異常
throw new NoSuchElementException();
}
public E poll() {
final ReentrantLock lock = this.lock;
// 加鎖
lock.lock();
try {
// 如果隊列沒有元素則返回null,否則出隊
return (count == 0) ? null : dequeue();
} finally {
lock.unlock();
}
}
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
// 加鎖
lock.lockInterruptibly();
try {
// 如果隊列無元素,則阻塞等待在條件notEmpty上
while (count == 0)
notEmpty.await();
// 有元素了再出隊
return dequeue();
} finally {
// 解鎖
lock.unlock();
}
}
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
// 加鎖
lock.lockInterruptibly();
try {
// 如果隊列無元素,則阻塞等待nanos納秒
// 如果下一次這個線程獲得了鎖但隊列依然無元素且已超時就返回null
while (count == 0) {
if (nanos <= 0)
return null;
nanos = notEmpty.awaitNanos(nanos);
}
return dequeue();
} finally {
lock.unlock();
}
}
private E dequeue() {
final Object[] items = this.items;
@SuppressWarnings("unchecked")
// 取取指針位置的元素
E x = (E) items[takeIndex];
// 把取指針位置設為null
items[takeIndex] = null;
// 取指針前移,如果數組到頭了就返回數組前端迴圈利用
if (++takeIndex == items.length)
takeIndex = 0;
// 元素數量減1
count--;
if (itrs != null)
itrs.elementDequeued();
// 喚醒notFull條件
notFull.signal();
return x;
}
(1)remove()時如果隊列為空則拋出異常;
(2)poll()時如果隊列為空則返回null;
(3)take()時如果隊列為空則阻塞等待在條件notEmpty上;
(4)poll(timeout, unit)時如果隊列為空則阻塞等待一段時間後如果還為空就返回null;
(5)利用取指針迴圈從數組中取元素;
總結
(1)ArrayBlockingQueue不需要擴容,因為是初始化時指定容量,並迴圈利用數組;
(2)ArrayBlockingQueue利用takeIndex和putIndex迴圈利用數組;
(3)入隊和出隊各定義了四組方法為滿足不同的用途;
(4)利用重入鎖和兩個條件保證併發安全;
彩蛋
(1)論BlockingQueue中的那些方法?
BlockingQueue是所有阻塞隊列的頂級介面,它裡面定義了一批方法,它們有什麼區別呢?
操作 | 拋出異常 | 返回特定值 | 阻塞 | 超時 |
---|---|---|---|---|
入隊 | add(e) | offer(e)——false | put(e) | offer(e, timeout, unit) |
出隊 | remove() | poll()——null | take() | poll(timeout, unit) |
檢查 | element() | peek()——null | - | - |
(2)ArrayBlockingQueue有哪些缺點呢?
a)隊列長度固定且必須在初始化時指定,所以使用之前一定要慎重考慮好容量;
b)如果消費速度跟不上入隊速度,則會導致提供者線程一直阻塞,且越阻塞越多,非常危險;
c)只使用了一個鎖來控制入隊出隊,效率較低,那是不是可以藉助分段的思想把入隊出隊分裂成兩個鎖呢?且聽下回分解。
歡迎關註我的公眾號“彤哥讀源碼”,查看更多源碼系列文章, 與彤哥一起暢游源碼的海洋。