死磕 java併發包之LongAdder源碼分析

来源:https://www.cnblogs.com/tong-yuan/archive/2019/05/13/LongAdder.html
-Advertisement-
Play Games

java8中為什麼要新增LongAdder? LongAdder的實現方式? LongAdder與AtomicLong的對比? ...


問題

(1)java8中為什麼要新增LongAdder?

(2)LongAdder的實現方式?

(3)LongAdder與AtomicLong的對比?

簡介

LongAdder是java8中新增的原子類,在多線程環境中,它比AtomicLong性能要高出不少,特別是寫多的場景。

它是怎麼實現的呢?讓我們一起來學習吧。

原理

LongAdder的原理是,在最初無競爭時,只更新base的值,當有多線程競爭時通過分段的思想,讓不同的線程更新不同的段,最後把這些段相加就得到了完整的LongAdder存儲的值。

LongAdder

源碼分析

LongAdder繼承自Striped64抽象類,Striped64中定義了Cell內部類和各重要屬性。

主要內部類

// Striped64中的內部類,使用@sun.misc.Contended註解,說明裡面的值消除偽共用
@sun.misc.Contended static final class Cell {
    // 存儲元素的值,使用volatile修飾保證可見性
    volatile long value;
    Cell(long x) { value = x; }
    // CAS更新value的值
    final boolean cas(long cmp, long val) {
        return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);
    }

    // Unsafe實例
    private static final sun.misc.Unsafe UNSAFE;
    // value欄位的偏移量
    private static final long valueOffset;
    static {
        try {
            UNSAFE = sun.misc.Unsafe.getUnsafe();
            Class<?> ak = Cell.class;
            valueOffset = UNSAFE.objectFieldOffset
                (ak.getDeclaredField("value"));
        } catch (Exception e) {
            throw new Error(e);
        }
    }
}

Cell類使用@sun.misc.Contended註解,說明是要避免偽共用的。

使用Unsafe的CAS更新value的值,其中value的值使用volatile修飾,保證可見性。

關於Unsafe的介紹請查看【死磕 java魔法類之Unsafe解析】。

關於偽共用的介紹請查看【雜談 什麼是偽共用(false sharing)?】。

主要屬性

// 這三個屬性都在Striped64中
// cells數組,存儲各個段的值
transient volatile Cell[] cells;
// 最初無競爭時使用的,也算一個特殊的段
transient volatile long base;
// 標記當前是否有線程在創建或擴容cells,或者在創建Cell
// 通過CAS更新該值,相當於是一個鎖
transient volatile int cellsBusy;

最初無競爭或有其它線程在創建cells數組時使用base更新值,有過競爭時使用cells更新值。

最初無競爭是指一開始沒有線程之間的競爭,但也有可能是多線程在操作,只是這些線程沒有同時去更新base的值。

有過競爭是指只要出現過競爭不管後面有沒有競爭都使用cells更新值,規則是不同的線程hash到不同的cell上去更新,減少競爭。

add(x)方法

add(x)方法是LongAdder的主要方法,使用它可以使LongAdder中存儲的值增加x,x可為正可為負。

public void add(long x) {
    // as是Striped64中的cells屬性
    // b是Striped64中的base屬性
    // v是當前線程hash到的Cell中存儲的值
    // m是cells的長度減1,hash時作為掩碼使用
    // a是當前線程hash到的Cell
    Cell[] as; long b, v; int m; Cell a;
    // 條件1:cells不為空,說明出現過競爭,cells已經創建
    // 條件2:cas操作base失敗,說明其它線程先一步修改了base,正在出現競爭
    if ((as = cells) != null || !casBase(b = base, b + x)) {
        // true表示當前競爭還不激烈
        // false表示競爭激烈,多個線程hash到同一個Cell,可能要擴容
        boolean uncontended = true;
        // 條件1:cells為空,說明正在出現競爭,上面是從條件2過來的
        // 條件2:應該不會出現
        // 條件3:當前線程所在的Cell為空,說明當前線程還沒有更新過Cell,應初始化一個Cell
        // 條件4:更新當前線程所在的Cell失敗,說明現在競爭很激烈,多個線程hash到了同一個Cell,應擴容
        if (as == null || (m = as.length - 1) < 0 ||
            // getProbe()方法返回的是線程中的threadLocalRandomProbe欄位
            // 它是通過隨機數生成的一個值,對於一個確定的線程這個值是固定的
            // 除非刻意修改它
            (a = as[getProbe() & m]) == null ||
            !(uncontended = a.cas(v = a.value, v + x)))
            // 調用Striped64中的方法處理
            longAccumulate(x, null, uncontended);
    }
}

(1)最初無競爭時只更新base;

(2)直到更新base失敗時,創建cells數組;

(3)當多個線程競爭同一個Cell比較激烈時,可能要擴容;

longAccumulate()方法

final void longAccumulate(long x, LongBinaryOperator fn,
                              boolean wasUncontended) {
    // 存儲線程的probe值
    int h;
    // 如果getProbe()方法返回0,說明隨機數未初始化
    if ((h = getProbe()) == 0) {
        // 強制初始化
        ThreadLocalRandom.current(); // force initialization
        // 重新獲取probe值
        h = getProbe();
        // 都未初始化,肯定還不存在競爭激烈
        wasUncontended = true;
    }
    // 是否發生碰撞
    boolean collide = false;                // True if last slot nonempty
    for (;;) {
        Cell[] as; Cell a; int n; long v;
        // cells已經初始化過
        if ((as = cells) != null && (n = as.length) > 0) {
            // 當前線程所在的Cell未初始化
            if ((a = as[(n - 1) & h]) == null) {
                // 當前無其它線程在創建或擴容cells,也沒有線程在創建Cell
                if (cellsBusy == 0) {       // Try to attach new Cell
                    // 新建一個Cell,值為當前需要增加的值
                    Cell r = new Cell(x);   // Optimistically create
                    // 再次檢測cellsBusy,並嘗試更新它為1
                    // 相當於當前線程加鎖
                    if (cellsBusy == 0 && casCellsBusy()) {
                        // 是否創建成功
                        boolean created = false;
                        try {               // Recheck under lock
                            Cell[] rs; int m, j;
                            // 重新獲取cells,並找到當前線程hash到cells數組中的位置
                            // 這裡一定要重新獲取cells,因為as並不在鎖定範圍內
                            // 有可能已經擴容了,這裡要重新獲取
                            if ((rs = cells) != null &&
                                (m = rs.length) > 0 &&
                                rs[j = (m - 1) & h] == null) {
                                // 把上面新建的Cell放在cells的j位置處
                                rs[j] = r;
                                // 創建成功
                                created = true;
                            }
                        } finally {
                            // 相當於釋放鎖
                            cellsBusy = 0;
                        }
                        // 創建成功了就返回
                        // 值已經放在新建的Cell裡面了
                        if (created)
                            break;
                        continue;           // Slot is now non-empty
                    }
                }
                // 標記當前未出現衝突
                collide = false;
            }
            // 當前線程所在的Cell不為空,且更新失敗了
            // 這裡簡單地設為true,相當於簡單地自旋一次
            // 通過下麵的語句修改線程的probe再重新嘗試
            else if (!wasUncontended)       // CAS already known to fail
                wasUncontended = true;      // Continue after rehash
            // 再次嘗試CAS更新當前線程所在Cell的值,如果成功了就返回
            else if (a.cas(v = a.value, ((fn == null) ? v + x :
                                         fn.applyAsLong(v, x))))
                break;
            // 如果cells數組的長度達到了CPU核心數,或者cells擴容了
            // 設置collide為false並通過下麵的語句修改線程的probe再重新嘗試
            else if (n >= NCPU || cells != as)
                collide = false;            // At max size or stale
            // 上上個elseif都更新失敗了,且上個條件不成立,說明出現衝突了
            else if (!collide)
                collide = true;
            // 明確出現衝突了,嘗試占有鎖,並擴容
            else if (cellsBusy == 0 && casCellsBusy()) {
                try {
                    // 檢查是否有其它線程已經擴容過了
                    if (cells == as) {      // Expand table unless stale
                        // 新數組為原數組的兩倍
                        Cell[] rs = new Cell[n << 1];
                        // 把舊數組元素拷貝到新數組中
                        for (int i = 0; i < n; ++i)
                            rs[i] = as[i];
                        // 重新賦值cells為新數組
                        cells = rs;
                    }
                } finally {
                    // 釋放鎖
                    cellsBusy = 0;
                }
                // 已解決衝突
                collide = false;
                // 使用擴容後的新數組重新嘗試
                continue;                   // Retry with expanded table
            }
            // 更新失敗或者達到了CPU核心數,重新生成probe,並重試
            h = advanceProbe(h);
        }
        // 未初始化過cells數組,嘗試占有鎖並初始化cells數組
        else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
            // 是否初始化成功
            boolean init = false;
            try {                           // Initialize table
                // 檢測是否有其它線程初始化過
                if (cells == as) {
                    // 新建一個大小為2的Cell數組
                    Cell[] rs = new Cell[2];
                    // 找到當前線程hash到數組中的位置並創建其對應的Cell
                    rs[h & 1] = new Cell(x);
                    // 賦值給cells數組
                    cells = rs;
                    // 初始化成功
                    init = true;
                }
            } finally {
                // 釋放鎖
                cellsBusy = 0;
            }
            // 初始化成功直接返回
            // 因為增加的值已經同時創建到Cell中了
            if (init)
                break;
        }
        // 如果有其它線程在初始化cells數組中,就嘗試更新base
        // 如果成功了就返回
        else if (casBase(v = base, ((fn == null) ? v + x :
                                    fn.applyAsLong(v, x))))
            break;                          // Fall back on using base
    }
}

(1)如果cells數組未初始化,當前線程會嘗試占有cellsBusy鎖並創建cells數組;

(2)如果當前線程嘗試創建cells數組時,發現有其它線程已經在創建了,就嘗試更新base,如果成功就返回;

(3)通過線程的probe值找到當前線程應該更新cells數組中的哪個Cell;

(4)如果當前線程所在的Cell未初始化,就占有占有cellsBusy鎖併在相應的位置創建一個Cell;

(5)嘗試CAS更新當前線程所在的Cell,如果成功就返回,如果失敗說明出現衝突;

(5)當前線程更新Cell失敗後並不是立即擴容,而是嘗試更新probe值後再重試一次;

(6)如果在重試的時候還是更新失敗,就擴容;

(7)擴容時當前線程占有cellsBusy鎖,並把數組容量擴大到兩倍,再遷移原cells數組中元素到新數組中;

(8)cellsBusy在創建cells數組、創建Cell、擴容cells數組三個地方用到;

sum()方法

sum()方法是獲取LongAdder中真正存儲的值的大小,通過把base和所有段相加得到。

public long sum() {
    Cell[] as = cells; Cell a;
    // sum初始等於base
    long sum = base;
    // 如果cells不為空
    if (as != null) {
        // 遍歷所有的Cell
        for (int i = 0; i < as.length; ++i) {
            // 如果所在的Cell不為空,就把它的value累加到sum中
            if ((a = as[i]) != null)
                sum += a.value;
        }
    }
    // 返回sum
    return sum;
}

可以看到sum()方法是把base和所有段的值相加得到,那麼,這裡有一個問題,如果前面已經累加到sum上的Cell的value有修改,不是就沒法計算到了麽?

答案確實如此,所以LongAdder可以說不是強一致性的,它是最終一致性的。

LongAdder VS AtomicLong

直接上代碼:

public class LongAdderVSAtomicLongTest {
    public static void main(String[] args){
        testAtomicLongVSLongAdder(1, 10000000);
        testAtomicLongVSLongAdder(10, 10000000);
        testAtomicLongVSLongAdder(20, 10000000);
        testAtomicLongVSLongAdder(40, 10000000);
        testAtomicLongVSLongAdder(80, 10000000);
    }

    static void testAtomicLongVSLongAdder(final int threadCount, final int times){
        try {
            System.out.println("threadCount:" + threadCount + ", times:" + times);
            long start = System.currentTimeMillis();
            testLongAdder(threadCount, times);
            System.out.println("LongAdder elapse:" + (System.currentTimeMillis() - start) + "ms");

            long start2 = System.currentTimeMillis();
            testAtomicLong(threadCount, times);
            System.out.println("AtomicLong elapse:" + (System.currentTimeMillis() - start2) + "ms");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    static void testAtomicLong(final int threadCount, final int times) throws InterruptedException {
        AtomicLong atomicLong = new AtomicLong();
        List<Thread> list = new ArrayList<>();
        for (int i=0;i<threadCount;i++){
            list.add(new Thread(() -> {
                for (int j = 0; j<times; j++){
                    atomicLong.incrementAndGet();
                }
            }));
        }

        for (Thread thread : list){
            thread.start();
        }

        for (Thread thread : list){
            thread.join();
        }
    }

    static void testLongAdder(final int threadCount, final int times) throws InterruptedException {
        LongAdder longAdder = new LongAdder();
        List<Thread> list = new ArrayList<>();
        for (int i=0;i<threadCount;i++){
            list.add(new Thread(() -> {
                for (int j = 0; j<times; j++){
                    longAdder.add(1);
                }
            }));
        }

        for (Thread thread : list){
            thread.start();
        }

        for (Thread thread : list){
            thread.join();
        }
    }
}

運行結果如下:

threadCount:1, times:10000000
LongAdder elapse:158ms
AtomicLong elapse:64ms
threadCount:10, times:10000000
LongAdder elapse:206ms
AtomicLong elapse:2449ms
threadCount:20, times:10000000
LongAdder elapse:429ms
AtomicLong elapse:5142ms
threadCount:40, times:10000000
LongAdder elapse:840ms
AtomicLong elapse:10506ms
threadCount:80, times:10000000
LongAdder elapse:1369ms
AtomicLong elapse:20482ms

可以看到當只有一個線程的時候,AtomicLong反而性能更高,隨著線程越來越多,AtomicLong的性能急劇下降,而LongAdder的性能影響很小。

總結

(1)LongAdder通過base和cells數組來存儲值;

(2)不同的線程會hash到不同的cell上去更新,減少了競爭;

(3)LongAdder的性能非常高,最終會達到一種無競爭的狀態;

彩蛋

在longAccumulate()方法中有個條件是n >= NCPU就不會走到擴容邏輯了,而n是2的倍數,那是不是代表cells數組最大隻能達到大於等於NCPU的最小2次方?

答案是明確的。因為同一個CPU核心同時只會運行一個線程,而更新失敗了說明有兩個不同的核心更新了同一個Cell,這時會重新設置更新失敗的那個線程的probe值,這樣下一次它所在的Cell很大概率會發生改變,如果運行的時間足夠長,最終會出現同一個核心的所有線程都會hash到同一個Cell(大概率,但不一定全在一個Cell上)上去更新,所以,這裡cells數組中長度並不需要太長,達到CPU核心數足夠了。

比如,筆者的電腦是8核的,所以這裡cells的數組最大隻會到8,達到8就不會擴容了。

LongAdder


歡迎關註我的公眾號“彤哥讀源碼”,查看更多源碼系列文章, 與彤哥一起暢游源碼的海洋。

qrcode


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 將10086註冊到10087上: 再在10086服務的基礎上複製一個Eureka的服務,埠為10087,將其註冊到10086上: application-name的名稱保持一致,只是一個服務的兩個實例。 兩個都啟動: 10087: 10086: 如果有超過3台以上的集群,url的地址就是如下這種寫 ...
  • 什麼是Zookeeper臨時順序節點? 例如 : / 動物 植物 貓 倉鼠 荷花 松樹 Zookeeper的數據存儲結構就像一棵樹,這棵樹由節點組成,這種節點叫做Zonde.# Znode分為四種類型 : 1.持久節點(PERSISTENT) 預設的節點類型.創建節點的客戶端與zookeeper斷開 ...
  • Spring Cloud常用組件: 架構圖: 版本對應關係: ...
  • 環境: 前端 vue ip地址:192.168.1.205 後端 springboot2.0 ip地址:192.168.1.217 主要開發後端。 問題: 首先登陸成功時將用戶存在session中,後續請求在將用戶從session中取出檢查。後續請求取出的用戶都為null。 解決過程: 首先發現se ...
  • 有-W選項。 python -W ignore foo.py 有-W選項。 python -W ignore foo.py 有-W選項。 python -W ignore foo.py 所屬網站分類: python基礎 > 綜合&其它 作者:jiem 鏈接:http://www.pythonheid ...
  • 直接在堆外分配一個記憶體(即,native memory)來存儲數據,程式通過JNI直接將數據讀/寫到堆外記憶體中。因為數據直接寫入到了堆外記憶體中,所以這種方式就不會再在JVM管控的堆內再分配記憶體來存儲數據了,也就不存在堆內記憶體和堆外記憶體數據拷貝的操作了。這樣在進行I/O操作時,只需要將這個堆外記憶體地址... ...
  • # 事情是這樣的,我寫的一個程式幀率上不去。 然後發現了一個疑似有問題的地方,如下 這個函數每幀大概會運行幾千次,字典around_dict似乎會被“反覆生成”。如果是的話,那是十分低效的。不如提出來設成全局變數,一次創建反覆使用。 所以寫了下麵的程式驗證是否有這樣的問題 import time g ...
  • 首先我們要瞭解Python函數的基本定義: 函數是什麼? 函數是可以實現一些特定功能的小方法或是小程式。在Python中有很多內建函數,當然隨著學習的深入,你也可以學會創建對自己有用的函數。簡單的理解下函數的概念,就是你編寫了一些語句,為了方便使用這些語句,把這些語句組合在一起,給它起一個名字。使用 ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...