PriorityBlockingQueue介紹 【1】PriorityBlockingQueue是一個無界的基於數組的優先順序阻塞隊列,數組的預設長度是11,也可以指定數組的長度,且可以無限的擴充,直到資源消耗盡為止,每次出隊都返回優先順序別最高的或者最低的元素。預設情況下元素採用自然順序升序排序,當然 ...
PriorityBlockingQueue介紹
【1】PriorityBlockingQueue是一個無界的基於數組的優先順序阻塞隊列,數組的預設長度是11,也可以指定數組的長度,且可以無限的擴充,直到資源消耗盡為止,每次出隊都返回優先順序別最高的或者最低的元素。預設情況下元素採用自然順序升序排序,當然我們也可以通過構造函數來指定Comparator來對元素進行排序。需要註意的是PriorityBlockingQueue不能保證同優先順序元素的順序。
【2】優先順序隊列PriorityQueue: 隊列中每個元素都有一個優先順序,出隊的時候,優先順序最高的先出。
PriorityBlockingQueue的源碼分析
【1】屬性值
//預設容量 private static final int DEFAULT_INITIAL_CAPACITY = 11; //最大容量設定 private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8; //存放數據的數組 private transient Object[] queue; //元素個數 private transient int size; //排序規則(比較器) private transient Comparator<? super E> comparator; //獨占鎖 private final ReentrantLock lock; //隊列為空的時候的阻塞隊列 private final Condition notEmpty; //用於分配的CAS自旋鎖 private transient volatile int allocationSpinLock; //只用於序列化的普通優先隊列 private PriorityQueue<E> q;
【2】構造函數
//沒有指定容量,則容量預設11 public PriorityBlockingQueue() { this(DEFAULT_INITIAL_CAPACITY, null); } //有指定容量則容量為指定數值 public PriorityBlockingQueue(int initialCapacity) { this(initialCapacity, null); } //初始化所需要的屬性值 public PriorityBlockingQueue(int initialCapacity, Comparator<? super E> comparator) { if (initialCapacity < 1) throw new IllegalArgumentException(); this.lock = new ReentrantLock(); this.notEmpty = lock.newCondition(); this.comparator = comparator; this.queue = new Object[initialCapacity]; }
【3】核心方法分析
1)核心擴容函數
//擴容函數 private void tryGrow(Object[] array, int oldCap) { lock.unlock(); //必須釋放然後重新獲得主鎖,這一步的意義在於所有操作共用一把鎖,在進行擴容時(因為寫已滿),釋放鎖(不能寫,要等待擴容完才能寫),提供給讀操作 Object[] newArray = null; // CAS 輕量級鎖加鎖,避免併發擴容 if (allocationSpinLock == 0 && UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset, 0, 1)) { try { // 擴容步長,舊值小於64時,變為兩倍+2。大於64時,變為1.5倍。 int newCap = oldCap + ((oldCap < 64) ? (oldCap + 2) : (oldCap >> 1)); // 超過最大容量,記憶體溢出 if (newCap - MAX_ARRAY_SIZE > 0) { // possible overflow int minCap = oldCap + 1; if (minCap < 0 || minCap > MAX_ARRAY_SIZE) throw new OutOfMemoryError(); newCap = MAX_ARRAY_SIZE; } // 創建新數組 if (newCap > oldCap && queue == array) newArray = new Object[newCap]; } finally { allocationSpinLock = 0; } } // 併發擴容時,線程讓出 cpu 執行時間,給其他線程執行,自己稍後執行,原因:加鎖不成功必然有其他線程也在擴容,在等待過程中讓出資源給其他線程利用 if (newArray == null) Thread.yield(); //重新加鎖 lock.lock(); //變更記憶體指向,利用記憶體拷貝複製舊數據 if (newArray != null && queue == array) { queue = newArray; System.arraycopy(array, 0, newArray, 0, oldCap); } }
2)入隊方法
public void put(E e) { offer(e); // never need to block } public boolean add(E e) { return offer(e); } public boolean offer(E e) { if (e == null) throw new NullPointerException(); final ReentrantLock lock = this.lock; lock.lock(); int n, cap; Object[] array; while ((n = size) >= (cap = (array = queue).length)) //自旋擴容 tryGrow(array, cap); try { Comparator<? super E> cmp = comparator; //根據比較器採用填充的方式 if (cmp == null) siftUpComparable(n, e, array); else siftUpUsingComparator(n, e, array, cmp); size = n + 1; notEmpty.signal(); } finally { lock.unlock(); } return true; }
4)隊列填入方式
代碼展示
private static <T> void siftUpComparable(int k, T x, Object[] array) { Comparable<? super T> key = (Comparable<? super T>) x; while (k > 0) { int parent = (k - 1) >>> 1; Object e = array[parent]; if (key.compareTo((T) e) >= 0) break; array[k] = e; k = parent; } array[k] = key; } //雷同上面的不過比較器採用傳入的 private static <T> void siftUpUsingComparator(int k, T x, Object[] array, Comparator<? super T> cmp) { while (k > 0) { int parent = (k - 1) >>> 1; Object e = array[parent]; if (cmp.compare(x, (T) e) >= 0) break; array[k] = e; k = parent; } array[k] = x; }
圖解說明
1.利用了滿二叉樹的理念,(k - 1) >>> 1可以獲得存入節點的父節點下標,然後進行比較。若判斷應該上升,則將父節點置於k存儲的地方。
2.重覆迴圈,知道二叉樹root節點,或者已經找到了合適的位置
4)出隊方法
public E poll() { final ReentrantLock lock = this.lock; lock.lock(); try { return dequeue(); } finally { lock.unlock(); } } public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); E result; try { while ( (result = dequeue()) == null) notEmpty.await(); } finally { lock.unlock(); } return result; } private E dequeue() { int n = size - 1; if (n < 0) return null; else { Object[] array = queue; //將頭節點取出返回 E result = (E) array[0]; //將末尾節點當做向頭節點位置存入的節點 E x = (E) array[n]; array[n] = null; Comparator<? super E> cmp = comparator; if (cmp == null) siftDownComparable(0, x, array, n); else siftDownUsingComparator(0, x, array, n, cmp); size = n; return result; } }
5)隊列修正方式
代碼展示
private static <T> void siftDownComparable(int k, T x, Object[] array, int n) { if (n > 0) { Comparable<? super T> key = (Comparable<? super T>)x; //過濾掉最後一層的元素(以為滿二叉樹中,最後一層占據的元素就是n/2) int half = n >>> 1; // loop while a non-leaf while (k < half) { //獲取頭節點的左節點 int child = (k << 1) + 1; // assume left child is least Object c = array[child]; int right = child + 1; //進行左右節點的比較,取小的一邊作為比較,以為優先隊列要確保頭節點是最小的(換而言之,保證子樹下麵的頭節點為子樹裡面最小的即可) if (right < n && ((Comparable<? super T>) c).compareTo((T) array[right]) > 0) c = array[child = right]; if (key.compareTo((T) c) <= 0) break; array[k] = c; k = child; } array[k] = key; } } private static <T> void siftDownUsingComparator(int k, T x, Object[] array, int n,Comparator<? super T> cmp) { if (n > 0) { int half = n >>> 1; while (k < half) { int child = (k << 1) + 1; Object c = array[child]; int right = child + 1; if (right < n && cmp.compare((T) c, (T) array[right]) > 0) c = array[child = right]; if (cmp.compare(x, (T) c) <= 0) break; array[k] = c; k = child; } array[k] = x; } }
圖解說明
1.採用尾結點代替頭節點,然後利用下沉的方式來修正優先隊列裡面的數據。
2.下沉限制在倒數第二層,以為倒數第二層就會與倒數第一層進行比較了,所以應該進行限制(下標位置超出倒數第二層的最大下標就應該停止了)
3.其次看的時候,可以把左右子樹都當做一個小的滿二叉樹,不斷逐層劃分,這樣條理更清晰。
PriorityBlockingQueue總結
【1】一個支持優先順序排序的無界阻塞隊列,優先順序高的先出隊,優先順序低的後出隊
【2】數據結構:數組+二叉堆(預設容量11,可指定初始容量,會自動擴容,最大容量是(Integer.MAX_VALUE - 8))
【3】鎖:ReentrantLock,存取是同一把鎖
【4】阻塞對象:NotEmpty,出隊,隊列為空時阻塞
【5】入隊,不阻塞,永遠返回成功,無界;根據比較器進行堆化(排序)自下而上,如果比較器為 null,則按照自然順序排序,傳入比較器對象就按照比較器的順序排序。
【6】出隊,優先順序最高的元素在堆頂(彈出堆頂元素),彈出前比較兩個子節點再進行堆化(自上而下)。
【7】應用場景:1.業務辦理排隊叫號,VIP客戶插隊;2.電商搶購活動,會員級別高的用戶優先搶購到商品;