Java.concurrent.locks(1)-AQS

来源:http://www.cnblogs.com/maxmys/archive/2016/02/04/5181775.html
-Advertisement-
Play Games

本文是一個jdk.locks系列主題的頭篇,總體介紹JDK中Lock底層框架以及JDK中藉助該框架實現的各種同步手段。瞭解JDK基本的併發與同步的實現,對java併發編程更得心應手!


AbstractQueuedSynchronizer

@(Base)[JDK, locks, ReentrantLock, AbstractQueuedSynchronizer, AQS]

轉載請寫明:原文地址

Synchronize 最開始JDK只支持synchronize關鍵字來提供方法級別、同步塊級別的同步。後續版本中提供了java.utils.concurrent.locks包,其中包括可重入鎖,讀寫鎖,信號量,Condition等等,都是基於一個基本的等待隊列抽象完成的,在JDK的文檔中,這個抽象的隊列框架被稱為AQS同步框架。

我們需要關註的類包括: AbstractOwnableSynchronizer , AbstractQueuedSynchronizer 其餘的包括,可重入鎖,讀寫鎖,Condition,信號量都是通過上述兩個抽象基類實現。

整個concurrent包源自於JSR-166,其作者就是大名鼎鼎的Doug Lea,說他是這個世界上對Java影響力最大的個人,一點也不為過。因為兩次Java歷史上的大變革,他都間接或直接的扮演了舉足輕重的角色。一次是由JDK 1.1到JDK 1.2,JDK1.2很重要的一項新創舉就是Collections,其Collections的概念可以說承襲自Doug Lea於1995年發佈的第一個被廣泛應用的collections;一次是2004年所推出的Tiger。Tiger廣納了15項JSRs(Java Specification Requests)的語法及標準,其中一項便是JSR-166。

What is Lock?

Lock 我們首先看wikipedia的定義:

In computer science, a lock or mutex (from mutual exclusion) is a synchronization mechanism for enforcing limits on access to a resource in an environment where there are many threads of execution. A lock is designed to enforce a mutual exclusion concurrency control policy.

在電腦科學中,鎖或者互斥量是一種同步機制,用於限制共用資源的在多線程環境下訪問。其次值得註意的是,不論是任何併發模型,其都是硬體的併發模型的抽象(進程線程模型),其只能更大程度的避免過度同步,以及提供了合理的抽象讓你更容易在多線程環境下編程。而非使用了某種併發模型之後,你的程式就Lock-Free了。你看到的只是在語言層次的抽象上是無鎖的,例如Actor-Model。

然後我們從使用的角度來看,一個鎖對象需要具備什麼功能。我們首先定義一個鎖對象,叫做XLock(這是生造的一個概念)。如下代碼大家應該經常看到。

private XLock lock = new Xlock();
public void someMethod() {
    lock.lock();
    try {
       // do something
    } finally {
        lock.unlock();
    }
}

我們從表現形式上來看,lock.lock()試著獲取鎖,如果有人占用的話就“等著”,直到獲取的人釋放鎖。lock.unlock()寫在final塊中,意味著,無論是否異常,都需要釋放鎖。

其實,這個裡面唯一神秘的概念就是“等著”。什麼叫等著呢?如何實現等著呢?不明白的同學可能以為什麼JVM會調度什麼的。其實在電腦中,根本就沒有任何神奇的事情。下麵我們來解釋如何“等著”。

首先回一下操作系統的mutex是如何實現的:

lock: 
    if(mutex > 0){ 
        mutex = 0; 
        return 0; 
    } else 
         掛起等待; 
    goto lock;
        
unlock: 
    mutex = 1; 
    喚醒等待Mutex的線程; 
    return 0;

上述代碼有一個小問題。例如lockif可能兩個線程都同時進入,如果兩個線程同時調用lock,這時Mutex是1,兩個線程都判斷mutex>0成立,然後其中一個線程置mutex=0,而另一個線程並不知道這一情況,也置mutex=0,於是兩個線程都以為自己獲得了鎖。

這裡就需要註意的是,為了實現一個原子操作,必須要在CPU指令級別上支持才可以。在程式員看來就是需要一個原子的彙編指令。最早是交換記憶體地址和寄存器/記憶體與記憶體/寄存器與寄存器,這裡稱作swap,例如x86的xchgb指令,相當於抽象成setAndget()指令,使用方法如下代碼:

lock: 
    movb $0, %al 
    xchgb %al, mutex 
    if(al寄存器的內容 > 0){ 
        return 0; 
    } else 
        掛起等待; 
    goto lock;
        
unlock: 
    movb $1, mutex 
    喚醒等待Mutex的線程; 
    return 0;

另外值得註意的就是如何掛起和喚醒線程。在C語言中,我們可以直接調用系統API操作某個線程的狀態,在內核中PC執行到掛起指令之後,就保存線程堆棧,直接切換。在調用系統API之前,我們需要記錄已經掛起了哪些線程,也就是需要一個等待隊列記錄當前有多少線程掛起在這個lock上。當持有lock的線程執行完畢之後,通知等待隊列的第一個線程。問題的關鍵在於如何通知,當然還是系統API,直接把線程的狀態設置為可以執行即可,等待下一次線程調度的時候,該線程就會繼續執行。

在上述代碼中值得註意的是goto lock這個語句。

根據上述說明,我們簡單翻譯成java的實現如下:

private AtomicBoolean on = new AtomicBoolean(false);
private Queue waiting = new Queue();
public void lock() {
    while (true) {
        if (on.compareAndSet(false, true)) { // get lock succeed
            return;
        }
        waiting.add(Thread.currentThread()).
        unsafe.park(Thread.currentThread()).
    }
}

在上述代碼中我們有三個比較特殊的點:

  • 原子變數的操作
  • 等待隊列
  • unsafe對象的park方法。

AtomicBoolean 底層實現也是需要CPU支持,是一個CPS操作。用來解決上述C偽碼中的只能允許一個線程進入臨界區問題,參考上述的xchgb操作。

Waiting Queue 我們通過一個普通的數組或者鏈表即可實現。

Unsafe Park Unsafe這個類是用於JDK類庫使用的一個JNI調用。讀者可以暫時理解成系統調用的一種抽象。park()方法會立即掛起某個線程,PC(程式計數器)保留在原來的位置。

當我們看完上述內容之後,我們明白了幾個實現鎖的重要步驟:第一,我們需要一個state來描述鎖的狀態(參考上述的原子變數)。第二,我們需要一個隊列的數據結構。好了,到了這裡有了基本的概念之後,我們再看在Java中各種鎖是怎麼實現的。

AbstractOwnableSynchronizer

可獨占的同步器

A synchronizer that may be exclusively owned by a thread. This class provides a basis for creating locks and related synchronizers that may entail a notion of ownership. The AbstractOwnableSynchronizer class itself does not manage or use this information. However, subclasses and tools may use appropriately maintained values to help control and monitor access and provide diagnostics.

這個類作用就是提供了一種“獨占”模式的基本實現,其代碼非常簡單:

public abstract class AbstractOwnableSynchronizer {
    /** The current owner of exclusive mode synchronization. */
    private transient Thread exclusiveOwnerThread;
    protected final void setExclusiveOwnerThread(Thread t) {
        exclusiveOwnerThread = t;
    }
    protected final Thread getExclusiveOwnerThread() {
        return exclusiveOwnerThread;
    }
} 

通過一個私有域來保存當前占用這個“同步器”的線程(可認為是一個標誌位),並且提供了get,set方法來改變這個獨占的標誌位。這裡並沒有使用任何同步的手段,因為只是個基類啦。

transient關鍵字是對java的serialization機制起作用的,意義是不要序列化這個欄位。這裡我們可以忽略,因為我也暫時想不到任何場景需要serialize一個同步器的場景。
獨占的含義這裡暫時不做解釋,在之後的讀寫鎖部分會詳細說明。理解成mysql中的X鎖和S鎖就行了。

AbstractQueuedSynchronizer.acquire()

隊列同步器框架 這個類就是所有的鎖的精華所在。為了避免大家在閱讀過程中的這個抽象類提供了5個方法需要子類繼承,即可完成某種意義上的鎖,如下:

tryAcquire();
tryRelease();
tryAcquireShared();  // 暫時不用關註
tryReleaseShared();  // 暫時不用關註
isHeldExclusively(); //暫時不用關註

看到這裡可能一頭霧水,其實在AbstractQueuedSynchronizer中已經規定出來了獲取鎖的步驟,和提供了基本的等待隊列的實現。我們只需要實現每一個方法,即可完成一個同步器。

首先我們還是從,上文中提到的Lock.lock()語義的方法對應的就是AbstractQueuedSynchronizer的acquire(int)函數開始入手:

 public final void acquire(int arg) {
      if (!tryAcquire(arg) &&
          acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
          selfInterrupt();
 }

翻譯一下上述的代碼:

  • 調用子類的tryAcquire方法,如果返回成功就說明獲取鎖成功
  • 如果返回失敗,則入隊列。

interrupt的處理我們暫時忽略,這是java併發編程中的另外一個要點

其實對照我們上文中給出的java的lock的範例,tryAcquire函數其實就是compareAndSet,而acquireQueued()其實就是入隊並且掛起線程的操作。

所以說如果一個子類想要實現,tryAcquire裡面應該是原子的改變一個狀態即可。那是不是說資歷裡面就必須要維護一個狀態呢?答案是否定的。AbstractQueuedSynchronizer這個類已經提供了getState(),setState()以及compareAndSetState()來讓子類控制這個表示sync的state。如下:

   protected final int getState() {
        return state;
   }
   protected final void setState(int newState) {
        state = newState;
   }
   protected final boolean compareAndSetState(int expect, int update) {
        // See below for intrinsics setup to support this
        return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
   }

至於為什麼不直接使用AtomicInteger,或者直接繼承自AtomicInteger,而是自己在內部實現,原因如下:

Setup to support compareAndSet. We need to natively implement this here: For the sake of permitting future enhancements, we cannot explicitly subclass AtomicInteger, which would be efficient and useful otherwise. So, as the lesser of evils, we natively implement using hotspot intrinsics API. And while we are at it, we do the same for other CASable fields (which could otherwise be done with atomic field updaters).

Mutex.lock()

互斥量: 瞭解了AQS框架的Lock部分,我們可以試著利用AQS寫一個互斥量。

Subclasses should be defined as non-public internal helper classes that are used to implement the synchronization properties of their enclosing class.

根據AQS類的推薦,我們需要用靜態內部類來繼承,在JDK中的大部分鎖都遵循了這一模式:

class Mutex implements Lock, java.io.Serializable {
  private static class Sync extends AbstractQueuedSynchronizer {
    // Acquire the lock if state is zero
    public boolean tryAcquire(int acquires) {
      assert acquires == 1; // Otherwise unused
      if (compareAndSetState(0, 1)) {
        setExclusiveOwnerThread(Thread.currentThread());
        return true;
      }
      return false;
    }
    // some other methods
  }

  // The sync object does all the hard work. We just forward to it.
  private final Sync sync = new Sync();

  public void lock()                { sync.acquire(1); }
  public boolean tryLock()          { return sync.tryAcquire(1); }
  public void lockInterruptibly() throws InterruptedException {
    sync.acquireInterruptibly(1);
  }
  public boolean tryLock(long timeout, TimeUnit unit)
      throws InterruptedException {
    return sync.tryAcquireNanos(1, unit.toNanos(timeout));
  }
  // some other methods
}

上述代碼中,最核心的就是tryAcquire方法,通過調用父類的compareAndSetState來改變狀態,然後把獨占線程設置為當前線程。根據上文中描述的父類中的運行模式,我們一個Lock.lock()的語義也就已經完成了。

lock.lock(timeout)這個語義其實類似,但是實際操作起來有更多的細節需要處理。底層使用unsafe.part(timeout)來處理。

AbstractQueuedSynchronizer.release()

同步器釋放:與lock操作相對應的就是unlock,對應到同步器來說就是release()。我們首先看AbstractQueuedSynchronizerrelease()方法是如何實現的:

public final boolean release(int arg) {
      if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
      }
      return false;
}

與前文中java代碼的模式類似,tryRelease調用子類恢復狀態,如果恢覆成功,則獲取鏈表頭部,然後調用unsafe.unpark()方法喚醒線程。

我們再看Mutex中應該怎麼實現tryRelease函數:

protected boolean tryRelease(int releases) {
    assert releases == 1; // Otherwise unused
    if (getState() == 0) throw new IllegalMonitorStateException();
    setExclusiveOwnerThread(null);
    setState(0);
    return true;
}

這裡只是個樣例,我們只是完成語義,還有很多細節並沒有確定,例如這裡假設tryRelease函數一定只有acquire成功的線程才會調用。所以請勿直接使用於生產環境。

小結

看完上述內容之後,我們對AQS的大致流程有了一定的瞭解,也通過其編寫了一個Mutex。在AQS中仍然有許多值得學習的細節。特別是讀者應該自行瞭解一下內部的隊列結構。因為在之後的Condition中也會有涉及到。

在之後的章節中,我們會陸續介紹ReentrantLock,Condition, ReentrantReadWriteLock,CountDownLatch,Semaphore的實現。

最後,下麵是完整的Mutex示例。感謝閱讀!

class Mutex implements Lock, java.io.Serializable {

  // Our internal helper class
  private static class Sync extends AbstractQueuedSynchronizer {
    // Report whether in locked state
    protected boolean isHeldExclusively() {
      return getState() == 1;
    }

    // Acquire the lock if state is zero
    public boolean tryAcquire(int acquires) {
      assert acquires == 1; // Otherwise unused
      if (compareAndSetState(0, 1)) {
        setExclusiveOwnerThread(Thread.currentThread());
        return true;
      }
      return false;
    }

    // Release the lock by setting state to zero
    protected boolean tryRelease(int releases) {
      assert releases == 1; // Otherwise unused
      if (getState() == 0) throw new IllegalMonitorStateException();
      setExclusiveOwnerThread(null);
      setState(0);
      return true;
    }

    // Provide a Condition
    Condition newCondition() { return new ConditionObject(); }

    // Deserialize properly
    private void readObject(ObjectInputStream s)
        throws IOException, ClassNotFoundException {
      s.defaultReadObject();
      setState(0); // reset to unlocked state
    }
  }

  // The sync object does all the hard work. We just forward to it.
  private final Sync sync = new Sync();

  public void lock()                { sync.acquire(1); }
  public boolean tryLock()          { return sync.tryAcquire(1); }
  public void unlock()              { sync.release(1); }
  public Condition newCondition()   { return sync.newCondition(); }
  public boolean isLocked()         { return sync.isHeldExclusively(); }
  public boolean hasQueuedThreads() { return sync.hasQueuedThreads(); }
  public void lockInterruptibly() throws InterruptedException {
    sync.acquireInterruptibly(1);
  }
  public boolean tryLock(long timeout, TimeUnit unit)
      throws InterruptedException {
    return sync.tryAcquireNanos(1, unit.toNanos(timeout));
  }
}

您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • /** * @{#} Base64.java Create on Nov 5, 2008 7:19:56 PM * */package com.gren.remotecheck.util; import java.io.BufferedReader;import java.io.BufferedWr
  • unit Unit1; interface uses Windows, Messages, SysUtils, Variants, Classes, Graphics, Controls, Forms, Dialogs, StdCtrls, ComCtrls, ImgList; type TForm
  • Socket通道 上文講述了通道、文件通道,這篇文章來講述一下Socket通道,Socket通道與文件通道有著不一樣的特征,分三點說: 1、NIO的Socket通道類可以運行於非阻塞模式並且是可選擇的,這兩個性能可以激活大程式(如網路伺服器和中間件組件)巨大的可伸縮性和靈活性,因此,再也沒有為每個S
  • 集合: 1、作用: 保存多條數據。 嘮叨:同樣是保存數據,其保存的內容不限,長度不限。 2、集合間的相互關係: Collection--Set —HashSet --List—ArrayList —LinkedList Map—HashMap 集合在底層實現時:依然使用數組,但是性能優於數組。 一、
  • 1 #include<stdio.h> 2 #include<stdlib.h> 3 4 //typedef 80 MAXSIZE; 5 #define MAXSIZE 20 6 7 typedef struct Node{ 8 int data; 9 int cursor; 10 }Node,St
  • 1 #include<stdio.h> 2 #include<stdlib.h> 3 #include<malloc.h> 4 5 typedef int Elemtype; 6 7 typedef struct Node{ 8 int data; 9 struct Node* next; 10 }
  • 1 #include<stdio.h> 2 #include<stdlib.h> 3 4 typedef int Elemtype; 5 #define MAXSIZE 20 6 7 typedef struct List{ 8 Elemtype data[MAXSIZE]; 9 int lengt
  • 估計他們上完課就再也不玩了,自己那段時間上完課。也就基本上很少來了,主要是 沒有什麼記錄的習慣,奇怪的是,每每到了心情不好的時候,總會想要寫點什麼。 不管怎麼樣還是出去花錢學習了一下,這段經歷。嗯,很難評價,事實上如果不留下一筆,那麼的確沒有什麼學習的意義。 所以,李飛要一點兒一點兒扳過來。 出去學
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...