本文內容主要總結自《Java併發編程的藝術》第5章——Java中的鎖。 一、AQS AbstractQueuedSynchronizer(簡稱AQS),隊列同步器,是用來構建鎖或者其他同步組建的基礎框架。該類主要包括: 1、模式,分為共用和獨占。 2、volatile int state,用來表示鎖 ...
本文內容主要總結自《Java併發編程的藝術》第5章——Java中的鎖。
一、AQS
AbstractQueuedSynchronizer(簡稱AQS),隊列同步器,是用來構建鎖或者其他同步組建的基礎框架。該類主要包括:
1、模式,分為共用和獨占。
2、volatile int state,用來表示鎖的狀態。
3、FIFO雙向隊列,用來維護等待獲取鎖的線程。
AQS部分代碼及說明如下:
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { static final class Node { /** 共用模式,表示可以多個線程獲取鎖,比如讀寫鎖中的讀鎖 */ static final Node SHARED = new Node(); /** 獨占模式,表示同一時刻只能一個線程獲取鎖,比如讀寫鎖中的寫鎖 */ static final Node EXCLUSIVE = null; volatile Node prev; volatile Node next; volatile Thread thread; } /** AQS類內部維護一個FIFO的雙向隊列,負責同步狀態的管理,當前線程獲取同步狀態失敗時,同步器會將當前線程以及等待狀態等 構造成一個節點Node並加入同步隊列;當同步狀態釋放時,會把首節點中線程喚醒,使其再次嘗試同步狀態 */ private transient volatile Node head; private transient volatile Node tail; /** 狀態,主要用來確定lock是否已經被占用;在ReentrantLock中,state=0表示鎖空閑,>0表示鎖已被占用;可以自定義,改寫tryAcquire(int acquires)等方法即可 */ private volatile int state; }
這裡主要說明下雙向隊列,通過查看源碼分析,隊列是這個樣子的:
head -> node1 -> node2 -> node3(tail)
註意:head初始時是一個空節點(所謂的空節點意思是節點中沒有具體的線程信息),之後表示的是獲取了鎖的節點。因此實際上head->next(即node1)才是同步隊列中第一個可用節點。
AQS的設計基於模版方法模式,使用者通過繼承AQS類並重寫指定的方法,可以實現不同功能的鎖。可重寫的方法主要包括:
二、通過ReentrantLock學習AQS的使用
1、公平鎖的獲取
/** * Sync object for fair locks */ static final class FairSync extends Sync { private static final long serialVersionUID = -3000897897090466540L; final void lock() { acquire(1); } /** * 首先嘗試獲取鎖,如果tryAcquire(arg)返回true,獲取鎖成功; * 如果失敗,則調用acquireQueued(addWaiter(Node.EXCLUSIVE), arg),將當前線程封裝成Node節點加入到同步隊列隊尾,之後阻塞當前線程 */ public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } /** * 獲取state的值,如果等於0表示鎖空閑,可以嘗試獲取; * 查看當前線程是否是FIFO隊列中的第一個可用節點,如果是第一個,則嘗試通過CAS方式獲取鎖, 這保證了等待時間最長的必定先獲取鎖 */ protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; } final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); /** * 如果發現當前節點的前一個節點為head,那麼嘗試獲取鎖,成功之後刪除head節點並將自己設置為head,退出迴圈; * 如果當前節點為阻塞狀態,需要unpark()喚醒,release()方法會執行喚醒操作 */ if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } /** * 為了避免無意義的自旋,同步隊列中的線程會通過park(this)方法用於阻塞當前線程 */ if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } } }
2、公平鎖的釋放
更新狀態值state,之後喚醒同步隊列中的第一個等待節點,unparkSuccessor(Node node)。
三、公平鎖和非公平鎖
ReentrantLock預設的鎖為非公平鎖,其主要原因在於:與公平鎖相比,可以避免大量的線程切換,極大的提高性能。
先看一個非公平鎖的例子:
public class AQS2 { private ReentrantLock lock = new ReentrantLock(false); private Thread[] threads = new Thread[3]; public AQS2() { for (int i = 0; i < 3 ; i++) { threads[i] = new Thread(new Runnable() { public void run() { for (int i = 0; i < 2; i++) { try { lock.lock(); Thread.sleep(100); System.out.println(Thread.currentThread().getName()); } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } } } }); } } public void startThreads() { for (Thread thread : threads) { thread.start(); } } public static void main(String[] args) { AQS2 aqs2 = new AQS2(); aqs2.startThreads(); } }
運行結果為:
這段代碼(每個線程2次獲取鎖/釋放鎖)的運行結果我一開始沒有想清楚,之前我是這麼想的:
Thread0先獲取鎖,之後sleep 100ms,那麼等待獲取鎖的同步隊列為:
head -> thread1 -> thread2 -> thread0 -> thread1 -> thread2。
從運行結果可知,第二次獲取鎖的還是thread0,但是鎖的釋放release(int args)卻總是從同步隊列的第一個可用節點開始,那就把thread1從隊列中移除了,邏輯明顯不對了。
後來重新看了代碼,比較了非公平鎖和公平鎖之間的不同時,才終於明白。
非公平鎖獲取鎖最大的不一樣的地方在於:線程可以無視sync同步隊列插隊!一旦插隊成功,獲得了鎖,那麼該線程當然也就不用在排隊了。所以以上程式的同步隊列應該為:
head -> thread1 -> thread2。
非公平鎖源代碼主要的不同點有2點:
static final class NonfairSync extends Sync { private static final long serialVersionUID = 7316153563782823691L; //不同點1 final void lock() { if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else acquire(1); } protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); } final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (compareAndSetState(0, acquires)) { //不同點2 setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; } }
thread0第一次釋放鎖之後,會立刻通過lock.lock()操作繼續嘗試獲取鎖。非公平鎖的lock()方法會直接嘗試獲取鎖,無視同步隊列,因此很大概率會再次獲得鎖;如果失敗了,那麼執行nonfairTryAcquire(int acquires)方法,該方法和tryAcquire(int acquires)最大的不同在於,缺少了hasQueuedPredecessors()的判斷,即不需要判斷當前線程是否是同步隊列的第一個可用節點,甚至也不需要判斷當前線程是否在同步隊列中,直接嘗試獲取鎖即可。
四、ReentrantReadWriteLnock
理解了AQS的原理後,讀寫鎖也就不難理解了。讀寫鎖分為2個鎖,讀鎖和寫鎖。讀鎖在同一時刻允許多個線程訪問,通過改寫int tryAcquireShared(int arg)以及boolean tryReleaseShared(int arg)方法即可;寫鎖為獨占鎖,通過改寫boolean tryAcquire(int arg)以及boolean tryRelease(int arg)方法即可。
由於AQS中只提供了一個int state來表示鎖的狀態,那麼如何表示讀和寫2個鎖呢?解決辦法是前16位表示讀鎖,後16位表示寫鎖。由於鎖的狀態只有16位,因此無論是對於讀鎖或者是寫鎖,其state最大值均為65525,即所有獲得了鎖的線程的拿到鎖的總次數(由於是重進入鎖,因此每個線程可以拿到n個鎖)不超過65536。由於讀寫鎖主要的應用場景為多讀少寫,所以如果感覺讀鎖的65525不夠用,可以自己改寫讀寫鎖即可,比如分配int state的前24位為讀鎖,後8位為寫鎖。
讀寫鎖還提供了一些新的方法,比如final int getReadHoldCount(),返回當前線程獲取讀鎖的次數。由於讀狀態保存的是所有獲取讀鎖的線程讀鎖次數的總和,因此每個線程自己的讀鎖次數需要單獨保存,引入了ThreadLocal,由線程自身維護。