阻塞隊列 同步隊列 AQS 原理 數據結構 CLH 共用方式 設計模式 組件 自定義 ...
1、AbstractQueue抽象隊列、BlockingQueue阻塞隊列、Deque雙端隊列
2、Queue FIFO先進先出,
寫入:隊列滿阻塞等待,取出:隊列滿阻塞等待生產
3、使用場景:多線程併發處理、線程池
4、阻塞隊列(BlockingQueue)——四組API(添加、移除、判斷隊列首部的場景)
==1、會拋出異常
==2、有返回值,不拋出異常
==3、阻塞等待(一直阻塞等待)
==4、超時等待
=========拋出異常 add()\remove()\element()=========
//拋出異常
public static void atest(){
//數組類型的阻塞隊列 要指定 隊列的大小
ArrayBlockingQueue blockingQueue = new ArrayBlockingQueue<>(3);
//添加操作 返回Boolean值
System.out.println(blockingQueue.add("a"));
System.out.println(blockingQueue.add("b"));
System.out.println(blockingQueue.add("c"));
/*
java.lang.IllegalStateException: Queue full
隊列已滿 拋出異常
*/
// System.out.println(blockingQueue.add("d"));
//檢測隊首元素
System.out.println("隊首元素"+blockingQueue.element());
//移除操作 返回移除的添加的元素
System.out.println(blockingQueue.remove());
System.out.println(blockingQueue.remove());
System.out.println(blockingQueue.remove());
/*
java.util.NoSuchElementException
元素為空 不能再移除 拋出異常
*/
// System.out.println(blockingQueue.remove());
} public static void atest(){
//數組類型的阻塞隊列 要指定 隊列的大小
ArrayBlockingQueue blockingQueue = new ArrayBlockingQueue<>(3);
//添加操作 返回Boolean值
System.out.println(blockingQueue.add("a"));
System.out.println(blockingQueue.add("b"));
System.out.println(blockingQueue.add("c"));
/*
java.lang.IllegalStateException: Queue full
隊列已滿 拋出異常
*/
// System.out.println(blockingQueue.add("d"));
//移除操作 返回移除的添加的元素
System.out.println(blockingQueue.remove());
System.out.println(blockingQueue.remove());
System.out.println(blockingQueue.remove());
/*
java.util.NoSuchElementException
元素為空 不能再移除 拋出異常
*/
// System.out.println(blockingQueue.remove());
}
=========有返回值,不拋出異常 add()\remove()\element()=========
// 不拋出異常
public static void btest(){
ArrayBlockingQueue blockingQueue = new ArrayBlockingQueue<>(3);
//添加 返回Boolean值 添加成功 返回true
System.out.println(blockingQueue.offer("a"));
System.out.println(blockingQueue.offer("b"));
System.out.println(blockingQueue.offer("c"));
// 隊列大小為3 繼續添加元素 不能繼續添加 返回false
System.out.println(blockingQueue.offer("d"));
//移除 返回元素
System.out.println(blockingQueue.poll());
System.out.println(blockingQueue.poll());
System.out.println(blockingQueue.poll());
// 隊列在元素全部移除後 繼續移除操作 不能繼續移除 返回null
System.out.println(blockingQueue.poll());
}
=========阻塞等待(一直阻塞等待) add()\remove()\element()=========
// 一直阻塞等待 public static void ctest() throws InterruptedException { ArrayBlockingQueue blockingQueue = new ArrayBlockingQueue<>(3); //添加元素 blockingQueue.put("a"); blockingQueue.put("b"); blockingQueue.put("c"); // 隊列已滿 如果繼續添加元素 則造成阻塞 一直阻塞等待 程式一直在等// blockingQueue.put("d"); //移除元素 System.out.println(blockingQueue.take()); System.out.println(blockingQueue.take()); System.out.println(blockingQueue.take()); //隊列為空 如果繼續移除元素 則造成阻塞 一直在等待元素來移除 程式一直在等// System.out.println(blockingQueue.take());• }
=========超時等待 add()\remove()\element()=========
// 等待超時退出
public static void dtest() throws InterruptedException {
ArrayBlockingQueue blockingQueue = new ArrayBlockingQueue<>(3);
//添加元素 offer(元素,時間,單位) offer方法的重載方法
System.out.println(blockingQueue.offer("a"));
System.out.println(blockingQueue.offer("b"));
System.out.println(blockingQueue.offer("c"));
//隊列已滿 繼續添加元素 等待超過2秒就結束程式
System.out.println(blockingQueue.offer("d", 2, TimeUnit.SECONDS));
//移除元素 poll(時間,單位) poll方法的重載方法
System.out.println(blockingQueue.poll());
System.out.println(blockingQueue.poll());
System.out.println(blockingQueue.poll());
//隊列為空 繼續移除元素 等待超過2秒就結束程式
System.out.println(blockingQueue.poll(2,TimeUnit.SECONDS));
}
5、同步隊列
==沒有容量(即不長時間存儲元素)
進去一個元素,必須等待取出(take())後才能再往裡邊繼續添加(put())
==例子:
BlockingQueue<String> blockingQueue = new SynchronousQueue<String>();//同步隊列
//添加元素 線程
new Thread(()->{
try {
System.out.println(Thread.currentThread().getName()+"put 1");
blockingQueue.put("1");
System.out.println(Thread.currentThread().getName()+"put 2");
blockingQueue.put("2");
System.out.println(Thread.currentThread().getName()+"put 3");
blockingQueue.put("3");
} catch (InterruptedException e) {
e.printStackTrace();
}
},"線程1").start();
//移除元素 線程
new Thread(()->{
try {
TimeUnit.SECONDS.sleep(2);
System.out.println(Thread.currentThread().getName()+"==>"+blockingQueue.take());
TimeUnit.SECONDS.sleep(2);
System.out.println(Thread.currentThread().getName()+"==>"+blockingQueue.take());
TimeUnit.SECONDS.sleep(2);
System.out.println(Thread.currentThread().getName()+"==>"+blockingQueue.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
},"線程2").start();
}
AQS
1、一個用來構建鎖和同步器的框架,定義了鎖的實現機制,並開放出擴展的地方,讓子類去實現。(封裝得很好,但又有子類擴展的地方)
例如:lock(加鎖)時鎖的內部機制:
使用鎖Lock時,AQS開放state欄位,然子類可以根據state欄位來決定是否能夠獲得鎖,對於獲取不到鎖的線程,AQS會自動進行管理,無需子類鎖關心。
2、使用AQS能夠簡單且高效地構造出大量應用廣泛的同步器:ReentrantLock,Semaphore,ReentrantReadWriteLock,SynchronousQueue,FutureTask等等,都基於AQS。也可以自定義同步器。
3、AQS底層:
==(1)由 同步隊列 + 條件隊列 聯合實現(CLH隊列鎖)
====一般情況下有同步隊列(雙向鏈表)組成,條件隊列(單向鏈表)不是必須存在的,當程式中存在condition時,才會存在此列表。
====同步隊列管理獲取不到鎖的線程的排隊和釋放
====條件隊列是在一定場景下,對同步隊列的補充(非必須的),如,獲得鎖的線程從空隊列中拿數據(隊列是空的,拿不到數據的),此時,條件隊列會管理該線程,使線程阻塞。
==(2)核心思想:AQS內部維護一個CLH隊列來管理。
====線程請求共用資源時,
如果被請求的共用資源空閑,則將當前請求資源的線程設置為有效線程,並且將共用資源設置為鎖定狀態;
如果被請求的共用資源被占用,則需要CLH隊列鎖實現的機制來實現線程阻塞等待以及線程被喚醒使鎖的分配:即將暫時獲取不到鎖的線程加入到隊列中。(上邊的同步隊列)
==(3)CLH鎖隊列:
====是一個虛擬的雙向隊列(不存在隊列實例,僅存在節點間的關聯關係)
====AQS是將每條請求共用資源的線程封裝成一個CLH鎖隊列的一個個節點(Node)實現鎖的分配。
==(4)AQS中同步隊列的工作流程和數據結構:(結合兩圖理解)
====工作過程:
(1)當前線程獲取同步狀態失敗,同步器會將當線程及等待狀態等信息構成一個Node節點,加入CLH隊列中,放在隊尾,同步器重新設置尾節點。
(2)加入隊列後,會阻塞當前線程
(3)同步狀態被釋放並且同步器重新設置首節點,同步器喚醒等待隊列中第一個節點,讓其再次獲取同步狀態。
====AQS使用一個int成員變數來表示同步狀態,通過內置的FIFO隊列來完成獲取資源線程的排隊工作。
====AQS使用CAS對該同步狀態進行原子性操作實現對其值的修改
private volatile int state;//共用變數,使用volatile修飾保證線程可見性
====狀態信息通過protected類型的getState,setState,compareAndSetState進行操作
//返回同步狀態的當前值
protected final int getState() {
return state;
}
//設置同步狀態的值
protected final void setState(int newState) {
state = newState;
}
//原子地(CAS操作)將同步狀態值設置為給定值update如果當前同步狀態的值等於expect(期望值)
protected final boolean compareAndSetState(int expect, int update) {
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
4、AQS對資源的共用方式
定義了兩種資源共用方式:
==(1)Exclusive(獨占):又分為公平鎖和非公平鎖
只有一個線程能執行,如ReentrantLock
====公平鎖:按照線程在隊列中的排隊順序, 先到者先拿到鎖
====非公平鎖:當前線程要獲取鎖時,無視隊列內的順序,直接去強鎖,誰搶到了就是誰的。
==(2)Share(共用):
多個線程可以同時執行,如Semaphore\CyclicBarrier\ReadWriteLock\CountDownLatch
==(3)ReentrantReadWriteLock:組合兩種資源共用方式的
====讀鎖運行多個線程同時執行對同一資源進行讀操作
====寫鎖僅允許一個線程能執行對同一資源進行寫操作
==(4)不同自定義的同步器有不同的共用資源方式,自定義同步器在實現時,只需要實現共用資源state的獲取與釋放方式即可。
5、AQS使用的設計模式
(1)基於模板設計模式的
(2)自定義同步器的方式:
==1、繼承AbstractQueuedSynchronizer並重寫指定的方法
重寫方法是指對於共用資源state的獲取和釋放
==2、將AQS組合在自定義同步組件的實現中,並調用其模板方法
這些模板方法會調用上邊繼承重寫的方法
====AQS提高的模板方法
protected boolean tryAcquire(int)//獨占方式。嘗試獲取資源,成功則返回true,失敗則返回false。
protected boolean tryRelease(int)//獨占方式。嘗試釋放資源,成功則返回true,失敗則返回false。
protected int tryAcquireShared(int)//共用方式。嘗試獲取資源。負數表示失敗;0表示成功,但沒有剩餘可用資源;正數表示成功,且有剩餘資源。
protected boolean tryReleaseShared(int)//共用方式。嘗試釋放資源,成功則返回true,失敗則返回false。
protected boolean isHeldExclusively()//該線程是否正在獨占資源。只有用到condition才需要去實現它。
====除以上的方法外,AQS類中其它方法不能被重寫,都為final
====例:
=======獨占方式=======
ReentrantLock
(1)state初始化為0,表示未鎖定狀態;
(2)A線程lock()時,會調用tryAcquire()獨占該鎖並state+1;
(3)其它線程tryAcquire()時會失敗,直到A線程unlock()到state=0,即釋放,其它線程才能獲取該鎖。
(4)A線程釋放前,A自己時可以重覆獲取此鎖的(state++),即可重入
(5)A線程釋放,一定要將state回歸為state=0
=======共用方式=======
CountDownLatch,任務分為N個子線程進行執行
(1)state初始化為N(與線程數一致)
(2)N個子線程並行每執行countDown()一次,state CAS減一
(3)所有子線程都執行完成後即state=0,會unpark()主調用線程
(4)主調用線程從await()函數返回,繼續後續的操作
6、AQS組件
(1)Semaphore(信號量):允許多個線程同時訪問
====synchronized和ReentrantLock是一次只允許一個線程訪問某個資源
(2)CountDownLatch(倒計時器):同步工具類,用來協調多個線程間的同步,通常用來控制線程等待,可以讓某個線程等待直到倒計時結束,再開始執行。
(3)CyclicBarrier(迴圈柵欄):可以實現線程間的技術等待,功能更強大複雜。
== Cyclic(可迴圈使用)的Barrier(屏障):
====讓一組線程到達一個屏障(同步點)時被阻塞,直到最後一個線程到達屏障(同步點)時,才會打開屏障,所有被攔截的線程此時才會繼續執行。
====CyclicBarrier的預設構造方法:CyclicBarrier(int parties)
=======parties參數:表示屏障攔截的線程數量
=======每個線程調用await()方法告知CyclicBarrier已到達屏障,然後被阻塞。