AQS源碼探究 競爭鎖資源 我們進入ReentrantLock源碼中查看其內部類 Sync 對AQS進行擴展公共方法並定義抽象方法的抽象類 FaireSync 實現公平鎖的AQS的實現類 UnFairSync 實現非公平鎖的ASQ的實現類 我使用例子進行的debug,然後一步一步看源碼。例子在文章最 ...
AQS源碼探究---競爭鎖資源
我們進入ReentrantLock源碼中查看其內部類
- Sync 對AQS進行擴展公共方法並定義抽象方法的抽象類
- FaireSync 實現公平鎖的AQS的實現類
- UnFairSync 實現非公平鎖的ASQ的實現類
我使用例子進行的debug,然後一步一步看源碼。例子在文章最後面
以下流程皆以非公平鎖為例
線程競爭鎖資源
AQS的state解釋:
- 0 表示鎖沒有被占用
- 1 表示鎖被占用了
- > 1 表示鎖被重入了 PS: ReentrantLock是可重入鎖
獲得鎖執行流程
- 創建ReentrantLock對象
// ReetrantLock 預設創建一個非公平鎖的AQS
public ReentrantLock() {
sync = new NonfairSync();
}
- 然後我們調用lock方法請求鎖
- 成功,即將鎖的owner主人設置為當前線程,接下來就是回到線程中執行線程的任務。
- 失敗,即進入acquire的流程。
static final class NonfairSync extends Sync {
final void lock() {
// 請求鎖資源,如果將鎖的state狀態0改成1,即為成功獲得鎖資源
if (compareAndSetState(0, 1))
// 將鎖的擁有者設置為當前線程,裡面就一句話沒啥好看的
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
}
下麵是AQS阻塞鏈表是由一個雙向鏈表組成的。
阻塞鏈表的成員對象Node的waitState狀態解釋:
- CANCELLED = 1 表示線程已經被取消了
- SIGNAL = -1 表示後繼線程需要unpark解除阻塞,下圖即表示。
鎖競爭失敗流程
- 進入acquire方法
public final void acquire(int arg) {
// 首先再次請求鎖
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
- 首先會執行tryAcquire方法
protected final boolean tryAcquire(int acquires) { // 註意:我們進入的是非公平鎖的tryAcquire實現
return nonfairTryAcquire(acquires);
}
再次進入nonfairTryAcquire(acquires)方法
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread(); // 獲得當前線程
int c = getState(); // 獲得當前線程的狀態
if (c == 0) { // 如果狀態為0即鎖資源被釋放現在處於空閑狀態,會嘗試獲得鎖
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) { // 這裡是可重入代碼,後面解釋
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false; // 失敗返回false。如果是成功獲得鎖或者是重入都會返回true。需要瞭解
}
- 回到步驟1代碼,如果是失敗返回false取反true,就會繼續執行if語句。成功取反後false就直接結束當前語句,就會直接回到線程執行線程代碼了。
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) // 這裡是兩個方法,需要一個一個來
selfInterrupt();
}
// acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
- 執行addWaiter方法,概括就是將沒有獲得鎖的加入一個等待鏈表中。
private Node addWaiter(Node mode) { // 剛創建的時候mode為null的
Node node = new Node(Thread.currentThread(), mode); // 首先創建一個node
// Try the fast path of enq; backup to full enq on failure
Node pred = tail; // 將尾部的引用給pred變數
if (pred != null) { // 剛開始創建的時候pred是null的
node.prev = pred;
if (compareAndSetTail(pred, node)) { // 這個代碼塊就是cas嘗試加入雙向鏈表尾部
pred.next = node;
return node;
}
}
enq(node); // 這裡是創建head和tail進的方法,和if (compareAndSetTail(pred, node))失敗進入
return node; // 方法返回由當前線程創建的node
}
enq方法的進入條件
- 進行head和tail的初始化。
- 多線程下如果調用enq方法失敗,就是當別的線程也進入了等待鏈表,此時tail就會改變,上面的cas就會false,沒有返回,就會進行enq方法
private Node enq(final Node node) {
for (;;) {
Node t = tail; // 如果尾部為空就會進行初始化,沒有的話不斷進行cas嘗試插入鏈表尾部。
if (t == null) { // Must initialize 初始化鏈表
if (compareAndSetHead(new Node())) // 我們可以看到head是指向一個沒有參數的node對象的
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node; // 註意t還是引用舊值,而tail已經更新引用為node了。
return t;
}
}
}
}
疑問:
compareAndSetTail(t, node) 方法在我初次遇見的時候很奇怪。為什麼t還算指向了舊的node對象
因為這個compareAndSetTail只是將tail的引用改變成了node,註意這邊改變的是tail的引用。並沒有去改變pred的引用。傳入pred只是保證我們獲得的尾部和現在的尾部是一樣的,才能進行安全的尾部連接。
這也是我基礎不太扎實的原因吧。
- 執行acquireQueued方法,再次嘗試獲得鎖,和進行阻塞
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor(); // 獲得node前驅
if (p == head && tryAcquire(arg)) { // 如果是第一個等待鎖的線程,再次請求鎖
setHead(node); // 請求成功就將該線程的node直接移出等待鏈表
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) && // 檢查狀態並更新前驅狀態為-1,即表示有後繼節點阻塞了。
parkAndCheckInterrupt()) // 進入park,如果被中斷返回true
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
在parkAndCheckInterrupt方法時進行park阻塞。
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
線程釋放鎖
- 調用unlock方法
public void unlock() {
sync.release(1);
}
- 調用release方法
public final boolean release(int arg) {
if (tryRelease(arg)) { // 進入tryRelease即嘗試釋放
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
進入tryRelease的ReentrantLock實現
protected final boolean tryRelease(int releases) {
int c = getState() - releases; // 獲得當前的狀態
if (Thread.currentThread() != getExclusiveOwnerThread()) // 非獲得鎖線程拋異常
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) { // 如果沒有重入直接釋放鎖將owner置為null
free = true;
setExclusiveOwnerThread(null);
}
setState(c); // 由於鎖資源只有一個只有一個線程能更新狀態,所以更新AQS狀態不需要cas
return free;
}
- 繼續回到release方法,釋放鎖成功返回true,進入條件語句
public final boolean release(int arg) {
if (tryRelease(arg)) { // 進入tryRelease即嘗試釋放
Node h = head;
if (h != null && h.waitStatus != 0) // 阻塞隊列存在即頭節點不為空且頭節點的狀態不為0,為0表示後面沒節點阻塞了
unparkSuccessor(h);
return true;
}
return false;
}
- 進入unparkSuccessor方法,就不貼源碼了,簡單介紹一下就是將頭節點置空,將阻塞隊列中第一個等待的node解除阻塞,將他放出來去搶鎖資源。
非公平鎖和公平鎖的區別
看完源碼,整明白了就是鎖資源釋放後會放第一個等待線程去搶鎖。
我就疑惑了,那明明就是公平的啊。
其實只是釋放了線程,但是同時有其他的線程進行爭搶,就又會變成爭搶的情況,還是可能被其他線程搶走鎖資源。
公平鎖
就會判斷如果阻塞鏈表是否為空,為空才能進行獲取鎖資源,又或者是鎖重入
不然就是直接加入阻塞鏈表,從而實現了公平。
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
DEBUG例子
@Slf4j
public class Test1 {
public static void main(String[] args) {
ReentrantLock lock = new ReentrantLock();
// Reentrantlock鎖資源被擁有
new Thread(()->{
lock.lock();
try{
log.debug("運行中");
try {
Thread.sleep(2000000000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}finally {
lock.unlock();
}
}).start();
// ReentrantLock阻塞鏈表初始化
new Thread(()->{
lock.lock();
try{
log.debug("運行中");
try {
Thread.sleep(2000000000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}finally {
lock.unlock();
}
}).start();
// ReentrantLock 再次向阻塞鏈表添加線程
new Thread(()->{
lock.lock();
try{
log.debug("運行中");
try {
Thread.sleep(2000000000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}finally {
lock.unlock();
}
}).start();
}
}