響應式流是一個倡議,用來為具有非阻塞後壓的非同步流處理提供一個標準。大家努力的目標集中在運行時環境(JVM和JavaScript)和網路協議上。 註:響應式流其實就是一個規範,本文講解的正是這個規範,且這個規範已經被引入到JDK9里了。 後壓:就是下游出現了問題,得不到解決時,這個問題就會逆流而上,繼 ...
響應式流是一個倡議,用來為具有非阻塞後壓的非同步流處理提供一個標準。大家努力的目標集中在運行時環境(JVM和JavaScript)和網路協議上。
註:響應式流其實就是一個規範,本文講解的正是這個規範,且這個規範已經被引入到JDK9里了。
後壓:就是下游出現了問題,得不到解決時,這個問題就會逆流而上,繼而影響上游。
如果一個路口紅綠燈壞了造成堵車,如果不管的話,用不了太長時間,車就會堵到上一個路口,如果再不管的話,整條路都會被賭滿。
本規範里的這些介面在JDK9的java.util.concurrent.Flow里都已經可用,它們在語義上與響應式流的各介面基本上一比一相等。
這意味著將有一個遷移周期,直至第三方庫都採用JDK里的新類型,這個周期自然希望短一些。
這取決於第三方庫的完整語義相等,和Reactive Streams和JDK的Flow之間的適配器庫和一個與JDK的Flow類型可直接相容的TCK。
因為這個標準在JDK9才引入,在此之前一些第三方庫都已經存在,所以需要一個過渡階段,讓第三方庫慢慢採用JDK的標準。
TCK是一個工具,下文有介紹。
處理流數據,尤其是線上數據,它們的量是無法預知的,在一個非同步系統中要求格外小心。
最重要的問題是資源的消耗需要被小心地控制,以便一個快速的數據源不會淹沒流的目的地(下游)。
需要非同步的目的是為了並行地使用計算資源,如協調網路上多個主機,或一個機器的多個CPU核。
響應式流的主要目標是控制橫穿一個非同步邊界的流數據的交換。
考慮到向另一個線程或線程池傳遞元素,同時確保接收端不被強迫緩衝任意數量的數據。
換句話說,後壓是這個模型的一個必須部分,目的是允許隊列在被界定的線程之間進行調節(斡旋)。
如果後壓信號是同步的,非同步處理的好處將被否定,因此對一個響應式流實現的所有方面的完全非阻塞和非同步行為的授權需要小心一些。
這個規範的意圖就是允許創建許多種一致的實現,它們憑藉遵守規則將能夠平滑地互操作,在一個流應用的整個處理圖中保留前文提到的好處和特征。
需要註意的是流操作的精確特性(轉化,分割,合併等)並沒有被這個規範包括。響應式流只關心在不同的API組件間調節流數據。在他們的開發中,已經非常細心地確保所有組合流的基本方式都能夠被表達。
總之,響應式流是JVM上面向流的庫的一個標準和規範:
處理一個潛在的無限數目元素,
依次地,
非同步地在組件間傳遞元素,
帶有強制的非阻塞後壓。
響應式流規範由以下部分組成:
1、API規定了需要實現的響應式流類型,並且在不同的實現間完成互操作性。
2、技術相容性工具(TCK)是一個標準的測試套件,用於各種實現的一致性測試。
各種實現可以自由地實現規範中沒有提到的額外特性,只要它們遵從API要求和在TCK中通過測試。
API由以下組件組成,響應式流的實現必須提供它們:
1、Publisher,發佈者(生產者)
2、Subscriber,訂閱者(消費者)
3、Subscription,訂閱
4、Processor,處理者
它們其實是4個介面,先睹為快:
public interface Publisher<T> {
public void subscribe(Subscriber<? super T> s);
}
public interface Subscriber<T> {
public void onSubscribe(Subscription s);
public void onNext(T t);
public void onError(Throwable t);
public void onComplete();
}
public interface Subscription {
public void request(long n);
public void cancel();
}
public interface Processor<T, R> extends Subscriber<T>, Publisher<R> {
}
一個發佈者是一個潛在的無限數量的序列元素的一個提供者,按照收到的來自於它的訂閱者的需要來發佈這些元素。
作為對發佈者的subscribe(Subscriber)方法調用的響應,對於訂閱者上的方法的可能調用順序按下麵的協議給出:
onSubscribe onNext* (onError | onComplete)?
這意味著onSubscribe方法總是被調用,後面跟著一個可能的無限數量onNext方法調用(因為訂閱者的請求)。如果失敗的話,後跟一個onError方法調用,或當沒有更多的元素可用時,是一個onComplete方法調用,只要這個Subscription(訂閱關係)沒有被取消。
術語,釋義
Signal,本義是信號。作為一個名詞,指的是這些方法onSubscribe,onNext,onComplete,onError,request(n)或cancel中的一個。作為一個動詞,指的是調用這些方法中的一個。
錶面上可以理解為發信號進行通知,本質上也是通過方法調用來實現的。
Demand,本義是需求。作為一個名詞,指的是一個訂閱者(向發佈者)請求的一定數量的元素,它還沒有被髮布者分發。作為一個動詞,指的是請求更多元素的行為動作。
可以看作是訂閱者向發佈者發出的需求/動作,想要獲取更多的元素。發佈者暫時還沒有回應。
Synchronous(ly),本義是同步的。指的是在調用線程上執行(沒有新開線程)。
Return normally,本義是正常返回。指的是僅返回已聲明過的類型的值給調用者。如果想發送一個失敗給訂閱者,唯一合法的方式是通過onError(回調)方法。
Responsivity,本義是響應度。指的是已準備就緒有能力來做出響應。在這個文檔里用來指示不同的組件不應該互相削弱響應的能力。
Non-obstructing(堵塞),本義是不堵塞。指的是描述一個方法的質量(品質),即在調用線程上儘可能快地執行完。這意味著,例如,避免重的計算和其它將拖住調用者線程執行的事情(因為沒有新開線程)。
Terminal state,本義是終止狀態。對於一個發佈者,指的是當onComplete或者onError已經被調用。對於一個訂閱者,指的是當一個onComplete或onError(回調方法)已經收到。
NOP,指的是執行對於調用線程來說沒有可檢測到的影響,能夠像這樣安全地被調用任意次。
External synchronization,本義是外部同步。為了線程安全的目的,協調訪問在這個規範里定義的結構之外被實現,使用的技術像但不限於atomics,monitors或locks。
Thread-safe,能夠安全地被同步或非同步調用,不需要外部的同步來確保程式的正確性。
Publisher
public interface Publisher<T> {
public void subscribe(Subscriber<? super T> s);
}