前言 這次分析信號量Semaphore,為什麼稱之為信號量呢?是因為它可以控制同時訪問某個資源的操作數量或是同時執行某個指定操作的數量。就好比它像一個租賃汽車的公司,租賃公司的汽車的數量是固定的,用完需要歸還,用之前需要去租借(acquire 前提是還有可用的汽車),如果汽車都被租出去了,那隻能等到 ...
前言
這次分析信號量Semaphore,為什麼稱之為信號量呢?是因為它可以控制同時訪問某個資源的操作數量或是同時執行某個指定操作的數量。就好比它像一個租賃汽車的公司,租賃公司的汽車的數量是固定的,用完需要歸還,用之前需要去租借(acquire 前提是還有可用的汽車),如果汽車都被租出去了,那隻能等到別人歸還了才能租到。它是基於AQS的共用鎖來實現的,其中使用了較多的AQS的方法,所以在這之前最好需要分析過AQS的源碼,不嫌棄也可以查看本人之前AQS的源碼分析,有些AQS方法沒有在之前分析過的這裡涉及到了會進行分析。
源碼
構造器和屬性
// 繼承了AQS,並且有2個子類實現(非公平鎖和公平鎖) private final Sync sync; // 構造器只有許可數量permits的信號量 public Semaphore(int permits) { // 非公平鎖 sync = new NonfairSync(permits); } // 構造器帶有許可數量permitsy以及是否是公平鎖 public Semaphore(int permits, boolean fair) { // fair為true時,為公平鎖否則非公平 sync = fair ? new FairSync(permits) : new NonfairSync(permits); }
內部抽象類Sync
// 繼承AQS abstract static class Sync extends AbstractQueuedSynchronizer { Sync(int permits) { // 將許可值賦值給AQS的state setState(permits); } // 獲取當前state值 final int getPermits() { return getState(); } // 非公平獲取許可 final int nonfairTryAcquireShared(int acquires) { // 保證CAS操作成功 for (;;) { // state值,可用許可 int available = getState(); int remaining = available - acquires; if (remaining < 0 || compareAndSetState(available, remaining)) // 小於0表示申請許可大於剩餘的許可直接返回 // 大於0時會更新為之前剩餘許可數減去申請許可數後的剩餘值 return remaining; } } // 嘗試歸還 protected final boolean tryReleaseShared(int releases) { // 保證CAS操作成功 for (;;) { // 當前剩餘許可數量 int current = getState(); // 當前剩餘加上歸還 int next = current + releases; // int值溢出檢查 if (next < current) // overflow throw new Error("Maximum permit count exceeded"); // CAS更新剩餘 if (compareAndSetState(current, next)) return true; } } // 減少許可 final void reducePermits(int reductions) { // 保證CAS操作成功 for (;;) { // 獲取當前許可數 int current = getState(); // 減少reductions個許可 int next = current - reductions; // 溢出 if (next > current) // underflow throw new Error("Permit count underflow"); // CAS更新 if (compareAndSetState(current, next)) return; } } // 將當前剩餘許可全部扔掉 final int drainPermits() { // 保證CAS操作成功 for (;;) { int current = getState(); // 當前剩餘許可為0 直接返回0 // 當前許可大於0,將其更新為0 if (current == 0 || compareAndSetState(current, 0)) return current; } } }
內部類非公平鎖實現類NonfairSync
static final class NonfairSync extends Sync { NonfairSync(int permits) { // 調用父類Sync的構造器 super(permits); } // 嘗試獲取acquires個許可 protected int tryAcquireShared(int acquires) { return nonfairTryAcquireShared(acquires); } }
內部類公平鎖實現類FairSync
static final class FairSync extends Sync { FairSync(int permits) { // 同樣調用父類的構造器 super(permits); } protected int tryAcquireShared(int acquires) { // 保證操作成功 for (;;) { // 獲取許可之前查看是否有正在等待的 if (hasQueuedPredecessors()) return -1; int available = getState(); int remaining = available - acquires; if (remaining < 0 || compareAndSetState(available, remaining)) // 小於0表示申請許可大於剩餘的許可直接返回 // 大於0時會更新為之前剩餘許可數減去申請許可數後的剩餘值 return remaining; } } }
方法acquire 獲取許可
acquire方法分為無參和有參,裡面調用的方法都是一樣只是參數不一樣,都為可中斷
public void acquire() throws InterruptedException { sync.acquireSharedInterruptibly(1); } // 獲取permits個許可 public void acquire(int permits) throws InterruptedException { if (permits < 0) throw new IllegalArgumentException(); sync.acquireSharedInterruptibly(permits); }
AQS的acquireSharedInterruptibly方法
public final void acquireSharedInterruptibly(int arg) throws InterruptedException { // 線程中斷時拋出異常 if (Thread.interrupted()) throw new InterruptedException(); // arg 大於可用許可時會調用doAcquireSharedInterruptibly方法 // doAcquireSharedInterruptibly 當還有可用許可時不會掛起當前線程,反之則會掛起當前線程,在上一篇博文有分析 if (tryAcquireShared(arg) < 0) // doAcquireSharedInterruptibly doAcquireSharedInterruptibly(arg); }
方法release 歸還許可
public void release() { // 歸還一個許可 sync.releaseShared(1); } public void release(int permits) { // 參數保護 if (permits < 0) throw new IllegalArgumentException(); // 歸還permits個許可 sync.releaseShared(permits); }
AQS的releaseShared方法
public final boolean releaseShared(int arg) { // 歸還許可,只要許可沒讓int溢出都返回true,然後會去喚醒正在等待許可的線程 if (tryReleaseShared(arg)) { // 上一篇博文有分析此方法 doReleaseShared(); return true; } return false; }
總結
信號量可以用來實現資源池,比如資料庫連接池、線程池等等,信號量的許可量在初始化後並不是還可以通過drainPermits將當前許可數量變為0.