【JDK源碼分析】同步工具Exchanger,它的內部實現原理你看懂了嗎?

来源:https://www.cnblogs.com/d-homme/archive/2018/07/29/9387948.html
-Advertisement-
Play Games

Exchanger應該算併發包中工具使用相對少的,因為它主要用於線程之間交換數據,它的用法比較簡單在不同線程之間使用exchange方法交換數據,但是內部實現比較巧妙,使用了unsafe的CAS原子操作、自旋來解決衝突問題,下麵我們通過源碼一探究竟。 ...


前言

Exchanger應該算併發包中工具使用相對少的,因為它主要用於線程之間交換數據,它的用法比較簡單在不同線程之間使用exchange方法交換數據,但是內部實現比較巧妙,使用了unsafe的CAS原子操作、自旋來解決衝突問題,下麵我們通過源碼一探究竟。

源碼

先看看源碼註釋中關於核心演算法的介紹

        for (;;) {
            if (slot is empty) { 
                // slot為空時,將item 設置到Node 中                   
                place item in a Node;
                if (can CAS slot from empty to node) {
                    // 當將node通過CAS交換到slot中時,掛起線程等待被喚醒
                    wait for release;
                    // 被喚醒後返回node中匹配到的item
                    return matching item in node;
                }
            } else if (can CAS slot from node to empty) { // release
                // 將slot設置為空
                // 獲取node中的item,將需要交換的數據設置到匹配的item
                get the item in node;
                set matching item in node;
                // 喚醒等待的線程
                release waiting thread;
            }
            // else retry on CAS failure
        }

比如有2條線程A和B,A線程交換數據時,發現slot為空,則將需要交換的數據放在slot中等待其它線程進來交換數據,等線程B進來,讀取A設置的數據,然後設置線程B需要交換的數據,然後喚醒A線程,原理就是這麼簡單。當時當多個線程之間進行交換數據時就會出現問題,所以Exchanger加入了slot數組。

Exchanger 屬性及構造器

    // 用於左移Node數組下標,從而得出數據在記憶體中的偏移量來獲取數據,避免偽共用
    private static final int ASHIFT = 7;
    // note數組最大下標
    private static final int MMASK = 0xff;
    // 用於遞增bound,每次加一個SEQ
    private static final int SEQ = MMASK + 1;
    // CPU核心數
    private static final int NCPU = Runtime.getRuntime().availableProcessors();
    // 當前數組最大的下標(多處理器情況下)
    static final int FULL = (NCPU >= (MMASK << 1)) ? MMASK : NCPU >>> 1;
    // 自旋次數,CPU核心為1個時,自旋被禁用
    private static final int SPINS = 1 << 10;
    // 空對象,用於當線程exchange方法中參數為null時傳遞給其他線程的對象
    private static final Object NULL_ITEM = new Object();
    // 用於超時時傳遞的對象
    private static final Object TIMED_OUT = new Object();
    // Participant 繼承了ThreadLocal,也就是說該對象用於獲取每條線程中存放的值
    private final Participant participant;
    // 多個線程交換
    private volatile Node[] arena;
    // 用於2個線程交換
    private volatile Node slot;
    // 該值主要用於與
    private volatile int bound; 
    // 通過unsafe用於CAS操作
    private static final sun.misc.Unsafe U;
    private static final long BOUND;
    private static final long SLOT;
    private static final long MATCH;
    private static final long BLOCKER;
    private static final int ABASE;
    static {
        int s;
        try {
            U = sun.misc.Unsafe.getUnsafe();
            Class<?> ek = Exchanger.class;
            Class<?> nk = Node.class;
            Class<?> ak = Node[].class;
            Class<?> tk = Thread.class;
            // bound屬性在Exchanger對象中的偏移地址
            BOUND = U.objectFieldOffset
                (ek.getDeclaredField("bound"));
            // slot屬性在Exchanger對象中的偏移地址   
            SLOT = U.objectFieldOffset
                (ek.getDeclaredField("slot"));
            // slot屬性在Node對象中的偏移地址
            MATCH = U.objectFieldOffset
                (nk.getDeclaredField("match"));
           // parkBlocker屬性在Thread對象中的偏移地址
            BLOCKER = U.objectFieldOffset
                (tk.getDeclaredField("parkBlocker"));
            // 獲取Node[]數組中每個元素的大小,這裡是4
            s = U.arrayIndexScale(ak);
            // ABASE absorbs padding in front of element 0
            // 獲取Node[]數組中第一個元素的偏移地址 + 128
            ABASE = U.arrayBaseOffset(ak) + (1 << ASHIFT);

        } catch (Exception e) {
            throw new Error(e);
        }
        if ((s & (s-1)) != 0 || s > (1 << ASHIFT))
            // 這裡是為了保證 Node數組中的元素不會爭用一個緩存行
            throw new Error("Unsupported array scale");
    }

構造器及內部類

    public Exchanger() {
        participant = new Participant();
    }
    // 內部類,用於記錄每個線程的狀態
    static final class Participant extends ThreadLocal<Node> {
        public Node initialValue() { return new Node(); }
    }
    // 包含需要交換的數據等信息
    // Contended為 JDK8 新增的註解,用於避免偽共用,提高程式性能
    @sun.misc.Contended static final class Node {
        // arana數組中的下標
        int index;  
        // 上一次記錄的bound            
        int bound;  
        // cas操作失敗的次數            
        int collides; 
        // 用於自旋的偽隨機數         
        int hash;               // Pseudo-random for spins
        // 當前線程需要交換的數據
        Object item;            // This thread's current item
        // 匹配線程交換的數據
        volatile Object match;  // Item provided by releasing thread
        // 記錄當前掛起的線程
        volatile Thread parked; // Set to this thread when parked, else null
    }     

方法exchange

    // 交換數據,參數X為本線程提供給其它線程的數據
    public V exchange(V x) throws InterruptedException {
        Object v;
        // 當參數為null時需要將item設置為空的對象
        Object item = (x == null) ? NULL_ITEM : x; // translate null args
        // 註意到這裡的這個表達式是整個方法的核心
        if ((arena != null ||
             (v = slotExchange(item, false, 0L)) == null) &&
            ((Thread.interrupted() || // disambiguates null return
              (v = arenaExchange(item, false, 0L)) == null)))
            throw new InterruptedException();
        return (v == NULL_ITEM) ? null : (V)v;
    }

仔細看if里的條件表達式,得知: 
只有當arena為null時,才會執行slotExchange方法; 
當arena不為null或者(arena為null且slotExchange方法返回null)時,此時線程未中斷,才會執行arenaExchange方法; 
線程中斷時,就會直接拋出線程中斷異常。 
下麵我們來看slotExchange方法

  1     // timed 為true表示設置了超時時間,ns為>0的值,反之沒有設置超時時間
  2     private final Object slotExchange(Object item, boolean timed, long ns) {
  3         // 獲取當前線程node對象
  4         Node p = participant.get();
  5         Thread t = Thread.currentThread();
  6         if (t.isInterrupted()) // preserve interrupt status so caller can recheck
  7             // 線程中斷返回null
  8             return null;
  9         
 10         // 自旋
 11         for (Node q;;) {
 12             // 將slot值賦給q
 13             if ((q = slot) != null) {
 14                 // slot 不為null,即表示已有線程已經把需要交換的數據設置在slot中了
 15                 // 通過CAS將slot設置成null
 16                 if (U.compareAndSwapObject(this, SLOT, q, null)) {
 17                     // CAS操作成功後,將slot中的item賦值給對象v,以便返回。
 18                     // 這裡也是就讀取之前線程要交換的數據
 19                     Object v = q.item;
 20                     // 將當前線程需要交給的數據設置在q中的match
 21                     q.match = item;
 22                     // 獲取被掛起的線程
 23                     Thread w = q.parked;
 24                     if (w != null)
 25                         // 如果線程不為null,喚醒它
 26                         U.unpark(w);
 27                     // 返回其他線程給的V
 28                     return v;
 29                 }
 30                 // CAS 操作失敗,表示有其它線程競爭,在此線程之前將數據已取走
 31                 // create arena on contention, but continue until slot null
 32                 if (NCPU > 1 && bound == 0 &&
 33                     U.compareAndSwapInt(this, BOUND, 0, SEQ))
 34                     // CPU為多核心
 35                     // bound == 0 表示arena數組未初始化過,CAS操作bound將其增加SEQ
 36                     // 初始化arena數組
 37                     arena = new Node[(FULL + 2) << ASHIFT];
 38             }
 39             // 上面分析過,只有當arena才會執行slotExchange方法的
 40             // 所以表示剛好已有其它線程加入進來將arena初始化
 41             else if (arena != null)
 42                 // 這裡就需要去執行arenaExchange
 43                 return null; // caller must reroute to arenaExchange
 44             else {
 45                 // 這裡表示當前線程是以第一個線程進來交換數據
 46                 // 或者表示之前的數據交換已進行完畢,這裡可以看作是第一個線程
 47                 // 將需要交換的數據先存放在當前線程變數p中
 48                 p.item = item;
 49                 // 將需要交換的數據通過CAS設置到交換區slot
 50                 if (U.compareAndSwapObject(this, SLOT, null, p))
 51                     // 交換成功後跳出自旋
 52                     break;
 53                 // CAS操作失敗,表示有其它線程剛好先於當前線程將數據設置到交換區slot
 54                 // 將當前線程變數中的item設置為null,然後自旋獲取其它線程存放在交換區slot的數據
 55                 p.item = null;
 56             }
 57         }
 58         // 執行到這裡表示當前線程已將需要的交換的數據放置於交換區slot中了,
 59         // 等待其它線程交換數據然後喚醒當前線程
 60         // await release
 61         int h = p.hash;
 62         long end = timed ? System.nanoTime() + ns : 0L;
 63         // 自旋次數
 64         int spins = (NCPU > 1) ? SPINS : 1;
 65         Object v;
 66         // 自旋等待直到p.match不為null,也就是說等待其它線程將需要交換的數據放置於交換區slot
 67         while ((v = p.match) == null) {
 68             // 下麵的邏輯主要是自旋等待,直到spins遞減到0為止
 69             if (spins > 0) {
 70                 h ^= h << 1; h ^= h >>> 3; h ^= h << 10;
 71                 if (h == 0)
 72                     h = SPINS | (int)t.getId();
 73                 else if (h < 0 && (--spins & ((SPINS >>> 1) - 1)) == 0)
 74                     Thread.yield();
 75             }
 76             // slot 和 p本應該是相等,除非其它線程執行了第16行代碼中的CAS操作將slot置為null,
 77             // 還未來得及設置match的值,此時只需要自旋等待第21行代碼被其它線程執行,
 78             // 這樣p.match才會不為null跳出迴圈
 79             else if (slot != p)
 80                 spins = SPINS;
 81             // 此處表示未設置超時或者時間未超時
 82             else if (!t.isInterrupted() && arena == null &&
 83                      (!timed || (ns = end - System.nanoTime()) > 0L)) {
 84                 // 設置線程t被當前對象阻塞
 85                 U.putObject(t, BLOCKER, this);
 86                 // 給p掛機線程的值賦值
 87                 p.parked = t;
 88                 if (slot == p)
 89                     // 如果slot還沒有被置為null,也就表示暫未有線程過來交換數據,需要將當前線程掛起
 90                     U.park(false, ns);
 91                 // 線程被喚醒,將被掛起的線程設置為null
 92                 p.parked = null;
 93                 // 設置線程t未被任何對象阻塞
 94                 U.putObject(t, BLOCKER, null);
 95             }
 96             // 不是以上條件時(可能是arena已不為null或者超時)
 97             else if (U.compareAndSwapObject(this, SLOT, p, null)) {
 98                 // arena不為null則v為null,其它為超時則v為超市對象TIMED_OUT,並且跳出迴圈
 99                 v = timed && ns <= 0L && !t.isInterrupted() ? TIMED_OUT : null;
100                 break;
101             }
102         }
103         // 取走match值,並將p中的match置為null
104         U.putOrderedObject(p, MATCH, null);
105         // 設置item為null
106         p.item = null;
107         p.hash = h;
108         // 返回交換值
109         return v;
110     }

再來看arenaExchange方法,此方法被執行時表示多個線程進入交換區交換數據,arena數組已被初始化,此方法中的一些處理方式和slotExchange比較類似,它是通過遍歷arena數組找到需要交換的數據

    // timed 為true表示設置了超時時間,ns為>0的值,反之沒有設置超時時間
    private final Object arenaExchange(Object item, boolean timed, long ns) {
        Node[] a = arena;
        // 獲取當前線程中的存放的node
        Node p = participant.get();
        //index初始值0
        for (int i = p.index;;) {                      // access slot at i
            // 遍歷,如果在數組中找到數據則直接交換並喚醒線程,
            // 如未找到則將需要交換給其它線程的數據放置於數組中
            int b, m, c; long j;                       // j is raw array offset
            // 其實這裡就是向右遍曆數組,只是用到了元素在記憶體偏移的偏移量
            // q實際為arena數組偏移(i + 1) *  128個地址位上的node
            Node q = (Node)U.getObjectVolatile(a, j = (i << ASHIFT) + ABASE);
            // 如果q不為null,並且CAS操作成功,將下標j的元素置為null
            if (q != null && U.compareAndSwapObject(a, j, q, null)) {
                // 表示當前線程已發現有交換的數據,然後獲取數據,喚醒等待的線程
                Object v = q.item;                     // release
                q.match = item;
                Thread w = q.parked;
                if (w != null)
                    U.unpark(w);
                return v;
            }
            // q 為null 並且 i 未超過數組邊界
            else if (i <= (m = (b = bound) & MMASK) && q == null) {
                // 將需要給其它線程的item賦予給p中的item
                p.item = item;                         // offer
                if (U.compareAndSwapObject(a, j, null, p)) {
                    // 交換成功
                    long end = (timed && m == 0) ? System.nanoTime() + ns : 0L;
                    Thread t = Thread.currentThread(); // wait
                    // 自旋直到有其它線程進入,遍歷到該元素並與其交換,同時當前線程被喚醒
                    for (int h = p.hash, spins = SPINS;;) {
                        Object v = p.match;
                        if (v != null) {
                            // 其它線程設置的需要交換的數據match不為null
                            // 將match設置null,item設置為null
                            U.putOrderedObject(p, MATCH, null);
                            p.item = null;             // clear for next use
                            p.hash = h;
                            return v;
                        }
                        else if (spins > 0) {
                            // 遞減自旋次數
                            h ^= h << 1; h ^= h >>> 3; h ^= h << 10; // xorshift
                            if (h == 0)                // initialize hash
                                h = SPINS | (int)t.getId();
                            else if (h < 0 &&          // approx 50% true
                                     (--spins & ((SPINS >>> 1) - 1)) == 0)
                                Thread.yield();        // two yields per wait
                        }
                        else if (U.getObjectVolatile(a, j) != p)
                            // 和slotExchange方法中的類似,arena數組中的數據已被CAS設置
                            // match值還未設置,讓其再自旋會等待match被設置
                            spins = SPINS;       // releaser hasn't set match yet
                        else if (!t.isInterrupted() && m == 0 &&
                                 (!timed ||
                                  (ns = end - System.nanoTime()) > 0L)) {
                            // 設置線程t被當前對象阻塞
                            U.putObject(t, BLOCKER, this); // emulate LockSupport
                            // 線程t賦值
                            p.parked = t;              // minimize window
                            if (U.getObjectVolatile(a, j) == p)
                                // 數組中對象還相等,表示線程還未被喚醒,喚醒線程
                                U.park(false, ns);
                            p.parked = null;
                            // 設置線程t未被任何對象阻塞
                            U.putObject(t, BLOCKER, null);
                        }
                        else if (U.getObjectVolatile(a, j) == p &&
                                 U.compareAndSwapObject(a, j, p, null)) {
                            // 這裡給bound增加加一個SEQ
                            if (m != 0)                // try to shrink
                                U.compareAndSwapInt(this, BOUND, b, b + SEQ - 1);
                            p.item = null;
                            p.hash = h;
                            i = p.index >>>= 1;        // descend
                            if (Thread.interrupted())
                                return null;
                            if (timed && m == 0 && ns <= 0L)
                                return TIMED_OUT;
                            break;                     // expired; restart
                        }
                    }
                }
                else
                    // 交換失敗,表示有其它線程更改了arena數組中下標i的元素
                    p.item = null;                     // clear offer
            }
            else {
                // 此時表示下標不在bound & MMASK或q不為null但CAS操作失敗
                // 需要更新bound變化後的值
                if (p.bound != b) {                    // stale; reset
                    p.bound = b;
                    p.collides = 0;
                    // 反向遍歷
                    i = (i != m || m == 0) ? m : m - 1;
                }
                else if ((c = p.collides) < m || m == FULL ||
                         !U.compareAndSwapInt(this, BOUND, b, b + SEQ + 1)) {
                    // 記錄CAS失敗的次數
                    p.collides = c + 1;
                    // 迴圈遍歷
                    i = (i == 0) ? m : i - 1;          // cyclically traverse
                }
                else
                    // 此時表示bound值增加了SEQ+1
                    i = m + 1;                         // grow
                // 設置下標
                p.index = i;
            }
        }
    }

總結

讀到這裡是不是還是感覺有很多疑問?

  1. 先看為什麼ASHIFT設置成7,這是為了儘量避免slot數組中不同的元素在同一個緩存行上,<< ASHIFT 左移7位,表示至少移動了128地址位,而我們主流的緩存行大小一般為32位元組到256位元組,所以128個地址位基本覆蓋到了常見的處理器平臺。arena數組中元素的分佈如圖,它們之間間隔128個整數倍地址位,也就是說最小相差128個地址位。 
    arena數組
  2. 為什麼Node類用@sun.misc.Contended註解呢?該註解是jdk8新增的註解,是為瞭解決之前手動填充數據的問題。填充數據也是為了避免arena數組中的不同的元素共用同一個緩存行,導致多線程修改數據時性能受到影響。

參考: 
偽共用(False Sharing)


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 女孩:談Java了,好耶? 男孩:夜談一下,Java的類的定義~ 女孩:那談Java的類的什麼呢? 男孩:類的定義,對象的定義,類中的方法,構造方法,this關鍵字,方法的重載,Java中的類的訪問許可權,set和get方法,static關鍵字~ 面向對象設計思想 設計思想的變化 例如:學生行為 對於 ...
  • 1. 學習計劃 1、Activemq整合spring的應用場景 2、添加商品同步索引庫 3、商品詳情頁面動態展示 4、展示詳情頁面使用緩存 2. Activemq整合spring 2.1. 使用方法 第一步:引用相關的jar包。 第二步:配置Activemq整合spring。配置Connection ...
  • 搭建環境 1. 安裝Java SDK及添加環境變數 2. 安裝Elipse Java及CDT插件 3. 安裝tdm64 gcc及添加環境變數 "百度雲" (密碼:mjdi) 3. 安裝msys及添加環境變數 "百度雲" (密碼:j2i6) JNI使用步驟 創建java工程jni_demo 創建類 J ...
  • 從python轉golang開發已經3個月了,因為寫過c++,所以對golang接受的還算快,這段經歷也不是很痛苦。伯樂線上上看了一些大神關於python轉golang過程中的不適應和吐槽,決定寫下篇博客。接下來,我會列出golang開發過程中與python的不同點,主要是在語法方面,golang的 ...
  • 條件變數同步 有一類線程需要滿足條件之後才能夠繼續執行,Python提供了threading.Condition 對象用於條件變數線程的支持,它除了能提供RLock()或Lock()的方法外,還提供了 wait()、notify()、notifyAll()方法。 lock_con=threading ...
  • 所需環境: 1. IDEA UItimate 2. JDK 3. Maven 創建工程 一開始創建一個普通的maven項目即可, 下麵展示最終完成的工程目錄,其中創建res文件夾以及放入testNG.xml文件的步驟後面會提到: 在pom中添加依賴: 創建package 與測試類 HelloTest ...
  • 列表(list):列表是有序的,且可以修改 說明:列表裡面可以存放:數字、字元串、列表、布爾值,可以嵌套任意類型 列表存放元素的原理:通過鏈表的方式 1.創建列表:通過list(列表)類,創建了一個test_list的對象 test_list=["Tom","Mary","Jim","Disk"] ...
  • 給定一組(串)數據,根據輸入得號碼,查詢歸屬地 如有錯誤,感謝指正! ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...