上篇文章12分鐘從Executor自頂向下徹底搞懂線程池中我們聊到線程池,而線程池中包含阻塞隊列 這篇文章我們主要聊聊併發包下的阻塞隊列 阻塞隊列 什麼是隊列? 隊列的實現可以是數組、也可以是鏈表,可以實現先進先出的順序隊列,也可以實現先進後出的棧隊列 那什麼是阻塞隊列? 在經典的生產者/消費者模型 ...
上篇文章12分鐘從Executor自頂向下徹底搞懂線程池中我們聊到線程池,而線程池中包含阻塞隊列
這篇文章我們主要聊聊併發包下的阻塞隊列
阻塞隊列
什麼是隊列?
隊列的實現可以是數組、也可以是鏈表,可以實現先進先出的順序隊列,也可以實現先進後出的棧隊列
那什麼是阻塞隊列?
在經典的生產者/消費者模型中,生產者們將生產的元素放入隊列,而消費者們從隊列獲取元素消費
當隊列已滿,我們會手動阻塞生產者,直到消費者消費再來手動喚醒生產者
當隊列為空,我們會手動阻塞消費者,直到生產者生產再來手動喚醒消費者
在這個過程中由於使用的是普通隊列,阻塞與喚醒我們需要手動操作,保證同步機制
阻塞隊列在隊列的基礎上提供等待/通知功能,用於線程間的通信,避免線程競爭死鎖
生產者可以看成往線程池添加任務的用戶線程,而消費者則是線程池中的工作線程
當阻塞隊列為空時阻塞工作線程獲取任務,當阻塞隊列已滿時阻塞用戶線程向隊列中添加任務(創建非核心線程、拒絕策略)
API
阻塞隊列提供一下四種添加、刪除元素的API,我們常用阻塞等待/超時阻塞等待的API
方法名 | 拋出異常 | 返回true/false | 阻塞等待 | 超時阻塞等待 |
---|---|---|---|---|
添加 | add(Object) | offer(Object) | put(Object) | offer(Object,long,TimeUnit) |
刪除 | remove() | poll() | take() | poll(long,TimeUnit) |
- 拋出異常:隊滿add 拋出異常
IllegalStateExceptio
;隊空remove 拋出異常NoSuchElementException
- 返回值: 隊滿offer返回false,隊空poll返回null
- 阻塞等待: 隊滿時put會阻塞線程 或 隊空時take會阻塞線程
- 超時阻塞等待: 在阻塞等待、返回true/false的基礎上增加超時等待(等待一定時間就退出等待)
阻塞隊列的公平與不公平
什麼是阻塞隊列的公平與不公平?
當阻塞隊列已滿時,如果是公平的,那麼阻塞的線程根據先後順序從阻塞隊列中獲取元素,不公平則反之
實際上阻塞隊列的公平與不公平,要看實現阻塞隊列的鎖是否公平
阻塞隊列一般預設使用不公平鎖
ArrayBlockingQueue
從名稱看就可以知道它是數組實現的,我們先來看看它有哪些重要欄位
public class ArrayBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
//存儲元素的數組
final Object[] items;
//記錄元素出隊的下標
int takeIndex;
//記錄元素入隊的下標
int putIndex;
//隊列中元素數量
int count;
//使用的鎖
final ReentrantLock lock;
//出隊的等待隊列,作用於消費者
private final Condition notEmpty;
//入隊的等待隊列,作用於生產者
private final Condition notFull;
}
看完關鍵欄位,我們可以知道:ArrayBlockingQueue
由數組實現、使用併發包下的可重入鎖、同時用兩個等待隊列作用生產者和消費者
為什麼出隊、入隊要使用兩個下標記錄?
實際上它是一個環形數組,在初始化後就不改變大小,後續查看源碼自然能明白它是環形數組
在構造器中、初始化數組容量,同時使用非公平鎖
public ArrayBlockingQueue(int capacity) {
this(capacity, false);
}
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
//鎖是否為公平鎖
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
ArrayBlockingQueue的公平性是由ReentrantLock來實現的
我們來看看入隊方法,入隊方法都大同小異,我們本文都查看支持超時、響應中斷的方法
public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException {
//檢查空指針
checkNotNull(e);
//獲取超時納秒
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
//加鎖
lock.lockInterruptibly();
try {
//如果隊列已滿
while (count == items.length) {
//超時則返回入隊失敗,否則生產者等待對應時間
if (nanos <= 0)
return false;
nanos = notFull.awaitNanos(nanos);
}
//入隊
enqueue(e);
return true;
} finally {
//解鎖
lock.unlock();
}
}
直接使用可重入鎖保證同步,如果隊列已滿,在此期間判斷是否超時,超時就返回,未超時等待;未滿則執行入隊方法
private void enqueue(E x) {
//隊列數組
final Object[] items = this.items;
//往入隊下標添加值
items[putIndex] = x;
//自增入隊下標 如果已滿則定位到0 成環
if (++putIndex == items.length)
putIndex = 0;
//統計數量增加
count++;
//喚醒消費者
notEmpty.signal();
}
在入隊中,主要是添加元素、修改下次添加的下標、統計隊列中的元素和喚醒消費者,到這以及可以說明它的實現是環形數組
ArrayBlockingQueue
由環形數組實現的阻塞隊列,固定容量不支持動態擴容,使用非公平的ReertrantLock
保證入隊、出隊操作的原子性,使用兩個等待隊列存儲等待的生產者、消費者,適用於在併發量不大的場景
LinkedBlockingQueue
LinkedBlockingQueue
從名稱上來看,就是使用鏈表實現的,我們來看看它的關鍵欄位
public class LinkedBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
//節點
static class Node<E> {
//存儲元素
E item;
//下一個節點
Node<E> next;
//...
}
//容量上限
private final int capacity;
//隊列元素數量
private final AtomicInteger count = new AtomicInteger();
//頭節點
transient Node<E> head;
//尾節點
private transient Node<E> last;
//出隊的鎖
private final ReentrantLock takeLock = new ReentrantLock();
//出隊的等待隊列
private final Condition notEmpty = takeLock.newCondition();
//入隊的鎖
private final ReentrantLock putLock = new ReentrantLock();
//入隊的等待隊列
private final Condition notFull = putLock.newCondition();
}
從欄位中,我們可以知道它使用單向鏈表的節點、且用首尾節點記錄隊列的頭尾,並且它使用兩把鎖、兩個等待隊列作用於隊頭、尾,與ArrayBlockingQueue
相比能夠增加併發性能
有個奇怪的地方:都使用鎖了,為什麼記錄元素數量count卻使用原子類呢?
這是由於兩把鎖,作用於入隊與出隊的操作,入隊與出隊也可能併發執行,同時修改count,因此要使用原子類保證修改數量的原子性
在初始化時需要設置容量大小,否則會設置成無界的阻塞隊列(容量是int的最大值)
當消費速度小於生產速度時,阻塞隊列中會堆積任務,進而導致容易發生OOM
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}
public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
last = head = new Node<E>(null);
}
來看看入隊操作
public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException {
if (e == null) throw new NullPointerException();
long nanos = unit.toNanos(timeout);
int c = -1;
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
//加鎖
putLock.lockInterruptibly();
try {
//隊列已滿,超時返回,不超時等待
while (count.get() == capacity) {
if (nanos <= 0)
return false;
nanos = notFull.awaitNanos(nanos);
}
//入隊
enqueue(new Node<E>(e));
// 先獲取再自增 c中存儲的是舊值
c = count.getAndIncrement();
//如果數量沒滿 喚醒生產者
if (c + 1 < capacity)
notFull.signal();
} finally {
//解鎖
putLock.unlock();
}
//如果舊值為0 說明該入隊操作前是空隊列,喚醒消費者來消費
if (c == 0)
signalNotEmpty();
return true;
}
入隊操作類似,只不過在此期間如果數量沒滿喚醒生產者生產,隊列為空喚醒消費者來消費,從而增加併發性能
入隊只是改變指向關係
//添加節點到末尾
private void enqueue(Node<E> node) {
last = last.next = node;
}
喚醒消費者前要先獲取鎖
private void signalNotEmpty() {
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
notEmpty.signal();
} finally {
takeLock.unlock();
}
}
出隊操作也類似
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
E x = null;
int c = -1;
long nanos = unit.toNanos(timeout);
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
// 隊列為空 超時返回空,否則等待
while (count.get() == 0) {
if (nanos <= 0)
return null;
nanos = notEmpty.awaitNanos(nanos);
}
//出隊
x = dequeue();
c = count.getAndDecrement();
//隊列中除了當前線程獲取的任務外還有任務就去喚醒消費者消費
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
//原來隊列已滿就去喚醒生產者 生產
if (c == capacity)
signalNotFull();
return x;
}
LinkedBlockingQueue
與ArrayBlockingQueue
的出隊、入隊實現類似
只不過LinkedBlockingQueue
入隊、出隊獲取/釋放的鎖不同,並且在此過程中不同情況回去喚醒其他的生產者、消費者從而進一步提升併發性能
LinkedBlockingQueue 由單向鏈表實現的阻塞隊列,記錄首尾節點;預設是無界、非公平的阻塞隊列(初始化時要設置容量否則可能OOM),使用兩把鎖、兩個等待隊列,分別操作入隊、出隊的生產者、消費者,在入隊、出隊操作期間不同情況還會去喚醒生產者、消費者,從而進一步提升併發性能,適用於併發量大的場景
LinkedBlockingDeque
LinkedBlockingDeque
實現與LinkedBlockQueue
類似,在LinkedBlockQueue
的基礎上支持從隊頭、隊尾進行添加、刪除的操作
它是一個雙向鏈表,帶有一系列First、Last的方法,比如:offerLast
、pollFirst
由於LinkedBlockingDeque
雙向,常用其來實現工作竊取演算法,從而減少線程的競爭
什麼是工作竊取演算法?
比如多線程處理多個阻塞隊列的任務(一一對應),每個線程從隊頭獲取任務處理,當A線程處理完它負責的阻塞隊列所有任務時,它再從隊尾竊取其他阻塞隊列的任務,這樣就不會發生競爭,除非隊列中只剩一個任務,才會發生競爭
ForkJoin
框架就使用其來充當阻塞隊列,我們後文再聊這個框架
PriorityBlockingQueue
PriorityBlockingQueue是優先順序排序的無界阻塞隊列,阻塞隊列按照優先順序進行排序
使用堆排序,具體排序演算法由Comparable
或Comparator
實現比較規則
- 預設:泛型中的對象需要實現
Comparable
比較規則 ,根據compareTo方法規則排序 - 構造器中指定比較器
Comparator
根據比較器規則排序
@Test
public void testPriorityBlockingQeque() {
//預設使用Integer實現Comparable的升序
PriorityBlockingQueue<Integer> queue = new PriorityBlockingQueue<>(6);
queue.offer(99);
queue.offer(1099);
queue.offer(299);
queue.offer(992);
queue.offer(99288);
queue.offer(995);
//99 299 992 995 1099 99288
while (!queue.isEmpty()){
System.out.print(" "+queue.poll());
}
System.out.println();
//指定Comparator 降序
queue = new PriorityBlockingQueue<>(6, (o1, o2) -> o2-o1);
queue.offer(99);
queue.offer(1099);
queue.offer(299);
queue.offer(992);
queue.offer(99288);
queue.offer(995);
//99288 1099 995 992 299 99
while (!queue.isEmpty()){
System.out.print(" "+queue.poll());
}
}
適用於需要根據優先順序排序處理的場景
DelayQueue
Delay是一個延時獲取元素的無界阻塞隊列, 延時最長排在隊尾
Delay隊列元素實現Delayed介面通過getDelay
獲取延時時間
public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
implements BlockingQueue<E> {
}
public interface Delayed extends Comparable<Delayed> {
long getDelay(TimeUnit unit);
}
DelayQueue應用場景
- 緩存系統的設計:DelayQueue存放緩存有效期,當可以獲取到元素時,說明緩存過期
- 定時任務調度: 將定時任務的時間設置為延時時間,一旦可以獲取到任務就開始執行
以定時線程池ScheduledThreadPoolExecutor
的定時任務ScheduledFutureTask
為例,它實現Delayed
獲取延遲執行的時間
-
創建對象時,初始化數據
ScheduledFutureTask(Runnable r, V result, long ns, long period) { super(r, result); //time記錄當前對象延遲到什麼時候可以使用,單位是納秒 this.time = ns; this.period = period; //sequenceNumber記錄元素在隊列中先後順序 sequencer原子自增 //AtomicLong sequencer = new AtomicLong(); this.sequenceNumber = sequencer.getAndIncrement(); }
-
實現Delayed介面的getDelay方法
public long getDelay(TimeUnit unit) { return unit.convert(time - now(), NANOSECONDS); }
-
Delay介面繼承了Comparable介面,目的是要實現compareTo方法來繼續排序
public int compareTo(Delayed other) { if (other == this) // compare zero if same object return 0; if (other instanceof ScheduledFutureTask) { ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other; long diff = time - x.time; if (diff < 0) return -1; else if (diff > 0) return 1; else if (sequenceNumber < x.sequenceNumber) return -1; else return 1; } long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS); return (diff < 0) ? -1 : (diff > 0) ? 1 : 0; }
SynchronousQueue
SynchronousQueue是一個預設下支持非公平不存儲元素的阻塞隊列
每個put操作要等待一個take操作,否則不能繼續添加元素會阻塞
使用公平鎖
@Test
public void testSynchronousQueue() throws InterruptedException {
final SynchronousQueue<Integer> queue = new SynchronousQueue(true);
new Thread(() -> {
try {
queue.put(1);
queue.put(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "put12線程").start();
new Thread(() -> {
try {
queue.put(3);
queue.put(4);
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "put34線程").start();
TimeUnit.SECONDS.sleep(1);
System.out.println(Thread.currentThread().getName() + "拿出" + queue.take());
TimeUnit.SECONDS.sleep(1);
System.out.println(Thread.currentThread().getName() + "拿出" + queue.take());
TimeUnit.SECONDS.sleep(1);
System.out.println(Thread.currentThread().getName() + "拿出" + queue.take());
TimeUnit.SECONDS.sleep(1);
System.out.println(Thread.currentThread().getName() + "拿出" + queue.take());
}
//結果 因為使用公平鎖 1在2前,3在4前
//main拿出1
//main拿出3
//main拿出2
//main拿出4
SynchronousQueue隊列本身不存儲元素,負責把生產者的數據傳遞給消費者,適合傳遞性的場景
在該場景下吞吐量會比ArrayBlockingQueue,LinkedBlockingQueue高
LinkedTransferQueue
LinkedTransferQueue是一個鏈表組成的無界阻塞隊列,擁有transfer()
和tryTransfer()
方法
transfer()
如果有消費者在等待接收元素,transfer(e)會把元素e傳輸給消費者
如果沒有消費者在等待接收元素,transfer(e)會將元素e存放在隊尾,直到有消費者獲取了才返回
@Test
public void testTransfer() throws InterruptedException {
LinkedTransferQueue queue = new LinkedTransferQueue();
new Thread(()->{
try {
//阻塞直到被獲取
queue.transfer(1);
//生產者放入的1被取走了
System.out.println(Thread.currentThread().getName()+"放入的1被取走了");
} catch (InterruptedException e) {
e.printStackTrace();
}
},"生產者").start();
TimeUnit.SECONDS.sleep(3);
//main取出隊列中的元素
System.out.println(Thread.currentThread().getName()+"取出隊列中的元素");
queue.poll();
}
tryTransfer()
無論消費者是否消費都直接返回
@Test
public void testTryTransfer() throws InterruptedException {
LinkedTransferQueue<Integer> queue = new LinkedTransferQueue<>();
//false
System.out.println(queue.tryTransfer(1));
//null
System.out.println(queue.poll());
new Thread(()->{
try {
//消費者取出2
System.out.println(Thread.currentThread().getName()+"取出"+queue.poll(2, TimeUnit.SECONDS));
} catch (InterruptedException e) {
e.printStackTrace();
}
},"消費者").start();
TimeUnit.SECONDS.sleep(1);
//true
System.out.println(queue.tryTransfer(2));
}
tryTransfer(long,TimeUnit)
在超時時間內消費者消費元素返回true,反之返回false
總結
ArrayBlockingQueue由環形數組實現,固定容量無法擴容,使用非公平的可重入鎖鎖、兩個等待隊列操作入隊、出隊操作,適合併發小的場景
LinkedBlockingQueue由單向鏈表實現,預設無界,使用兩個可重入鎖、兩個等待隊列進行入隊、出隊操作,併在此期間可能喚醒生產者或消費者線程,以此提高併發性能
LinkedBlockingDeque由雙向鏈表實現,在LinkedBlockingQueue的基礎上,能夠在隊頭、隊尾都進行添加、刪除操作,適用工作竊取演算法1
PriorityBlockingQueue由堆排序實現的優先順序隊列,具體排序演算法由Comparable、Comparator來實現,適用於需要根據優先順序排序處理任務的場景
DelayQueue 是一個延時隊列,隊列中存儲的元素需要實現Delayed
介面來獲取延時時間,適用於緩存失效、定時任務的場景
SynchronousQueue不存儲元素,只將生產者生產的元素傳遞給消費者, 適用於傳遞性的場景,比如不同線程間傳遞數據
LinkedTransgerQueue是傳輸形的阻塞隊列,適用於單個元素傳遞的場景
在使用無界的阻塞隊列時,需要設置容量,避免存儲任務太多導致OOM
最後(不要白嫖,一鍵三連求求拉~)
本篇文章被收入專欄 由點到線,由線到面,深入淺出構建Java併發編程知識體系,感興趣的同學可以持續關註喔
本篇文章筆記以及案例被收入 gitee-StudyJava、 github-StudyJava 感興趣的同學可以stat下持續關註喔~
案例地址:
Gitee-JavaConcurrentProgramming/src/main/java/E_BlockQueue
Github-JavaConcurrentProgramming/src/main/java/E_BlockQueue
有什麼問題可以在評論區交流,如果覺得菜菜寫的不錯,可以點贊、關註、收藏支持一下~
關註菜菜,分享更多乾貨,公眾號:菜菜的後端私房菜
本文由博客一文多發平臺 OpenWrite 發佈!