基於AQS的前世今生,來學習併發工具類ReentrantReadWriteLock。本文將從ReentrantReadWriteLock的產生背景、源碼原理解析和應用來學習這個併發工具類。 1、 產生背景 前面我們學習的重入鎖ReentrantLock本質上還是互斥鎖,每次最多只能有一個線程持有Re ...
基於AQS的前世今生,來學習併發工具類ReentrantReadWriteLock。本文將從ReentrantReadWriteLock的產生背景、源碼原理解析和應用來學習這個併發工具類。
1、 產生背景
前面我們學習的重入鎖ReentrantLock本質上還是互斥鎖,每次最多只能有一個線程持有ReentrantLock。對於維護數據完整性來說,互斥通常是一種過於強硬的規則,因此也就不必要的限制了併發性。互斥是一種保守的加鎖策略,雖然可以避免“寫/寫”衝突和“寫/讀”衝突,但也同樣避免了“讀/讀”衝突。和互聯網的“二八法則”一樣,大部分數據都是讀數據,可以存放在緩存中,數據結構的操作其實很多也是讀操作,可以考慮適當的放寬加鎖需求,允許多個讀操作線程同時訪問數據結構以提升程式的性能。在這樣的需求背景下,就產生了讀寫鎖ReadWriteLock,一個資源可以同時被多個讀操作訪問,或者被一個寫操作訪問,但是不能讀寫操作同時訪問。ReadWriteLock定義了介面規範,實際實現讀寫鎖控制的類是ReentrantReadWriteLock,該類為讀寫鎖提供了可重入的加鎖語義。
2、 源碼原理解析
2.1 讀寫鎖原理
既然是讀寫鎖,那就是有兩把鎖,可以用AQS的同步狀態表示其中的一把鎖,再引入一個新的屬性表示另外一把鎖,但是這麼做就變成了二元併發安全問題,使問題變得更加複雜。ReentrantReadWriteLock選擇了用一個屬性,即AQS的同步狀態來表示讀寫鎖,怎樣用一個屬性來表示讀寫鎖呢?那就是位運算,對位運算不熟悉的可以先看下此文。
ReentantReadWriteLock採用“按位切割”的方式,就是將這個32位的int型state變數分為高16位和低16位來使用,高16位代表讀狀態,低16位代表寫狀態。讀鎖是可以共用的,而寫鎖是互斥的,對於寫鎖而言,用低16位表示線程的重入次數,但是讀鎖因為可以同時有多個線程,所以重入次數需要通過其他的方式來記錄,那就是ThreadLocal變數。從這也可以總結出來和ReentrantLock相比,寫鎖的重入次數會減少,最多不能超過65535次。讀鎖的線程數也有限制,最對不能超過65535個。
假設狀態變數是c,則讀狀態就是c>>>16(無符號右移16位),其實就是通過無符號右移運算抹掉低的16位,剩下的就是c的高16位。寫狀態是c&((1 << 16) - 1),其實就是c&00000000000000001111111111111111,與運算之後,高的16位被抹掉,剩下的就是c的低16位。如果讀線程申請讀鎖,當前寫鎖重入次數不為 0 時,則等待,否則可以馬上分配;如果是寫線程申請寫鎖,當前狀態為 0 則可以馬上分配,否則等待。
2.2 讀鎖的獲取和釋放
讀鎖的獲取方法如下:
protected final int tryAcquireShared(int unused) { Thread current = Thread.currentThread();//當前線程 int c = getState(); //持有寫鎖的線程可以獲取讀鎖 if (exclusiveCount(c) != 0 && //已經分配了寫鎖 getExclusiveOwnerThread() != current) //當前線程不是持有寫鎖的線程 return -1; int r = sharedCount(c); //讀鎖獲取次數 if (!readerShouldBlock() && //由子類根據公平策略實現決定是否可獲取讀鎖 r < MAX_COUNT && //讀鎖獲取次數小於最大值 compareAndSetState(c, c + SHARED_UNIT)) {//更新讀鎖狀態 if (r == 0) {//讀鎖的第一個線程 此時可以不用記錄到ThreadLocal firstReader = current; firstReaderHoldCount = 1; //避免查找ThreadLocal 提升效率 } else if (firstReader == current) {//讀鎖的第一個線程重入 firstReaderHoldCount++; } else {//非讀鎖的第一個線程 HoldCounter rh = cachedHoldCounter; //下麵為重入次數更新 if (rh == null || rh.tid != getThreadId(current)) cachedHoldCounter = rh = readHolds.get(); else if (rh.count == 0) readHolds.set(rh); rh.count++; } return 1; } return fullTryAcquireShared(current); //獲取讀鎖失敗 迴圈重試 } final int fullTryAcquireShared(Thread current) { HoldCounter rh = null; for (;;) { int c = getState(); if (exclusiveCount(c) != 0) {//獲取到寫鎖 if (getExclusiveOwnerThread() != current) return -1; //非寫鎖線程獲取失敗 // else we hold the exclusive lock; blocking here // would cause deadlock. } else if (readerShouldBlock()) { // Make sure we're not acquiring read lock reentrantly if (firstReader == current) { // assert firstReaderHoldCount > 0; } else { if (rh == null) { rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) { rh = readHolds.get(); if (rh.count == 0) readHolds.remove(); } } if (rh.count == 0) return -1; } } if (sharedCount(c) == MAX_COUNT) //讀鎖數量達到最大 throw new Error("Maximum lock count exceeded"); if (compareAndSetState(c, c + SHARED_UNIT)) {//讀鎖獲取成功 處理方式和之前類似 if (sharedCount(c) == 0) { firstReader = current; firstReaderHoldCount = 1; } else if (firstReader == current) { firstReaderHoldCount++; } else { if (rh == null) rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) rh = readHolds.get(); else if (rh.count == 0) readHolds.set(rh); rh.count++; cachedHoldCounter = rh; // cache for release } return 1; } } }
讀鎖的釋放方法如下:
protected final boolean tryReleaseShared(int unused) { Thread current = Thread.currentThread(); if (firstReader == current) {//當前線程是讀鎖的第一個線程 // assert firstReaderHoldCount > 0; if (firstReaderHoldCount == 1) //第一次占有讀鎖 直接清除該線程 firstReader = null; else firstReaderHoldCount--;//讀鎖的第一個線程重入次數減少 } else { HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) rh = readHolds.get(); int count = rh.count; if (count <= 1) { readHolds.remove();//讀鎖釋放 if (count <= 0) throw unmatchedUnlockException(); } --rh.count; //重入次數減少 } for (;;) { int c = getState(); int nextc = c - SHARED_UNIT; //減少讀鎖的線程數量 if (compareAndSetState(c, nextc)) // Releasing the read lock has no effect on readers, // but it may allow waiting writers to proceed if // both read and write locks are now free. return nextc == 0; } }
2.3 寫鎖的獲取和釋放
寫鎖的獲取方法如下:
protected final boolean tryAcquire(int acquires) { Thread current = Thread.currentThread(); int c = getState(); int w = exclusiveCount(c);//寫鎖狀態 if (c != 0) {//表示鎖已經被分配出去了 if c != 0 and w == 0表示獲取讀鎖 // (Note: if c != 0 and w == 0 then shared count != 0) //其他線程獲取到了寫鎖 if (w == 0 || current != getExclusiveOwnerThread()) return false; //寫鎖重入次數超過最大值 if (w + exclusiveCount(acquires) > MAX_COUNT) throw new Error("Maximum lock count exceeded"); // Reentrant acquire 更新寫鎖重入次數 setState(c + acquires); return true; } if (writerShouldBlock() ||//子類實現寫鎖是否公平獲取 !compareAndSetState(c, c + acquires)) return false;//cas獲取寫鎖失敗 setExclusiveOwnerThread(current);//獲取寫鎖成功 獨占 return true; }
寫鎖的釋放方法如下:
protected final boolean tryRelease(int releases) { if (!isHeldExclusively())//當前線程不持有寫鎖 throw new IllegalMonitorStateException(); int nextc = getState() - releases; //重入次數減少 boolean free = exclusiveCount(nextc) == 0; //減少到0寫鎖釋放 if (free) setExclusiveOwnerThread(null); //寫鎖釋放 setState(nextc); return free; }
2.4 鎖降級
鎖降級指的是寫鎖降級為讀鎖,首先持有當前寫鎖,然後獲取到讀鎖,在tryAcquireShared方法中已經體現了該過程,隨後再釋放該寫鎖的過程。鎖降級主要是為了保持數據的可見性,如果當前線程不獲取讀鎖而是直接釋放寫鎖,假設此時有另外的線程獲取到了寫鎖並修改了數據,那麼當前線程是無法知曉數據已經更新了。如果當前線程遵循鎖降級的過程,則其他線程會被阻塞,直到當前線程操作完成其他線程才可以獲取寫鎖進行數據更新。RentrantReadWriteLock不支持鎖升級(把持讀鎖、獲取寫鎖,最後釋放讀鎖的過程)。目的也是保證數據可見性,如果讀鎖已被多個線程獲取,其中任意線程成功獲取了寫鎖並更新了數據,則其更新對其他獲取到讀鎖的線程是不可見的。
3、 應用
概況性的總結RentrantReadWriteLock的應用,就是ReentrantLock能使用的地方,RentrantReadWriteLock都能使用,而且能提供更好的吞吐率。
參考資料:
https://github.com/lingjiango/ConcurrentProgramPractice