通過分析如下代碼,大致瞭解Disruptor的原理 1. 第2行代碼 EventFactory<LongEvent> eventFactory = new LongEventFactory(); 數據工廠類構造單個數據,disruptor使用此工廠類預分配數據。 2. 第5行代碼 final Dis
通過分析如下代碼,大致瞭解Disruptor的原理
1 public static void main(String[] args)throws Exception{ 2 EventFactory<LongEvent> eventFactory = new LongEventFactory(); 3 int ringBufferSize = 1024; 4 ExecutorService executors = Executors.newCachedThreadPool(); 5 final Disruptor<LongEvent> disruptor = new Disruptor<LongEvent>(eventFactory, ringBufferSize, executors,ProducerType.SINGLE, new BlockingWaitStrategy()); 6 //final Disruptor<IntEvent> disruptor = new Disruptor<IntEvent>(eventFactory, ringBufferSize, executors,ProducerType.MULTI, new BlockingWaitStrategy()); //多生產者 7 Consumer consumer1 = new Consumer(); 8 Consumer consumer2 = new Consumer(); 9 EventHandlerGroup<LongEvent> firstLevel = disruptor.handleEventsWith(consumer1,consumer2); 10 Consumer consumer21 = new Consumer(); 11 Consumer consumer22 = new Consumer(); 12 EventHandlerGroup<LongEvent> secondLevel = firstLevel.then(consumer21,consumer22); 13 disruptor.start(); //啟動disruptor,consumer開始等待,消費數據 14 Producer producer = new Producer(disruptor); 15 16 //啟動生產者 17 new Thread(producer).start(); 18 }
1. 第2行代碼 EventFactory<LongEvent> eventFactory = new LongEventFactory();
數據工廠類構造單個數據,disruptor使用此工廠類預分配數據。
2. 第5行代碼 final Disruptor<LongEvent> disruptor = new Disruptor<LongEvent>(eventFactory, ringBufferSize, executors,ProducerType.SINGLE, new BlockingWaitStrategy());
預分配數據,構建RingBuffer,指定生產者類型(單生產者、多生產者)、消費者執行的線程池、生產者等待可發佈數據空間和消費者等待可消費數據的策略。
3. 第9行代碼 EventHandlerGroup<LongEvent> firstLevel = disruptor.handleEventsWith(consumer1,consumer2);
每個消費者Handler都會被封裝為一個Processor,其可消費序號由其sequence barrier決定。
4. 第12行代碼 EventHandlerGroup<LongEvent> secondLevel = firstLevel.then(consumer21,consumer22);
5. 第13行代碼 disruptor.start(); //啟動disruptor,consumer開始等待,消費數據
6. 第14-17行代碼,創建啟動生產者,發佈數據