Reactive 簡介

来源:https://www.cnblogs.com/cjsblog/archive/2020/03/25/12568502.html
-Advertisement-
Play Games

1. 概念 Reactive 非常適合低延遲、高吞吐量的工作負載。 Reactive Processing 是一種範式(規範),它使開發人員能夠構建非阻塞的、非同步的應用程式,這些應用程式能夠處理背壓(流控制) Reactive Streams 為無阻塞背壓的非同步流處理提供標準。 Reactor 是基 ...


1. 概念

Reactive 非常適合低延遲、高吞吐量的工作負載。

Reactive Processing 是一種範式(規範),它使開發人員能夠構建非阻塞的、非同步的應用程式,這些應用程式能夠處理背壓(流控制)

Reactive Streams 為無阻塞背壓的非同步流處理提供標準。

Reactor 是基於Reactive Streams規範的第四代響應庫,用於在JVM上構建非阻塞的應用程式。

Project Reactor 是一個完全無阻塞的基礎,其中包括背壓支持。它是Spring生態系統中的響應式堆棧的基礎,並且在諸如Spring WebFlux,Spring Data和Spring Cloud Gateway等項目中都有它的身影。利用Project Reactor可以高效的響應式系統。剛纔說Reactive Streams是規範,那麼Project Reactor就是實現。

2. 響應式編程

響應式編程是一種非同步編程風格,它關註數據流和變化的傳播。

響應式編程是一種與數據流和變化傳播相關的聲明式編程範式。使用此範例,可以輕鬆地表示靜態(例如,數組)或動態(例如,事件發射器)數據流,並且還可以表示關聯執行模型中的推斷出的依賴關係,這有助於更改數據流的自動傳播。 

reactive programming   (響應式編程)

imperative programming(命令式編程)

在命令式編程中,a:=b+c意味著將b+c的結果賦值給a,並且此後b或c的值發生變化不會影響到a的值。而在響應式編程中,a的值會隨著b或c的改變而自動更新,並且不需要重新執行a:=b+c來確定當前分配給a的值。(PS:是不是很像angularjs、vuejs這種MVVM框架,視圖綁定模型,模型變了,視圖自動就跟著變了)

例如,在 model–view–controller (MVC) 架構中,響應式編程可以促進基礎模型中的更改,這些更改會自動反映在關聯的視圖中。

響應式編程與面向對象編程中通常使用的觀察者模式具有很多相似之處。

如果從推拉的角度來看的話,響應式編程是“推”,它主動將變化推送給它的訂閱者。Publisher-Subscriber是兩個非常重要的概念。

想象一下,數據流從源出發,經過一個一個節點的處理,最終達到目的地。節點就相當於操作符,處理完了以後就將流發射出去,到下一個節點再執行再發射。

我總覺得這個流程很眼熟,很像 Apache Storm 的處理方式。在一個拓撲結構中,數據流從Spout發出,經過若幹bolt的處理,最終彙集到某個地方。

還有一種理解,我覺得也很不錯,說響應式編程是一種通過非同步和數據流來構建事務關係的編程模型。事物可以理解程一次處理過程,一次執行過程。響應式編程就是要構建關係,事務和事務之間的關係。而數據流就像是一個橋梁一樣,數據流從一個事務流向下一個事務。

想象一下,長江流經宜賓、瀘州、重慶、涪陵、萬州、宜昌、荊州、武漢、黃石、鄂州、九江、安慶、銅陵、蕪湖、南京、上海,最終匯入東海。

就像CompleteFuture把Future進行編排一樣。

本質來講,響應式編程上是對數據流或某種變化所作出的反應,但是這個變化什麼時候發生是未知的,所以他是一種基於非同步、回調的方式在處理問題

3. NIO

NIO(Non-Blocking I/O)

BIO(Blocking I/O)

在經典的線程模型中,socket.accept()、socket.read()、socket.write()三個主要函數都是同步阻塞的,當一個連接在處理I/O的時候,系統是阻塞的,如果使用單線程的話就阻塞在那裡了,但CPU是並沒有阻塞,如果用多線程的話,就可以讓CPU去處理更多的事情。其實這也是所有使用多線程的本質: 當I/O阻塞系統,但CPU空閑的時候,可以利用多線程使用CPU資源。然而,線程的創建、銷毀、切換成本都是很高的。

事實上,所有的系統I/O都分為兩個階段:等待就緒和操作。舉例來說,讀函數,分為等待系統可讀和真正的讀;同理,寫函數分為等待網卡可以寫和真正的寫。

需要說明的是等待就緒的阻塞是不使用CPU的,是在“空等”;而真正的讀寫操作的阻塞是使用CPU的,真正在”幹活”。

以socket.read()為例子:

傳統的BIO裡面socket.read(),如果TCP RecvBuffer里沒有數據,函數會一直阻塞,直到收到數據,返回讀到的數據。

對於NIO,如果TCP RecvBuffer有數據,就把數據從網卡讀到記憶體,並且返回給用戶;反之則直接返回0,永遠不會阻塞。 

在BIO模型中,沒有辦法知道到底能不能寫、能不能讀,只能”傻等”。而在NIO模型中,如果一個連接不能讀寫(socket.read()返回0或者socket.write()返回0),我們可以把這件事記下來,記錄的方式通常是在Selector上註冊標記位,然後切換到其它就緒的連接(channel)繼續進行讀寫。

NIO的主要事件有幾個:讀就緒、寫就緒、有新連接到來。那麼,首先需要註冊當這幾個事件到來的時候所對應的處理器,然後在合適的時機告訴事件選擇器:我對這個事件感興趣,最後用一個死迴圈選擇就緒的事件。select是阻塞的,所以你可以放心大膽地在一個while(true)裡面調用這個函數而不用擔心CPU空轉。

總結起來就是:註冊所有感興趣的事件處理器,單線程輪詢選擇就緒事件,執行事件處理器。

我們大概可以總結出NIO是怎麼解決掉線程的瓶頸並處理海量連接的:

NIO由原來的阻塞讀寫(占用線程)變成了單線程輪詢事件,找到可以進行讀寫的網路描述符進行讀寫。除了事件的輪詢是阻塞的(沒有可乾的事情必須要阻塞),剩餘的I/O操作都是純CPU操作,沒有必要開啟多線程。

NIO由原來的阻塞讀寫(占用線程)變成了單線程輪詢事件,找到可以進行讀寫的網路描述符進行讀寫。除了事件的輪詢是阻塞的(沒有可乾的事情必須要阻塞),剩餘的I/O操作都是純CPU操作,沒有必要開啟多線程。並且由於線程的節約,連接數大的時候因為線程切換帶來的問題也隨之解決,進而為處理海量連接提供了可能。單線程處理I/O的效率確實非常高,沒有線程切換,只是拼命的讀、寫、選擇事件。但現在的伺服器,一般都是多核處理器,如果能夠利用多核心進行I/O,無疑對效率會有更大的提高。

 

Buffer(緩衝區)

在NIO中,所有數據都是用緩衝區處理的。在讀取數據時,它是直接讀到緩衝區中的;在寫入數據時,它也是寫入到緩衝區中的。 

Channel(通道)

通道是一個對象,通過它可以讀取和寫入數據,當然了所有數據都通過Buffer對象來處理。我們永遠不會將位元組直接寫入通道中,相反是將數據寫入包含一個或者多個位元組的緩衝區。同樣不會直接從通道中讀取位元組,而是將數據從通道讀入緩衝區,再從緩衝區獲取這個位元組。

Selector(選擇器)

Selector類是NIO的核心類,Selector(選擇器)選擇器提供了選擇已經就緒的任務的能力。Selector會不斷的輪詢註冊在上面的所有channel,如果某個channel為讀寫等事件做好準備,那麼就處於就緒狀態,通過Selector可以不斷輪詢發現出就緒的channel,進行後續的IO操作。一個Selector能夠同時輪詢多個channel。這樣,一個單獨的線程就可以管理多個channel,從而管理多個網路連接。這樣就不用為每一個連接都創建一個線程,同時也避免了多線程之間上下文切換導致的開銷。

一個簡單的讀取文件的例子:

 1 package com.cjs.example.restservice.nio;
 2 
 3 import java.io.FileInputStream;
 4 import java.nio.ByteBuffer;
 5 import java.nio.channels.FileChannel;
 6 
 7 /**
 8  * @author ChengJianSheng
 9  * @date 2020-03-26
10  */
11 public class Hello {
12 
13     public static void main(String[] args) throws Exception {
14         FileInputStream fis = new FileInputStream("/data.txt");
15         FileChannel channel = fis.getChannel();
16 
17         ByteBuffer buffer = ByteBuffer.allocate(10);
18 
19         while (true) {
20             if (channel.read(buffer) == -1) {
21                 break;
22             }
23             buffer.flip();
24             while (buffer.hasRemaining()) {
25                 System.out.print((char)buffer.get());
26             }
27             buffer.clear();
28         }
29 
30         channel.close();
31         fis.close();
32     }
33 } 

Server.java

 1 package com.cjs.example.restservice.nio;
 2 
 3 import java.net.InetSocketAddress;
 4 import java.nio.ByteBuffer;
 5 import java.nio.channels.SelectionKey;
 6 import java.nio.channels.Selector;
 7 import java.nio.channels.ServerSocketChannel;
 8 import java.nio.channels.SocketChannel;
 9 import java.util.Iterator;
10 import java.util.Set;
11 
12 /**
13  * @author ChengJianSheng
14  * @date 2020-03-26
15  */
16 public class Server {
17     public static void main(String[] args) throws Exception {
18         //  創建一個Selector
19         Selector selector = Selector.open();
20 
21         ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
22         serverSocketChannel.configureBlocking(false);
23         serverSocketChannel.bind(new InetSocketAddress(9000));
24 
25         serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
26 
27         while (true) {
28             selector.select();
29 
30             Set<SelectionKey> selectedKeys = selector.selectedKeys();
31             Iterator<SelectionKey> keyIterator = selectedKeys.iterator();
32             while (keyIterator.hasNext()) {
33                 SelectionKey key = keyIterator.next();
34                 if(key.isAcceptable()) {
35                     // a connection was accepted by a ServerSocketChannel.
36 
37                     System.out.println(1);
38                     ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
39                     SocketChannel sc = ssc.accept();
40                     sc.configureBlocking(false);
41                     sc.register(selector, SelectionKey.OP_READ);
42                 } else if (key.isConnectable()) {
43                     // a connection was established with a remote server.
44                 } else if (key.isReadable()) {
45                     // a channel is ready for reading
46 
47                     System.out.println(2);
48                     SocketChannel socketChannel = (SocketChannel) key.channel();
49                     ByteBuffer buffer = ByteBuffer.allocate(1024);
50                     int len = 0;
51                     while ((len = socketChannel.read(buffer)) != -1) {
52                         buffer.flip();
53                         System.out.println(new String(buffer.array(), 0, len));
54                     }
55 
56                     socketChannel.close();
57                 } else if (key.isWritable()) {
58                     // a channel is ready for writing
59                 }
60 
61                 keyIterator.remove();
62             }
63         }
64     }
65 }

Client.java

 1 package com.cjs.example.restservice.nio;
 2 
 3 import java.net.InetSocketAddress;
 4 import java.nio.ByteBuffer;
 5 import java.nio.channels.SocketChannel;
 6 
 7 /**
 8  * @author ChengJianSheng
 9  * @date 2020-03-26
10  */
11 public class Client {
12 
13     public static void main(String[] args) throws Exception {
14         SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress("127.0.0.1", 9000));
15         socketChannel.configureBlocking(false);
16 
17         ByteBuffer buffer = ByteBuffer.allocate(1024);
18         String msg = "Hello, World!";
19         buffer.put(msg.getBytes());
20         buffer.flip();
21         socketChannel.write(buffer);
22 
23         socketChannel.close();
24     }
25 } 

關於Selector的用法

 1 Selector selector = Selector.open();
 2 
 3 channel.configureBlocking(false);
 4 
 5 SelectionKey key = channel.register(selector, SelectionKey.OP_READ);
 6 
 7 while(true) {
 8 
 9     int readyChannels = selector.selectNow();
10 
11     if(readyChannels == 0) continue;
12 
13 
14     Set<SelectionKey> selectedKeys = selector.selectedKeys();
15 
16     Iterator<SelectionKey> keyIterator = selectedKeys.iterator();
17 
18     while(keyIterator.hasNext()) {
19 
20         SelectionKey key = keyIterator.next();
21 
22         if(key.isAcceptable()) {
23             // a connection was accepted by a ServerSocketChannel.
24 
25         } else if (key.isConnectable()) {
26             // a connection was established with a remote server.
27 
28         } else if (key.isReadable()) {
29             // a channel is ready for reading
30 
31         } else if (key.isWritable()) {
32             // a channel is ready for writing
33         }
34 
35         keyIterator.remove();
36     }
37 }

 

參考:

https://spring.io/reactive

https://www.jianshu.com/p/d47835316016

https://www.cnblogs.com/haimishasha/p/10756448.html

https://tech.meituan.com/2016/11/04/nio.html 

 

牆裂推薦Java NIO教程

http://tutorials.jenkov.com/java-nio/index.html

http://tutorials.jenkov.com/java-nio/selectors.html

http://tutorials.jenkov.com/java-nio/server-socket-channel.html


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 問題引入:電腦里安裝了從官網下載的python3.8.0,。先使用了菜鳥教程的方法2安裝。https://www.runoob.com/numpy/numpy-install.html 發現產生錯誤。先是提示我的pip工具沒有更新到最新版本,於是根據提示更新後,發現仍有錯誤,於是放棄使用這方法里的命 ...
  • 需求 基於Spring, SpringMVC, Mybatis 實現一個類似京東商城的3C電子商城系統, 能夠實現商品管理與展示, 加入購物車, 支付購買等功能 運行環境 jdk1.8,tomcat8.5,mysql5.6,EclispseEE 項目技術 spring springmvc, myba ...
  • 框架:具有很強的通用性,且封裝了一些通用實現方法的項目模板 (非同步框架): 高性能的網路請求 高性能的數據解析 高性能的持久化存儲 高性能的全站數據爬取 高性能的深度爬取 高性能的分散式 Scrapy環境安裝 IOS和Linux windows 安裝完成後,輸入 測試一下,出現如下圖顯示,即安裝成功 ...
  • 一、ItemPipeLine 1.爬蟲提取出的數據存入item之後,item中保存的數據需要進一步處理,比如:清洗,去重,存儲等 2.pipeline需要process_item函數 (1)process_item​:spider提出來的item作為參數出入,同時傳入的還有spider;此方法是必須 ...
  • 目錄 1. "安裝VSCode應用程式" 2. "安裝相關插件" 1. "漢化插件" 2. "C++編輯器插件" 3. "編寫配置文件" 1. "tasks.json" 2. "launch.json" 3. "c_cpp_properties.json" 第一步、安裝VSCode應用程式 打開 " ...
  • 1. 在執行python程式時遇到 ‘ModuleNotFoundError: No module named 'xxxxx'’ : 例如: 圖片中以導入第三方的 'requests' 模塊為例,此報錯提示找不到requests模塊。在python中,有的 模塊是內置的(直接導入就能使用)有的模塊是 ...
  • 近期在開發過程中,因為項目開發環境連接的mysql資料庫是阿裡雲的資料庫,而阿裡雲的資料庫版本是5.6的。而測試環境的mysql是自己安裝的5.7。因此在開發過程中有小伙伴不註意寫了有關group by的sql語句。在開發環境中運行是正常的,而到了測試環境中就發現了異常。 原因分析:MySQL5.7 ...
  • 隨著分散式技術的普及和海量數據的增長,io的能力越來越重要,java提供的io模塊提供了足夠的擴展性來適應。 我是李福春,我在準備面試,今天的問題是: java中的io有哪幾種? java中的io分3類: 1,BIO ,即同步阻塞IO,對應java.io包提供的工具;基於流模型,雖然直觀,代碼實現也 ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...