Reactive-MongoDB 非同步 Java Driver 解讀

来源:https://www.cnblogs.com/wefeng/archive/2019/10/22/11718970.html
-Advertisement-
Play Games

一、關於 非同步驅動 從3.0 版本開始,MongoDB 開始提供非同步方式的驅動(Java Async Driver),這為應用提供了一種更高性能的選擇。但實質上,使用同步驅動(Java Sync Driver)的項目也不在少數,或許是因為先入為主的原因(同步Driver的文檔說明更加的完善),又或者 ...


一、關於 非同步驅動

從3.0 版本開始,MongoDB 開始提供非同步方式的驅動(Java Async Driver),這為應用提供了一種更高性能的選擇。
但實質上,使用同步驅動(Java Sync Driver)的項目也不在少數,或許是因為先入為主的原因(同步Driver的文檔說明更加的完善),又或者是為了相容舊的 MongoDB 版本。
無論如何,由於 Reactive 的發展,未來使用非同步驅動應該是一個趨勢。

在使用 Async Driver 之前,需要對 Reactive 的概念有一些熟悉。

二、理解 Reactive (響應式)

響應式(Reactive)是一種非同步的、面向數據流的開發方式,最早是來自於.NET 平臺上的 Reactive Extensions 庫,隨後被擴展為各種編程語言的實現。
在著名的 Reactive Manifesto(響應式宣言) 中,對 Reactive 定義了四個特征:

reactive-specs

  • 及時響應(Responsive):系統能及時的響應請求。
  • 有韌性(Resilient):系統在出現異常時仍然可以響應,即支持容錯。
  • 有彈性(Elastic):在不同的負載下,系統可彈性伸縮來保證運行。
  • 消息驅動(Message Driven):不同組件之間使用非同步消息傳遞來進行交互,並確保松耦合及相互隔離。

在響應式宣言的所定義的這些系統特征中,無一不與響應式的流有若幹的關係,於是乎就有了 2013年發起的 響應式流規範(Reactive Stream Specification)。

https://www.reactive-streams.org/

其中,對於響應式流的處理環節又做瞭如下定義:

  • 具有處理無限數量的元素的能力,即允許流永不結束
  • 按序處理
  • 非同步地傳遞元素
  • 實現非阻塞的負壓(back-pressure)

Java 平臺則是在 JDK 9 版本上發佈了對 Reactive Streams 的支持。

下麵介紹響應式流的幾個關鍵介面:

  • Publisher
    Publisher 是數據的發佈者。Publisher 介面只有一個方法 subscribe,用於添加數據的訂閱者,也就是 Subscriber。
  • Subscriber
    Subscriber 是數據的訂閱者。Subscriber 介面有4個方法,都是作為不同事件的處理器。在訂閱者成功訂閱到發佈者之後,其 onSubscribe(Subscription s) 方法會被調用。
    Subscription 表示的是當前的訂閱關係。

當訂閱成功後,可以使用 Subscription 的 request(long n) 方法來請求發佈者發佈 n 條數據。發佈者可能產生3種不同的消息通知,分別對應 Subscriber 的另外3個回調方法。

數據通知:對應 onNext 方法,表示發佈者產生的數據。
錯誤通知:對應 onError 方法,表示發佈者產生了錯誤。
結束通知:對應 onComplete 方法,表示發佈者已經完成了所有數據的發佈。
在上述3種通知中,錯誤通知和結束通知都是終結通知,也就是在終結通知之後,不會再有其他通知產生。

  • Subscription
    Subscription 表示的是一個訂閱關係。除了之前提到的 request 方法之外,還有 cancel 方法用來取消訂閱。需要註意的是,在 cancel 方法調用之後,發佈者仍然有可能繼續發佈通知。但訂閱最終會被取消。

這幾個介面的關係如下圖所示:

reactive interfaces

圖片出處:http://wiki.jikexueyuan.com/index.php/project/reactor-2.0/05.html

MongoDB 的非同步驅動為 mongo-java-driver-reactivestreams 組件,其實現了 Reactive Stream 的上述介面。

> 除了 reactivestream 之外,MongoDB 的非同步驅動還包含 RxJava 等風格的版本,有興趣的讀者可以進一步瞭解

http://mongodb.github.io/mongo-java-driver-reactivestreams/1.11/getting-started/quick-tour-primer/

三、使用示例

接下來,通過一個簡單的例子來演示一下 Reactive 方式的代碼風格:

A. 引入依賴


    org.mongodb
    mongodb-driver-reactivestreams
    1.11.0

> 引入mongodb-driver-reactivestreams 將會自動添加 reactive-streams, bson, mongodb-driver-async組件

B. 連接資料庫

//伺服器實例表
List servers = new ArrayList();
servers.add(new ServerAddress("localhost", 27018));

//配置構建器
MongoClientSettings.Builder settingsBuilder = MongoClientSettings.builder();

//傳入伺服器實例
settingsBuilder.applyToClusterSettings(
        builder -> builder.hosts(servers));

//構建 Client 實例
MongoClient mongoClient = MongoClients.create(settingsBuilder.build());

C. 實現文檔查詢

//獲得資料庫對象
MongoDatabase database = client.getDatabase(databaseName);

//獲得集合
MongoCollection collection = database.getCollection(collectionName);

//非同步返回Publisher
FindPublisher publisher = collection.find();

//訂閱實現
publisher.subscribe(new Subscriber() {
    @Override
    public void onSubscribe(Subscription s) {
        System.out.println("start...");
        //執行請求
        s.request(Integer.MAX_VALUE);

    }
    @Override
    public void onNext(Document document) {
        //獲得文檔
        System.out.println("Document:" + document.toJson());
    }

    @Override
    public void onError(Throwable t) {
        System.out.println("error occurs.");
    }

    @Override
    public void onComplete() {
        System.out.println("finished.");
    }
});

註意到,與使用同步驅動不同的是,collection.find()方法返回的不是 Cursor,而是一個 FindPublisher對象,這是Publisher介面的一層擴展。
而且,在返回 Publisher 對象時,此時並沒有產生真正的資料庫IO請求。 真正發起請求需要通過調用 Subscription.request()方法。
在上面的代碼中,為了讀取由 Publisher 產生的結果,通過自定義一個Subscriber,在onSubscribe 事件觸發時就執行 資料庫的請求,之後分別對 onNext、onError、onComplete進行處理。

儘管這種實現方式是純非同步的,但在使用上比較繁瑣。試想如果對於每個資料庫操作都要完成一個Subscriber 邏輯,那麼開發的工作量是巨大的。

為了儘可能復用重覆的邏輯,可以對Subscriber的邏輯做一層封裝,包含如下功能:

  • 使用 List 容器對請求結果進行緩存
  • 實現阻塞等待結果的方法,可指定超時時間
  • 捕獲異常,在等待結果時拋出

代碼如下:

public class ObservableSubscriber implements Subscriber {

    //響應數據
    private final List received;
    //錯誤信息
    private final List errors;
    //等待對象
    private final CountDownLatch latch;
    //訂閱器
    private volatile Subscription subscription;
    //是否完成
    private volatile boolean completed;

    public ObservableSubscriber() {
        this.received = new ArrayList();
        this.errors = new ArrayList();
        this.latch = new CountDownLatch(1);
    }

    @Override
    public void onSubscribe(final Subscription s) {
        subscription = s;
    }

    @Override
    public void onNext(final T t) {
        received.add(t);
    }

    @Override
    public void onError(final Throwable t) {
        errors.add(t);
        onComplete();
    }

    @Override
    public void onComplete() {
        completed = true;
        latch.countDown();
    }

    public Subscription getSubscription() {
        return subscription;
    }

    public List getReceived() {
        return received;
    }

    public Throwable getError() {
        if (errors.size() > 0) {
            return errors.get(0);
        }
        return null;
    }

    public boolean isCompleted() {
        return completed;
    }

    /**
     * 阻塞一定時間等待結果
     *
     * @param timeout
     * @param unit
     * @return
     * @throws Throwable
     */
    public List get(final long timeout, final TimeUnit unit) throws Throwable {
        return await(timeout, unit).getReceived();
    }

    /**
     * 一直阻塞等待請求完成
     *
     * @return
     * @throws Throwable
     */
    public ObservableSubscriber await() throws Throwable {
        return await(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
    }

    /**
     * 阻塞一定時間等待完成
     *
     * @param timeout
     * @param unit
     * @return
     * @throws Throwable
     */
    public ObservableSubscriber await(final long timeout, final TimeUnit unit) throws Throwable {
        subscription.request(Integer.MAX_VALUE);
        if (!latch.await(timeout, unit)) {
            throw new MongoTimeoutException("Publisher onComplete timed out");
        }
        if (!errors.isEmpty()) {
            throw errors.get(0);
        }
        return this;
    }
}

藉助這個基礎的工具類,我們對於文檔的非同步操作就變得簡單多了。
比如對於文檔查詢的操作可以改造如下:

ObservableSubscriber subscriber = new ObservableSubscriber();
collection.find().subscribe(subscriber);

//結果處理
subscriber.get(15, TimeUnit.SECONDS).forEach( d -> {
    System.out.println("Document:" + d.toJson());
});

當然,這個例子還有可以繼續完善,比如使用 List 作為緩存,則要考慮數據量的問題,避免將全部(或超量) 的文檔一次性轉入記憶體。

原文地址:https://www.mongochina.com/article/655.html


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • mysql查看資料庫性能常用命令 mysql> show global status; 可以列出MySQL伺服器運行各種狀態值,另外,查詢MySQL伺服器配置信息語句: mysql> show variables; 一、慢查詢 mysql> show variables like '%slow%'; ...
  • 之前寫過一篇博客“SQL Server中是否可以準確獲取最後一次索引重建的時間?“,裡面主要講述了三個問題:我們能否找到索引的創建時間?最後一次索引重建(Index Rebuild)的時間? 最後一次索引重組(INDEX REORGANIZE)的時間呢?,當時得出的結論,答案是我們無法準確的找到索引... ...
  • server has gone away: 如下圖 執行以下命令 @@GLOBAL.GTID_PURGED can only be set when @@GLOBAL.GTID_MODE = ON: 如下圖 修改要導入的sql數據文件(data.sql),註釋或者去掉類似以下內容的代碼: SET @ ...
  • 有兩張表,info1, info2 。 info1: info2: 現在,要用info2中的數據更新info1中對應的學生信息,sql語句如下: 運行結果如下: 更新過的info1: 至於效率問題,之前我有三張表,都在40萬左右。需要將 table2 中的兩個欄位(step1),table3 中的一 ...
  • 1、 在百度搜索mysql,點擊mysql官網上下載mysql的地址 在url直接輸入mysql的下載地址也可以:https://dev.mysql.com/downloads/mysql/ 如圖: 因為下載的是免費版,所有隻選擇 Community_Server 點開後,在“Select Oper ...
  • ########################## 今天來說一下MySQL資料庫的一些基本操作 ########################## 1.創建資料庫 create database db1; //db1是資料庫名 2.查看當前存在的資料庫 show database db1; 3. ...
  • 工作中oracle資料庫安裝完成後,需要修改預設的密碼有效期,預設為180天,如果不修改,到期忘記更改密碼可能會造成不必要的影響。 兩種修改方式pl/sql或者cmd視窗 1、pl/sql修改預設密碼有效期 通過語句查詢可以發現,預設有效期為6個月。執行下行語句,更改有效期為不限制 2、使用cmd窗 ...
  • 在SQL Server中,我們經常遇到一些需求,需要去搜索存儲過程(Procedure)、函數(Function)等對象是否包含某個對象或涉及某個對象,例如,我需要查找那些存儲過程、函數是否調用了鏈接伺服器(LINKED SERVER),我們如果從sys.sql_modules去搜索的話,如果有多個... ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...