Exchanger應該算併發包中工具使用相對少的,因為它主要用於線程之間交換數據,它的用法比較簡單在不同線程之間使用exchange方法交換數據,但是內部實現比較巧妙,使用了unsafe的CAS原子操作、自旋來解決衝突問題,下麵我們通過源碼一探究竟。 ...
前言
Exchanger應該算併發包中工具使用相對少的,因為它主要用於線程之間交換數據,它的用法比較簡單在不同線程之間使用exchange方法交換數據,但是內部實現比較巧妙,使用了unsafe的CAS原子操作、自旋來解決衝突問題,下麵我們通過源碼一探究竟。
源碼
先看看源碼註釋中關於核心演算法的介紹
for (;;) { if (slot is empty) { // slot為空時,將item 設置到Node 中 place item in a Node; if (can CAS slot from empty to node) { // 當將node通過CAS交換到slot中時,掛起線程等待被喚醒 wait for release; // 被喚醒後返回node中匹配到的item return matching item in node; } } else if (can CAS slot from node to empty) { // release // 將slot設置為空 // 獲取node中的item,將需要交換的數據設置到匹配的item get the item in node; set matching item in node; // 喚醒等待的線程 release waiting thread; } // else retry on CAS failure }
比如有2條線程A和B,A線程交換數據時,發現slot為空,則將需要交換的數據放在slot中等待其它線程進來交換數據,等線程B進來,讀取A設置的數據,然後設置線程B需要交換的數據,然後喚醒A線程,原理就是這麼簡單。當時當多個線程之間進行交換數據時就會出現問題,所以Exchanger加入了slot數組。
Exchanger 屬性及構造器
// 用於左移Node數組下標,從而得出數據在記憶體中的偏移量來獲取數據,避免偽共用 private static final int ASHIFT = 7; // note數組最大下標 private static final int MMASK = 0xff; // 用於遞增bound,每次加一個SEQ private static final int SEQ = MMASK + 1; // CPU核心數 private static final int NCPU = Runtime.getRuntime().availableProcessors(); // 當前數組最大的下標(多處理器情況下) static final int FULL = (NCPU >= (MMASK << 1)) ? MMASK : NCPU >>> 1; // 自旋次數,CPU核心為1個時,自旋被禁用 private static final int SPINS = 1 << 10; // 空對象,用於當線程exchange方法中參數為null時傳遞給其他線程的對象 private static final Object NULL_ITEM = new Object(); // 用於超時時傳遞的對象 private static final Object TIMED_OUT = new Object(); // Participant 繼承了ThreadLocal,也就是說該對象用於獲取每條線程中存放的值 private final Participant participant; // 多個線程交換 private volatile Node[] arena; // 用於2個線程交換 private volatile Node slot; // 該值主要用於與 private volatile int bound; // 通過unsafe用於CAS操作 private static final sun.misc.Unsafe U; private static final long BOUND; private static final long SLOT; private static final long MATCH; private static final long BLOCKER; private static final int ABASE; static { int s; try { U = sun.misc.Unsafe.getUnsafe(); Class<?> ek = Exchanger.class; Class<?> nk = Node.class; Class<?> ak = Node[].class; Class<?> tk = Thread.class; // bound屬性在Exchanger對象中的偏移地址 BOUND = U.objectFieldOffset (ek.getDeclaredField("bound")); // slot屬性在Exchanger對象中的偏移地址 SLOT = U.objectFieldOffset (ek.getDeclaredField("slot")); // slot屬性在Node對象中的偏移地址 MATCH = U.objectFieldOffset (nk.getDeclaredField("match")); // parkBlocker屬性在Thread對象中的偏移地址 BLOCKER = U.objectFieldOffset (tk.getDeclaredField("parkBlocker")); // 獲取Node[]數組中每個元素的大小,這裡是4 s = U.arrayIndexScale(ak); // ABASE absorbs padding in front of element 0 // 獲取Node[]數組中第一個元素的偏移地址 + 128 ABASE = U.arrayBaseOffset(ak) + (1 << ASHIFT); } catch (Exception e) { throw new Error(e); } if ((s & (s-1)) != 0 || s > (1 << ASHIFT)) // 這裡是為了保證 Node數組中的元素不會爭用一個緩存行 throw new Error("Unsupported array scale"); }
構造器及內部類
public Exchanger() { participant = new Participant(); } // 內部類,用於記錄每個線程的狀態 static final class Participant extends ThreadLocal<Node> { public Node initialValue() { return new Node(); } } // 包含需要交換的數據等信息 // Contended為 JDK8 新增的註解,用於避免偽共用,提高程式性能 @sun.misc.Contended static final class Node { // arana數組中的下標 int index; // 上一次記錄的bound int bound; // cas操作失敗的次數 int collides; // 用於自旋的偽隨機數 int hash; // Pseudo-random for spins // 當前線程需要交換的數據 Object item; // This thread's current item // 匹配線程交換的數據 volatile Object match; // Item provided by releasing thread // 記錄當前掛起的線程 volatile Thread parked; // Set to this thread when parked, else null }
方法exchange
// 交換數據,參數X為本線程提供給其它線程的數據 public V exchange(V x) throws InterruptedException { Object v; // 當參數為null時需要將item設置為空的對象 Object item = (x == null) ? NULL_ITEM : x; // translate null args // 註意到這裡的這個表達式是整個方法的核心 if ((arena != null || (v = slotExchange(item, false, 0L)) == null) && ((Thread.interrupted() || // disambiguates null return (v = arenaExchange(item, false, 0L)) == null))) throw new InterruptedException(); return (v == NULL_ITEM) ? null : (V)v; }
仔細看if里的條件表達式,得知: 只有當arena為null時,才會執行slotExchange方法;
當arena不為null或者(arena為null且slotExchange方法返回null)時,此時線程未中斷,才會執行arenaExchange方法;
線程中斷時,就會直接拋出線程中斷異常。
下麵我們來看slotExchange方法
1 // timed 為true表示設置了超時時間,ns為>0的值,反之沒有設置超時時間 2 private final Object slotExchange(Object item, boolean timed, long ns) { 3 // 獲取當前線程node對象 4 Node p = participant.get(); 5 Thread t = Thread.currentThread(); 6 if (t.isInterrupted()) // preserve interrupt status so caller can recheck 7 // 線程中斷返回null 8 return null; 9 10 // 自旋 11 for (Node q;;) { 12 // 將slot值賦給q 13 if ((q = slot) != null) { 14 // slot 不為null,即表示已有線程已經把需要交換的數據設置在slot中了 15 // 通過CAS將slot設置成null 16 if (U.compareAndSwapObject(this, SLOT, q, null)) { 17 // CAS操作成功後,將slot中的item賦值給對象v,以便返回。 18 // 這裡也是就讀取之前線程要交換的數據 19 Object v = q.item; 20 // 將當前線程需要交給的數據設置在q中的match 21 q.match = item; 22 // 獲取被掛起的線程 23 Thread w = q.parked; 24 if (w != null) 25 // 如果線程不為null,喚醒它 26 U.unpark(w); 27 // 返回其他線程給的V 28 return v; 29 } 30 // CAS 操作失敗,表示有其它線程競爭,在此線程之前將數據已取走 31 // create arena on contention, but continue until slot null 32 if (NCPU > 1 && bound == 0 && 33 U.compareAndSwapInt(this, BOUND, 0, SEQ)) 34 // CPU為多核心 35 // bound == 0 表示arena數組未初始化過,CAS操作bound將其增加SEQ 36 // 初始化arena數組 37 arena = new Node[(FULL + 2) << ASHIFT]; 38 } 39 // 上面分析過,只有當arena才會執行slotExchange方法的 40 // 所以表示剛好已有其它線程加入進來將arena初始化 41 else if (arena != null) 42 // 這裡就需要去執行arenaExchange 43 return null; // caller must reroute to arenaExchange 44 else { 45 // 這裡表示當前線程是以第一個線程進來交換數據 46 // 或者表示之前的數據交換已進行完畢,這裡可以看作是第一個線程 47 // 將需要交換的數據先存放在當前線程變數p中 48 p.item = item; 49 // 將需要交換的數據通過CAS設置到交換區slot 50 if (U.compareAndSwapObject(this, SLOT, null, p)) 51 // 交換成功後跳出自旋 52 break; 53 // CAS操作失敗,表示有其它線程剛好先於當前線程將數據設置到交換區slot 54 // 將當前線程變數中的item設置為null,然後自旋獲取其它線程存放在交換區slot的數據 55 p.item = null; 56 } 57 } 58 // 執行到這裡表示當前線程已將需要的交換的數據放置於交換區slot中了, 59 // 等待其它線程交換數據然後喚醒當前線程 60 // await release 61 int h = p.hash; 62 long end = timed ? System.nanoTime() + ns : 0L; 63 // 自旋次數 64 int spins = (NCPU > 1) ? SPINS : 1; 65 Object v; 66 // 自旋等待直到p.match不為null,也就是說等待其它線程將需要交換的數據放置於交換區slot 67 while ((v = p.match) == null) { 68 // 下麵的邏輯主要是自旋等待,直到spins遞減到0為止 69 if (spins > 0) { 70 h ^= h << 1; h ^= h >>> 3; h ^= h << 10; 71 if (h == 0) 72 h = SPINS | (int)t.getId(); 73 else if (h < 0 && (--spins & ((SPINS >>> 1) - 1)) == 0) 74 Thread.yield(); 75 } 76 // slot 和 p本應該是相等,除非其它線程執行了第16行代碼中的CAS操作將slot置為null, 77 // 還未來得及設置match的值,此時只需要自旋等待第21行代碼被其它線程執行, 78 // 這樣p.match才會不為null跳出迴圈 79 else if (slot != p) 80 spins = SPINS; 81 // 此處表示未設置超時或者時間未超時 82 else if (!t.isInterrupted() && arena == null && 83 (!timed || (ns = end - System.nanoTime()) > 0L)) { 84 // 設置線程t被當前對象阻塞 85 U.putObject(t, BLOCKER, this); 86 // 給p掛機線程的值賦值 87 p.parked = t; 88 if (slot == p) 89 // 如果slot還沒有被置為null,也就表示暫未有線程過來交換數據,需要將當前線程掛起 90 U.park(false, ns); 91 // 線程被喚醒,將被掛起的線程設置為null 92 p.parked = null; 93 // 設置線程t未被任何對象阻塞 94 U.putObject(t, BLOCKER, null); 95 } 96 // 不是以上條件時(可能是arena已不為null或者超時) 97 else if (U.compareAndSwapObject(this, SLOT, p, null)) { 98 // arena不為null則v為null,其它為超時則v為超市對象TIMED_OUT,並且跳出迴圈 99 v = timed && ns <= 0L && !t.isInterrupted() ? TIMED_OUT : null; 100 break; 101 } 102 } 103 // 取走match值,並將p中的match置為null 104 U.putOrderedObject(p, MATCH, null); 105 // 設置item為null 106 p.item = null; 107 p.hash = h; 108 // 返回交換值 109 return v; 110 }
再來看arenaExchange方法,此方法被執行時表示多個線程進入交換區交換數據,arena數組已被初始化,此方法中的一些處理方式和slotExchange比較類似,它是通過遍歷arena數組找到需要交換的數據
// timed 為true表示設置了超時時間,ns為>0的值,反之沒有設置超時時間 private final Object arenaExchange(Object item, boolean timed, long ns) { Node[] a = arena; // 獲取當前線程中的存放的node Node p = participant.get(); //index初始值0 for (int i = p.index;;) { // access slot at i // 遍歷,如果在數組中找到數據則直接交換並喚醒線程, // 如未找到則將需要交換給其它線程的數據放置於數組中 int b, m, c; long j; // j is raw array offset // 其實這裡就是向右遍曆數組,只是用到了元素在記憶體偏移的偏移量 // q實際為arena數組偏移(i + 1) * 128個地址位上的node Node q = (Node)U.getObjectVolatile(a, j = (i << ASHIFT) + ABASE); // 如果q不為null,並且CAS操作成功,將下標j的元素置為null if (q != null && U.compareAndSwapObject(a, j, q, null)) { // 表示當前線程已發現有交換的數據,然後獲取數據,喚醒等待的線程 Object v = q.item; // release q.match = item; Thread w = q.parked; if (w != null) U.unpark(w); return v; } // q 為null 並且 i 未超過數組邊界 else if (i <= (m = (b = bound) & MMASK) && q == null) { // 將需要給其它線程的item賦予給p中的item p.item = item; // offer if (U.compareAndSwapObject(a, j, null, p)) { // 交換成功 long end = (timed && m == 0) ? System.nanoTime() + ns : 0L; Thread t = Thread.currentThread(); // wait // 自旋直到有其它線程進入,遍歷到該元素並與其交換,同時當前線程被喚醒 for (int h = p.hash, spins = SPINS;;) { Object v = p.match; if (v != null) { // 其它線程設置的需要交換的數據match不為null // 將match設置null,item設置為null U.putOrderedObject(p, MATCH, null); p.item = null; // clear for next use p.hash = h; return v; } else if (spins > 0) { // 遞減自旋次數 h ^= h << 1; h ^= h >>> 3; h ^= h << 10; // xorshift if (h == 0) // initialize hash h = SPINS | (int)t.getId(); else if (h < 0 && // approx 50% true (--spins & ((SPINS >>> 1) - 1)) == 0) Thread.yield(); // two yields per wait } else if (U.getObjectVolatile(a, j) != p) // 和slotExchange方法中的類似,arena數組中的數據已被CAS設置 // match值還未設置,讓其再自旋會等待match被設置 spins = SPINS; // releaser hasn't set match yet else if (!t.isInterrupted() && m == 0 && (!timed || (ns = end - System.nanoTime()) > 0L)) { // 設置線程t被當前對象阻塞 U.putObject(t, BLOCKER, this); // emulate LockSupport // 線程t賦值 p.parked = t; // minimize window if (U.getObjectVolatile(a, j) == p) // 數組中對象還相等,表示線程還未被喚醒,喚醒線程 U.park(false, ns); p.parked = null; // 設置線程t未被任何對象阻塞 U.putObject(t, BLOCKER, null); } else if (U.getObjectVolatile(a, j) == p && U.compareAndSwapObject(a, j, p, null)) { // 這裡給bound增加加一個SEQ if (m != 0) // try to shrink U.compareAndSwapInt(this, BOUND, b, b + SEQ - 1); p.item = null; p.hash = h; i = p.index >>>= 1; // descend if (Thread.interrupted()) return null; if (timed && m == 0 && ns <= 0L) return TIMED_OUT; break; // expired; restart } } } else // 交換失敗,表示有其它線程更改了arena數組中下標i的元素 p.item = null; // clear offer } else { // 此時表示下標不在bound & MMASK或q不為null但CAS操作失敗 // 需要更新bound變化後的值 if (p.bound != b) { // stale; reset p.bound = b; p.collides = 0; // 反向遍歷 i = (i != m || m == 0) ? m : m - 1; } else if ((c = p.collides) < m || m == FULL || !U.compareAndSwapInt(this, BOUND, b, b + SEQ + 1)) { // 記錄CAS失敗的次數 p.collides = c + 1; // 迴圈遍歷 i = (i == 0) ? m : i - 1; // cyclically traverse } else // 此時表示bound值增加了SEQ+1 i = m + 1; // grow // 設置下標 p.index = i; } } }
總結
讀到這裡是不是還是感覺有很多疑問?
- 先看為什麼
ASHIFT
設置成7,這是為了儘量避免slot數組中不同的元素在同一個緩存行上,<< ASHIFT
左移7位,表示至少移動了128地址位,而我們主流的緩存行大小一般為32位元組到256位元組,所以128個地址位基本覆蓋到了常見的處理器平臺。arena數組中元素的分佈如圖,它們之間間隔128個整數倍地址位,也就是說最小相差128個地址位。 - 為什麼Node類用
@sun.misc.Contended
註解呢?該註解是jdk8新增的註解,是為瞭解決之前手動填充數據的問題。填充數據也是為了避免arena數組中的不同的元素共用同一個緩存行,導致多線程修改數據時性能受到影響。