顯式鎖和AQS 一、顯式鎖 Synchronized 關鍵字結合對象的監視器,JVM 為我們提供了一種『內置鎖』的語義,這種鎖很簡便,不需要我們關心加鎖和釋放鎖的過程,我們只需要告訴虛擬機哪些代碼塊需要加鎖即可,其他的細節會由編譯器和虛擬機自己實現。 可以將我們的『內置鎖』理解為是 JVM ...
顯式鎖和AQS
一、顯式鎖
Synchronized 關鍵字結合對象的監視器,JVM 為我們提供了一種『內置鎖』的語義,這種鎖很簡便,不需要我們關心加鎖和釋放鎖的過程,我們只需要告訴虛擬機哪些代碼塊需要加鎖即可,其他的細節會由編譯器和虛擬機自己實現。
可以將我們的『內置鎖』理解為是 JVM 的一種內置特性, 所以一個很顯著的問題就是,它不支持某些高級功能的定製,比如說,我想要這個鎖支持公平競爭,我想要根據不同的條件將線程阻塞在不同的隊列上,我想要支持定時競爭鎖,超時返回,我還想讓被阻塞的線程能夠響應中斷請求等等。
這些特殊的需求是『內置鎖』滿足不了的,所以在 JDK 層面又引入了『顯式鎖』的概念,不再由 JVM 來負責加鎖和釋放鎖,這兩個動作釋放給我們程式來做,程式層面難免複雜了些,但鎖靈活性提高了,可以支持更多定製功能,但要求你對鎖具有更深層次的理解。
【1】Lock 顯式鎖
Lock 介面位於 java.util.concurrent.locks 包下,基本定義如下:
public interface Lock {
//獲取鎖,失敗則阻塞
void lock();
//響應中斷式獲取鎖
void lockInterruptibly()
//嘗試一次獲取鎖,成功返回true,失敗返回false,不會阻塞
boolean tryLock();
//定時嘗試
boolean tryLock(long time, TimeUnit unit)
//釋放鎖
void unlock();
//創建一個條件隊列
Condition newCondition();
}
顯式鎖的實現類主要有三個,ReentrantLock 是其最主要的實現類,ReadLock 和 WriteLock 是 ReentrantReadWriteLock 內部定義的兩個內部類,他們繼承自 Lock 並實現了其定義的所有方法,精細化讀寫分離。而 ReentrantReadWriteLock 向外提供讀鎖寫鎖。
【2】ReentrantLock(可重入鎖)
ReentrantLock 作為 Lock 顯式鎖的最基本實現,也是使用最頻繁的一個鎖實現類。可重入就是可以重新獲取這個鎖,例如遞歸操作。
它提供了兩個構造函數,用於支持公平競爭鎖。
public ReentrantLock()
//參數fair為是否支持公平競爭鎖
public ReentrantLock(boolean fair)
公平鎖和非公平鎖的區別之處在於,公平鎖在選擇下一個占有鎖的線程時,參考先到先得原則,等待時間越長的線程將具有更高的優先順序。而非公平鎖則無視這種原則。
那麼假設這麼一種情況,A 獲得鎖正在運行,B 嘗試獲得鎖失敗被阻塞,此時 C 也嘗試獲得鎖,失敗而阻塞,雖然 C 只需要很短運行時間,它依然需要等待 B 執行結束才有機會獲得鎖來運行。
非公平鎖的前提下,A 執行結束,找到隊列首部的 B 線程,開始上下文切換,假如此時的 C 過來競爭鎖,非公平策略前提下,C 是可以獲得鎖的,並假設它迅速的執行結束了,當 B 線程被切換回來之後再去獲取鎖也不會有什麼問題,結果是,C 線程在 B 線程的上下文切換過程中執行結束。顯然,非公平策略下 CPU 的吞吐量是提高的。
但是,非公平策略的鎖可能會造成某些線程饑餓,始終得不到運行,各有利弊,適時取捨。
【3】ReadWriteLock(讀寫鎖)
① ReadWriteLock同Lock一樣也是一個介面,提供了readLock和writeLock兩種鎖的操作機制,一個是只讀的鎖,一個是寫鎖。
讀鎖可以在沒有寫鎖的時候被多個線程同時持有,寫鎖是獨占的(排他的)。 每次只能有一個寫線程,但是可以有多個線程併發地讀數據。
所有讀寫鎖的實現必須確保寫操作對讀操作的記憶體影響。換句話說,一個獲得了讀鎖的線程必須能看到前一個釋放的寫鎖所更新的內容。
理論上,讀寫鎖比互斥鎖允許對於共用數據更大程度的併發。與互斥鎖相比,讀寫鎖是否能夠提高性能取決於讀寫數據的頻率、讀取和寫入操作的持續時間、以及讀線程和寫線程之間的競爭。
② 互斥原則:
- 讀-讀能共存,
- 讀-寫不能共存,
- 寫-寫不能共存。
③ReentrantReadWriteLock
ReentrantReadWriteLock為ReadWriteLock的實現類
這個鎖允許讀線程和寫線程以ReentrantLock的語法重新獲取讀寫鎖。在寫入線程保持的所有寫入鎖被釋放之前,不允許不可重入的讀線程。
另外,寫鎖(寫線程)可以獲取讀鎖,但是不允許讀鎖(讀線程)獲取寫鎖。在其他應用程式中,當對在讀鎖下執行讀取的方法或回調期間保持寫鎖時,可重入性可能非常有用。
實例代碼如下:
public class TestReadWriteLock {
public static void main(String[] args){
final ReadWriteLockDemo rwd = new ReadWriteLockDemo();
//啟動100個讀線程
for (int i = 0; i < 100; i++) {
new Thread(new Runnable() {
@Override
public void run() {
rwd.get();
}
}).start();
}
//寫線程
new Thread(new Runnable() {
@Override
public void run() {
rwd.set((int)(Math.random()*101));
}
},"Write").start();
}
}
class ReadWriteLockDemo{
//模擬共用資源--Number
private int number = 0;
// 實際實現類--ReentrantReadWriteLock,預設非公平模式
private ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
//讀
public void get(){
//使用讀鎖
readWriteLock.readLock().lock();
try {
System.out.println(Thread.currentThread().getName()+" : "+number);
}finally {
readWriteLock.readLock().unlock();
}
}
//寫
public void set(int number){
readWriteLock.writeLock().lock();
try {
this.number = number;
System.out.println(Thread.currentThread().getName()+" : "+number);
}finally {
readWriteLock.writeLock().unlock();
}
}
}
/**
Thread-50 : 0
Thread-19 : 0
Thread-54 : 0
Thread-57 : 0
Thread-31 : 0
Write : 40
Thread-61 : 40
Thread-62 : 40
Thread-35 : 40
Thread-32 : 40
*/
首先啟動讀線程,此時number為0;然後某個時刻寫線程修改了共用資源number數據,讀線程再次讀取最新值
二、AQS深入分析
【1】什麼是AQS
AQS是AbustactQueuedSynchronizer的簡稱,它是一個Java提供的底層同步工具類,用一個int類型的變數表示同步狀態(state),並提供了一系列的CAS操作來管理這個同步狀態。AQS的主要作用是為Java中的併發同步組件提供統一的底層支持,例如ReentrantLock,CountdowLatch就是基於AQS實現的,用法是通過繼承AQS實現其模版方法,然後將子類作為同步組件的內部類。
【2】同步隊列
同步隊列是AQS很重要的組成部分,它是一個雙端隊列,遵循FIFO原則,主要作用是用來存放在鎖上阻塞的線程,當一個線程嘗試獲取鎖時,如果已經被占用,那麼當前線程就會被構造成一個Node節點加入到同步隊列的尾部,隊列的頭節點是成功獲取鎖的節點,當頭節點線程釋放鎖時,會喚醒後面的節點並釋放當前頭節點的引用。
【3】state狀態
AbstractQueuedSynchronizer維護了一個volatile int類型的變數,用戶表示當前同步狀態。volatile雖然不能保證操作的原子性,但是保證了當前變數state的可見性。至於volatile的具體語義,可以參考我的相關文章。state的訪問方式有三種:
- getState()
- setState()
- compareAndSetState()
這三種叫做均是原子操作,其中compareAndSetState的實現依賴於Unsafe的compareAndSwapInt()方法。代碼實現如下:
/**
* The synchronization state.
*/
private volatile int state;
/**
* Returns the current value of synchronization state.
* This operation has memory semantics of a {@code volatile} read.
* @return current state value
*/
protected final int getState() {
return state;
}
/**
* Sets the value of synchronization state.
* This operation has memory semantics of a {@code volatile} write.
* @param newState the new state value
*/
protected final void setState(int newState) {
state = newState;
}
/**
* Atomically sets synchronization state to the given updated
* value if the current state value equals the expected value.
* This operation has memory semantics of a {@code volatile} read
* and write.
*
* @param expect the expected value
* @param update the new value
* @return {@code true} if successful. False return indicates that the actual
* value was not equal to the expected value.
*/
protected final boolean compareAndSetState(int expect, int update) {
// See below for intrinsics setup to support this
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
【4】資源的共用方式
AQS定義兩種資源共用方式:Exclusive(獨占,只有一個線程能執行,如ReentrantLock)和Share(共用,多個線程可同時執行,如Semaphore/CountDownLatch)。
不同的自定義同步器爭用共用資源的方式也不同。自定義同步器在實現時只需要實現共用資源state的獲取與釋放方式即可,至於具體線程等待隊列的維護(如獲取資源失敗入隊/喚醒出隊等),AQS已經在頂層實現好了。自定義同步器實現時主要實現以下幾種方法:
- isHeldExclusively():該線程是否正在獨占資源。只有用到condition才需要去實現它。
- tryAcquire(int):獨占方式。嘗試獲取資源,成功則返回true,失敗則返回false。
- tryRelease(int):獨占方式。嘗試釋放資源,成功則返回true,失敗則返回false。
- tryAcquireShared(int):共用方式。嘗試獲取資源。負數表示失敗;0表示成功,但沒有剩餘可用資源;正數表示成功,且有剩餘資源。
- tryReleaseShared(int):共用方式。嘗試釋放資源,如果釋放後允許喚醒後續等待結點返回true,否則返回false。
【5】獲取鎖和釋放鎖的流程
下麵基於獨占鎖講解在AQS獲取鎖和釋放鎖的流程:
獲取:
- 調用自定義同步器的tryAcquire()嘗試直接去獲取資源,如果成功則直接返回;
- 沒成功,則addWaiter()將該線程加入等待隊列的尾部,並標記為獨占模式;
- acquireQueued()使線程在等待隊列中休息,有機會時(輪到自己,會被unpark())會去嘗試獲取資源。獲取到資源後才返回。如果在整個等待過程中被中斷過,則返回true,否則返回false。
- 如果線程在等待過程中被中斷過,它是不響應的。只是獲取資源後才再進行自我中斷selfInterrupt(),將中斷補上。
源碼解析:
acquire是一種以獨占方式獲取資源,如果獲取到資源,線程直接返回,否則進入等待隊列,直到獲取到資源為止,且整個過程忽略中斷的影響。該方法是獨占模式下線程獲取共用資源的頂層入口。獲取到資源後,線程就可以去執行其臨界區代碼了。下麵是acquire()的源碼:
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
tryAcquire嘗試以獨占的方式獲取資源,如果獲取成功,則直接返回true,否則直接返回false。該方法可以用於實現Lock中的tryLock()方法。該方法的預設實現是拋出UnsupportedOperationException
,具體實現由自定義的擴展了AQS的同步類來實現。AQS在這裡只負責定義了一個公共的方法框架。這裡之所以沒有定義成abstract,是因為獨占模式下只用實現tryAcquire-tryRelease,而共用模式下只用實現tryAcquireShared-tryReleaseShared。如果都定義成abstract,那麼每個模式也要去實現另一模式下的介面。
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
addWaiter該方法用於將當前線程根據不同的模式(Node.EXCLUSIVE
互斥模式、Node.SHARED
共用模式)加入到等待隊列的隊尾,並返回當前線程所在的結點。如果隊列不為空,則以通過compareAndSetTail
方法以CAS的方式將當前線程節點加入到等待隊列的末尾。否則,通過enq(node)方法初始化一個等待隊列,並返回當前節點。源碼如下:
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
acquireQueued用於隊列中的線程自旋地以獨占且不可中斷的方式獲取同步狀態(acquire),直到拿到鎖之後再返回。該方法的實現分成兩部分:如果當前節點已經成為頭結點,嘗試獲取鎖(tryAcquire)成功,然後返回;否則檢查當前節點是否應該被park,然後將該線程park並且檢查當前線程是否被可以被中斷。
final boolean acquireQueued(final Node node, int arg) {
//標記是否成功拿到資源,預設false
boolean failed = true;
try {
boolean interrupted = false;//標記等待過程中是否被中斷過
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
釋放:
release方法是獨占模式下線程釋放共用資源的頂層入口。它會釋放指定量的資源,如果徹底釋放了(即state=0),它會喚醒等待隊列里的其他線程來獲取資源。這也正是unlock()的語義,當然不僅僅只限於unlock()。下麵是release()的源碼:
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
/**
* Attempts to set the state to reflect a release in exclusive
* mode.
*
* <p>This method is always invoked by the thread performing release.
*
* <p>The default implementation throws
* {@link UnsupportedOperationException}.
*
* @param arg the release argument. This value is always the one
* passed to a release method, or the current state value upon
* entry to a condition wait. The value is otherwise
* uninterpreted and can represent anything you like.
* @return {@code true} if this object is now in a fully released
* state, so that any waiting threads may attempt to acquire;
* and {@code false} otherwise.
* @throws IllegalMonitorStateException if releasing would place this
* synchronizer in an illegal state. This exception must be
* thrown in a consistent fashion for synchronization to work
* correctly.
* @throws UnsupportedOperationException if exclusive mode is not supported
*/
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}
/**
* Wakes up node's successor, if one exists.
*
* @param node the node
*/
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
與acquire()方法中的tryAcquire()類似,tryRelease()方法也是需要獨占模式的自定義同步器去實現的。正常來說,tryRelease()都會成功的,因為這是獨占模式,該線程來釋放資源,那麼它肯定已經拿到獨占資源了,直接減掉相應量的資源即可(state-=arg),也不需要考慮線程安全的問題。但要註意它的返回值,上面已經提到了,release()是根據tryRelease()的返回值來判斷該線程是否已經完成釋放掉資源了!所以自義定同步器在實現時,如果已經徹底釋放資源(state=0),要返回true,否則返回false。
unparkSuccessor(Node)
方法用於喚醒等待隊列中下一個線程。這裡要註意的是,下一個線程並不一定是當前節點的next節點,而是下一個可以用來喚醒的線程,如果這個節點存在,調用unpark()
方法喚醒。
總之,release()是獨占模式下線程釋放共用資源的頂層入口。它會釋放指定量的資源,如果徹底釋放了(即state=0),它會喚醒等待隊列里的其他線程來獲取資源。
三、自定義獨占鎖
public class Mutex implements Lock {
// 靜態內部類,自定義同步器
private static class Sync extends AbstractQueuedSynchronizer {
// 是否處於占用狀態
protected boolean isHeldExclusively() {
return getState() == 1;
}
// 當狀態為0的時候獲取鎖
public boolean tryAcquire(int acquires) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
// 釋放鎖,將狀態設置為0
protected boolean tryRelease(int releases) {
if (getState() == 0) throw new
IllegalMonitorStateException();
setExclusiveOwnerThread(null);
setState(0);
return true;
}
// 返回一個Condition,每個condition都包含了一個condition隊列
Condition newCondition() {
return new ConditionObject();
}
}
// 僅需要將操作代理到Sync上即可
private final Sync sync = new Sync();
public void lock() {
sync.acquire(1);
}
public boolean tryLock() {
return sync.tryAcquire(1);
}
public void unlock() {
sync.release(1);
}
public Condition newCondition() {
return sync.newCondition();
}
public boolean isLocked() {
return sync.isHeldExclusively();
}
public boolean hasQueuedThreads() {
return sync.hasQueuedThreads();
}
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}
}