RxJava2 Flowable以及背壓 前述 java maven rxjava 背壓 背壓是指在非同步場景中,被觀察者發送事件速度遠快於觀察者的處理速度的情況下,一種告訴上游的被觀察者降低發送速度的策略。 https://www.jianshu.com/p/0cd258eecf60 的官方介紹: ...
RxJava2 Flowable以及背壓
前述
java-
1.8
maven-
3
rxjava-
2.2.3
背壓
背壓是指在非同步場景中,被觀察者發送事件速度遠快於觀察者的處理速度的情況下,一種告訴上游的被觀察者降低發送速度的策略。
----https://www.jianshu.com/p/0cd258eecf60
Flowable
的官方介紹:
io.reactivex.Flowable
: 0..N flows, supporting Reactive-Streams and backpressure
0...N flows, 支持響應式流和背壓(backpressure)
只有在需要處理背壓問題時,才需要使用Flowable。
由於只有在上下游運行在不同的線程中,且上游發射數據的速度大於下游接收處理數據的速度時,才會產生背壓問題;
所以,如果能夠確定:
上下游運行在同一個線程中,
上下游工作在不同的線程中,但是下游處理數據的速度不慢於上游發射數據的速度,
上下游工作在不同的線程中,但是數據流中只有一條數據
則不會產生背壓問題,就沒有必要使用Flowable,以免影響性能。由於基於Flowable發射的數據流,以及對數據加工處理的各操作符都添加了背壓支持,附加了額外的邏輯,其運行效率要比Observable慢得多。
此段出處: https://www.jianshu.com/p/ff8167c1d191
示例(Flowable
簡單使用)
Flowable
邏輯類 - HelloFlowable.java
package yag;
import io.reactivex.*;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
public class HelloFlowable {
public void helloFlowable(){
// 基本上和Observable一樣.
Flowable
.create((FlowableOnSubscribe<Integer>) flowableEmitter -> {
Integer i = 0;
while ( i < 7) {
i++;
flowableEmitter.onNext(i);
}
}, BackpressureStrategy.ERROR/* 背壓 */)
.subscribe(new Subscriber<Integer>() {
private Subscription subscription;
@Override
public void onSubscribe(Subscription subscription) {
subscription.request(Long.MAX_VALUE);
this.subscription = subscription;
}
@Override
public void onNext(Integer i) {
if (i == 5){
// 退出接收
subscription.cancel();
}else {
System.out.println("現在接收到的信號是: 第" + i + "信號");
}
}
@Override
public void onError(Throwable throwable) {
}
@Override
public void onComplete() {
}
});
}
}
執行者 - Runner.java
package yag;
public class Runner {
public static void main(String[] args){
HelloFlowable helloFlowable = new HelloFlowable();
helloFlowable.helloFlowable();
}
}
執行結果
現在接收到的信號是: 第1信號
現在接收到的信號是: 第2信號
現在接收到的信號是: 第3信號
現在接收到的信號是: 第4信號
Process finished with exit code 0
小結
request()
subscription.request(Long.MAX_VALUE);
這個方法就是用來向生產者申請可以消費的事件數量。這樣我們便可以根據本身的消費能力進行消費事件。
當調用了request()方法後,生產者便發送對應數量的事件供消費者消費。
BackpressureStrategy.ERROR
參考: https://www.jianshu.com/p/1f4867ce3c01
這是一個背壓操作策略. (BackpressureStrategy
- 背壓策略)
在ERROR
策略下,如果緩存池溢出,就會立刻拋出MissingBackpressureException
異常。即保證在非同步操作中,事件累積不能超過128,超過即出現異常。消費者不能再接收事件了,但生產者並不會停止。
其他
BUFFER
- 所謂BUFFER就是把RxJava中預設的只能存128個事件的緩存池換成一個大的緩存池,支持存很多很多的數據。消費者通過request()
即使傳入一個很大的數字,生產者也會生產事件,並將處理不了的事件緩存。比較消耗記憶體, 除非是我們比較瞭解消費者的消費能力,能夠把握具體情況,不會產生OOM。(
OutOfMemoryError
)DROP
- 當消費者處理不了事件,就丟棄。LATEST
- 消費者通過request()傳入其需求n,然後生產者把n個事件傳遞給消費者供其消費。其他消費不掉的事件就丟掉。
唯一的區別就是LATEST
總能使消費者能夠接收到生產者產生的最後一個事件。
個人補充:
MISSING
- 寫入過程中沒有任何緩衝或丟棄, 即不操作.