Java併發包原始碼學習系列:JDK1.8的ConcurrentHashMap原始碼解析

itread01 2021-01-23 23:23:29
java jdk jdk1.8 系列 原始


[toc]系列傳送門:- [Java併發包原始碼學習系列:AbstractQueuedSynchronizer](https://blog.csdn.net/Sky_QiaoBa_Sum/article/details/112254373)- [Java併發包原始碼學習系列:CLH同步佇列及同步資源獲取與釋放](https://blog.csdn.net/Sky_QiaoBa_Sum/article/details/112301359)- [Java併發包原始碼學習系列:AQS共享式與獨佔式獲取與釋放資源的區別](https://blog.csdn.net/Sky_QiaoBa_Sum/article/details/112386838)- [Java併發包原始碼學習系列:ReentrantLock可重入獨佔鎖詳解](https://blog.csdn.net/Sky_QiaoBa_Sum/article/details/112454874)- [Java併發包原始碼學習系列:ReentrantReadWriteLock讀寫鎖解析](https://blog.csdn.net/Sky_QiaoBa_Sum/article/details/112689635)- [Java併發包原始碼學習系列:詳解Condition條件佇列、signal和await](https://blog.csdn.net/Sky_QiaoBa_Sum/article/details/112727669)- [Java併發包原始碼學習系列:掛起與喚醒執行緒LockSupport工具類](https://blog.csdn.net/Sky_QiaoBa_Sum/article/details/112757098)注:本篇基於JDK1.8。## 為什麼要使用ConcurrentHashMap?在思考這個問題之前,我們可以思考:如果不用ConcurrentHashMap的話,有哪些其他的容器供我們選擇呢?並且它們的缺陷是什麼?雜湊表利用雜湊演算法能夠花費O(1)的時間複雜度高效地根據key找到value值,能夠滿足這個需求的容器還有HashTable和HashMap。**HashTable**HashTable使用synchronized關鍵字保證了多執行緒環境下的安全性,但加鎖的實現方式是獨佔式的,所有訪問HashTable的執行緒都必須競爭同一把鎖,效能較為低下。```java public synchronized V put(K key, V value) { // ... }```**HashMap**JDK1.8版本的HashMap在讀取hash槽的時候讀取的是工作記憶體中引用指向的物件,在多執行緒環境下,其他執行緒修改的值不能被及時讀到。- 關於HashMap的原始碼解析:[【JDK1.8】Java集合原始碼學習系列:HashMap原始碼詳解](https://blog.csdn.net/Sky_QiaoBa_Sum/article/details/104095675)> 這就引發出可能存在的一些問題:>> 比如在插入操作的時候,**第一次將會根據key的hash值判斷當前的槽內是否被佔用,如果沒有的話就會插入value**。在併發環境下,如果A執行緒判斷槽未被佔用,在執行寫入操作的時候正巧時間片耗盡,此時B執行緒正巧也執行了同樣的操作,率先插入了B的value,此時A正巧被CPU重新排程繼續執行寫入操作,進而將執行緒B的value覆蓋。>> 還有一種情況是在同一個hash槽內,HashMap總是保持key唯一,在插入的時候,如果存在key,就會進行value覆蓋。併發情況下,如果A執行緒判斷最後一個節點仍未發現重複的key,那麼將會執行插入操作,如果B執行緒在A判斷和插入之間執行了同樣的操作,也會發生資料的覆蓋,也就是資料的丟失。>> 當然,像這樣的併發問題其實還有一些,這裡就不細說了,剛興趣的小夥伴可以查閱下資料。## ConcurrentHashMap的結構特點### Java8之前在Java8之前,底層採用Segment+HashEntry的方式實現。採用分段鎖的概念,底層使用Segment陣列,Segment通過繼承ReentrantLock來進行加鎖,每次需要加鎖的操作會鎖住一個segment,分段保證每個段是執行緒安全的。> 圖源:[Java7/8 中的 HashMap 和 ConcurrentHashMap 全解析](https://javadoop.com/post/hashmap)![](https://img2020.cnblogs.com/blog/1771072/202101/1771072-20210123181713661-756701711.png)### Java8之後JDK1.8之後採用`CAS + Synchronized`的方式來保證併發安全。採用【Node陣列】加【連結串列】加【紅黑樹】的結構,與HashMap類似。> 圖源:[Java7/8 中的 HashMap 和 ConcurrentHashMap 全解析](https://javadoop.com/post/hashmap)![](https://img2020.cnblogs.com/blog/1771072/202101/1771072-20210123181722181-597581021.png)## 基本常量過一遍即可,不用過於糾結,有些欄位也許是為了相容Java8之前的版本。```java /* ---------------- Constants -------------- */ // 允許的最大容量 private static final int MAXIMUM_CAPACITY = 1 << 30; // 預設初始值16,必須是2的冪 private static final int DEFAULT_CAPACITY = 16; // toArray相關方法可能需要的量 static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8; //為了和Java8之前的分段相關內容相容,並未使用 private static final int DEFAULT_CONCURRENCY_LEVEL = 16; // 負載因子 private static final float LOAD_FACTOR = 0.75f; // 連結串列轉紅黑樹閥值> 8 連結串列轉換為紅黑樹 static final int TREEIFY_THRESHOLD = 8; // 樹轉連結串列閥值,小於等於6(tranfer時,lc、hc=0兩個計數器分別++記錄原bin、新binTreeNode數量,<=UNTREEIFY_THRESHOLD 則untreeify(lo)) static final int UNTREEIFY_THRESHOLD = 6; // 連結串列樹化的最小容量 treeifyBin的時候,容量如果不足64,會優先選擇擴容到64 static final int MIN_TREEIFY_CAPACITY = 64; // 每一步最小重繫結數量 private static final int MIN_TRANSFER_STRIDE = 16; // sizeCtl中用於生成標記的位數 private static int RESIZE_STAMP_BITS = 16; // 2^15-1,help resize的最大執行緒數 private static final int MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1; // 32-16=16,sizeCtl中記錄size大小的偏移量 private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS; // forwarding nodes的hash值 static final int MOVED = -1; // 樹根節點的hash值 static final int TREEBIN = -2; // ReservationNode的hash值 static final int RESERVED = -3; // 提供給普通node節點hash用 static final int HASH_BITS = 0x7fffffff; // 可用處理器數量 static final int NCPU = Runtime.getRuntime().availableProcessors();```## 重要成員變數```java /* ---------------- Fields -------------- */ // 就是我們說的底層的Node陣列,懶初始化的,在第一次插入的時候才初始化,大小需要是2的冪 transient volatile Node [] table; /** * 擴容resize的時候用的table */ private transient volatile Node [] nextTable; /** * 基礎計數器,是通過CAS來更新的 */ private transient volatile long baseCount; /** * Table initialization and resizing control. * 如果為負數:表示正在初始化或者擴容,具體如下、 * -1表示初始化, * -N,N-1表示正在進行擴容的執行緒數 * 預設為0,初始化之後,儲存下一次擴容的大小 */ private transient volatile int sizeCtl; /** * 擴容時分割table的索引 */ private transient volatile int transferIndex; /** * Spinlock (locked via CAS) used when resizing and/or creating CounterCells. */ private transient volatile int cellsBusy; /** * Table of counter cells. When non-null, size is a power of 2. */ private transient volatile CounterCell[] counterCells; // 檢視 private transient KeySetView keySet; private transient ValuesView values; private transient EntrySetView entrySet;```## 構造方法和HashMap一樣,table陣列的初始化是在第一次插入的時候才進行的。```java /** * 建立一個新的,空的map,預設大小為16 */ public ConcurrentHashMap() { } /** * 指定初始容量 */ public ConcurrentHashMap(int initialCapacity) { if (initialCapacity < 0) throw new IllegalArgumentException(); int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY : // hashmap中講過哦,用來返回的是大於等於傳入值的最小2的冪次方 // https://blog.csdn.net/Sky_QiaoBa_Sum/article/details/104095675#tableSizeFor_105 tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1)); // sizeCtl 初始化為容量 this.sizeCtl = cap; } /** * 接收一個map物件 */ public ConcurrentHashMap(Map m) { this.sizeCtl = DEFAULT_CAPACITY; putAll(m); } /** * 指定初始容量和負載因子 * @since 1.6 */ public ConcurrentHashMap(int initialCapacity, float loadFactor) { this(initialCapacity, loadFactor, 1); } /** * 最全的:容量、負載因子、併發級別 */ public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) { if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0) throw new IllegalArgumentException(); if (initialCapacity < concurrencyLevel) // Use at least as many bins initialCapacity = concurrencyLevel; // as estimated threads long size = (long)(1.0 + (long)initialCapacity / loadFactor); int cap = (size >= (long)MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : tableSizeFor((int)size); this.sizeCtl = cap; }```這裡的tableSizeFor方法在HashMap中有解析過:[https://blog.csdn.net/Sky_QiaoBa_Sum/article/details/104095675#tableSizeFor_105](https://blog.csdn.net/Sky_QiaoBa_Sum/article/details/104095675#tableSizeFor_105)### tableSizeFor我們通過註解可以知道,這個方法返回的是大於等於傳入值的最小2的冪次方(傳入1時,為1)。它到底是怎麼實現的呢,我們來看看具體的原始碼:```java static final int tableSizeFor(int cap) { int n = cap - 1; n |= n >>> 1; n |= n >>> 2; n |= n >>> 4; n |= n >>> 8; n |= n >>> 16; return (n < 0) ? 1 : (n >= MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : n + 1; }123456789```說實話,我再看到這個方法具體實現之後,感嘆了一句,數學好牛!我通過代入具體數字,翻閱了許多關於這部分的文章與視訊,通過簡單的例子,來做一下總結。- 我們先試想一下,我們想得到比n大的最小2次冪只需要**在最高位的前一位置1,後面全置0**就ok了吧。如0101代表的是5,1000就符合我們的需求為8。- 我們再傳入更大的數,為了寫著方便,這裡就以8位為例:![](https://img2020.cnblogs.com/blog/1771072/202101/1771072-20210123181737808-614670333.png)- 第一步`int n = cap -1`這一步其實是為了防止cap本身為2的冪次的情況,如果沒有這一步的話,在一頓操作之後,會出現翻倍的情況。比如傳入為8,算出來會是16,所以事先減去1,保證結果。- 最後n<0的情況的判定,排除了傳入容量為0的情況。- n>=MAXIMUM_CAPACITY的情況的判定,排除了移位和或運算之後全部為1的情況。## put方法存值### putVal```java public V put(K key, V value) { return putVal(key, value, false); } /** Implementation for put and putIfAbsent */ final V putVal(K key, V value, boolean onlyIfAbsent) { if (key == null || value == null) throw new NullPointerException(); // 對key進行雜湊計算 : (h ^ (h >>> 16)) & HASH_BITS; int hash = spread(key.hashCode()); // 記錄連結串列長度 int binCount = 0; for (Node [] tab = table;;) { Node f; int n, i, fh; //第一次put,就是這裡進行初始化的 if (tab == null || (n = tab.length) == 0) tab = initTable(); // 找該 hash 值對應的陣列下標,得到第一個節點 f, // 這裡判斷f是否為空,就是這個位置上有沒有節點佔著 else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { // 如果沒有,用CAS嘗試將值放入,插入成功,則退出for迴圈 // 如果CAS失敗,則表示存在併發競爭,再次進入迴圈 if (casTabAt(tab, i, null, new Node (hash, key, value, null))) break; // no lock when adding to empty bin } // hash是Node節點f的一個屬性,等於MOVED的情況表示該節點處於遷移狀態 else if ((fh = f.hash) == MOVED) // 幫助遷移【內部其實呼叫了transfer,後面分析】 tab = helpTransfer(tab, f); else { // 進入這個分支表示:根據key計算出的hash,得到的位置上是存在Node的,接著我將遍歷連結串列了 V oldVal = null; // 鎖住Node節點 synchronized (f) { // 加鎖後的二次校驗,針對tab可能被其他執行緒修改的情況 if (tabAt(tab, i) == f) { if (fh >= 0) { // 頭節點的hash屬性 >= 0 binCount = 1; // 記錄連結串列長度 // 連結串列的遍歷操作,你懂的 for (Node e = f;; ++binCount) { K ek; // 找到一樣的key了 if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) { oldVal = e.val; // onlyIfAbsent為false的話,這裡就要覆蓋了,預設是覆蓋的 if (!onlyIfAbsent) e.val = value; // 接著也就結束遍歷了 break; } Node pred = e; // 遍歷到最後了,把Node插入尾部 if ((e = e.next) == null) { pred.next = new Node (hash, key, value, null); break; } } } else if (f instanceof TreeBin) { // 紅黑樹 Node p; binCount = 2; // 紅黑樹putTreeVal if ((p = ((TreeBin )f).putTreeVal(hash, key, value)) != null) { oldVal = p.val; if (!onlyIfAbsent) p.val = value; } } } } if (binCount != 0) { // 判斷是否需要將連結串列轉換為紅黑樹 節點數 >= 8的時候 if (binCount >= TREEIFY_THRESHOLD) //樹化 後面單獨分析 treeifyBin(tab, i); if (oldVal != null) return oldVal; break; } } } addCount(1L, binCount); return null; }```其實你會發現,如果你看過HashMap的原始碼,理解ConcurrentHashMap的操作其實還是比較清晰的,相信看下來你已經基本瞭解了。接下來將會具體分析一下幾個關鍵的方法### initTable採用延遲初始化,第一次put的時候,呼叫initTable()初始化Node陣列。```java /** * Initializes table, using the size recorded in sizeCtl. */ private final Node [] initTable() { Node [] tab; int sc; while ((tab = table) == null || tab.length == 0) { // 如果小於0,表示已經有其他執行緒搶著初始化了 if ((sc = sizeCtl) < 0) Thread.yield(); // lost initialization race; just spin // 這裡就試著cas搶一下,搶到就將sc設定為-1,宣告主權,搶不到就再次進入迴圈 else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) { try { if ((tab = table) == null || tab.length == 0) { // 設定初始容量 int n = (sc > 0) ? sc : DEFAULT_CAPACITY; // 建立容量為n的陣列 @SuppressWarnings("unchecked") Node [] nt = (Node [])new Node [n]; // 賦值給volatile變數table底層陣列 table = tab = nt; // 這裡其實就是 sc = n - n/4 = 0.75 * n sc = n - (n >>> 2); } } finally { // 就當12吧 sizeCtl = sc; } break; } } return tab; }```初始化的併發問題如何解決呢?通過volatile的sizeCtl變數進行標識,在第一次初始化的時候,如果有多個執行緒同時進行初始化操作,他們將會判斷sizeCtl是否小於0,小於0表示已經有其他執行緒在進行初始化了。因為獲取到初始化權的執行緒已經通過cas操作將sizeCtl的值改為-1了,且volatile的特性保證了變數在各個執行緒之間的可見性。接著,將會建立合適容量的陣列,並將sizeCtl的值設定為`cap*loadFactor`。### treeifyBin這部分包含連結串列轉紅黑樹的邏輯,當然,需要滿足一些前提條件:1. 首先當然是需要連結串列的節點數量`>=TREEIFY_THRESHOLD`的時候啦,預設是8。```java if (binCount != 0) { // 判斷是否需要將連結串列轉換為紅黑樹 節點數 >= 8的時候 if (binCount >= TREEIFY_THRESHOLD) // 樹化 treeifyBin(tab, i); if (oldVal != null) return oldVal; break; }```2. 其實還有一個條件,就是:如果陣列長度`< MIN_TREEIFY_CAPACITY`的時候,會優先呼叫`tryPresize`進行陣列擴容。```java private final void treeifyBin(Node [] tab, int index) { Node b; int n, sc; if (tab != null) { // 如果陣列長度小於MIN_TREEIFY_CAPACITY 會優先擴容tryPresize if ((n = tab.length) < MIN_TREEIFY_CAPACITY) tryPresize(n << 1); else if ((b = tabAt(tab, index)) != null && b.hash >= 0) { // 鎖住頭節點 synchronized (b) { if (tabAt(tab, index) == b) { // 遍歷連結串列, 建立紅黑樹 TreeNode hd = null, tl = null; for (Node e = b; e != null; e = e.next) { TreeNode p = new TreeNode (e.hash, e.key, e.val, null, null); if ((p.prev = tl) == null) hd = p; else tl.next = p; tl = p; } // 將紅黑樹設定對應的位置上 setTabAt(tab, index, new TreeBin (hd)); } } } } }```### tryPresize陣列擴容操作一般都是核心,仔細看看。```java private final void tryPresize(int size) { int c = (size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY : tableSizeFor(size + (size >>> 1) + 1);// tableSizeFor(1.5*size) int sc; while ((sc = sizeCtl) >= 0) { Node [] tab = table; int n; // 這裡就是之前說的initTable部分的程式碼 if (tab == null || (n = tab.length) == 0) { n = (sc > c) ? sc : c; if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) { try { if (table == tab) { @SuppressWarnings("unchecked") Node [] nt = (Node [])new Node [n]; table = nt; sc = n - (n >>> 2); } } finally { sizeCtl = sc; } } } else if (c <= sc || n >= MAXIMUM_CAPACITY) break; else if (tab == table) { //1 0000 | 0000 0000 0000 0000 1000 0000 0000 0000 int rs = resizeStamp(n); // sc小於0表示已經有執行緒正在進行擴容操作 if (sc < 0) { Node [] nt; if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || sc == rs + MAX_RESIZERS || (nt = nextTable) == null || transferIndex <= 0) break; // cas將sizeCtl加1, 如果成功, 則執行transfer操作 if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) transfer(tab, nt); } // 沒有執行緒在擴容,將sizeCtl的值改為(rs << RESIZE_STAMP_SHIFT) + 2) else if (U.compareAndSwapInt(this, SIZECTL, sc, (rs << RESIZE_STAMP_SHIFT) + 2)) transfer(tab, null); } } } static final int resizeStamp(int n) { // Integer.numberOfLeadingZeros(n) 其實是返回n的前導零個數, 每次擴容翻倍,個數會少1 // 如果n = 16 , 返回27 // 1 << (RESIZE_STAMP_BITS - 1) 1 左移15位標識 第16位為1,低15位全為0 return Integer.numberOfLeadingZeros(n) | (1 << (RESIZE_STAMP_BITS - 1)); }````resizeStamp(int n)`方法可以參照[https://blog.csdn.net/tp7309/article/details/76532366](https://blog.csdn.net/tp7309/article/details/76532366)的解析。### transfer這個方法涉及到資料遷移的操作,支援併發執行,第一個發起資料遷移的執行緒,nextTab引數傳null,之後再呼叫此方法時,nextTab不會為null。併發執行實現:使用stride將一次遷移任務拆分成一個個的小任務,第一個發起資料遷移的執行緒將會將transferIndex指向原陣列最後的位置,然後從後向前的stride分任務屬於第一個執行緒,再將transferIndex指向新的位置,再往前的stride個任務屬於第二個執行緒,依次類推。```java private final void transfer(Node [] tab, Node [] nextTab) { int n = tab.length, stride; // 多核情況下, stride為 (n >>> 3) / NCPU , 單核情況下,就是陣列的容量 if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE) // 最小是16 stride = MIN_TRANSFER_STRIDE; // subdivide range // 第一個發起資料遷移的執行緒,nextTab引數傳null if (nextTab == null) { // initiating try { // n<<1 表示容量翻倍 @SuppressWarnings("unchecked") Node [] nt = (Node [])new Node [n << 1]; nextTab = nt; } catch (Throwable ex) { // try to cope with OOME sizeCtl = Integer.MAX_VALUE; return; } // 為nextTable 和transferIndex賦值,transferIndex從最後一個開始 nextTable = nextTab; transferIndex = n; } int nextn = nextTab.length; // 構造一個hash == MOVED 的節點,標記已經遷移完畢的位置 ForwardingNode fwd = new ForwardingNode (nextTab); // 這裡的advance表示已經做完一個位置的遷移,可以準備下一個位置了 boolean advance = true; boolean finishing = false; // to ensure sweep before committing nextTab for (int i = 0, bound = 0;;) { Node f; int fh; // advance為true表示可以準備下一個位置了 while (advance) { int nextIndex, nextBound; if (--i >= bound || finishing) advance = false; // nextIndex會等於transferIndex // transferIndex 一旦小於等於 0,說明原陣列的所有位置都有相應的執行緒去處理了 else if ((nextIndex = transferIndex) <= 0) { i = -1; advance = false; } else if (U.compareAndSwapInt (this, TRANSFERINDEX, nextIndex, nextBound = (nextIndex > stride ? nextIndex - stride : 0))) { // bound 指向了 transferIndex-stride bound = nextBound; // i指向 transferIndex - 1 // 從後向前執行遷移任務 i = nextIndex - 1; advance = false; } } if (i < 0 || i >= n || i + n >= nextn) { int sc; // 所有的遷移操作都完成了 if (finishing) { nextTable = null; table = nextTab; // 將nextTab賦值給table sizeCtl = (n << 1) - (n >>> 1); // 重新計算sizeCtl return; } // sizeCtl在前設定為 (rs << RESIZE_STAMP_SHIFT) + 2 // 之後每有一個執行緒參與遷移就會 將sizeCtl加1 // 這裡可以看成逆操作, 每次-1,代表完成了自己的任務 if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) { if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT) return; // 走到這裡表示(sc - 2) == resizeStamp(n) << RESIZE_STAMP_SHIFT // 所有的任務完成, 下一迴圈就進入上面的finish分支了 finishing = advance = true; i = n; // recheck before commit } } // 下面是具體的遷移操作 // 如果i位置是null,那就將剛剛初始化hash=MOVED的節點cas放入 else if ((f = tabAt(tab, i)) == null) advance = casTabAt(tab, i, null, fwd); // 如果已經是hash==MOVED ,代表這個位置已經遷移過了 else if ((fh = f.hash) == MOVED) advance = true; // already processed else { // 對這個位置單獨加鎖,處理該位置的遷移工作 synchronized (f) { if (tabAt(tab, i) == f) { Node ln, hn; // 連結串列節點 if (fh >= 0) { // 將連結串列一分為二 int runBit = fh & n; Node lastRun = f; // 下面幾步都在尋找lastRun的位置,表示lastRun之後的節點需要放到一起 for (Node p = f.next; p != null; p = p.next) { int b = p.hash & n; if (b != runBit) { runBit = b; lastRun = p; } } if (runBit == 0) { ln = lastRun; hn = null; } else { hn = lastRun; ln = null; } for (Node p = f; p != lastRun; p = p.next) { int ph = p.hash; K pk = p.key; V pv = p.val; if ((ph & n) == 0) ln = new Node (ph, pk, pv, ln); else hn = new Node (ph, pk, pv, hn); } // 其中一個連結串列放在新陣列的i位置上 setTabAt(nextTab, i, ln); // 另一個連結串列放在新陣列的i + n 位置上 setTabAt(nextTab, i + n, hn); // 將原陣列的i位置上設定為fwd表示已經處理完畢 // 這裡的fwd是我們之前建立的ForwardingNode, // 下一進行判斷的時候,就會將advance設定為true了 setTabAt(tab, i, fwd); // 宣告該位置已經遷移完畢了 advance = true; } // 下面是紅黑樹的遷移 else if (f instanceof TreeBin) { TreeBin t = (TreeBin )f; TreeNode lo = null, loTail = null; TreeNode hi = null, hiTail = null; int lc = 0, hc = 0; for (Node e = t.first; e != null; e = e.next) { int h = e.hash; TreeNode p = new TreeNode (h, e.key, e.val, null, null); if ((h & n) == 0) { if ((p.prev = loTail) == null) lo = p; else loTail.next = p; loTail = p; ++lc; } else { if ((p.prev = hiTail) == null) hi = p; else hiTail.next = p; hiTail = p; ++hc; } } // 節點數小於6 將紅黑樹轉化為連結串列untreeify ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) : (hc != 0) ? new TreeBin (lo) : t; hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) : (lc != 0) ? new TreeBin (hi) : t; setTabAt(nextTab, i, ln); setTabAt(nextTab, i + n, hn); setTabAt(tab, i, fwd); advance = true; } } } } } }```## get方法取值### getget方法相對來說就簡單很多了,根據key計算出的hash值,找到對應的位置,判斷頭節點是不是要的值,不是的話就從紅黑樹或者連結串列裡找。```java public V get(Object key) { Node [] tab; Node e, p; int n, eh; K ek; // 計算hash值 int h = spread(key.hashCode()); // 找到對應的hash桶的位置 if ((tab = table) != null && (n = tab.length) > 0 && (e = tabAt(tab, (n - 1) & h)) != null) { // 正好在頭節點上 if ((eh = e.hash) == h) { if ((ek = e.key) == key || (ek != null && key.equals(ek))) return e.val; } // 樹形結構上 else if (eh < 0) return (p = e.find(h, key)) != null ? p.val : null; // 在連結串列上 while ((e = e.next) != null) { if (e.hash == h && ((ek = e.key) == key || (ek != null && key.equals(ek)))) return e.val; } } return null; }```可以看到get方法是無鎖的,通過volatile修飾的next來每次都獲取最新的值。> 再提一下吧:>> volatile能夠保證變數線上程之間的可見性,能夠被多執行緒同時讀且保證不會讀到過期的值,因為為根據Java記憶體模型的happen before原則,對volatile欄位的寫入操作先於讀操作,即使兩個執行緒同時修改和獲取volatile變數,get操作也能拿到最新的值。>> 但只能被單執行緒寫,【有一種情況可以被多執行緒寫,就是寫入的值不依賴於原值】,讀操作是不需要加鎖的。## 總結- ConcurrentHashMap在JDK1.7和JDK1.8的實現思路上發生比較大的變化: - JDK1.7採用`Segment+HashEntry`的方式和`分段鎖`的概念實現。 - JDK1.8放棄了鎖分段的概念,使用`Node + CAS + Synchronized` 的方式實現併發,使用`陣列+連結串列+紅黑樹`的結構。- put操作會進行如下判斷: - 如果沒有初始化,先進行初始化[懶初始化],預設容量為16,同時設定了SizeCtl。 - 如果tab陣列對應的hash槽位置上沒有節點,CAS操作給該位置賦值,成功則跳出迴圈。 - 如果當前插槽節點正處於遷移狀態即`f.hash == MOVED`,則先幫助節點完成遷移操作。 - 發生hash衝突,率先使用`synchronized`鎖住首節點,接下來判斷是連結串列節點或是紅黑樹節點,找到合適的位置,插入或覆蓋值。 - **如果節點數量超過樹化的閾值8,且陣列容量也達到樹化的閾值64,進行樹化**。- 當某個桶的位置的節點數量超過8,但是陣列容量沒有達到64時,會先進行**擴容操作n*2,執行tryPresize**。- 擴容操作涉及到transfer**資料遷移**,支援併發,通過將資料遷移分為stride個小任務,通過transferIndex和nextBound兩個指標來分配任務。- get操作不需要加鎖,根據key計算出的hash值,找到對應的位置,判斷頭節點是不是要的值,不是的話就從紅黑樹或者連結串列裡找。## 參考閱讀- [佔小狼: 談談ConcurrentHashMap1.7和1.8的不同實現](https://www.jianshu.com/p/e694f1e868ec)- [jianshu_xw:淺析JDK1.8 下HashMap執行緒安全性](https://www.jianshu.com/p/06e7e626dcad)- [javadoop:Java7/8 中的 HashMap 和 ConcurrentHashMap 全解析](https://javadoop.com/post/hashmap)- [Java3y ConcurrentHashMap基於JDK1.8原始碼剖析](https://mp.weixin.qq.com/s?__biz=MzI4Njg5MDA5NA==&mid=2247484161&idx=1&sn=6f52fb1f714f3ffd2f96a5ee4ebab146&chksm=ebd74200dca0cb16288db11f566cb53cafc580e08fe1c570e0200058e78676f527c014ffef41&scene=21###wechat_r
版权声明
本文为[itread01]所创,转载请带上原文链接,感谢
https://www.itread01.com/content/1611413465.html

  1. 【计算机网络 12(1),尚学堂马士兵Java视频教程
  2. 【程序猿历程,史上最全的Java面试题集锦在这里
  3. 【程序猿历程(1),Javaweb视频教程百度云
  4. Notes on MySQL 45 lectures (1-7)
  5. [computer network 12 (1), Shang Xuetang Ma soldier java video tutorial
  6. The most complete collection of Java interview questions in history is here
  7. [process of program ape (1), JavaWeb video tutorial, baidu cloud
  8. Notes on MySQL 45 lectures (1-7)
  9. 精进 Spring Boot 03:Spring Boot 的配置文件和配置管理,以及用三种方式读取配置文件
  10. Refined spring boot 03: spring boot configuration files and configuration management, and reading configuration files in three ways
  11. 精进 Spring Boot 03:Spring Boot 的配置文件和配置管理,以及用三种方式读取配置文件
  12. Refined spring boot 03: spring boot configuration files and configuration management, and reading configuration files in three ways
  13. 【递归,Java传智播客笔记
  14. [recursion, Java intelligence podcast notes
  15. [adhere to painting for 386 days] the beginning of spring of 24 solar terms
  16. K8S系列第八篇(Service、EndPoints以及高可用kubeadm部署)
  17. K8s Series Part 8 (service, endpoints and high availability kubeadm deployment)
  18. 【重识 HTML (3),350道Java面试真题分享
  19. 【重识 HTML (2),Java并发编程必会的多线程你竟然还不会
  20. 【重识 HTML (1),二本Java小菜鸟4面字节跳动被秒成渣渣
  21. [re recognize HTML (3) and share 350 real Java interview questions
  22. [re recognize HTML (2). Multithreading is a must for Java Concurrent Programming. How dare you not
  23. [re recognize HTML (1), two Java rookies' 4-sided bytes beat and become slag in seconds
  24. 造轮子系列之RPC 1:如何从零开始开发RPC框架
  25. RPC 1: how to develop RPC framework from scratch
  26. 造轮子系列之RPC 1:如何从零开始开发RPC框架
  27. RPC 1: how to develop RPC framework from scratch
  28. 一次性捋清楚吧,对乱糟糟的,Spring事务扩展机制
  29. 一文彻底弄懂如何选择抽象类还是接口,连续四年百度Java岗必问面试题
  30. Redis常用命令
  31. 一双拖鞋引发的血案,狂神说Java系列笔记
  32. 一、mysql基础安装
  33. 一位程序员的独白:尽管我一生坎坷,Java框架面试基础
  34. Clear it all at once. For the messy, spring transaction extension mechanism
  35. A thorough understanding of how to choose abstract classes or interfaces, baidu Java post must ask interview questions for four consecutive years
  36. Redis common commands
  37. A pair of slippers triggered the murder, crazy God said java series notes
  38. 1、 MySQL basic installation
  39. Monologue of a programmer: despite my ups and downs in my life, Java framework is the foundation of interview
  40. 【大厂面试】三面三问Spring循环依赖,请一定要把这篇看完(建议收藏)
  41. 一线互联网企业中,springboot入门项目
  42. 一篇文带你入门SSM框架Spring开发,帮你快速拿Offer
  43. 【面试资料】Java全集、微服务、大数据、数据结构与算法、机器学习知识最全总结,283页pdf
  44. 【leetcode刷题】24.数组中重复的数字——Java版
  45. 【leetcode刷题】23.对称二叉树——Java版
  46. 【leetcode刷题】22.二叉树的中序遍历——Java版
  47. 【leetcode刷题】21.三数之和——Java版
  48. 【leetcode刷题】20.最长回文子串——Java版
  49. 【leetcode刷题】19.回文链表——Java版
  50. 【leetcode刷题】18.反转链表——Java版
  51. 【leetcode刷题】17.相交链表——Java&python版
  52. 【leetcode刷题】16.环形链表——Java版
  53. 【leetcode刷题】15.汉明距离——Java版
  54. 【leetcode刷题】14.找到所有数组中消失的数字——Java版
  55. 【leetcode刷题】13.比特位计数——Java版
  56. oracle控制用户权限命令
  57. 三年Java开发,继阿里,鲁班二期Java架构师
  58. Oracle必须要启动的服务
  59. 万字长文!深入剖析HashMap,Java基础笔试题大全带答案
  60. 一问Kafka就心慌?我却凭着这份,图灵学院vip课程百度云