Reactor模式

来源:https://www.cnblogs.com/coding-diary/archive/2019/09/08/11484473.html
-Advertisement-
Play Games

什麼是Reactor模式 Reactor模式是一種設計模式,它是基於事件驅動的,可以併發的處理多個服務請求,當請求抵達後,依據多路復用策略,同步的派發這些請求至相關的請求處理程式。 Reactor模式角色構成 在早先的論文An Object Behavioral Pattern forDemulti ...


什麼是Reactor模式

Reactor模式是一種設計模式,它是基於事件驅動的,可以併發的處理多個服務請求,當請求抵達後,依據多路復用策略,同步的派發這些請求至相關的請求處理程式。

Reactor模式角色構成

在早先的論文An Object Behavioral Pattern for
Demultiplexing and Dispatching Handles for Synchronous Events
中Reactor模式主要有五大角色組成,分別如下:

Handle:操作系統提供的一種資源,用於表示一個個的事件,在網路編程中可以是一個連接事件,一個讀取事件,一個寫入事件,Handle是事件產生的發源地
Synchronous Event Demultiplexer:本質上是一個系統調用,用於等待事件的發生,調用方在調用它的時候會被阻塞,一直阻塞到同步事件分離器上有事件產生為止
Initiation Dispatcher:定義了一些用於控制事件的調度方式的規範,提供對事件管理。它本身是整個事件處理器的核心所在,Initiation Dispatcher會通過Synchronous Event Demultiplexer來等待事件的發生。一旦事件發生,Initiation Dispatcher首先會分離出每一個事件,然後調用事件處理器,最後調用相關的回調方法來處理這些事件
Event Handler:定義事件處理方法以供InitiationDispatcher回調使用
Concrete Event Handler:是事件處理器的實現。它本身實現了事件處理器所提供的各種回調方法,從而實現了特定於業務的邏輯。它本質上就是我們所編寫的一個個的處理器實現。

img

Reactor模式實現流程

  1. 初始化 Initiation Dispatcher,然後將若幹個Concrete Event Handler註冊到 Initiation Dispatcher中,應用會標識出該事件處理器希望Initiation Dispatcher在某些事件發生時向其發出通知
  2. Initiation Dispatcher 會要求每個事件處理器向其傳遞內部的Handle,該Handle向操作系統標識了事件處理器
  3. 當所有的Concrete Event Handler都註冊完畢後,就會啟動 Initiation Dispatcher的事件迴圈,使用Synchronous Event Demultiplexer同步阻塞的等待事件的發生
  4. 當與某個事件源對應的Handle變為ready狀態時,Synchronous Event Demultiplexer就會通知 Initiation Dispatcher
  5. Initiation Dispatcher會觸發事件處理器的回調方法響應這個事件

img

Java NIO對Reactor的實現

在Java的NIO中,對Reactor模式有無縫的支持,即使用Selector類封裝了操作系統提供的Synchronous Event Demultiplexer功能。Doug Lea(Java concurrent包的作者)在Scalable IO in Java中對此有非常詳細的描述。概況來說其主要流程如下:

  1. 伺服器端的Reactor線程對象會啟動事件迴圈,並使用Selector來實現IO的多路復用
  2. 註冊Acceptor事件處理器到Reactor中,Acceptor事件處理器所關註的事件是ACCEPT事件,這樣Reactor會監聽客戶端向伺服器端發起的連接請求事件
  3. 客戶端向伺服器端發起連接請求,Reactor監聽到了該ACCEPT事件的發生並將該ACCEPT事件派發給相應的Acceptor處理器來進行處理。Acceptor處理器通過accept()方法得到與這個客戶端對應的連接(SocketChannel),然後將該連接所關註的READ/WRITE事件以及對應的READ/WRITE事件處理器註冊到Reactor中,這樣一來Reactor就會監聽該連接的READ/WRITE事件了。
  4. 當Reactor監聽到有讀或者寫事件發生時,將相關的事件派發給對應的處理器進行處理
  5. 每當處理完所有就緒的感興趣的I/O事件後,Reactor線程會再次執行select()阻塞等待新的事件就緒並將其分派給對應處理器進行處理

Doug Lea 在Scalable IO in Java中分別描述了單線程的Reactor,多線程模式的Reactor以及多Reactor線程模式。

單線程的Reactor,主要依賴Java NIO中的Channel,Buffer,Selector,SelectionKey。在單線程Reactor模式中,不僅I/O操作在該Reactor線程上,連非I/O的業務操作也在該線程上進行處理了,這可能會大大延遲I/O請求的響應

img

在多線程Reactor中添加了一個工作線程池,將非I/O操作從Reactor線程中移出轉交給工作者線程池來執行。這樣能夠提高Reactor線程的I/O響應,不至於因為一些耗時的業務邏輯而延遲對後面I/O請求的處理,但是所有的I/O操作依舊由一個Reactor來完成,包括I/O的accept()、read()、write()以及connect()操作

img

多Reactor線程模式將“接受客戶端的連接請求”和“與該客戶端的通信”分在了兩個Reactor線程來完成。mainReactor完成接收客戶端連接請求的操作,它不負責與客戶端的通信,而是將建立好的連接轉交給subReactor線程來完成與客戶端的通信,這樣一來就不會因為read()數據量太大而導致後面的客戶端連接請求得不到即時處理的情況。並且多Reactor線程模式在海量的客戶端併發請求的情況下,還可以通過實現subReactor線程池來將海量的連接分發給多個subReactor線程,在多核的操作系統中這能大大提升應用的負載和吞吐量

img

代碼示例:

// NIO selector 多路復用reactor線程模型
public class NIOReactor {

  // 處理業務操作的線程池
  private static ExecutorService workPool = Executors.newCachedThreadPool();

  // 封裝了selector.select()等事件輪詢的代碼
  abstract class ReactorThread extends Thread {

    Selector selector;
    LinkedBlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<>();

    volatile boolean running = false;

    private ReactorThread() throws IOException {
      selector = Selector.open();
    }

    // Selector監聽到有事件後,調用這個方法
    public abstract void handler(SelectableChannel channel) throws Exception;

    @Override
    public void run() {
      // 輪詢Selector事件
      while (running) {
        try {
          // 執行隊列中的任務
          Runnable task;
          while ((task = taskQueue.poll()) != null) {
            task.run();
          }
          selector.select(1000);
          // 獲取查詢結果
          Set<SelectionKey> selectionKeys = selector.selectedKeys();
          // 遍歷查詢結果
          Iterator<SelectionKey> keyIterator = selectionKeys.iterator();
          while (keyIterator.hasNext()) {
            // 被封裝的查詢結果
            SelectionKey selectionKey = keyIterator.next();
            keyIterator.remove();
            int readyOps = selectionKey.readyOps();
            // 關註 Read 和 Accept兩個事件
            if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0
                || readyOps == 0) {
              try {
                SelectableChannel channel = (SelectableChannel) selectionKey.attachment();
                channel.configureBlocking(false);
                handler(channel);
                // 如果關閉了,就取消這個KEY的訂閱
                if (!channel.isOpen()) {
                  selectionKey.cancel();
                }

              } catch (Exception e) {
                // 如果有異常,就取消這個KEY的訂閱
                selectionKey.cancel();
                e.printStackTrace();
              }
            }
          }

        } catch (Exception e) {
          e.printStackTrace();
        }
      }
    }

    private SelectionKey register(SelectableChannel channel) throws Exception {
      // 為什麼register要以任務提交的形式,讓reactor線程去處理?
      // 因為線程在執行channel註冊到selector的過程中,會和調用selector.select()方法的線程爭用同一把鎖
      // 而select()方法實在eventLoop中通過while迴圈調用的,爭搶的可能性很高,
      // 為了讓register能更快的執行,就放到同一個線程來處理
      FutureTask<SelectionKey> futureTask =
          new FutureTask<>(() -> channel.register(selector, 0, channel));
      taskQueue.add(futureTask);
      return futureTask.get();
    }

    private void doStart() {
      if (!running) {
        running = true;
        start();
      }
    }
  }

  private ServerSocketChannel serverSocketChannel;

  // 1、創建多個線程 - accept處理reactor線程 (accept線程)
  private ReactorThread[] mainReactorThreads = new ReactorThread[1];

  // 2、創建多個線程 - io處理reactor線程  (I/O線程)
  private ReactorThread[] subReactorThreads = new ReactorThread[8];

  // 初始化線程組
  private void newGroup() throws IOException {
    // 創建mainReactor線程, 只負責處理serverSocketChannel
    for (int i = 0; i < mainReactorThreads.length; i++) {
      mainReactorThreads[i] =
          new ReactorThread() {
            AtomicInteger incr = new AtomicInteger(0);

            @Override
            public void handler(SelectableChannel channel) throws Exception {
              // 只做請求分發,不做具體的數據讀取
              ServerSocketChannel ch = (ServerSocketChannel) channel;
              SocketChannel socketChannel = ch.accept();
              socketChannel.configureBlocking(false);
              // 收到連接建立的通知之後,分發給I/O線程繼續去讀取數據
              int index = incr.getAndIncrement() % subReactorThreads.length;
              ReactorThread workEventLoop = subReactorThreads[index];
              workEventLoop.doStart();
              SelectionKey selectionKey = workEventLoop.register(socketChannel);
              selectionKey.interestOps(SelectionKey.OP_READ);
              System.out.println(
                  Thread.currentThread().getName() + "收到新連接 : " + socketChannel.getRemoteAddress());
            }
          };
    }

    // 創建IO線程,負責處理客戶端連接以後socketChannel的IO讀寫
    for (int i = 0; i < subReactorThreads.length; i++) {
      subReactorThreads[i] =
          new ReactorThread() {

            @Override
            public void handler(SelectableChannel channel) throws Exception {
              // work線程只負責處理IO處理,不處理accept事件
              SocketChannel ch = (SocketChannel) channel;
              ByteBuffer requestBuffer = ByteBuffer.allocate(1024);
              while (ch.isOpen() && ch.read(requestBuffer) != -1) {
                // 長連接情況下,需要手動判斷數據有沒有讀取結束 (此處做一個簡單的判斷: 超過0位元組就認為請求結束了)
                if (requestBuffer.position() > 0) break;
              }
              if (requestBuffer.position() == 0) return; // 如果沒數據了, 則不繼續後面的處理
              requestBuffer.flip();
              byte[] content = new byte[requestBuffer.limit()];
              requestBuffer.get(content);
              System.out.println(new String(content));
              System.out.println(
                  Thread.currentThread().getName() + "收到數據,來自:" + ch.getRemoteAddress());

              // TODO 業務操作 資料庫、介面...
              workPool.submit(() -> {});

              // 響應結果 200
              String response =
                  "HTTP/1.1 200 OK\r\n" + "Content-Length: 11\r\n\r\n" + "Hello World";
              ByteBuffer buffer = ByteBuffer.wrap(response.getBytes());
              while (buffer.hasRemaining()) {
                ch.write(buffer);
              }
            }
          };
    }
  }

  // 始化channel,並且綁定一個eventLoop線程
  private void initAndRegister() throws Exception {
    // 1、 創建ServerSocketChannel
    serverSocketChannel = ServerSocketChannel.open();
    serverSocketChannel.configureBlocking(false);
    // 2、 將serverSocketChannel註冊到selector
    int index = new Random().nextInt(mainReactorThreads.length);
    mainReactorThreads[index].doStart();
    SelectionKey selectionKey = mainReactorThreads[index].register(serverSocketChannel);
    selectionKey.interestOps(SelectionKey.OP_ACCEPT);
  }

  // 綁定埠
  private void bind() throws IOException {
    //  1、 正式綁定埠,對外服務
    serverSocketChannel.bind(new InetSocketAddress(8080));
    System.out.println("啟動完成,埠8080");
  }

  public static void main(String[] args) throws Exception {
    NIOReactor nioReactor = new NIOReactor();
    // 1、 創建main和sub兩組線程
    nioReactor.newGroup();
    // 2、 創建serverSocketChannel,註冊到mainReactor線程上的selector上
    nioReactor.initAndRegister();
    // 3、 為serverSocketChannel綁定埠
    nioReactor.bind();
  }
}


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • -- 創建用戶,指定明文密碼create user 'rose'@'localhost' identified by 'rosepwd'; -- 查看用戶是否創建成功select user,host from mysql.user; -- 創建用戶,不設置密碼create user 'rose01' ...
  • state也就是vuex里的值,也即是整個vuex的狀態,而strict和state的設置有關,如果設置strict為true,那麼不能直接修改state里的值,只能通過mutation來設置 例1: 渲染如下: 當我們在控制台修改store.state.coun里的值時頁面會自動更新,例如: 此時 ...
  • 一、相對定位(position:relative) 1、相對定位:將盒子的position屬性設置為relative;可通過left、top、right、bottom設置偏移量。 相對定位基礎用法示例: 我們先在頁面設置兩個div盒子(第一個紅色;第二個藍色) 最初的位置 我們將第一個盒子進行相對定 ...
  • 1. 不依賴新舊值的watch 很多時候,我們監聽一個屬性,不會使用到改變前後的值,只是為了執行一些方法,這時可以使用字元串代替 2.立即執行watch 總所周知,watch是在監聽屬性改變時才會觸發,有些時候,我們希望在組件創建後watch能夠立即執行一次。 可能想到的的方法就是在create生命 ...
  • 在react頁面內嵌“微信二維碼”,實現PC端通過微信掃碼進行登錄。首先去微信開放平臺註冊一個賬號,創建一個網站應用,提交網站備案審核,獲取appid和appsecret;其他開發流程根據微信文檔來進行操作。 react頁面部分代碼,引入內嵌二維碼腳本,設置iframe標簽支持跨域,自定義二維碼樣式 ...
  • 自己總結的尚矽谷Angular課程筆記 1.入門介紹 1.1AngularJS是什麼? jQuery是js函數庫 Angular是Google開源的JS結構化框架 官網:https://angularjs.org/ 1.1.1AngularJS特性和優點 耦合度越低越好,避免牽一發而動全身的事情發生 ...
  • 第一種方式:使用H5的API dataTransfer 實現思路: 1.為將要拖拽的元素設置允許拖拽,並賦予dragstart事件將其id轉換成數據保存; 2.為容器添加dragover屬性添加事件阻止瀏覽器預設事件,允許元素放置,並賦予drop事件進行元素的放置。 代碼如下: 第二種方式:使用原生 ...
  • 1.1 持久化類的編寫規則 1.1.1 什麼是持久化類? 持久化類 : 與表建立了映射關係的實體類,就可以稱之為持久化類. 持久化類 = Java類 + 映射文件. 1.1.2 持久化類的編寫規則 (1): 提供無參數的構造方法 (2): 類中的成員都是私有的private (3): 對私有屬性提供... ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...