package cn.daheww.demo.juc.reentrylock; import sun.misc.Unsafe; import java.lang.reflect.Field; import java.util.concurrent.locks.LockSupport; /** * @ ...
package cn.daheww.demo.juc.reentrylock;
import sun.misc.Unsafe;
import java.lang.reflect.Field;
import java.util.concurrent.locks.LockSupport;
/**
* @author daheww
* @date 2022/7/7
*/
public class MiniReentryLock implements Lock {
/**
* 鎖的是什麼 --> 資源 --> state
* 0 --> 未加鎖
* >0 -> 加鎖
*/
private volatile int state;
/**
* 獨占模式
* 同一時刻只有一個線程可以持有鎖,其它線程在未獲取到鎖的時候會被阻塞
*
* 當前獨占鎖的線程(占用鎖的線程)
*/
private Thread exclusiveOwnerThread;
/**
* 需要有兩個節點去維護阻塞隊列
* Head 指向隊列的頭節點
* Tail 指向隊列的尾節點
*
* 比較特殊:Head節點對應的線程就是當前占用鎖的線程
*/
private Node head;
private Node tail;
/**
* 獲取鎖
* 假設當前鎖被占用,則會阻塞調用者線程,直到它搶占到鎖為止
*
* 模擬公平鎖
* --> 先來後到
*
* lock的過程
* 情景1.線程進來後發現,當前state == 0 --> 直接去搶鎖
* 情景2.線程進來後發現,當前state > 0 --> 將當前線程入隊
*/
@Override
public void lock() {
// 第一次獲取到鎖時,將state設置為1
// 第n次重入時,將state設置為n
acquire(1);
}
@Override
public void unlock() {
release(1);
}
private void release(int arg) {
// 條件成立:說明線程已經完全釋放鎖了
if (tryRelease(arg)) {
// 阻塞隊列裡面,還有睡覺的線程,應該喚醒一個線程
// 首先需要知道有沒有等待的node --> head.next == null
Node head = this.head;
if (head.nx != null) {
// 公平鎖,喚醒head.nx節點
unparkSuccessor(head);
}
}
}
private void unparkSuccessor(Node node) {
Node s = node.nx;
if (s != null && s.thread != null) {
LockSupport.unpark(s.thread);
}
}
/**
* 完全釋放鎖成功則返回true
*/
private boolean tryRelease(int arg) {
int c = getState() - arg;
if (getExclusiveOwnerThread() != Thread.currentThread()) {
throw new RuntimeException("must get lock first");
}
// 如果執行到這裡,不存在併發,只會有一個線程會來到這裡
// 條件成立,則說明當前線程持有的lock鎖已經完全釋放了
if (c == 0) {
this.exclusiveOwnerThread = null;
this.state = c;
return true;
} else {
this.state = c;
return false;
}
}
/**
* 競爭資源
* 1.嘗試獲取鎖。成功則占用鎖,且返回
* 2.搶占鎖失敗,阻塞當前線程
* @param arg
*/
private void acquire(int arg) {
if (!tryAcquire(arg)) {
// 搶鎖失敗
// step1.將當前線程封裝成node,加入到阻塞隊列中
Node node = addWaiter();
// step2.將當前線程park,使線程處於掛起狀態
acquireQueued(node, arg);
}
// 搶鎖成功
// 1.搶到了鎖
// 2.重入了鎖
}
/**
* 嘗試搶鎖失敗,需要做的事:
* 1.需要將當前線程封裝成node,加入到阻塞隊列中
* 2.需要將當前線程park,使線程處於掛起狀態
*
* 喚醒的流程:
* 1.檢查當前node是否為head.next節點
* head.next是擁有搶占許可權的線程,其它node都沒有搶占的許可權
* 2.搶占:
* 成功:
* 1.將當前node設置為node,將老的head出隊,返回到業務層面
* 2.繼續park等待被喚醒
*
* ----------------------------------------------
* 1.添加到阻塞隊列的邏輯 addWaiter()
* 2.競爭資源的邏輯 acquireQueued()
*/
private void acquireQueued(Node node, int arg) {
// 當前線程已經放到queue中了
// 只有當前node成功獲取到鎖以後才會跳出自旋
for (; ; ) {
// 什麼情況下,當前node被喚醒後可以嘗試去獲取鎖呢?
// 只有一種情況,當前node是head的後繼節點,才有這個許可權
// 不是就先來後到
Node pvNode = node.pv;
// 條件1:pvNode == head
// true --> 說明當前node擁有搶占許可權
// queue中的第一個節點代表的是當前鎖正在執行的線程 --> head指向的線程
// head後面的線程代表的是正在排隊的線程 --> 所以只有head.nx節點擁有搶鎖的權利
// 條件2:tryAcquire(arg)
// true --> 當前線程獲取到了鎖
//
if (pvNode == head && tryAcquire(arg)) {
// 進入到這裡面說明當前線程競爭鎖成功了
// 需要做的操作:
// 1.設置當前head為當前線程的node
// 2.協助原來的對象出隊
setHead(node);
pvNode.nx = null;
// 因為獲取到了鎖,所以就return了
return;
}
// 當前不是head.nx節點,或者去嘗試獲取鎖失敗了,這個時候都需要去把當前線程park掉
System.out.println("線程:" + Thread.currentThread().getName() + " 掛起");
LockSupport.park();
// 直到某個線程做了當前線程的unPark操作,這個線程才會繼續執行
/*
所以總結一下,lock的邏輯就是:
1.在沒鎖的情況下,如果有個線程調用了lock方法,它就會改變lock中的state值。此時state值就不會為0了。
那麼其它線程調用lock方法時,會看到這個state不為0。
2.然後這個線程會被封裝成一個node節點
3.然後會去嘗試競爭一下鎖,做一下最後的輓救工作,如果實在輓救不了,就park了
--> 線程就在這個lock的lock()方法里被阻塞了。就達到了鎖的效果
--> 所有調用這個鎖對象lock的方法只能有一個線程能繼續執行,然後其它線程會被阻塞,直到這個線程做了unlock操作
*/
System.out.println("線程:" + Thread.currentThread().getName() + " 喚醒");
// 什麼時候喚醒被park的線程?--> unlock()
}
}
/**
* 把當前線程入隊
* 返回當前線程對應的node節點
*
* addWaiter執行完成後,保證當前線程已經入隊成功
*/
private Node addWaiter() {
Node newNode = new Node(Thread.currentThread());
// 如何入隊?
// Case1.當前node不是第一個入隊的node,隊列已經有等待的node了
// 1.找到newNode的pv節點
// 2.更新newNode.pvNode = pv節點
// 3.CAS更新tail為newNode
// 4.更新pv節點
Node pvNode = tail;
if (pvNode != null) {
newNode.pv = pvNode;
// 條件成立,說明當前線程成功入隊
if (compareAndSetTail(pvNode, newNode)) {
pvNode.nx = newNode;
return newNode;
}
}
// 執行到這裡的幾種情況
// 1.tail == null隊列是空
// 2.cas設置當前newNode為tail時失敗了 --> 迴圈入隊 --> 自旋
enq(newNode);
return newNode;
}
/**
* 自旋入隊,只有成功之後才返回
* 1.tail == null 隊列是空隊列
* 2.cas設置當前newNode為tail時失敗了
*/
private void enq(Node node) {
for (; ; ) {
// 第一種情況:隊列是空隊列
// --> 當前線程是第一個搶占鎖的線程...
// 當前持有鎖的線程,並沒有設置過任何node,所以作為該線程的第一個後驅節點
// 需要給他擦屁股
// 給當前持有鎖的線程補充一個node作為head節點
// head節點任何時候都代表當前占用鎖的線程
if (tail == null) {
// 條件成立:說明當前線程給當前持有鎖的線程補充head操作成功了
if (compareAndSetHead(new Node())) {
tail = head;
// 註意,並沒有直接返回,而是會繼續自旋
}
} else {
// 當前隊列中已經有node了,說明這是一個追加node的過程
// 如何入隊呢?
// 1.找到newNode的pv節點 --> 最新的tail節點
// 2.更新newNode.pvNode = pv節點
// 3.CAS更新tail為newNode
// 4.更新pv節點
Node pvNode = tail;
node.pv = pvNode;
// 條件成立,說明當前線程成功入隊
if (compareAndSetTail(pvNode, node)) {
pvNode.nx = node;
return;
}
}
}
}
/**
* 嘗試獲取鎖,不會去阻塞線程
* true --> 搶占成功
* false --> 搶占失敗
*/
private boolean tryAcquire(int arg) {
if (state == 0) {
// 當前state為0
// 不能直接搶鎖 --> 公平鎖 --> 先來後到
// 條件一:!hasQueuedPredecessors() ---> 取反之後為true,表示當前線程前面沒有等待著的線程
// 條件二:compareAndSetState(0, arg) -> 使用cas的原因:lock方法可能有多線程調用的情況
// true --> 當前線程搶鎖成功
// (1) volatile --> state被volatile修飾了,所以其它線程能第一時間知道這個值不為0了 --> 緩存能夠一致了
// (2) cas -------> state從0變為arg的操作用cas實現,用於保證只會有一個線程能夠改變state的值(0->arg) --> 只會有一個線程能夠執行接下來的操作 --> 鎖
// 1.如果cas的變數不用volatile修飾就沒有意義:
// 因為A線程改變了state的值,但是B線程並不知道
// (可見性,volatile會讓B線程中的副本馬上失效,然後獲取最新的state的值,此時B線程工作空間中的state值就不為0了)
// 2.如果volatile的變數不用cas去改變它的值的話,也沒有意義:
//· step1.A線程,B線程都拿到了state的副本信息,此時state值為0
// step2.A線程改變了state的值。B線程還在寫,因為state的值改變了,所以B線程工作空間中的state值改變,然後B繼續寫。
// 所以所有判斷出state值為0的線程都能寫成功,並且能執行寫成功後續的操作
// 所以要用cas+volatile去保證只會有一個線程能夠寫成功這個值
// Ps.可以看到,如果這些線程想寫的值都是同一個值的話,多寫了幾次,但是結果和只寫一次是一致的
// cas+volatile主要還是去控制寫成功之後的操作只會被執行一次,這樣就像一個鎖一樣了
if (!hasQueuedPredecessors() && compareAndSetState(0, arg)) {
// 搶鎖成功
// 1.將exclusiveOwnerThread設置為當前線程
this.exclusiveOwnerThread = Thread.currentThread();
return true;
// 不會入隊任何node,接返回true
// 接下來第一個競爭失敗的線程會先去幫忙創建一個node,然後再執行後續的操作
}
// 當前線程前面有線程在等待 || 多個線程和當前線程一起在嘗試獲取這個鎖,然後當前線程失敗了 --> return false;
} else if (Thread.currentThread() == this.exclusiveOwnerThread) {
// 執行的時機:
// 1.當前鎖被占用
// 2.當前線程即為持鎖線程
// 這裡面不存在併發。只有當前加鎖的線程才有許可權修改state
// 即使是同一個線程多次進入到這,設置state的值,那麼它們都是使用的同一個工作空間
// 不存在不同工作空間下,這個值的不一樣的情況(因為沒有了緩存)
// 鎖重入的流程
int c = getState();
c += arg;
// TODO 越界判斷
this.state = c;
return true;
}
// 什麼時候會返回false?
// 1.cas加鎖失敗
// 2.state大於0,且當前線程不是持鎖線程
return false;
}
/**
* 當前線程前面是否有等待著的線程
* true --> 當前線程前面有等待著的線程
* false -> 當前線程前面沒有其它等待著的線程
*
* 調用鏈
* lock --> acquire -> tryAcquire -> hasQueuedPredecessors(state值為0時,即當前lock為無主狀態)
*
* 什麼時候返回false?
* 1.當前隊列是空
* 2.當前線程為head.next節點 --> head.next在任何時候都有權力去爭取lock
*/
private boolean hasQueuedPredecessors() {
Node h = head;
Node t = tail;
Node s;
// 條件一:h != t
// true --> 當前隊列已經有node了
// false -> h == t
// case1. h == t == null --> 還沒初始化過queue
// case2. h == t == head
// 第一個獲取鎖失敗的線程會為當前持有鎖的線程補充創建一個head node
// 條件二:
// 前置條件:條件一成立
// 排除幾種情況:
// 條件2.1:極端情況 --> 第一個獲取鎖失敗的線程,會為持鎖的線程補充創建head節點,然後在自旋入隊
// step1.cas設置tail成功了
// step2.head.next = node
// 在這兩步中間的時候,有線程來檢查前面是否有等待的線程
// 這種情況應該返回true:已經有head.next節點了,其它線程來這的時候需要返回true
// 條件2.2:
// 前置條件:h.next不是null
// true --> 條件成立說明當前線程就是持有鎖的線程
// false -> 說明當前線程就是h.next節點對應的線程,需要返回false。回頭線程就會去競爭鎖了
return h != t && ((s = h.nx) == null || s.thread != Thread.currentThread());
}
private static final Unsafe UNSAFE;
private static final long STATE_OFFSET;
private static final long HEAD_OFFSET;
private static final long TAIL_OFFSET;
static {
try {
Field f = Unsafe.class.getDeclaredField("theUnsafe");
f.setAccessible(true);
UNSAFE = (Unsafe) f.get(null);
STATE_OFFSET = UNSAFE.objectFieldOffset(MiniReentryLock.class.getDeclaredField("state"));
HEAD_OFFSET = UNSAFE.objectFieldOffset(MiniReentryLock.class.getDeclaredField("head"));
TAIL_OFFSET = UNSAFE.objectFieldOffset(MiniReentryLock.class.getDeclaredField("tail"));
} catch (Exception e) {
throw new Error(e);
}
}
private boolean compareAndSetHead(Node update) {
return UNSAFE.compareAndSwapObject(this, HEAD_OFFSET, null, update);
}
private boolean compareAndSetTail(Node expect, Node update) {
return UNSAFE.compareAndSwapObject(this, TAIL_OFFSET, expect, update);
}
private boolean compareAndSetState(int expect, int update) {
return UNSAFE.compareAndSwapInt(this, STATE_OFFSET, expect, update);
}
/**
* 阻塞的線程被封裝成node節點,然後放進FIFO隊列
*/
static final class Node {
/**
* 封裝的線程本身
*/
Thread thread;
/**
* 前置節點引用
*/
Node pv;
/**
* 後置節點引用
*/
Node nx;
public Node(Thread thread) {
this.thread = thread;
}
public Node() {
}
}
public int getState() {
return state;
}
private void setHead(Node node) {
this.head = node;
// 當前線程已經是獲取到鎖的線程
node.thread = null;
node.pv = null;
}
public void setState(int state) {
this.state = state;
}
public Thread getExclusiveOwnerThread() {
return exclusiveOwnerThread;
}
public void setExclusiveOwnerThread(Thread exclusiveOwnerThread) {
this.exclusiveOwnerThread = exclusiveOwnerThread;
}
public Node getHead() {
return head;
}
public Node getTail() {
return tail;
}
public void setTail(Node tail) {
this.tail = tail;
}
}