java併發的一系列框架和技術主要是由java.util.concurrent 包所提供。包下的所有類可以分為如下幾大類: locks部分:顯式鎖(互斥鎖和速寫鎖)相關; atomic部分:原子變數類相關,是構建非阻塞演算法的基礎; executor部分:線程池相關; collections部分:併發 ...
java併發的一系列框架和技術主要是由java.util.concurrent 包所提供。包下的所有類可以分為如下幾大類:
- locks部分:顯式鎖(互斥鎖和速寫鎖)相關;
- atomic部分:原子變數類相關,是構建非阻塞演算法的基礎;
- executor部分:線程池相關;
- collections部分:併發容器相關;
- tools部分:同步工具相關,如信號量、閉鎖、柵欄等功能
整體實現技術可按照依賴級別分為以下三層:
高層類 |
Lock 同步工具 併發容器 Executor/ExecutorCompletionService |
基礎類 |
AQS 非阻塞數據結構 原子變數類 |
底層原理 |
volatile變數的讀/寫 CAS |
一、volatile
保證線程之間操作的可見性,避免操作的重排序,但不保證原子性(i++)。
由於 Java 記憶體模型(JMM)規定,所有的變數都存放在主記憶體中,而每個線程都有著自己的工作記憶體(高速緩存)。
在當前的Java記憶體模型下,線程可以把變數保存在本地記憶體(比如機器的寄存器)中,而不是直接在主存中進行讀寫。這就可能造成一個線程在主存中修改了一個變數的值,而另外一個線程還繼續使用它在寄存器中的變數值的拷貝,造成數據的不一致。
Volatile修飾的成員變數在每次被線程訪問時,都強迫從共用記憶體中重讀該成員變數的值;當成員變數發生變化時,強迫線程將變化值回寫到共用記憶體。由於使用 volatile 屏蔽掉了 VM 中必要的代碼優化,所以在效率上會低效一些。
二、CAS無鎖演算法
CAS(comapre and swap):CAS是一項樂觀鎖技術,其核心思想為衝突檢測和數據更新。Java對於CAS支持是利用sun.misc.Unsafe這個類通過JNI調用CPU底層指令實現Unsafe.compareAndSwapInt操作。其主要實現思路為:將記憶體值V與舊的預期值E,如果相同則更新為U,不相同則由上層系統迴圈獲取實際值後,再次調用此CAS演算法。最主要的應用就在原子變數類的具體實現中。
|
將記憶體值V與舊的預期值E,如果相同則更新為U,不相同,返回記憶體值給用戶。
|
但CAS存在問題:ABA問題/迴圈時間長開銷大/只能保證一個共用變數的原子操作 ABA:如棧結構(A-B),線程T1要移除A,B變棧頂,,線程T2將結構變為A-C-D,,此時T1進行CAS衝突檢測發現A沒變,但實際上整個棧結構變了,此時進行操作會覆蓋掉C-D,解決辦法是使用原子類AtomicStampedReference來保證整個棧的一致性。
對於線程衝突較輕,使用CAS能夠避免加鎖和釋放鎖的操作,消耗CPU資源。 對於線程衝突較重,CAS容易產生自旋,即不停比較然後失敗重試,浪費CPU。 |
三、AQS及其他同步工具
AbstractQueuedSynchronizer抽象隊列同步器,定義了多線程訪問資源的同步框架。用於構建鎖和其他同步組件CountDownLatch等的基本框架。使用一個volatile(代表共用資源)來維護狀態,通過內置的FIFO隊列來完成資源獲取線程的排隊工作。如果被請求的共用資源空閑,則將當前請求資源的線程設置為有效的工作線程,並且將共用資源設置為鎖定狀態。如果被請求的共用資源被占用,那麼就需要一套線程阻塞等待以及被喚醒時鎖分配的機制,這個機制AQS是用CLH隊列(帶頭節點的雙向非迴圈鏈表)鎖實現的,即將暫時獲取不到鎖的線程加入到隊列中。
AQS兩類應用場景:Exclusive(資源獨占,只有一個線程能執行,如ReentrantLock)和Share(資源共用,多個線程可同時執行,如Semaphore/CountDownLatch)
1.CountDownLatch: 倒計時器,同步工具類,允許一個或多個線程,等待其他一組線程完成操作,再繼續執行。在CountDownLatch 上定義兩種操作:CountDown.countDown表示該線程工作結束(計數器減一)、CountDown.await當前線程阻塞,等待其他工作線程結束(計數器為0)
2.CyclicBarrier:線程屏障,同步工具類,允許一組線程相互之間等待,達到一個共同點,再繼續執行。能夠重置並多次使用,並且能夠獲取阻塞線程數量;不會阻塞主線程。
3.Semaphore:信號量,用於控制線程的併發數量。在信號量上我們定義兩種操作: acquire(獲取) 和 release(釋放)。當一個線程調用acquire操作時,它要麼通過成功獲取許可(許可減1),要麼一直等下去,直到有線程釋放許可,或超時。release(釋放)實際上會將許可的值加1,然後喚醒等待的線程。
使用Seamphore(2),創建了多少線程(5),實際就會有多少線程執行(5),只是可同時執行的線程數量會受到限制(2)。但使用線程池(2,5),不管你創建多少線程實際可執行的線程數是一定的(2)。
4.Exchanger:交換者,實現線程間的相互數據交換或通信。提供一個同步點,當兩個線程都達到該同步點時,則進行交換數據,可以多個進行隨機交換,但必須為偶數個。無鎖,通過迴圈 cas 來實現線程安全。eg:String data2 = (String) exchanger.exchange(data1)。
四、非阻塞數據結構
同步集合:Vector、HashTable,同步集合包裝類/Collections.synchronizedMap()和Collections.synchronizedList()---同步集合會對整個May或List加鎖。
ConcurrentHashMap:HashMap的併發級別,通過繼承自ReentrantLock的Segment來對Hahs表進行分段鎖,提高了併發效率
ConcurrentQueue也是通過同樣的方式來提高併發性能的,子類ConcurrentLinkedQueue。例子:多線程賣票。
CopyOnWriteArrayList:寫時複製容器,複製該容器進行寫操作,將當前容器進行Copy,複製出一個新的容器,然後向新的容器里添加元素,添加完元素之後,再將原容器的引用指向新的容器。這樣做的好處是我們可以對CopyOnWrite容器進行併發的讀,而不需要加鎖,因為在當前讀的容器中不會添加任何元素。所以CopyOnWrite容器是一種讀寫分離的思想,讀和寫對應不同的容器。
BlockingQueue:阻塞隊列,併發容器,沒有元素時-讀取會堵塞,元素滿時-加元素會阻塞。適合消費者生產者模式,其中ExecutorCompletionService就是將LinkedBlockingQueue和Executor結合管理線程返回結果。
ArrayBlockingQueue |
有界隊列,使用一個ReentrantLock 鎖。 |
LinkedBlockingQueue |
無界隊列,內部使用ReentrantLock實現插入鎖(putLock)和取出鎖(takeLock),fixedThreadPool用的這個。 |
DelayQueue |
其中的對象只能在其到期時才能從隊列中取走。這種隊列是有序的,即隊頭對象的延遲到期時間最長。但是要註意的是,不能將null元素放置到這種隊列中.eg:處理超時的客戶端鏈接、超時的緩存對象 |