LinkedBlockingQueue介紹 【1】LinkedBlockingQueue是一個基於鏈表實現的阻塞隊列,預設情況下,該阻塞隊列的大小為Integer.MAX_VALUE,由於這個數值特別大,所以 LinkedBlockingQueue 也被稱作無界隊列,代表它幾乎沒有界限,隊列可以隨著 ...
LinkedBlockingQueue介紹
【1】LinkedBlockingQueue是一個基於鏈表實現的阻塞隊列,預設情況下,該阻塞隊列的大小為Integer.MAX_VALUE,由於這個數值特別大,所以 LinkedBlockingQueue 也被稱作無界隊列,代表它幾乎沒有界限,隊列可以隨著元素的添加而動態增長,但是如果沒有剩餘記憶體,則隊列將拋出OOM錯誤。所以為了避免隊列過大造成機器負載或者記憶體爆滿的情況出現,我們在使用的時候建議手動傳一個隊列的大小。
【2】LinkedBlockingQueue內部由單鏈表實現,只能從head取元素,從tail添加元素。LinkedBlockingQueue採用兩把鎖的鎖分離技術實現入隊出隊互不阻塞,添加元素和獲取元素都有獨立的鎖,也就是說LinkedBlockingQueue是讀寫分離的,讀寫操作可以並行執行。
LinkedBlockingQueue使用
//指定隊列的大小創建有界隊列 BlockingQueue<Integer> boundedQueue = new LinkedBlockingQueue<>(100); //無界隊列 BlockingQueue<Integer> unboundedQueue = new LinkedBlockingQueue<>();
LinkedBlockingQueue的源碼分析
【1】屬性值
// 容量,指定容量就是有界隊列 private final int capacity; // 元素數量,用原子操作類的原因在於有兩個線程都會操作需要保證可見性 private final AtomicInteger count = new AtomicInteger(); // 鏈表頭 本身是不存儲任何元素的,初始化時item指向null transient Node<E> head; // 鏈表尾 private transient Node<E> last; // take鎖 鎖分離,提高效率 private final ReentrantLock takeLock = new ReentrantLock(); // notEmpty條件 // 當隊列無元素時,take鎖會阻塞在notEmpty條件上,等待其它線程喚醒 private final Condition notEmpty = takeLock.newCondition(); // put鎖 private final ReentrantLock putLock = new ReentrantLock(); // notFull條件 // 當隊列滿了時,put鎖會會阻塞在notFull上,等待其它線程喚醒 private final Condition notFull = putLock.newCondition(); //典型的單鏈表結構 static class Node<E> { E item; //存儲元素 Node<E> next; //後繼節點 單鏈表結構 Node(E x) { item = x; } }
【2】構造函數
public LinkedBlockingQueue() { // 如果沒傳容量,就使用最大int值初始化其容量 this(Integer.MAX_VALUE); } public LinkedBlockingQueue(int capacity) { if (capacity <= 0) throw new IllegalArgumentException(); this.capacity = capacity; // 初始化head和last指針為空值節點 last = head = new Node<E>(null); } public LinkedBlockingQueue(Collection<? extends E> c) { this(Integer.MAX_VALUE); final ReentrantLock putLock = this.putLock; putLock.lock(); // 為保證可見性而加的鎖 try { int n = 0; for (E e : c) { if (e == null) throw new NullPointerException(); if (n == capacity) throw new IllegalStateException("Queue full"); enqueue(new Node<E>(e)); ++n; } count.set(n); } finally { putLock.unlock(); } }
【3】核心方法分析
1)入隊put方法
public void put(E e) throws InterruptedException { // 不允許null元素 if (e == null) throw new NullPointerException(); int c = -1; // 新建一個節點 Node<E> node = new Node<E>(e); final ReentrantLock putLock = this.putLock; final AtomicInteger count = this.count; // 使用put鎖加鎖 putLock.lockInterruptibly(); try { // 如果隊列滿了,就阻塞在notFull上等待被其它線程喚醒(阻塞生產者線程) while (count.get() == capacity) { notFull.await(); } // 隊列不滿,就入隊 enqueue(node); c = count.getAndIncrement();// 隊列長度加1,返回原值 // 如果現隊列長度小於容量,notFull條件隊列轉同步隊列,準備喚醒一個阻塞在notFull條件上的線程(可以繼續入隊) // 這裡為啥要喚醒一下呢?因為存在情況是,沒人獲取時,隊列滿了而且還不斷有人塞數據,此時會一大批線程被阻塞,現在有空餘位置了,應該被喚醒 if (c + 1 < capacity) notFull.signal(); } finally { putLock.unlock(); // 真正喚醒生產者線程 } // 如果原隊列長度為0,現在加了一個元素後立即喚醒阻塞在notEmpty上的線程 if (c == 0) signalNotEmpty(); } private void enqueue(Node<E> node) { // 直接加到last後面,last指向入隊元素 last = last.next = node; } private void signalNotEmpty() { final ReentrantLock takeLock = this.takeLock; takeLock.lock();// 加take鎖 try { notEmpty.signal();// notEmpty條件隊列轉同步隊列,準備喚醒阻塞在notEmpty上的線程 } finally { takeLock.unlock(); // 真正喚醒消費者線程 } }
2)出隊take方法
public E take() throws InterruptedException { E x; int c = -1; final AtomicInteger count = this.count; final ReentrantLock takeLock = this.takeLock; // 使用takeLock加鎖 takeLock.lockInterruptibly(); try { // 如果隊列無元素,則阻塞在notEmpty條件上(消費者線程阻塞) while (count.get() == 0) { notEmpty.await(); } // 否則,出隊 x = dequeue(); c = count.getAndDecrement();//長度-1,返回原值 if (c > 1)// 如果取之前隊列長度大於1,notEmpty條件隊列轉同步隊列,準備喚醒阻塞在notEmpty上的線程,原因與入隊同理 notEmpty.signal(); } finally { takeLock.unlock(); // 真正喚醒消費者線程 } // 為什麼隊列是滿的才喚醒阻塞在notFull上的線程呢? // 因為喚醒是需要加putLock的,這是為了減少鎖的次數,所以,這裡索性在放完元素就檢測一下,未滿就喚醒其它notFull上的線程, // 這也是鎖分離帶來的代價 // 如果取之前隊列長度等於容量(已滿),則喚醒阻塞在notFull的線程 if (c == capacity) signalNotFull(); return x; } private E dequeue() { // head節點本身是不存儲任何元素的 // 這裡把head刪除,並把head下一個節點作為新的值 // 並把其值置空,返回原來的值 Node<E> h = head; Node<E> first = h.next; h.next = h; // 方便GC head = first; E x = first.item; first.item = null; return x; } private void signalNotFull() { final ReentrantLock putLock = this.putLock; putLock.lock(); try { notFull.signal();// notFull條件隊列轉同步隊列,準備喚醒阻塞在notFull上的線程 } finally { putLock.unlock(); // 解鎖,這才會真正的喚醒生產者線程 } }
LinkedBlockingQueue總結
【1】無界阻塞隊列,可以指定容量,預設為 Integer.MAX_VALUE,先進先出,存取互不幹擾
【2】數據結構:鏈表(可以指定容量,預設為 Integer.MAX_VALUE,內部類Node存儲元素)
【3】鎖分離:存取互不幹擾,存取操作的是不同的Node對象(takeLock【取Node節點保證前驅後繼不亂】,putLock【存Node節點保證前驅後繼不亂】,刪除時則兩個鎖一起加)【這是最大的亮點】
【4】阻塞對象(notEmpty【出隊:隊列count=0,無元素可取時,阻塞在該對象上】,notFull【入隊:隊列count=capacity,放不進元素時,阻塞在該對象上】)
【5】入隊,從隊尾入隊,由last指針記錄。
【6】出隊,從隊首出隊,由head指針記錄。
【7】線程池中採用LinkedBlockingQueue而不採用ArrayBlockingQueue的原因便是因為鎖分離帶來了性能的提升,大大提高隊列的吞吐量。