ArrayBlockingQueue介紹 ArrayBlockingQueue是最典型的有界阻塞隊列,其內部是用數組存儲元素的,初始化時需要指定容量大小,利用 ReentrantLock 實現線程安全。 在生產者-消費者模型中使用時,如果生產速度和消費速度基本匹配的情況下,使用ArrayBlocki ...
ArrayBlockingQueue介紹
ArrayBlockingQueue是最典型的有界阻塞隊列,其內部是用數組存儲元素的,初始化時需要指定容量大小,利用 ReentrantLock 實現線程安全。
在生產者-消費者模型中使用時,如果生產速度和消費速度基本匹配的情況下,使用ArrayBlockingQueue是個不錯選擇;當如果生產速度遠遠大於消費速度,則會導致隊列填滿,大量生產線程被阻塞。
使用獨占鎖ReentrantLock實現線程安全,入隊和出隊操作使用同一個鎖對象,也就是只能有一個線程可以進行入隊或者出隊操作;這也就意味著生產者和消費者無法並行操作,在高併發場景下會成為性能瓶頸。
ArrayBlockingQueue的源碼分析
【1】屬性值
/** 隊列元素數組 */ final Object[] items; /** 下一個被take,poll,peek,remove的元素位置 */ int takeIndex; /** 插入位置包含put,offer,add */ int putIndex; /** 隊列元素的數量 */ int count; /** 重入鎖 */ final ReentrantLock lock; /** 等待獲取的條件隊列 */ private final Condition notEmpty; /** 等待插入的條件隊列 */ private final Condition notFull; //迭代器的共用狀態 transient Itrs itrs = null;
【2】構造函數
//預設採用非公平鎖 public ArrayBlockingQueue(int capacity) { this(capacity, false); } public ArrayBlockingQueue(int capacity, boolean fair) { if (capacity <= 0) throw new IllegalArgumentException(); this.items = new Object[capacity]; lock = new ReentrantLock(fair); notEmpty = lock.newCondition(); notFull = lock.newCondition(); } public ArrayBlockingQueue(int capacity, boolean fair,Collection<? extends E> c) { //初始化阻塞隊列 this(capacity, fair); //加鎖將數組元素填入阻塞隊列(主要是考慮到重排序和可見性問題,因為Object[] items 並沒有加上 volatile 屬性) final ReentrantLock lock = this.lock; lock.lock(); // Lock only for visibility, not mutual exclusion try { int i = 0; try { for (E e : c) { checkNotNull(e); items[i++] = e; } } catch (ArrayIndexOutOfBoundsException ex) { throw new IllegalArgumentException(); } count = i; //將插入位置下變更 putIndex = (i == capacity) ? 0 : i; } finally { lock.unlock(); } }
【3】核心方法分析
1)入隊put方法
public void put(E e) throws InterruptedException { //檢查是否為空 checkNotNull(e); final ReentrantLock lock = this.lock; //加鎖,如果線程中斷拋出異常 lock.lockInterruptibly(); try { //阻塞隊列已滿,則將生產者掛起,等待消費者喚醒 //設計註意點: 用while不用if是為了防止虛假喚醒 while (count == items.length) notFull.await(); //隊列滿了,使用notFull等待(生產者阻塞) // 入隊 enqueue(e); } finally { lock.unlock(); // 喚醒消費者線程 } } private void enqueue(E x) { final Object[] items = this.items; //入隊 使用的putIndex items[putIndex] = x; if (++putIndex == items.length) putIndex = 0; //設計的精髓: 環形數組,putIndex指針到數組盡頭了,返回頭部 count++; //notEmpty條件隊列轉同步隊列,準備喚醒消費者線程,因為入隊了一個元素,肯定不為空了 notEmpty.signal(); }
2)出隊take方法
public E take() throws InterruptedException { final ReentrantLock lock = this.lock; //加鎖,如果線程中斷拋出異常 lock.lockInterruptibly(); try { //如果隊列為空,則消費者掛起 while (count == 0) notEmpty.await(); //出隊 return dequeue(); } finally { lock.unlock();// 喚醒生產者線程 } } private E dequeue() { final Object[] items = this.items; @SuppressWarnings("unchecked") E x = (E) items[takeIndex]; //取出takeIndex位置的元素 items[takeIndex] = null; if (++takeIndex == items.length) takeIndex = 0; //設計的精髓: 環形數組,takeIndex 指針到數組盡頭了,返回頭部 count--; if (itrs != null) itrs.elementDequeued(); //notFull條件隊列轉同步隊列,準備喚醒生產者線程,此時隊列有空位 notFull.signal(); return x; }
3)其餘offer&poll&peek&remove方法
public boolean offer(E e) { checkNotNull(e); final ReentrantLock lock = this.lock; lock.lock(); try { if (count == items.length) return false; else { enqueue(e); return true; } } finally { lock.unlock(); } } //本質區別在於設置了超時時間,超時後選擇不加入,返回false public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException { checkNotNull(e); long nanos = unit.toNanos(timeout); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == items.length) { if (nanos <= 0) return false; //生產線程堵塞nanos時間,也有可能被喚醒,如果超過nanos時間還未被喚醒,則nanos=0,再次迴圈,就會返回false nanos = notFull.awaitNanos(nanos); } enqueue(e); return true; } finally { lock.unlock(); } } public E poll() { final ReentrantLock lock = this.lock; lock.lock(); try { return (count == 0) ? null : dequeue(); } finally { lock.unlock(); } } //本質區別在於設置了超時時間,超時後選擇不獲取,返回null public E poll(long timeout, TimeUnit unit) throws InterruptedException { long nanos = unit.toNanos(timeout); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == 0) { if (nanos <= 0) return null; nanos = notEmpty.awaitNanos(nanos); } return dequeue(); } finally { lock.unlock(); } } public E peek() { final ReentrantLock lock = this.lock; lock.lock(); try { //通過下標查找直接返回 return itemAt(takeIndex); // null when queue is empty } finally { lock.unlock(); } } final E itemAt(int i) { return (E) items[i]; } public boolean remove(Object o) { if (o == null) return false; final Object[] items = this.items; final ReentrantLock lock = this.lock; lock.lock(); try { if (count > 0) { final int putIndex = this.putIndex; int i = takeIndex; do { if (o.equals(items[i])) { removeAt(i); return true; } if (++i == items.length) i = 0; } while (i != putIndex); } return false; } finally { lock.unlock(); } } void removeAt(final int removeIndex) { final Object[] items = this.items; if (removeIndex == takeIndex) { // removing front item; just advance items[takeIndex] = null; if (++takeIndex == items.length) takeIndex = 0; count--; if (itrs != null) itrs.elementDequeued(); } else { final int putIndex = this.putIndex; for (int i = removeIndex;;) { int next = i + 1; if (next == items.length) next = 0; if (next != putIndex) { items[i] = items[next]; i = next; } else { items[i] = null; this.putIndex = i; break; } } count--; if (itrs != null) itrs.removedAt(removeIndex); } notFull.signal(); }
ArrayBlockingQueue總結
【1】有界阻塞隊列,先進先出,存取相互排斥
【2】數據結構:靜態數組(容量固定須指定長度,沒有擴容機制,沒有元素的位置也占用空間,被null占位)
【3】ReentrantLock鎖保證互斥性:存取都是同一把鎖,操作的是同一個數組對象,存取相互排斥
【4】阻塞對象(notEmpty【出隊:隊列count=0,無元素可取時,阻塞在該對象上】,notFull【入隊:隊列count=length,放不進元素時,阻塞在該對象上】)
【5】入隊,從隊首開始添加元素,記錄putIndex(到隊尾時設置為0),喚醒notEmpty
【6】出隊,從隊首開始添加元素,記錄takeIndex(到隊尾時設置為0),喚醒notFull
【7】兩個指針都是從隊首向隊尾移動,保證隊列的先進先出原則(亮點:利用指針和數組,形成環狀結構,重覆利用記憶體空間)