一 概述 1.Disruptor Disruptor是一個高性能的非同步處理框架,一個“生產者-消費者”模型。 2.RingBuffer RingBuffer是一種環形數據結構,包含一個指向下一個槽點的序號,可以線上程間傳遞數據。 3.Event 在Disruptor框架中,生產者生產的數據叫做Eve ...
一 概述
1.Disruptor
Disruptor是一個高性能的非同步處理框架,一個“生產者-消費者”模型。
2.RingBuffer
RingBuffer是一種環形數據結構,包含一個指向下一個槽點的序號,可以線上程間傳遞數據。
3.Event
在Disruptor框架中,生產者生產的數據叫做Event。
二 Disruptor框架基本構成
1.MyEvent:自定義對象,充當“生產者-消費者”模型中的數據。
2.MyEventFactory:實現EventFactory的介面,用於生產數據。
3.MyEventProducerWithTranslator:將數據存儲到自定義對象中併發布。
4.MyEventHandler:自定義消費者。
三 Demo
初次接觸Disruptor,認識停留在錶面,零散,模糊,在此記一個簡單的示例,以便日後深入研究。
1.自定義數據類
package com.disruptor.basic; public class LongEvent { private long value; public long getValue() { return value; } public void setValue(long value) { this.value = value; } }
2.數據生產工廠(創建數據類對象)
package com.disruptor.basic; import com.lmax.disruptor.EventFactory; public class LongEventFactory implements EventFactory<LongEvent> { public LongEvent newInstance() { // TODO Auto-generated method stub return new LongEvent(); } }
3.數據源(初始化數據對象併發布)
package com.disruptor.basic; import java.nio.ByteBuffer; import com.lmax.disruptor.EventTranslatorOneArg; import com.lmax.disruptor.RingBuffer; public class LongEventProducerWithTranslator { private final RingBuffer<LongEvent> ringBuffer; public LongEventProducerWithTranslator(RingBuffer<LongEvent> ringBuffer) { this.ringBuffer = ringBuffer; } private final EventTranslatorOneArg<LongEvent, ByteBuffer> TRANSLATOR = new EventTranslatorOneArg<LongEvent, ByteBuffer>() { /** * event:包含有消費數據的對象; sequence:分配給目標對象的RingBuffer空間序號; * bb:包含有將要被存儲到目標對象中的數據的容器 */ public void translateTo(LongEvent event, long sequence, ByteBuffer bb) { // TODO Auto-generated method stub event.setValue(bb.getLong(0));// 將數據存儲到目標對象中 } }; public void onData(ByteBuffer bb) { ringBuffer.publishEvent(TRANSLATOR, bb);// 發佈,將數據推送給消費者 } }
4.消費者
package com.disruptor.basic; import com.lmax.disruptor.EventHandler; public class LongEventHandler implements EventHandler<LongEvent> { public void onEvent(LongEvent event, long sequence, boolean endOfBatch) throws Exception { // TODO Auto-generated method stub System.out.println("當前消費的數據="+event.getValue()); } }
5.測試類
package com.disruptor.basic; import java.nio.ByteBuffer; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import org.junit.Test; import com.lmax.disruptor.EventFactory; import com.lmax.disruptor.RingBuffer; import com.lmax.disruptor.YieldingWaitStrategy; import com.lmax.disruptor.dsl.Disruptor; import com.lmax.disruptor.dsl.ProducerType; public class LongEventTest { @SuppressWarnings({ "unchecked", "deprecation" }) @Test public void test01() throws InterruptedException { ExecutorService executor = Executors.newCachedThreadPool(); EventFactory<LongEvent> factory = new LongEventFactory(); int bufferSize = 1024; Disruptor<LongEvent> disruptor = new Disruptor<LongEvent>(factory, bufferSize, executor, ProducerType.SINGLE, new YieldingWaitStrategy()); disruptor.handleEventsWith(new LongEventHandler()); disruptor.start(); RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer(); // LongEventProducer producer = new // LongEventProducer(ringBuffer); LongEventProducerWithTranslator producer = new LongEventProducerWithTranslator(ringBuffer); ByteBuffer bb = ByteBuffer.allocate(8); // long startTime = System.currentTimeMillis(); for (long a = 0; a < 100; a++) { bb.putLong(0, a); producer.onData(bb); /*if (a == 99) { long endTime = System.currentTimeMillis(); System.out.println("useTime=" + (endTime - startTime)); }*/ Thread.sleep(100); } /*long endTime = System.currentTimeMillis(); System.out.println("useTime=" + (endTime - startTime));*/ disruptor.shutdown(); executor.shutdown(); } /*@Test public void test02() { long startTime = System.currentTimeMillis(); for (long a = 0; a < 100; a++) { System.out.println(a); } long endTime = System.currentTimeMillis(); System.out.println("useTime=" + (endTime - startTime)); }*/ }