RxJava2|Flowable以及背壓

来源:https://www.cnblogs.com/shwo/archive/2018/10/30/9874680.html
-Advertisement-
Play Games

RxJava2 Flowable以及背壓 前述 java maven rxjava 背壓 背壓是指在非同步場景中,被觀察者發送事件速度遠快於觀察者的處理速度的情況下,一種告訴上游的被觀察者降低發送速度的策略。 https://www.jianshu.com/p/0cd258eecf60 的官方介紹: ...


RxJava2 Flowable以及背壓

前述

java-1.8

maven-3

rxjava-2.2.3

背壓

背壓是指在非同步場景中,被觀察者發送事件速度遠快於觀察者的處理速度的情況下,一種告訴上游的被觀察者降低發送速度的策略。

----https://www.jianshu.com/p/0cd258eecf60

Flowable的官方介紹:

io.reactivex.Flowable: 0..N flows, supporting Reactive-Streams and backpressure

0...N flows, 支持響應式流和背壓(backpressure)

只有在需要處理背壓問題時,才需要使用Flowable。

由於只有在上下游運行在不同的線程中,且上游發射數據的速度大於下游接收處理數據的速度時,才會產生背壓問題;
所以,如果能夠確定:

  1. 上下游運行在同一個線程中,

  2. 上下游工作在不同的線程中,但是下游處理數據的速度不慢於上游發射數據的速度,

  3. 上下游工作在不同的線程中,但是數據流中只有一條數據
    則不會產生背壓問題,就沒有必要使用Flowable,以免影響性能。

    由於基於Flowable發射的數據流,以及對數據加工處理的各操作符都添加了背壓支持,附加了額外的邏輯,其運行效率要比Observable慢得多。

此段出處: https://www.jianshu.com/p/ff8167c1d191

示例(Flowable簡單使用)

Flowable邏輯類 - HelloFlowable.java

package yag;


import io.reactivex.*;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;


public class HelloFlowable {

    public void helloFlowable(){
        // 基本上和Observable一樣.
        Flowable
                .create((FlowableOnSubscribe<Integer>) flowableEmitter -> {
                    Integer i = 0;
                    while ( i < 7) {
                        i++;
                        flowableEmitter.onNext(i);
                    }
                }, BackpressureStrategy.ERROR/* 背壓 */)

                .subscribe(new Subscriber<Integer>() {

                    private Subscription subscription;
                    @Override
                    public void onSubscribe(Subscription subscription) {
                        subscription.request(Long.MAX_VALUE);
                        this.subscription = subscription;
                    }

                    @Override
                    public void onNext(Integer i) {
                        if (i == 5){
                            // 退出接收
                            subscription.cancel();
                        }else {
                            System.out.println("現在接收到的信號是: 第" + i + "信號");
                        }
                    }

                    @Override
                    public void onError(Throwable throwable) {

                    }

                    @Override
                    public void onComplete() {

                    }
                });


    }
}

執行者 - Runner.java

package yag;

public class Runner {

    public static void main(String[] args){
        HelloFlowable helloFlowable = new HelloFlowable();
        helloFlowable.helloFlowable();
    }
}

執行結果

現在接收到的信號是: 第1信號
現在接收到的信號是: 第2信號
現在接收到的信號是: 第3信號
現在接收到的信號是: 第4信號

Process finished with exit code 0

小結

request()

subscription.request(Long.MAX_VALUE);

這個方法就是用來向生產者申請可以消費的事件數量。這樣我們便可以根據本身的消費能力進行消費事件。

當調用了request()方法後,生產者便發送對應數量的事件供消費者消費。

BackpressureStrategy.ERROR

參考: https://www.jianshu.com/p/1f4867ce3c01

這是一個背壓操作策略. (BackpressureStrategy - 背壓策略)

ERROR策略下,如果緩存池溢出,就會立刻拋出MissingBackpressureException異常。即保證在非同步操作中,事件累積不能超過128,超過即出現異常。消費者不能再接收事件了,但生產者並不會停止。

其他

  • BUFFER - 所謂BUFFER就是把RxJava中預設的只能存128個事件的緩存池換成一個大的緩存池,支持存很多很多的數據。消費者通過request()即使傳入一個很大的數字,生產者也會生產事件,並將處理不了的事件緩存。

    比較消耗記憶體, 除非是我們比較瞭解消費者的消費能力,能夠把握具體情況,不會產生OOM。(OutOfMemoryError)

  • DROP - 當消費者處理不了事件,就丟棄。

  • LATEST - 消費者通過request()傳入其需求n,然後生產者把n個事件傳遞給消費者供其消費。其他消費不掉的事件就丟掉。
    唯一的區別就是LATEST總能使消費者能夠接收到生產者產生的最後一個事件

個人補充:

  • MISSING - 寫入過程中沒有任何緩衝或丟棄, 即不操作.

您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • Vue 組件之間傳值 一、父組件向子組件傳遞數據 在 Vue 中,可以使用 props 向子組件傳遞數據。 子組件部分: 這是 header.vue 的 HTML 部分,logo 是在 data 中定義的變數。 如果需要從父組件獲取 logo 的值,就需要使用 props: ['logo'] 在 p ...
  • ```php ...
  • 備忘錄,備份曾經發生過的歷史記錄,以防忘記,之後便可以輕鬆回溯過往。想必我們曾經都乾過很多蠢事導致糟糕的結果,當後悔莫及的時候已經是覆水難收了,只可惜這世界上沒有後悔藥,事後我們能做的只能去彌補過失,總結經驗。除非穿越時空,時光倒流,利用愛因斯坦狹義相對論,超越光速回到過去,破鏡重圓。 然而世界是殘 ...
  • 先編一個這麼久不寫的理由 上周我終於鼓起勇氣翻開了headfirst設計模式這本書,看看自己下一個設計模式要寫個啥,然後,我終於知道我為啥這麼久都沒寫設計模式了,headfirst的這個抽象工廠模式,額,我看了好幾次,都不太理解。 在我的印象中,簡單工廠,工廠方法,抽象工廠,這三個東西應該是層層遞進 ...
  • sentence="知之為知之不知為不知"dict1={}for s in sentence: dict1[s]=dict1.setdefault(s,0)+1print(dict1) ...
  • 前言 開心一刻 老師對小明說:"乳就是小的意思,比如乳豬就是小豬,乳名就是小名,請你用乳字造個句" 小明:"我家很窮,只能住在40平米的乳房" 老師:"..., 這個不行,換一個" 小明:"我每天上學都要跳過我家門口的一條乳溝" 老師:"......, 這個也不行,再換一個" 小明:"老師,我想不出 ...
  • Spring Boot 提供了一個發送郵件的簡單抽象,使用的是下麵這個介面。 org.springframework.mail.javamail.JavaMailSender Spring Boot 提供了一個 ,並能自動配置,下麵來做個小例子,順便解析它做了什麼工作。 0、你所需具備的基礎 "什麼 ...
  • 我最近在學習python3,基礎不是很好,所以準備在Leetcode上刷題,這個作為筆記記錄我的,如果大家看到有錯誤,拜托大家幫我指出,謝謝啦~ ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...