JUC鎖:核心類AQS源碼詳解

来源:https://www.cnblogs.com/knowledgeispower/archive/2022/09/04/16654916.html
-Advertisement-
Play Games

一、結論 首先說結論:深拷貝出來的對象就是完完全全的新對象,不管是對象本身(id),還是對象中包含的子對象,都和原始對象不一樣; 淺拷貝出來的對象就是外新內舊的對象,對象本身(id)和原始對象完全不同,但是子對象和原始對象的子對象是一樣的。 再補充說下賦值,賦值來的對象就是完完全全的原始對象,只是叫 ...


目錄

1 疑點todo和解疑

同步狀態變數:state就是那個共用資源(private volatile int state;) Lock類繼承AQS類並定義lock()、unLock()的方法,表示獲取鎖和釋放鎖。多線程併發訪問同一個lock實例,lock()方法會cas修改state變數,修改成功的線程獲得鎖,其他線程進入AQS隊列等待。

沒有必要!sync隊列是雙向鏈表結構,出隊時,head交替方式,只需要修改head和head後繼2個節點引用關係;固定head,就要修改head,head後繼,以及head後繼的後繼 共3個節點。顯然前者效率更高

不存在的,因為經過判斷得出此時node就是head的後繼。並且必須由這個取消節點node來喚醒後繼,要不node線程結束後,就沒有線程能夠喚醒隊列里的其他節點了。

先說結果:由搶到鎖的那個線程來喚醒!
上述的場景是存在的,例如在非公平鎖模式中,B線程被A線程喚醒,A結束,B成為head,B去執行tryAcquire(),但此時C線程搶占到鎖,B執行tryAcquire()沒有拿到鎖,再次park阻塞。C線程執行結束後將A喚醒

只有將前置節點狀態改為SIGNAL,才能確保當前節點可以被前置unPark喚醒。也就是說阻塞自己前先保證一定能夠被喚醒。因為代碼中:
獨占模式下,喚醒後繼前先限制:h.waitStatus != 0
共用模式下,喚醒後繼前先限制:h.waitStatus=SIGNAL

表示本線程在獲取資源期間,如果被其他線程中斷,本線程不會因為中斷而取消獲取資源,只是將中斷標記傳遞下去。

 When acquired in exclusive mode,
 * attempted acquires by other threads cannot succeed. Shared mode
 * acquires by multiple threads may (but need not) succeed. This class
 * does not "understand" these differences except in the
 * mechanical sense that when a shared mode acquire succeeds, the next
 * waiting thread (if one exists) must also determine whether it can
 * acquire as well. Threads waiting in the different modes share the
 * same FIFO queue.
  1. 共用模式:允許多個線程同時獲取資源;當一個節點的線程獲取共用資源後,需要要通知後繼共用節點的線程,也可以獲取了。共用節點具有傳播性,傳播性的目的也是儘快通知其他等待的線程儘快獲取鎖。
  2. 獨占模式: 只能夠一個線程占有資源,其它嘗試獲取資源的線程將會進入到隊列等待。
  3. 響應中斷並終止線程只要被中斷就不會獲取資源:兩種情況的中斷:1、剛嘗試獲取、2、進入隊列中等待,前者立即停止獲取,後者執行取消邏輯,等待節點變為取消狀態

A、B先後進入隊列,狀態都是0。A獲得資源,進入setHeadAndPropagate晉升為head,A進入doReleaseShared嘗試喚醒B時,但B還沒將A改為signal,因為A還是0,A將狀態改為PROPAGATE

2 AbstractQueuedSynchronizer學習總結

2.1 AQS要點總結

對於AbstractQueuedSynchronizer的分析,最核心的就是sync queue的分析。

  1. 每一個節點都是由前一個節點喚醒
  2. 當節點發現前驅節點是head並且嘗試獲取成功,則會輪到該線程運行。
  3. condition queue中的節點向sync queue中轉移是通過signal操作完成的。
  4. SIGNAL,表示後面的節點需要運行。
  5. PROPAGATE:就是為了避免線程無法會喚醒的窘境。因為共用鎖會有很多線程獲取到鎖或者釋放鎖,所以有些方法是併發執行的,就會產生很多中間狀態,而PROPAGATE就是為了讓這些中間狀態不影響程式的正常運行。

2.2 細節分析

2.2.1 插入節點時先更新prev再更新前驅next

//addWaiter():
node.prev = pred; // 1 更新node節點的prev域
if (compareAndSetTail(pred, node)) {
    pred.next = node; //2 更新node前驅的next域
    return node;
}
//enq():
node.prev = t; // 1 更新node節點的prev域
if (compareAndSetTail(t, node)) {
    t.next = node;//2 更新node前驅的next域
    return t;
}
//unparkSuccessor():
Node s = node.next; //通過.next來直接獲取到節點的後繼節點,這個節點的後繼的prev一定指向節點本身
      //....
        if (s != null)
            LockSupport.unpark(s.thread);
  1. addWaiter() 或者enq()插入節點時,都是先更新節點的prev域,再更新它前驅的next域。那麼通過node.next()取到的後繼,後繼的prev域一定是指向node本身。如果先更新next域,在更新prev域時出現異常,那麼通過.next取到不是完整的節點
  2. unparkSuccessor()喚醒後繼時,Node s = node.next; 通過.next來獲取node的後繼,後繼的prev一定指向node本身

2.2.2 為什麼unparkSuccessor()要從尾部往前遍歷

因為取消節點的next域指向了自身,所以不能從通過next來遍歷,但prev是完整的,所以通過prev來遍歷。

2.2.3 AQS的設計,儘快喚醒其他等待線程體現在3個地方

  1. 共用鎖的傳播性。
  2. doReleaseShared()中head改變,會迴圈喚醒head的後繼節點。
  3. 線程獲取鎖失敗後入隊列並不會立刻阻塞,而是判斷是否應該阻塞shouldParkAfterFailedAcquire,如果前繼是head,會再給一次機會獲取鎖。

3 AQS 簡介

AQS是一個用來構建鎖和同步器的框架。理論參考:JUC同步器框架

三個基本組件相互協作:

  1. 同步狀態的原子性管理;
  2. 線程的阻塞與喚醒;
  3. 隊列的管理;

同步器一般包含兩種方法,一種是acquire,另一種是release。acquire操作阻塞調用的線程,直到或除非同步狀態允許其繼續執行。而release操作則是通過某種方式改變同步狀態,使得一或多個被acquire阻塞的線程繼續執行。

3.1 AQS核心思想

  1. 如果請求的共用資源空閑,則將當前請求線程設置為有效工作線程,並且將共用資源設置為鎖狀態
  2. 設計一套機制:【線程如何阻塞等待以及被喚醒時鎖如何分配】?這個機制AQS是用CLH隊列鎖實現的
  3. CLH隊列鎖:一個虛擬的雙向隊列,AQS是將每條請求共用資源的線程封裝成一個CLH鎖隊列的一個節點(Node)來實現鎖的分配。【嚴格的FIFO隊列,框架不支持基於優先順序的同步
  4. 使用一個int成員變數來表示同步狀態,使用volatile修飾保證線程可見性,並使用CAS思想進行值維護。

3.2 對資源的共用方式

兩種方式:

  1. Exclusive(獨占):只有一個線程能執行。又可分為公平鎖和非公平鎖:
    1. 公平鎖:按照線程在隊列中的排隊順序,先到者先拿到鎖
    2. 非公平鎖:當線程要獲取鎖時,無視隊列順序直接去搶鎖,誰搶到就是誰的
  2. Share(共用):多個線程可同時執行

3.3 AQS數據結構

分析類,首先就要分析底層採用了何種數據結構,抓住核心點進行分析:

  1. Sync queue,即同步隊列,是雙向鏈表,包括head節點和tail節點,head節點主要用作後續的調度
  2. Condition queue不是必須的,其是一個單向鏈表,只有當使用Condition時,才會存在此單向鏈表。並且可能會有多個Condition queue

4 AbstractQueuedSynchronizer源碼分析

4.1 類的繼承關係

public abstract class AbstractQueuedSynchronizer
    extends AbstractOwnableSynchronizer
    implements java.io.Serializable

繼承自抽象類:AbstractOwnableSynchronizer,父類提供獨占線程的設置與獲取的方法

public abstract class AbstractOwnableSynchronizer
    implements java.io.Serializable {
    private static final long serialVersionUID = 3737899427754241961L;
    protected AbstractOwnableSynchronizer() { }//   構造函數
    private transient Thread exclusiveOwnerThread; //獨占模式下的線程
    // 設置獨占線程 
    protected final void setExclusiveOwnerThread(Thread thread) {
        exclusiveOwnerThread = thread;
    }
    // 獲取獨占線程 
    protected final Thread getExclusiveOwnerThread() {
        return exclusiveOwnerThread;
    }
}

4.1.1 AQS需要子類重寫的方法

    protected boolean tryAcquire(int arg) {//獨占方式獲取鎖
        throw new UnsupportedOperationException();
    }
    protected boolean tryRelease(int arg) { //釋放獨占的鎖
        throw new UnsupportedOperationException();
    }
    protected int tryAcquireShared(int arg) { //以共用方式獲取鎖
        throw new UnsupportedOperationException();
    }
    protected boolean tryReleaseShared(int arg) {//釋放共用鎖
        throw new UnsupportedOperationException();
    }
    protected boolean isHeldExclusively() {//是否獨占資源
        throw new UnsupportedOperationException();
    }

關於重寫說明
目的是將共用資源state的讀寫交給子類管理,AQS專註在隊列的維護以及線程的阻塞與喚醒

4.2 類的常量/成員變數

public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer
    implements java.io.Serializable {    
    private static final long serialVersionUID = 7373984972572414691L;    
    // 頭節點
    private transient volatile Node head;    
    // 尾節點
    private transient volatile Node tail;    
    //0:表示沒有線程獲取到鎖;1表示有線程獲取到鎖;大於1:表示有線程獲得了鎖,且允許重入
    private volatile int state;    
    // 自旋時間
    static final long spinForTimeoutThreshold = 1000L;
    
    // 以下跟cas有關
    private static final Unsafe unsafe = Unsafe.getUnsafe();  // Unsafe類實例
    private static final long stateOffset; // state記憶體偏移地址
    private static final long headOffset; // head記憶體偏移地址
    private static final long tailOffset; // state記憶體偏移地址
    private static final long waitStatusOffset;// tail記憶體偏移地址
    private static final long nextOffset; // next記憶體偏移地址
    // 靜態初始化塊
    static {
        try {
            stateOffset = unsafe.objectFieldOffset
                (AbstractQueuedSynchronizer.class.getDeclaredField("state"));
            headOffset = unsafe.objectFieldOffset
                (AbstractQueuedSynchronizer.class.getDeclaredField("head"));
            tailOffset = unsafe.objectFieldOffset
                (AbstractQueuedSynchronizer.class.getDeclaredField("tail"));
            waitStatusOffset = unsafe.objectFieldOffset
                (Node.class.getDeclaredField("waitStatus"));
            nextOffset = unsafe.objectFieldOffset
                (Node.class.getDeclaredField("next"));
        } catch (Exception ex) { throw new Error(ex); }
    }
}

說明:

  1. 屬性中包含了頭節點head,尾節點tail,狀態state、自旋時間spinForTimeoutThreshold
  2. AbstractQueuedSynchronizer抽象的屬性在記憶體中的偏移地址,通過該偏移地址,可以獲取和設置該屬性的值
  3. 同時還包括一個靜態初始化塊,用於載入記憶體偏移地址。

4.3 靜態內部類Node

線程封裝成Node並具備狀態

static final class Node
{
	// 模式,分為共用與獨占
	static final Node SHARED = new Node();// 共用模式
	static final Node EXCLUSIVE = null; // 獨占模式
	// 節點狀態
	static final int CANCELLED =  1;//表示當前的線程被取消
	static final int SIGNAL    = -1;//表示當前節點的後繼節點包含的線程需要被運行【被unpark】,
	static final int CONDITION = -2;//表示當前節點在等待condition,也就是在condition隊列中
	static final int PROPAGATE = -3;//表示當前場景下後續的acquireShared能夠得以執行
	volatile int waitStatus;//節點狀態;表示當前節點在sync隊列中,等待著獲取鎖

	volatile Node prev; // 指向當前節點的前驅
	volatile Node next;// 指向當前節點的後繼
	volatile Thread thread;//節點所對應的線程
	Node nextWaiter;// 下一個等待者    只跟condition有關
    private transient volatile Node head; // 頭節點  懶載入
    private transient volatile Node tail; //尾節點  懶載入
    private volatile int state;  // 同步狀態

	// 節點是否在共用模式下等待
	final boolean isShared() {
	    return nextWaiter == SHARED;
	}
	// 獲取前驅節點,若前驅節點為空,拋出異常
	final Node predecessor() throws NullPointerException {
	    Node p = prev;// 保存前驅節點
	    if (p == null)
	        throw new NullPointerException();
	    else
	        return p;
	}
	// 無參構造函數
	Node() { // Used to establish initial head or SHARED marker
	}
	// 構造函數
	Node(Thread thread, Node mode) {	// Used by addWaiter
	    this.nextWaiter = mode;
	    this.thread = thread;
	}

	Node(Thread thread, int waitStatus) { // Used by Condition
	    this.waitStatus = waitStatus;
	    this.thread = thread;
	}
}

關於Node說明

每個被阻塞的線程都會被封裝成一個Node節點,放入隊列。Node包含了一個Thread類型的引用,並且有自己的狀態:

  • CANCELLED:1,表示當前的線程被取消。
  • SIGNAL:-1,表示負責unPark後繼【由前一個節點unPark下一個節點】。
  • CONDITION:-2,表示當前節點在等待condition,也就是在condition queue中。
  • PROPAGATE:-3,表示當前場景下後續的acquireShared能夠得以執行。
  • 預設值:0,發生在:1、節點加入到隊列成為tail節點,2、節點成為head,並準備喚醒後繼

4.4 構造函數

protected AbstractQueuedSynchronizer() { }    //預設的無參構造

4.5 核心方法分析

4.5.1 核心方法概覽

public final void acquireShared(int arg) {...} // 獲取共用資源的入口(忽略中斷)
protected int tryAcquireShared(int arg); // 嘗試獲取共用資源
private void doAcquireShared(int arg) {...} // AQS中獲取共用資源流程整合
private Node addWaiter(Node mode){...} // 將node加入到同步隊列的尾部
protected int tryAcquireShared(int arg); // 嘗試獲取共用資源
private void setHeadAndPropagate(Node node, int propagate) {...} // 設置 同步隊列的head節點,以及觸發"傳播"操作
private void doReleaseShared() {...} // 遍歷同步隊列,調整節點狀態,喚醒待申請節點
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {...} // 如果獲取資源失敗,則整理隊中節點狀態,並判斷是否要將線程掛起
private final boolean parkAndCheckInterrupt() {...} // 將線程掛起,併在掛起被喚醒後檢查是否要中斷線程(返回是否中斷)
private void cancelAcquire(Node node) {...} // 取消當前節點獲取資源,將其從同步隊列中移除

4.5.2 acquire()方法

該函數以獨占模式獲取(資源),忽略中斷
流程如下:

源碼如下:

public final void acquire(int arg) {
    if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt(); //來到這裡,表示線程拿到鎖,並且讀取到線程的中斷標識為true,調用selfInterrupt()來恢複線程的interrupted中斷標誌(被parkAndCheckInterrupt()擦除了,所以再設置一次)。
}

static void selfInterrupt() {
    Thread.currentThread().interrupt();//線程設置interrupted中斷標誌
}
protected boolean tryAcquire(int arg) {
        throw new UnsupportedOperationException();
    }

acquire()總結

  1. 先調用tryAcquire(),由子類實現來嘗試加鎖,如果獲取到鎖,則線程繼續執行;反則,節點加入隊列
  2. 調用addWaiter(),將調用線程封裝成為一個節點並放入AQS隊列。
  3. 調用acquireQueued(),先park阻塞等待,直到被unPark喚醒。
  4. 如果線程被設置中斷,那麼acquire結束前,需要重新設置中斷。

4.5.3 addWaiter()方法

addWaiter:快速添加的方式往sync queue尾部添加節點

// 添加等待者
    private Node addWaiter(Node mode) {
        // 新生成一個節點
        Node node = new Node(Thread.currentThread(), mode);
        // 創建臨時引用pred,跟tail指向相同地址
        Node pred = tail;
        if (pred != null) { // 尾節點不為空,即隊列已經初始化過
            // 將node的prev域連接到尾節點
            node.prev = pred; 
            if (compareAndSetTail(pred, node)) { // cas更新tail,指向新創建的node
                // 設置尾節點的next域為node
                pred.next = node;  // 結合 node.prev = pred;  形成雙向鏈表
                return node; // 返回新生成的節點
            }
        }
        enq(node); // 隊列還未初始化,或者是compareAndSetTail操作失敗,則進入enq
        return node;
    }
    //關於併發情景說明:
    // 從 Node pred = tail;  到 compareAndSetTail(pred, node); 期間,隊列可能插入了新的節點,pred指向的不是最新的tail,那麼compareAndSetTail(pred, node) 就會執行失敗,同時 node.prev = pred; node的前驅也不是最新的tail。
    // 通過enq()來解決併發問題,enq()通過自旋+cas來保證線程安全

addWaiter()說明

  1. 使用快速添加的方式(失敗不重試)創建新節點並添加到往隊列尾部,更新tail
  2. 如果隊列還沒有初始化或者cas失敗,則調用enq()插入隊列

4.5.4 enq()方法

    // 線程安全地創建隊列、或者將節點插入隊列、
    private Node enq(final Node node) {
        for (;;) { // 自旋+cas,確保節點能夠成功入隊列
            Node t = tail;//尾節點
            if (t == null) { // 尾節點為空,即還沒被初始化
                if (compareAndSetHead(new Node())) // 設置head。 !!!!註意,這裡是new node,沒有使用參數的node,因此head節點不引用任何線程
                    tail = head; // 頭節點與尾節點都指向同一個新生節點。迴圈繼續,進入else後,參數node將插入到隊列
            } else { // 尾節點不為空,即已經被初始化過
                node.prev = t;  // 將node節點的prev域連接到尾節點
                if (compareAndSetTail(t, node)) { // 比較更新tail,node成為新的tail
                    // 設置尾節點的next域為node
                    t.next = node;   // 結合 node.prev = t;   形成雙向鏈表
                    return t; // 返回Node的前驅節點
                }
            }
        }
    }
    
    //CAS head field. Used only by enq.
    private final boolean compareAndSetHead(Node update) {
        return unsafe.compareAndSwapObject(this, headOffset, null, update);
    }
    //CAS head field. Used only by enq.
    private final boolean compareAndSetTail(Node expect, Node update) {
        return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
    }

enq()方法總結:

  1. 功能:cas+自旋方式將節點插入隊列
  2. 如果隊列未初始化,先創建頭節點head(head不指向任務線程),再將節點插入到隊列(當第一個節點被創建後,隊列實際有兩個節點:head+業務節點)。
  3. 如果隊列已經初始化,則直接插入隊列

4.5.5 acquireQueue()方法

作用:sync隊列中的節點在獨占且忽略中斷的模式下獲取(資源)

源碼如下:

// sync隊列中的節點在獨占且忽略中斷的模式下獲取(資源):
    final boolean acquireQueued(final Node node, int arg) {
        // 標誌
        boolean failed = true;
        try {
            // 中斷標識。如果線程喚醒後,中斷標識是true,外層的acquire()將進入selfInterrupt()。
            boolean interrupted = false;
            // 無限迴圈 :如果前驅不是head,那線程將park阻塞,等待前面的節點依次執行,直到被unPark喚醒
            for (;;) {
                // 獲取node的前驅,如果前驅是head,則表明前面已經沒有線程等待了,該線程可能成為工作線程
                final Node p = node.predecessor(); 
                // 前驅為頭節點並且成功獲得鎖
                if (p == head && tryAcquire(arg)) {
                    setHead(node); // node晉升為head
                    p.next = null; // 舊head的next域指向null,將會被GC,移出隊列
                   
                   failed = false; // 設置標誌
                    return interrupted; //拿到鎖,break迴圈,並返回中斷標識
                }
                //執行到這裡,前驅非head 或者 前驅是head但獲取鎖失敗,那麼:1、將前驅狀態改為signal 2、當前線程unPark阻塞
                //shouldParkAfterFailedAcquire():尋找非取消狀態的前驅,如果狀態為signal返回true 反則,將前驅狀態改為signal、再返回false
                //前驅是signal ,執行parkAndCheckInterrupt()後,當前線程park阻塞。一直到線程被unPark喚醒,再返回線程的中斷狀態
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())//parkAndCheckInterrupt返回true表明線程中斷狀態為true
                   //上面if同時成立,才會執行。
                   interrupted = true;  //那麼把中斷標識置為true
            }
        } finally { //(有異常,在拋出之前執行finally;沒有異常,在return之前執行finally)
            if (failed)//只有try的代碼塊出現異常,failed才會是true。什麼情景會產生異常?cancelAcquire分析時有說明
                cancelAcquire(node); //執行取消邏輯
        }
    }
    
    final Node predecessor() throws NullPointerException {
            Node p = prev;
            if (p == null)
                throw new NullPointerException();
            else
                return p;
        }
    
      private void setHead(Node node) {
        head = node;
        node.thread = null;//再次表明head的thread屬性是空的
        node.prev = null;
    }

acquireQueue()總結

  1. 功能:節點進入AQS隊列後,先park阻塞等待,直到被unPark喚醒,或者中斷喚醒
  2. 找到非取消狀態的前驅(取消狀態的將會被移出隊列並GC),如果前驅是SIGNAL,那麼當前節點進入park阻塞,否則,先將前驅改為SIGNAL,再進入park阻塞。
  3. 被unPark喚醒後,判斷前驅是頭節點且獲取到資源(tryAcquire成功),當前節點晉升為頭節點。自此,線程獲取到鎖
  4. 調用shouldParkAfterFailedAcquire和parkAndCheckInterrupt函數,表明只有當該節點的前驅節點的狀態為SIGNAL時,才可以對該節點所封裝的線程進行park操作。

4.5.6 shouldParkAfterFailedAcquire()方法

// 當獲取(資源)失敗後:1、判斷能否將當前線程park;2、修改前驅節點狀態為signal
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        // 獲取前驅節點的狀態
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL) // 狀態為SIGNAL
            // 只有當前驅節點為 signal時,才返回true ,表示當前線程可以安全地park阻塞;其它情況返回false
            return true; 
            //跳過那些CANCELLED狀態的前驅
        if (ws > 0) { // 表示狀態為CANCELLED,為1 
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0); // 找到pred節點前面最近的一個狀態不為CANCELLED的節點;然後跳出迴圈並返回false
            pred.next = node; 
        } else { // 為PROPAGATE -3 或者是0 ,(為CONDITION -2時,表示此節點在condition queue中) 
             // cas更新前驅的狀態為SIGNAL.如果前驅是頭節點,那麼頭節點ws=SIGNAL
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL); 
        }
        // 不能進行park操作
        return false;
    }
    //CAS waitStatus field of a node.
    private static final boolean compareAndSetWaitStatus(Node node,int expect,  int update) {
        return unsafe.compareAndSwapInt(node, waitStatusOffset, expect, update);
    }

shouldParkAfterFailedAcquire()總結:

  1. 如果前驅狀態是:SIGNAL,返回true。表示當前節點可以安全地unPark()阻塞
  2. 遇到取消的前驅節點,則跳過。這些被取消的節點會從隊列中移除並GC
  3. 如果前驅狀態不是:SIGNAL,將前驅狀態改為:SIGNAL,返回false,回到1 繼續

4.5.7 parkAndCheckInterrupt()方法

// 進行park操作並且返回該線程的中斷標識
    private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this); //外面的for迴圈可能會導致多次park,不過沒關係,park允許多次執行
       //被喚醒之後,返回中斷標記,即如果是正常喚醒則返回false,如果是由於中斷醒來,就返回true
        return Thread.interrupted(); // acquireQueued() 中聲明的interrupted 將會被更新為這裡的返回結果
    }
    public static boolean interrupted() {
        return currentThread().isInterrupted(true);//返回當前線程interrupted中斷標記,同時會清除此interrupted標記
    }

方法總結:

  1. 執行park操作(前提:前驅狀態是SIGNAL),在隊列中阻塞等待。
  2. 被unPark()喚醒後,返回線程的interrupted中斷標識,並且清除interrupted標記

4.5.8 cancelAcquire()方法

什麼時候才會執行cancelAcquire?

在lockInterruptibly()會通過拋出中斷異常來執行cancelAcquire方法,lock方法過程中則不會執行該代碼,作者這麼些的意圖在於for迴圈內部如果出現不可控的因素導致產生未知的異常,則會執行cancelAcquire,很明顯這屬於一種相對偏保守的保險代碼。
// 取消獲取鎖
    private void cancelAcquire(Node node) {
        // Ignore if node doesn't exist
        if (node == null) // node為空,返回
            return;
        node.thread = null;// thread置空 備註1
        // Skip cancelled predecessors
        Node pred = node.prev;// pred表示:最靠近node並且狀態不等於取消的前驅節點
        while (pred.waitStatus > 0) 
            node.prev = pred = pred.prev; //更新pred,往列頭推進 
            
        Node predNext = pred.next; //predNext表示:pred的後繼
        // 設置node節點的狀態為CANCELLED
        node.waitStatus = Node.CANCELLED; //備註2
        if (node == tail && compareAndSetTail(node, pred)) { // 若node節點為尾節點,則pred成為尾節點  備註3
            // pred的next域置為null
            compareAndSetNext(pred, predNext, null);
        } else { // 2、node節點不為尾節點,或者比較設置不成功
            int ws;
            //下麵一串判斷,最終目標:在node移除隊列前,將有效的前驅節點狀態改為signal
            if (pred != head &&
                ((ws = pred.waitStatus) == Node.SIGNAL ||
                 (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
                pred.thread != null) { 
                // pred節點不為頭節點,並且
                    //pred節點的狀態為SIGNAL)或者 
                      // pred節點狀態小於等於0,並且比較並設置等待狀態為SIGNAL成功,並且pred節點所封裝的線程不為空
                Node next = node.next;
                if (next != null && next.waitStatus <= 0) // 後繼不為空並且後繼的狀態小於等於0
                    compareAndSetNext(pred, predNext, next); // 比較並設置pred.next = next;  到這裡:node的前驅節點指向node的後繼節點。 備註4
            } else {
            // 這裡,pred==head (3、即node是head的後繼)或者pred.status=0,-2時 【前面while (pred.waitStatus > 0) 已經限制了pred一定是<=0】,執行:
                unparkSuccessor(node); // 喚醒node的後繼
            }

            node.next = node; // help GC  後繼節點指向自身  備註5
        }
    }
    //修改參數node的next域
     private static final boolean compareAndSetNext(Node node, Node expect, Node update) {
        return unsafe.compareAndSwapObject(node, nextOffset, expect, update);
    }

對cancelAcquire()總結之前,先明確以下兩點:

  1. 基於對acquire()方法的分析,調用鏈:addWait()->enq()->acquireQueue()->cancelAcquire(node),進入到cancelAcquire()時,節點node一定已經在隊列中,而且它不會是head,並且沒有持有鎖
  2. AQS通過管理這些屬性:waitStatus、thread、prev、next、head、tail、nextWaiter ,成為一個虛擬的列隊。

cancelAcquire(node)總結

cancelAcquire()負責將node移出隊列,並保持隊列中其他節點的順序關係不變,它做了以下工作:

  • waitStatus更新為cancel (備註2)
  • thread更新為null(備註1)
  • tail:如果node是尾節點,更新tail引用 (備註3)
  • head:不需要更新(node不會是head)
  • prev:沒有更新
  • next: node前置的next域更新指向node後繼,並且node的next指向了自身 (備註4、備註5)
  • nextWaiter:不需要更新(跟condition有關,這裡不涉及)

執行cancelAcquire後,隊列變成這樣的:

發現:

  1. node沒有移出隊列,因為被後繼的prev所引用。
  2. node.next變了,指向了自身,這就能解釋為什麼unparkSuccessor()是從後往前遍歷:因為取消節點的next域指向了自身,所以不能從通過next來遍歷,但prev是完整的,所以通過prev來遍歷。
  3. 取消節點,暫存在隊列中,當後繼節點被喚醒,執行shouldParkAfterFailedAcquire後,取消節點的引用鏈清空,移出隊列,最後GC回收

4.5.9 unparkSuccessor()方法

    // 喚醒node節點的後繼
    private void unparkSuccessor(Node node) {
       
        // 獲取node節點的等待狀態
        int ws = node.waitStatus;
        if (ws < 0) // 狀態值小於0,為SIGNAL -1 或 CONDITION -2 或 PROPAGATE -3
            // cas節點狀態為0
            compareAndSetWaitStatus(node, ws, 0);//如果head沒有後繼的情況下,狀態會一直=0
        
        Node s = node.next;
        //若後繼為空,或後繼已取消,則從尾部往前遍歷 找到最靠近的一個處於正常阻塞狀態的節點進行喚醒
        // 什麼時候s==null ?   node的後繼節點是取消狀態時,node.next為null
        if (s == null || s.waitStatus > 0) { 
            s = null; 
            // 由尾節點向前倒著遍歷隊列,但不會超過node節點
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0) 
                    s = t; 
        }
        if (s != null)
            LockSupport.unpark(s.thread);//喚醒s節點線程
    }

unparkSuccessor()總結

  1. 作用:找到有效的後繼節點unPark喚醒
  2. 尋找有效後繼時從尾往前倒著遍歷:因為取消節點的next域指向了自身,所以不能從通過next來遍歷
  3. 將發起unPark喚醒的節點(只能是head)狀態改為0(意味著在head喚醒後繼,到被後繼推出隊列的期間,狀態變為0)

4.5.10 release()方法

以獨占模式釋放對象,其源碼如下:

public final boolean release(int arg) {
        if (tryRelease(arg)) { //如果釋放鎖成功
            Node h = head; 
            // 線程A調用acquire()獲取到鎖之後,A線程節點變為head,然後A調用release 釋放鎖,存在兩種情況:
            // 1、 如果有新的線程B入隊,B成為後繼節點,B會將A狀態改為SIGNAL,那麼(h != null && h.waitStatus != 0 )成立,unparkSuccessor()喚醒後繼節點
            // 2、如果A後面沒有節點,A狀態是預設值:0 ,那麼h.waitStatus != 0 不成立,直接返回true,不需要喚醒後繼節點。
            if (h != null && h.waitStatus != 0) // 頭節點不為空並且頭節點狀態不為0
                unparkSuccessor(h); //由head喚醒後繼節點
            return true;
        }
        return false;
    }

release()總結:

  1. 功能:釋放獨占鎖
  2. 先調用tryRelease()由子類實現釋放鎖
  3. 如果釋放鎖成功,然後unPark喚醒後繼節點(沒有後繼就不需要喚醒)

4.5.11 acquireSharedInterruptibly()方法

   //獲取共用資源,響應中斷
   public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted()) //讀取線程中斷標記,然後擦除標記
            throw new InterruptedException(); //中斷標記為true,拋出中斷異常,停止執行
        if (tryAcquireShared(arg) < 0)  //調用子類實現方法 獲取資源
            doAcquireSharedInterruptibly(arg); //沒有獲取到,那麼再嘗試獲取(進入隊列排隊等待)
    }

獲取共用資源流程圖:

acquireSharedInterruptibly()總結

  1. 共用模式獲取對象,響應中斷並終止獲取
  2. 先調用子類實現獲取資源,沒有獲取到再加隊隊列等待。

4.5.12 doAcquireSharedInterruptibly()方法

//獲取共用資源,響應中斷
private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
        final Node node = addWaiter(Node.SHARED); //增加等待節點
        boolean failed = true;
        try {
            for (;;) {//無限迴圈,直到r>0
                final Node p = node.predecessor(); // p表示 剛插入節點的前驅
               //1、如果前驅是head
               if (p == head) {
                    int r = tryAcquireShared(arg);//調用子類實現方法 嘗試獲取共用資源
                    if (r >= 0) { // >0 表示 獲取到資源
                    // 1、如果是ReentrantReadWriteLock、CountDownLatch ,有可能r=1
                    // 2、如果是Semaphore,有可能r=0
                    // 1、2 都調用setHeadAndPropagate進行共用傳播判斷
                        setHeadAndPropagate(node, r);// 更新head併進行共用傳播
                        p.next = null; // 將隊列頭節點的next域置空,之後,這個節點將被GC回收
                        failed = false;
                        return;
                    }
                }
                // 2、前驅不是head
                //線程park阻塞,直至被unPark喚醒,或者被其它線程中斷喚醒
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    throw new InterruptedException(); //進入這裡表示線程中斷標記為true,那麼拋出中斷異常
            }
        } finally {
            if (failed) //當try 代碼塊有異常:中斷異常 或 其他未知異常,failed才是true
                cancelAcquire(node);//取消獲取資源
        }
    }

doAcquireSharedInterruptibly()總結

  1. 創建節點並插入aqs隊列,將前驅狀態改為signal,park阻塞,等待unPark喚醒。
  2. 正常喚醒後,無限迴圈直到前驅是head並且調用子類方法獲取共用資源成功,調用setHeadAndPropagate()成為head併進行共用傳播
  3. 被中斷喚醒、或者迴圈等待過程發生中斷異常,執行cancelAcquire()取消獲取資源

4.5.13 setHeadAndPropagate()方法

setHeadAndPropagate在獲取共用資源的時候被調用

// 設置 同步隊列的head節點,以及觸發"傳播"操作
private void setHeadAndPropagate(Node node, int propagate) {
        Node h = head; // 記錄更新前的head
        setHead(node); //參數node 成為新的head
         //判斷:
        if (propagate > 0 || h == null || h.waitStatus < 0 ||
            (h = head) == null || h.waitStatus < 0) {
            Node s = node.next; //獲取node後繼
            //後繼為空或者後繼是等待共用資源的節點
            if (s == null || s.isShared()) 
                doReleaseShared(); //釋放共用資源
        }
    }

滿足調用doReleaseShared的條件分析

  1. propagate > 0
    ReentrantReadWriteLock、CountDownLatch 調用tryAcquireShared()返回1進入,滿足條件;Semaphore 進入,propagate可能等於0,不滿足,繼續2

  2. h == null
    h == null 表示舊head變為null,程式沒有地方設置head=null,並且這裡h引用著head意味著head不會被GC。 因此,h == null不滿足條件,繼續3 【不知道哪種情況下h==null todo】

  3. h.waitStatus < 0

  • h.waitStatus==1:取消,不能由取消節點喚醒後繼,不滿足條件

setHeadAndPropagate()總結:

  1. 方法功能:設置 同步隊列的head節點,以及觸發"傳播"操作:
  2. 如果head的後繼是共用類型節點或者為null,調用doReleaseShared()來喚醒後繼

4.5.14 doReleaseShared()方法

//遍歷同步隊列,調整節點狀態,喚醒待申請節點
private void doReleaseShared() {
        for (;;) {
            Node h = head;
            //1、head 不等於 tail 且不等於 null
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                if (ws == Node.SIGNAL) {  //如果head狀態為signal ,cas修改為0
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;            // loop to recheck cases
                    unparkSuccessor(h); //喚醒後繼
                }
                //如果節點的後繼還沒有將其前驅改為signal,這裡ws==0是成立的
                else if (ws == 0 &&  //如果head狀態為0,cas修改為propagate 
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;               //  如果在int ws = h.waitStatus; 之後,後繼將head節點改為signal,那麼cas失敗,continue繼續迴圈後, if (ws == Node.SIGNAL) 滿足,那麼將會喚醒後繼。
            }
           // 只有head沒有發生變化,迴圈才會結束,若head改變,繼續迴圈  
            if (h == head)                   // loop if head changed
                break;
        }
    }

doReleaseShared()總結

  1. 如果頭節點狀態為signal,那麼CAS更新頭節點狀態為0,成功則調用unparkSuccessor()喚醒後繼,失敗則重試
  2. 如果頭節點狀態為0,那麼將CAS更新頭節點狀態為PROPAGTATE ,失敗則重試。
  3. 最後如果判斷head是否發生變化,有變化則重覆1、2,沒有變化則方法結束。
  4. PROPAGTATE狀態的意義是,增加一個狀態判斷,當前驅獲取資源,後繼同時也有機會獲取到資源

4.5.15 releaseShared()方法

public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }

releaseShared()方法總結

  1. 調用子類的實現方法tryReleaseShared()釋放n個共用資源,釋放成功則繼續調用doReleaseShared()來喚醒隊列中的等待節點

5 取消節點移出鏈表分析

有兩種情景,會將取消節點徹底移出鏈表:

  1. 頭節點unPark喚醒後繼時,後繼節點喚醒後重新進入shouldParkAfterFailedAcquire()
  2. 取消節點後面有新節點入列時,新節點執行shouldParkAfterFailedAcquire()

以第一個情景為例子分析:

6 在shared模式中為什麼需要PROPAGATE狀態

結論:在前驅節點獲取資源時,後繼也能夠有機會申請資源,不需要等待前驅通過releaseShare()來喚醒
分析如下:

1:A B 先後進入隊列
 2:A被喚醒,獲得資源,調用setHeadAndPropagate(),晉升為head
 3、B調用shouldParkAfterFailedAcquire(),嘗試將A狀態改為signal但未執行
 	4、A進入doReleaseShared(),A狀態等於0(3還沒執行),進入ws == 0 分支處理。
 		5、此時3執行完成,B將A的狀態改為signal,然後B park阻塞
 		6、A執行compareAndSetWaitStatus(h, 0, Node.PROPAGATE)失敗,continue繼續
 			7、A進入(ws == Node.SIGNAL)分支,執行compareAndSetWaitStatus(h, Node.SIGNAL, 0)成功,然後再執行unparkSuccessor(),將B喚醒。
 				8、A將B喚醒後,A去執行拿到資源後的操作,B也成功拿到資源並執行。
 				因為步驟6的continue,B不需要等待A執行releaseShare()被喚醒,在A獲取到資源時同時B也能快速獲取到資源,A、B可以同時執行獲得資源後的任務


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 代理模式、適配器模式與裝飾器模式 這三種設計模式在代碼形式上十分相似。但是為瞭解決不同的問題而提出的: 代理模式 代理模式在不改變原始介面的條件下,為原始類控制訪問、新增一些業務無關的功能。 適配器模式 將不相容的介面轉換為可相容的介面。(一種以修正為目的的設計模式)。 裝飾器模式 裝飾器模式是對原 ...
  • SaaS產品就像一座冰山,冰山以上的部分是功能、數據(可見部分)、用戶界面,冰山以下是系統架構、完整的數據模型、開放體系、非功能性需求(擴展性、可維護性、性能、安全等)。 短期內想要快速上線產品,可能只需關註冰山以上的部分就夠了,但是SaaS公司想要在市場上建立長期的競爭優勢,比拼的一定是冰山以下的 ...
  • 以下內容為本人的著作,如需要轉載,請聲明原文鏈接 微信公眾號「englyf」https://www.cnblogs.com/englyf/p/16656222.html 大小端存儲的劃分是為瞭解決長度大於一個位元組的數據類型內容在存儲地址上以不同順序分佈的問題。 比如16位的short整形,32位的i ...
  • 線程基礎02 3.繼承Thread和實現Runnable的區別 從java的設計來看,通過繼承Thread或者實現Runnable介面本身來創建線程本質上沒有區別,從jdk幫助文檔我們可以看到Thread類本身就實現了Runnable介面 實現Runnable介面方式更加適合多個線程共用一個資源的情 ...
  • 內置函數 # 1.abs函數 print(abs(-1)) # 絕對值方法 # 2.all函數 print(all([1,'aaa',''])) # false print(all([])) # true #all方法裡面是一個可迭代對象,all會自動將這個對象for迴圈一下,如果所有的值為true ...
  • 簡介 本項目是在newbee-mall項目的基礎上改造而來, 使用mybatis-plus,集成RedisSearch作為商城搜索中間件,商城首頁集成tianai-captcha作為滑塊驗證碼,還添加了高級秒殺、優惠劵以及完善可用的後臺全部功能,喜歡的話麻煩給我個star 商城集成RedisSear ...
  • 目錄 一.OpenGL ES圖像對比度調節 1.原始圖片 2.效果演示 二.OpenGL ES 圖像對比度調節源碼下載 三.猜你喜歡 零基礎 OpenGL ES 學習路線推薦 : OpenGL ES 學習目錄 >> OpenGL ES 基礎 零基礎 OpenGL ES 學習路線推薦 : OpenGL ...
  • 函數 定義 # 定義函數 def fn(): print("這是函數內部") # 調用 fn() fn() # 區分 fn: 這是真正意義上的函數本身 fn(): 這是調用函數 參數 形參 實參 函數參數可有可無,要看具體使用 形參是定義函數的參數 實參就是調用函數時需要傳遞的參數,函數有多少個形參 ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...