Reactor 模式的簡單實現

来源:http://www.cnblogs.com/f1194361820/archive/2016/07/17/5679796.html
-Advertisement-
Play Games

Reactor 模式簡單實現 在網上有部分文章在描述Netty時,會提到Reactor。這個Reactor到底是什麼呢?為了搞清楚Reactor到底是什麼鬼,我寫了一個簡單的Demo,來幫助大家理解他。 網上是這麼描述Reactor的: The Reactor design pattern hand ...


Reactor 模式簡單實現

 

在網上有部分文章在描述Netty時,會提到Reactor。這個Reactor到底是什麼呢?為了搞清楚Reactor到底是什麼鬼,我寫了一個簡單的Demo,來幫助大家理解他。

 

網上是這麼描述Reactor的:

The Reactor design pattern handles service requests that are delivered concurrently to an application by one or more clients.

Each service in an application may consist of serveral methods and is represented by a separate event handler that is responsible for dispatching service-specific requests.

Dispatching of event handlers is performed by an initiation dispatcher, which manages the registered event handlers. Demultiplexing of service requests is performed by a synchronous event demultiplexer.

    大致意思是:Reactor是用於處理多個客戶端的請求的設計模式。應用程式提供的每一種服務都可能包括多個方法,並且有必要為這每一個服務分配獨立的請求處理器(也可以說是 event handler)。對於Event handler的調度是有Dispatcher來執行的,這個Dispatcher可以管理event handler的註冊工作。而分離器Demultiplexer則將一個服務分成了多份。這段話看起來還是不那麼容易理解的。

    我對這段話的理解是:應用程式提供多種服務,而每一種服務都會分為多步驟(或者多類別)進行。這裡將每一步都作為一個事件,那麼每一步的處理就認為是一個event handler。Dispatcher管理這多個步驟的處理器,也即dispatcher管理著多個Event Handler。而將一個服務處理分為多步驟(多個類別)的處理的工作則是有分離器來完成。

 

 

 

 

從這個類圖上看,主要有四個角色:

·Handle:事件源。

    ·EventHandler:事件處理器

    ·Dispatcher:調度器。使用Demultiplexer選出可以執行處理的EventHandler,然後執行對EventHandler的調度。

    ·Demultiplexer:同步的事件分離器。

   

 

從類圖上看:Dispatcher中有一個Selector和一個EventHandler集合(可以是List,也可以是Map,具體怎麼實現根據實際需求)。Regist_handler、remove_handler用於管理EventHandler。

Handle_events用於啟動調度,這個方法的實現通常是:使用分離器選擇出可以調度的Event,然後對它們進行調度。

 

下麵就是對Reactor模式的簡單實現:

 

package com.fjn.jdk.nio.reactor.standard;

import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;

public class StandardReactor {

}

class EventDispatcher {
    Map<EventType, EventHandler> eventHandlerMap = new ConcurrentHashMap<EventType, EventHandler>();

    Demultiplexer selector;

    EventDispatcher(Demultiplexer selector) {
        this.selector = selector;
    }

    public void registEventHandler(EventType eventType, EventHandler eventHandler) {
        eventHandlerMap.put(eventType, eventHandler);

    }

    public void removeEventHandler(EventType eventType) {
        eventHandlerMap.remove(eventType);
    }

    public void handleEvents() {
        dispatch();
    }

    private void dispatch() {
        while (true) {
            List<Event> events = selector.select();

            for (Event event : events) {
                EventHandler eventHandler = eventHandlerMap.get(event.type);
                eventHandler.handle(event);
            }
        }
    }
}

class Demultiplexer {
    private BlockingQueue<Event> eventQueue = new LinkedBlockingQueue<Event>();
    private Object lock = new Object();

    List<Event> select() {
        return select(0);
    }

    List<Event> select(long timeout) {
        if (timeout > 0) {
            if (eventQueue.isEmpty()) {
                synchronized (lock) {
                    if (eventQueue.isEmpty()) {
                        try {
                            lock.wait(timeout);
                        } catch (InterruptedException e) {
                            // ignore it
                        }
                    }
                }

            }
        }
        List<Event> events = new ArrayList<Event>();
        eventQueue.drainTo(events);
        return events;
    }

    public void addEvent(Event e) {
        boolean success = eventQueue.offer(e);
        if (success) {
            synchronized (lock) {
                lock.notify();
            }

        }
    }

}

class Source {
    private Date date = new Date();
    private String id = date.toString() + "_" + System.identityHashCode(date);

    @Override
    public String toString() {
        return id;
    }
}

enum EventType {
    ACCEPT, READ, WRITE, TIMEOUT;
}

class Event {
    public EventType type;
    public Source source;
}

abstract class EventHandler {
    Source source;

    public abstract void handle(Event event);

    public Source getSource() {
        return source;
    }
}

class AcceptEventHandler extends EventHandler {

    private Demultiplexer selector;

    public AcceptEventHandler(Demultiplexer selector) {
        this.selector = selector;
    }

    @Override
    public void handle(Event event) {
        if (event.type == EventType.ACCEPT) {

            Event readEvent = new Event();
            readEvent.source = event.source;
            readEvent.type = EventType.READ;

            selector.addEvent(readEvent);
        }
    }

}

class ReadEventHandler extends EventHandler {
    // private Pipeline pipeline;
    
    @Override
    public void handle(Event event) {
        // create channel with a pipeline
        // register the channel to this event dispatcher or a child event dispatcher 

        
        // handle event use the pipeline :
        // step 1:  read to a frame buffer
        // step 2:  use frame decoder to decode buffer as a message (maybe a business object)
        // step 3:  handle the message or submit the message to business thread pool
        // step 4:  register a message event
        
    }
    
} 

class WriteEventHandler extends EventHandler {
    
    @Override
    public void handle(Event event) {
        // step 1: encode a message to byte[]
        // step 2: submit a write task to IOWorker thread pool 
    }
    
}

//-------------------------------分割線--------------------------//


class Acceptor implements Runnable {
    private int port; // server socket port
    private Demultiplexer selector;

    // 代表 serversocket
    private BlockingQueue<Source> sourceQueue = new LinkedBlockingQueue<Source>();

    Acceptor(Demultiplexer selector, int port) {
        this.selector = selector;
        this.port = port;
    }

    public void aNewConnection(Source source) {
        sourceQueue.offer(source);
    }

    public int getPort() {
        return this.port;
    }

    public void run() {
        while (true) {

            Source source = null;
            try {
                // 相當於 serversocket.accept()
                source = sourceQueue.take();
            } catch (InterruptedException e) {
                // ignore it;
            }

            if (source != null) {
                Event acceptEvent = new Event();
                acceptEvent.source = source;
                acceptEvent.type = EventType.ACCEPT;

                selector.addEvent(acceptEvent);
            }

        }
    }

}



class Server {
    Demultiplexer selector = new Demultiplexer();
    EventDispatcher eventLooper = new EventDispatcher(selector);
    Acceptor acceptor;

    Server(int port) {
        acceptor = new Acceptor(selector, port);
    }

    public void start() {
        eventLooper.registEventHandler(EventType.ACCEPT, new AcceptEventHandler(selector));
        new Thread(acceptor, "Acceptor-" + acceptor.getPort()).start();
        eventLooper.handleEvents();
    }

}

 

  

 


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 在上一章節Spring學習筆記1——IOC: 儘量使用註解以及java代碼中,已經搭建了項目的整體框架,介紹了IOC以及mybatis。第二節主要介紹SpringMVC中的表單數據驗證以及文件上傳,歡迎訪問https://github.com/everseeker0307/register。 一、表 ...
  • web.xml 添加下麵的就可以了 ...
  • 話說我們做程式員的,都應該多少是個懶人,我們總是想辦法驅使我們的電腦幫我們幹活,所以我們學會了各式各樣的語言來告訴電腦該做什麼——儘管,他們有時候也會誤會我們的意思。 ...
  • 一、Spring IOC 容器支持自動裝配 Bean,所謂自動裝配是指,不需要通過 <property> 或 <constructor-arg> 為 Bean 的屬性註入值的過程。 二、配置: 在 <bean> 的 autowire 屬性里指定自動裝配的模式。預設為 no 。可以通過 <beans> ...
  • Java中ScheduleThreadPoolExecutor主要用於執行延遲任務或者按照一定的頻率執行任務。其中scheduleAtFixedRate函數是按照一定頻率執行任務,scheduleWithFixedDelay可以根據延遲一定時間再執行任務。本文將參考ScheduleThreadPoo ...
  • 首先可以去http://www.oracle.com/technetwork/java/javase/downloads下載jdk安裝包。目前jdk已經更新到了8u91/8u92的版本,不過此處我用的jdk版本仍然是jdk8u51,下麵介紹其安裝步驟與環境變數的配置 雙擊應用程式jdk-8u51-w ...
  • 在最近的一次大數據技術討論會上,有一家公司的技術高管談到松耦合和緊耦合的性能表現的話題。正好Laxcus大數據管理系統的設計,從0.x、1.x到2.x版本,也經歷了從緊耦合到松耦合的發展過程。做為親歷者,對這兩種架構的設計和運行效果,我們有清楚的瞭解和認識。下麵就說一說這件事。寫此博文,也希望給做系... ...
  • ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...