阻塞隊列是一種常用的併發編程工具,它能夠在多線程環境下提供一種安全而高效的數據傳輸機制。本文將介紹阻塞隊列的原理和使用場景,並通過實例演示其在多線程編程中的應用。 # 一、什麼是阻塞隊列 阻塞隊列是一種特殊的隊列,它具有以下幾個特點: 1. 阻塞特性:當隊列為空時,從隊列中獲取元素的操作將會被阻塞, ...
阻塞隊列是一種常用的併發編程工具,它能夠在多線程環境下提供一種安全而高效的數據傳輸機制。本文將介紹阻塞隊列的原理和使用場景,並通過實例演示其在多線程編程中的應用。
一、什麼是阻塞隊列
阻塞隊列是一種特殊的隊列,它具有以下幾個特點:
- 阻塞特性:當隊列為空時,從隊列中獲取元素的操作將會被阻塞,直到隊列中有新的元素被添加;當隊列已滿時,向隊列中添加元素的操作將會被阻塞,直到隊列中有空的位置,這就是等待喚醒機制。
- 線程安全:阻塞隊列內部通過鎖或其他同步機制來保證多線程環境下的數據一致性。
- 有界性:阻塞隊列可以設置容量上限,當隊列滿時,後續的元素將無法添加。
- 公平性:阻塞隊列可以選擇公平或非公平的策略來決定線程的獲取順序。公平隊列會按照線程的請求順序進行處理(線程按先來後到順序排隊獲取元素),而非公平隊列則允許新的線程插隊執行(線程競爭)。比如:SynchronousQueue。
阻塞隊列常用於解決生產者-消費者問題,它能夠有效地銜接生產者和消費者之間的速度差異,提供一種協調和安全的數據交互方式。
阻塞隊列底層一般採用數組和鏈表這兩種數據結構存儲元素,ArrayBlockingQueue和PriorityBlockingQueue底層都是採用數組存儲的,但是ArrayBlockingQueue是必須指定數組大小,不能擴容,而PriorityBlockingQueue可以進行動態擴容(擴容的最大長度也是Integer.MAX_VALUE),LinkedBlockingQueue底層是鏈表結構存儲,雖然是鏈表,但是也有長度限制,預設是Integer.MAX_VALUE,一般認為的無界阻塞隊列,其實最大的隊列長度也就是Integer.MAX_VALUE。
二、阻塞隊列的核心方法
- 添加
方法 | 描述 | 是否阻塞 |
---|---|---|
add方法 | 往隊列尾部添加元素,內部是調用offer方法 | 否 |
put方法 | 往隊列尾部添加元素,如果隊列已滿,則阻塞等待 | 是 |
offer方法 | 往隊列尾部添加元素,如果隊列已滿,則返回false,不會阻塞 | 否 |
- 獲取
方法 | 描述 | 是否阻塞 |
---|---|---|
take方法 | take方法:移除並返回隊列頭部的元素,如果隊列為空,則阻塞等待 | 是 |
poll方法 | 移除並返回隊列頭部的元素,如果隊列為空,則返回null,不會阻塞 | 否 |
peek方法 | 返回隊列頭部的元素(不移除),如果隊列為空,則返回null,不會阻塞 | 否 |
三、常見的阻塞隊列實現
通過圖中可以看到,BlockingQueue集成了Queue介面的功能,有多種子類實現,常用的如下:
- ArrayBlockingQueue:基於數組實現的有界阻塞隊列,它的容量在創建時指定,並且不能動態擴展。
- LinkedBlockingQueue:基於鏈表實現的有界阻塞隊列,鏈表的長度可以通過構造函數顯式指定,如果使用預設的構造函數,則預設大小是Integer.MAX_VALUE。
- PriorityBlockingQueue:基於優先順序堆排序實現的阻塞隊列(可擴容),元素按照優先順序順序進行排序。
- SynchronousQueue:不存儲元素的阻塞隊列,每個插入操作都必須等待一個相應的刪除操作,反之亦然。
四、阻塞隊列的原理
常用的阻塞隊列,比如:ArrayBlockingQueue、LinkedBlockingQueue、PriorityBlockingQueue底層都是採用ReentrantLock鎖來實現線程的互斥,而ReentrantLock底層採用了AQS框架實現線程隊列的同步,線程的阻塞是調用LockSupport.park實現,喚醒是調用LockSupport.unpark實現,具體可以看我之前的文章,SynchronousQueue底層雖然沒有用AQS框架,但也用的是LockSupport實現線程的阻塞與喚醒。
一文讀懂LockSupport
AQS源碼分析
阻塞隊列的原理可以通過兩個關鍵組件來解釋:鎖和條件變數。
- 鎖
阻塞隊列使用鎖來保護共用資源,控制線程的互斥訪問。在隊列為空或已滿時,線程需要等待相應的條件滿足才能繼續執行。
- 條件變數
條件變數是鎖的一個補充,在某些特定的條件下,線程會進入等待狀態。當條件滿足時,其他線程會通過調用條件變數的喚醒方法(比如signal()或signalAll())來通知等待的線程進行下一步操作。
當一個線程試圖從空的阻塞隊列中獲取元素時,它會獲取隊列的鎖,並檢查隊列是否為空。如果為空,這個線程將進入等待狀態,直到其他線程向隊列中插入元素並通過條件變數喚醒它。當一個線程試圖向已滿的阻塞隊列插入元素時,它會獲取隊列的鎖,並檢查隊列是否已滿。如果已滿,這個線程將進入等待狀態,直到其他線程從隊列中獲取元素並通過條件變數喚醒它。
接下來我們看下阻塞隊列的獲取元素和插入元素的核心代碼:
ArrayBlockingQueue、LinkedBlockingQueue、PriorityBlockingQueue的帶阻塞的插入和獲取方法都是基於ReentrantLock鎖+條件變數的等待和通知來實現的。
主要看看ArrayBlockingQueue帶阻塞的插入和獲取元素的主要方法吧。
/**
* 插入元素,帶阻塞
*/
public void put(E e) throws InterruptedException {
checkNotNull(e);
// 這裡使用的是ReentrantLock鎖
final ReentrantLock lock = this.lock;
// 獲取鎖並支持響應中斷,註意:獲取鎖的過程中不響應中斷,是在獲取到鎖後根據當前線程的中斷標識來處理。
lock.lockInterruptibly();
try {
// 元素大小等於數組長度時阻塞,說明放滿了,生產者需要暫停,阻塞在條件變數上,等待被喚醒
while (count == items.length)
notFull.await();
// 放入元素到數組指定的下標處
enqueue(e);
} finally {
// 釋放鎖
lock.unlock();
}
}
/**
* 插入元素,喚醒等待獲取元素的線程
*/
private void enqueue(E x) {
// assert lock.getHoldCount() == 1;
// assert items[putIndex] == null;
final Object[] items = this.items;
items[putIndex] = x;
if (++putIndex == items.length)
putIndex = 0;
count++;
// 放入元素後,通知消費線程繼續獲取元素
notEmpty.signal();
}
/**
* 獲取元素,帶阻塞
*/
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
// 數組無元素時阻塞,阻塞在條件變數上,等待被喚醒
// 元素大小等於0時阻塞,說明數組被取空了,消費者需要暫停,阻塞在條件變數上,等待被喚醒
while (count == 0)
notEmpty.await();
// 移除元素並返回
return dequeue();
} finally {
lock.unlock();
}
}
/**
* 移除元素並返回
*/
private E dequeue() {
// assert lock.getHoldCount() == 1;
// assert items[takeIndex] != null;
final Object[] items = this.items;
@SuppressWarnings("unchecked")
E x = (E) items[takeIndex];
items[takeIndex] = null;
// 數組時迴圈使用的,取元素的index到達數組長度時,下次需要從第0個位置
if (++takeIndex == items.length)
takeIndex = 0;
count--;
if (itrs != null)
itrs.elementDequeued();
// 移除元素後,通知消費者線程可以繼續放入元素
notFull.signal();
return x;
}
SynchronousQueue不存儲元素,插入和刪除是配套使用的,它的插入和刪除有公平和非公平之分,公平是通過內部類TransferQueue實現的,非公平是通過TransferStack實現的,具體可以看transfer方法,最終會調用LockSupport.park實現線程阻塞,LockSupport.unpark實現線程繼續執行,這個就不貼代碼了。
五、阻塞隊列的使用場景
- 生產者-消費者模型:阻塞隊列能夠很好地平衡生產者和消費者之間的速度差異,既能保護消費者不會消費到空數據,也能保護生產者不會造成隊列溢出,能夠有效地解耦生產者和消費者,提高系統的穩定性和吞吐量。
- 線程池:線上程池中,阻塞隊列可以作為任務緩衝區,將待執行的任務放入隊列中,由線程池中的工作線程按照一定的策略進行執行。
- 同步工具:阻塞隊列還可以作為一種同步工具,在多線程環境下實現線程之間的協作。
- 數據緩衝:阻塞隊列可以用作數據緩衝區,當生產者的速度大於消費者的速度時,數據可以先存儲在隊列中,等待消費者處理
- 事件驅動編程:阻塞隊列可以用於事件驅動的編程模型,當事件發生時,將事件對象放入隊列中,由消費者進行處理
六、阻塞隊列的使用
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.PriorityBlockingQueue;
public class BlockingQueueExample {
public static void main(String[] args) {
// 創建一個容量為10的ArrayBlockingQueue
BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(10);
// BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(10);
// BlockingQueue<Integer> queue = new PriorityBlockingQueue<>(10);
// 創建生產者線程
Thread producerThread = new Thread(() -> {
try {
for (int i = 0; i <= 5; i++) {
// 將數據放入隊列
queue.put(i);
System.out.println(Thread.currentThread().getName() + "Produced: " + i);
Thread.sleep(500);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
});
// 創建消費者線程
Thread consumerThread = new Thread(() -> {
try {
for (int i = 0; i <= 5; i++) {
// 從隊列中取出數據
int num = queue.take();
System.out.println(Thread.currentThread().getName() + "Consumed: " + num);
Thread.sleep(1000);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
});
// 啟動生產者和消費者線程
producerThread.start();
consumerThread.start();
// 等待線程執行完畢
try {
producerThread.join();
consumerThread.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
執行輸出:
Thread-0Produced: 0
Thread-1Consumed: 0
Thread-0Produced: 1
Thread-1Consumed: 1
Thread-0Produced: 2
Thread-0Produced: 3
Thread-1Consumed: 2
Thread-0Produced: 4
Thread-0Produced: 5
Thread-1Consumed: 3
Thread-1Consumed: 4
Thread-1Consumed: 5
阻塞隊列的使用比較簡單,這裡是個簡單的使用例子,可設置合適的隊列大小和生產者消費者休眠時間來調試阻塞等待和喚醒通知。使用阻塞隊列可解決多線程併發訪問數據安全問題,也能方便的實現線程間的協調工作。
總結
通過瞭解阻塞隊列的原理和使用場景,我們可以更好地應對多線程編程中的併發問題,提高代碼的可維護性和可擴展性。阻塞隊列作為一種常見的併發編程工具,能夠幫助我們實現高效的數據傳輸和線程協作,為我們的應用程式提供更好的性能和可靠性保障。希望本文能夠為讀者對阻塞隊列的理解和應用提供一些幫助。