Semaphore Semaphore 字面意思是信號量的意思,它的作用是控制訪問特定資源的線程數目。應用場景:資源訪問,服務限流。 Semaphore 實現AbstractQueuedSynchronizer的方法與ReentrantLock一樣 Semaphore構造方法 public Sema ...
Semaphore
Semaphore 字面意思是信號量的意思,它的作用是控制訪問特定資源的線程數目。應用場景:資源訪問,服務限流。 Semaphore 實現AbstractQueuedSynchronizer的方法與ReentrantLock一樣
Semaphore構造方法
public Semaphore(int permits) {------permits 表示能同時有多少個線程訪問我們的資源
sync = new NonfairSync(permits); -------------預設創建的是非公平鎖。
}
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 1192457210091910933L;
Sync(int permits) {
setState(permits);-------傳入的permits做i為了state的值,作為資源總數
}
semaphore.acquire();獲取資源,源碼實現
public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);---------每次申請一次資源
}
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();-----------------線程無效直接拋異常
if (tryAcquireShared(arg) < 0) --------------------拿不到資源,需要進行入隊操作
doAcquireSharedInterruptibly(arg); ---------入隊操作
}
final int nonfairTryAcquireShared(int acquires) { --------獲取資源的操作
for (;;) {
int available = getState(); --------------拿到現有的資源
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining)) -----------原子操作,多線程情況下會可能失敗,所以無線迴圈自旋下去,直到成功;
return remaining;---------------------如果大於等於0那麼就是拿到了資源,如果小於0,那麼線程就要進入等待隊列
}
}
為什麼要用死迴圈----compareAndSetState這個是cas原子操作,失敗之後要迴圈重覆繼續操作,直到成功。死迴圈也就結束了。
private void doAcquireSharedInterruptibly(int arg)-------------線程入隊操作
throws InterruptedException {
final Node node = addWaiter(Node.SHARED);---------------註意這裡是以共用的方式入隊
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) { --------新入隊的節點會判斷他上一個節點是不是頭節點,如果是頭節點會再次嘗試獲取資源,
int r = tryAcquireShared(arg);
if (r >= 0) { -------------如果獲取到資源,那麼這個阻塞隊列就要清空了,裡面沒有在等待的線程了。
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) && ----------如果獲取不到資源,那麼就要線程阻塞了
parkAndCheckInterrupt()) -----------parkAndCheckInterrupt這個方法會將線程阻塞(掛起),線程都阻塞了,這個死迴圈就不會執行了,這也就是為什麼juc源碼寫了很多
死迴圈都沒問題地原因,我們可以借鑒。當線程被喚醒之後又開始這個死迴圈,嘗試拿資源(非公平鎖有可能拿不到),
拿不到再次被阻塞掛起。
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { ---------------判斷線程能否被正常阻塞
int ws = pred.waitStatus;
if (ws == Node.SIGNAL) -----------------如果上一個節點是有效的在等待的線程,那麼該線程就可以插入到隊列後面
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
if (ws > 0) { -----------如果上一個節點是無效的,那就查找上上個節點是不是有效的,直到找到那個有效的節點,然後將該節點插入到那個有效節點後面,中間的無效節點從鏈表中刪除,後面的節點要找前面
的節點這也就說明瞭為什麼我們地等待隊列要設計成雙鏈表,不光有next。next這種找後驅節點地操作還有pre .pre這樣前驅節點。所以需要雙鏈表。
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
semaphore.release();釋放資源,源碼分析
protected final boolean tryReleaseShared(int releases) {
for (;;) {
int current = getState();
int next = current + releases; -----------獲取當前的資源然後給資源加回去
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
if (compareAndSetState(current, next)) -----------------CAS演算法還資源,死迴圈,直到成功還回去,死迴圈結束。
return true;
}
}
資源還回去之後執行doReleaseShared方法喚醒其他線程搶資源
private void doReleaseShared() {
/*
* Ensure that a release propagates, even if there are other
* in-progress acquires/releases. This proceeds in the usual
* way of trying to unparkSuccessor of head if it needs
* signal. But if it does not, status is set to PROPAGATE to
* ensure that upon release, propagation continues.
* Additionally, we must loop in case a new node is added
* while we are doing this. Also, unlike other uses of
* unparkSuccessor, we need to know if CAS to reset status
* fails, if so rechecking.
*/
for (;;) {
Node h = head;
if (h != null && h != tail) { --------發現阻塞隊列有阻塞線程
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h); ---------跳過頭節點,喚醒下一個節點
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t; ------迴圈找到waitStatus<0能喚醒的節點調用unpark方法喚醒線程。
}
if (s != null)
LockSupport.unpark(s.thread);
}