前幾天和朋友閑聊,說遇到了一個ConcurrentHashMap死迴圈問題,當時心裡想這不科學呀?ConcurrentHashMap怎麼還有死迴圈呢,畢竟它已經解決HashMap中rehash中死迴圈問題了,但是隨著深入的分析,發現事情並沒有之前想的那麼簡單~ (以下分析基於jdk版本:jdk1.8 ...
前幾天和朋友閑聊,說遇到了一個ConcurrentHashMap死迴圈問題,當時心裡想這不科學呀?ConcurrentHashMap怎麼還有死迴圈呢,畢竟它已經解決HashMap中rehash中死迴圈問題了,但是隨著深入的分析,發現事情並沒有之前想的那麼簡單~ (以下分析基於jdk版本:jdk1.8.0_171)
保險起見,不能直接貼出出現問題的業務代碼,因此將該問題簡化成如下代碼:
ConcurrentHashMap<Integer, Integer> map = new ConcurrentHashMap<>(); // map預設capacity 16,當元素個數達到(capacity - capacity >> 2) = 12個時會觸發rehash for (int i = 0; i < 11; i++) { map.put(i, i); } map.computeIfAbsent(12, (k) -> { // 這裡會導致死迴圈 :( map.put(100, 100); return k; }); // 其他操作
感興趣的小伙伴可以在電腦上運行下,話不說多,先說下問題原因:當執行computeIfAbsent
時,如果key對應的slot為空,此時會創建ReservationNode
對象(hash值為RESERVED=-3
)放到當前slot位置,然後調用mappingFunction.apply(key)
生成value,根據value創建Node之後賦值到slow位置,此時完成computeIfAbsent
流程。但是上述代碼mappingFunction
中又對該map進行了一次put操作,並且觸發了rehash操作,在transfer
中遍歷slot數組時,依次判斷slot對應Node是否為null、hash值是否為MOVED=-1、hash值否大於0(list結構)、Node類型是否是TreeBin(紅黑樹結構),唯獨沒有判斷hash值為RESERVED=-3
的情況,因此導致了死迴圈問題。
問題分析到這裡,原因已經很清楚了,當時我們認為,這可能是jdk的“bug”
,因此我們最後給出的解決方案是:
- 如果在rehash時出現了
slot
節點類型是ReservationNode
,可以給個提示,比如拋異常; - 理論上來說,
mappingFunction
中不應該再對當前map進行更新操作了,但是jdk並沒有禁止不能這樣用,最好說明下。
最後,另一個朋友看了computeIfAbsent
的註釋:
1 /** 2 * If the specified key is not already associated with a value, 3 * attempts to compute its value using the given mapping function 4 * and enters it into this map unless {@code null}. The entire 5 * method invocation is performed atomically, so the function is 6 * applied at most once per key. Some attempted update operations 7 * on this map by other threads may be blocked while computation 8 * is in progress, so the computation should be short and simple, 9 * and must not attempt to update any other mappings of this map. 10 */ 11 public V computeIfAbsent(K key, Function<? super K, ? extends V> mappingFunction)
我們發現,其實人家已經知道了這個問題,還特意註釋說明瞭。。。我們還是too yong too simple
啊。至此,ConcurrentHashMap死迴圈問題告一段落,還是要遵循編碼規範,不要在mappingFunction
中再對當前map進行更新操作。其實ConcurrentHashMap死迴圈不僅僅出現在上述討論的場景中,以下場景也會觸發,原因和上述討論的是一樣的,代碼如下,感興趣的小伙伴也可以本地跑下:
1 ConcurrentHashMap<Integer, Integer> map = new ConcurrentHashMap<>(); 2 map.computeIfAbsent(12, (k) -> { 3 map.put(k, k); 4 return k; 5 }); 6 7 System.out.println(map); 8 // 其他操作
最後,一起跟著computeIfAbsent源碼來分下上述死迴圈代碼的執行流程,限於篇幅,只分析下主要流程代碼:
1 public V computeIfAbsent(K key, Function<? super K, ? extends V> mappingFunction) { 2 if (key == null || mappingFunction == null) 3 throw new NullPointerException(); 4 int h = spread(key.hashCode()); 5 V val = null; 6 int binCount = 0; 7 for (Node<K,V>[] tab = table;;) { 8 Node<K,V> f; int n, i, fh; 9 if (tab == null || (n = tab.length) == 0) 10 tab = initTable(); 11 else if ((f = tabAt(tab, i = (n - 1) & h)) == null) { 12 Node<K,V> r = new ReservationNode<K,V>(); 13 synchronized (r) { 14 // 這裡使用synchronized針對局部對象意義不大,主要是下麵的cas操作保證併發問題 15 if (casTabAt(tab, i, null, r)) { 16 binCount = 1; 17 Node<K,V> node = null; 18 try { 19 // 這裡的value返回可能為null呦 20 if ((val = mappingFunction.apply(key)) != null) 21 node = new Node<K,V>(h, key, val, null); 22 } finally { 23 setTabAt(tab, i, node); 24 } 25 } 26 } 27 if (binCount != 0) 28 break; 29 } 30 else if ((fh = f.hash) == MOVED) 31 tab = helpTransfer(tab, f); 32 else { 33 boolean added = false; 34 synchronized (f) { 35 // 僅僅判斷了node.hash >=0和node為TreeBin類型情況,未判斷`ReservationNode`類型 36 // 擴容時判斷和此處類似 37 if (tabAt(tab, i) == f) { 38 if (fh >= 0) { 39 binCount = 1; 40 for (Node<K,V> e = f;; ++binCount) { 41 K ek; V ev; 42 if (e.hash == h && 43 ((ek = e.key) == key || 44 (ek != null && key.equals(ek)))) { 45 val = e.val; 46 break; 47 } 48 Node<K,V> pred = e; 49 if ((e = e.next) == null) { 50 if ((val = mappingFunction.apply(key)) != null) { 51 added = true; 52 pred.next = new Node<K,V>(h, key, val, null); 53 } 54 break; 55 } 56 } 57 } 58 else if (f instanceof TreeBin) { 59 binCount = 2; 60 TreeBin<K,V> t = (TreeBin<K,V>)f; 61 TreeNode<K,V> r, p; 62 if ((r = t.root) != null && 63 (p = r.findTreeNode(h, key, null)) != null) 64 val = p.val; 65 else if ((val = mappingFunction.apply(key)) != null) { 66 added = true; 67 t.putTreeVal(h, key, val); 68 } 69 } 70 } 71 } 72 if (binCount != 0) { 73 if (binCount >= TREEIFY_THRESHOLD) 74 treeifyBin(tab, i); 75 if (!added) 76 return val; 77 break; 78 } 79 } 80 } 81 if (val != null) 82 // 計數統計&閾值判斷+擴容操作 83 addCount(1L, binCount); 84 return val; 85 }
推薦閱讀:
更多文章可掃描以下二維碼: