如此狂妄,自稱高性能隊列的Disruptor有啥來頭?

来源:https://www.cnblogs.com/jiagooushi/archive/2022/09/19/16707593.html
-Advertisement-
Play Games

併發框架Disruptor 1. Disruptor概述 1.1 背景 ​ Disruptor是英國外匯交易公司LMAX開發的一個高性能隊列,研發的初衷是解決記憶體隊列的延遲問題(在性能測試中發現竟然與I/O操作處於同樣的數量級),基於Disruptor開發的系統單線程能支撐每秒600萬訂單,2010 ...


併發框架Disruptor

file

1. Disruptor概述

1.1 背景

​ Disruptor是英國外匯交易公司LMAX開發的一個高性能隊列,研發的初衷是解決記憶體隊列的延遲問題(在性能測試中發現竟然與I/O操作處於同樣的數量級),基於Disruptor開發的系統單線程能支撐每秒600萬訂單,2010年在QCon演講後,獲得了業界關註,2011年,企業應用軟體專家Martin Fowler專門撰寫長文介紹。同年它還獲得了Oracle官方的Duke大獎。

​ 目前,包括Apache Storm、Camel、Log4j 2在內的很多知名項目都應用了Disruptor以獲取高性能。

​ 需要特別指出的是,這裡所說的隊列是系統內部的記憶體隊列,而不是Kafka這樣的分散式隊列。

有界無鎖 高併發隊列

1.2 什麼是Disruptor

​ Disruptor是用於一個JVM中多個線程之間的消息隊列,作用與ArrayBlockingQueue有相似之處,但是Disruptor從功能、性能都遠好於ArrayBlockingQueue,當多個線程之間傳遞大量數據或對性能要求較高時,可以考慮使用Disruptor作為ArrayBlockingQueue的替代者。

​ 官方也對Disruptor和ArrayBlockingQueue的性能在不同的應用場景下做了對比,目測性能只有有5~10倍左右的提升。

1.3 為什麼使用Disruptor

​ 傳統阻塞的隊列使用鎖保證線程安全,而鎖通過操作系統內核上下文切換實現,會暫停線程去等待鎖,直到鎖釋放。

​ 執行這樣的上下文切換,會丟失之前保存的數據和指令。由於消費者和生產者之間的速度差異,隊列總是接近滿或者空的狀態,這種狀態會導致高水平的寫入爭用。

1.3.1 傳統隊列問題

首先這裡說的隊列也僅限於Java內部的消息隊列

隊列 有界性 結構 隊列類型
ArrayBlockingQueue 有界 加鎖 數組 阻塞
LinkedBlockingQueue 可選 加鎖 鏈表 阻塞
ConcurrentLinkedQueue 無界 無鎖 鏈表 非阻塞
LinkedTransferQueue 無界 無鎖 鏈表 阻塞
PriorityBlockingQueue 無界 加鎖 阻塞
DelayQueue 無界 加鎖 阻塞
1.3.2 Disruptor應用場景

參考使用到disruptor的一些框架.

1.3.2.1 log4j2

​ Log4j2非同步日誌使用到了disruptor, 日誌一般是有緩衝區, 滿了才寫到文件, 增量追加文件結合NIO等應該也比較快, 所以無論是EventHandler還是WorkHandler處理應該延遲比較小的, 寫的文件也不多, 所以場景是比較合適的。

1.3.2.2 Jstorm

​ 在流處理中不同線程中數據交換,數據計算可能蠻多記憶體中計算, 流計算快進快出,disruptor應該不錯的選擇。

1.3.2.3 百度uid-generator

​ 部分使用Ring buffer和去偽共用等思路緩存已生成的uid, 應該也部分參考了disruptor吧。

1.4 Disruptor 的核心概念

先從瞭解 Disruptor 的核心概念開始,來瞭解它是如何運作的。下麵介紹的概念模型,既是領域對象,也是映射到代碼實現上的核心對象。

1.4.1 Ring Buffer

Disruptor中的數據結構,用於存儲生產者生產的數據

​ 如其名,環形的緩衝區。曾經 RingBuffer 是 Disruptor 中的最主要的對象,但從3.0版本開始,其職責被簡化為僅僅負責對通過 Disruptor 進行交換的數據(事件)進行存儲和更新。在一些更高級的應用場景中,Ring Buffer 可以由用戶的自定義實現來完全替代。

1.4.2 Sequence

序號,在Disruptor框架中,任何地方都有序號

​ 生產者生產的數據放在RingBuffer中的哪個位置,消費者應該消費哪個位置的數據,RingBuffer中的某個位置的數據是什麼,這些都是由這個序號來決定的。這個序號可以簡單的理解為一個AtomicLong類型的變數。其使用了padding的方法去消除緩存的偽共用問題。

1.4.3 Sequencer

序號生成器,這個類主要是用來協調生產者的

​ 在生產者生產數據的時候,Sequencer會產生一個可用的序號(Sequence),然後生產者就就知道數據放在環形隊列的那個位置了。

​ Sequencer是Disruptor的真正核心,此介面有兩個實現類 SingleProducerSequencer、MultiProducerSequencer ,它們定義在生產者和消費者之間快速、正確地傳遞數據的併發演算法。

1.4.4 Sequence Barrier

序號屏障

​ 我們都知道,消費者在消費數據的時候,需要知道消費哪個位置的數據。消費者總不能自己想取哪個數據消費,就取哪個數據消費吧。這個SequencerBarrier起到的就是這樣一個“柵欄”般的阻隔作用。你消費者想消費數據,得,我告訴你一個序號(Sequence),你去消費那個位置上的數據。要是沒有數據,就好好等著吧

1.4.5 Wait Strategy

Wait Strategy決定了一個消費者怎麼等待生產者將事件(Event)放入Disruptor中。

​ 設想一種這樣的情景:生產者生產的非常慢,而消費者消費的非常快。那麼必然會出現數據不夠的情況,這個時候消費者怎麼進行等待呢?WaitStrategy就是為瞭解決問題而誕生的。

1.4.6 Event

​ 從生產者到消費者傳遞的數據叫做Event。它不是一個被 Disruptor 定義的特定類型,而是由 Disruptor 的使用者定義並指定。

1.4.7 EventHandler

​ Disruptor 定義的事件處理介面,由用戶實現,用於處理事件,是 Consumer 的真正實現。

1.4.8 Producer

​ 即生產者,只是泛指調用 Disruptor 發佈事件的用戶代碼,Disruptor 沒有定義特定介面或類型。

1.5 Disruptor特性

​ Disruptor其實就像一個隊列一樣,用於在不同的線程之間遷移數據,但是Disruptor也實現了一些其他隊列沒有的特性,如:

  • 同一個“事件”可以有多個消費者,消費者之間既可以並行處理,也可以相互依賴形成處理的先後次序(形成一個依賴圖);
  • 預分配用於存儲事件內容的記憶體空間;
  • 針對極高的性能目標而實現的極度優化和無鎖的設計;

2. Disruptor入門

我們使用一個簡單的例子來體驗一下Disruptor,生產者會傳遞一個long類型的值到消費者,消費者接受到這個值後會列印出這個值。

2.1 添加依賴

<dependency>
    <groupId>com.lmax</groupId>
    <artifactId>disruptor</artifactId>
    <version>3.4.2</version>
</dependency>

2.2 Disruptor API

Disruptor 的 API 十分簡單,主要有以下幾個步驟

2.2.1 定義事件

首先創建一個 LongEvent 類,這個類將會被放入環形隊列中作為消息內容。

事件(Event)就是通過 Disruptor 進行交換的數據類型。

public class LongEvent {
    private long value;

    public void set(long value) {
        this.value = value;
    }

    public long getValue() {
        return value;
    }
}
2.2.2 定義事件工廠

為了使用Disruptor的記憶體預分配event,我們需要定義一個EventFactory

​ 事件工廠(Event Factory)定義瞭如何實例化前面第1步中定義的事件(Event),需要實現介面 com.lmax.disruptor.EventFactory<T>。

Disruptor 通過 EventFactory 在 RingBuffer 中預創建 Event 的實例。

​ 一個 Event 實例實際上被用作一個“數據槽”,發佈者發佈前,先從 RingBuffer 獲得一個 Event 的實例,然後往 Event 實例中填充數據,之後再發佈到 RingBuffer 中,之後由 Consumer 獲得該 Event 實例並從中讀取數據。

public class LongEventFactory implements EventFactory<LongEvent> {
    public LongEvent newInstance() {
        return new LongEvent();
    }
}
2.2.3 定義事件處理的具體實現

為了讓消費者處理這些事件,所以我們這裡定義一個事件處理器,負責列印event

通過實現介面 com.lmax.disruptor.EventHandler<T> 定義事件處理的具體實現。

public class LongEventHandler implements EventHandler<LongEvent> {
    public void onEvent(LongEvent event, long sequence, boolean endOfBatch) {
        //CommonUtils.accumulation();
        System.out.println("consumer:" + Thread.currentThread().getName() + " Event: value=" + event.getValue() + ",sequence=" + sequence);
    }
}
2.2.4 指定等待策略

Disruptor 定義了 com.lmax.disruptor.WaitStrategy 介面用於抽象 Consumer 如何等待新事件,這是策略模式的應用

WaitStrategy YIELDING_WAIT = new YieldingWaitStrategy();
2.2.5 啟動 Disruptor

註意ringBufferSize的大小必須是2的N次方

// 指定事件工廠
LongEventFactory factory = new LongEventFactory();

// 指定 ring buffer位元組大小, 必須是2的N次方
int bufferSize = 1024;

//單線程模式,獲取額外的性能
Disruptor<LongEvent> disruptor = new Disruptor<LongEvent>(factory,
                                                          bufferSize, Executors.defaultThreadFactory(),
                                                          ProducerType.SINGLE,
                                                          new YieldingWaitStrategy());
//設置事件業務處理器---消費者
disruptor.handleEventsWith(new LongEventHandler());

//啟動disruptor線程
disruptor.start();
2.2.6 使用Translators發佈事件

在Disruptor的3.0版本中,由於加入了豐富的Lambda風格的API,可以用來幫組開發人員簡化流程。所以在3.0版本後首選使用Event Publisher/Event Translator來發佈事件。

public class LongEventProducerWithTranslator {
    private final RingBuffer<LongEvent> ringBuffer;

    public LongEventProducerWithTranslator(RingBuffer<LongEvent> ringBuffer) {
        this.ringBuffer = ringBuffer;
    }

    private static final EventTranslatorOneArg<LongEvent, Long> TRANSLATOR =
            new EventTranslatorOneArg<LongEvent, Long>() {
                public void translateTo(LongEvent event, long sequence, Long data) {
                    event.set(data);
                }
            };

    public void onData(Long data) {
        ringBuffer.publishEvent(TRANSLATOR, data);
    }
}
2.2.7 關閉 Disruptor
disruptor.shutdown();//關閉 disruptor,方法會堵塞,直至所有的事件都得到處理

2.3 代碼整合

2.3.1 LongEventMain

消費者-生產者啟動類,其依靠構造Disruptor對象,調用start()方法完成啟動線程。Disruptor 需要ringbuffer環,消費者數據處理工廠,WaitStrategy等

  • ByteBuffer 類位元組buffer,用於包裝消息。

  • ProducerType.SINGLE為單線程 ,可以提高性能

public class LongEventMain {
    public static void main(String[] args) {
        // 指定事件工廠
        LongEventFactory factory = new LongEventFactory();

        // 指定 ring buffer位元組大小, 必須是2的N次方
        int bufferSize = 1024;

        //單線程模式,獲取額外的性能
        Disruptor<LongEvent> disruptor = new Disruptor<LongEvent>(factory,
                bufferSize, Executors.defaultThreadFactory(),
                ProducerType.SINGLE,
                new YieldingWaitStrategy());

        //設置事件業務處理器---消費者
        disruptor.handleEventsWith(new LongEventHandler());

        //啟動disruptor線程
        disruptor.start();
        // 獲取 ring buffer環,用於接取生產者生產的事件
        RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();

        //為 ring buffer指定事件生產者
        LongEventProducerWithTranslator producer = new LongEventProducerWithTranslator(ringBuffer);
        //迴圈遍歷
        for (int i = 0; i < 100; i++) {
            //獲取一個隨機數
            long value = (long) ((Math.random() * 1000000) + 1);
            //發佈數據
            producer.onData(value);
        }
        //停止disruptor線程
        disruptor.shutdown();
    }
}
2.3.2 運行測試

測試結果

consumer:pool-1-thread-1 Event: value=579797,sequence=0
consumer:pool-1-thread-1 Event: value=974942,sequence=1
consumer:pool-1-thread-1 Event: value=978977,sequence=2
consumer:pool-1-thread-1 Event: value=398080,sequence=3
consumer:pool-1-thread-1 Event: value=867251,sequence=4
consumer:pool-1-thread-1 Event: value=796707,sequence=5
consumer:pool-1-thread-1 Event: value=786555,sequence=6
consumer:pool-1-thread-1 Event: value=182193,sequence=7
.....

Event: value = 為消費者接收到的數據,sequence為數據在ringbuffer環的位置。
本文由傳智教育博學谷教研團隊發佈。

如果本文對您有幫助,歡迎關註點贊;如果您有任何建議也可留言評論私信,您的支持是我堅持創作的動力。

轉載請註明出處!


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 今天帶大家一起來看看網上流傳關於程式員的經典硬核段子,快來看看你是否能get到笑點。 白嫖福利,傳送門 段子1 昨天晚上下班回家,一民警迎面巡邏而來。突然對我大喊:站住! 民警:int 類型占幾個位元組? 我:4 個。 民警:你可以走了。 我感到很詫異。 我:為什麼問這樣的問題? 民警:深夜還在街上走 ...
  • 前言 準備工作 步驟 1 配置fiddler和WX環境 fiddler配置 其他的照我截的圖片配置就好 這樣 fiddler 就配置好,是不是很簡單 WX配置 配置代理 註:埠號得和fiddler配置的一致,也就是這個位置 至於ip地址,使用這個即可 黑框調出方式:win+R,輸入cmd然後回車, ...
  • 我們先瞭解下Servlet的生命周期 Servlet部署在容器里,其生命周期由容器管理。 概括為以下幾個階段: 1)容器載入Servlet類。 當第一次有Web客戶請求Servlet服務或當Web服務啟動時。 2)創建Servlet對象實例。 容器環境根據客戶請求,創建一個或多個Servlet對象實 ...
  • 1 耳返功能簡介 ZEGO Express SDK 提供了Flutter耳返和雙聲道的功能,在視頻直播、K歌、音頻錄製等場景下廣泛應用,開發者可根據實際業務場景需要設置,一套代碼可實現跨平臺音視頻耳返功能,節省開發成本。 實時音視頻的耳返作用就是在嘈雜的環境下,清楚地聽伴奏和自己的聲音,來鑒定自己有 ...
  • 最近在完成軟體體繫結構上機實驗時,遇到一個有點點小難度的選做題,題目信息如下: 利用套接字技術實現應用程式中對資料庫的訪問。應用程式只是利用套接字連接向伺服器發送一個查詢的條件,而伺服器負責對資料庫的查詢,然後伺服器再將查詢的結果利用建立的套接字返回給客戶端,如下圖所示。 本來吧,選做題,不太想做的 ...
  • 一、bean被創建的時間 考慮一個問題,我們都知道spring通過xml的配置創建bean,那麼bean是什麼時間被創建的呢?是在我們getBean()的時候創建的嗎? 我們來做一個測試: 1.首先建立一個User類: package com.jms.pojo; public class User ...
  • Microsoft Word 提供了許多易於使用的文檔操作工具,同時也提供了豐富的功能集供創建複雜的文檔使用。在使用的時候,你可能需要複製一個文檔裡面的內容到另一個文檔。複製的內容可支持包括文本、圖片、表格、超鏈接、書簽、批註、形狀、編號列表、腳註、章節附註等等在內的多種元素。 ...
  • 摘要:本文講述圖像金字塔知識,瞭解專門用於圖像向上採樣和向下採樣的pyrUp()和pyrDown()函數。 本文分享自華為雲社區《[Python圖像處理] 二十一.圖像金字塔之圖像向下取樣和向上取樣》,作者:eastmount。 一.圖像金字塔 圖像金字塔是指由一組圖像且不同分別率的子圖集合,它是圖 ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...