1. 前言 HashMap是非線程安全的,在多線程訪問時沒有同步機制,併發場景下put操作可能導致同一數組下的鏈表形成閉環,get時候出現死迴圈,導致CPU利用率接近100%。 HashTable是線程安全的,使用synchronized鎖住整個table的方式來保證併發訪問下的線程安全,但效率卻比 ...
1. 前言
HashMap是非線程安全的,在多線程訪問時沒有同步機制,併發場景下put操作可能導致同一數組下的鏈表形成閉環,get時候出現死迴圈,導致CPU利用率接近100%。
HashTable是線程安全的,使用synchronized鎖住整個table的方式來保證併發訪問下的線程安全,但效率卻比較低下。因為線程1調用HashTable的put同步方法時,線程2的put或get等方法則進入阻塞狀態,所以競爭越激烈,效率越低。
ConcurrentHashMap是支持高併發、高吞吐量的線程安全的Map實現。下麵會通過閱讀 ConcurrentHashMap 在 JDK1.7 和 JDK1.8 的源碼來瞭解它的演變過程。
2. ConcurrentHashMap在JDK1.7中的設計
2.1. 數據結構和鎖分段
HashTable在競爭激烈的併發環境中效率低下的原因是:訪問HashTable的線程都競爭同一把鎖。
ConcurrentHashMap卻允許多個修改操作併發的進行,其關鍵是運用了鎖分段技術:將容器內的數據分為一段段(Segment)的來存儲,每段可以看成一個小的HashTable,每段數據都分配一把鎖。當一個線程占用鎖訪問這一段數據時,其他線程可以訪問其他段的數據。那麼當多線程併發訪問容器內不同鎖鎖住的數據時,線程間就不存在鎖競爭,從而有效的提升效率。
ConcurrentHashMap的數據結構如下:
ConcurrentHashMap 由 Segment 數組和 HashEntry 數組組成。一個 ConcurrentHashMap 里包含一個 Segment 數組,Segment 的結構和 HashMap 類似,是一種數組和鏈表結構。一個 Segment 里包含一個 HashEntry 數組,每個 HashEntry 用於存儲 key-value 鍵值對數據,同時指向下一個 HashEntry 節點。當定位 key 在 ConcurrentHashMap 中的位置時,需要先經過一次 hash 定位到 Segment 的位置,然後在 hash 定位到指定的HashEntry。
Segment 是一種可重入鎖 ReentrantLock,在 ConcurrentHashMap 里扮演鎖的角色,每個 Segment 守護著一個HashEntry 數組裡的元素,當對 HashEntry 數組的數據進行修改時,必須首先獲得它對應的Segment鎖。而大多讀操作是不加鎖的,但是 size()、containsValue() 遇到併發修改競爭時需要全表加鎖。
2.2. 結構
ConcurrentHashMap:下麵是ConcurrentHashMap中的數據成員以及構造函數源碼:
構造函數主要做了兩件事:1)參數的校驗;2)table初始化長度
public class ConcurrentHashMap<K, V> extends AbstractMap<K, V> implements ConcurrentMap<K, V>, Serializable { /** * Mask value for indexing into segments. The upper bits of a * key's hash code are used to choose the segment. */ final int segmentMask; /** * Shift value for indexing within segments. */ final int segmentShift; /** * The segments, each of which is a specialized hash table. */ final Segment<K,V>[] segments; // 構造函數 public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) { if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0) throw new IllegalArgumentException(); if (concurrencyLevel > MAX_SEGMENTS) concurrencyLevel = MAX_SEGMENTS; // Find power-of-two sizes best matching arguments // 找到第一個 >= concurrencyLevel 的 2次方數,作為後續 Segment數組大小 int sshift = 0; int ssize = 1; while (ssize < concurrencyLevel) { ++sshift; ssize <<= 1; } this.segmentShift = 32 - sshift; this.segmentMask = ssize - 1; if (initialCapacity > MAXIMUM_CAPACITY) initialCapacity = MAXIMUM_CAPACITY; int c = initialCapacity / ssize; if (c * ssize < initialCapacity) ++c; int cap = MIN_SEGMENT_TABLE_CAPACITY; while (cap < c) cap <<= 1; // create segments and segments[0] Segment<K,V> s0 = new Segment<K,V>(loadFactor, (int)(cap * loadFactor), (HashEntry<K,V>[])new HashEntry[cap]); Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize]; // 上文中通過比較concurrencyLevel而計算出的ssize作為數組大小 // 使用的是StoreStore記憶體屏障,而不是較慢的StoreLoad記憶體屏障(使用在volatile寫操作上)。實現寫入不會被JIT重新排序指令,性能雖然提升,但寫結果不會被立即看到。 UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0] this.segments = ss; } }
所有成員都是final修飾的,保證線程的安全發佈。segmentMask 和 segmentShift 主要是為了定位段。
concurrencyLevel 參數表示期望併發的修改 ConcurrentHashMap 的線程數量,用於決定 Segment 的數量,具體方式是通過找到第一個 >= concurrentcyLevel 的 2的次方數作為 Segment 數組的大小。預設值為16,Segment數組大小也為16,如果設置 concurrentcyLevel = 100,那麼 Segment 數組大小則為128。
Segment:每個Segment相當於ConcurrentHashMap的一個子 hash表,Segment繼承了ReetrantLock,為了方便使用加鎖的功能,如lock,tryLock等。數據成員如下:static final class Segment<K,V> extends ReentrantLock implements Serializable { transient volatile HashEntry<K,V>[] table; transient int count; transient int modCount; transient int threshold; final float loadFactor; }
table:鏈表數組,每個數組元素是一個hash鏈表的頭部。table是volatile的,使得每次都能讀取到最新的table值而不需要同步。
count:Segment中元素的數量。每次修改操作做了結構上的改變,如添加/刪除節點(更新節點的value不算結構上的改變),都要更新count值。
modCount:對table的結構進行修改的次數。
threshold:若Segment里的元素數量超過這個值,則就會對Segment進行擴容。
loadFactor:負載因數,threshold = capacity * threshold。
HashEntry:Segment中的類,代表 hash 鏈表中的一個節點,源碼如下:
static final class HashEntry<K,V> { final int hash; final K key; volatile V value; volatile HashEntry<K,V> next; }
2.3. put
下麵是CouncurrentHashMap的put操作的源碼:
public V put(K key, V value) { Segment<K,V> s; if (value == null) throw new NullPointerException(); int hash = hash(key); int j = (hash >>> segmentShift) & segmentMask; // 對key求hash,並取模,然後找到對應Segment數組下標位置 if ((s = (Segment<K,V>)UNSAFE.getObject (segments, (j << SSHIFT) + SBASE)) == null) s = ensureSegment(j); // 當 Segment 為空時,會創建 return s.put(key, hash, value, false); // put操作會委托給Segment的put }
下麵是ensureSegment的代碼:
private Segment<K,V> ensureSegment(int k) { final Segment<K,V>[] ss = this.segments; long u = (k << SSHIFT) + SBASE; // raw offset Segment<K,V> seg; if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) { // UNSAFE進行volatile讀Segment數組對應位置的值 Segment<K,V> proto = ss[0]; // use segment 0 as prototype 如果為空,則從Segment[0]複製Entry數組長度capacity,loadFactor int cap = proto.table.length; float lf = proto.loadFactor; int threshold = (int)(cap * lf); HashEntry<K,V>[] tab = (HashEntry<K,V>[])new HashEntry[cap]; if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))== null) { // recheck 再次volatile讀Segment數組,如果為空,繼續新建Segment對象 Segment<K,V> s = new Segment<K,V>(lf, threshold, tab); while ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) { // 使用while迴圈和UNSAFE的CAS原子性替換Segment數組對應下標的元素。使用樂觀鎖的方式 // 當線程t1和t2都讀取Segment[u]==null時,只有一個線程能通過CAS替換成功。假設t1替換成功了,下一次while迴圈t2能volatile讀取到替換的值 if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s)) break; } } } return seg; }
put最終交給 Segment 的 put 方法,每個Segment相當於一個HashMap,put操作就是要在HashMap中尋找對應的key是否存在,如果存在則更新value,如不存在則新建一個HashEntry。put需要加鎖,使用了ReetrantLock的tryLock的非阻塞加鎖方法。源碼如下:
final V put(K key, int hash, V value, boolean onlyIfAbsent) { // tryLock嘗試加鎖,如果加鎖成功,node=null。否則自旋等待並尋找對應key的節點是否存在 HashEntry<K,V> node = tryLock() ? null : scanAndLockForPut(key, hash, value); V oldValue; try { HashEntry<K,V>[] tab = table; int index = (tab.length - 1) & hash; HashEntry<K,V> first = entryAt(tab, index); // 獲取鎖後,對HashEntry的table數組取模,獲取數組下標index的第一個節點 for (HashEntry<K,V> e = first;;) { // first節點後面的鏈表,向後遍歷,尋找與key相同的節點 if (e != null) { K k; if ((k = e.key) == key || (e.hash == hash && key.equals(k))) { oldValue = e.value; if (!onlyIfAbsent) { e.value = value; // 找到與key相同的節點,按onlyIfAbsent判斷是否替換舊的value值 ++modCount; } break; } e = e.next; } else { // e==null,代表沒有找到。新建HashEntry節點或使用scanAndLockForPut中創建的節點作為新的鏈表頭節點 if (node != null) node.setNext(first); else node = new HashEntry<K,V>(hash, key, value, first); int c = count + 1; if (c > threshold && tab.length < MAXIMUM_CAPACITY) // 如果當前總節點個數 > threashold,則rehash擴容 // Segment rehash是獲取鎖之後進行的,是將數組長度擴大一倍,將舊的數組元素複製到新數組中(前後有獲取鎖和釋放鎖的語義,不需要考慮多線程問題) rehash(node); else setEntryAt(tab, index, node); // 設置新HashEntry節點到table對應的位置中 ++modCount; count = c; oldValue = null; break; } } } finally { unlock(); // 最後解鎖 } return oldValue; }
有一個點:HashEntry<K,V>[] tab = table。一開始將table賦值給tab局部變數,可以減少直接引用table時帶來的性能損失,因為table是一個volatile變數,不能進行優化,而賦值給tab普通變數後,可以實現編譯、運行時的優化。
2.4. get
ConcurrentHashMap的get操作,源碼如下:
public V get(Object key) { Segment<K,V> s; // manually integrate access methods to reduce overhead HashEntry<K,V>[] tab; int h = hash(key); // 對key求hash long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE; // key對應的Segment的數組下標 if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null && // 先找對應的Segment。volatile讀語義保證記憶體的可見性 (tab = s.table) != null) { for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile (tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE); // 再在Segment中找對應的HashEntry e != null; e = e.next) { K k; if ((k = e.key) == key || (e.hash == h && key.equals(k))) return e.value; } } return null; }
get操作不需要加鎖,通過volatile保證可見性,如果同時有put併發操作增加HashEntry,由於是在鏈表頭部添加(頭插法),不會對get造成影響。
2.5. size
計算ConcurrentHashMap的 size 是一個有趣的問題,因為在計算的時候,還會在併發的插入數據,可能導致計算出的size和實際的size有出入。
JDK1.7中先後採取了兩個方案:
第一種方案:先使用不加鎖的模式先嘗試遍歷兩次ConcurrentHashMap計算size,如果兩次遍歷過程中所有segment中的modCount的和是一致的,則可以認為整個計算過程中的Map沒有發生變化(添加或刪除HashEntry節點),返回size。
第二種方案:如果第一種方案不符合(Map發生了結構變化),就給每個Segment加鎖,然後計算ConcurrentHashMap的size,解鎖,最後返回。
public int size() { // Try a few times to get accurate count. On failure due to // continuous async changes in table, resort to locking. final Segment<K,V>[] segments = this.segments; int size; boolean overflow; // true if size overflows 32 bits long sum; // sum of modCounts long last = 0L; // previous sum int retries = -1; // first iteration isn't retry try { for (;;) { // 如果for迴圈達到RETRIES_BEFORE_LOCK,則表示前民幾次累計的modCount都不相等,其他線程併發修改ConcurrentHashMap導致數據結構一直在改變。 // 降級為依次對Segment進行加鎖,此時其他線程改變數據結構就會阻塞等待 if (retries++ == RETRIES_BEFORE_LOCK) { for (int j = 0; j < segments.length; ++j) ensureSegment(j).lock(); // force creation } sum = 0L; size = 0; overflow = false; for (int j = 0; j < segments.length; ++j) { // 迴圈每個Segment,累加Segment中的count和modCount Segment<K,V> seg = segmentAt(segments, j); if (seg != null) { sum += seg.modCount; int c = seg.count; if (c < 0 || (size += c) < 0) overflow = true; } } if (sum == last) // 如果當前sum和之前計算的sum相等,即各Segment累加的modCount相等。就可以認為兩次for迴圈間沒有其他線程修改內部數據結構。直接返回size break; last = sum; // last等於最近一次計算的sum值 } } finally { if (retries > RETRIES_BEFORE_LOCK) { // 如果加鎖了,最終解鎖 for (int j = 0; j < segments.length; ++j) segmentAt(segments, j).unlock(); } } return overflow ? Integer.MAX_VALUE : size; }
3. ConcurrentHashMap在JDK1.8中的改進
3.1. 改進
改進1:取消segment分段鎖,直接使用 transient volatile Node<K,V>[] table; 來保存數組。採用table數組元素作為鎖,從而實現對每一行數據進行加鎖,進一步減少了衝突的概率。
改進2:將原先table數組 + 鏈表的數據結構,變更為table數組 + 鏈表 / 紅黑樹 的結構,同HashMap在JDK1.8的數據結構的改進。優化為紅黑樹的好處是:當一個鏈表長度過長時,查詢某個節點的時間複雜度為O(N),而當鏈表長度超過8時,將鏈表轉化為紅黑樹,查詢節點的時間複雜度可以降低為O(logN),從而提升了性能。
改進3:併發控制使用synchronized和CAS,使用synchronized替換ReetrantLock。在ConcurrentHashMap中可以看到很多的 U.compareAndSwapXXX,通過CAS演算法實現無鎖化修改值的操作,降低了鎖的性能消耗。CAS的思想是不斷的比較當前記憶體中的變數值和所指定的變數值是否相等,如果相等,則接受指定的修改值;否則,拒絕操作,類似與樂觀鎖。
數據結構如下:
3.2. 結構:Node和TreeBin
Node是ConcurrentHashMap存儲結構的基本單元,用於存儲key-value鍵值對,是一個鏈表,但只允許查找數據,不允許修改數據。源碼如下:
/** * Key-value entry. This class is never exported out as a * user-mutable Map.Entry (i.e., one supporting setValue; see * MapEntry below), but can be used for read-only traversals used * in bulk tasks. Subclasses of Node with a negative hash field * are special, and contain null keys and values (but are never * exported). Otherwise, keys and vals are never null. */ static class Node<K,V> implements Map.Entry<K,V> { final int hash; final K key; // value和next使用volatile來保證可見性和禁止重排序 volatile V val; volatile Node<K,V> next; // 指向下一個Node節點 Node(int hash, K key, V val, Node<K,V> next) { this.hash = hash; this.key = key; this.val = val; this.next = next; } public final K getKey() { return key; } public final V getValue() { return val; } public final int hashCode() { return key.hashCode() ^ val.hashCode(); } public final String toString(){ return key + "=" + val; } // 不允許更新value public final V setValue(V value) { throw new UnsupportedOperationException(); } /** * Virtualized support for map.get(); overridden in subclasses. */ // 用於map.get()方法,子類重寫 Node<K,V> find(int h, Object k) { Node<K,V> e = this; if (k != null) { do { K ek; if (e.hash == h && ((ek = e.key) == k || (ek != null && k.equals(ek)))) return e; } while ((e = e.next) != null); } return null; } }
TreeBin是封裝TreeNode的容器,提供了轉化紅黑樹的一些條件和鎖的控制,部分源碼如下:
/** * TreeNodes used at the heads of bins. TreeBins do not hold user * keys or values, but instead point to list of TreeNodes and * their root. They also maintain a parasitic read-write lock * forcing writers (who hold bin lock) to wait for readers (who do * not) to complete before tree restructuring operations. */ static final class TreeBin<K,V> extends Node<K,V> { TreeNode<K,V> root; // 指向TreeNode的根節點 volatile TreeNode<K,V> first; volatile Thread waiter; volatile int lockState; // values for lockState static final int WRITER = 1; // set while holding write lock static final int WAITER = 2; // set when waiting for write lock static final int READER = 4; // increment value for setting read lock ... }
3.3. put
put源碼如下:
/** Implementation for put and putIfAbsent */ final V putVal(K key, V value, boolean onlyIfAbsent) { if (key == null || value == null) throw new NullPointerException(); // 求key的hash值,兩次hash,使hash值能均勻分佈 int hash = spread(key.hashCode()); int binCount = 0; // 迭代Node[] table數組 for (Node<K,V>[] tab = table;;) { Node<K,V> f; int n, i, fh; // 1) 如果table為空,則初始化,ConcurrentHashMap構造方法未初始化Node數組,而是在put中實現,屬於懶漢式初始化 if (tab == null || (n = tab.length) == 0) tab = initTable(); // 2) 如果table不為空,根據hash值計算得到key在table中的索引i,如果table[i]為null,使用CAS方式新建Node節點(table[i]為鏈表或紅黑樹的首節點) else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null))) break; // no lock when adding to empty bin,不進行加鎖 } // 3) 如果table[i]不為空,並且hash值為MOVED(-1),表明該鏈表正在進行transfer擴容操作,幫助擴容完成 else if ((fh = f.hash) == MOVED) tab = helpTransfer(tab, f); else { V oldVal = null; // 4) 以上條件都不滿足,也就是存在hash衝突。對鏈表或紅黑樹的頭節點進行加鎖操作(不再是segment,進一步減少了線程衝突) synchronized (f) { if (tabAt(tab, i) == f) { // hash值>0,表示該節點是鏈表結構。只需向後遍歷即可 if (fh >= 0) { binCount = 1; for (Node<K,V> e = f;; ++binCount) { K ek; // 如果在鏈表中找到值為key的節點,按onlyIfAbsent判斷是否替換value if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) { oldVal = e.val; if (!onlyIfAbsent) e.val = value; break; } // 沒有找到值為key的節點,直接新建Node並加入鏈表尾部 Node<K,V> pred = e; if ((e = e.next) == null) { pred.next = new Node<K,V>(hash, key, value, null); break; } } } // 如果首節點為紅黑樹結構,putTreeValue存放key-value對 else if (f instanceof TreeBin) { Node<K,V> p; binCount = 2; if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null) { oldVal = p.val; if (!onlyIfAbsent) p.val = value; } } } } if (binCount != 0) { // 如果鏈表節點數>8,則將鏈表結構轉換為紅黑樹結構 if (binCount >= TREEIFY_THRESHOLD) treeifyBin(tab, i); if (oldVal != null) return oldVal; break; } } } // size加1,會檢查當前size是否超過sizeCtl,如果是則觸發transfer擴容操作 addCount(1L, binCount); return null; }
大致流程為:
- 如果table為空,沒有初始化,則先通過initTable()初始化
- 如果table[i]為null,沒有hash衝突,則CAS新建節點到table[i]
- 如果table在擴容中,則等待擴容結束再操作
- 如果存在hash衝突,則使用synchronized對Node首節點加鎖來保證線程安全,此時有2種方式:
- 1)如果是鏈表形式,則根據key是否存在來判斷,如果key存在,則覆蓋value;key不存在,則新建Node插入到鏈表尾端
- 2)如果是紅黑樹形式,就按照紅黑樹的結構插入
- 最後判斷鏈表長度是否 >8?如果大於,則將鏈表轉換為紅黑樹結構
- put成功,則通過addCount方法統計size,並檢查是否需要擴容
下麵看initTable的源碼:
sizeCtl是控制ConcurrentHashMap的控制標示符,用來控制初始化和擴容操作的,其不同的含義如下:
- 負數:代表正在進行初始化和擴容操作
- -1:正在初始化
- -N:有 N - 1 個線程正在進行擴容操作
- 正數或0:hash表還沒有被初始化,這個數值表示初始化或下一次擴容的大小
private final Node<K,V>[] initTable() { Node<K,V>[] tab; int sc; while ((tab = table) == null || tab.length == 0) { // sizeCtl < 0表示其他線程正在初始化,當前線程掛起 if ((sc = sizeCtl) < 0) Thread.yield(); // lost initialization race; just spin // CAS 將 sizeCtl置為 -1,表示正在初始化 else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) { try { if ((tab = table) == null || tab.length == 0) { int n = (sc > 0) ? sc : DEFAULT_CAPACITY; @SuppressWarnings("unchecked") Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n]; // 初始化 table = tab = nt; sc = n - (n >>> 2); // 下次擴容的閥值,等於 0.75*n } } finally { sizeCtl = sc; } break; } } return tab; }
3.4. get
get的源碼如下:
/** * Returns the value to which the specified key is mapped, * or {@code null} if this map contains no mapping for the key. * * <p>More formally, if this map contains a mapping from a key * {@code k} to a value {@code v} such that {@code key.equals(k)}, * then this method returns {@code v}; otherwise it returns * {@code null}. (There can be at most one such mapping.) * * @throws NullPointerException if the specified key is null */ public V get(Object key) { Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek; // 計算兩次hash int h = spread(key.hashCode()); if ((tab = table) != null && (n = tab.length) > 0 && (e = tabAt(tab, (n - 1) & h)) != null) { // 讀取首節點Node元素 // 如果首節點Node.key相等,返回value if ((eh = e.hash) == h) { if ((ek = e.key) == key || (ek != null && key.equals(ek))) return e.val; } // 樹的find()來查找 else if (eh < 0) return (p = e.find(h, key)) != null ? p.val : null; // 在鏈表中遍歷查找 while ((e = e.next) != null) { if (e.hash == h && ((ek = e.key) == key || (ek != null && key.equals(ek)))) return e.val; } } return null; }
3.5. size
在JDK1.8版本中,對於size的計算,在擴容和addCount()時已經在處理了。JDK1.7是在調用時才去計算。
為了幫助統計size,ConcurrentHashMap提供了baseCount和counterCells兩個輔助變數和CounterCell輔助類:
@sun.misc.Contended static final class CounterCell { volatile long value; CounterCell(long x) { value = x; } } //ConcurrentHashMap中元素個數,但返回的不一定是當前Map的真實元素個數。基於CAS無鎖更新 private transient volatile long baseCount; private transient volatile CounterCell[] counterCells; // 部分元素變化的個數保存在此數組中
通過累計baseCount和 counterCells數組中的數量,即可得到元素的總個數。size源碼如下:
public int size() { long n = sumCount(); return ((n < 0L) ? 0 : (n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE : (int)n); } final long sumCount() { CounterCell[] as = counterCells; CounterCell a; long sum = baseCount; if (as != null) { // 遍歷,累加所有counterCells.value for (int i = 0; i < as.length; ++i) { if ((a = as[i]) != null) sum += a.value; } } return sum; }
4. 參考
http://www.cnblogs.com/study-everyday/p/6430462.html
http://blog.csdn.net/u010723709/article/details/48007881
http://blog.csdn.net/jianghuxiaojin/article/details/52006118