前言 簡介 Semaphore 中文稱信號量,它和ReentrantLock 有所區別,ReentrantLock是排他的,也就是只能允許一個線程擁有資源,Semaphore是共用的,它允許多個線程同時擁有資源,是AQS中共用模式的實現,在前面的AQS分析文章中,我也是用Semaphore去解釋共用 ...
前言
簡介
Semaphore 中文稱信號量,它和ReentrantLock 有所區別,ReentrantLock是排他的,也就是只能允許一個線程擁有資源,Semaphore是共用的,它允許多個線程同時擁有資源,是AQS中共用模式的實現,在前面的AQS分析文章中,我也是用Semaphore去解釋共用鎖的
現實中,我們火爆一點兒的飯店吃飯,比如海底撈,為什麼我們需要排隊,是因為裡面只能容納這麼多人吃飯,位置不夠了,我們就要去排隊,有人吃完出來了才能有新的人進去,同樣的道理
使用
下麵還是常規流程 可能有的人沒用過,就寫個小demo,介紹下他的使用
先看下代碼:
/**
* @ClassName SemaphoreDemo
* @Auther burgxun
* @Description: 使用信號量的Demo
* @Date 2020/4/3 13:53
**/
public class SemaphoreDemo {
public static void PrintLog(String logContent) {
SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd hh:mm.ss.SSS");
System.out.println(String.format("%s : %s", simpleDateFormat.format(new Date()), logContent));
}
public static void main(String[] args) throws InterruptedException {
Semaphore semaphore = new Semaphore(10);
Thread thread1 = new Thread(new Runnable() {
@Override
public void run() {
try {
PrintLog("Thread1 starting ......");
semaphore.acquire(5);
PrintLog("Thread1 get permits success");
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
PrintLog("thread1 release permits success");
semaphore.release(5);
}
}
});
Thread thread2 = new Thread(new Runnable() {
@Override
public void run() {
try {
PrintLog("Thread2 starting ......");
semaphore.acquire(5);
PrintLog("Thread2 get permits success");
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
PrintLog("thread2 release permits success");
semaphore.release(5);
}
}
});
thread1.start();
thread2.start();
try {
semaphore.acquire(5);
PrintLog("Main thread get permits success");
Thread.sleep(3000);
} finally {
PrintLog("Main thread release permits");
semaphore.release(5);
}
}
}
上面的代碼邏輯也很簡答,就是設置一個信號量為10的permits,然後代碼中新開2個線程,也獲取semaphore的5個permits,同時主線程也獲取5個permits
讓我們看下結果:
2020-04-08 11:53.40.539 : Thread1 starting ......
2020-04-08 11:53.40.539 : Main thread get permits success
2020-04-08 11:53.40.539 : Thread2 starting ......
2020-04-08 11:53.40.544 : Thread1 get permits success
2020-04-08 11:53.43.543 : Main thread release permits
2020-04-08 11:53.43.543 : Thread2 get permits success
2020-04-08 11:53.43.544 : thread1 release permits success
2020-04-08 11:53.46.543 : thread2 release permits success
從結果上可以看到 程式啟動的時候,Thread1 和Thread2 還有主線程同時要獲取permits,但是由於Semaphore的permits一共就是10個,所以當主線程和Thread1獲取到了以後,Thread2 雖然啟動了 但是會阻塞在這邊,進入AQS的SyncQueue中,當MainThread或者Thread1 執行完成,釋放permits後,Thread2 才會從阻塞隊列中 喚醒回來從新獲取的信號量,後面繼續執行!
源碼分析
看完了 上面的小Demo 相信你對信號量有了一定的瞭解,那我們就進入源碼中看下,是怎麼實現的,首先我們先看下Semaphore的結構圖:
類結構圖
從圖上我們可以看到 這個類機構和我們之前看的ReentrantLock差不多,有一個Sync靜態的抽象類,然後還有2個繼承Sync的類,一個是公平類FairSync ,另外一個是非公平的NonfairSync
關於公平和非公平的選擇 我在ReentrantLock的結尾部分已經做了部分闡述,這邊就不說了
那麼就看下代碼實現吧!
Sync
abstract static class Sync extends AbstractQueuedSynchronizer {
/**
* m預設的構造函數 設置AQS同步器的State值
*/
Sync(int permits) {
setState(permits);
}
/**
* 獲取同步器的狀態值 State 這個就是獲取設置的許可數量
*/
final int getPermits() {
return getState();
}
/**
* 實現共用模式下非公平方法的獲取資源
*/
final int nonfairTryAcquireShared(int acquires) {
for (; ; ) {//自旋
int available = getState();//當前的同步器的狀態值
int remaining = available - acquires;//本次用完 還剩下幾個permits值
// 如果剩餘小於0或者CAS 成功 就返回 後面利用這個方法的時候 會判斷返回值的
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
/**
* 共用模式下的嘗試釋放資源
*/
protected final boolean tryReleaseShared(int releases) {
for (; ; ) {
int current = getState();
int next = current + releases;//用完的permits 就要加回去
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
if (compareAndSetState(current, next))//CAS 修改AQS的State 由於可能存在多個線程同時操作的State
// 所以要使用CAS操作 失敗了就繼續迴圈,CAS成功就返回
return true;
}
}
/**
* 根據參數reductions 去減少信號量
*/
final void reducePermits(int reductions) {
for (; ; ) {
int current = getState();
int next = current - reductions;
if (next > current) // underflow
throw new Error("Permit count underflow");
if (compareAndSetState(current, next))
return;
}
}
/**
* 清空所有信號量 並返回清空的數量
* 這邊我看很多文章裡面的人只是翻譯了英文 獲取信號量 但是這個裡面有個清空的操作
*/
final int drainPermits() {
for (; ; ) {
int current = getState();
if (current == 0 || compareAndSetState(current, 0))
return current;
}
}
}
Sync 裡面的方法不多,都是常見的討論,裡面預設提供一個非公平版本的獲取Permits,還有統一的釋放Permits的方法,其餘的就是一個獲取信號量方法getPermits,和減少當前Permits數量的方法reducePermits,最後還有一個清空信號量的方法drainPermits,drainPermits這個方法好多文章裡面 都翻譯了因為註解,都翻譯為獲取並返回許可,但是這個方法其實主要做的還是清空Semaphore裡面的信號量的操作,有人會想 為什麼提供這個一個雞肋方法麽,因為先用getPermits獲取 然後使用reducePermits減少不就好了麽,哈哈,這邊要考慮到多線程併發的情況,這邊只能使用CAS的操作去更新!每一行代碼都有它存在的意義,就像人一樣,存在即合理!
NonfairSync
/**
* 非公平版本
*/
static final class NonfairSync extends Sync {
private static final long serialVersionUID = -2694183684443567898L;
NonfairSync(int permits) {
super(permits);
}
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
}
這個類 真沒啥好說的,都是調的Sync裡面的實現
FairSync
/**
* 公平版本
*/
static final class FairSync extends Sync {
private static final long serialVersionUID = 2014338818796000944L;
FairSync(int permits) {
super(permits);
}
protected int tryAcquireShared(int acquires) {
for (; ; ) {//這邊為什麼要用自旋 主要是因為當前版本是共用模式 可能會多個線程同事操作State 導致當前的CAS 操作失敗 所以要做重試
if (hasQueuedPredecessors())//如果當前的SyncQueue中還有等待線程 那就直接返回-1 不讓當前線程獲取資源
return -1;
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
}
這個公平方法的初始化方法和非公平的一樣,區別就是這個tryAcquireShared方法的實現,一個是自己實現的,一個是調的Sync的實現,這個2個方法 雖然區別也不大,但是執行的邏輯卻是不一樣的,主要是這個hasQueuedPredecessors方法,這個方法就是獲取AQS的SyncQueue中 是否還有等待獲取資源的線程,這就是公平和非公平的區別,上一篇ReentrantLock中我也做個說明,就相當於插隊一樣,如果公平的話 大家獲取不到的時候 就到後面排隊等待,大家挨個來,但是非公共的獲取 就是一上來,我不管後面有沒有等待的人,如果有滿足我獲取的條件,我就直接占用!
Semaphore 構造函數
Semaphore有個構造函數
- Semaphore(int permits) 這個是預設的構造函數,是非公平permits就是信號量許可的總數
- Semaphore(int permits, boolean fair) 這個和上面的區別就是 可以設置實現的版本 true就是FairSync false 就是NonfairSync
Semaphore 成員方法
獲取
方法 | 是否響應中斷 | 是否阻塞 |
---|---|---|
acquire() | 是 | 是 |
acquire(int permits) | 是 | 是 |
acquireUninterruptibly() | 否 | 是 |
acquireUninterruptibly(int permits) | 否 | 是 |
tryAcquire() | 否 | 否 |
tryAcquire(int permits) | 否 | 否 |
tryAcquire(long timeout, TimeUnit unit) | 是 | 是(時間可控) |
tryAcquire(int permits, long timeout, TimeUnit unit) | 是 | 是(時間可控) |
上面的方法 是我對Semaphore獲取permits的使用總結,記不住也沒事兒,看看名字或者到時候看下註解,應該也能看明白的~
調一個核心方法acquire吧
/**
* Semaphore中
* 獲取資源
*/
public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
/**
* AbstractQueuedSynchronizer 中
* 共用模式下的 獲取資源 可以響應中斷
*/
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())//檢測下 中斷標識符 如果發生了中斷 就拋出中斷異常
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)//嘗試獲取資源 如果返回值小於0 說明當前的同步器裡面的值 不夠當前獲取 就進入排隊
doAcquireSharedInterruptibly(arg);
}
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
//以共用模式加入到阻塞隊列中 這裡addWaiter和獨占鎖加鎖使用的是同一個方法 不清楚的 可以看之前的文章
final Node node = addWaiter(Node.SHARED);// 返回成功加入隊尾的節點
boolean failed = true;//標識是否獲取資源失敗
try {
for (; ; ) {//自旋
final Node p = node.predecessor();// 獲取當前節點的前置節點
if (p == head) {// 如果前置節點是head 那就去嘗試獲取資源,因為可能head已經釋放了資源
int r = tryAcquireShared(arg);
if (r >= 0) {// 如果獲取成功且大於等於0,意味這資源還有剩餘,可喚醒其餘線程獲取
setHeadAndPropagate(node, r);// 這邊方法就是和獨占鎖處理不一樣地放 我們可以重點去看下 其餘的流程是一樣的
p.next = null; // help GC
failed = false;
return;
}
}
/*下麵的方法和獨占鎖的是一樣的 在第一篇文章中已經解讀過,小伙伴們如果不清楚 可以去看下
有區別的地方就是對中斷的處理這邊是直接拋出中斷異常,獨占鎖處理是返回標記是否中斷 讓上一層處理中斷
*/
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
/**
* 更新prev節點狀態 ,並根據prev節點狀態判斷是否自己當前線程需要阻塞
*/
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;// node的prev節點的狀態
if (ws == Node.SIGNAL) // 如果SIGNAL 就返回true 就會執行到parkAndCheckInterrupt方法裡面
return true;
/*
* 如果ws 大於0 這裡是只能為1,如果是1說明線程已經取消,相當於無效節點
* 者說明 當前node 節點加入到了一個無效節點後面,那這個必須處理一下
node.prev = pred = pred.prev
* 這個操作 我們拆解下來看,下看 pred = pred.prev這個的意思是把prev節點的prev*節點 賦值給prev節點
*後面再看 node.prev = pred 聯合 剛纔的賦值 這個的意思就是把prev節點的prev節點和node關聯起來,
*原因我上面也說了因為pre節點線程取消了,所以node節點不能指向pre節點 只能一個一個的往前找,
*找到waitStatus 小於或者等於0的結束迴圈最後再把找到的pre節點執行node節點 ,這樣就跳過了所有無效的節點
*/
if (ws > 0) {
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
*這邊的操作就是把pred 節點的狀態設置為SIGNAL,這樣返回false 這樣可以返回到上面的自旋中
*再次執行一次,如果還是獲取不到鎖,那麼又回到當前的shouldParkAfterFailedAcquire方法 執行到方法最上面的判斷
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
/**
* 阻塞當前線程,併在恢復後堅持是否發送了中斷
* @return {@code true} if interrupted
*/
private final boolean parkAndCheckInterrupt() {
/*
* 這邊線程阻塞,只有2中方式喚醒當前線程,
* 一種方式就是 當前線程發生中斷
* 另外一個情況就是 資源釋放的時候會調unpark 方法 喚醒當前的線程 這個會在下一篇會講到
*/
LockSupport.park(this);
return Thread.interrupted();//檢查線程在阻塞過程中 是否發生了中斷
}
這其中 tryAcquireShared 方法 都是子類去重寫實現的,非公平版本和公平版本的實現 上文已經描述過!
這其中的整個實現都是在AQS中的,在前面的AQS文章中也詳細的描述過~不清楚的 去前面的文章中看下
下麵是整個方法的調用關係圖如下:
釋放
/**
* 釋放資源
*/
public void release() {
sync.releaseShared(1);
}
public void release(int permits) {
if (permits < 0) throw new IllegalArgumentException();
sync.releaseShared(permits);
}
釋放方法調用了Sync的releaseShared 實際上就是調用了AQS內部的方法releaseShared
/**
* AbstractQueuedSynchronizer 中
* 共用版本的 釋放資源
*/
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {//tryReleaseShared 還是和之前的套路一樣 子類去重寫的
doReleaseShared();//是不是很熟悉
return true;
}
return false;
}
protected boolean tryReleaseShared(int arg) {
throw new UnsupportedOperationException();
}
/**
* 共用模式下的釋放資源
*/
private void doReleaseShared() {
for (; ; ) {
Node h = head;
if (h != null && h != tail) {// 這個head!=tail 說明阻塞隊列中至少2個節點 不然也沒必要去傳播喚醒 如果就自己一個節點 就算資源條件滿足 還換個誰呢?
int ws = h.waitStatus;// head 節點狀態SIGNAL
if (ws == Node.SIGNAL) {// 如果head狀態是
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);//是和獨占鎖釋放用的同樣的方法 喚醒的是下一個節點 前面的文章有分析到
} else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; //這邊設置為-3 是為了喚醒的傳播 也就是滿足上一個方法有判斷waitStatus 小於0
}
if (h == head)
break;
}
}
/**
* 喚醒等待線程去獲取資源
*/
private void unparkSuccessor(Node node) {
/*
* 這邊判斷了下如果當前節點狀態小於0,更新這邊的節點狀態的0,是為了防止多次釋放的時候 會多次喚醒,
* 因為上面的方法有個判斷waitStatus不等0才會執行到這個方法裡面
*/
int ws = node.waitStatus;//這邊的弄得節點就是 要釋放的節點 也就是當前隊列的頭節點
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;
// 如果當前節點的next 節點 不存在或者waitStatus 大於0 說明next節點的線程已取消
if (s == null || s.waitStatus > 0) {
s = null;
//這個迴圈就是 從尾部節點開始往前找,找到離node節點也就是當前釋放節點最近的一個非取消的節點
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
/*
*一開始覺得 這行判斷null有點多餘 因為上面去for 迴圈去找s 的時候 已經判斷了不等於null
*才可以進下麵的迴圈賦值的 後來一想 不對,你們猜為什麼?
*因為 可能在迴圈的過程中t在賦值給s後,繼續迴圈 雖然條件不滿足,
*但是這個時候已經比修改成null 了 我是這麼想的哈~不知道對不對~
*/
if (s != null)
LockSupport.unpark(s.thread);// 這邊就是喚醒當前線程去獲取資源,
}
具體裡面的doReleaseShared方法 我之前在AQS的共用鎖的文章裡面 都詳細做了概述,這邊我就不再次贅述了!
tryReleaseShared 方法 還是和之前的套路一樣,AQS裡面沒有實現 只是寫了個拋出異常的方法,tryReleaseShared方法需要子類去重寫實現,具體為什麼不寫成抽象方法,哈哈 這個問題 自己去AQS中的文章去找下吧~相信你能找到答案
總結
看完了整個代碼的實現,Semphore實際上就是一個共用鎖,多個線程可以共用一個AQS中的State,Semphore常見使用場景是限制資源的併發訪問的線程數量