本文是一個jdk.locks系列主題的頭篇,總體介紹JDK中Lock底層框架以及JDK中藉助該框架實現的各種同步手段。瞭解JDK基本的併發與同步的實現,對java併發編程更得心應手!
AbstractQueuedSynchronizer
@(Base)[JDK, locks, ReentrantLock, AbstractQueuedSynchronizer, AQS]
轉載請寫明:原文地址
Synchronize 最開始JDK只支持synchronize
關鍵字來提供方法級別、同步塊級別的同步。後續版本中提供了java.utils.concurrent.locks
包,其中包括可重入鎖,讀寫鎖,信號量,Condition等等,都是基於一個基本的等待隊列抽象完成的,在JDK的文檔中,這個抽象的隊列框架被稱為AQS
同步框架。
我們需要關註的類包括:
AbstractOwnableSynchronizer
,AbstractQueuedSynchronizer
其餘的包括,可重入鎖,讀寫鎖,Condition,信號量都是通過上述兩個抽象基類實現。
整個concurrent包源自於JSR-166,其作者就是大名鼎鼎的Doug Lea,說他是這個世界上對Java影響力最大的個人,一點也不為過。因為兩次Java歷史上的大變革,他都間接或直接的扮演了舉足輕重的角色。一次是由JDK 1.1到JDK 1.2,JDK1.2很重要的一項新創舉就是Collections,其Collections的概念可以說承襲自Doug Lea於1995年發佈的第一個被廣泛應用的collections;一次是2004年所推出的Tiger。Tiger廣納了15項JSRs(Java Specification Requests)的語法及標準,其中一項便是JSR-166。
What is Lock?
Lock 我們首先看wikipedia的定義:
In computer science, a lock or mutex (from mutual exclusion) is a synchronization mechanism for enforcing limits on access to a resource in an environment where there are many threads of execution. A lock is designed to enforce a mutual exclusion concurrency control policy.
在電腦科學中,鎖或者互斥量是一種同步機制,用於限制共用資源的在多線程環境下訪問。其次值得註意的是,不論是任何併發模型,其都是硬體的併發模型的抽象(進程線程模型),其只能更大程度的避免過度同步,以及提供了合理的抽象讓你更容易在多線程環境下編程。而非使用了某種併發模型之後,你的程式就Lock-Free了。你看到的只是在語言層次的抽象上是無鎖的,例如Actor-Model。
然後我們從使用的角度來看,一個鎖對象需要具備什麼功能。我們首先定義一個鎖對象,叫做XLock
(這是生造的一個概念)。如下代碼大家應該經常看到。
private XLock lock = new Xlock();
public void someMethod() {
lock.lock();
try {
// do something
} finally {
lock.unlock();
}
}
我們從表現形式上來看,lock.lock()
試著獲取鎖,如果有人占用的話就“等著”,直到獲取的人釋放鎖。lock.unlock()
寫在final塊中,意味著,無論是否異常,都需要釋放鎖。
其實,這個裡面唯一神秘的概念就是“等著”。什麼叫等著呢?如何實現等著呢?不明白的同學可能以為什麼JVM會調度什麼的。其實在電腦中,根本就沒有任何神奇的事情。下麵我們來解釋如何“等著”。
首先回一下操作系統的mutex是如何實現的:
lock:
if(mutex > 0){
mutex = 0;
return 0;
} else
掛起等待;
goto lock;
unlock:
mutex = 1;
喚醒等待Mutex的線程;
return 0;
上述代碼有一個小問題。例如lock
的if
可能兩個線程都同時進入,如果兩個線程同時調用lock,這時Mutex是1,兩個線程都判斷mutex>0成立,然後其中一個線程置mutex=0,而另一個線程並不知道這一情況,也置mutex=0,於是兩個線程都以為自己獲得了鎖。
這裡就需要註意的是,為了實現一個原子操作,必須要在CPU指令級別上支持才可以。在程式員看來就是需要一個原子的彙編指令。最早是交換記憶體地址和寄存器/記憶體與記憶體/寄存器與寄存器,這裡稱作swap,例如x86的xchgb指令,相當於抽象成setAndget()指令,使用方法如下代碼:
lock:
movb $0, %al
xchgb %al, mutex
if(al寄存器的內容 > 0){
return 0;
} else
掛起等待;
goto lock;
unlock:
movb $1, mutex
喚醒等待Mutex的線程;
return 0;
另外值得註意的就是如何掛起和喚醒線程。在C語言中,我們可以直接調用系統API操作某個線程的狀態,在內核中PC執行到掛起指令之後,就保存線程堆棧,直接切換。在調用系統API之前,我們需要記錄已經掛起了哪些線程,也就是需要一個等待隊列記錄當前有多少線程掛起在這個lock上。當持有lock的線程執行完畢之後,通知等待隊列的第一個線程。問題的關鍵在於如何通知,當然還是系統API,直接把線程的狀態設置為可以執行即可,等待下一次線程調度的時候,該線程就會繼續執行。
在上述代碼中值得註意的是
goto lock
這個語句。
根據上述說明,我們簡單翻譯成java的實現如下:
private AtomicBoolean on = new AtomicBoolean(false);
private Queue waiting = new Queue();
public void lock() {
while (true) {
if (on.compareAndSet(false, true)) { // get lock succeed
return;
}
waiting.add(Thread.currentThread()).
unsafe.park(Thread.currentThread()).
}
}
在上述代碼中我們有三個比較特殊的點:
- 原子變數的操作
- 等待隊列
- unsafe對象的park方法。
AtomicBoolean 底層實現也是需要CPU支持,是一個CPS操作。用來解決上述C偽碼中的只能允許一個線程進入臨界區問題,參考上述的xchgb操作。
Waiting Queue 我們通過一個普通的數組或者鏈表即可實現。
Unsafe Park Unsafe這個類是用於JDK類庫使用的一個JNI調用。讀者可以暫時理解成系統調用的一種抽象。park()
方法會立即掛起某個線程,PC(程式計數器)保留在原來的位置。
當我們看完上述內容之後,我們明白了幾個實現鎖的重要步驟:第一,我們需要一個state來描述鎖的狀態(參考上述的原子變數)。第二,我們需要一個隊列的數據結構。好了,到了這裡有了基本的概念之後,我們再看在Java中各種鎖是怎麼實現的。
AbstractOwnableSynchronizer
可獨占的同步器
A synchronizer that may be exclusively owned by a thread. This class provides a basis for creating locks and related synchronizers that may entail a notion of ownership. The
AbstractOwnableSynchronizer
class itself does not manage or use this information. However, subclasses and tools may use appropriately maintained values to help control and monitor access and provide diagnostics.
這個類作用就是提供了一種“獨占”模式的基本實現,其代碼非常簡單:
public abstract class AbstractOwnableSynchronizer {
/** The current owner of exclusive mode synchronization. */
private transient Thread exclusiveOwnerThread;
protected final void setExclusiveOwnerThread(Thread t) {
exclusiveOwnerThread = t;
}
protected final Thread getExclusiveOwnerThread() {
return exclusiveOwnerThread;
}
}
通過一個私有域來保存當前占用這個“同步器”的線程(可認為是一個標誌位),並且提供了get,set方法來改變這個獨占的標誌位。這裡並沒有使用任何同步的手段,因為只是個基類啦。
transient
關鍵字是對java的serialization機制起作用的,意義是不要序列化這個欄位。這裡我們可以忽略,因為我也暫時想不到任何場景需要serialize一個同步器的場景。
獨占的含義這裡暫時不做解釋,在之後的讀寫鎖部分會詳細說明。理解成mysql中的X鎖和S鎖就行了。
AbstractQueuedSynchronizer.acquire()
隊列同步器框架 這個類就是所有的鎖的精華所在。為了避免大家在閱讀過程中的這個抽象類提供了5個方法需要子類繼承,即可完成某種意義上的鎖,如下:
tryAcquire();
tryRelease();
tryAcquireShared(); // 暫時不用關註
tryReleaseShared(); // 暫時不用關註
isHeldExclusively(); //暫時不用關註
看到這裡可能一頭霧水,其實在AbstractQueuedSynchronizer中已經規定出來了獲取鎖的步驟,和提供了基本的等待隊列的實現。我們只需要實現每一個方法,即可完成一個同步器。
首先我們還是從,上文中提到的Lock.lock()
語義的方法對應的就是AbstractQueuedSynchronizer的acquire(int)
函數開始入手:
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
翻譯一下上述的代碼:
- 調用子類的tryAcquire方法,如果返回成功就說明獲取鎖成功
- 如果返回失敗,則入隊列。
interrupt的處理我們暫時忽略,這是java併發編程中的另外一個要點
其實對照我們上文中給出的java的lock的範例,tryAcquire函數
其實就是compareAndSet,而acquireQueued()
其實就是入隊並且掛起線程的操作。
所以說如果一個子類想要實現,tryAcquire裡面應該是原子的改變一個狀態即可。那是不是說資歷裡面就必須要維護一個狀態呢?答案是否定的。AbstractQueuedSynchronizer這個類已經提供了getState()
,setState()
以及compareAndSetState()
來讓子類控制這個表示sync的state。如下:
protected final int getState() {
return state;
}
protected final void setState(int newState) {
state = newState;
}
protected final boolean compareAndSetState(int expect, int update) {
// See below for intrinsics setup to support this
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
至於為什麼不直接使用AtomicInteger,或者直接繼承自AtomicInteger,而是自己在內部實現,原因如下:
Setup to support compareAndSet. We need to natively implement this here: For the sake of permitting future enhancements, we cannot explicitly subclass AtomicInteger, which would be efficient and useful otherwise. So, as the lesser of evils, we natively implement using hotspot intrinsics API. And while we are at it, we do the same for other CASable fields (which could otherwise be done with atomic field updaters).
Mutex.lock()
互斥量: 瞭解了AQS框架的Lock部分,我們可以試著利用AQS寫一個互斥量。
Subclasses should be defined as non-public internal helper classes that are used to implement the synchronization properties of their enclosing class.
根據AQS類的推薦,我們需要用靜態內部類來繼承,在JDK中的大部分鎖都遵循了這一模式:
class Mutex implements Lock, java.io.Serializable {
private static class Sync extends AbstractQueuedSynchronizer {
// Acquire the lock if state is zero
public boolean tryAcquire(int acquires) {
assert acquires == 1; // Otherwise unused
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
// some other methods
}
// The sync object does all the hard work. We just forward to it.
private final Sync sync = new Sync();
public void lock() { sync.acquire(1); }
public boolean tryLock() { return sync.tryAcquire(1); }
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
public boolean tryLock(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}
// some other methods
}
上述代碼中,最核心的就是tryAcquire
方法,通過調用父類的compareAndSetState來改變狀態,然後把獨占線程設置為當前線程。根據上文中描述的父類中的運行模式,我們一個Lock.lock()
的語義也就已經完成了。
lock.lock(timeout)
這個語義其實類似,但是實際操作起來有更多的細節需要處理。底層使用unsafe.part(timeout)
來處理。
AbstractQueuedSynchronizer.release()
同步器釋放:與lock操作相對應的就是unlock,對應到同步器來說就是release()。我們首先看AbstractQueuedSynchronizer
的release()
方法是如何實現的:
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
與前文中java代碼的模式類似,tryRelease調用子類恢復狀態,如果恢覆成功,則獲取鏈表頭部,然後調用unsafe.unpark()
方法喚醒線程。
我們再看Mutex中應該怎麼實現tryRelease函數:
protected boolean tryRelease(int releases) {
assert releases == 1; // Otherwise unused
if (getState() == 0) throw new IllegalMonitorStateException();
setExclusiveOwnerThread(null);
setState(0);
return true;
}
這裡只是個樣例,我們只是完成語義,還有很多細節並沒有確定,例如這裡假設tryRelease函數一定只有acquire成功的線程才會調用。所以請勿直接使用於生產環境。
小結
看完上述內容之後,我們對AQS的大致流程有了一定的瞭解,也通過其編寫了一個Mutex。在AQS中仍然有許多值得學習的細節。特別是讀者應該自行瞭解一下內部的隊列結構。因為在之後的Condition中也會有涉及到。
在之後的章節中,我們會陸續介紹ReentrantLock,Condition, ReentrantReadWriteLock,CountDownLatch,Semaphore的實現。
最後,下麵是完整的Mutex示例。感謝閱讀!
class Mutex implements Lock, java.io.Serializable {
// Our internal helper class
private static class Sync extends AbstractQueuedSynchronizer {
// Report whether in locked state
protected boolean isHeldExclusively() {
return getState() == 1;
}
// Acquire the lock if state is zero
public boolean tryAcquire(int acquires) {
assert acquires == 1; // Otherwise unused
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
// Release the lock by setting state to zero
protected boolean tryRelease(int releases) {
assert releases == 1; // Otherwise unused
if (getState() == 0) throw new IllegalMonitorStateException();
setExclusiveOwnerThread(null);
setState(0);
return true;
}
// Provide a Condition
Condition newCondition() { return new ConditionObject(); }
// Deserialize properly
private void readObject(ObjectInputStream s)
throws IOException, ClassNotFoundException {
s.defaultReadObject();
setState(0); // reset to unlocked state
}
}
// The sync object does all the hard work. We just forward to it.
private final Sync sync = new Sync();
public void lock() { sync.acquire(1); }
public boolean tryLock() { return sync.tryAcquire(1); }
public void unlock() { sync.release(1); }
public Condition newCondition() { return sync.newCondition(); }
public boolean isLocked() { return sync.isHeldExclusively(); }
public boolean hasQueuedThreads() { return sync.hasQueuedThreads(); }
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
public boolean tryLock(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}
}