什麼是阻塞隊列 【1】阻塞隊列:從定義上來說是隊列的一種,那麼肯定是一個先進先出(FIFO)的數據結構。與普通隊列不同的是,它支持兩個附加操作,即阻塞添加和阻塞刪除方法。 【2】阻塞添加:當阻塞隊列是滿時,往隊列里添加元素的操作將被阻塞。 【3】阻塞移除:當阻塞隊列是空時,從隊列中獲取元素/刪除元素 ...
什麼是阻塞隊列
【1】阻塞隊列:從定義上來說是隊列的一種,那麼肯定是一個先進先出(FIFO)的數據結構。與普通隊列不同的是,它支持兩個附加操作,即阻塞添加和阻塞刪除方法。
【2】阻塞添加:當阻塞隊列是滿時,往隊列里添加元素的操作將被阻塞。
【3】阻塞移除:當阻塞隊列是空時,從隊列中獲取元素/刪除元素的操作將被阻塞。
java中對阻塞隊列的定義
【1】BlockingQueue介面與Queue介面【Queue 和 BlockingQueue 都是在 Java 5 中加入的】
1)Queue介面
public interface Queue<E> extends Collection<E> { //添加一個元素,添加成功返回true, 如果隊列滿了,就會拋出異常 boolean add(E e); //添加一個元素,添加成功返回true, 如果隊列滿了,返回false boolean offer(E e); //返回並刪除隊首元素,隊列為空則拋出異常 E remove(); //返回並刪除隊首元素,隊列為空則返回null E poll(); //返回隊首元素,但不移除,隊列為空則拋出異常 E element(); //獲取隊首元素,但不移除,隊列為空則返回null E peek(); }
2)BlockingQueue介面
1.源碼展示
public interface BlockingQueue<E> extends Queue<E> { boolean add(E e); boolean offer(E e); void put(E e) throws InterruptedException; boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException; E take() throws InterruptedException; E poll(long timeout, TimeUnit unit) throws InterruptedException; int remainingCapacity(); boolean remove(Object o); public boolean contains(Object o); int drainTo(Collection<? super E> c); int drainTo(Collection<? super E> c, int maxElements); }
2.分析說明
【1】BlockingQueue 繼承了 Queue 介面,是隊列的一種。阻塞隊列(BlockingQueue)是一個在隊列基礎上又支持了兩個附加操作的隊列,兩個附加操作:
1)支持阻塞的插入方法put: 隊列滿時,隊列會阻塞插入元素的線程,直到隊列不滿。
2)支持阻塞的移除方法take: 隊列空時,獲取元素的線程會等待隊列變為非空。
【2】BlockingQueue和JDK集合包中的Queue介面相容,同時在其基礎上增加了阻塞功能:
入隊:
(1)offer(E e):如果隊列沒滿,返回true,如果隊列已滿,返回false(不阻塞)
(2)offer(E e, long timeout, TimeUnit unit):可以設置阻塞時間,如果隊列已滿,則進行阻塞。超過阻塞時間,則返回false
(3)put(E e):隊列沒滿的時候是正常的插入,如果隊列已滿,則阻塞,直至隊列空出位置
出隊:
(1)poll():如果有數據,出隊,如果沒有數據,返回null (不阻塞)
(2)poll(long timeout, TimeUnit unit):可以設置阻塞時間,如果沒有數據,則阻塞,超過阻塞時間,則返回null
(3)take():隊列里有數據會正常取出數據並刪除;但是如果隊列里無數據,則阻塞,直到隊列里有數據
【3】BlockingQueue常用方法示例:
方法 | 拋出異常 | 返回結果但不拋出異常 | 阻塞 | 阻塞特定時間 |
入隊 | add(e) | offer(e) | put(e) | offer(e, time, unit) |
出隊 | remove() | poll() | take() | poll(time, unit) |
獲取隊首元素 | element() | peek() | 不支持 | 不支持 |
阻塞隊列特性
【1】阻塞
1)阻塞隊列區別於其他類型的隊列的最主要的特點就是“阻塞”這兩個字,所以下麵重點介紹阻塞功能:阻塞功能使得生產者和消費者兩端的能力得以平衡,當有任何一端速度過快時,阻塞隊列便會把過快的速度給降下來。實現阻塞最重要的兩個方法是 take 方法和 put 方法。
2)take 方法
take 方法的功能是獲取並移除隊列的頭結點,通常在隊列里有數據的時候是可以正常移除的。可是一旦執行 take 方法的時候,隊列里無數據,則阻塞,直到隊列里有數據。一旦隊列里有數據了,就會立刻解除阻塞狀態,並且取到數據。過程如圖所示:
3)put 方法
put 方法插入元素時,如果隊列沒有滿,那就和普通的插入一樣是正常的插入,但是如果隊列已滿,那麼就無法繼續插入,則阻塞,直到隊列里有了空閑空間。如果後續隊列有了空閑空間,比如消費者消費了一個元素,那麼此時隊列就會解除阻塞狀態,並把需要添加的數據添加到隊列中。過程如圖所示:
【2】是否有界
阻塞隊列還有一個非常重要的屬性,那就是容量的大小,分為有界和無界兩種。無界隊列意味著裡面可以容納非常多的元素,例如 LinkedBlockingQueue 的上限是 Integer.MAX_VALUE,是非常大的一個數,可以近似認為是無限容量,因為我們幾乎無法把這個容量裝滿。但是有的阻塞隊列是有界的,例如 ArrayBlockingQueue 如果容量滿了,也不會擴容,所以一旦滿了就無法再往裡放數據了。
阻塞隊列應用場景
【1】BlockingQueue 是線程安全的,我們在很多場景下都可以利用線程安全的隊列來優雅地解決我們業務自身的線程安全問題。比如說,使用生產者/消費者模式的時候,我們生產者只需要往隊列里添加元素,而消費者只需要從隊列里取出它們就可以了,如圖所示:
【2】因為阻塞隊列是線程安全的,所以生產者和消費者都可以是多線程的,不會發生線程安全問題。生產者/消費者直接使用線程安全的隊列就可以,而不需要自己去考慮更多的線程安全問題。這也就意味著,考慮鎖等線程安全問題的重任從“你”轉移到了“隊列”上,降低了我們開發的難度和工作量。
【3】同時,隊列它還能起到一個隔離的作用。比如說我們開發一個銀行轉賬的程式,那麼生產者線程不需要關心具體的轉賬邏輯,只需要把轉賬任務,如賬戶和金額等信息放到隊列中就可以,而不需要去關心銀行這個類如何實現具體的轉賬業務。而作為銀行這個類來講,它會去從隊列里取出來將要執行的具體的任務,再去通過自己的各種方法來完成本次轉賬。這樣就實現了具體任務與執行任務類之間的解耦,任務被放在了阻塞隊列中,而負責放任務的線程是無法直接訪問到我們銀行具體實現轉賬操作的對象的,實現了隔離,提高了安全性。
常見阻塞隊列
BlockingQueue 介面的實現類都被放在了 juc 包中,它們的區別主要體現在存儲結構上或對元素操作上的不同,但是對於take與put操作的原理,卻是類似的(可自行查看源碼分析):
隊列 | 描述 |
ArrayBlockingQueue | 基於數組結構實現的一個有界阻塞隊列 |
LinkedBlockingQueue | 基於鏈表結構實現的一個無界阻塞隊列,指定容量為有界阻塞隊列 |
PriorityBlockingQueue | 支持按優先順序排序的無界阻塞隊列 |
DelayQueue | 基於優先順序隊列(PriorityBlockingQueue)實現的無界阻塞隊列 |
SynchronousQueue | 不存儲元素的阻塞隊列 |
LinkedTransferQueue | 基於鏈表結構實現的一個無界阻塞隊列 |
LinkedBlockingDeque | 基於鏈表結構實現的一個雙端阻塞隊列 |
如何選擇適合的阻塞隊列
選擇策略
通常我們可以從以下 5 個角度考慮,來選擇合適的阻塞隊列:
第 1 個需要考慮的就是功能層面,比如是否需要阻塞隊列幫我們排序,如優先順序排序、延遲執行等。如果有這個需要,我們就必須選擇類似於 PriorityBlockingQueue 之類的有排序能力的阻塞隊列。
第 2 個需要考慮的是容量,或者說是否有存儲的要求,還是只需要“直接傳遞”。在考慮這一點的時候,我們知道前面介紹的那幾種阻塞隊列,有的是容量固定的,如 ArrayBlockingQueue;有的預設是容量無限的,如 LinkedBlockingQueue;而有的裡面沒有任何容量,如 SynchronousQueue;而對於 DelayQueue 而言,它的容量固定就是 Integer.MAX_VALUE。所以不同阻塞隊列的容量是千差萬別的,我們需要根據任務數量來推算出合適的容量,從而去選取合適的 BlockingQueue。
第 3 個需要考慮的是能否擴容。因為有時我們並不能在初始的時候很好的準確估計隊列的大小,因為業務可能有高峰期、低谷期。如果一開始就固定一個容量,可能無法應對所有的情況,也是不合適的,有可能需要動態擴容。如果我們需要動態擴容的話,那麼就不能選擇 ArrayBlockingQueue ,因為它的容量在創建時就確定了,無法擴容。相反,PriorityBlockingQueue 即使在指定了初始容量之後,後續如果有需要,也可以自動擴容。所以我們可以根據是否需要擴容來選取合適的隊列。
第 4 個需要考慮的點就是記憶體結構。我們分析過 ArrayBlockingQueue 的源碼,看到了它的內部結構是“數組”的形式。和它不同的是,LinkedBlockingQueue 的內部是用鏈表實現的,所以這裡就需要我們考慮到,ArrayBlockingQueue 沒有鏈表所需要的“節點”,空間利用率更高。所以如果我們對性能有要求可以從記憶體的結構角度去考慮這個問題。
第 5 點就是從性能的角度去考慮。比如 LinkedBlockingQueue 由於擁有兩把鎖,它的操作粒度更細,在併發程度高的時候,相對於只有一把鎖的 ArrayBlockingQueue 性能會更好。另外,SynchronousQueue 性能往往優於其他實現,因為它只需要“直接傳遞”,而不需要存儲的過程。如果我們的場景需要直接傳遞的話,可以優先考慮 SynchronousQueue。
線程池對於阻塞隊列的選擇
線程池有很多種,不同種類的線程池會根據自己的特點,來選擇適合自己的阻塞隊列:
FixedThreadPool(SingleThreadExecutor 同理)選取的是 LinkedBlockingQueue
CachedThreadPool 選取的是 SynchronousQueue
ScheduledThreadPool(SingleThreadScheduledExecutor同理)選取的是延遲隊列