MyDisruptor V5版本介紹 在v4版本的MyDisruptor實現多線程生產者後。按照計劃,v5版本的MyDisruptor需要支持更便於用戶使用的DSL風格的API。 由於該文屬於系列博客的一部分,需要先對之前的博客內容有所瞭解才能更好地理解本篇博客 v1版本博客:從零開始實現lmax- ...
MyDisruptor V5版本介紹
在v4版本的MyDisruptor實現多線程生產者後。按照計劃,v5版本的MyDisruptor需要支持更便於用戶使用的DSL風格的API。
由於該文屬於系列博客的一部分,需要先對之前的博客內容有所瞭解才能更好地理解本篇博客
- v1版本博客:從零開始實現lmax-Disruptor隊列(一)RingBuffer與單生產者、單消費者工作原理解析
- v2版本博客:從零開始實現lmax-Disruptor隊列(二)多消費者、消費者組間消費依賴原理解析
- v3版本博客:從零開始實現lmax-Disruptor隊列(三)多線程消費者WorkerPool原理解析
- v4版本博客:從零開始實現lmax-Disruptor隊列(四)多線程生產者MultiProducerSequencer原理解析
為什麼Disruptor需要DSL風格的API
通過前面4個版本的迭代,MyDisruptor已經實現了disruptor的大多數功能。但對程式可讀性有要求的讀者可能會註意到,之前給出的demo示例代碼中對於構建多個消費者之間的依賴關係時細節有點多。
構建一個有上游消費者依賴的EventProcessor消費者一般來說需要通過以下幾步完成:
- 獲得所要依賴的上游消費者序列集合,併在創建EventProcessor時通過參數傳入
- 獲得所創建的EventProcessor對應的消費者序列對象
- 將獲得的消費者序列對象註冊到RingBuffer中
- 通過線程池或者start等方式啟動EventProcessor線程,開始監聽消費
目前的版本中,每創建一個消費者都需要寫一遍上述的模板代碼。對於理解Disruptor原理的人來說還勉強能接受,但還是很繁瑣且容易在細節上犯錯,更遑論對disruptor底層不大瞭解的普通用戶。
基於上述原因,disruptor提供了更加簡單易用的DSL風格API,使得對disruptor底層各組件間交互不甚瞭解的用戶也能很方便的使用disruptor,去構建不同消費者組間的依賴關係。
什麼是DSL風格的API?
DSL即Domain Specific Language,領域特定語言。DSL是針對特定領域抽象出的一個特定語言,通過進一層的抽象來代替大量繁瑣的通用代碼段,如sql、shell等都是常見的dsl。
而DSL風格的API最大的特點就是介面的定義貼合業務場景,因此易於理解和使用。
MyDisruptor DSL風格API實現詳解
Disruptor
首先要介紹的就是Disruptor類,disruptor類主要用於創建一個符合用戶需求的RingBuffer,並提供一組易用的api以屏蔽底層組件交互的細節。
MyDisruptor類的構造函數有五個參數,分別是:
- 用戶自定義的事件生產器(EventFactory)
- RingBuffer的容量大小
- 消費者執行器(juc的Executor實現類)
- 生產者類型枚舉(指定單線程生產者 or 多線程生產者)
- 消費者阻塞策略實現(WaitStrategy)
以上都是需要用戶自定義或者指定的核心參數,構建好的disruptor的同時,也生成了RingBuffer和指定類型的生產者序列器。
/**
* disruptor dsl(仿Disruptor.Disruptor)
* */
public class MyDisruptor<T> {
private final MyRingBuffer<T> ringBuffer;
private final Executor executor;
private final MyConsumerRepository<T> consumerRepository = new MyConsumerRepository<>();
private final AtomicBoolean started = new AtomicBoolean(false);
public MyDisruptor(
final MyEventFactory<T> eventProducer,
final int ringBufferSize,
final Executor executor,
final ProducerType producerType,
final MyWaitStrategy myWaitStrategy) {
this.ringBuffer = MyRingBuffer.create(producerType,eventProducer,ringBufferSize,myWaitStrategy);
this.executor = executor;
}
/**
* 獲得當親Disruptor的ringBuffer
* */
public MyRingBuffer<T> getRingBuffer() {
return ringBuffer;
}
// 註意:省略了大量無關代碼
}
EventHandlerGroup
創建好Disruptor後,便可以按照需求編排各種消費者的依賴邏輯了。創建消費者時除了用戶自定義的消費量邏輯介面(EventHandler/WorkHandler),還有兩個關鍵要素需要指定,一是指定是單線程生產者還是多線程,二是指定當前消費者的上游消費者序列集合(或者沒有)。
兩兩組合四種情況,為此Disruptor類一共提供了四個方法用於創建消費者:
- handleEventsWith(創建無上游消費者依賴的單線程消費者)
- createEventProcessors(創建有上游消費者依賴的單線程消費者)
- handleEventsWithWorkerPool(創建無上游消費者依賴的多線程消費者)
- createWorkerPool(創建有上游消費者依賴的多線程消費者)
這四個方法的返回值都是EventHandlerGroup對象,其中提供了關鍵的then/thenHandleEventsWithWorkerPool方法用來鏈式的編排多個消費者組。
實際上disruptor中的EventHandlerGroup還提供了等更多的dsl風格的方法(如and),限於篇幅MyDisruptor中只實現了最關鍵的幾個方法。
/**
* DSL事件處理器組(仿Disruptor.EventHandlerGroup)
* */
public class MyEventHandlerGroup<T> {
private final MyDisruptor<T> disruptor;
private final MyConsumerRepository<T> myConsumerRepository;
private final MySequence[] sequences;
public MyEventHandlerGroup(MyDisruptor<T> disruptor,
MyConsumerRepository<T> myConsumerRepository,
MySequence[] sequences) {
this.disruptor = disruptor;
this.myConsumerRepository = myConsumerRepository;
this.sequences = sequences;
}
@SafeVarargs
public final MyEventHandlerGroup<T> then(final MyEventHandler<T>... myEventHandlers) {
return handleEventsWith(myEventHandlers);
}
@SafeVarargs
public final MyEventHandlerGroup<T> handleEventsWith(final MyEventHandler<T>... handlers) {
return disruptor.createEventProcessors(sequences, handlers);
}
@SafeVarargs
public final MyEventHandlerGroup<T> thenHandleEventsWithWorkerPool(final MyWorkHandler<T>... handlers) {
return handleEventsWithWorkerPool(handlers);
}
@SafeVarargs
public final MyEventHandlerGroup<T> handleEventsWithWorkerPool(final MyWorkHandler<T>... handlers) {
return disruptor.createWorkerPool(sequences, handlers);
}
}
MyDisruptor完整代碼
/**
* disruptor dsl(仿Disruptor.Disruptor)
* */
public class MyDisruptor<T> {
private final MyRingBuffer<T> ringBuffer;
private final Executor executor;
private final MyConsumerRepository<T> consumerRepository = new MyConsumerRepository<>();
private final AtomicBoolean started = new AtomicBoolean(false);
public MyDisruptor(
final MyEventFactory<T> eventProducer,
final int ringBufferSize,
final Executor executor,
final ProducerType producerType,
final MyWaitStrategy myWaitStrategy) {
this.ringBuffer = MyRingBuffer.create(producerType,eventProducer,ringBufferSize,myWaitStrategy);
this.executor = executor;
}
/**
* 註冊單線程消費者 (無上游依賴消費者,僅依賴生產者序列)
* */
@SafeVarargs
public final MyEventHandlerGroup<T> handleEventsWith(final MyEventHandler<T>... myEventHandlers){
return createEventProcessors(new MySequence[0], myEventHandlers);
}
/**
* 註冊單線程消費者 (有上游依賴消費者,僅依賴生產者序列)
* @param barrierSequences 依賴的序列屏障
* @param myEventHandlers 用戶自定義的事件消費者集合
* */
public MyEventHandlerGroup<T> createEventProcessors(
final MySequence[] barrierSequences,
final MyEventHandler<T>[] myEventHandlers) {
final MySequence[] processorSequences = new MySequence[myEventHandlers.length];
final MySequenceBarrier barrier = ringBuffer.newBarrier(barrierSequences);
int i=0;
for(MyEventHandler<T> myEventConsumer : myEventHandlers){
final MyBatchEventProcessor<T> batchEventProcessor =
new MyBatchEventProcessor<>(ringBuffer, myEventConsumer, barrier);
processorSequences[i] = batchEventProcessor.getCurrentConsumeSequence();
i++;
// consumer對象都維護起來,便於後續start時啟動
consumerRepository.add(batchEventProcessor);
}
// 更新當前生產者註冊的消費者序列
updateGatingSequencesForNextInChain(barrierSequences,processorSequences);
return new MyEventHandlerGroup<>(this,this.consumerRepository,processorSequences);
}
/**
* 註冊多線程消費者 (無上游依賴消費者,僅依賴生產者序列)
* */
@SafeVarargs
public final MyEventHandlerGroup<T> handleEventsWithWorkerPool(final MyWorkHandler<T>... myWorkHandlers) {
return createWorkerPool(new MySequence[0], myWorkHandlers);
}
/**
* 註冊多線程消費者 (有上游依賴消費者,僅依賴生產者序列)
* @param barrierSequences 依賴的序列屏障
* @param myWorkHandlers 用戶自定義的事件消費者集合
* */
public MyEventHandlerGroup<T> createWorkerPool(
final MySequence[] barrierSequences, final MyWorkHandler<T>[] myWorkHandlers) {
final MySequenceBarrier sequenceBarrier = ringBuffer.newBarrier(barrierSequences);
final MyWorkerPool<T> workerPool = new MyWorkerPool<>(ringBuffer, sequenceBarrier, myWorkHandlers);
// consumer都保存起來,便於start啟動
consumerRepository.add(workerPool);
final MySequence[] workerSequences = workerPool.getCurrentWorkerSequences();
updateGatingSequencesForNextInChain(barrierSequences, workerSequences);
return new MyEventHandlerGroup<>(this, consumerRepository,workerSequences);
}
private void updateGatingSequencesForNextInChain(final MySequence[] barrierSequences, final MySequence[] processorSequences) {
if (processorSequences.length != 0) {
// 這是一個優化操作:
// 由於新的消費者通過ringBuffer.newBarrier(barrierSequences),已經是依賴於之前ringBuffer中已有的消費者序列
// 消費者即EventProcessor內部已經設置好了老的barrierSequences為依賴,因此可以將ringBuffer中已有的消費者序列去掉
// 只需要保存,依賴當前消費者鏈條最末端的序列即可(也就是最慢的序列),這樣生產者可以更快的遍歷註冊的消費者序列
for(MySequence sequenceV4 : barrierSequences){
ringBuffer.removeConsumerSequence(sequenceV4);
}
for(MySequence sequenceV4 : processorSequences){
// 新設置的就是當前消費者鏈條最末端的序列
ringBuffer.addConsumerSequence(sequenceV4);
}
}
}
/**
* 啟動所有已註冊的消費者
* */
public void start(){
// cas設置啟動標識,避免重覆啟動
if (!started.compareAndSet(false, true)) {
throw new IllegalStateException("Disruptor只能啟動一次");
}
// 遍歷所有的消費者,挨個start啟動
this.consumerRepository.getConsumerInfos().forEach(
item->item.start(this.executor)
);
}
/**
* 獲得當親Disruptor的ringBuffer
* */
public MyRingBuffer<T> getRingBuffer() {
return ringBuffer;
}
}
Disruptor內部消費者依賴編排的性能小優化
- 在上面完整的MyDisruptor實現中可以看到,在每次構建消費者後都執行了updateGatingSequencesForNextInChain這個方法。方法中將當前消費者序列號註冊進RingBuffer的同時,還將傳入的上游barrierSequence集合從當前RingBuffer中移除。
這樣做主要是為了提高生產者在獲取當前最慢消費者時的性能。 - 在沒有這個優化之前,所有的消費者的序列號都會被註冊到RingBuffer中,而生產者通過getMinimumSequence方法遍歷所有註冊的消費者序列集合獲得其中最小的序列值(最慢的消費者)。
- 我們知道,通過Disruptor的DSL介面創建的消費者之間是存在依賴關係的,每個消費者的實現內部保證了其自身的序列號不會超過上游的消費者序列。所以在存在上下游依賴關係的、所有消費者序列的集合中,最慢的消費者必然是處於下游的消費者序列號。
所以在RingBuffer中就可以不再維護更上游的消費者序列號,從而加快getMinimumSequence方法中遍曆數組的速度。
MyDisruptorV5版本demo示例
下麵通過一個簡單但不失一般性的示例,來展示一下DSL風格API到底簡化了多少複雜度。
不使用DSL風格API的示例
public class MyRingBufferV5DemoOrginal {
/**
* 消費者依賴關係圖(簡單起見都是單線程消費者):
* A -> BC -> D
* -> E -> F
* */
public static void main(String[] args) {
// 環形隊列容量為16(2的4次方)
int ringBufferSize = 16;
// 創建環形隊列
MyRingBuffer<OrderEventModel> myRingBuffer = MyRingBuffer.createSingleProducer(
new OrderEventProducer(), ringBufferSize, new MyBlockingWaitStrategy());
// 獲得ringBuffer的序列屏障(最上游的序列屏障內只維護生產者的序列)
MySequenceBarrier mySequenceBarrier = myRingBuffer.newBarrier();
// ================================== 基於生產者序列屏障,創建消費者A
MyBatchEventProcessor<OrderEventModel> eventProcessorA =
new MyBatchEventProcessor<>(myRingBuffer, new OrderEventHandlerDemo("consumerA"), mySequenceBarrier);
MySequence consumeSequenceA = eventProcessorA.getCurrentConsumeSequence();
// RingBuffer監聽消費者A的序列
myRingBuffer.addGatingConsumerSequenceList(consumeSequenceA);
// ================================== 通過消費者A的序列號創建序列屏障(構成消費的順序依賴),創建消費者B
MySequenceBarrier mySequenceBarrierB = myRingBuffer.newBarrier(consumeSequenceA);
MyBatchEventProcessor<OrderEventModel> eventProcessorB =
new MyBatchEventProcessor<>(myRingBuffer, new OrderEventHandlerDemo("consumerB"), mySequenceBarrierB);
MySequence consumeSequenceB = eventProcessorB.getCurrentConsumeSequence();
// RingBuffer監聽消費者B的序列
myRingBuffer.addGatingConsumerSequenceList(consumeSequenceB);
// ================================== 通過消費者A的序列號創建序列屏障(構成消費的順序依賴),創建消費者C
MySequenceBarrier mySequenceBarrierC = myRingBuffer.newBarrier(consumeSequenceA);
MyBatchEventProcessor<OrderEventModel> eventProcessorC =
new MyBatchEventProcessor<>(myRingBuffer, new OrderEventHandlerDemo("consumerC"), mySequenceBarrierC);
MySequence consumeSequenceC = eventProcessorC.getCurrentConsumeSequence();
// RingBuffer監聽消費者C的序列
myRingBuffer.addGatingConsumerSequenceList(consumeSequenceC);
// ================================== 消費者D依賴上游的消費者B,C,通過消費者B、C的序列號創建序列屏障(構成消費的順序依賴)
MySequenceBarrier mySequenceBarrierD = myRingBuffer.newBarrier(consumeSequenceB,consumeSequenceC);
// 基於序列屏障,創建消費者D
MyBatchEventProcessor<OrderEventModel> eventProcessorD =
new MyBatchEventProcessor<>(myRingBuffer, new OrderEventHandlerDemo("consumerD"), mySequenceBarrierD);
MySequence consumeSequenceD = eventProcessorD.getCurrentConsumeSequence();
// RingBuffer監聽消費者D的序列
myRingBuffer.addGatingConsumerSequenceList(consumeSequenceD);
// ================================== 通過消費者A的序列號創建序列屏障(構成消費的順序依賴),創建消費者E
MySequenceBarrier mySequenceBarrierE = myRingBuffer.newBarrier(consumeSequenceA);
MyBatchEventProcessor<OrderEventModel> eventProcessorE =
new MyBatchEventProcessor<>(myRingBuffer, new OrderEventHandlerDemo("consumerE"), mySequenceBarrierE);
MySequence consumeSequenceE = eventProcessorE.getCurrentConsumeSequence();
// RingBuffer監聽消費者E的序列
myRingBuffer.addGatingConsumerSequenceList(consumeSequenceE);
// ================================== 通過消費者E的序列號創建序列屏障(構成消費的順序依賴),創建消費者F
MySequenceBarrier mySequenceBarrierF = myRingBuffer.newBarrier(consumeSequenceE);
MyBatchEventProcessor<OrderEventModel> eventProcessorF =
new MyBatchEventProcessor<>(myRingBuffer, new OrderEventHandlerDemo("consumerF"), mySequenceBarrierF);
MySequence consumeSequenceF = eventProcessorF.getCurrentConsumeSequence();
// RingBuffer監聽消費者F的序列
myRingBuffer.addGatingConsumerSequenceList(consumeSequenceF);
Executor executor = new ThreadPoolExecutor(10, 10, 60L, TimeUnit.SECONDS, new SynchronousQueue<>());
// 啟動消費者線程A
executor.execute(eventProcessorA);
// 啟動消費者線程B
executor.execute(eventProcessorB);
// 啟動消費者線程C
executor.execute(eventProcessorC);
// 啟動消費者線程D
executor.execute(eventProcessorD);
// 啟動消費者線程E
executor.execute(eventProcessorE);
// 啟動消費者線程F
executor.execute(eventProcessorF);
// 生產者發佈100個事件
for(int i=0; i<100; i++) {
long nextIndex = myRingBuffer.next();
OrderEventModel orderEvent = myRingBuffer.get(nextIndex);
orderEvent.setMessage("message-"+i);
orderEvent.setPrice(i * 10);
System.out.println("生產者發佈事件:" + orderEvent);
myRingBuffer.publish(nextIndex);
}
}
}
使用DSL風格APi的示例
public class MyRingBufferV5DemoUseDSL {
/**
* 消費者依賴關係圖(簡單起見都是單線程消費者):
* A -> BC -> D
* -> E -> F
* */
public static void main(String[] args) {
// 環形隊列容量為16(2的4次方)
int ringBufferSize = 16;
MyDisruptor<OrderEventModel> myDisruptor = new MyDisruptor<>(
new OrderEventProducer(), ringBufferSize,
new ThreadPoolExecutor(10, 10, 60L, TimeUnit.SECONDS, new SynchronousQueue<>()),
ProducerType.SINGLE,
new MyBlockingWaitStrategy()
);
MyEventHandlerGroup<OrderEventModel> hasAHandlerGroup = myDisruptor.handleEventsWith(new OrderEventHandlerDemo("consumerA"));
hasAHandlerGroup.then(new OrderEventHandlerDemo("consumerB"),new OrderEventHandlerDemo("consumerC"))
.then(new OrderEventHandlerDemo("consumerD"));
hasAHandlerGroup.then(new OrderEventHandlerDemo("consumerE"))
.then(new OrderEventHandlerDemo("consumerF"));
// 啟動disruptor中註冊的所有消費者
myDisruptor.start();
MyRingBuffer<OrderEventModel> myRingBuffer = myDisruptor.getRingBuffer();
// 生產者發佈100個事件
for(int i=0; i<100; i++) {
long nextIndex = myRingBuffer.next();
OrderEventModel orderEvent = myRingBuffer.get(nextIndex);
orderEvent.setMessage("message-"+i);
orderEvent.setPrice(i * 10);
System.out.println("生產者發佈事件:" + orderEvent);
myRingBuffer.publish(nextIndex);
}
}
}
- 可以看到實現同樣的業務邏輯時,使用DSL風格的API由於減少了大量的模板代碼,代碼量大幅減少的同時還增強了程式的可讀性。這證明瞭disruptor的DSL風格API設計是很成功的。
總結
- 本篇博客介紹了Disruptor的DSL風格的API最核心的實現邏輯,並且通過對比展示了相同業務下DSL風格的API簡單易理解的特點。
- 限於篇幅,自己實現的MyDisruptor中並沒有將disruptor中DSL風格的API功能全部實現,而僅僅實現了最常用、最核心的一部分。
感興趣的讀者可以在理解當前v5版本MyDisruptor的基礎之上,通過閱讀disruptor的源碼做進一步瞭解。 - 目前v5版本的MyDisruptor已經實現了disruptor的絕大多數功能,最後的v6版本中將會對MyDisruptor中已有的缺陷進行進一步的優化。
v6版本的MyDisruptor將會解決偽共用、優雅終止等關鍵問題併進行對應原理的解析,敬請期待。
disruptor無論在整體設計還是最終代碼實現上都有很多值得反覆琢磨和學習的細節,希望能幫助到對disruptor感興趣的小伙伴。
本篇博客的完整代碼在我的github上:https://github.com/1399852153/MyDisruptor 分支:feature/lab5