前言 Disruptor是一個高性能的無鎖併發框架,其主要應用場景是在高併發、低延遲的系統中,如金融領域的交易系統,游戲伺服器等。其優點就是非常快,號稱能支撐每秒600萬訂單。需要註意的是,Disruptor是單機框架,對標JDK中的Queue,而非可用於分散式系統的MQ 本文基於Disruptor ...
前言
Disruptor是一個高性能的無鎖併發框架,其主要應用場景是在高併發、低延遲的系統中,如金融領域的交易系統,游戲伺服器等。其優點就是非常快,號稱能支撐每秒600萬訂單。需要註意的是,Disruptor是單機框架,對標JDK中的Queue,而非可用於分散式系統的MQ
本文基於Disruptor v3.4.*版本
Demo
既然是簡單使用,這階段只需要關註:
- 生產者
- 消費者:EventHandler
- 消息的傳遞:消息的載體Event
簡單例子
首先,我們定義消息的載體Event,生產者向消費者傳遞的消息通過Event承載
class LongEvent {
private long value;
public void set(long value) {
this.value = value;
}
@Override
public String toString() {
return "LongEvent{" + "value=" + value + '}';
}
}
然後定義Event生產工廠,這用於初始化Event
EventFactory<LongEvent> factory = new EventFactory<LongEvent>() {
@Override
public LongEvent newInstance() {
return new LongEvent();
}
};
接下來就可以構建Disruptor了,以下是完整代碼
// 消息載體(event)
static class LongEvent {
private long value;
public void set(long value) {
this.value = value;
}
@Override
public String toString() {
return "LongEvent{" + "value=" + value + '}';
}
}
// 發佈消息的轉換器
public static void translate(LongEvent event, long sequence, ByteBuffer buffer)
{
event.set(buffer.getLong(0));
}
public static void main(String[] args) throws Exception {
// event生產工廠,初始化RingBuffer的時候使用
EventFactory<LongEvent> factory = new EventFactory<LongEvent>() {
@Override
public LongEvent newInstance() {
return new LongEvent();
}
};
// 指定RingBuffer的大小(必須是2的n次方)
int bufferSize = 1024;
// 構造Disruptor(預設使用多生產者模式、BlockingWaitStrategy阻塞策略)
Disruptor<LongEvent> disruptor = new Disruptor<>(LongEvent::new, bufferSize, DaemonThreadFactory.INSTANCE);
// Disruptor<LongEvent> disruptor = new Disruptor<>(factory, bufferSize, DaemonThreadFactory.INSTANCE, ProducerType.MULTI, new BlockingWaitStrategy());
// 設置消費者
EventHandler<LongEvent> handler = (event, sequence, endOfBatch) -> {
System.out.println("Event: " + event);
};
disruptor.handleEventsWith(handler);
// 啟動disruptor,啟動所有需要運行的線程
disruptor.start();
RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();
ByteBuffer bb = ByteBuffer.allocate(8);
for (long i = 0; i < 100; i++) {
bb.putLong(i);
// 發佈事件
ringBuffer.publishEvent(LongEventMain::translate, bb);
}
}
消費者組合(多使用場景)
Disruptor不僅可以當高性能的隊列使用,還支持消費者的串列、並行消費等
以下只展示關鍵代碼(設置消費者),其餘部分參考上一節的簡單demo
-
單鏈串列
disruptor.handleEventsWith(handlerA).then(handlerB);
-
並行
disruptor.handleEventsWith(handlerA, handlerB);
-
鏈內串列,多鏈並行
disruptor.handleEventsWith(handlerA).then(handlerC); disruptor.handleEventsWith(handlerB).then(handlerD);
-
菱形(C、D都執行完才到E)
disruptor.handleEventsWith(handlerA).then(handlerC); disruptor.handleEventsWith(handlerB).then(handlerD); disruptor.after(handlerC, handlerD).then(handlerE);
-
分組(AB都執行完才到CD)
disruptor.handleEventsWith(handlerA, handlerB).then(handlerC, handlerD);
-
分組不重覆消費
組內競爭,組外串列:每個消息在每個分組中只有一個消費者能消費成功,如果就是分組A中只有HandlerA2能得到數據,分組B中只有HandlerB1獲得
// 註意:此處的handler實現的是WorkHandler介面 disruptor.handleEventsWithWorkerPool(handlerA1, handlerA2, handlerA3) .then(handlerB1, handlerB2, handlerB3);
-
分組不重覆消費(菱形)
// handlerA、handlerB實現WorkHandler介面 // handlerC 實現EventHandler或WorkHandler介面均可 disruptor.handleEventsWithWorkerPool(handlerA1, handlerA2, handlerA3) .then(handlerB1, handlerB2, handlerB3) .then(handlerC);
等待策略
消費者速度比生產者快時,需要等待。因此就有了不同的等待策略以適應不同場景
-
BlockingWaitStrategy
預設策略。使用鎖和 Condition 的等待、喚醒機制。速度慢,但節省CPU資源並且在不同部署環境中能提供更加一致的性能表現。
-
YieldingWaitStrategy
二段式,一階段自旋100次,二階段執行Thread.yield,需要低延遲的場景可使用此策略
-
SleepingWaitStrategy
三段式,一階段自旋,二階段執行Thread.yield,三階段睡眠
-
BusySpinWaitStrategy
性能最高的策略,與 YieldingWaitStrategy 一樣在低延遲場景使用,但是此策略要求消費者數量低於 CPU 邏輯內核總數
其他小技巧
-
清除消息載體 Event 中的數據
如果 Event 中存在大對象,應該在消費者鏈的末尾,添加一個清除數據的消費者,以幫助jvm垃圾回收。demo中的 LongEvent 是
private long value;
所以沒必要添加。
-
總結
本文介紹了 Disruptor 的簡單使用,以及複雜場景下消費者的配置。下篇開坑 Disruptor 源碼解析。
參考資料