1、對於LinkedBlockingQueue需要掌握以下幾點創建入隊(添加元素)出隊(刪除元素)2、創建Node節點內部類與LinkedBlockingQueue的一些屬性 static class Node { E item;//節點封裝的數據 /** ...
1、對於LinkedBlockingQueue需要掌握以下幾點
- 創建
- 入隊(添加元素)
- 出隊(刪除元素)
2、創建
Node節點內部類與LinkedBlockingQueue的一些屬性
static class Node<E> { E item;//節點封裝的數據 /** * One of: * - the real successor Node * - this Node, meaning the successor is head.next * - null, meaning there is no successor (this is the last node) */ Node<E> next;//下一個節點 Node(E x) { item = x; } } /** 指定鏈表容量 */ private final int capacity; /** 當前的元素個數 */ private final AtomicInteger count = new AtomicInteger(0); /** 鏈表頭節點 */ private transient Node<E> head; /** 鏈表尾節點 */ private transient Node<E> last; /** 出隊鎖 */ private final ReentrantLock takeLock = new ReentrantLock(); /** 出隊等待條件 */ private final Condition notEmpty = takeLock.newCondition(); /** 入隊鎖 */ private final ReentrantLock putLock = new ReentrantLock(); /** 入隊等待條件 */ private final Condition notFull = putLock.newCondition();View Code
2.1、public LinkedBlockingQueue(int capacity)
使用方法:
Queue<String> abq = new LinkedBlockingQueue<String>(1000);
源代碼:
/** * 創建一個 LinkedBlockingQueue,容量為指定容量 */ public LinkedBlockingQueue(int capacity) { if (capacity <= 0) throw new IllegalArgumentException(); this.capacity = capacity; last = head = new Node<E>(null);//初始化頭節點和尾節點,均為封裝了null數據的節點 }View Code
註意點:
- LinkedBlockingQueue的組成一個鏈表+兩把鎖+兩個條件
2.2、public LinkedBlockingQueue()
使用方法:
Queue<String> abq = new LinkedBlockingQueue<String>();
源代碼:
/** * 創建一個LinkedBlockingQueue,容量為整數最大值 */ public LinkedBlockingQueue() { this(Integer.MAX_VALUE); }View Code
註意點:預設容量為整數最大值,可以看做沒有容量限制
3、入隊:
3.1、public boolean offer(E e)
原理:
- 在隊尾插入一個元素, 如果隊列沒滿,立即返回true; 如果隊列滿了,立即返回false
使用方法:
- abq.offer("hello1");
源代碼:
/** * 在隊尾插入一個元素, * 容量沒滿,可以立即插入,返回true; * 隊列滿了,直接返回false * 註:如果使用了限制了容量的隊列,這個方法比add()好,因為add()插入失敗就會拋出異常 */ public boolean offer(E e) { if (e == null) throw new NullPointerException(); final AtomicInteger count = this.count;//獲取隊列中的元素個數 if (count.get() == capacity)//隊列滿了 return false; int c = -1; final ReentrantLock putLock = this.putLock; putLock.lock();//獲取入隊鎖 try { if (count.get() < capacity) {//容量沒滿 enqueue(e);//入隊 c = count.getAndIncrement();//容量+1 if (c + 1 < capacity)//如果上邊+1後的容量再加1,還小於指定容量(說明在插入當前元素後,至少還可以再插一個元素) notFull.signal();//喚醒等待notFull條件的其中一個線程 } } finally { putLock.unlock();//釋放入隊鎖 } if (c == 0)//如果c==0,這是什麼情況? signalNotEmpty(); return c >= 0; }View Code
/** * 創建一個節點,並加入鏈表尾部 * @param x */ private void enqueue(E x) { /* * 封裝新節點,並賦給當前的最後一個節點的下一個節點,然後在將這個節點設為最後一個節點 */ last = last.next = new Node<E>(x); }View Code
private void signalNotEmpty() { final ReentrantLock takeLock = this.takeLock; takeLock.lock();//獲取出隊鎖 try { notEmpty.signal();//喚醒等待notEmpty條件的線程中的一個 } finally { takeLock.unlock();//釋放出隊鎖 } }View Code
如果,入隊邏輯不懂,查看最後總結部分入隊邏輯的圖,代碼非常簡單,流程看註釋即可,只有一點註意點:
- c==0是怎麼出現的?
3.2、public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException
原理:
- 在隊尾插入一個元素,,如果隊列已滿,則進入等待,直到出現以下三種情況:
- 被喚醒
- 等待時間超時
- 當前線程被中斷
使用方法:
try { abq.offer("hello2",1000,TimeUnit.MILLISECONDS); } catch (InterruptedException e) { e.printStackTrace(); }View Code
源代碼:
/** * 在隊尾插入一個元素,,如果隊列已滿,則進入等待,直到出現以下三種情況: * 1、被喚醒 * 2、等待時間超時 * 3、當前線程被中斷 */ public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException { if (e == null) throw new NullPointerException(); long nanos = unit.toNanos(timeout);// 轉換為納秒 int c = -1; final ReentrantLock putLock = this.putLock;// 入隊鎖 final AtomicInteger count = this.count;// 總數量 putLock.lockInterruptibly(); try { while (count.get() == capacity) {// 容量已滿 if (nanos <= 0)// 已經超時 return false; /* * 進行等待: 在這個過程中可能發生三件事: * 1、被喚醒-->繼續當前這個while迴圈 * 2、超時-->繼續當前這個while迴圈 * 3、被中斷-->拋出中斷異常InterruptedException */ nanos = notFull.awaitNanos(nanos); } enqueue(e);// 入隊 c = count.getAndIncrement();// 入隊元素數量+1 if (c + 1 < capacity) notFull.signal(); } finally { putLock.unlock(); } if (c == 0) signalNotEmpty(); return true; }View Code
註意:
- awaitNanos(nanos)是AQS中的一個方法,這裡就不詳細說了,有興趣的自己去查看AQS的源代碼。
3.3、public void put(E e) throws InterruptedException
原理:
- 在隊尾插入一個元素,如果隊列滿了,一直阻塞,直到隊列不滿了或者線程被中斷
使用方法:
try { abq.put("hello1"); } catch (InterruptedException e) { e.printStackTrace(); }View Code
源代碼:
/** * 在隊尾插一個元素 * 如果隊列滿了,一直阻塞,直到隊列不滿了或者線程被中斷 */ public void put(E e) throws InterruptedException { if (e == null) throw new NullPointerException(); int c = -1; final ReentrantLock putLock = this.putLock;//入隊鎖 final AtomicInteger count = this.count;//當前隊列中的元素個數 putLock.lockInterruptibly();//加鎖 try { while (count.get() == capacity) {//如果隊列滿了 /* * 加入notFull等待隊列,直到隊列元素不滿了, * 被其他線程使用notFull.signal()喚醒 */ notFull.await(); } enqueue(e);//入隊 c = count.getAndIncrement();//入隊數量+1 if (c + 1 < capacity) notFull.signal(); } finally { putLock.unlock(); } if (c == 0) signalNotEmpty(); }View Code
4、出隊
4.1、public E poll()
原理:
- 如果沒有元素,直接返回null;如果有元素,出隊
使用方法:
abq.poll();
源代碼:
/** * 出隊: * 1、如果沒有元素,直接返回null * 2、如果有元素,出隊 */ public E poll() { final AtomicInteger count = this.count;//獲取元素數量 if (count.get() == 0)//沒有元素 return null; E x = null; int c = -1; final ReentrantLock takeLock = this.takeLock; takeLock.lock();//獲取出隊鎖 try { if (count.get() > 0) {//有元素 x = dequeue();//出隊 //元素個數-1(註意:該方法是一個無限迴圈,直到減1成功為止) c = count.getAndDecrement(); if (c > 1)//還有元素 notEmpty.signal();//喚醒等待在notEmpty隊列中的其中一條線程 } } finally { takeLock.unlock();//釋放出隊鎖 } if (c == capacity)//c == capacity是怎麼發生的 signalNotFull(); return x; }View Code
/** * 從隊列頭部移除一個節點 */ private E dequeue() { Node<E> h = head;//獲取頭節點:x==null Node<E> first = h.next;//將頭節點的下一個節點賦值給first h.next = h; // 將當前將要出隊的節點置null(為了使其做head節點做準備) head = first;//將當前將要出隊的節點作為了頭節點 E x = first.item;//獲取出隊節點的值 first.item = null;//將出隊節點的值置空 return x; }View Code
private void signalNotFull() { final ReentrantLock putLock = this.putLock; putLock.lock(); try { notFull.signal(); } finally { putLock.unlock(); } }View Code
註意:出隊邏輯如果不懂,查看最後總結部分的圖
4.2、public E poll(long timeout, TimeUnit unit) throws InterruptedException
原理:
- 從隊頭刪除一個元素,如果隊列不空,出隊;如果隊列已空且已經超時,返回null;如果隊列已空且時間未超時,則進入等待,直到出現以下三種情況:
- 被喚醒
- 等待時間超時
- 當前線程被中斷
使用方法:
try { abq.poll(1000, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { e.printStackTrace(); }View Code
源代碼:
/** * 從隊列頭部刪除一個元素, * 如果隊列不空,出隊; * 如果隊列已空,判斷時間是否超時,如果已經超時,返回null * 如果隊列已空且時間未超時,則進入等待,直到出現以下三種情況: * 1、被喚醒 * 2、等待時間超時 * 3、當前線程被中斷 */ public E poll(long timeout, TimeUnit unit) throws InterruptedException { E x = null; int c = -1; long nanos = unit.toNanos(timeout); final AtomicInteger count = this.count; final ReentrantLock takeLock = this.takeLock; takeLock.lockInterruptibly(); try { while (count.get() == 0) {//如果隊列沒有元素 if (nanos <= 0)//已經超時 return null; /* * 進行等待: * 在這個過程中可能發生三件事: * 1、被喚醒-->繼續當前這個while迴圈 * 2、超時-->繼續當前這個while迴圈 * 3、被中斷-->拋出異常 */ nanos = notEmpty.awaitNanos(nanos); } x = dequeue();//出隊 c = count.getAndDecrement(); if (c > 1) notEmpty.signal(); } finally { takeLock.unlock(); } if (c == capacity) signalNotFull(); return x; }View Code
4.3、public E take() throws InterruptedException
原理:
- 將隊頭元素出隊,如果隊列空了,一直阻塞,直到隊列不為空或者線程被中斷
使用方法:
try { abq.take(); } catch (InterruptedException e) { e.printStackTrace(); }View Code
源代碼:
/** * 出隊: * 如果隊列空了,一直阻塞,直到隊列不為空或者線程被中斷 */ public E take() throws InterruptedException { E x; int c = -1; final AtomicInteger count = this.count;//獲取隊列中的元素總量 final ReentrantLock takeLock = this.takeLock; takeLock.lockInterruptibly();//獲取出隊鎖 try { while (count.get() == 0) {//如果沒有元素,一直阻塞 /* * 加入等待隊列, 一直等待條件notEmpty(即被其他線程喚醒) * (喚醒其實就是,有線程將一個元素入隊了,然後調用notEmpty.signal()喚醒其他等待這個條件的線程,同時隊列也不空了) */ notEmpty.await(); } x = dequeue();//出隊 c = count.getAndDecrement();//元素數量-1 if (c > 1) notEmpty.signal(); } finally { takeLock.unlock(); } if (c == capacity) signalNotFull(); return x; }View Code
總結:
1、具體入隊與出隊的原理圖:
圖中每一個節點前半部分表示封裝的數據x,後邊的表示指向的下一個引用。
1.1、初始化
初始化之後,初始化一個數據為null,且head和last節點都是這個節點。
1.2、入隊兩個元素過後
這個可以根據入隊方法enqueue(E x)來看,源代碼再貼一遍:
/** * 創建一個節點,並加入鏈表尾部 * * @param x */ private void enqueue(E x) { /* * 封裝新節點,並賦給當前的最後一個節點的下一個節點,然後在將這個節點設為最後一個節點 */ last = last.next = new Node<E>(x); }View Code
其實這我們就可以發現其實真正意義上出隊的頭節點是Head節點的下一個節點。(這也就是Node這個內部類中對next的註釋,我沒有翻譯)
1.3、出隊一個元素後
錶面上看,只是將頭節點的next指針指向了要刪除的x1.next,事實上這樣我覺的就完全可以,但是jdk實際上是將原來的head節點刪除了,而上邊看到的這個head節點,正是剛剛出隊的x1節點,只是其值被置空了。
這一塊對應著源代碼來看:dequeue()
/** * 從隊列頭部移除一個節點 */ private E dequeue() { Node<E> h = head;// 獲取頭節點:x==null Node<E> first = h.next;// 將頭節點的下一個節點賦值給first h.next = h; // 將當前將要出隊的節點置null(為了使其做head節點做準備) head = first;// 將當前將要出隊的節點作為了頭節點 E x = first.item;// 獲取出隊節點的值 first.item = null;// 將出隊節點的值置空 return x; }View Code
2、三種入隊對比:
- offer(E e):如果隊列沒滿,立即返回true; 如果隊列滿了,立即返回false-->不阻塞
- put(E e):如果隊列滿了,一直阻塞,直到隊列不滿了或者線程被中斷-->阻塞
- offer(E e, long timeout, TimeUnit unit):在隊尾插入一個元素,,如果隊列已滿,則進入等待,直到出現以下三種情況:-->阻塞
- 被喚醒
- 等待時間超時
- 當前線程被中斷
3、三種出隊對比:
- poll():如果沒有元素,直接返回null;如果有元素,出隊
- take():如果隊列空了,一直阻塞,直到隊列不為空或者線程被中斷-->阻塞
- poll(long timeout, TimeUnit unit):如果隊列不空,出隊;如果隊列已空且已經超時,返回null;如果隊列已空且時間未超時,則進入等待,直到出現以下三種情況:
- 被喚醒
- 等待時間超時
- 當前線程被中斷
4、ArrayBlockingQueue與LinkedBlockingQueue對比
- ArrayBlockingQueue:
- 一個對象數組+一把鎖+兩個條件
- 入隊與出隊都用同一把鎖
- 在只有入隊高併發或出隊高併發的情況下,因為操作數組,且不需要擴容,性能很高
- 採用了數組,必須指定大小,即容量有限
- LinkedBlockingQueue:
- 一個單向鏈表+兩把鎖+兩個條件
- 兩把鎖,一把用於入隊,一把用於出隊,有效的避免了入隊與出隊時使用一把鎖帶來的競爭。
- 在入隊與出隊都高併發的情況下,性能比ArrayBlockingQueue高很多
- 採用了鏈表,最大容量為整數最大值,可看做容量無限
兩個疑問:(望大神指點)
- 入隊時:c==0是怎樣出現的?
- 出隊時:c==capcity是怎樣出現的?