Semaphore用於管理信號量,在併發編程中,可以控制返訪問同步代碼的線程數量。Semaphore在實例化時傳入一個int值,也就是指明信號數量。主要方法有兩個:acquire()和release()。acquire()用於請求信號,每調用一次,信號量便少一個。release()用於釋放信號,調用 ...
Semaphore用於管理信號量,在併發編程中,可以控制返訪問同步代碼的線程數量。Semaphore在實例化時傳入一個int值,也就是指明信號數量。主要方法有兩個:acquire()和release()。acquire()用於請求信號,每調用一次,信號量便少一個。release()用於釋放信號,調用一次信號量加一個。信號量用完以後,後續使用acquire()方法請求信號的線程便會加入阻塞隊列掛起。本篇簡單分析Semaphore的源碼,說明其實現原理。
Semaphore對於信號量的控制是基於AQS(AbstractQueuedSynchronizer)來做的。Semaphore有一個內部類Sync繼承了AQS。而且Semaphore中還有兩個內部類FairSync和NonfairSync繼承Sync,也就是說Semaphore有公平鎖和非公平鎖之分。以下是Semaphore中內部類的結構:
看一下Semaphore的兩個構造函數:
public Semaphore(int permits) { sync = new NonfairSync(permits); } public Semaphore(int permits, boolean fair) { sync = fair ? new FairSync(permits) : new NonfairSync(permits); }
預設是非公平鎖。兩個構造方法都必須傳int permits值。
這個int值在實例化內部類時,被設置為AQS中的state。
Sync(int permits) { setState(permits); }
一、acquire()獲取信號
內部類Sync調用AQS中的acquireSharedInterruptibly()方法
public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); if (tryAcquireShared(arg) < 0) doAcquireSharedInterruptibly(arg); }
- 調用tryAcquireShared()方法嘗試獲取信號。
- 如果沒有可用信號,將當前線程加入等待隊列並掛起
tryAcquireShared()方法被Semaphore的內部類NonfairSync和FairSync重寫,實現有一些區別。
NonfairSync.tryAcquireShared()
final int nonfairTryAcquireShared(int acquires) { for (;;) { int available = getState(); int remaining = available - acquires; if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } }
可以看到,非公平鎖對於信號的獲取是直接使用CAS進行嘗試的。
FairSync.tryAcquireShared()
protected int tryAcquireShared(int acquires) { for (;;) { if (hasQueuedPredecessors()) return -1; int available = getState(); int remaining = available - acquires; if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } }
- 先調用hasQueuedPredecessors()方法,判斷隊列中是否有等待線程。如果有,直接返回-1,表示沒有可用信號
- 隊列中沒有等待線程,再使用CAS嘗試更新state,獲取信號
再看看acquireSharedInterruptibly()方法中,如果沒有可用信號加入隊列的方法doAcquireSharedInterruptibly()
private void doAcquireSharedInterruptibly(int arg) throws InterruptedException { final Node node = addWaiter(Node.SHARED); // 1 boolean failed = true; try { for (;;) { final Node p = node.predecessor(); if (p == head) { // 2 int r = tryAcquireShared(arg); if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; // help GC failed = false; return; } } if (shouldParkAfterFailedAcquire(p, node) && // 3 parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }
- 封裝一個Node節點,加入隊列尾部
- 在無限迴圈中,如果當前節點是頭節點,就嘗試獲取信號
- 不是頭節點,在經過節點狀態判斷後,掛起當前線程
二、release()釋放信號
public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { // 1 doReleaseShared(); // 2 return true; } return false; }
- 更新state加一
- 喚醒等待隊列頭節點線程
tryReleaseShared()方法在內部類Sync中被重寫
protected final boolean tryReleaseShared(int releases) { for (;;) { int current = getState(); int next = current + releases; if (next < current) // overflow throw new Error("Maximum permit count exceeded"); if (compareAndSetState(current, next)) return true; } }
這裡也就是直接使用CAS演算法,將state也就是可用信號,加1。
看看Semaphore具體的使用示例:
public static void main(String[] args) { ThreadPoolExecutor threadPool = new ThreadPoolExecutor(10, 10, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(10)); //信號總數為5 Semaphore semaphore = new Semaphore(5); //運行10個線程 for (int i = 0; i < 10; i++) { threadPool.execute(new Runnable() { @Override public void run() { try { //獲取信號 semaphore.acquire(); System.out.println(Thread.currentThread().getName() + "獲得了信號量,時間為" + System.currentTimeMillis()); //阻塞2秒,測試效果 Thread.sleep(2000); System.out.println(Thread.currentThread().getName() + "釋放了信號量,時間為" + System.currentTimeMillis()); } catch (InterruptedException e) { e.printStackTrace(); } finally { //釋放信號 semaphore.release(); } } }); } threadPool.shutdown(); }
代碼結果為:
pool-1-thread-2獲得了信號量,時間為1550584196125 pool-1-thread-1獲得了信號量,時間為1550584196125 pool-1-thread-3獲得了信號量,時間為1550584196125 pool-1-thread-4獲得了信號量,時間為1550584196126 pool-1-thread-5獲得了信號量,時間為1550584196127 pool-1-thread-2釋放了信號量,時間為1550584198126 pool-1-thread-3釋放了信號量,時間為1550584198126 pool-1-thread-4釋放了信號量,時間為1550584198126 pool-1-thread-6獲得了信號量,時間為1550584198126 pool-1-thread-9獲得了信號量,時間為1550584198126 pool-1-thread-8獲得了信號量,時間為1550584198126 pool-1-thread-1釋放了信號量,時間為1550584198126 pool-1-thread-10獲得了信號量,時間為1550584198126 pool-1-thread-5釋放了信號量,時間為1550584198127 pool-1-thread-7獲得了信號量,時間為1550584198127 pool-1-thread-6釋放了信號量,時間為1550584200126 pool-1-thread-8釋放了信號量,時間為1550584200126 pool-1-thread-10釋放了信號量,時間為1550584200126 pool-1-thread-9釋放了信號量,時間為1550584200126 pool-1-thread-7釋放了信號量,時間為1550584200127
可以看到,最多5個線程獲得信號,其它線程必須等待獲得信號的線程釋放信號。