一、關於 非同步驅動 從3.0 版本開始,MongoDB 開始提供非同步方式的驅動(Java Async Driver),這為應用提供了一種更高性能的選擇。但實質上,使用同步驅動(Java Sync Driver)的項目也不在少數,或許是因為先入為主的原因(同步Driver的文檔說明更加的完善),又或者 ...
一、關於 非同步驅動
從3.0 版本開始,MongoDB 開始提供非同步方式的驅動(Java Async Driver),這為應用提供了一種更高性能的選擇。
但實質上,使用同步驅動(Java Sync Driver)的項目也不在少數,或許是因為先入為主的原因(同步Driver的文檔說明更加的完善),又或者是為了相容舊的 MongoDB 版本。
無論如何,由於 Reactive 的發展,未來使用非同步驅動應該是一個趨勢。
在使用 Async Driver 之前,需要對 Reactive 的概念有一些熟悉。
二、理解 Reactive (響應式)
響應式(Reactive)是一種非同步的、面向數據流的開發方式,最早是來自於.NET 平臺上的 Reactive Extensions 庫,隨後被擴展為各種編程語言的實現。
在著名的 Reactive Manifesto(響應式宣言) 中,對 Reactive 定義了四個特征:
- 及時響應(Responsive):系統能及時的響應請求。
- 有韌性(Resilient):系統在出現異常時仍然可以響應,即支持容錯。
- 有彈性(Elastic):在不同的負載下,系統可彈性伸縮來保證運行。
- 消息驅動(Message Driven):不同組件之間使用非同步消息傳遞來進行交互,並確保松耦合及相互隔離。
在響應式宣言的所定義的這些系統特征中,無一不與響應式的流有若幹的關係,於是乎就有了 2013年發起的 響應式流規範(Reactive Stream Specification)。
https://www.reactive-streams.org/
其中,對於響應式流的處理環節又做瞭如下定義:
- 具有處理無限數量的元素的能力,即允許流永不結束
- 按序處理
- 非同步地傳遞元素
- 實現非阻塞的負壓(back-pressure)
Java 平臺則是在 JDK 9 版本上發佈了對 Reactive Streams 的支持。
下麵介紹響應式流的幾個關鍵介面:
- Publisher
Publisher 是數據的發佈者。Publisher 介面只有一個方法 subscribe,用於添加數據的訂閱者,也就是 Subscriber。 - Subscriber
Subscriber 是數據的訂閱者。Subscriber 介面有4個方法,都是作為不同事件的處理器。在訂閱者成功訂閱到發佈者之後,其 onSubscribe(Subscription s) 方法會被調用。
Subscription 表示的是當前的訂閱關係。
當訂閱成功後,可以使用 Subscription 的 request(long n) 方法來請求發佈者發佈 n 條數據。發佈者可能產生3種不同的消息通知,分別對應 Subscriber 的另外3個回調方法。
數據通知:對應 onNext 方法,表示發佈者產生的數據。
錯誤通知:對應 onError 方法,表示發佈者產生了錯誤。
結束通知:對應 onComplete 方法,表示發佈者已經完成了所有數據的發佈。
在上述3種通知中,錯誤通知和結束通知都是終結通知,也就是在終結通知之後,不會再有其他通知產生。
- Subscription
Subscription 表示的是一個訂閱關係。除了之前提到的 request 方法之外,還有 cancel 方法用來取消訂閱。需要註意的是,在 cancel 方法調用之後,發佈者仍然有可能繼續發佈通知。但訂閱最終會被取消。
這幾個介面的關係如下圖所示:
圖片出處:http://wiki.jikexueyuan.com/index.php/project/reactor-2.0/05.html
MongoDB 的非同步驅動為 mongo-java-driver-reactivestreams 組件,其實現了 Reactive Stream 的上述介面。
> 除了 reactivestream 之外,MongoDB 的非同步驅動還包含 RxJava 等風格的版本,有興趣的讀者可以進一步瞭解
http://mongodb.github.io/mongo-java-driver-reactivestreams/1.11/getting-started/quick-tour-primer/
三、使用示例
接下來,通過一個簡單的例子來演示一下 Reactive 方式的代碼風格:
A. 引入依賴
org.mongodb
mongodb-driver-reactivestreams
1.11.0
> 引入mongodb-driver-reactivestreams 將會自動添加 reactive-streams, bson, mongodb-driver-async組件
B. 連接資料庫
//伺服器實例表
List servers = new ArrayList();
servers.add(new ServerAddress("localhost", 27018));
//配置構建器
MongoClientSettings.Builder settingsBuilder = MongoClientSettings.builder();
//傳入伺服器實例
settingsBuilder.applyToClusterSettings(
builder -> builder.hosts(servers));
//構建 Client 實例
MongoClient mongoClient = MongoClients.create(settingsBuilder.build());
C. 實現文檔查詢
//獲得資料庫對象
MongoDatabase database = client.getDatabase(databaseName);
//獲得集合
MongoCollection collection = database.getCollection(collectionName);
//非同步返回Publisher
FindPublisher publisher = collection.find();
//訂閱實現
publisher.subscribe(new Subscriber() {
@Override
public void onSubscribe(Subscription s) {
System.out.println("start...");
//執行請求
s.request(Integer.MAX_VALUE);
}
@Override
public void onNext(Document document) {
//獲得文檔
System.out.println("Document:" + document.toJson());
}
@Override
public void onError(Throwable t) {
System.out.println("error occurs.");
}
@Override
public void onComplete() {
System.out.println("finished.");
}
});
註意到,與使用同步驅動不同的是,collection.find()方法返回的不是 Cursor,而是一個 FindPublisher對象,這是Publisher介面的一層擴展。
而且,在返回 Publisher 對象時,此時並沒有產生真正的資料庫IO請求。 真正發起請求需要通過調用 Subscription.request()方法。
在上面的代碼中,為了讀取由 Publisher 產生的結果,通過自定義一個Subscriber,在onSubscribe 事件觸發時就執行 資料庫的請求,之後分別對 onNext、onError、onComplete進行處理。
儘管這種實現方式是純非同步的,但在使用上比較繁瑣。試想如果對於每個資料庫操作都要完成一個Subscriber 邏輯,那麼開發的工作量是巨大的。
為了儘可能復用重覆的邏輯,可以對Subscriber的邏輯做一層封裝,包含如下功能:
- 使用 List 容器對請求結果進行緩存
- 實現阻塞等待結果的方法,可指定超時時間
- 捕獲異常,在等待結果時拋出
代碼如下:
public class ObservableSubscriber implements Subscriber {
//響應數據
private final List received;
//錯誤信息
private final List errors;
//等待對象
private final CountDownLatch latch;
//訂閱器
private volatile Subscription subscription;
//是否完成
private volatile boolean completed;
public ObservableSubscriber() {
this.received = new ArrayList();
this.errors = new ArrayList();
this.latch = new CountDownLatch(1);
}
@Override
public void onSubscribe(final Subscription s) {
subscription = s;
}
@Override
public void onNext(final T t) {
received.add(t);
}
@Override
public void onError(final Throwable t) {
errors.add(t);
onComplete();
}
@Override
public void onComplete() {
completed = true;
latch.countDown();
}
public Subscription getSubscription() {
return subscription;
}
public List getReceived() {
return received;
}
public Throwable getError() {
if (errors.size() > 0) {
return errors.get(0);
}
return null;
}
public boolean isCompleted() {
return completed;
}
/**
* 阻塞一定時間等待結果
*
* @param timeout
* @param unit
* @return
* @throws Throwable
*/
public List get(final long timeout, final TimeUnit unit) throws Throwable {
return await(timeout, unit).getReceived();
}
/**
* 一直阻塞等待請求完成
*
* @return
* @throws Throwable
*/
public ObservableSubscriber await() throws Throwable {
return await(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
}
/**
* 阻塞一定時間等待完成
*
* @param timeout
* @param unit
* @return
* @throws Throwable
*/
public ObservableSubscriber await(final long timeout, final TimeUnit unit) throws Throwable {
subscription.request(Integer.MAX_VALUE);
if (!latch.await(timeout, unit)) {
throw new MongoTimeoutException("Publisher onComplete timed out");
}
if (!errors.isEmpty()) {
throw errors.get(0);
}
return this;
}
}
藉助這個基礎的工具類,我們對於文檔的非同步操作就變得簡單多了。
比如對於文檔查詢的操作可以改造如下:
ObservableSubscriber subscriber = new ObservableSubscriber();
collection.find().subscribe(subscriber);
//結果處理
subscriber.get(15, TimeUnit.SECONDS).forEach( d -> {
System.out.println("Document:" + d.toJson());
});
當然,這個例子還有可以繼續完善,比如使用 List 作為緩存,則要考慮數據量的問題,避免將全部(或超量) 的文檔一次性轉入記憶體。
原文地址:https://www.mongochina.com/article/655.html