### 前言 上篇文章[10分鐘從源碼級別搞懂AQS(AbstractQueuedSynchronizer)](https://juejin.cn/post/7273506068104478760)說到JUC併發包中的同步組件大多使用AQS來實現 本篇文章通過AQS自己來實現一個同步組件,並從源碼級 ...
前言
上篇文章10分鐘從源碼級別搞懂AQS(AbstractQueuedSynchronizer)說到JUC併發包中的同步組件大多使用AQS來實現
本篇文章通過AQS自己來實現一個同步組件,並從源碼級別聊聊JUC併發包中的常用同步組件
本篇文章需要的前置知識就是AQS,如果不瞭解AQS的同學可以看上一篇文章哈~
閱讀本篇文章大概需要13分鐘
自定義同步組件
為了更容易理解其他同步組件,我們先來使用AQS自己來實現一個常用的可重入鎖
AQS模板方法流程是固定的,我們主要只需要來實現它的嘗試獲取同步狀態和嘗試釋放同步狀態方法即可
首先我們先規定要實現的可重入鎖是獨占式的
規定同步狀態一開始為0,當有線程獲取鎖成功同步狀態就為1,當這個線程重入時就累加同步狀態
規定釋放同步狀態時每次扣減1個同步狀態,只有當同步狀態扣減到0時,才是真正的釋放獨占鎖
我們使用一個內部類Sync 來繼承AQS 並重寫tryAcquire
嘗試獲取同步狀態、tryRelease
嘗試釋放同步狀態、isHeldExclusively
判斷當前線程是否持有同步狀態(等待、通知時會用到該方法)
static class Sync extends AbstractQueuedSynchronizer {
/**
* 判斷當前線程是否持有同步狀態
*
* @return
*/
@Override
protected boolean isHeldExclusively() {
return getExclusiveOwnerThread() == Thread.currentThread();
}
}
在獲取同步狀態中
- 先判斷是否有同步狀態(即同步狀態是否為0),如果有同步狀態就用CAS去獲取(0->1),成功就設置當前線程為獲取同步狀態的線程
- 如果沒有同步狀態(即同步狀態不為0) ,就查看獲取同步狀態的線程是否為當前線程,如果是當前線程則說明此次是重入,累加重入次數
- 其他情況說明未獲取到同步狀態,返回false 後續走AQS流程(構建節點加入AQS)
/**
* 嘗試獲取同步狀態
*
* @param arg 獲取同步狀態的數量
* @return
*/
@Override
protected boolean tryAcquire(int arg) {
//1.獲取同步狀態
int state = getState();
//2.如果有同步狀態則CAS替換 0->1
if (state == 0) {
if (compareAndSetState(state, 1)) {
//替換成功 說明獲取到同步狀態 設置當前獲取同步狀態線程
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
} else if (getExclusiveOwnerThread() == Thread.currentThread()) {
//3.沒有同步狀態 查看獲取同步資源的線程是否為當前線程 可重入 累加重入次數
setState(state + arg);
return true;
}
//其他情況就是沒獲取到同步狀態
return false;
}
在釋放同步狀態中
只有當同步狀態要改成0時才是真正釋放,否則情況情況下就是重入扣減次數
/**
* 嘗試釋放同步狀態
*
* @param arg 釋放同步狀態的數量
* @return
*/
@Override
protected boolean tryRelease(int arg) {
//目標狀態
int targetState = getState() - arg;
//真正釋放鎖
if (targetState == 0) {
setExclusiveOwnerThread(null);
setState(targetState);
return true;
}
//其他情況 扣減狀態
setState(targetState);
return false;
}
使用內部類實現AQS的方法後,我們在自定義同步組件類中去實現Lock介面,並用內部類實現AQS的方法去實現Lock介面的方法
將要獲取、釋放的同步狀態都設置成1,對應響應中斷、超時的方法就用AQS中對應的方法即可
public class MySynchronizedComponent implements Lock {
public MySynchronizedComponent() {
sync = new Sync();
}
private Sync sync;
@Override
public void lock() {
sync.acquire(1);
}
@Override
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
@Override
public boolean tryLock() {
return sync.tryAcquire(1);
}
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(time));
}
@Override
public void unlock() {
sync.release(1);
}
@Override
public Condition newCondition() {
return sync.new ConditionObject();
}
}
實際上我們只需要去實現嘗試獲取、釋放同步狀態方法就能夠完成自己的同步組件,這就是使用AQS帶來的好處
代碼案例可以去git倉庫獲取,放在本文最後
ReentrantLock
ReentrantLock是併發包中提供的可重入鎖,它除了能夠實現synchronized的功能外還可以響應中斷、超時、實現公平鎖等,其底層也是通過AQS來實現的
ReentrantLock的功能與synchronized類似,可重入的獨占鎖,用於保證併發場景下同步操作
使用時需要顯示加鎖、解鎖,常用格式如下:
reentrantLock.lock();
try{
//....
}finally {
reentrantLock.unlock();
}
finally中最先去解鎖,並且加鎖要放在try塊的最外層,並保證加鎖和try塊之間不會拋出異常
加鎖不放在try中是因為加鎖實現未知可能拋出不受檢查unchecked的異常,當加鎖拋出異常時,後續finally塊解鎖也會拋出非法監視器的異常從而導致覆蓋
加鎖和try塊之間如果拋出異常,那麼就無法執行解鎖了
ReentrantLock除了提供基本的同步功能,還提供響應中斷、超時的API,同學們可以私下去查看
熟悉ReentrantLock實現的同學,可能看上面自定義同步組件的代碼很熟悉,其實就是參考ReentrantLock非公平鎖寫的
ReentrantLock中使用內部類Sync來繼承AQS,同時內部類NonfairSync和FairSync來繼承Sync去實現非公平、公平的獲取同步狀態
非公平鎖嘗試獲取同步狀態 流程類似就不過多描述
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
那公平鎖如何來實現獲取同步狀態呢?
其實看過上篇AQS文章的同學就知道了,在上篇文章中已經說過
只需要在嘗試獲取同步狀態前加上一個條件:隊列中是否有前置任務(即在隊列中FIFO排隊獲取)
公平鎖也是這麼去實現的,前置條件hasQueuedPredecessors
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
ReentrantReadWriteLock
功能與實現
ReentrantReadWriteLock在ReentrantLock功能的基礎上,提供讀寫鎖的功能,讓鎖的粒度更細
在一些讀多寫少的場景下是允許同時讀的,允許多個線程獲取,其實想到了AQS的共用式,讀鎖也就是共用式
在讀讀的場景下,都是讀鎖/共用鎖,不會進行阻塞
在讀寫、寫讀、寫寫的場景下,都會進行阻塞
比如要獲取寫鎖時,需要等待讀鎖、寫鎖都解鎖;要獲取讀鎖時,需要等待寫鎖解鎖
ReentrantReadWriteLock 在 ReentrantLock 的基礎上增加ReadLock
和WriteLock
分別作為讀鎖和寫鎖
實際上讀鎖就是共用鎖、寫鎖就是獨占鎖,在實現加鎖、解鎖的方法時分別調用共用式、獨占式的獲取、釋放同步狀態即可
在構造時,讀寫鎖中實際使用的都是同一個AQS
public ReentrantReadWriteLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
readerLock = new ReadLock(this);
writerLock = new WriteLock(this);
}
//讀鎖構造
protected ReadLock(ReentrantReadWriteLock lock) {
sync = lock.sync;
}
//寫鎖構造
protected WriteLock(ReentrantReadWriteLock lock) {
sync = lock.sync;
}
即同步狀態會被讀寫鎖共用,那麼它們如何查看/修改自己的那部分同步狀態呢?
在讀寫鎖中,同步狀態被一分為二,高16位的同步狀態是讀鎖的,低16位的同步狀態是寫鎖的
當線程獲取寫鎖時,寫狀態+1,由於寫狀態在低位,相當於同步狀態+1
當線程獲取讀鎖時,讀狀態+1,由於讀狀態在高位,相當於同步狀態+(1<<16)
寫鎖獲取
寫鎖的獲取實現在sync.tryAcquire
中 sync可以是公平也可以是非公平,實際上是獨占式的獲取
protected final boolean tryAcquire(int acquires) {
Thread current = Thread.currentThread();
//得到同步狀態c
int c = getState();
//得到寫狀態(同步狀態低16位 與上 全1)
int w = exclusiveCount(c);
if (c != 0) {
//同步狀態不為0,寫狀態為0,說明讀狀態不為0,讀鎖已經被獲取,此時獲取寫鎖失敗
//同步狀態不為0,寫狀態也不為0,查看當前線程是否是獲取寫鎖的線程,不是的話獲取寫鎖失敗
if (w == 0 || current != getExclusiveOwnerThread())
return false;
//只有當前線程獲取過寫鎖才能進入這裡
//如果原來的寫狀態+這次重入的寫狀態 超過了 同步狀態的0~15位 則拋出異常
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
//設置同步狀態 因為寫狀態在低16位所以不用左移 (重入累加)
setState(c + acquires);
return true;
}
//同步狀態為0 無鎖時
//writerShouldBlock在非公平鎖下返回false 在公平鎖下查看是否有前驅任務
//如果CAS失敗則返回false
if (writerShouldBlock() ||
!compareAndSetState(c, c + acquires))
return false;
//CAS成功則 設置當前線程為獲得獨占鎖(寫鎖)的線程
setExclusiveOwnerThread(current);
return true;
}
查看源碼可以知道:
- 當有鎖時(同步狀態不為0情況),如果只有讀鎖(沒有寫鎖),那麼直接失敗;如果只有寫鎖則查看當前線程是否為獲取寫鎖的線程(重入情況)
- 當無鎖時進行CAS獲取寫鎖,成功則設置獲取寫鎖的線程,失敗則返回
根據源碼分析可以知道,寫鎖允許重入,並且獲取寫鎖時,如果有讀鎖會被阻塞
寫鎖釋放
寫鎖的釋放實現在sync.tryRelease
中
protected final boolean tryRelease(int releases) {
//判斷當前線程是不是獲取寫(獨占)鎖的線程
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
//新狀態
int nextc = getState() - releases;
//如果新狀態低16位為0(沒有寫鎖)就設置獲取寫鎖的線程為空,然後設置同步狀態,再返回
boolean free = exclusiveCount(nextc) == 0;
if (free)
setExclusiveOwnerThread(null);
setState(nextc);
return free;
}
釋放其實也類似,只有當寫狀態為0時才是真正釋放,其他情況都是扣減重入次數
讀鎖獲取
讀鎖的獲取也就是共用式的獲取
protected final int tryAcquireShared(int unused) {
Thread current = Thread.currentThread();
//同步狀態
int c = getState();
//exclusiveCount 為獲取寫鎖狀態 低16位全與1
//如果有寫鎖 並且 獲取寫鎖的線程不是當前線程 則失敗(說明允許同一線程獲取寫鎖再獲取讀鎖)
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return -1;
//獲取讀狀態 (同步狀態右移16位)
int r = sharedCount(c);
//讀沒被阻塞 沒超過最大值 且CAS成功 記錄信息 返回成功
if (!readerShouldBlock() &&
r < MAX_COUNT &&
compareAndSetState(c, c + SHARED_UNIT)) {
if (r == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
return 1;
}
return fullTryAcquireShared(current);
}
在讀鎖中允許同一線程獲取寫鎖再獲取讀鎖
在某些場景下要先寫數據再讀數據,比如:
- 獲取寫鎖
- 寫數據
- 釋放寫鎖
- 使用(讀)數據
這樣會導致釋放完寫鎖後,其他線程可以獲取寫鎖,從而導致第四步會出現臟讀
正確的用法應該在釋放寫鎖前獲取讀鎖:
- 獲取寫鎖
- 寫數據
- 獲取讀鎖
- 釋放寫鎖
- 讀數據
這樣其他線程獲取寫鎖時因為都讀鎖會被阻塞,而其他線程需要讀時又不會被阻塞
在讀多寫少的場景,讀寫鎖粒度更細,讀讀不阻塞,併發性能更好
信號量
功能
信號量用於控制同時訪問資源的線程數量
線程訪問資源時需要先拿到信號量才能訪問,訪問完釋放信號量,信號量允許同時N個線程獲取
下麵是控制同時只能有2個線程獲取到信號量
//初始化信號量
Semaphore semaphore = new Semaphore(2);
//每次只有兩個線程能夠獲取到信號量執行
ExecutorService executor = Executors.newFixedThreadPool(4);
for (int i = 0; i < 10; i++) {
executor.execute(()->{
try {
semaphore.acquire();
System.out.println(Thread.currentThread().getName()+"獲得資源");
//執行任務
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
System.out.println(Thread.currentThread().getName()+"釋放資源======");
semaphore.release();
}
});
}
executor.shutdown();
實現
熟悉AQS的同學應該可以猜到信號量其實就是通過共用式實現的
信號量構造時提供初始化信號量的數量,實際上就是初始化同步狀態,比如設置2個信號量就是設置同步狀態為2;還可以在構造中設置公平、非公平
在獲取信號量時,使用響應中斷的共用式,在非公平情況下執行nonfairTryAcquireShared
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
//獲取同步狀態
int available = getState();
//目標同步狀態
int remaining = available - acquires;
//沒有信號量 或 CAS成功 都會返回目標同步狀態 為負數時獲取失敗
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
在獲取時實際上就是扣減要獲取的信號量,可能多個線程同時獲取信號量,使用CAS+失敗重試保證原子性,直到沒有信號量或CAS成功
在釋放信號量時實際就是加上釋放的信號量,可能多個線程同時釋放信號量,因此釋放時使用CAS+失敗重試保證原子性
protected final boolean tryReleaseShared(int releases) {
for (;;) {
int current = getState();
int next = current + releases;
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
if (compareAndSetState(current, next))
return true;
}
}
CountDownLatch
CountDownLatch 相當於一個計數器,在構造時設置計數數量
功能
調用countDown
方法會對數量進行自減
調用await
方法時,如果還有數量沒被扣減完,則會阻塞,直到數量都被扣減完
當一個線程執行N個任務,或者多個線程執行一個任務時,要等待它們執行完再進行下一步操作時,就可以使用CountDownLatch
//初始化10
CountDownLatch countDownLatch = new CountDownLatch(10);
//固定線程池
ExecutorService executor = Executors.newFixedThreadPool(10);
for (int i = 1; i <= 10; i++) {
final int index = i;
executor.execute(() -> {
System.out.println(Thread.currentThread() + "處理任務" + index);
//執行任務...
//數量-1
countDownLatch.countDown();
});
}
//計數量為0時才可以繼續執行
countDownLatch.await();
System.out.println("處理完任務");
executor.shutdown();
實現
其實它的實現與信號量類似,也是通過共用式
在構造中設置初始值時,實際上就是在設置同步狀態
當執行countDown
扣減數量時,實際上就是在扣減同步狀態 ,由於可能多線程同時執行,使用CAS+失敗重試保證扣減同步狀態成功
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
執行await
時,實際就是判斷同步狀態是否為0,不是則說明有的線程還未執行完任務,阻塞等待
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
CyclicBarrier
cyclic Barrier 是一個可迴圈使用的屏障,它常常被用來和countdownlatch作比較
它就像一個屏障,讓線程執行完任務後遇到屏障阻塞,直到所有線程都執行完任務(都到達屏障),並且它是可重覆使用的
CyclicBarrier cyclicBarrier = new CyclicBarrier(3, () -> {
System.out.println("所有線程到達屏障後,優先執行構造規定的runnable");
});
Thread t1 = new Thread(() -> {
//執行任務
task(cyclicBarrier);
}, "t1");
Thread t2 = new Thread(() -> {
//執行任務
task(cyclicBarrier);
}, "t2");
Thread t3 = new Thread(() -> {
//執行任務
task(cyclicBarrier);
}, "t3");
t1.start();
t2.start();
t3.start();
task方法中會執行await阻塞直到所有線程到達屏障
private static void task(CyclicBarrier cyclicBarrier) {
System.out.println(Thread.currentThread() + "執行任務...");
try {
TimeUnit.SECONDS.sleep(1);
cyclicBarrier.await();
System.out.println("所有線程都執行完, " + Thread.currentThread() + "走出屏障");
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}
cyclic barrier會記錄需要多少線程到達屏障,並且通過代來達到重覆使用
使用reentrant lock 在await中加鎖、解鎖,每當一個線程到達屏障(執行await時),都會進行自減,如果不為0會阻塞,自減到0時說明所有線程到達屏障,喚醒其他線程,並更新新的代
Exchange
Exchanger用於線程間的協作,可以用來交換變數
Exchanger<String> exchanger = new Exchanger();
new Thread(() -> {
String A = "A";
try {
//B
System.out.println(exchanger.exchange(A));
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
String B = "B";
try {
//A
String A = exchanger.exchange(B);
System.out.println("A=" + A + " B=" + B);
} catch (InterruptedException e) {
e.printStackTrace();
}
當一個線程先執行exchange時會等待另一個線程執行,等到另一個線程exchange時則喚醒等待的線程
總結
本篇文章圍繞前置知識AQS原理,來實現自定義的同步組件,並對併發包中常用同步組件的功能和原理進行說明
繼承AQS後,只需要實現嘗試獲取、釋放同步狀態等方法就可以自定義同步組件
ReentrantLock 是由AQS實現的獨占式可重入鎖,初始值同步狀態為0;獲取鎖時,如果是無鎖則嘗試CAS自增,成功就獲取了鎖;如果有鎖則判斷獲取鎖的線程是不是當前線程,是則說明是可重入鎖自增次數;在釋放鎖時由於可重入的關係,只有自減為0才是真正釋放鎖
ReentrantLock 還提供響應中斷、超時、公平鎖的其他功能,公平鎖實現只需要加上獲取鎖的前提:在AQS中FIFO排隊,前驅節點為首節點
ReentrantReadWriteLock 提供共用的讀鎖和獨占的寫鎖,將鎖的狀況更加細粒度,將同步狀態高低16位拆分為讀、寫的狀態,在讀多寫少的場景併發性能會更好;在獲取寫鎖時,如果有讀鎖那麼會阻塞,如果有寫鎖會查看是否為可重入;在獲取讀鎖時,沒有寫鎖就可以獲取,如果寫鎖是當前線程也可以獲取
信號量用於控制線程訪問資源,初始化自定義的信號量數量,線程訪問資源時先獲取信號量,獲取到信號量才能夠訪問資源;使用共用式來實現,由於可能多個線程同時獲取、釋放信號量,在實現時都需要使用CAS+失敗重試保證原子性
CountDownLatch 用於計數,可以用於一個線程執行N個任務,也可以用於多個線程執行1個任務,當執行完任務使用countdown 來對同步狀態進行扣減,執行await方法時只要同步狀態不為0就會阻塞線程,直到所有任務執行完(將同步狀態扣減完)
CyclicBarrier 是可迴圈使用的屏障,用於多線程到達屏障後,需要等待其他線程都到達屏障才繼續執行;使用reentrant lock 和 代 來實現,調用await時自減,當計數為0時說明所有線程到達屏障,喚醒其他阻塞的線程
Exchange 用於線程間的協作,能夠交換線程間的變數
最後(不要白嫖,一鍵三連求求拉~)
本篇文章被收入專欄 由點到線,由線到面,深入淺出構建Java併發編程知識體系,感興趣的同學可以持續關註喔
本篇文章筆記以及案例被收入 gitee-StudyJava、 github-StudyJava 感興趣的同學可以stat下持續關註喔~
案例地址:
Gitee-JavaConcurrentProgramming/src/main/java/C_AQSComponent
Github-JavaConcurrentProgramming/src/main/java/C_AQSComponent
有什麼問題可以在評論區交流,如果覺得菜菜寫的不錯,可以點贊、關註、收藏支持一下~
關註菜菜,分享更多乾貨,公眾號:菜菜的後端私房菜
本文由博客一文多發平臺 OpenWrite 發佈!