ReentrantLock 1 這篇還是接著ReentrantLock的公平鎖,沒看過第0篇的可以先去看上一篇https://www.cnblogs.com/sunankang/p/16456342.html 這篇就以問題為導向,先提出問題,然後根據問題去看代碼 確保能喚醒排隊的線程? A,B兩線程 ...
ReentrantLock 1
這篇還是接著ReentrantLock的公平鎖,沒看過第0篇的可以先去看上一篇https://www.cnblogs.com/sunankang/p/16456342.html
這篇就以問題為導向,先提出問題,然後根據問題去看代碼
確保能喚醒排隊的線程?
A,B兩線程,A線程執行完業務釋放鎖過程中B線程添加進了鏈表,如何保證B線程能正常醒來
現在假設A線程走完tryAcuqire
後獲取到鎖,執行業務代碼,最後unlock()
tryAcquire代碼就不進去看了,上篇講過了 現在只需關註兩個點
lock方法中的acquireQueued
用來park
unlock方法中的release
用來unpark
首先來看park的條件是啥
public final void acquire(int arg) {
if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
進入acquireQueued
方法
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt()) //在這裡進行的park
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
也就是shouldParkAfterFailedAcquire
如果這個方法返回true,才會去park
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
return true;
if (ws > 0) {
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
現在假設第一種情況,首次進入這個shouldParkAfterFailedAcquire
方法的時候,A線程就進入unlock方法了 那麼此時節點狀態如下圖
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
//主要看這段代碼
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
那麼h!=null
進入,但是頭節點的waitStatus還是0,所以不走unpark,A線程結束
A線程結束了誰來喚醒B線程呢? 回到acquireQueued方法
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
因為第一次進入shouldParkAfterFailedAcquire
方法中,最後走到else代碼塊,我們假設沒有發生衝突,修改成功
A線程執行完了unlock,而此時鎖的狀態值為0,沒有被持有的狀態,最外層的for(;;)
讓代碼又重新跑了一遍
第二次的時候if (p == head && tryAcquire(arg))
這個if就會進入,因為現在已經沒有其他線程在持有鎖了,所以tryAcquire
嘗試獲取鎖成功,返回ture
private void setHead(Node node) {
head = node;
node.thread = null;
node.prev = null;
}
在setHead方法中將當前節點,咱們這個例子中也就是B節點,設置為head,之後清空上個引用和當前引用的線程
最後清除上個節點對B節點的引用,此時節點關係如下
而原來的頭節點沒有任何引用,等待GC即可,也可以看到在代碼p.next = null; // help GC
這段旁邊寫的註釋 幫助GC
之後將失敗狀態設置為false,返回是否被打斷的變數,lock方法結束,
現在來假設在shouldParkAfterFailedAcquire
方法中修改成功,但此時的A線程還沒有走到unlock,當B線程馬上要開始走parkAndCheckInterrupt
方法開始park的時候,時間片用完的情況
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
//====假設此時B線程在這裡=====
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
此時節點關係如下
A線程的unlock就可以進入unparkSuccessor
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
第一個if判斷為true,嘗試修改狀態為0 (這裡沒看懂為什麼是嘗試修改)
if (s == null || s.waitStatus > 0)
這個判斷我們是不進入的,註意unparkSuccessor
這個方法的node參數是head節點,而不是我們的B節點,所以繼續執行下麵的if判斷
s
就是B節點,在B線程park前喚醒,B線程再走到park的時候是不會再進行park的,直接返回,方法結束
真的公平嗎?
A線程在運行,B線程初始化鏈表中的過程中,A線程運行完成,釋放鎖,C線程進入
我們只需要看線程B初始化鏈表的情況即可
addWaiter
中enq
方法
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
//假設線程B走到這裡時間片用完,還沒來得及設置tail
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
那麼此時線程A解鎖了,線程C調用lock方法
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
在tryAcquire
方法的hasQueuedPredecessors
方法中
public final boolean hasQueuedPredecessors() {
Node t = tail;
Node h = head;
Node s;
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}
此時tail還是null,而head已經被線程B設置了一個空Node,h!=t
為true,h也只是一個空Node,所以(s = h.next) == null
為true,整體返回true,外層取反為false,退出tryAcquire方法去入隊列
那麼入隊列會破壞隊列的初始化或者C線程變成第一個排隊的節點嗎?,註意咱們現在假設的線程B還沒有獲取到cpu的調用,還是停在 tail = head;
代碼執行前
線程C執行addWaiter
方法
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
這個時候tail還是空,進入enq
方法
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
首先第一個判斷是會進入的,這個時候tail還是空,但是if (compareAndSetHead(new Node()))
方法不會成功,來看看代碼
private final boolean compareAndSetHead(Node update) {
//註意第三個參數 null
return unsafe.compareAndSwapObject(this, headOffset, null, update);
}
判斷的是head為null的時候才會進行修改,所以線程C沒有修改成功,那麼會一直在for(;;)
中迴圈,直到線程B初始化完空的頭節點,也就是執行tail = head;
這段代碼
如果線程B走完了 tail = head;
沒來得及進行第二次迴圈添加B節點的時候,線程A解鎖了,線程C進來了呢
還是在tryAcquire
方法的hasQueuedPredecessors
中
public final boolean hasQueuedPredecessors() {
Node t = tail;
Node h = head;
Node s;
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}
這個時候第一個h!=t
就是false,因為B線程已經將head和tail的引用指向同一個空節點了,返回false
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
//因為返回false,取反則進行獲取鎖的操作
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
C線程直接獲取鎖去運行代碼了,所以ReentrantLock的公平鎖其實並不是絕對的公平