## Synchronized 本篇文章將圍繞synchronized關鍵字,使用大量圖片、案例深入淺出的描述CAS、synchronized Java層面和C++層面的實現、鎖升級的原理、源碼等 大概觀看時間17分鐘 可以帶著幾個問題去查看本文,如果認真看完,問題都會迎刃而解: 1、synchro ...
Synchronized
本篇文章將圍繞synchronized關鍵字,使用大量圖片、案例深入淺出的描述CAS、synchronized Java層面和C++層面的實現、鎖升級的原理、源碼等
大概觀看時間17分鐘
可以帶著幾個問題去查看本文,如果認真看完,問題都會迎刃而解:
1、synchronized是怎麼使用的?在Java層面是如何實現?
2、CAS是什麼?能帶來什麼好處?又有什麼缺點?
3、mark word是什麼?跟synchronized有啥關係?
4、synchronized的鎖升級優化是什麼?在C++層面如何實現?
5、JDK 8 中輕量級鎖CAS失敗到底會不會自旋?
6、什麼是object monitor?wait/notify方法是如何實現的?使用synchronized時,線程阻塞後是如何在阻塞隊列中排序的?
...
synchronized Java層面實現
synchronized作用在代碼塊或方法上,用於保證併發環境下的同步機制
任何線程遇到synchronized都要先獲取到鎖才能執行代碼塊或方法中的操作
在Java中每個對象有一個對應的monitor對象(監視器),當獲取到A對象的鎖時,A對象的監視器對象中有個欄位會指向當前線程,表示這個線程獲取到A對象的鎖(詳細原理後文描述)
synchronized可以作用於普通對象和靜態對象,當作用於靜態對象、靜態方法時,都是去獲取其對應的Class對象的鎖
synchronized作用在代碼塊上時,會使用monitorentry和monitorexit位元組碼指令來標識加鎖、解鎖
synchronized作用在方法上時,會在訪問標識上加上synchronized
指令中可能出現兩個monitorexit指令是因為當發生異常時,會自動執行monitorexit進行解鎖
正常流程是PC 12-14,如果在此期間出現異常就會跳轉到PC 17,最終在19執行monitorexit進行解鎖
Object obj = new Object();
synchronized (obj) {
}
在上篇文章中我們說過原子性、可見性以及有序性
synchronized加鎖解鎖的位元組碼指令使用屏障,加鎖時共用記憶體從主記憶體中重新讀取,解鎖前把工作記憶體數據寫回主記憶體以此來保證可見性
由於獲取到鎖才能執行相當於串列執行,也就保證原子性和有序性,需要註意的是加鎖與解鎖之間的指令還是可以重排序的
CAS
為了更好的說明synchronized原理和鎖升級,我們先來聊聊CAS
在上篇文章中我們說過,volatile不能保證複合操作的原子性,使用synchronized方法或者CAS能夠保證複合操作原子性
那什麼是CAS呢?
CAS全稱 Compare And Swap 比較並交換,讀取數據後要修改時用讀取的數據和地址上的值進行比較,如果相等那就將地址上的值替換為目標值,如果不相等,通常會重新讀取數據再進行CAS操作,也就是失敗重試
synchronized加鎖是一種悲觀策略,每次遇到時都認為會有併發問題,要先獲取鎖才操作
而CAS是一種樂觀策略,每次先大膽的去操作,操作失敗(CAS失敗)再使用補償措施(失敗重試)
CAS與失敗重試(迴圈)的組合構成樂觀鎖或者說自旋鎖(迴圈嘗試很像在自我旋轉)
併發包下的原子類,依靠Unsafe大量使用CAS操作,比如AtomicInteger的自增
public final int getAndIncrement() {
return unsafe.getAndAddInt(this, valueOffset, 1);
}
//var1是調用方法的對象,var2是需要讀取/修改的值在這個對象上的偏移量,var4是自增1
public final int getAndAddInt(Object var1, long var2, int var4) {
int var5;
do {
//var5是通過對象和欄位偏移量獲取到欄位最新值
var5 = this.getIntVolatile(var1, var2);
//cas:var1,var2找到欄位的值 與 var5比較,相等就替換為 var5+var4
} while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));
return var5;
}
CAS只能對一個變數進行操作,如果要對多個變數進行操作,那麼只能對外封裝一層(將多個變數封裝為新對象的欄位),再使用原子類中的AtomicReference
不知各位同學有沒有發現,CAS的流程有個bug,就是在讀數據與比較數據之間,如果數據從A被改變到B,再改變到A,那麼CAS也能執行成功
這種場景有的業務能夠接受,有的業務無法接受,這就是所謂的ABA問題
而解決ABA問題的方式比較簡單,可以再比較時附加一個自增的版本號,JDK也提供解決ABA問題的原子類AtomicStampedReference
CAS能夠避免線程阻塞,但如果一直失敗就會一直迴圈,增加CPU的開銷,CAS失敗後重試的次數/時長不好評估
因此CAS操作適用於競爭小的場景,用CPU空轉的開銷來換取線程阻塞掛起/恢復的開銷
鎖升級
早期版本的synchronized會將獲取不到鎖的線程直接掛起,性能不好
JDK 6 時對synchronized的實現進行優化,也就是鎖升級
鎖的狀態可以分為無鎖、偏向鎖、輕量級鎖、重量級鎖
可以暫時把重量級鎖理解為早期獲取不到鎖就讓線程掛起,新的優化也就是輕量級鎖和偏向鎖
mark word
為了更好的說明鎖升級,我們先來聊聊Java對象頭中的mark word
我們下麵的探究都是圍繞64位的虛擬機
Java對象的記憶體由mark word、klass word、如果是數組還要記錄長度、實例數據(欄位)、對其填充(填充到8位元組倍數)組成
mark word會記錄鎖狀態,在不同鎖狀態的情況下記錄的數據也不同
下麵這個表格是從無鎖到重量級鎖mark word記錄的內容
|----------------------------------------------------------------------|--------|--------|
| unused:25 | identity_hashcode:31 | unused:1 | age:4 | biased_lock:1 | lock:2 | 無鎖
|----------------------------------------------------------------------|--------|--------|
| thread:54 | epoch:2 | unused:1 | age:4 | biased_lock:1 | lock:2 | 偏向鎖
|----------------------------------------------------------------------|--------|--------|
| ptr_to_lock_record:62 | lock:2 | 輕量級鎖
|----------------------------------------------------------------------|--------|--------|
| ptr_to_heavyweight_monitor:62 | lock:2 | 重量級鎖
|----------------------------------------------------------------------|--------|--------|
unused 表示還沒使用
identity_hashcode 用於記錄一致性哈希
age 用於記錄GC年齡
biased_lock 標識是否使用偏向鎖,0表示未開啟,1表示開啟
lock 用於標識鎖狀態標誌位,01無鎖或偏向鎖、00輕量級鎖、10重量級鎖
thread 用於標識偏向的線程
epoch 記錄偏向的時間戳
ptr_to_lock_record 記錄棧幀中的鎖記錄(後文介紹)
ptr_to_heavyweight_monitor 記錄獲取重量級鎖的線程
jol查看mark word
比較熟悉mark word的同學可以跳過
瞭解mark word後再來熟悉下不同鎖狀態下的mark word,我使用的是jol查看記憶體
<!-- https://mvnrepository.com/artifact/org.openjdk.jol/jol-core -->
<dependency>
<groupId>org.openjdk.jol</groupId>
<artifactId>jol-core</artifactId>
<version>0.12</version>
</dependency>
無鎖
各位同學實驗時的mark word可能和我註釋中的不同,我們主要查看鎖標識的值和是否啟用偏向鎖
public void noLock() {
Object obj = new Object();
//mark word 00000001 被unused:1,age:4,biased_lock:1,lock:2使用,001表示0未啟用偏向鎖,01表示無鎖
//01 00 00 00 (00000001 00000000 00000000 00000000)
//00 00 00 00 (00000000 00000000 00000000 00000000)
ClassLayout objClassLayout = ClassLayout.parseInstance(obj);
System.out.println(objClassLayout.toPrintable());
//計算一致性哈希後
//01 b6 ce a8
//6a 00 00 00
obj.hashCode();
System.out.println(objClassLayout.toPrintable());
//進行GC 查看GC年齡 0 0001 0 01 前2位表示鎖狀態01無鎖,第三位biased_lock為0表示未啟用偏向鎖,後續四位則是GC年齡age 1
//09 b6 ce a8 (00001001 10110110 11001110 10101000)
//6a 00 00 00 (01101010 00000000 00000000 00000000)
System.gc();
System.out.println(objClassLayout.toPrintable());
}
輕量級鎖
public void lightLockTest() throws InterruptedException {
Object obj = new Object();
ClassLayout objClassLayout = ClassLayout.parseInstance(obj);
//1334729950
System.out.println(obj.hashCode());
//0 01 無鎖
//01 4e c0 d5 (00000001 01001110 11000000 11010101)
//6a 00 00 00 (01101010 00000000 00000000 00000000)
System.out.println(Thread.currentThread().getName() + ":");
System.out.println(objClassLayout.toPrintable());
Thread thread1 = new Thread(() -> {
synchronized (obj) {
// 110110 00 中的00表示輕量級鎖其他62位指向擁有鎖的線程
//d8 f1 5f 1d (11011000 11110001 01011111 00011101)
//00 00 00 00 (00000000 00000000 00000000 00000000)
System.out.println(Thread.currentThread().getName() + ":");
System.out.println(objClassLayout.toPrintable());
//1334729950
//無鎖升級成輕量級鎖後 hashcode未變 對象頭中沒存儲hashcode 只存儲擁有鎖的線程
//(實際上mark word內容被存儲到lock record中,所以hashcode也被存儲到lock record中)
System.out.println(obj.hashCode());
}
}, "t1");
thread1.start();
//等待t1執行完 避免 發生競爭
thread1.join();
//輕量級鎖 釋放後 mark word 恢覆成無鎖 存儲哈希code的狀態
//01 4e c0 d5 (00000001 01001110 11000000 11010101)
//6a 00 00 00 (01101010 00000000 00000000 00000000)
System.out.println(Thread.currentThread().getName() + ":");
System.out.println(objClassLayout.toPrintable());
Thread thread2 = new Thread(() -> {
synchronized (obj) {
//001010 00 中的00表示輕量級鎖其他62位指向擁有鎖的線程
//28 f6 5f 1d (00101000 11110110 01011111 00011101)
//00 00 00 00 (00000000 00000000 00000000 00000000)
System.out.println(Thread.currentThread().getName() + ":");
System.out.println(objClassLayout.toPrintable());
}
}, "t2");
thread2.start();
thread2.join();
}
偏向鎖
public void biasedLockTest() throws InterruptedException {
//延遲讓偏向鎖啟動
Thread.sleep(5000);
Object obj = new Object();
ClassLayout objClassLayout = ClassLayout.parseInstance(obj);
//1 01 匿名偏向鎖 還未設置偏向線程
//05 00 00 00 (00000101 00000000 00000000 00000000)
//00 00 00 00 (00000000 00000000 00000000 00000000)
System.out.println(Thread.currentThread().getName() + ":");
System.out.println(objClassLayout.toPrintable());
synchronized (obj) {
//偏向鎖 記錄 線程地址
//05 30 e3 02 (00000101 00110000 11100011 00000010)
//00 00 00 00 (00000000 00000000 00000000 00000000)
System.out.println(Thread.currentThread().getName() + ":");
System.out.println(objClassLayout.toPrintable());
}
Thread thread1 = new Thread(() -> {
synchronized (obj) {
//膨脹為輕量級 0 00 0未啟用偏向鎖,00輕量級鎖
//68 f4 a8 1d (01101000 11110100 10101000 00011101)
//00 00 00 00 (00000000 00000000 00000000 00000000)
System.out.println(Thread.currentThread().getName() + ":");
System.out.println(objClassLayout.toPrintable());
}
}, "t1");
thread1.start();
thread1.join();
}
重量級鎖
public void heavyLockTest() throws InterruptedException {
Object obj = new Object();
ClassLayout objClassLayout = ClassLayout.parseInstance(obj);
Thread thread1 = new Thread(() -> {
synchronized (obj) {
//第一次 00 表示 輕量級鎖
//d8 f1 c3 1e (11011000 11110001 11000011 00011110)
//00 00 00 00 (00000000 00000000 00000000 00000000)
System.out.println(Thread.currentThread().getName() + ":");
System.out.println(objClassLayout.toPrintable());
//用debug控制t2來競爭
//第二次列印 變成 10 表示膨脹為重量級鎖(t2競爭) 其他62位指向監視器對象
//fa 21 3e 1a (11111010 00100001 00111110 00011010)
//00 00 00 00 (00000000 00000000 00000000 00000000)
System.out.println(Thread.currentThread().getName() + ":");
System.out.println(objClassLayout.toPrintable());
}
}, "t1");
thread1.start();
Thread thread2 = new Thread(() -> {
synchronized (obj) {
//t2競爭 膨脹為 重量級鎖 111110 10 10為重量級鎖
//fa 21 3e 1a (11111010 00100001 00111110 00011010)
//00 00 00 00 (00000000 00000000 00000000 00000000)
System.out.println(Thread.currentThread().getName() + ":");
System.out.println(objClassLayout.toPrintable());
}
}, "t2");
thread2.start();
thread1.join();
thread2.join();
//10 重量級鎖 未發生鎖降級
//3a 36 4d 1a (00111010 00110110 01001101 00011010)
//00 00 00 00 (00000000 00000000 00000000 00000000)
System.out.println(Thread.currentThread().getName() + ":");
System.out.println(objClassLayout.toPrintable());
}
輕量級鎖
輕量級鎖的提出是為了減小傳統重量級鎖使用互斥量(掛起/恢複線程)所產生的開銷
面對較少的競爭場景時,獲取鎖的時間總是短暫的,而掛起線程用戶態、內核態的開銷比較大,使用輕量級鎖減少開銷
那麼輕量級鎖是如何實現的呢?
輕量級鎖主要由lock record、mark word、CAS來實現,lock record存儲線上程的棧幀中,來記錄鎖的信息
加鎖
查看對象是不是無鎖狀態,如果對象是無鎖狀態,會將mark word複製到lock record鎖記錄中的displaced mark word
然後再嘗試使用CAS嘗試將mark word中部分內容替換指向這個lock record,如果成功表示獲取鎖成功
如果對象持有鎖,會查看持有鎖的線程是不是當前線程,這種可重入的情況下lock record中記錄不再是mark word而是null
可重入的情況下,只需要進行自增計數即可,解鎖時遇到null的lock record則扣減
如果CAS失敗或者持有鎖的線程不是當前線程,就會觸發鎖膨脹
關鍵代碼如下:
void ObjectSynchronizer::slow_enter(Handle obj, BasicLock* lock, TRAPS) {
//當前對象的mark word
markOop mark = obj->mark();
assert(!mark->has_bias_pattern(), "should not see bias pattern here");
//如果當前對象是無鎖狀態
if (mark->is_neutral()) {
//將mark word複製到lock record
lock->set_displaced_header(mark);
//CAS將當前對象的mark word內容替換為指向lock record
if (mark == (markOop) Atomic::cmpxchg_ptr(lock, obj()->mark_addr(), mark)) {
TEVENT (slow_enter: release stacklock) ;
return ;
}
} else
//如果有鎖 判斷是不是當前線程獲取鎖
if (mark->has_locker() && THREAD->is_lock_owned((address)mark->locker())) {
assert(lock != mark->locker(), "must not re-lock the same lock");
assert(lock != (BasicLock*)obj->mark(), "don't relock with same BasicLock");
//可重入鎖 複製null
lock->set_displaced_header(NULL);
return;
}
//有鎖並且獲取鎖的線程不是當前線程 或者 CAS失敗 進行膨脹
lock->set_displaced_header(markOopDesc::unused_mark());
ObjectSynchronizer::inflate(THREAD, obj())->enter(THREAD);
}
解鎖
查看lock record中複製的內容是不是空,是空說明是可重入鎖
不為空則查看mark word是否指向lock record,如果指向則CAS嘗試將mark word記錄指向lock record替換為lock record中的displaced mark word(也就是原來的mark word)
如果mark word不指向lock record 或者 CAS失敗了 說明存在競爭,其他線程加鎖失敗讓mark word指向重量級鎖,直接膨脹
關鍵代碼如下:
void ObjectSynchronizer::fast_exit(oop object, BasicLock* lock, TRAPS) {
assert(!object->mark()->has_bias_pattern(), "should not see bias pattern here");
//獲取複製的mark word
markOop dhw = lock->displaced_header();
markOop mark ;
//如果為空 說明是可重入
if (dhw == NULL) {
// Recursive stack-lock.
// Diagnostics -- Could be: stack-locked, inflating, inflated.
mark = object->mark() ;
assert (!mark->is_neutral(), "invariant") ;
if (mark->has_locker() && mark != markOopDesc::INFLATING()) {
assert(THREAD->is_lock_owned((address)mark->locker()), "invariant") ;
}
if (mark->has_monitor()) {
ObjectMonitor * m = mark->monitor() ;
assert(((oop)(m->object()))->mark() == mark, "invariant") ;
assert(m->is_entered(THREAD), "invariant") ;
}
return ;
}
mark = object->mark() ;
//如果mark word指向lock record
if (mark == (markOop) lock) {
assert (dhw->is_neutral(), "invariant") ;
//嘗試CAS將指向lock record的mark word替換為原來的內容
if ((markOop) Atomic::cmpxchg_ptr (dhw, object->mark_addr(), mark) == mark) {
TEVENT (fast_exit: release stacklock) ;
return;
}
}
//未指向當前lock record或者CAS失敗則膨脹
ObjectSynchronizer::inflate(THREAD, object)->exit (true, THREAD) ;
}
偏向鎖
hotspot開發人員測試,在某些場景下,總是同一個線程獲取鎖,在這種場景下,希望用更小的開銷來獲取鎖
當開啟偏向鎖後,如果是無鎖狀態會將mark word改為偏向某個線程ID,以此標識這個線程獲取鎖(鎖偏向這個線程)
如果正處於偏向鎖,遇到競爭可能膨脹為輕量級鎖,如果要存儲一致性哈希等情況也會膨脹為重量級鎖
JDK8預設開啟偏向鎖,在高版本JDK預設不開啟偏向鎖,可能因為偏向鎖的維護超過收益,我們也不深入進行研究
重量級鎖
object monitor
使用object monitor對象來實現重量級鎖
object monitor中使用一些欄位記錄信息,比如:object欄位用於記錄鎖的那個對象,header欄位用於記錄鎖的那個對象的mark word、owner欄位用於記錄持有鎖的線程
object monitor使用阻塞隊列來存儲競爭不到鎖的線程,使用等待隊列來存儲調用wait進入等待狀態的線程
阻塞隊列和等待隊列類比著併發包下的AQS和Condition
object monitor使用cxq棧和entry list隊列來實現阻塞隊列,其中cxq棧中存儲有競爭的線程,entry list存儲已經競爭失敗較穩定的線程;使用wait set實現等待隊列
當線程調用wait時,進入wait set等待隊列
而調用notify時,只是將等待隊列的隊頭節點加入cxq,並沒有喚醒該線程去競爭
真正的喚醒線程是在釋放鎖時,去穩定的隊列entry list中喚醒隊頭節點去競爭,而此時被喚醒的節點並不一定能搶到鎖,因為線程進入cxq時還會通過自旋來搶鎖,以此來實現非公平鎖
如果穩定的entry list中沒有存儲線程,會將cxq棧中存儲的線程全存儲到entry list中再去喚醒,此時越晚進入cxq的線程反而會越早被喚醒(cxq棧先進後出)
其實實現與AQS類似,來看這樣一段代碼:
t1-t6獲取同一把鎖,使用t1線程進行阻塞一會,後續t2-t6線程按照順序啟動,由於自轉獲取不到鎖,它們會被依次放入cxq:t2,t3,t4,t5,t6
在t1釋放鎖時,由於entry list中沒有線程,於是將cxq中的線程存入entry list:t6,t5,t4,t3,t2
,再喚醒t6
由於後續沒有線程進行競爭,因此最終執行順序為t1,t6,t5,t4,t3,t2
Object obj = new Object();
new Thread(() -> {
synchronized (obj) {
try {
//輸入阻塞
//阻塞的目的是讓 其他線程自旋完未獲取到鎖,進入cxq棧
System.in.read();
} catch (IOException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " 獲取到鎖");
}
}, "t1").start();
//sleep控制線程阻塞的順序
Thread.sleep(50);
new Thread(() -> {
synchronized (obj) {
System.out.println(Thread.currentThread().getName() + " 獲取到鎖");
}
}, "t2").start();
Thread.sleep(50);
new Thread(() -> {
synchronized (obj) {
System.out.println(Thread.currentThread().getName() + " 獲取到鎖");
}
}, "t3").start();
Thread.sleep(50);
new Thread(() -> {
synchronized (obj) {
System.out.println(Thread.currentThread().getName() + " 獲取到鎖");
}
}, "t4").start();
Thread.sleep(50);
new Thread(() -> {
synchronized (obj) {
System.out.println(Thread.currentThread().getName() + " 獲取到鎖");
}
}, "t5").start();
Thread.sleep(50);
new Thread(() -> {
synchronized (obj) {
System.out.println(Thread.currentThread().getName() + " 獲取到鎖");
}
}, "t6").start();
大致瞭解了下object monitor,我們再來看看膨脹和自旋
膨脹
在膨脹時會有四種狀態,分別是
inflated 已膨脹:mark word鎖標誌為10(2)說明已膨脹,直接返回object monitor
inflation in progress 膨脹中:如果已經有其他線程在膨脹了,就等待一會迴圈後查看狀態進入已膨脹的邏輯
stack-locked 輕量級鎖膨脹
neutral 無鎖膨脹
輕量級鎖和無鎖膨脹邏輯差不多,都是需要創建object monitor對象,並且set一些屬性進去(比如:mark word、鎖的哪個對象、哪個線程持有鎖...),最後再使用CAS去替換mark word指向object monitor
ObjectMonitor * ATTR ObjectSynchronizer::inflate (Thread * Self, oop object) {
for (;;) {
const markOop mark = object->mark() ;
assert (!mark->has_bias_pattern(), "invariant") ;
// The mark can be in one of the following states:
// * Inflated - just return
// * Stack-locked - coerce it to inflated
// * INFLATING - busy wait for conversion to complete
// * Neutral - aggressively inflate the object.
// * BIASED - Illegal. We should never see this
// CASE: inflated
// 已膨脹:查看 mark word 後兩位是否為2 是則膨脹完 返回monitor對象
if (mark->has_monitor()) {
ObjectMonitor * inf = mark->monitor() ;
assert (inf->header()->is_neutral(), "invariant");
assert (inf->object() == object, "invariant") ;
assert (ObjectSynchronizer::verify_objmon_isinpool(inf), "monitor is invalid");
return inf ;
}
// CASE: inflation in progress - inflating over a stack-lock.
// 膨脹中: 等待一會 再迴圈 從膨脹完狀態退出
if (mark == markOopDesc::INFLATING()) {
TEVENT (Inflate: spin while INFLATING) ;
ReadStableMark(object) ;
continue ;
}
// CASE: stack-locked
//輕量級鎖膨脹
if (mark->has_locker()) {
//創建ObjectMonitor
ObjectMonitor * m = omAlloc (Self) ;
m->Recycle();
m->_Responsible = NULL ;
m->OwnerIsThread = 0 ;
m->_recursions = 0 ;
m->_SpinDuration = ObjectMonitor::Knob_SpinLimit ; // Consider: maintain by type/class
//cas將mark word替換指向ObjectMonitor
markOop cmp = (markOop) Atomic::cmpxchg_ptr (markOopDesc::INFLATING(), object->mark_addr(), mark) ;
//cas 失敗 則說明其他線程膨脹成功,刪除當前monitor 退出
if (cmp != mark) {
omRelease (Self, m, true) ;
continue ; // Interference -- just retry
}
markOop dmw = mark->displaced_mark_helper() ;
assert (dmw->is_neutral(), "invariant") ;
//成功 設置mark word
m->set_header(dmw) ;
//設置持有鎖的線程
m->set_owner(mark->locker());
//設置鎖的是哪個對象
m->set_object(object);
guarantee (object->mark() == markOopDesc::INFLATING(), "invariant") ;
//修改mark word對象頭信息 鎖狀態 2
object->release_set_mark(markOopDesc::encode(m));
if (ObjectMonitor::_sync_Inflations != NULL) ObjectMonitor::_sync_Inflations->inc() ;
TEVENT(Inflate: overwrite stacklock) ;
if (TraceMonitorInflation) {
if (object->is_instance()) {
ResourceMark rm;
tty->print_cr("Inflating object " INTPTR_FORMAT " , mark " INTPTR_FORMAT " , type %s",
(void *) object, (intptr_t) object->mark(),
object->klass()->external_name());
}
}
return m ;
}
// CASE: neutral
//無鎖膨脹 與輕量級鎖膨脹類似,也是創建monitor對象並註入屬性,只是很多屬性為空
assert (mark->is_neutral(), "invariant");
ObjectMonitor * m = omAlloc (Self) ;
m->Recycle();
m->set_header(mark);
m->set_owner(NULL);
m->set_object(object);
m->OwnerIsThread = 1 ;
m->_recursions = 0 ;
m->_Responsible = NULL ;
m->_SpinDuration = ObjectMonitor::Knob_SpinLimit ; // consider: keep metastats by type/class
//cas 更新 mark word 失敗迴圈等待 成功返回
if (Atomic::cmpxchg_ptr (markOopDesc::encode(m), object->mark_addr(), mark) != mark) {
m->set_object (NULL) ;
m->set_owner (NULL) ;
m->OwnerIsThread = 0 ;
m->Recycle() ;
omRelease (Self, m, true) ;
m = NULL ;
continue ;
}
if (ObjectMonitor::_sync_Inflations != NULL) ObjectMonitor::_sync_Inflations->inc() ;
TEVENT(Inflate: overwrite neutral) ;
if (TraceMonitorInflation) {
if (object->is_instance()) {
ResourceMark rm;
tty->print_cr("Inflating object " INTPTR_FORMAT " , mark " INTPTR_FORMAT " , type %s",
(void *) object, (intptr_t) object->mark(),
object->klass()->external_name());
}
}
return m ;
}
}
自旋
膨脹過後,在最終掛起前會進行固定自旋和自適應自旋
固定自旋預設10+1次
自適應自旋一開始5000次,如果最近競爭少獲取到鎖就將自旋次數調大,如果最近競爭大獲取不到鎖就將自旋次數調小
int ObjectMonitor::TrySpin_VaryDuration (Thread * Self) {
// Dumb, brutal spin. Good for comparative measurements against adaptive spinning.
int ctr = Knob_FixedSpin ;
if (ctr != 0) {
while (--ctr >= 0) {
if (TryLock (Self) > 0) return 1 ;
SpinPause () ;
}
return 0 ;
}
//先進行固定11自旋次數 獲取到鎖返回,沒獲取到空轉
for (ctr = Knob_PreSpin + 1; --ctr >= 0 ; ) {
if (TryLock(Self) > 0) {
// Increase _SpinDuration ...
// Note that we don't clamp SpinDuration precisely at SpinLimit.
// Raising _SpurDuration to the poverty line is key.
int x = _SpinDuration ;
if (x < Knob_SpinLimit) {
if (x < Knob_Poverty) x = Knob_Poverty ;
_SpinDuration = x + Knob_BonusB ;
}
return 1 ;
}
SpinPause () ;
}
//自適應自旋 一開始5000 如果成功認為此時競爭不大 自旋獲取鎖成功率高 增加重試次數 如果失敗則減少
//...
}
總結
本篇文章圍繞synchronized,深入淺出的描述CAS、synchronized在Java層面和C++層面的實現、鎖升級原理、案例、源碼等
synchronized用於併發下的需要同步的場景,使用它可以滿足原子性、可見性以及有序性,它可以作用在普通對象和靜態對象,作用於靜態對象時是去獲取其對應的Class對象的鎖
synchronized作用在代碼塊上時,使用monitorentry、monitorexit位元組碼指令來標識加鎖、解鎖;作用在方法上時,在訪問標識加鎖synchronized關鍵字,虛擬機隱式使用monitorentry、monitorexit
CAS 比較並替換,常與重試機制實現樂觀鎖/自旋鎖,優點是能夠在競爭小的場景用較小的開銷取代線程掛起,但帶來ABA問題、無法預估重試次數空轉CPU的開銷等問題
輕量級鎖的提出是為了在交替執行/競爭少的場景,用更小的開銷取代互斥量;使用CAS和lock record實現
輕量級鎖加鎖時,如果是無鎖則複製mark word到lock record中,再CAS將對象mark word替換為指向該lock record,失敗則膨脹;如果已經持有鎖則判斷持有鎖的線程是不是當前線程,是則累加次數,不是當前線程則膨脹
輕量級鎖解鎖時,查看lock record複製的是不是null,是則說明是可重入鎖,次數減一;不是則CAS把複製過來的mark word替換回去,如果替換失敗說明其他線程競爭,mark word已經指向object monitor,去指向重量級鎖的釋放
偏向鎖的提出是為了在經常一條線程執行的場景下,用更小的開銷來取代CAS的開銷,只不過高版本不再預設開啟
重量級鎖由object monitor來實現,object monitor中使用cxq、entry list來構成阻塞隊列,wait set來構成等待隊列
當執行wait方法時,線程構建為節點加入wait set;當執行notify方法時,將wait set隊頭節點加入cxq,在釋放鎖時才去喚醒entry list隊頭節點競爭鎖,即使沒搶到鎖構建為節點加入cxq時還會自旋,因此並不是entry list隊頭節點就一定能搶到鎖,以此來實現非公平鎖;當entry list為空時,將cxq棧中的節點加入entry list隊列(後進入cxq的節點會被先喚醒)
在膨脹為重量級鎖時有四種情況,如果狀態為已膨脹則直接返回object monitor對象;如果狀態為膨脹中,說明其他線程正在膨脹,等待會,下次迴圈進入已膨脹的邏輯;如果狀態為輕量級鎖膨脹或無鎖膨脹,都會去創建object monitor對象,set一些重要屬性,並CAS去將mark word替換為指向該object monitor
重量級鎖在最終掛起前會進行固定自旋和自適應自旋(最近競爭小就增加自旋次數;競爭多就減少自旋次數)
最後(不要白嫖,一鍵三連求求拉~)
本篇文章被收入專欄 由點到線,由線到面,深入淺出構建Java併發編程知識體系,感興趣的同學可以持續關註喔
本篇文章筆記以及案例被收入 gitee-StudyJava、 github-StudyJava 感興趣的同學可以stat下持續關註喔~
案例地址:
Gitee-JavaConcurrentProgramming/src/main/java/B_synchronized
Github-JavaConcurrentProgramming/src/main/java/B_synchronized
有什麼問題可以在評論區交流,如果覺得菜菜寫的不錯,可以點贊、關註、收藏支持一下~
關註菜菜,分享更多乾貨,公眾號:菜菜的後端私房菜
本文由博客一文多發平臺 OpenWrite 發佈!