什麼是Reactor模式 Reactor模式是一種設計模式,它是基於事件驅動的,可以併發的處理多個服務請求,當請求抵達後,依據多路復用策略,同步的派發這些請求至相關的請求處理程式。 Reactor模式角色構成 在早先的論文An Object Behavioral Pattern forDemulti ...
什麼是Reactor模式
Reactor模式是一種設計模式,它是基於事件驅動的,可以併發的處理多個服務請求,當請求抵達後,依據多路復用策略,同步的派發這些請求至相關的請求處理程式。
Reactor模式角色構成
在早先的論文An Object Behavioral Pattern for
Demultiplexing and Dispatching Handles for Synchronous Events中Reactor模式主要有五大角色組成,分別如下:
Handle:操作系統提供的一種資源,用於表示一個個的事件,在網路編程中可以是一個連接事件,一個讀取事件,一個寫入事件,Handle是事件產生的發源地
Synchronous Event Demultiplexer:本質上是一個系統調用,用於等待事件的發生,調用方在調用它的時候會被阻塞,一直阻塞到同步事件分離器上有事件產生為止
Initiation Dispatcher:定義了一些用於控制事件的調度方式的規範,提供對事件管理。它本身是整個事件處理器的核心所在,Initiation Dispatcher會通過Synchronous Event Demultiplexer來等待事件的發生。一旦事件發生,Initiation Dispatcher首先會分離出每一個事件,然後調用事件處理器,最後調用相關的回調方法來處理這些事件
Event Handler:定義事件處理方法以供InitiationDispatcher回調使用
Concrete Event Handler:是事件處理器的實現。它本身實現了事件處理器所提供的各種回調方法,從而實現了特定於業務的邏輯。它本質上就是我們所編寫的一個個的處理器實現。
Reactor模式實現流程
- 初始化 Initiation Dispatcher,然後將若幹個Concrete Event Handler註冊到 Initiation Dispatcher中,應用會標識出該事件處理器希望Initiation Dispatcher在某些事件發生時向其發出通知
- Initiation Dispatcher 會要求每個事件處理器向其傳遞內部的Handle,該Handle向操作系統標識了事件處理器
- 當所有的Concrete Event Handler都註冊完畢後,就會啟動 Initiation Dispatcher的事件迴圈,使用Synchronous Event Demultiplexer同步阻塞的等待事件的發生
- 當與某個事件源對應的Handle變為ready狀態時,Synchronous Event Demultiplexer就會通知 Initiation Dispatcher
- Initiation Dispatcher會觸發事件處理器的回調方法響應這個事件
Java NIO對Reactor的實現
在Java的NIO中,對Reactor模式有無縫的支持,即使用Selector類封裝了操作系統提供的Synchronous Event Demultiplexer功能。Doug Lea(Java concurrent包的作者)在Scalable IO in Java中對此有非常詳細的描述。概況來說其主要流程如下:
- 伺服器端的Reactor線程對象會啟動事件迴圈,並使用Selector來實現IO的多路復用
- 註冊Acceptor事件處理器到Reactor中,Acceptor事件處理器所關註的事件是ACCEPT事件,這樣Reactor會監聽客戶端向伺服器端發起的連接請求事件
- 客戶端向伺服器端發起連接請求,Reactor監聽到了該ACCEPT事件的發生並將該ACCEPT事件派發給相應的Acceptor處理器來進行處理。Acceptor處理器通過accept()方法得到與這個客戶端對應的連接(SocketChannel),然後將該連接所關註的READ/WRITE事件以及對應的READ/WRITE事件處理器註冊到Reactor中,這樣一來Reactor就會監聽該連接的READ/WRITE事件了。
- 當Reactor監聽到有讀或者寫事件發生時,將相關的事件派發給對應的處理器進行處理
- 每當處理完所有就緒的感興趣的I/O事件後,Reactor線程會再次執行select()阻塞等待新的事件就緒並將其分派給對應處理器進行處理
Doug Lea 在Scalable IO in Java中分別描述了單線程的Reactor,多線程模式的Reactor以及多Reactor線程模式。
單線程的Reactor,主要依賴Java NIO中的Channel,Buffer,Selector,SelectionKey。在單線程Reactor模式中,不僅I/O操作在該Reactor線程上,連非I/O的業務操作也在該線程上進行處理了,這可能會大大延遲I/O請求的響應
在多線程Reactor中添加了一個工作線程池,將非I/O操作從Reactor線程中移出轉交給工作者線程池來執行。這樣能夠提高Reactor線程的I/O響應,不至於因為一些耗時的業務邏輯而延遲對後面I/O請求的處理,但是所有的I/O操作依舊由一個Reactor來完成,包括I/O的accept()、read()、write()以及connect()操作
多Reactor線程模式將“接受客戶端的連接請求”和“與該客戶端的通信”分在了兩個Reactor線程來完成。mainReactor完成接收客戶端連接請求的操作,它不負責與客戶端的通信,而是將建立好的連接轉交給subReactor線程來完成與客戶端的通信,這樣一來就不會因為read()數據量太大而導致後面的客戶端連接請求得不到即時處理的情況。並且多Reactor線程模式在海量的客戶端併發請求的情況下,還可以通過實現subReactor線程池來將海量的連接分發給多個subReactor線程,在多核的操作系統中這能大大提升應用的負載和吞吐量
代碼示例:
// NIO selector 多路復用reactor線程模型
public class NIOReactor {
// 處理業務操作的線程池
private static ExecutorService workPool = Executors.newCachedThreadPool();
// 封裝了selector.select()等事件輪詢的代碼
abstract class ReactorThread extends Thread {
Selector selector;
LinkedBlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<>();
volatile boolean running = false;
private ReactorThread() throws IOException {
selector = Selector.open();
}
// Selector監聽到有事件後,調用這個方法
public abstract void handler(SelectableChannel channel) throws Exception;
@Override
public void run() {
// 輪詢Selector事件
while (running) {
try {
// 執行隊列中的任務
Runnable task;
while ((task = taskQueue.poll()) != null) {
task.run();
}
selector.select(1000);
// 獲取查詢結果
Set<SelectionKey> selectionKeys = selector.selectedKeys();
// 遍歷查詢結果
Iterator<SelectionKey> keyIterator = selectionKeys.iterator();
while (keyIterator.hasNext()) {
// 被封裝的查詢結果
SelectionKey selectionKey = keyIterator.next();
keyIterator.remove();
int readyOps = selectionKey.readyOps();
// 關註 Read 和 Accept兩個事件
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0
|| readyOps == 0) {
try {
SelectableChannel channel = (SelectableChannel) selectionKey.attachment();
channel.configureBlocking(false);
handler(channel);
// 如果關閉了,就取消這個KEY的訂閱
if (!channel.isOpen()) {
selectionKey.cancel();
}
} catch (Exception e) {
// 如果有異常,就取消這個KEY的訂閱
selectionKey.cancel();
e.printStackTrace();
}
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
private SelectionKey register(SelectableChannel channel) throws Exception {
// 為什麼register要以任務提交的形式,讓reactor線程去處理?
// 因為線程在執行channel註冊到selector的過程中,會和調用selector.select()方法的線程爭用同一把鎖
// 而select()方法實在eventLoop中通過while迴圈調用的,爭搶的可能性很高,
// 為了讓register能更快的執行,就放到同一個線程來處理
FutureTask<SelectionKey> futureTask =
new FutureTask<>(() -> channel.register(selector, 0, channel));
taskQueue.add(futureTask);
return futureTask.get();
}
private void doStart() {
if (!running) {
running = true;
start();
}
}
}
private ServerSocketChannel serverSocketChannel;
// 1、創建多個線程 - accept處理reactor線程 (accept線程)
private ReactorThread[] mainReactorThreads = new ReactorThread[1];
// 2、創建多個線程 - io處理reactor線程 (I/O線程)
private ReactorThread[] subReactorThreads = new ReactorThread[8];
// 初始化線程組
private void newGroup() throws IOException {
// 創建mainReactor線程, 只負責處理serverSocketChannel
for (int i = 0; i < mainReactorThreads.length; i++) {
mainReactorThreads[i] =
new ReactorThread() {
AtomicInteger incr = new AtomicInteger(0);
@Override
public void handler(SelectableChannel channel) throws Exception {
// 只做請求分發,不做具體的數據讀取
ServerSocketChannel ch = (ServerSocketChannel) channel;
SocketChannel socketChannel = ch.accept();
socketChannel.configureBlocking(false);
// 收到連接建立的通知之後,分發給I/O線程繼續去讀取數據
int index = incr.getAndIncrement() % subReactorThreads.length;
ReactorThread workEventLoop = subReactorThreads[index];
workEventLoop.doStart();
SelectionKey selectionKey = workEventLoop.register(socketChannel);
selectionKey.interestOps(SelectionKey.OP_READ);
System.out.println(
Thread.currentThread().getName() + "收到新連接 : " + socketChannel.getRemoteAddress());
}
};
}
// 創建IO線程,負責處理客戶端連接以後socketChannel的IO讀寫
for (int i = 0; i < subReactorThreads.length; i++) {
subReactorThreads[i] =
new ReactorThread() {
@Override
public void handler(SelectableChannel channel) throws Exception {
// work線程只負責處理IO處理,不處理accept事件
SocketChannel ch = (SocketChannel) channel;
ByteBuffer requestBuffer = ByteBuffer.allocate(1024);
while (ch.isOpen() && ch.read(requestBuffer) != -1) {
// 長連接情況下,需要手動判斷數據有沒有讀取結束 (此處做一個簡單的判斷: 超過0位元組就認為請求結束了)
if (requestBuffer.position() > 0) break;
}
if (requestBuffer.position() == 0) return; // 如果沒數據了, 則不繼續後面的處理
requestBuffer.flip();
byte[] content = new byte[requestBuffer.limit()];
requestBuffer.get(content);
System.out.println(new String(content));
System.out.println(
Thread.currentThread().getName() + "收到數據,來自:" + ch.getRemoteAddress());
// TODO 業務操作 資料庫、介面...
workPool.submit(() -> {});
// 響應結果 200
String response =
"HTTP/1.1 200 OK\r\n" + "Content-Length: 11\r\n\r\n" + "Hello World";
ByteBuffer buffer = ByteBuffer.wrap(response.getBytes());
while (buffer.hasRemaining()) {
ch.write(buffer);
}
}
};
}
}
// 始化channel,並且綁定一個eventLoop線程
private void initAndRegister() throws Exception {
// 1、 創建ServerSocketChannel
serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false);
// 2、 將serverSocketChannel註冊到selector
int index = new Random().nextInt(mainReactorThreads.length);
mainReactorThreads[index].doStart();
SelectionKey selectionKey = mainReactorThreads[index].register(serverSocketChannel);
selectionKey.interestOps(SelectionKey.OP_ACCEPT);
}
// 綁定埠
private void bind() throws IOException {
// 1、 正式綁定埠,對外服務
serverSocketChannel.bind(new InetSocketAddress(8080));
System.out.println("啟動完成,埠8080");
}
public static void main(String[] args) throws Exception {
NIOReactor nioReactor = new NIOReactor();
// 1、 創建main和sub兩組線程
nioReactor.newGroup();
// 2、 創建serverSocketChannel,註冊到mainReactor線程上的selector上
nioReactor.initAndRegister();
// 3、 為serverSocketChannel綁定埠
nioReactor.bind();
}
}
-
-- 創建用戶,指定明文密碼create user 'rose'@'localhost' identified by 'rosepwd'; -- 查看用戶是否創建成功select user,host from mysql.user; -- 創建用戶,不設置密碼create user 'rose01' ...
-
state也就是vuex里的值,也即是整個vuex的狀態,而strict和state的設置有關,如果設置strict為true,那麼不能直接修改state里的值,只能通過mutation來設置 例1: 渲染如下: 當我們在控制台修改store.state.coun里的值時頁面會自動更新,例如: 此時 ...
-
一、相對定位(position:relative) 1、相對定位:將盒子的position屬性設置為relative;可通過left、top、right、bottom設置偏移量。 相對定位基礎用法示例: 我們先在頁面設置兩個div盒子(第一個紅色;第二個藍色) 最初的位置 我們將第一個盒子進行相對定 ...
-
1. 不依賴新舊值的watch 很多時候,我們監聽一個屬性,不會使用到改變前後的值,只是為了執行一些方法,這時可以使用字元串代替 2.立即執行watch 總所周知,watch是在監聽屬性改變時才會觸發,有些時候,我們希望在組件創建後watch能夠立即執行一次。 可能想到的的方法就是在create生命 ...
-
在react頁面內嵌“微信二維碼”,實現PC端通過微信掃碼進行登錄。首先去微信開放平臺註冊一個賬號,創建一個網站應用,提交網站備案審核,獲取appid和appsecret;其他開發流程根據微信文檔來進行操作。 react頁面部分代碼,引入內嵌二維碼腳本,設置iframe標簽支持跨域,自定義二維碼樣式 ...
-
自己總結的尚矽谷Angular課程筆記 1.入門介紹 1.1AngularJS是什麼? jQuery是js函數庫 Angular是Google開源的JS結構化框架 官網:https://angularjs.org/ 1.1.1AngularJS特性和優點 耦合度越低越好,避免牽一發而動全身的事情發生 ...
-
第一種方式:使用H5的API dataTransfer 實現思路: 1.為將要拖拽的元素設置允許拖拽,並賦予dragstart事件將其id轉換成數據保存; 2.為容器添加dragover屬性添加事件阻止瀏覽器預設事件,允許元素放置,並賦予drop事件進行元素的放置。 代碼如下: 第二種方式:使用原生 ...
-
1.1 持久化類的編寫規則 1.1.1 什麼是持久化類? 持久化類 : 與表建立了映射關係的實體類,就可以稱之為持久化類. 持久化類 = Java類 + 映射文件. 1.1.2 持久化類的編寫規則 (1): 提供無參數的構造方法 (2): 類中的成員都是私有的private (3): 對私有屬性提供... ...