LinkedBlockingDeque介紹 【1】LinkedBlockingDeque是一個基於鏈表實現的雙向阻塞隊列,預設情況下,該阻塞隊列的大小為Integer.MAX_VALUE,可以看做無界隊列,但也可以設置容量限制,作為有界隊列。 【2】相比於其他阻塞隊列,LinkedBlockingD ...
LinkedBlockingDeque介紹
【1】LinkedBlockingDeque是一個基於鏈表實現的雙向阻塞隊列,預設情況下,該阻塞隊列的大小為Integer.MAX_VALUE,可以看做無界隊列,但也可以設置容量限制,作為有界隊列。
【2】相比於其他阻塞隊列,LinkedBlockingDeque 多了 addFirst、addLast、peekFirst、peekLast 等方法。以first結尾的方法,表示插入、獲取或移除雙端隊列的第一個元素。以 last 結尾的方法,表示插入、獲取或移除雙端隊列的最後一個元素。但本質上並沒有優化鎖的競爭情況,因為不管是從隊首還是隊尾,都是在競爭同一把鎖,只不過數據插入和獲取的方式多了。
LinkedBlockingDeque的源碼分析
【1】屬性值
//典型的雙端鏈表結構 static final class Node<E> { E item; //存儲元素 Node<E> prev; //前驅節點 Node<E> next; //後繼節點 Node(E x) { item = x; } } // 鏈表頭 本身是不存儲任何元素的,初始化時item指向null transient Node<E> first; // 鏈表尾 transient Node<E> last; // 元素數量 private transient int count; // 容量,指定容量就是有界隊列 private final int capacity; //重入鎖 final ReentrantLock lock = new ReentrantLock(); // 當隊列無元素時,鎖會阻塞在notEmpty條件上,等待其它線程喚醒 private final Condition notEmpty = lock.newCondition(); // 當隊列滿了時,鎖會阻塞在notFull上,等待其它線程喚醒 private final Condition notFull = lock.newCondition();
【2】構造函數
public LinkedBlockingDeque() { // 如果沒傳容量,就使用最大int值初始化其容量 this(Integer.MAX_VALUE); } public LinkedBlockingDeque(int capacity) { if (capacity <= 0) throw new IllegalArgumentException(); this.capacity = capacity; } public LinkedBlockingDeque(Collection<? extends E> c) { this(Integer.MAX_VALUE); final ReentrantLock lock = this.lock; lock.lock(); // 為保證可見性而加的鎖 try { for (E e : c) { if (e == null) throw new NullPointerException(); //從尾部插入元素,插入失敗拋出異常 if (!linkLast(new Node<E>(e))) throw new IllegalStateException("Deque full"); } } finally { lock.unlock(); } }
【3】核心方法分析
1)入隊方法
//添加頭結點元素 public void addFirst(E e) { //如果添加失敗,拋出異常 if (!offerFirst(e)) throw new IllegalStateException("Deque full"); } //添加尾結點元素 public void addLast(E e) { //如果添加失敗,拋出異常 if (!offerLast(e)) throw new IllegalStateException("Deque full"); } //添加頭結點元素 public boolean offerFirst(E e) { //添加的元素為空 拋出空指針異常 if (e == null) throw new NullPointerException(); //將元素構造為結點 Node<E> node = new Node<E>(e); //這邊會加鎖,並調用添加頭結點插入的核心方法 final ReentrantLock lock = this.lock; lock.lock(); try { return linkFirst(node); } finally { //解鎖 lock.unlock(); } } //添加尾結點元素 public boolean offerLast(E e) { //添加的元素為空 拋出空指針異常 if (e == null) throw new NullPointerException(); //將元素構造為結點 Node<E> node = new Node<E>(e); //這邊會加鎖,並調用添加尾結點插入的核心方法 final ReentrantLock lock = this.lock; lock.lock(); try { return linkLast(node); } finally { lock.unlock(); } } //頭部插入 private boolean linkFirst(Node<E> node) { //當前容量大於隊列最大容量時,直接返回插入失敗 if (count >= capacity) return false; //隊列中的頭結點 Node<E> f = first; //原來的頭結點作為 新插入結點的後一個結點 node.next = f; //替換頭結點 為新插入的結點 first = node; //尾結點不存在,將尾結點置為當前新插入的結點 if (last == null) last = node; else //原來的頭結點的上一個結點為當前新插入的結點 f.prev = node; //當前容量增加 ++count; //喚醒讀取時因隊列中無元素而導致阻塞的線程 notEmpty.signal(); return true; } //尾部插入 private boolean linkLast(Node<E> node) { //當前容量大於隊列最大容量時,直接返回插入失敗 if (count >= capacity) return false; //獲取尾節點 Node<E> l = last; //將新插入的前一個節點指向原來的尾節點 node.prev = l; //尾結點設置為新插入的結點 last = node; //頭結點為空,新插入的結點作為頭節點 if (first == null) first = node; else //將原尾結點的下一個結點指向新插入的節點 l.next = node; //當前容量增加 ++count; //喚醒讀取時因隊列中無元素而導致阻塞的線程 notEmpty.signal(); return true; } //頭結點插入 public void putFirst(E e) throws InterruptedException { //元素不能為空 if (e == null) throw new NullPointerException(); //將元素構造為結點 Node<E> node = new Node<E>(e); //這邊會加鎖,並調用添加頭結點插入的核心方法 final ReentrantLock lock = this.lock; lock.lock(); try { //頭結點如果插入失敗,會阻塞該方法,直到取出結點或刪除結點時被喚醒 while (!linkFirst(node)) notFull.await(); } finally { //解鎖 lock.unlock(); } } //尾結點插入 public void putLast(E e) throws InterruptedException { //元素不能為空 if (e == null) throw new NullPointerException(); //將元素構造為結點 Node<E> node = new Node<E>(e); //這邊會加鎖,並調用添加尾結點插入的核心方法 final ReentrantLock lock = this.lock; lock.lock(); try { //尾結點如果插入失敗,會阻塞該方法,直到取出結點或刪除結點時被喚醒 while (!linkLast(node)) notFull.await(); } finally { lock.unlock(); } } //頭結點插入 可指定阻塞時間 public boolean offerFirst(E e, long timeout, TimeUnit unit) throws InterruptedException { //元素不能為空 if (e == null) throw new NullPointerException(); //將元素構造為結點 Node<E> node = new Node<E>(e); //計算剩餘應阻塞時間 long nanos = unit.toNanos(timeout); //這邊會加鎖,並調用添加頭結點插入的核心方法 final ReentrantLock lock = this.lock; //獲取可響應中斷的鎖,保證阻塞時間到期後可重新獲得鎖 lock.lockInterruptibly(); try { //頭結點如果插入失敗,會阻塞該方法,直到取出結點或刪除結點時被喚醒 //或者阻塞時間到期直接返回失敗 while (!linkFirst(node)) { if (nanos <= 0) return false; nanos = notFull.awaitNanos(nanos); } return true; } finally { //解鎖 lock.unlock(); } } //頭結點插入 可指定阻塞時間 public boolean offerLast(E e, long timeout, TimeUnit unit) throws InterruptedException { //元素不能為空 if (e == null) throw new NullPointerException(); //將元素構造為結點 Node<E> node = new Node<E>(e); //計算剩餘應阻塞時間 long nanos = unit.toNanos(timeout); //這邊會加鎖,並調用添加尾結點插入的核心方法 final ReentrantLock lock = this.lock; //獲取可響應中斷的鎖,保證阻塞時間到期後可重新獲得鎖 try { //尾結點如果插入失敗,會阻塞該方法,直到取出結點或刪除結點時被喚醒 //或者阻塞時間到期直接返回失敗 while (!linkLast(node)) { if (nanos <= 0) return false; nanos = notFull.awaitNanos(nanos); } return true; } finally { //解鎖 lock.unlock(); } }
2)出隊方法
//刪除頭結點 - 加鎖 直接返回結果 public E pollFirst() { final ReentrantLock lock = this.lock; lock.lock(); try { //調用刪除頭結點核心方法 return unlinkFirst(); } finally { lock.unlock(); } } //刪除尾結點 - 加鎖 直接返回結果 public E pollLast() { final ReentrantLock lock = this.lock; lock.lock(); try { //調用刪除尾結點核心方法 return unlinkLast(); } finally { lock.unlock(); } } //刪除頭結點 - 加鎖,如果刪除失敗則阻塞 public E takeFirst() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lock(); try { E x; //調用刪除頭結點核心方法 while ((x = unlinkFirst()) == null) //阻塞 notEmpty.await(); return x; } finally { lock.unlock(); } } //刪除尾結點 - 加鎖,如果刪除失敗則阻塞 public E takeLast() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lock(); try { E x; //調用刪除尾結點核心方法 while ((x = unlinkLast()) == null) //阻塞 notEmpty.await(); return x; } finally { lock.unlock(); } } //刪除頭結點 - 加鎖,如果刪除失敗則阻塞 可以指定阻塞時間 public E pollFirst(long timeout, TimeUnit unit) throws InterruptedException { //計算應阻塞時間 long nanos = unit.toNanos(timeout); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { E x; //調用刪除頭結點核心方法 while ((x = unlinkFirst()) == null) { if (nanos <= 0) //阻塞時間過期,返回結果 return null; //阻塞 並指定阻塞時間 nanos = notEmpty.awaitNanos(nanos); } return x; } finally { lock.unlock(); } } //刪除尾結點 - 加鎖,如果刪除失敗則阻塞 可以指定阻塞時間 public E pollLast(long timeout, TimeUnit unit) throws InterruptedException { //計算應阻塞時間 long nanos = unit.toNanos(timeout); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { E x; //調用刪除尾結點核心方法 while ((x = unlinkLast()) == null) { if (nanos <= 0) //阻塞時間過期,返回結果 return null; //阻塞 並指定阻塞時間 nanos = notEmpty.awaitNanos(nanos); } return x; } finally { lock.unlock(); } } //刪除頭結點 private E unlinkFirst() { //獲取當前頭結點 Node<E> f = first; //頭結點為空 返回空 if (f == null) return null; //獲取頭結點的下一個結點 Node<E> n = f.next; //獲取頭結點元素(記錄return需要用到的刪除了哪個元素) E item = f.item; //將頭結點元素置為null f.item = null; //將頭結點的下一個結點指向自己 方便gc f.next = f; //設置頭結點為原頭結點的下一個結點 first = n; //若原頭結點的下一個結點不存在(隊列中沒有了結點) if (n == null) //將尾結點也置為null last = null; else //新的頭結點的前一個結點指向null,因為他已經作為了頭結點 所以不需要指向上一個結點 n.prev = null; //當前數量減少 --count; //喚醒因添加元素時隊列容量滿導致阻塞的線程 notFull.signal(); //返回原來的頭結點中的元素 return item; } //刪除尾結點 private E unlinkLast() { //獲取當前尾結點 Node<E> l = last; //尾結點不存在 返回null if (l == null) return null; //獲取當前尾結點的上一個結點 Node<E> p = l.prev; //獲取當前尾結點中的元素,需要返回記錄 E item = l.item; //將當前尾結點的元素置為null l.item = null; //並將當前尾結點的上一個結點指向自己,方便gc l.prev = l; //設置新的尾結點,為原來尾結點的前一個結點 last = p; //若無新的尾結點,頭結點置為空(隊列中沒有了結點) if (p == null) first = null; else //將新的尾結點的下一個結點指向null,因為他已經為尾結點所以不需要指向下一個結點 p.next = null; //數量減少 --count; //喚醒因添加元素時隊列容量滿導致阻塞的線程 notFull.signal(); //返回原來的尾結點元素 return item; }
LinkedBlockingDeque總結
【1】一個鏈表阻塞雙端無界隊列,可以指定容量,預設為 Integer.MAX_VALUE
【2】數據結構:鏈表(同LinkedBlockingQueue,內部類Node存儲元素)
【3】鎖:ReentrantLock(同ArrayBlockingQueue)存取是同一把鎖,操作的是同一個數組對象
【4】阻塞對象(notEmpty【出隊:隊列count=0,無元素可取時,阻塞在該對象上】,notFull【入隊:隊列count=capacity,放不進元素時,阻塞在該對象上】)
【5】入隊,首尾都可以,均可以添加刪除。
【6】出隊,首尾都可以,均可以添加刪除。
【7】應用場景:常用於 “工作竊取演算法”。