隊列實質就是一種存儲數據的結構 通常用鏈表或者數組實現 一般而言隊列具備FIFO先進先出的特性,當然也有雙端隊列(Deque)優先順序隊列 主要操作:入隊(EnQueue)與出隊(Dequeue) BlockingQueue 隊列實質就是一種存儲數據的結構 通常用鏈表或者數組實現 一般而言隊列具備FI ...
隊列實質就是一種存儲數據的結構 通常用鏈表或者數組實現 一般而言隊列具備FIFO先進先出的特性,當然也有雙端隊列(Deque)優先順序隊列 主要操作:入隊(EnQueue)與出隊(Dequeue)
BlockingQueue
隊列實質就是一種存儲數據的結構 通常用鏈表或者數組實現 一般而言隊列具備FIFO先進先出的特性,當然也有雙端隊列(Deque)優先順序隊列 主要操作:入隊(EnQueue)與出隊(Dequeue)
1、ArrayBlockingQueue 由數組支持的有界隊列
2、LinkedBlockingQueue 由鏈接節點支持的可選有界隊列
3、PriorityBlockingQueue 由優先順序堆支持的無界優先順序隊列
4、DelayQueue 由優先順序堆支持的、基於時間的調度隊列
BlockingQueue底層依賴ReenTrantLock 和Condition(條件隊列只能在獨占模式下使用,共用模式不能用)(AQS框架實現)
ArrayBlockingQueue
BlockingQueue<Ball> blockingQueue = new ArrayBlockingQueue<Ball>(1);初始化源碼分析
public ArrayBlockingQueue(int capacity) {
this(capacity, false); -------------false預設創建時非公平鎖
}
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];-------初始化數組
lock = new ReentrantLock(fair);------創建非公平鎖
notEmpty = lock.newCondition(); ---------------初始化了兩個條件隊列,對應兩個條件對列notEmpty ,notFull
notFull = lock.newCondition();
}
// 兩個條件隊列如下圖:
為什麼會有連兩個條件隊列,因為有put(生產者),和take(消費者) 兩個操作 put的時候要看(Notfull)是不是滿了,take(Notempty)是看是不是空了。
put生產者操作源碼解讀
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly(); //----先上來獲取獨占鎖,才能進入下麵的邏輯。
try {
while (count == items.length)
notFull.await(); // 判斷當前隊列是不是已經滿了,如果count == items.length,那麼這個下線程就要入條件隊列等待
enqueue(e);
} finally {
lock.unlock();
}
}
/**
* 加入條件隊列等待,條件隊列入口
*/
public final void await() throws InterruptedException {
//如果當前線程被中斷則直接拋出異常
if (Thread.interrupted())
throw new InterruptedException();
//把當前節點加入條件隊列
Node node = addConditionWaiter();
//釋放掉已經獲取的獨占鎖資源
int savedState = fullyRelease(node);
int interruptMode = 0;
//如果不在同步隊列中則不斷掛起
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
//這裡被喚醒可能是正常的signal操作也可能是中斷
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
/**
* 走到這裡說明節點已經條件滿足被加入到了同步隊列中或者中斷了
* 這個方法很熟悉吧?就跟獨占鎖調用同樣的獲取鎖方法,從這裡可以看出條件隊列只能用於獨占鎖
* 在處理中斷之前首先要做的是從同步隊列中成功獲取鎖資源
*/
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
//走到這裡說明已經成功獲取到了獨占鎖,接下來就做些收尾工作
//刪除條件隊列中被取消的節點
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
//根據不同模式處理中斷
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
/**
* 這裡的判斷邏輯是:
* 1.如果現在不是中斷的,即正常被signal喚醒則返回0
* 2.如果節點由中斷加入同步隊列則返回THROW_IE,由signal加入同步隊列則返回REINTERRUPT
*/
private int checkInterruptWhileWaiting(Node node) {
return Thread.interrupted() ?
(transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
0;
}
// 這個方法是把條件節點轉運到CLH同步隊列中並把信號量置為初始值0,返迴轉運成功與否標誌。只有把條件節點轉運到同步隊列才又可能被喚醒。
final boolean transferAfterCancelledWait(Node node) {
if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
enq(node);
return true;
}
/*
* If we lost out to a signal(), then we can't proceed
* until it finishes its enq(). Cancelling during an
* incomplete transfer is both rare and transient, so just
* spin.
*/
while (!isOnSyncQueue(node))
Thread.yield();
return false;
}