Reactive 簡介

来源:https://www.cnblogs.com/cjsblog/archive/2020/03/25/12568502.html
-Advertisement-
Play Games

1. 概念 Reactive 非常適合低延遲、高吞吐量的工作負載。 Reactive Processing 是一種範式(規範),它使開發人員能夠構建非阻塞的、非同步的應用程式,這些應用程式能夠處理背壓(流控制) Reactive Streams 為無阻塞背壓的非同步流處理提供標準。 Reactor 是基 ...


1. 概念

Reactive 非常適合低延遲、高吞吐量的工作負載。

Reactive Processing 是一種範式(規範),它使開發人員能夠構建非阻塞的、非同步的應用程式,這些應用程式能夠處理背壓(流控制)

Reactive Streams 為無阻塞背壓的非同步流處理提供標準。

Reactor 是基於Reactive Streams規範的第四代響應庫,用於在JVM上構建非阻塞的應用程式。

Project Reactor 是一個完全無阻塞的基礎,其中包括背壓支持。它是Spring生態系統中的響應式堆棧的基礎,並且在諸如Spring WebFlux,Spring Data和Spring Cloud Gateway等項目中都有它的身影。利用Project Reactor可以高效的響應式系統。剛纔說Reactive Streams是規範,那麼Project Reactor就是實現。

2. 響應式編程

響應式編程是一種非同步編程風格,它關註數據流和變化的傳播。

響應式編程是一種與數據流和變化傳播相關的聲明式編程範式。使用此範例,可以輕鬆地表示靜態(例如,數組)或動態(例如,事件發射器)數據流,並且還可以表示關聯執行模型中的推斷出的依賴關係,這有助於更改數據流的自動傳播。 

reactive programming   (響應式編程)

imperative programming(命令式編程)

在命令式編程中,a:=b+c意味著將b+c的結果賦值給a,並且此後b或c的值發生變化不會影響到a的值。而在響應式編程中,a的值會隨著b或c的改變而自動更新,並且不需要重新執行a:=b+c來確定當前分配給a的值。(PS:是不是很像angularjs、vuejs這種MVVM框架,視圖綁定模型,模型變了,視圖自動就跟著變了)

例如,在 model–view–controller (MVC) 架構中,響應式編程可以促進基礎模型中的更改,這些更改會自動反映在關聯的視圖中。

響應式編程與面向對象編程中通常使用的觀察者模式具有很多相似之處。

如果從推拉的角度來看的話,響應式編程是“推”,它主動將變化推送給它的訂閱者。Publisher-Subscriber是兩個非常重要的概念。

想象一下,數據流從源出發,經過一個一個節點的處理,最終達到目的地。節點就相當於操作符,處理完了以後就將流發射出去,到下一個節點再執行再發射。

我總覺得這個流程很眼熟,很像 Apache Storm 的處理方式。在一個拓撲結構中,數據流從Spout發出,經過若幹bolt的處理,最終彙集到某個地方。

還有一種理解,我覺得也很不錯,說響應式編程是一種通過非同步和數據流來構建事務關係的編程模型。事物可以理解程一次處理過程,一次執行過程。響應式編程就是要構建關係,事務和事務之間的關係。而數據流就像是一個橋梁一樣,數據流從一個事務流向下一個事務。

想象一下,長江流經宜賓、瀘州、重慶、涪陵、萬州、宜昌、荊州、武漢、黃石、鄂州、九江、安慶、銅陵、蕪湖、南京、上海,最終匯入東海。

就像CompleteFuture把Future進行編排一樣。

本質來講,響應式編程上是對數據流或某種變化所作出的反應,但是這個變化什麼時候發生是未知的,所以他是一種基於非同步、回調的方式在處理問題

3. NIO

NIO(Non-Blocking I/O)

BIO(Blocking I/O)

在經典的線程模型中,socket.accept()、socket.read()、socket.write()三個主要函數都是同步阻塞的,當一個連接在處理I/O的時候,系統是阻塞的,如果使用單線程的話就阻塞在那裡了,但CPU是並沒有阻塞,如果用多線程的話,就可以讓CPU去處理更多的事情。其實這也是所有使用多線程的本質: 當I/O阻塞系統,但CPU空閑的時候,可以利用多線程使用CPU資源。然而,線程的創建、銷毀、切換成本都是很高的。

事實上,所有的系統I/O都分為兩個階段:等待就緒和操作。舉例來說,讀函數,分為等待系統可讀和真正的讀;同理,寫函數分為等待網卡可以寫和真正的寫。

需要說明的是等待就緒的阻塞是不使用CPU的,是在“空等”;而真正的讀寫操作的阻塞是使用CPU的,真正在”幹活”。

以socket.read()為例子:

傳統的BIO裡面socket.read(),如果TCP RecvBuffer里沒有數據,函數會一直阻塞,直到收到數據,返回讀到的數據。

對於NIO,如果TCP RecvBuffer有數據,就把數據從網卡讀到記憶體,並且返回給用戶;反之則直接返回0,永遠不會阻塞。 

在BIO模型中,沒有辦法知道到底能不能寫、能不能讀,只能”傻等”。而在NIO模型中,如果一個連接不能讀寫(socket.read()返回0或者socket.write()返回0),我們可以把這件事記下來,記錄的方式通常是在Selector上註冊標記位,然後切換到其它就緒的連接(channel)繼續進行讀寫。

NIO的主要事件有幾個:讀就緒、寫就緒、有新連接到來。那麼,首先需要註冊當這幾個事件到來的時候所對應的處理器,然後在合適的時機告訴事件選擇器:我對這個事件感興趣,最後用一個死迴圈選擇就緒的事件。select是阻塞的,所以你可以放心大膽地在一個while(true)裡面調用這個函數而不用擔心CPU空轉。

總結起來就是:註冊所有感興趣的事件處理器,單線程輪詢選擇就緒事件,執行事件處理器。

我們大概可以總結出NIO是怎麼解決掉線程的瓶頸並處理海量連接的:

NIO由原來的阻塞讀寫(占用線程)變成了單線程輪詢事件,找到可以進行讀寫的網路描述符進行讀寫。除了事件的輪詢是阻塞的(沒有可乾的事情必須要阻塞),剩餘的I/O操作都是純CPU操作,沒有必要開啟多線程。

NIO由原來的阻塞讀寫(占用線程)變成了單線程輪詢事件,找到可以進行讀寫的網路描述符進行讀寫。除了事件的輪詢是阻塞的(沒有可乾的事情必須要阻塞),剩餘的I/O操作都是純CPU操作,沒有必要開啟多線程。並且由於線程的節約,連接數大的時候因為線程切換帶來的問題也隨之解決,進而為處理海量連接提供了可能。單線程處理I/O的效率確實非常高,沒有線程切換,只是拼命的讀、寫、選擇事件。但現在的伺服器,一般都是多核處理器,如果能夠利用多核心進行I/O,無疑對效率會有更大的提高。

 

Buffer(緩衝區)

在NIO中,所有數據都是用緩衝區處理的。在讀取數據時,它是直接讀到緩衝區中的;在寫入數據時,它也是寫入到緩衝區中的。 

Channel(通道)

通道是一個對象,通過它可以讀取和寫入數據,當然了所有數據都通過Buffer對象來處理。我們永遠不會將位元組直接寫入通道中,相反是將數據寫入包含一個或者多個位元組的緩衝區。同樣不會直接從通道中讀取位元組,而是將數據從通道讀入緩衝區,再從緩衝區獲取這個位元組。

Selector(選擇器)

Selector類是NIO的核心類,Selector(選擇器)選擇器提供了選擇已經就緒的任務的能力。Selector會不斷的輪詢註冊在上面的所有channel,如果某個channel為讀寫等事件做好準備,那麼就處於就緒狀態,通過Selector可以不斷輪詢發現出就緒的channel,進行後續的IO操作。一個Selector能夠同時輪詢多個channel。這樣,一個單獨的線程就可以管理多個channel,從而管理多個網路連接。這樣就不用為每一個連接都創建一個線程,同時也避免了多線程之間上下文切換導致的開銷。

一個簡單的讀取文件的例子:

 1 package com.cjs.example.restservice.nio;
 2 
 3 import java.io.FileInputStream;
 4 import java.nio.ByteBuffer;
 5 import java.nio.channels.FileChannel;
 6 
 7 /**
 8  * @author ChengJianSheng
 9  * @date 2020-03-26
10  */
11 public class Hello {
12 
13     public static void main(String[] args) throws Exception {
14         FileInputStream fis = new FileInputStream("/data.txt");
15         FileChannel channel = fis.getChannel();
16 
17         ByteBuffer buffer = ByteBuffer.allocate(10);
18 
19         while (true) {
20             if (channel.read(buffer) == -1) {
21                 break;
22             }
23             buffer.flip();
24             while (buffer.hasRemaining()) {
25                 System.out.print((char)buffer.get());
26             }
27             buffer.clear();
28         }
29 
30         channel.close();
31         fis.close();
32     }
33 } 

Server.java

 1 package com.cjs.example.restservice.nio;
 2 
 3 import java.net.InetSocketAddress;
 4 import java.nio.ByteBuffer;
 5 import java.nio.channels.SelectionKey;
 6 import java.nio.channels.Selector;
 7 import java.nio.channels.ServerSocketChannel;
 8 import java.nio.channels.SocketChannel;
 9 import java.util.Iterator;
10 import java.util.Set;
11 
12 /**
13  * @author ChengJianSheng
14  * @date 2020-03-26
15  */
16 public class Server {
17     public static void main(String[] args) throws Exception {
18         //  創建一個Selector
19         Selector selector = Selector.open();
20 
21         ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
22         serverSocketChannel.configureBlocking(false);
23         serverSocketChannel.bind(new InetSocketAddress(9000));
24 
25         serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
26 
27         while (true) {
28             selector.select();
29 
30             Set<SelectionKey> selectedKeys = selector.selectedKeys();
31             Iterator<SelectionKey> keyIterator = selectedKeys.iterator();
32             while (keyIterator.hasNext()) {
33                 SelectionKey key = keyIterator.next();
34                 if(key.isAcceptable()) {
35                     // a connection was accepted by a ServerSocketChannel.
36 
37                     System.out.println(1);
38                     ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
39                     SocketChannel sc = ssc.accept();
40                     sc.configureBlocking(false);
41                     sc.register(selector, SelectionKey.OP_READ);
42                 } else if (key.isConnectable()) {
43                     // a connection was established with a remote server.
44                 } else if (key.isReadable()) {
45                     // a channel is ready for reading
46 
47                     System.out.println(2);
48                     SocketChannel socketChannel = (SocketChannel) key.channel();
49                     ByteBuffer buffer = ByteBuffer.allocate(1024);
50                     int len = 0;
51                     while ((len = socketChannel.read(buffer)) != -1) {
52                         buffer.flip();
53                         System.out.println(new String(buffer.array(), 0, len));
54                     }
55 
56                     socketChannel.close();
57                 } else if (key.isWritable()) {
58                     // a channel is ready for writing
59                 }
60 
61                 keyIterator.remove();
62             }
63         }
64     }
65 }

Client.java

 1 package com.cjs.example.restservice.nio;
 2 
 3 import java.net.InetSocketAddress;
 4 import java.nio.ByteBuffer;
 5 import java.nio.channels.SocketChannel;
 6 
 7 /**
 8  * @author ChengJianSheng
 9  * @date 2020-03-26
10  */
11 public class Client {
12 
13     public static void main(String[] args) throws Exception {
14         SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress("127.0.0.1", 9000));
15         socketChannel.configureBlocking(false);
16 
17         ByteBuffer buffer = ByteBuffer.allocate(1024);
18         String msg = "Hello, World!";
19         buffer.put(msg.getBytes());
20         buffer.flip();
21         socketChannel.write(buffer);
22 
23         socketChannel.close();
24     }
25 } 

關於Selector的用法

 1 Selector selector = Selector.open();
 2 
 3 channel.configureBlocking(false);
 4 
 5 SelectionKey key = channel.register(selector, SelectionKey.OP_READ);
 6 
 7 while(true) {
 8 
 9     int readyChannels = selector.selectNow();
10 
11     if(readyChannels == 0) continue;
12 
13 
14     Set<SelectionKey> selectedKeys = selector.selectedKeys();
15 
16     Iterator<SelectionKey> keyIterator = selectedKeys.iterator();
17 
18     while(keyIterator.hasNext()) {
19 
20         SelectionKey key = keyIterator.next();
21 
22         if(key.isAcceptable()) {
23             // a connection was accepted by a ServerSocketChannel.
24 
25         } else if (key.isConnectable()) {
26             // a connection was established with a remote server.
27 
28         } else if (key.isReadable()) {
29             // a channel is ready for reading
30 
31         } else if (key.isWritable()) {
32             // a channel is ready for writing
33         }
34 
35         keyIterator.remove();
36     }
37 }

 

參考:

https://spring.io/reactive

https://www.jianshu.com/p/d47835316016

https://www.cnblogs.com/haimishasha/p/10756448.html

https://tech.meituan.com/2016/11/04/nio.html 

 

牆裂推薦Java NIO教程

http://tutorials.jenkov.com/java-nio/index.html

http://tutorials.jenkov.com/java-nio/selectors.html

http://tutorials.jenkov.com/java-nio/server-socket-channel.html


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 問題引入:電腦里安裝了從官網下載的python3.8.0,。先使用了菜鳥教程的方法2安裝。https://www.runoob.com/numpy/numpy-install.html 發現產生錯誤。先是提示我的pip工具沒有更新到最新版本,於是根據提示更新後,發現仍有錯誤,於是放棄使用這方法里的命 ...
  • 需求 基於Spring, SpringMVC, Mybatis 實現一個類似京東商城的3C電子商城系統, 能夠實現商品管理與展示, 加入購物車, 支付購買等功能 運行環境 jdk1.8,tomcat8.5,mysql5.6,EclispseEE 項目技術 spring springmvc, myba ...
  • 框架:具有很強的通用性,且封裝了一些通用實現方法的項目模板 (非同步框架): 高性能的網路請求 高性能的數據解析 高性能的持久化存儲 高性能的全站數據爬取 高性能的深度爬取 高性能的分散式 Scrapy環境安裝 IOS和Linux windows 安裝完成後,輸入 測試一下,出現如下圖顯示,即安裝成功 ...
  • 一、ItemPipeLine 1.爬蟲提取出的數據存入item之後,item中保存的數據需要進一步處理,比如:清洗,去重,存儲等 2.pipeline需要process_item函數 (1)process_item​:spider提出來的item作為參數出入,同時傳入的還有spider;此方法是必須 ...
  • 目錄 1. "安裝VSCode應用程式" 2. "安裝相關插件" 1. "漢化插件" 2. "C++編輯器插件" 3. "編寫配置文件" 1. "tasks.json" 2. "launch.json" 3. "c_cpp_properties.json" 第一步、安裝VSCode應用程式 打開 " ...
  • 1. 在執行python程式時遇到 ‘ModuleNotFoundError: No module named 'xxxxx'’ : 例如: 圖片中以導入第三方的 'requests' 模塊為例,此報錯提示找不到requests模塊。在python中,有的 模塊是內置的(直接導入就能使用)有的模塊是 ...
  • 近期在開發過程中,因為項目開發環境連接的mysql資料庫是阿裡雲的資料庫,而阿裡雲的資料庫版本是5.6的。而測試環境的mysql是自己安裝的5.7。因此在開發過程中有小伙伴不註意寫了有關group by的sql語句。在開發環境中運行是正常的,而到了測試環境中就發現了異常。 原因分析:MySQL5.7 ...
  • 隨著分散式技術的普及和海量數據的增長,io的能力越來越重要,java提供的io模塊提供了足夠的擴展性來適應。 我是李福春,我在準備面試,今天的問題是: java中的io有哪幾種? java中的io分3類: 1,BIO ,即同步阻塞IO,對應java.io包提供的工具;基於流模型,雖然直觀,代碼實現也 ...
一周排行
    -Advertisement-
    Play Games
  • 概述:在C#中,++i和i++都是自增運算符,其中++i先增加值再返回,而i++先返回值再增加。應用場景根據需求選擇,首碼適合先增後用,尾碼適合先用後增。詳細示例提供清晰的代碼演示這兩者的操作時機和實際應用。 在C#中,++i 和 i++ 都是自增運算符,但它們在操作上有細微的差異,主要體現在操作的 ...
  • 上次發佈了:Taurus.MVC 性能壓力測試(ap 壓測 和 linux 下wrk 壓測):.NET Core 版本,今天計劃準備壓測一下 .NET 版本,來測試並記錄一下 Taurus.MVC 框架在 .NET 版本的性能,以便後續持續優化改進。 為了方便對比,本文章的電腦環境和測試思路,儘量和... ...
  • .NET WebAPI作為一種構建RESTful服務的強大工具,為開發者提供了便捷的方式來定義、處理HTTP請求並返迴響應。在設計API介面時,正確地接收和解析客戶端發送的數據至關重要。.NET WebAPI提供了一系列特性,如[FromRoute]、[FromQuery]和[FromBody],用 ...
  • 原因:我之所以想做這個項目,是因為在之前查找關於C#/WPF相關資料時,我發現講解圖像濾鏡的資源非常稀缺。此外,我註意到許多現有的開源庫主要基於CPU進行圖像渲染。這種方式在處理大量圖像時,會導致CPU的渲染負擔過重。因此,我將在下文中介紹如何通過GPU渲染來有效實現圖像的各種濾鏡效果。 生成的效果 ...
  • 引言 上一章我們介紹了在xUnit單元測試中用xUnit.DependencyInject來使用依賴註入,上一章我們的Sample.Repository倉儲層有一個批量註入的介面沒有做單元測試,今天用這個示例來演示一下如何用Bogus創建模擬數據 ,和 EFCore 的種子數據生成 Bogus 的優 ...
  • 一、前言 在自己的項目中,涉及到實時心率曲線的繪製,項目上的曲線繪製,一般很難找到能直接用的第三方庫,而且有些還是定製化的功能,所以還是自己繪製比較方便。很多人一聽到自己畫就害怕,感覺很難,今天就分享一個完整的實時心率數據繪製心率曲線圖的例子;之前的博客也分享給DrawingVisual繪製曲線的方 ...
  • 如果你在自定義的 Main 方法中直接使用 App 類並啟動應用程式,但發現 App.xaml 中定義的資源沒有被正確載入,那麼問題可能在於如何正確配置 App.xaml 與你的 App 類的交互。 確保 App.xaml 文件中的 x:Class 屬性正確指向你的 App 類。這樣,當你創建 Ap ...
  • 一:背景 1. 講故事 上個月有個朋友在微信上找到我,說他們的軟體在客戶那邊隔幾天就要崩潰一次,一直都沒有找到原因,讓我幫忙看下怎麼回事,確實工控類的軟體環境複雜難搞,朋友手上有一個崩潰的dump,剛好丟給我來分析一下。 二:WinDbg分析 1. 程式為什麼會崩潰 windbg 有一個厲害之處在於 ...
  • 前言 .NET生態中有許多依賴註入容器。在大多數情況下,微軟提供的內置容器在易用性和性能方面都非常優秀。外加ASP.NET Core預設使用內置容器,使用很方便。 但是筆者在使用中一直有一個頭疼的問題:服務工廠無法提供請求的服務類型相關的信息。這在一般情況下並沒有影響,但是內置容器支持註冊開放泛型服 ...
  • 一、前言 在項目開發過程中,DataGrid是經常使用到的一個數據展示控制項,而通常表格的最後一列是作為操作列存在,比如會有編輯、刪除等功能按鈕。但WPF的原始DataGrid中,預設只支持固定左側列,這跟大家習慣性操作列放最後不符,今天就來介紹一種簡單的方式實現固定右側列。(這裡的實現方式參考的大佬 ...