> 上一篇文章,我介紹了Kotlin協程的創建,使用,協作等內容。本篇將引入更多的使用場景,繼續帶你走進協程世界。 ### 使用協程處理非同步數據流 常用編程語言都會內置對同一類型不同對象的數據集表示,我們通常稱之為容器類。不同的容器類適用於不同的使用場景。Kotlin的`Flow`就是在非同步計算的需 ...
上一篇文章,我介紹了Kotlin協程的創建,使用,協作等內容。本篇將引入更多的使用場景,繼續帶你走進協程世界。
使用協程處理非同步數據流
常用編程語言都會內置對同一類型不同對象的數據集表示,我們通常稱之為容器類。不同的容器類適用於不同的使用場景。Kotlin的Flow
就是在非同步計算的需求下引入的,用於表示非同步的數據流。
Flow
“問渠哪得清如許,為有源頭活水來”,非同步數據流的基本就是以某種方式獲得非同步數據。Kotlin提供了多種種方式,比較常用的就是Kotlin協程包的asFlow
擴展和flow
構造器。前者是對普通數據集的Flow
化封裝,沒有更多可言,我們著重來看後者。
flow
構造器的主要目標就是產生一個非同步數據流,它是一個泛型函數,參數是一個掛起函數,並且是FlowCollector
是擴展函數。這個介面只有一個emit
方法,就是為創建的Flow
提供非同步計算的數據的,因為它是掛起函數,所以我們能在裡面使用其他掛起函數計算非同步值,然後通過emit
方法將值發送出去,如此反覆就能為下游操作提供源源不斷的數據流了。
事情還沒完,上面的步驟我們只是規定了創建數據的方式,並沒有真正執行,也就是建好了道路,但是還沒有車上路。那麼,怎樣才能讓車在路上跑呢,查看Flow
的介面會發現,它提供了collect
方法來處理數據。collect
接收一個掛起函數作為處理邏輯,但是同時,collect
方法本身也是掛起函數,所以,這個方法只能在掛起函數中運行。有了這些知識,我們就可以寫出最簡單的非同步數據流了。
1uspend fun compute():Int{
delay(123)
return 1024
}
viewModelScope.launch {
val flow=flow<Int> {
emit(9527)
emit(compute())
delay(256)
emit(256)
}
flow.collect {
println(it)
}
}
在flow
構造器裡面隨意做各種操作,只要在必要的時候傳遞結果就行了,但是需要註意的是,emit
方法只能運行在同一個協程里。乍一看,這樣分開寫和寫在一起並沒有本質上的差別,但Flow
還能做到更多。
該給Flow換個工作環境了
上一節,我們那個簡單的示例,假如把構造器裡面的數據獲取方法換成網路請求,應用就歇菜了。因為它們都是運行在主線程裡面的。那麼這個時候,看過上一篇文章的小伙伴馬上就會反應過來,用withContext
方法在構造器裡面切換線程就行了哇。思路是很對,因為Flow
的預設配置就是構造器和collect
方法工作在同一線程,既然現在主線程不讓運行,那就把構造器的線程切換一下就行了唄。然後事實並不是這樣,這樣寫出來的代碼根本無法運行。因為官方提供了唯一的flowOn
方法來切換構造器的執行線程。使用也很簡單,就是對創建好的Flow
對象配置一次flowOn
方法就行了。
val flow=["1.jpg","2.jpg"].asFlow()
flow.map { decode(it) }
.flowOn(Dispatchers.IO)
viewModelScope.launch {
flow.collect{
adapter.add(it)
}
有些中間處理邏輯
熟悉RxJava的小伙伴可能有疑問了,這些操作RxJava也能完成,甚至還有更多的操作符來支持中間狀態的處理,那麼非同步數據流能做到這些嗎。毫無疑問,它可以。普通的數據集有map
,filter
等操作方法,對於非同步數據流來說,這些方法同樣適用。而且這些方法參數都是掛起函數,都可以執行非同步操作。而且它還有個更靈活的transform
方法,這個方法可以定製自己的操作符,實現更靈活的數據操作。
當然,上面那些操作符都只能實現單一非同步流的操作,對於多數據流的支持,它也同樣不在話下。zip
可以將兩個兩個數據源兩兩合併起來,合成的數據流長度為兩個數據流中最短的那個數據流的長度。combine
則與zip
不同,它會將兩個數據流最近的發送數據作為輸入,也就是說,假如一塊一慢的兩個數據源,慢的數據源的元素可能會被多次取到,從而最終的數據流比最短的那個都長。
val flow = flowOf(1, 2).delayEach(10)
val flow2 = flowOf("a", "b", "c").delayEach(15)
flow.combine(flow2) { i, s -> i.toString() + s }.collect {
println(it) // Will print "1a 2a 2b 2c"
}
結束狀態跟蹤
上一節提到,由於數據源和處理邏輯不在同一個地方,所以很難確定最終的數據流大小,進而不知道數據流什麼時候處理結束。而且中間操作也可能會改變數據流的大小,由此就更加難以確定數據處理結束的時機了。但是我們有的時候卻需要在數據處理完成後做一些操作,該怎麼辦呢?這個時候當然是該onCompletion
方法上場了。這個方法有一個可為空的Throwable
類型參數,很顯然,這可以同時指示兩種處理結果,成功或者失敗,失敗就會將異常對象傳遞進來。
多個協程共同工作
很多時候,避免不了讓多個協程共同工作。對於返回單個值的協程,上一篇我們也提到過了,可以傳遞async
構造器的返回對象Deferred
,但是局限性就是這個對象只能傳遞一個值。針對多值傳遞的情況,Kotlin提供了Channel
的解決方法。Channel
類似於阻塞隊列,數據通過send
方法發送出去,在另外的地方使用receive
方法接收。通過這種方法,我們可以極大提供協程的工作效率。利用它就可以輕鬆實現生產者和消費者模型。
val chanel=Channel<Int>()
viewModelScope.launch(Dispatchers.IO) {
for (i in 1..5){
delay(1000)
chanel.send(i)
}
}
viewModelScope.launch {
for (i in chanel){
println("Handle ${i}")
}
}
當然,這隻是最簡單的用法,還可以加入更多的生產者,或者不再需要數據時取消,甚至還有專門的product
構造器,直接獲得返回多個值的協程對象。
總結
Kotlin協程有很多有用的API,這些API覆蓋了大部分非同步使用的場景。所以在使用協程的時候,我們首先需要明確使用場景,再根據使用場景確定使用哪一套API,這可以使我們避免陷入API恐懼症。為此,我根據這兩篇文章的內容,整理出了一份情景表格,實際開發中可以參照使用。
Kotlin協程構造器
API | 使用場景 |
---|---|
launch | 執行耗時操作,不需要返回值 |
async | 需要獲取耗時操作的單個返回值 |
produce | 需要獲取耗時操作的多個返回值 |
Kotlin協程協同工具
API | 使用場景 |
---|---|
Flow | 操作非同步數據流 |
Channel | 協程間通信 |
青山不改,綠水長流,咱們下期見!