1. 概念 Reactive 非常適合低延遲、高吞吐量的工作負載。 Reactive Processing 是一種範式(規範),它使開發人員能夠構建非阻塞的、非同步的應用程式,這些應用程式能夠處理背壓(流控制) Reactive Streams 為無阻塞背壓的非同步流處理提供標準。 Reactor 是基 ...
1. 概念
Reactive 非常適合低延遲、高吞吐量的工作負載。
Reactive Processing 是一種範式(規範),它使開發人員能夠構建非阻塞的、非同步的應用程式,這些應用程式能夠處理背壓(流控制)
Reactive Streams 為無阻塞背壓的非同步流處理提供標準。
Reactor 是基於Reactive Streams規範的第四代響應庫,用於在JVM上構建非阻塞的應用程式。
Project Reactor 是一個完全無阻塞的基礎,其中包括背壓支持。它是Spring生態系統中的響應式堆棧的基礎,並且在諸如Spring WebFlux,Spring Data和Spring Cloud Gateway等項目中都有它的身影。利用Project Reactor可以高效的響應式系統。剛纔說Reactive Streams是規範,那麼Project Reactor就是實現。
2. 響應式編程
響應式編程是一種非同步編程風格,它關註數據流和變化的傳播。
響應式編程是一種與數據流和變化傳播相關的聲明式編程範式。使用此範例,可以輕鬆地表示靜態(例如,數組)或動態(例如,事件發射器)數據流,並且還可以表示關聯執行模型中的推斷出的依賴關係,這有助於更改數據流的自動傳播。
reactive programming (響應式編程)
imperative programming(命令式編程)
在命令式編程中,a:=b+c意味著將b+c的結果賦值給a,並且此後b或c的值發生變化不會影響到a的值。而在響應式編程中,a的值會隨著b或c的改變而自動更新,並且不需要重新執行a:=b+c來確定當前分配給a的值。(PS:是不是很像angularjs、vuejs這種MVVM框架,視圖綁定模型,模型變了,視圖自動就跟著變了)
例如,在 model–view–controller (MVC) 架構中,響應式編程可以促進基礎模型中的更改,這些更改會自動反映在關聯的視圖中。
響應式編程與面向對象編程中通常使用的觀察者模式具有很多相似之處。
如果從推拉的角度來看的話,響應式編程是“推”,它主動將變化推送給它的訂閱者。Publisher-Subscriber是兩個非常重要的概念。
想象一下,數據流從源出發,經過一個一個節點的處理,最終達到目的地。節點就相當於操作符,處理完了以後就將流發射出去,到下一個節點再執行再發射。
我總覺得這個流程很眼熟,很像 Apache Storm 的處理方式。在一個拓撲結構中,數據流從Spout發出,經過若幹bolt的處理,最終彙集到某個地方。
還有一種理解,我覺得也很不錯,說響應式編程是一種通過非同步和數據流來構建事務關係的編程模型。事物可以理解程一次處理過程,一次執行過程。響應式編程就是要構建關係,事務和事務之間的關係。而數據流就像是一個橋梁一樣,數據流從一個事務流向下一個事務。
想象一下,長江流經宜賓、瀘州、重慶、涪陵、萬州、宜昌、荊州、武漢、黃石、鄂州、九江、安慶、銅陵、蕪湖、南京、上海,最終匯入東海。
就像CompleteFuture把Future進行編排一樣。
本質來講,響應式編程上是對數據流或某種變化所作出的反應,但是這個變化什麼時候發生是未知的,所以他是一種基於非同步、回調的方式在處理問題
3. NIO
NIO(Non-Blocking I/O)
BIO(Blocking I/O)
在經典的線程模型中,socket.accept()、socket.read()、socket.write()三個主要函數都是同步阻塞的,當一個連接在處理I/O的時候,系統是阻塞的,如果使用單線程的話就阻塞在那裡了,但CPU是並沒有阻塞,如果用多線程的話,就可以讓CPU去處理更多的事情。其實這也是所有使用多線程的本質: 當I/O阻塞系統,但CPU空閑的時候,可以利用多線程使用CPU資源。然而,線程的創建、銷毀、切換成本都是很高的。
事實上,所有的系統I/O都分為兩個階段:等待就緒和操作。舉例來說,讀函數,分為等待系統可讀和真正的讀;同理,寫函數分為等待網卡可以寫和真正的寫。
需要說明的是等待就緒的阻塞是不使用CPU的,是在“空等”;而真正的讀寫操作的阻塞是使用CPU的,真正在”幹活”。
以socket.read()為例子:
傳統的BIO裡面socket.read(),如果TCP RecvBuffer里沒有數據,函數會一直阻塞,直到收到數據,返回讀到的數據。
對於NIO,如果TCP RecvBuffer有數據,就把數據從網卡讀到記憶體,並且返回給用戶;反之則直接返回0,永遠不會阻塞。
在BIO模型中,沒有辦法知道到底能不能寫、能不能讀,只能”傻等”。而在NIO模型中,如果一個連接不能讀寫(socket.read()返回0或者socket.write()返回0),我們可以把這件事記下來,記錄的方式通常是在Selector上註冊標記位,然後切換到其它就緒的連接(channel)繼續進行讀寫。
NIO的主要事件有幾個:讀就緒、寫就緒、有新連接到來。那麼,首先需要註冊當這幾個事件到來的時候所對應的處理器,然後在合適的時機告訴事件選擇器:我對這個事件感興趣,最後用一個死迴圈選擇就緒的事件。select是阻塞的,所以你可以放心大膽地在一個while(true)裡面調用這個函數而不用擔心CPU空轉。
總結起來就是:註冊所有感興趣的事件處理器,單線程輪詢選擇就緒事件,執行事件處理器。
我們大概可以總結出NIO是怎麼解決掉線程的瓶頸並處理海量連接的:
NIO由原來的阻塞讀寫(占用線程)變成了單線程輪詢事件,找到可以進行讀寫的網路描述符進行讀寫。除了事件的輪詢是阻塞的(沒有可乾的事情必須要阻塞),剩餘的I/O操作都是純CPU操作,沒有必要開啟多線程。
NIO由原來的阻塞讀寫(占用線程)變成了單線程輪詢事件,找到可以進行讀寫的網路描述符進行讀寫。除了事件的輪詢是阻塞的(沒有可乾的事情必須要阻塞),剩餘的I/O操作都是純CPU操作,沒有必要開啟多線程。並且由於線程的節約,連接數大的時候因為線程切換帶來的問題也隨之解決,進而為處理海量連接提供了可能。單線程處理I/O的效率確實非常高,沒有線程切換,只是拼命的讀、寫、選擇事件。但現在的伺服器,一般都是多核處理器,如果能夠利用多核心進行I/O,無疑對效率會有更大的提高。
Buffer(緩衝區)
在NIO中,所有數據都是用緩衝區處理的。在讀取數據時,它是直接讀到緩衝區中的;在寫入數據時,它也是寫入到緩衝區中的。
Channel(通道)
通道是一個對象,通過它可以讀取和寫入數據,當然了所有數據都通過Buffer對象來處理。我們永遠不會將位元組直接寫入通道中,相反是將數據寫入包含一個或者多個位元組的緩衝區。同樣不會直接從通道中讀取位元組,而是將數據從通道讀入緩衝區,再從緩衝區獲取這個位元組。
Selector(選擇器)
Selector類是NIO的核心類,Selector(選擇器)選擇器提供了選擇已經就緒的任務的能力。Selector會不斷的輪詢註冊在上面的所有channel,如果某個channel為讀寫等事件做好準備,那麼就處於就緒狀態,通過Selector可以不斷輪詢發現出就緒的channel,進行後續的IO操作。一個Selector能夠同時輪詢多個channel。這樣,一個單獨的線程就可以管理多個channel,從而管理多個網路連接。這樣就不用為每一個連接都創建一個線程,同時也避免了多線程之間上下文切換導致的開銷。
一個簡單的讀取文件的例子:
1 package com.cjs.example.restservice.nio;
2
3 import java.io.FileInputStream;
4 import java.nio.ByteBuffer;
5 import java.nio.channels.FileChannel;
6
7 /**
8 * @author ChengJianSheng
9 * @date 2020-03-26
10 */
11 public class Hello {
12
13 public static void main(String[] args) throws Exception {
14 FileInputStream fis = new FileInputStream("/data.txt");
15 FileChannel channel = fis.getChannel();
16
17 ByteBuffer buffer = ByteBuffer.allocate(10);
18
19 while (true) {
20 if (channel.read(buffer) == -1) {
21 break;
22 }
23 buffer.flip();
24 while (buffer.hasRemaining()) {
25 System.out.print((char)buffer.get());
26 }
27 buffer.clear();
28 }
29
30 channel.close();
31 fis.close();
32 }
33 }
Server.java
1 package com.cjs.example.restservice.nio;
2
3 import java.net.InetSocketAddress;
4 import java.nio.ByteBuffer;
5 import java.nio.channels.SelectionKey;
6 import java.nio.channels.Selector;
7 import java.nio.channels.ServerSocketChannel;
8 import java.nio.channels.SocketChannel;
9 import java.util.Iterator;
10 import java.util.Set;
11
12 /**
13 * @author ChengJianSheng
14 * @date 2020-03-26
15 */
16 public class Server {
17 public static void main(String[] args) throws Exception {
18 // 創建一個Selector
19 Selector selector = Selector.open();
20
21 ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
22 serverSocketChannel.configureBlocking(false);
23 serverSocketChannel.bind(new InetSocketAddress(9000));
24
25 serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
26
27 while (true) {
28 selector.select();
29
30 Set<SelectionKey> selectedKeys = selector.selectedKeys();
31 Iterator<SelectionKey> keyIterator = selectedKeys.iterator();
32 while (keyIterator.hasNext()) {
33 SelectionKey key = keyIterator.next();
34 if(key.isAcceptable()) {
35 // a connection was accepted by a ServerSocketChannel.
36
37 System.out.println(1);
38 ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
39 SocketChannel sc = ssc.accept();
40 sc.configureBlocking(false);
41 sc.register(selector, SelectionKey.OP_READ);
42 } else if (key.isConnectable()) {
43 // a connection was established with a remote server.
44 } else if (key.isReadable()) {
45 // a channel is ready for reading
46
47 System.out.println(2);
48 SocketChannel socketChannel = (SocketChannel) key.channel();
49 ByteBuffer buffer = ByteBuffer.allocate(1024);
50 int len = 0;
51 while ((len = socketChannel.read(buffer)) != -1) {
52 buffer.flip();
53 System.out.println(new String(buffer.array(), 0, len));
54 }
55
56 socketChannel.close();
57 } else if (key.isWritable()) {
58 // a channel is ready for writing
59 }
60
61 keyIterator.remove();
62 }
63 }
64 }
65 }
Client.java
1 package com.cjs.example.restservice.nio;
2
3 import java.net.InetSocketAddress;
4 import java.nio.ByteBuffer;
5 import java.nio.channels.SocketChannel;
6
7 /**
8 * @author ChengJianSheng
9 * @date 2020-03-26
10 */
11 public class Client {
12
13 public static void main(String[] args) throws Exception {
14 SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress("127.0.0.1", 9000));
15 socketChannel.configureBlocking(false);
16
17 ByteBuffer buffer = ByteBuffer.allocate(1024);
18 String msg = "Hello, World!";
19 buffer.put(msg.getBytes());
20 buffer.flip();
21 socketChannel.write(buffer);
22
23 socketChannel.close();
24 }
25 }
關於Selector的用法
1 Selector selector = Selector.open();
2
3 channel.configureBlocking(false);
4
5 SelectionKey key = channel.register(selector, SelectionKey.OP_READ);
6
7 while(true) {
8
9 int readyChannels = selector.selectNow();
10
11 if(readyChannels == 0) continue;
12
13
14 Set<SelectionKey> selectedKeys = selector.selectedKeys();
15
16 Iterator<SelectionKey> keyIterator = selectedKeys.iterator();
17
18 while(keyIterator.hasNext()) {
19
20 SelectionKey key = keyIterator.next();
21
22 if(key.isAcceptable()) {
23 // a connection was accepted by a ServerSocketChannel.
24
25 } else if (key.isConnectable()) {
26 // a connection was established with a remote server.
27
28 } else if (key.isReadable()) {
29 // a channel is ready for reading
30
31 } else if (key.isWritable()) {
32 // a channel is ready for writing
33 }
34
35 keyIterator.remove();
36 }
37 }
參考:
https://www.jianshu.com/p/d47835316016
https://www.cnblogs.com/haimishasha/p/10756448.html
https://tech.meituan.com/2016/11/04/nio.html
牆裂推薦Java NIO教程
http://tutorials.jenkov.com/java-nio/index.html
http://tutorials.jenkov.com/java-nio/selectors.html
http://tutorials.jenkov.com/java-nio/server-socket-channel.html