Reactive 簡介

来源:https://www.cnblogs.com/cjsblog/archive/2020/03/25/12568502.html

1. 概念 Reactive 非常適合低延遲、高吞吐量的工作負載。 Reactive Processing 是一種範式(規範),它使開發人員能夠構建非阻塞的、非同步的應用程式,這些應用程式能夠處理背壓(流控制) Reactive Streams 為無阻塞背壓的非同步流處理提供標準。 Reactor 是基 ...


1. 概念

Reactive 非常適合低延遲、高吞吐量的工作負載。

Reactive Processing 是一種範式(規範),它使開發人員能夠構建非阻塞的、非同步的應用程式,這些應用程式能夠處理背壓(流控制)

Reactive Streams 為無阻塞背壓的非同步流處理提供標準。

Reactor 是基於Reactive Streams規範的第四代響應庫,用於在JVM上構建非阻塞的應用程式。

Project Reactor 是一個完全無阻塞的基礎,其中包括背壓支持。它是Spring生態系統中的響應式堆棧的基礎,並且在諸如Spring WebFlux,Spring Data和Spring Cloud Gateway等項目中都有它的身影。利用Project Reactor可以高效的響應式系統。剛纔說Reactive Streams是規範,那麼Project Reactor就是實現。

2. 響應式編程

響應式編程是一種非同步編程風格,它關註數據流和變化的傳播。

響應式編程是一種與數據流和變化傳播相關的聲明式編程範式。使用此範例,可以輕鬆地表示靜態(例如,數組)或動態(例如,事件發射器)數據流,並且還可以表示關聯執行模型中的推斷出的依賴關係,這有助於更改數據流的自動傳播。 

reactive programming   (響應式編程)

imperative programming(命令式編程)

在命令式編程中,a:=b+c意味著將b+c的結果賦值給a,並且此後b或c的值發生變化不會影響到a的值。而在響應式編程中,a的值會隨著b或c的改變而自動更新,並且不需要重新執行a:=b+c來確定當前分配給a的值。(PS:是不是很像angularjs、vuejs這種MVVM框架,視圖綁定模型,模型變了,視圖自動就跟著變了)

例如,在 model–view–controller (MVC) 架構中,響應式編程可以促進基礎模型中的更改,這些更改會自動反映在關聯的視圖中。

響應式編程與面向對象編程中通常使用的觀察者模式具有很多相似之處。

如果從推拉的角度來看的話,響應式編程是“推”,它主動將變化推送給它的訂閱者。Publisher-Subscriber是兩個非常重要的概念。

想象一下,數據流從源出發,經過一個一個節點的處理,最終達到目的地。節點就相當於操作符,處理完了以後就將流發射出去,到下一個節點再執行再發射。

我總覺得這個流程很眼熟,很像 Apache Storm 的處理方式。在一個拓撲結構中,數據流從Spout發出,經過若幹bolt的處理,最終彙集到某個地方。

還有一種理解,我覺得也很不錯,說響應式編程是一種通過非同步和數據流來構建事務關係的編程模型。事物可以理解程一次處理過程,一次執行過程。響應式編程就是要構建關係,事務和事務之間的關係。而數據流就像是一個橋梁一樣,數據流從一個事務流向下一個事務。

想象一下,長江流經宜賓、瀘州、重慶、涪陵、萬州、宜昌、荊州、武漢、黃石、鄂州、九江、安慶、銅陵、蕪湖、南京、上海,最終匯入東海。

就像CompleteFuture把Future進行編排一樣。

本質來講,響應式編程上是對數據流或某種變化所作出的反應,但是這個變化什麼時候發生是未知的,所以他是一種基於非同步、回調的方式在處理問題

3. NIO

NIO(Non-Blocking I/O)

BIO(Blocking I/O)

在經典的線程模型中,socket.accept()、socket.read()、socket.write()三個主要函數都是同步阻塞的,當一個連接在處理I/O的時候,系統是阻塞的,如果使用單線程的話就阻塞在那裡了,但CPU是並沒有阻塞,如果用多線程的話,就可以讓CPU去處理更多的事情。其實這也是所有使用多線程的本質: 當I/O阻塞系統,但CPU空閑的時候,可以利用多線程使用CPU資源。然而,線程的創建、銷毀、切換成本都是很高的。

事實上,所有的系統I/O都分為兩個階段:等待就緒和操作。舉例來說,讀函數,分為等待系統可讀和真正的讀;同理,寫函數分為等待網卡可以寫和真正的寫。

需要說明的是等待就緒的阻塞是不使用CPU的,是在“空等”;而真正的讀寫操作的阻塞是使用CPU的,真正在”幹活”。

以socket.read()為例子:

傳統的BIO裡面socket.read(),如果TCP RecvBuffer里沒有數據,函數會一直阻塞,直到收到數據,返回讀到的數據。

對於NIO,如果TCP RecvBuffer有數據,就把數據從網卡讀到記憶體,並且返回給用戶;反之則直接返回0,永遠不會阻塞。 

在BIO模型中,沒有辦法知道到底能不能寫、能不能讀,只能”傻等”。而在NIO模型中,如果一個連接不能讀寫(socket.read()返回0或者socket.write()返回0),我們可以把這件事記下來,記錄的方式通常是在Selector上註冊標記位,然後切換到其它就緒的連接(channel)繼續進行讀寫。

NIO的主要事件有幾個:讀就緒、寫就緒、有新連接到來。那麼,首先需要註冊當這幾個事件到來的時候所對應的處理器,然後在合適的時機告訴事件選擇器:我對這個事件感興趣,最後用一個死迴圈選擇就緒的事件。select是阻塞的,所以你可以放心大膽地在一個while(true)裡面調用這個函數而不用擔心CPU空轉。

總結起來就是:註冊所有感興趣的事件處理器,單線程輪詢選擇就緒事件,執行事件處理器。

我們大概可以總結出NIO是怎麼解決掉線程的瓶頸並處理海量連接的:

NIO由原來的阻塞讀寫(占用線程)變成了單線程輪詢事件,找到可以進行讀寫的網路描述符進行讀寫。除了事件的輪詢是阻塞的(沒有可乾的事情必須要阻塞),剩餘的I/O操作都是純CPU操作,沒有必要開啟多線程。

NIO由原來的阻塞讀寫(占用線程)變成了單線程輪詢事件,找到可以進行讀寫的網路描述符進行讀寫。除了事件的輪詢是阻塞的(沒有可乾的事情必須要阻塞),剩餘的I/O操作都是純CPU操作,沒有必要開啟多線程。並且由於線程的節約,連接數大的時候因為線程切換帶來的問題也隨之解決,進而為處理海量連接提供了可能。單線程處理I/O的效率確實非常高,沒有線程切換,只是拼命的讀、寫、選擇事件。但現在的伺服器,一般都是多核處理器,如果能夠利用多核心進行I/O,無疑對效率會有更大的提高。

 

Buffer(緩衝區)

在NIO中,所有數據都是用緩衝區處理的。在讀取數據時,它是直接讀到緩衝區中的;在寫入數據時,它也是寫入到緩衝區中的。 

Channel(通道)

通道是一個對象,通過它可以讀取和寫入數據,當然了所有數據都通過Buffer對象來處理。我們永遠不會將位元組直接寫入通道中,相反是將數據寫入包含一個或者多個位元組的緩衝區。同樣不會直接從通道中讀取位元組,而是將數據從通道讀入緩衝區,再從緩衝區獲取這個位元組。

Selector(選擇器)

Selector類是NIO的核心類,Selector(選擇器)選擇器提供了選擇已經就緒的任務的能力。Selector會不斷的輪詢註冊在上面的所有channel,如果某個channel為讀寫等事件做好準備,那麼就處於就緒狀態,通過Selector可以不斷輪詢發現出就緒的channel,進行後續的IO操作。一個Selector能夠同時輪詢多個channel。這樣,一個單獨的線程就可以管理多個channel,從而管理多個網路連接。這樣就不用為每一個連接都創建一個線程,同時也避免了多線程之間上下文切換導致的開銷。

一個簡單的讀取文件的例子:

 1 package com.cjs.example.restservice.nio;
 2 
 3 import java.io.FileInputStream;
 4 import java.nio.ByteBuffer;
 5 import java.nio.channels.FileChannel;
 6 
 7 /**
 8  * @author ChengJianSheng
 9  * @date 2020-03-26
10  */
11 public class Hello {
12 
13     public static void main(String[] args) throws Exception {
14         FileInputStream fis = new FileInputStream("/data.txt");
15         FileChannel channel = fis.getChannel();
16 
17         ByteBuffer buffer = ByteBuffer.allocate(10);
18 
19         while (true) {
20             if (channel.read(buffer) == -1) {
21                 break;
22             }
23             buffer.flip();
24             while (buffer.hasRemaining()) {
25                 System.out.print((char)buffer.get());
26             }
27             buffer.clear();
28         }
29 
30         channel.close();
31         fis.close();
32     }
33 } 

Server.java

 1 package com.cjs.example.restservice.nio;
 2 
 3 import java.net.InetSocketAddress;
 4 import java.nio.ByteBuffer;
 5 import java.nio.channels.SelectionKey;
 6 import java.nio.channels.Selector;
 7 import java.nio.channels.ServerSocketChannel;
 8 import java.nio.channels.SocketChannel;
 9 import java.util.Iterator;
10 import java.util.Set;
11 
12 /**
13  * @author ChengJianSheng
14  * @date 2020-03-26
15  */
16 public class Server {
17     public static void main(String[] args) throws Exception {
18         //  創建一個Selector
19         Selector selector = Selector.open();
20 
21         ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
22         serverSocketChannel.configureBlocking(false);
23         serverSocketChannel.bind(new InetSocketAddress(9000));
24 
25         serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
26 
27         while (true) {
28             selector.select();
29 
30             Set<SelectionKey> selectedKeys = selector.selectedKeys();
31             Iterator<SelectionKey> keyIterator = selectedKeys.iterator();
32             while (keyIterator.hasNext()) {
33                 SelectionKey key = keyIterator.next();
34                 if(key.isAcceptable()) {
35                     // a connection was accepted by a ServerSocketChannel.
36 
37                     System.out.println(1);
38                     ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
39                     SocketChannel sc = ssc.accept();
40                     sc.configureBlocking(false);
41                     sc.register(selector, SelectionKey.OP_READ);
42                 } else if (key.isConnectable()) {
43                     // a connection was established with a remote server.
44                 } else if (key.isReadable()) {
45                     // a channel is ready for reading
46 
47                     System.out.println(2);
48                     SocketChannel socketChannel = (SocketChannel) key.channel();
49                     ByteBuffer buffer = ByteBuffer.allocate(1024);
50                     int len = 0;
51                     while ((len = socketChannel.read(buffer)) != -1) {
52                         buffer.flip();
53                         System.out.println(new String(buffer.array(), 0, len));
54                     }
55 
56                     socketChannel.close();
57                 } else if (key.isWritable()) {
58                     // a channel is ready for writing
59                 }
60 
61                 keyIterator.remove();
62             }
63         }
64     }
65 }

Client.java

 1 package com.cjs.example.restservice.nio;
 2 
 3 import java.net.InetSocketAddress;
 4 import java.nio.ByteBuffer;
 5 import java.nio.channels.SocketChannel;
 6 
 7 /**
 8  * @author ChengJianSheng
 9  * @date 2020-03-26
10  */
11 public class Client {
12 
13     public static void main(String[] args) throws Exception {
14         SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress("127.0.0.1", 9000));
15         socketChannel.configureBlocking(false);
16 
17         ByteBuffer buffer = ByteBuffer.allocate(1024);
18         String msg = "Hello, World!";
19         buffer.put(msg.getBytes());
20         buffer.flip();
21         socketChannel.write(buffer);
22 
23         socketChannel.close();
24     }
25 } 

關於Selector的用法

 1 Selector selector = Selector.open();
 2 
 3 channel.configureBlocking(false);
 4 
 5 SelectionKey key = channel.register(selector, SelectionKey.OP_READ);
 6 
 7 while(true) {
 8 
 9     int readyChannels = selector.selectNow();
10 
11     if(readyChannels == 0) continue;
12 
13 
14     Set<SelectionKey> selectedKeys = selector.selectedKeys();
15 
16     Iterator<SelectionKey> keyIterator = selectedKeys.iterator();
17 
18     while(keyIterator.hasNext()) {
19 
20         SelectionKey key = keyIterator.next();
21 
22         if(key.isAcceptable()) {
23             // a connection was accepted by a ServerSocketChannel.
24 
25         } else if (key.isConnectable()) {
26             // a connection was established with a remote server.
27 
28         } else if (key.isReadable()) {
29             // a channel is ready for reading
30 
31         } else if (key.isWritable()) {
32             // a channel is ready for writing
33         }
34 
35         keyIterator.remove();
36     }
37 }

 

參考:

https://spring.io/reactive

https://www.jianshu.com/p/d47835316016

https://www.cnblogs.com/haimishasha/p/10756448.html

https://tech.meituan.com/2016/11/04/nio.html 

 

牆裂推薦Java NIO教程

http://tutorials.jenkov.com/java-nio/index.html

http://tutorials.jenkov.com/java-nio/selectors.html

http://tutorials.jenkov.com/java-nio/server-socket-channel.html


您的分享是我們最大的動力!

更多相關文章
  • 問題引入:電腦里安裝了從官網下載的python3.8.0,。先使用了菜鳥教程的方法2安裝。https://www.runoob.com/numpy/numpy-install.html 發現產生錯誤。先是提示我的pip工具沒有更新到最新版本,於是根據提示更新後,發現仍有錯誤,於是放棄使用這方法里的命 ...
  • 需求 基於Spring, SpringMVC, Mybatis 實現一個類似京東商城的3C電子商城系統, 能夠實現商品管理與展示, 加入購物車, 支付購買等功能 運行環境 jdk1.8,tomcat8.5,mysql5.6,EclispseEE 項目技術 spring springmvc, myba ...
  • 框架:具有很強的通用性,且封裝了一些通用實現方法的項目模板 (非同步框架): 高性能的網路請求 高性能的數據解析 高性能的持久化存儲 高性能的全站數據爬取 高性能的深度爬取 高性能的分散式 Scrapy環境安裝 IOS和Linux windows 安裝完成後,輸入 測試一下,出現如下圖顯示,即安裝成功 ...
  • 一、ItemPipeLine 1.爬蟲提取出的數據存入item之後,item中保存的數據需要進一步處理,比如:清洗,去重,存儲等 2.pipeline需要process_item函數 (1)process_item​:spider提出來的item作為參數出入,同時傳入的還有spider;此方法是必須 ...
  • 目錄 1. "安裝VSCode應用程式" 2. "安裝相關插件" 1. "漢化插件" 2. "C++編輯器插件" 3. "編寫配置文件" 1. "tasks.json" 2. "launch.json" 3. "c_cpp_properties.json" 第一步、安裝VSCode應用程式 打開 " ...
  • 1. 在執行python程式時遇到 ‘ModuleNotFoundError: No module named 'xxxxx'’ : 例如: 圖片中以導入第三方的 'requests' 模塊為例,此報錯提示找不到requests模塊。在python中,有的 模塊是內置的(直接導入就能使用)有的模塊是 ...
  • 近期在開發過程中,因為項目開發環境連接的mysql資料庫是阿裡雲的資料庫,而阿裡雲的資料庫版本是5.6的。而測試環境的mysql是自己安裝的5.7。因此在開發過程中有小伙伴不註意寫了有關group by的sql語句。在開發環境中運行是正常的,而到了測試環境中就發現了異常。 原因分析:MySQL5.7 ...
  • 隨著分散式技術的普及和海量數據的增長,io的能力越來越重要,java提供的io模塊提供了足夠的擴展性來適應。 我是李福春,我在準備面試,今天的問題是: java中的io有哪幾種? java中的io分3類: 1,BIO ,即同步阻塞IO,對應java.io包提供的工具;基於流模型,雖然直觀,代碼實現也 ...
一周排行
  • 在上篇文章中我們已經知道了多線程是什麼了,那麼它到底可以幹嘛呢?這裡特別聲明一個前面的委托沒看的同學可以到上上上篇博文查看,因為多線程要經常使用到委托。源碼 一、非同步、同步 1.同步(在計算的理解總是要你措不及防,同步當線程做完一件事情之後,才會執行後續動作),同步方法慢,只有一個線程執行,非同步方法 ...
  • 本文主要是講解stopwatch對程式運行時間的準確測量 僅僅介紹裡面的StartNew()方法,Restart()方法和ElapsedMilliseconds { get;}屬性 public void StartNew():作用是對新的 System.Diagnostics.Stopwatch ...
  • 一、引言 RabbitMQ是Rabbit Message Queue的簡寫,但不能僅僅理解其為消息隊列,消息代理更合適。RabbitMQ是一個由 Erlang 語言開發的AMQP(高級消息隊列協議)的開源實現,其內部結構如下: RabbitMQ作為一個消息代理,主要和消息打交道,負責接收並轉發消息。 ...
  • TerminalMACS(Terminal Manager And Check System) 遠程終端管理和檢測系統 本文同步更新地址:https://dotnet9.com/11429.html 一、本系統可監控多種終端資源: 移動端 Android iOS PC端 Windows Linux ...
  • 首先,好消息是Goole將於2020年2月份發佈Chrome 80版本。本次發佈將推進Google的“漸進改良Cookie”策略,打造一個更為安全和保障用戶隱私的網路環境。 壞消息是,本次更新可能導致瀏覽器無法向服務端發送Cookie。如果你有多個不同功能變數名稱的應用,部分用戶很有可能出現會話時常被打斷的 ...
  • 在偶然一次調試某程式時,遇到提示: 無法載入程式集*****.XmlSerializers.dll,文件找不到(Could not load file or assembly ****.XmlSerializers.dll , FileNotFoundException...)。於是嘗試在項目屬性中 ...
  • 在上一篇abp(net core)+easyui+efcore實現倉儲管理系統——入庫管理之五(四十一) 文章中實現了入庫管理的列表頁面,並實現了控制器的代碼。在今天我們學習如何在前端實現新增入庫單信息界面。 ...
  • 面向對象 面向對象是一個抽象的概念,其本質就是對事物以抽象的方式建立對應的模型。 簡單來講,比如我有一隻鋼筆,那麼我就可以通過分析,可以得到 這隻鋼筆的材第是塑料,品牌是個雜牌 ,裡面裝的墨是黑色的,可以用。這時候就能建立一個鋼筆的模型,它在這裡應該有這些屬性: 圖是一個不正確的UML類圖,但是可以 ...
  • 在ASP.NET MVC中有四種過濾器類型 Action 1、在ASP.NET MVC項目中,新建文件夾Filter,然後新建類MyCustormFilter,繼承自ActionFilterAttribute類,我們來看下ActionFilterAttribute類有如下四個方法,從命名我應該就可以 ...
  • 你需要瞭解的 HTTP Status Code Intro 現在前後端分離的開發模式越來越流行,後端負責開發對應的 API,前端只需要 關註前端頁面的數據展示和前端邏輯即可。 對於前後端分離這種開發模式,我個人還是比較喜歡的,因為這樣可以讓更專業的人做更專業的事情,後端專註於做 API 的開發設計, ...
x