註意:在閱讀本文之前或在閱讀的過程中,需要用到ReentrantLock,內容見《第五章 ReentrantLock源碼解析1--獲得非公平鎖與公平鎖lock()》《第六章 ReentrantLock源碼解析2--釋放鎖unlock()》《第七章 ReentrantLock總結》1、對於ArrayB...
註意:在閱讀本文之前或在閱讀的過程中,需要用到ReentrantLock,內容見《第五章 ReentrantLock源碼解析1--獲得非公平鎖與公平鎖lock()》《第六章 ReentrantLock源碼解析2--釋放鎖unlock()》《第七章 ReentrantLock總結》
1、對於ArrayBlockingQueue需要掌握以下幾點
- 創建
- 入隊(添加元素)
- 出隊(刪除元素)
2、創建
- public ArrayBlockingQueue(int capacity, boolean fair)
- public ArrayBlockingQueue(int capacity)
使用方法:
- Queue<String> abq = new ArrayBlockingQueue<String>(2);
- Queue<String> abq = new ArrayBlockingQueue<String>(2,true);
通過使用方法,可以看出ArrayBlockingQueue支持ReentrantLock的公平鎖模式與非公平鎖模式,對於這兩種模式,查看本文開頭的文章即可。
源代碼如下:
private final E[] items;//底層數據結構 private int takeIndex;//用來為下一個take/poll/remove的索引(出隊) private int putIndex;//用來為下一個put/offer/add的索引(入隊) private int count;//隊列中元素的個數 /* * Concurrency control uses the classic two-condition algorithm found in any * textbook. */ /** Main lock guarding all access */ private final ReentrantLock lock;//鎖 /** Condition for waiting takes */ private final Condition notEmpty;//等待出隊的條件 /** Condition for waiting puts */ private final Condition notFull;//等待入隊的條件View Code
/** * 創造一個隊列,指定隊列容量,指定模式 * @param fair * true:先來的線程先操作 * false:順序隨機 */ public ArrayBlockingQueue(int capacity, boolean fair) { if (capacity <= 0) throw new IllegalArgumentException(); this.items = (E[]) new Object[capacity];//初始化類變數數組items lock = new ReentrantLock(fair);//初始化類變數鎖lock notEmpty = lock.newCondition();//初始化類變數notEmpty Condition notFull = lock.newCondition();//初始化類變數notFull Condition } /** * 創造一個隊列,指定隊列容量,預設模式為非公平模式 * @param capacity <1會拋異常 */ public ArrayBlockingQueue(int capacity) { this(capacity, false); }View Code
註意:
- ArrayBlockingQueue的組成:一個對象數組+1把鎖ReentrantLock+2個條件Condition
- 在查看源碼的過程中,也要模仿帶條件鎖的使用,這個雙條件鎖模式是很經典的模式
3、入隊
3.1、public boolean offer(E e)
原理:
- 在隊尾插入一個元素, 如果隊列沒滿,立即返回true; 如果隊列滿了,立即返回false
使用方法:
- abq.offer("hello1");
源代碼:
/** * 在隊尾插入一個元素, * 如果隊列沒滿,立即返回true; * 如果隊列滿了,立即返回false * 註意:該方法通常優於add(),因為add()失敗直接拋異常 */ public boolean offer(E e) { if (e == null) throw new NullPointerException(); final ReentrantLock lock = this.lock; lock.lock(); try { if (count == items.length)//數組滿了 return false; else {//數組沒滿 insert(e);//插入一個元素 return true; } } finally { lock.unlock(); } }View Code
private void insert(E x) { items[putIndex] = x;//插入元素 putIndex = inc(putIndex);//putIndex+1 ++count;//元素數量+1 /** * 喚醒一個線程 * 如果有任意一個線程正在等待這個條件,那麼選中其中的一個區喚醒。 * 在從等待狀態被喚醒之前,被選中的線程必須重新獲得鎖 */ notEmpty.signal(); }View Code
/** * i+1,數組下標+1 */ final int inc(int i) { return (++i == items.length) ? 0 : i; }View Code
代碼非常簡單,流程看註釋即可,只有一點註意點:
- 在插入元素結束後,喚醒等待notEmpty條件(即獲取元素)的線程,可以發現這類似於生產者-消費者模式
3.2、public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException
原理:
- 在隊尾插入一個元素,,如果數組已滿,則進入等待,直到出現以下三種情況:
- 被喚醒
- 等待時間超時
- 當前線程被中斷
使用方法:
try { abq.offer("hello2",1000,TimeUnit.MILLISECONDS); } catch (InterruptedException e) { e.printStackTrace(); }View Code
源代碼:
/** * 在隊尾插入一個元素, * 如果數組已滿,則進入等待,直到出現以下三種情況: * 1、被喚醒 * 2、等待時間超時 * 3、當前線程被中斷 */ public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException { if (e == null) throw new NullPointerException(); long nanos = unit.toNanos(timeout);//將超時時間轉換為納秒 final ReentrantLock lock = this.lock; /* * lockInterruptibly(): * 1、 在當前線程沒有被中斷的情況下獲取鎖。 * 2、如果獲取成功,方法結束。 * 3、如果鎖無法獲取,當前線程被阻塞,直到下麵情況發生: * 1)當前線程(被喚醒後)成功獲取鎖 * 2)當前線程被其他線程中斷 * * lock() * 獲取鎖,如果鎖無法獲取,當前線程被阻塞,直到鎖可以獲取並獲取成功為止。 */ lock.lockInterruptibly();//加可中斷的鎖 try { for (;;) { if (count != items.length) {//隊列未滿 insert(e); return true; } if (nanos <= 0)//已超時 return false; try { /* * 進行等待: * 在這個過程中可能發生三件事: * 1、被喚醒-->繼續當前這個for(;;)迴圈 * 2、超時-->繼續當前這個for(;;)迴圈 * 3、被中斷-->之後直接執行catch部分的代碼 */ nanos = notFull.awaitNanos(nanos);//進行等待(在此過程中,時間會流失,在此過程中,線程也可能被喚醒) } catch (InterruptedException ie) {//在等待的過程中線程被中斷 notFull.signal(); // 喚醒其他未被中斷的線程 throw ie; } } } finally { lock.unlock(); } }View Code
註意:
- awaitNanos(nanos)是AQS中的一個方法,這裡就不詳細說了,有興趣的自己去查看AQS的源代碼。
- lockInterruptibly()與lock()的區別見註釋
3.3、public void put(E e) throws InterruptedException
原理:
- 在隊尾插入一個元素,如果隊列滿了,一直阻塞,直到數組不滿了或者線程被中斷
使用方法:
try { abq.put("hello1"); } catch (InterruptedException e) { e.printStackTrace(); }View Code
源代碼:
/** * 在隊尾插入一個元素 * 如果隊列滿了,一直阻塞,直到數組不滿了或者線程被中斷 */ public void put(E e) throws InterruptedException { if (e == null) throw new NullPointerException(); final E[] items = this.items; final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { try { while (count == items.length)//隊列滿了,一直阻塞在這裡 /* * 一直等待條件notFull,即被其他線程喚醒 * (喚醒其實就是,有線程將一個元素出隊了,然後調用notFull.signal()喚醒其他等待這個條件的線程,同時隊列也不慢了) */ notFull.await(); } catch (InterruptedException ie) {//如果被中斷 notFull.signal(); // 喚醒其他等待該條件(notFull,即入隊)的線程 throw ie; } insert(e); } finally { lock.unlock(); } }View Code
4、出隊
4.1、public E poll()
原理:
- 如果沒有元素,直接返回null;如果有元素,將隊頭元素置null,但是要註意隊頭是隨時變化的,並非一直是items[0]。
使用方法:
abq.poll();
源代碼:
/** * 出隊 */ public E poll() { final ReentrantLock lock = this.lock; lock.lock(); try { if (count == 0)//如果沒有元素,直接返回null,而非拋出異常 return null; E x = extract(); return x; } finally { lock.unlock(); } }View Code
/** * 出隊 */ private E extract() { final E[] items = this.items; E x = items[takeIndex];//獲取出隊元素 items[takeIndex] = null;//將出隊元素位置置空 /* * 第一次出隊的元素takeIndex==0,第二次出隊的元素takeIndex==1 * (註意:這裡出隊之後,並沒有將後面的數組元素向前移) */ takeIndex = inc(takeIndex); --count;//數組元素個數-1 notFull.signal();//數組已經不滿了,喚醒其他等待notFull條件的線程 return x;//返回出隊的元素 }View Code
4.2、public E poll(long timeout, TimeUnit unit) throws InterruptedException
原理:
- 從對頭刪除一個元素,如果數組不空,出隊;如果數組已空且已經超時,返回null;如果數組已空且時間未超時,則進入等待,直到出現以下三種情況:
- 被喚醒
- 等待時間超時
- 當前線程被中斷
使用方法:
try { abq.poll(1000, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { e.printStackTrace(); }View Code
源代碼:
/** * 從對頭刪除一個元素, * 如果數組不空,出隊; * 如果數組已空,判斷時間是否超時,如果已經超時,返回null * 如果數組已空且時間未超時,則進入等待,直到出現以下三種情況: * 1、被喚醒 * 2、等待時間超時 * 3、當前線程被中斷 */ public E poll(long timeout, TimeUnit unit) throws InterruptedException { long nanos = unit.toNanos(timeout);//將時間轉換為納秒 final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { for (;;) { if (count != 0) {//數組不空 E x = extract();//出隊 return x; } if (nanos <= 0)//時間超時 return null; try { /* * 進行等待: * 在這個過程中可能發生三件事: * 1、被喚醒-->繼續當前這個for(;;)迴圈 * 2、超時-->繼續當前這個for(;;)迴圈 * 3、被中斷-->之後直接執行catch部分的代碼 */ nanos = notEmpty.awaitNanos(nanos); } catch (InterruptedException ie) { notEmpty.signal(); // propagate to non-interrupted thread throw ie; } } } finally { lock.unlock(); } }View Code
4.3、public E take() throws InterruptedException
原理:
- 將隊頭元素出隊,如果隊列空了,一直阻塞,直到數組不為空或者線程被中斷
使用方法:
try { abq.take(); } catch (InterruptedException e) { e.printStackTrace(); }View Code
源代碼:
/** * 將隊頭元素出隊 * 如果隊列空了,一直阻塞,直到數組不為空或者線程被中斷 */ public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { try { while (count == 0)//如果數組為空,一直阻塞在這裡 /* * 一直等待條件notEmpty,即被其他線程喚醒 * (喚醒其實就是,有線程將一個元素入隊了,然後調用notEmpty.signal()喚醒其他等待這個條件的線程,同時隊列也不空了) */ notEmpty.await(); } catch (InterruptedException ie) { notEmpty.signal(); // propagate to non-interrupted thread throw ie; } E x = extract(); return x; } finally { lock.unlock(); } }View Code
總結:
1、具體入隊與出隊的原理圖:這裡只說一種情況,見下圖,途中深色部分表示已有元素,淺色部分沒有元素。
上面這種情況是怎麼形成的呢?當隊列滿了,這時候,隊頭元素為items[0]出隊了,就形成上邊的這種情況。
假設現在又要出隊了,則現在的隊頭元素是items[1],出隊後就形成下麵的情形。
出隊後,對頭元素就是items[2]了,假設現在有一個元素將要入隊,根據inc方法,我們可以得知,他要插入到items[0]去,入隊了形成下圖:
以上就是整個入隊出隊的流程,inc方法上邊已經給出,這裡再貼一遍:
/** * i+1,數組下標+1 * 註意:這裡這樣寫的原因。 */ final int inc(int i) { return (++i == items.length) ? 0 : i; }View Code
2、三種入隊對比:
- offer(E e):如果隊列沒滿,立即返回true; 如果隊列滿了,立即返回false-->不阻塞
- put(E e):如果隊列滿了,一直阻塞,直到數組不滿了或者線程被中斷-->阻塞
- offer(E e, long timeout, TimeUnit unit):在隊尾插入一個元素,,如果數組已滿,則進入等待,直到出現以下三種情況:-->阻塞
- 被喚醒
- 等待時間超時
- 當前線程被中斷
3、三種出對對比:
- poll():如果沒有元素,直接返回null;如果有元素,出隊
- take():如果隊列空了,一直阻塞,直到數組不為空或者線程被中斷-->阻塞
- poll(long timeout, TimeUnit unit):如果數組不空,出隊;如果數組已空且已經超時,返回null;如果數組已空且時間未超時,則進入等待,直到出現以下三種情況:
- 被喚醒
- 等待時間超時
- 當前線程被中斷