Exchanger,併發工具類,線程協作,用於線程間的數據交換。 ...
Exchanger,併發工具類,線程協作,用於線程間的數據交換。
常量介紹
1 private static final int ASHIFT = 7; // 兩個有效槽(slot -> Node)之間的位元組地址長度(記憶體地址,以位元組為單位),1 << 7至少為緩存行的大小,防止偽共用 2 private static final int MMASK = 0xff; // 場地(一排槽,arena -> Node[])的可支持的最大索引,可分配的大小為 MMASK + 1 3 private static final int SEQ = MMASK + 1; // bound的遞增單元,確立其唯一性 4 private static final int NCPU = Runtime.getRuntime().availableProcessors(); // CPU的個數,用於場地大小和自旋控制 5 static final int FULL = (NCPU >= (MMASK << 1)) ? MMASK : NCPU >>> 1; // 最大的arena索引 6 private static final int SPINS = 1 << 10; // 自旋,NCPU = 1時,禁用 7 private static final Object NULL_ITEM = new Object();// 空對象,對應null 8 private static final Object TIMED_OUT = new Object();// 超時對象,對應timeout
說明
arena,一排slot,為的是獲得良好的伸縮性,避免所有的線程爭用同一個槽位。
偽共用,高速緩存與記憶體之間是以緩存行為單位交換數據的,根據局部性原理,相鄰地址空間的數據會被載入到高速緩存的同一個數據塊上(緩存行),而數組是連續的(邏輯,涉及到虛擬記憶體)記憶體地址空間,因此,多個slot會被載入到同一個緩存行上,當一個slot改變時,會導致這個slot所在的緩存行上所有的數據(包括其他的slot)無效,需要從記憶體重新載入,影響性能。所以,為了避免這種情況,需要填充數據,使得有效的slot不被載入到同一個緩存行上。
SEQ,bound的遞增單元,確定其唯一性(高位)
FULL,最大的arena索引,依賴與CPU的個數,最大是MMASK的值
SPINS,用於自旋等待,是最輕量的等待,依次是 spin -> yield -> block
數據結構Node
1 static final class Node { 2 int index; // arena的索引 3 int bound; // 記錄上次的bound 4 int collides; // 當前bound下CAS失敗的次數 5 int hash; // 偽隨機,用於自旋 6 Object item; // 當前線程攜帶的數據 7 volatile Object match; // 存放別的線程攜帶的數據 8 volatile Thread parked; // 掛在此結點上阻塞著的線程 9 }
說明
collides,記錄當前bound下CAS失敗的次數,最大為m,m(bound & MMASK)為當前bound下最大有效索引,從右往左遍歷,等待collides == m時,有效索引的槽位也已經遍歷完了,這時需要增長槽位,增長的方式是重置bound(依賴SEQ更新其版本,高位;+1,低位),同時collides重置
數據結構Participant
1 // 每個線程攜帶一個Node 2 static final class Participant extends ThreadLocal<Node> { 3 public Node initialValue() { 4 return new Node(); 5 } 6 }
說明
Participant直接繼承自ThreadLocal保存當前線程攜帶的Node,交換操作主要依賴Node的行為
屬性介紹
1 private final Participant participant;// 每個線程攜帶一個Node 2 private volatile Node[] arena; // 場地,Node數組 3 private volatile Node slot;// 槽,單個Node 4 private volatile int bound;// 當前最大有效arena索引,高8位+SEQ確立其唯一性,低8位記錄有效索引
說明
bound,記錄最大有效的arena索引,動態變化,競爭激烈時(槽位全滿)增加, 槽位空曠時減小。bound + SEQ +/- 1,其高位+ 1(SEQ,oxff + 1)確定其版本唯一性(比如,+1後,又-1,實際上是兩個版本的bound,collides要重置的),低位+/-1實際有效的索引(&MMASK)
exchange方法
1 public V exchange(V x) throws InterruptedException { 2 Object v; 3 Object item = (x == null) ? NULL_ITEM : x; // 轉換成空對象 4 // arena == null, 路由到slotExchange(單槽交換), 如果arena != null或者單槽交換失敗,且線程沒有被中斷,則路由到arenaExchange(多槽交換),返回null,則拋出中斷異常 5 if ((arena != null || (v = slotExchange(item, false, 0L)) == null) 6 && ((Thread.interrupted() || (v = arenaExchange(item, false, 0L)) == null))) 7 throw new InterruptedException(); 8 return (v == NULL_ITEM) ? null : (V) v; 9 }
說明
首先判斷arena是否為null,如果為null,則調用slotExchange方法
如果arena不為null,或者slotExchange方法返回null,然後判斷當前線程是否被中斷(中斷標記),有則拋出中斷異常,沒有則繼續調用arenaExchange方法,如果該方法返回null,拋出中斷異常
最後返回結果。
帶超時的exchange方法
1 public V exchange(V x, long timeout, TimeUnit unit) throws InterruptedException, TimeoutException { 2 Object v; 3 Object item = (x == null) ? NULL_ITEM : x;// 轉換成空對象 4 long ns = unit.toNanos(timeout); 5 // arena == null, 路由到slotExchange(單槽交換), 如果arena != null或者單槽交換失敗,且線程沒有被中斷,則路由到arenaExchange(多槽交換),返回null,則拋出中斷異常 6 if ((arena != null || (v = slotExchange(item, true, ns)) == null) 7 && ((Thread.interrupted() || (v = arenaExchange(item, true, ns)) == null))) 8 throw new InterruptedException(); 9 if (v == TIMED_OUT)// 超時 10 throw new TimeoutException(); 11 return (v == NULL_ITEM) ? null : (V) v; 12 }
說明
同上,加了超時的判斷。
slotExchange方法
1 private final Object slotExchange(Object item, boolean timed, long ns) { 2 Node p = participant.get(); // 獲取當前線程攜帶的Node 3 Thread t = Thread.currentThread(); // 當前線程 4 if (t.isInterrupted()) // 保留中斷狀態,以便調用者可以重新檢查,Thread.interrupted() 會清除中斷狀態標記 5 return null; 6 for (Node q;;) { 7 if ((q = slot) != null) { // slot不為null, 說明已經有線程在這裡等待了 8 if (U.compareAndSwapObject(this, SLOT, q, null)) { // 將slot重新設置為null, CAS操作 9 Object v = q.item; // 取出等待線程攜帶的數據 10 q.match = item; // 將當前線程的攜帶的數據交給等待線程 11 Thread w = q.parked; // 可能存在的等待線程(可能中斷,不等了) 12 if (w != null) 13 U.unpark(w); // 喚醒等待線程 14 return v; // 返回結果,交易成功 15 } 16 // CPU的個數多於1個,並且bound為0時創建 arena,並將bound設置為SEQ大小 17 if (NCPU > 1 && bound == 0 && U.compareAndSwapInt(this, BOUND, 0, SEQ)) 18 arena = new Node[(FULL + 2) << ASHIFT]; // 根據CPU的個數估計Node的數量 19 } else if (arena != null) 20 return null; // 如果slot為null, 但arena不為null, 則轉而路由到arenaExchange方法 21 else { // 最後一種情況,說明當前線程先到,則占用此slot 22 p.item = item; // 將攜帶的數據卸下,等待別的線程來交易 23 if (U.compareAndSwapObject(this, SLOT, null, p)) // 將slot的設為當前線程攜帶的Node 24 break; // 成功則跳出迴圈 25 p.item = null; // 失敗,將數據清除,繼續迴圈 26 } 27 } 28 // 當前線程等待被釋放, spin -> yield -> block/cancel 29 int h = p.hash; // 偽隨機,用於自旋 30 long end = timed ? System.nanoTime() + ns : 0L; // 如果timed為true,等待超時的時間點; 0表示沒有設置超時 31 int spins = (NCPU > 1) ? SPINS : 1; // 自旋次數 32 Object v; 33 while ((v = p.match) == null) { // 一直迴圈,直到有線程來交易 34 if (spins > 0) { // 自旋,直至spins不大於0 35 h ^= h << 1; // 偽隨機演算法, 目的是等h小於0(隨機的) 36 h ^= h >>> 3; 37 h ^= h << 10; 38 if (h == 0) // 初始值 39 h = SPINS | (int) t.getId(); 40 else if (h < 0 && (--spins & ((SPINS >>> 1) - 1)) == 0) 41 Thread.yield(); // 等到h < 0, 而spins的低9位也為0(防止spins過大,CPU空轉過久),讓出CPU時間片,每一次等待有兩次讓出CPU的時機(SPINS >>> 1) 42 } else if (slot != p) // 別的線程已經到來,正在準備數據,自旋等待一會兒,馬上就好 43 spins = SPINS; 44 // 如果線程沒被中斷,且arena還沒被創建,並且沒有超時 45 else if (!t.isInterrupted() && arena == null && (!timed || (ns = end - System.nanoTime()) > 0L)) { 46 U.putObject(t, BLOCKER, this); // 設置當前線程將阻塞在當前對象上 47 p.parked = t; // 掛在此結點上的阻塞著的線程 48 if (slot == p) 49 U.park(false, ns); // 阻塞, 等著被喚醒或中斷 50 p.parked = null; // 醒來後,解除與結點的聯繫 51 U.putObject(t, BLOCKER, null); // 解除阻塞對象 52 } else if (U.compareAndSwapObject(this, SLOT, p, null)) { // 超時或其他(取消),給其他線程騰出slot 53 v = timed && ns <= 0L && !t.isInterrupted() ? TIMED_OUT : null; 54 break; 55 } 56 } 57 // 歸位 58 U.putOrderedObject(p, MATCH, null); 59 p.item = null; 60 p.hash = h; 61 return v; 62 }
說明
1. 檢查slot是否為空(null),不為空,說明已經有線程在此等待,嘗試占領該槽位,如果占領成功,與等待線程交換數據,並喚醒等待線程,交易結束,返回。
2. 如果占領槽位失敗,創建arena,但要繼續【步驟1】嘗試搶占slot,直至slot為空,或者搶占成功,交易結束返回。
3. 如果slot為空,則判斷arena是否為空,如果arena為空,返回null,重新路由到arenaExchange方法
4. 如果arena為空,說明當前線程是先到達的,嘗試占有slot,如果成功,將slot標記為自己占用,跳出迴圈,繼續【步驟5】,如果失敗,則繼續【步驟1】
5 當前線程等待被釋放,等待的順序是先自旋(spin),不成功則讓出CPU時間片(yield),最後還不行就阻塞(block),spin -> yield -> block
6. 如果超時(設置超時的話)或被中斷,則退出迴圈。
7. 最後,重置數據,下次重用,返回結果,結束。
arenaExchange方法
1 private final Object arenaExchange(Object item, boolean timed, long ns) { 2 Node[] a = arena; // 交換場地,一排slot 3 Node p = participant.get(); // 獲取當前線程攜帶的Node 4 for (int i = p.index;;) { // arena的索引,數組下標 5 int b, m, c; 6 long j; // 原數組偏移量,包括填充值 7 // 從場地中選出偏移地址為(i << ASHIFT) + ABASE的記憶體值,也即真正可用的Node 8 Node q = (Node) U.getObjectVolatile(a, j = (i << ASHIFT) + ABASE); 9 if (q != null && U.compareAndSwapObject(a, j, q, null)) { // 此槽位不為null, 說明已經有線程在這裡等了,重新將其設置為null, CAS操作 10 Object v = q.item; // 取出等待線程攜帶的數據 11 q.match = item; // 將當前線程攜帶的數據交給等待線程 12 Thread w = q.parked; // 可能存在的等待線程 13 if (w != null) 14 U.unpark(w); // 喚醒等待線程 15 return v; // 返回結果, 交易成功 16 } else if (i <= (m = (b = bound) & MMASK) && q == null) { // 有效交換位置,且槽位為空 17 p.item = item; // 將攜帶的數據卸下,等待別的線程來交易 18 if (U.compareAndSwapObject(a, j, null, p)) { // 槽位占領成功 19 long end = (timed && m == 0) ? System.nanoTime() + ns : 0L; // 計算出超時結束時間點 20 Thread t = Thread.currentThread(); // 當前線程 21 for (int h = p.hash, spins = SPINS;;) { // 一直迴圈,直到有別的線程來交易,或超時,或中斷 22 Object v = p.match; // 檢查是否有別的線程來交換數據 23 if (v != null) { // 有則返回 24 U.putOrderedObject(p, MATCH, null); // match重置,等著下次使用 25 p.item = null; // 清空,下次接著使用 26 p.hash = h; 27 return v; // 返回結果,交易結束 28 } else if (spins > 0) { // 自旋 29 h ^= h << 1; 30 h ^= h >>> 3; 31 h ^= h << 10; // 移位加異或,偽隨機 32 if (h == 0) // 初始值 33 h = SPINS | (int) t.getId(); 34 else if (h < 0 && // SPINS >>> 1, 一半的概率 35 (--spins & ((SPINS >>> 1) - 1)) == 0) 36 Thread.yield(); // 每一次等待有兩次讓出CPU的時機 37 } else if (U.getObjectVolatile(a, j) != p) 38 spins = SPINS; // 別的線程已經到來,正在準備數據,自旋等待一會兒,馬上就好 39 else if (!t.isInterrupted() && m == 0 && (!timed || (ns = end - System.nanoTime()) > 0L)) { 40 U.putObject(t, BLOCKER, this); // 設置當前線程將阻塞在當前對象上 41 p.parked = t; // 掛在此結點上的阻塞著的線程 42 if (U.getObjectVolatile(a, j) == p) 43 U.park(false, ns); // 阻塞, 等著被喚醒或中斷 44 p.parked = null; // 醒來後,解除與結點的聯繫 45 U.putObject(t, BLOCKER, null); // 解除阻塞對象 46 } else if (U.getObjectVolatile(a, j) == p && U.compareAndSwapObject(a, j, p, null)) { 47 if (m != 0) // 嘗試縮減 48 U.compareAndSwapInt(this, BOUND, b, b + SEQ - 1); // 更新bound, 高位遞增,低位 -1 49 p.item = null; // 重置 50 p.hash = h; 51 i = p.index >>>= 1; // 索引減半,為的是快速找到匯合點(最左側) 52 if (Thread.interrupted())// 保留中斷狀態,以便調用者可以重新檢查,Thread.interrupted() 會清除中斷狀態標記 53 return null; 54 if (timed && m == 0 && ns <= 0L) // 超時 55 return TIMED_OUT; 56 break; // 重新開始 57 } 58 } 59 } else 60 p.item = null; // 重置 61 } else { 62 if (p.bound != b) { // 別的線程更改了bound,重置collides為0, i的情況如下:當i != m, 或者m = 0時,i = m; 否則,i = m-1; 從右往左遍歷 63 p.bound = b; 64 p.collides = 0; 65 i = (i != m || m == 0) ? m : m - 1; // index 左移 66 } else if ((c = p.collides) < m || m == FULL || !U.compareAndSwapInt(this, BOUND, b, b + SEQ + 1)) { // 更新bound, 高位遞增,低位 +1 67 p.collides = c + 1; 68 i = (i == 0) ? m : i - 1; // 左移,遍歷槽位,m == FULL時,i == 0(最左側),重置i = m, 重新從右往左迴圈遍歷 69 } else 70 i = m + 1; // 槽位增長 71 p.index = i; 72 } 73 } 74 }
說明
1. 從場地中選出偏移地址為(i << ASHIFT) + ABASE的記憶體值,也即第i個真正可用的Node,判斷其槽位是否為空,不為空,說明有線程在此等待,嘗試搶占改槽位,如果搶占成功,交換數據,並喚醒等待線程,返回,結束。
2. 如果搶占失敗,轉向【步驟7】,或者槽位為空,轉向【步驟3】
3. 判斷當前結點的索引是否大於當前bound下最大有效索引,如果大於,或者槽位為空,轉向【步驟7】,否則繼續【步驟4】
4. 槽位為空,嘗試占領,如果失敗,轉向【步驟1】,否則繼續【步驟5】
5. 當前線程等待被釋放,等待的順序是先自旋(spin),不成功則讓出CPU時間片(yield),最後還不行就阻塞(block),spin -> yield -> block
6 放棄改槽位,如果放棄失敗,說明有線程來了,準備交換數據,轉向【步驟1】,否則放棄成功,嘗試減少槽位,縮短索引(-1),並且改結點的索引減半;或中斷/超時返回,結束;或跳出迴圈,繼續【步驟1】
7. 每一個版本的bound,使線程從右往左遍歷,嘗試交換數據,失敗次數達到最大值,則增加槽位,更新bound,重置collies,並更新結點的索引值,使其從最右邊往左遍歷,嘗試交換數據。繼續【步驟1】
Unsafe
1 private static final sun.misc.Unsafe U; 2 private static final long BOUND; 3 private static final long SLOT; 4 private static final long MATCH; 5 private static final long BLOCKER; 6 private static final int ABASE; 7 static { 8 int s; 9 try { 10 U = sun.misc.Unsafe.getUnsafe(); 11 Class<?> ek = Exchanger.class; 12 Class<?> nk = Node.class; 13 Class<?> ak = Node[].class; 14 Class<?> tk = Thread.class; 15 BOUND = U.objectFieldOffset(ek.getDeclaredField("bound")); 16 SLOT = U.objectFieldOffset(ek.getDeclaredField("slot")); 17 MATCH = U.objectFieldOffset(nk.getDeclaredField("match")); 18 BLOCKER = U.objectFieldOffset(tk.getDeclaredField("parkBlocker")); 19 s = U.arrayIndexScale(ak); // 數組增量地址 20 ABASE = U.arrayBaseOffset(ak) + (1 << ASHIFT); // 數組首元素偏移地址 21 } catch (Exception e) { 22 throw new Error(e); 23 } 24 if ((s & (s - 1)) != 0 || s > (1 << ASHIFT)) 25 throw new Error("Unsupported array scale"); 26 }
說明
s為數組中每個元素占用的地址空間大小
ABASE為數組首元素偏移地址,防止偽共用
最後
arena = new Node[(FULL + 2) << ASHIFT]
FULL,<= MMASK
scale,<= 1 << ASHIFT,
說明(FULL + 2)<< ASHIFT 個Node,真正可用的是FULL + 2個,而且還是當scale == 1 << ASHIFT的時候,正常情況下,可用的Node應該多於這個值。最大的有效索引是MMASK(bound & MMASK),但m(實際的最大索引)增長到FULL時,不再增長,會迴圈遍歷槽位,嘗試交換數據。由於數組元素最少可用的個數為FULL + 2個,因此不會出現下標越界的情況。
備註
由於水平有限,難免有紕漏,如有錯誤,歡迎批評指正,晚安。