RxJava2|Flowable以及背壓

来源:https://www.cnblogs.com/shwo/archive/2018/10/30/9874680.html
-Advertisement-
Play Games

RxJava2 Flowable以及背壓 前述 java maven rxjava 背壓 背壓是指在非同步場景中,被觀察者發送事件速度遠快於觀察者的處理速度的情況下,一種告訴上游的被觀察者降低發送速度的策略。 https://www.jianshu.com/p/0cd258eecf60 的官方介紹: ...


RxJava2 Flowable以及背壓

前述

java-1.8

maven-3

rxjava-2.2.3

背壓

背壓是指在非同步場景中,被觀察者發送事件速度遠快於觀察者的處理速度的情況下,一種告訴上游的被觀察者降低發送速度的策略。

----https://www.jianshu.com/p/0cd258eecf60

Flowable的官方介紹:

io.reactivex.Flowable: 0..N flows, supporting Reactive-Streams and backpressure

0...N flows, 支持響應式流和背壓(backpressure)

只有在需要處理背壓問題時,才需要使用Flowable。

由於只有在上下游運行在不同的線程中,且上游發射數據的速度大於下游接收處理數據的速度時,才會產生背壓問題;
所以,如果能夠確定:

  1. 上下游運行在同一個線程中,

  2. 上下游工作在不同的線程中,但是下游處理數據的速度不慢於上游發射數據的速度,

  3. 上下游工作在不同的線程中,但是數據流中只有一條數據
    則不會產生背壓問題,就沒有必要使用Flowable,以免影響性能。

    由於基於Flowable發射的數據流,以及對數據加工處理的各操作符都添加了背壓支持,附加了額外的邏輯,其運行效率要比Observable慢得多。

此段出處: https://www.jianshu.com/p/ff8167c1d191

示例(Flowable簡單使用)

Flowable邏輯類 - HelloFlowable.java

package yag;


import io.reactivex.*;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;


public class HelloFlowable {

    public void helloFlowable(){
        // 基本上和Observable一樣.
        Flowable
                .create((FlowableOnSubscribe<Integer>) flowableEmitter -> {
                    Integer i = 0;
                    while ( i < 7) {
                        i++;
                        flowableEmitter.onNext(i);
                    }
                }, BackpressureStrategy.ERROR/* 背壓 */)

                .subscribe(new Subscriber<Integer>() {

                    private Subscription subscription;
                    @Override
                    public void onSubscribe(Subscription subscription) {
                        subscription.request(Long.MAX_VALUE);
                        this.subscription = subscription;
                    }

                    @Override
                    public void onNext(Integer i) {
                        if (i == 5){
                            // 退出接收
                            subscription.cancel();
                        }else {
                            System.out.println("現在接收到的信號是: 第" + i + "信號");
                        }
                    }

                    @Override
                    public void onError(Throwable throwable) {

                    }

                    @Override
                    public void onComplete() {

                    }
                });


    }
}

執行者 - Runner.java

package yag;

public class Runner {

    public static void main(String[] args){
        HelloFlowable helloFlowable = new HelloFlowable();
        helloFlowable.helloFlowable();
    }
}

執行結果

現在接收到的信號是: 第1信號
現在接收到的信號是: 第2信號
現在接收到的信號是: 第3信號
現在接收到的信號是: 第4信號

Process finished with exit code 0

小結

request()

subscription.request(Long.MAX_VALUE);

這個方法就是用來向生產者申請可以消費的事件數量。這樣我們便可以根據本身的消費能力進行消費事件。

當調用了request()方法後,生產者便發送對應數量的事件供消費者消費。

BackpressureStrategy.ERROR

參考: https://www.jianshu.com/p/1f4867ce3c01

這是一個背壓操作策略. (BackpressureStrategy - 背壓策略)

ERROR策略下,如果緩存池溢出,就會立刻拋出MissingBackpressureException異常。即保證在非同步操作中,事件累積不能超過128,超過即出現異常。消費者不能再接收事件了,但生產者並不會停止。

其他

  • BUFFER - 所謂BUFFER就是把RxJava中預設的只能存128個事件的緩存池換成一個大的緩存池,支持存很多很多的數據。消費者通過request()即使傳入一個很大的數字,生產者也會生產事件,並將處理不了的事件緩存。

    比較消耗記憶體, 除非是我們比較瞭解消費者的消費能力,能夠把握具體情況,不會產生OOM。(OutOfMemoryError)

  • DROP - 當消費者處理不了事件,就丟棄。

  • LATEST - 消費者通過request()傳入其需求n,然後生產者把n個事件傳遞給消費者供其消費。其他消費不掉的事件就丟掉。
    唯一的區別就是LATEST總能使消費者能夠接收到生產者產生的最後一個事件

個人補充:

  • MISSING - 寫入過程中沒有任何緩衝或丟棄, 即不操作.

您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • Vue 組件之間傳值 一、父組件向子組件傳遞數據 在 Vue 中,可以使用 props 向子組件傳遞數據。 子組件部分: 這是 header.vue 的 HTML 部分,logo 是在 data 中定義的變數。 如果需要從父組件獲取 logo 的值,就需要使用 props: ['logo'] 在 p ...
  • ```php ...
  • 備忘錄,備份曾經發生過的歷史記錄,以防忘記,之後便可以輕鬆回溯過往。想必我們曾經都乾過很多蠢事導致糟糕的結果,當後悔莫及的時候已經是覆水難收了,只可惜這世界上沒有後悔藥,事後我們能做的只能去彌補過失,總結經驗。除非穿越時空,時光倒流,利用愛因斯坦狹義相對論,超越光速回到過去,破鏡重圓。 然而世界是殘 ...
  • 先編一個這麼久不寫的理由 上周我終於鼓起勇氣翻開了headfirst設計模式這本書,看看自己下一個設計模式要寫個啥,然後,我終於知道我為啥這麼久都沒寫設計模式了,headfirst的這個抽象工廠模式,額,我看了好幾次,都不太理解。 在我的印象中,簡單工廠,工廠方法,抽象工廠,這三個東西應該是層層遞進 ...
  • sentence="知之為知之不知為不知"dict1={}for s in sentence: dict1[s]=dict1.setdefault(s,0)+1print(dict1) ...
  • 前言 開心一刻 老師對小明說:"乳就是小的意思,比如乳豬就是小豬,乳名就是小名,請你用乳字造個句" 小明:"我家很窮,只能住在40平米的乳房" 老師:"..., 這個不行,換一個" 小明:"我每天上學都要跳過我家門口的一條乳溝" 老師:"......, 這個也不行,再換一個" 小明:"老師,我想不出 ...
  • Spring Boot 提供了一個發送郵件的簡單抽象,使用的是下麵這個介面。 org.springframework.mail.javamail.JavaMailSender Spring Boot 提供了一個 ,並能自動配置,下麵來做個小例子,順便解析它做了什麼工作。 0、你所需具備的基礎 "什麼 ...
  • 我最近在學習python3,基礎不是很好,所以準備在Leetcode上刷題,這個作為筆記記錄我的,如果大家看到有錯誤,拜托大家幫我指出,謝謝啦~ ...
一周排行
    -Advertisement-
    Play Games
  • 前言 本文介紹一款使用 C# 與 WPF 開發的音頻播放器,其界面簡潔大方,操作體驗流暢。該播放器支持多種音頻格式(如 MP4、WMA、OGG、FLAC 等),並具備標記、實時歌詞顯示等功能。 另外,還支持換膚及多語言(中英文)切換。核心音頻處理採用 FFmpeg 組件,獲得了廣泛認可,目前 Git ...
  • OAuth2.0授權驗證-gitee授權碼模式 本文主要介紹如何筆者自己是如何使用gitee提供的OAuth2.0協議完成授權驗證並登錄到自己的系統,完整模式如圖 1、創建應用 打開gitee個人中心->第三方應用->創建應用 創建應用後在我的應用界面,查看已創建應用的Client ID和Clien ...
  • 解決了這個問題:《winForm下,fastReport.net 從.net framework 升級到.net5遇到的錯誤“Operation is not supported on this platform.”》 本文內容轉載自:https://www.fcnsoft.com/Home/Sho ...
  • 國內文章 WPF 從裸 Win 32 的 WM_Pointer 消息獲取觸摸點繪製筆跡 https://www.cnblogs.com/lindexi/p/18390983 本文將告訴大家如何在 WPF 裡面,接收裸 Win 32 的 WM_Pointer 消息,從消息裡面獲取觸摸點信息,使用觸摸點 ...
  • 前言 給大家推薦一個專為新零售快消行業打造了一套高效的進銷存管理系統。 系統不僅具備強大的庫存管理功能,還集成了高性能的輕量級 POS 解決方案,確保頁面載入速度極快,提供良好的用戶體驗。 項目介紹 Dorisoy.POS 是一款基於 .NET 7 和 Angular 4 開發的新零售快消進銷存管理 ...
  • ABP CLI常用的代碼分享 一、確保環境配置正確 安裝.NET CLI: ABP CLI是基於.NET Core或.NET 5/6/7等更高版本構建的,因此首先需要在你的開發環境中安裝.NET CLI。這可以通過訪問Microsoft官網下載並安裝相應版本的.NET SDK來實現。 安裝ABP ...
  • 問題 問題是這樣的:第三方的webapi,需要先調用登陸介面獲取Cookie,訪問其它介面時攜帶Cookie信息。 但使用HttpClient類調用登陸介面,返回的Headers中沒有找到Cookie信息。 分析 首先,使用Postman測試該登陸介面,正常返回Cookie信息,說明是HttpCli ...
  • 國內文章 關於.NET在中國為什麼工資低的分析 https://www.cnblogs.com/thinkingmore/p/18406244 .NET在中國開發者的薪資偏低,主要因市場需求、技術棧選擇和企業文化等因素所致。歷史上,.NET曾因微軟的閉源策略發展受限,儘管後來推出了跨平臺的.NET ...
  • 在WPF開發應用中,動畫不僅可以引起用戶的註意與興趣,而且還使軟體更加便於使用。前面幾篇文章講解了畫筆(Brush),形狀(Shape),幾何圖形(Geometry),變換(Transform)等相關內容,今天繼續講解動畫相關內容和知識點,僅供學習分享使用,如有不足之處,還請指正。 ...
  • 什麼是委托? 委托可以說是把一個方法代入另一個方法執行,相當於指向函數的指針;事件就相當於保存委托的數組; 1.實例化委托的方式: 方式1:通過new創建實例: public delegate void ShowDelegate(); 或者 public delegate string ShowDe ...