Reactor 模式簡單實現 在網上有部分文章在描述Netty時,會提到Reactor。這個Reactor到底是什麼呢?為了搞清楚Reactor到底是什麼鬼,我寫了一個簡單的Demo,來幫助大家理解他。 網上是這麼描述Reactor的: The Reactor design pattern hand ...
Reactor 模式簡單實現
在網上有部分文章在描述Netty時,會提到Reactor。這個Reactor到底是什麼呢?為了搞清楚Reactor到底是什麼鬼,我寫了一個簡單的Demo,來幫助大家理解他。
網上是這麼描述Reactor的:
The Reactor design pattern handles service requests that are delivered concurrently to an application by one or more clients.
Each service in an application may consist of serveral methods and is represented by a separate event handler that is responsible for dispatching service-specific requests.
Dispatching of event handlers is performed by an initiation dispatcher, which manages the registered event handlers. Demultiplexing of service requests is performed by a synchronous event demultiplexer.
大致意思是:Reactor是用於處理多個客戶端的請求的設計模式。應用程式提供的每一種服務都可能包括多個方法,並且有必要為這每一個服務分配獨立的請求處理器(也可以說是 event handler)。對於Event handler的調度是有Dispatcher來執行的,這個Dispatcher可以管理event handler的註冊工作。而分離器Demultiplexer則將一個服務分成了多份。這段話看起來還是不那麼容易理解的。
我對這段話的理解是:應用程式提供多種服務,而每一種服務都會分為多步驟(或者多類別)進行。這裡將每一步都作為一個事件,那麼每一步的處理就認為是一個event handler。Dispatcher管理這多個步驟的處理器,也即dispatcher管理著多個Event Handler。而將一個服務處理分為多步驟(多個類別)的處理的工作則是有分離器來完成。
從這個類圖上看,主要有四個角色:
·Handle:事件源。
·EventHandler:事件處理器
·Dispatcher:調度器。使用Demultiplexer選出可以執行處理的EventHandler,然後執行對EventHandler的調度。
·Demultiplexer:同步的事件分離器。
從類圖上看:Dispatcher中有一個Selector和一個EventHandler集合(可以是List,也可以是Map,具體怎麼實現根據實際需求)。Regist_handler、remove_handler用於管理EventHandler。
Handle_events用於啟動調度,這個方法的實現通常是:使用分離器選擇出可以調度的Event,然後對它們進行調度。
下麵就是對Reactor模式的簡單實現:
package com.fjn.jdk.nio.reactor.standard; import java.util.ArrayList; import java.util.Date; import java.util.List; import java.util.Map; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.LinkedBlockingQueue; public class StandardReactor { } class EventDispatcher { Map<EventType, EventHandler> eventHandlerMap = new ConcurrentHashMap<EventType, EventHandler>(); Demultiplexer selector; EventDispatcher(Demultiplexer selector) { this.selector = selector; } public void registEventHandler(EventType eventType, EventHandler eventHandler) { eventHandlerMap.put(eventType, eventHandler); } public void removeEventHandler(EventType eventType) { eventHandlerMap.remove(eventType); } public void handleEvents() { dispatch(); } private void dispatch() { while (true) { List<Event> events = selector.select(); for (Event event : events) { EventHandler eventHandler = eventHandlerMap.get(event.type); eventHandler.handle(event); } } } } class Demultiplexer { private BlockingQueue<Event> eventQueue = new LinkedBlockingQueue<Event>(); private Object lock = new Object(); List<Event> select() { return select(0); } List<Event> select(long timeout) { if (timeout > 0) { if (eventQueue.isEmpty()) { synchronized (lock) { if (eventQueue.isEmpty()) { try { lock.wait(timeout); } catch (InterruptedException e) { // ignore it } } } } } List<Event> events = new ArrayList<Event>(); eventQueue.drainTo(events); return events; } public void addEvent(Event e) { boolean success = eventQueue.offer(e); if (success) { synchronized (lock) { lock.notify(); } } } } class Source { private Date date = new Date(); private String id = date.toString() + "_" + System.identityHashCode(date); @Override public String toString() { return id; } } enum EventType { ACCEPT, READ, WRITE, TIMEOUT; } class Event { public EventType type; public Source source; } abstract class EventHandler { Source source; public abstract void handle(Event event); public Source getSource() { return source; } } class AcceptEventHandler extends EventHandler { private Demultiplexer selector; public AcceptEventHandler(Demultiplexer selector) { this.selector = selector; } @Override public void handle(Event event) { if (event.type == EventType.ACCEPT) { Event readEvent = new Event(); readEvent.source = event.source; readEvent.type = EventType.READ; selector.addEvent(readEvent); } } } class ReadEventHandler extends EventHandler { // private Pipeline pipeline; @Override public void handle(Event event) { // create channel with a pipeline // register the channel to this event dispatcher or a child event dispatcher // handle event use the pipeline : // step 1: read to a frame buffer // step 2: use frame decoder to decode buffer as a message (maybe a business object) // step 3: handle the message or submit the message to business thread pool // step 4: register a message event } } class WriteEventHandler extends EventHandler { @Override public void handle(Event event) { // step 1: encode a message to byte[] // step 2: submit a write task to IOWorker thread pool } } //-------------------------------分割線--------------------------// class Acceptor implements Runnable { private int port; // server socket port private Demultiplexer selector; // 代表 serversocket private BlockingQueue<Source> sourceQueue = new LinkedBlockingQueue<Source>(); Acceptor(Demultiplexer selector, int port) { this.selector = selector; this.port = port; } public void aNewConnection(Source source) { sourceQueue.offer(source); } public int getPort() { return this.port; } public void run() { while (true) { Source source = null; try { // 相當於 serversocket.accept() source = sourceQueue.take(); } catch (InterruptedException e) { // ignore it; } if (source != null) { Event acceptEvent = new Event(); acceptEvent.source = source; acceptEvent.type = EventType.ACCEPT; selector.addEvent(acceptEvent); } } } } class Server { Demultiplexer selector = new Demultiplexer(); EventDispatcher eventLooper = new EventDispatcher(selector); Acceptor acceptor; Server(int port) { acceptor = new Acceptor(selector, port); } public void start() { eventLooper.registEventHandler(EventType.ACCEPT, new AcceptEventHandler(selector)); new Thread(acceptor, "Acceptor-" + acceptor.getPort()).start(); eventLooper.handleEvents(); } }