協程中的Channel用於協程間的通信, 它的宗旨是: ``` Do not communicate by sharing memory; instead, share memory by communicating. ``` ...
Coroutines Channels
Java中的多線程通信, 總會涉及到共用狀態(shared mutable state)的讀寫, 有同步, 死鎖等問題要處理.
協程中的Channel用於協程間的通信, 它的宗旨是:
Do not communicate by sharing memory; instead, share memory by communicating.
Channel basics
channels用於協程間的通信, 允許我們在不同的協程間傳遞數據(a stream of values).
生產者-消費者模式
發送數據到channel的協程被稱為producer
, 從channel接受數據的協程被稱為consumer
.
生產: send, produce.
消費: receive, consume.
當需要的時候, 多個協程可以向同一個channel發送數據, 一個channel的數據也可以被多個協程接收.
當多個協程從同一個channel接收數據的時候, 每個元素僅被其中一個consumer消費一次. 處理元素會自動將其從channel里刪除.
Channel的特點
Channel
在概念上有點類似於BlockingQueue
, 元素從一端被加入, 從另一端被消費. 關鍵的區別在於, 讀寫的方法不是blocking的, 而是suspending的.
在為空或為滿時. channel可以suspend它的send
和receive
操作.
Channel的關閉和迭代
Channel可以被關閉, 說明沒有更多的元素了.
取消producer協程也會關閉channel.
在receiver端有一種方便的方式來接收: 用for
迭代.
看這個例子:
fun main() = runBlocking<Unit> {
val channel = Channel<Int>()
launch {
for (x in 1..5) channel.send(x)
channel.close() // we're done sending
}
// here we print received values using `for` loop (until the channel is closed)
for (y in channel) println(y)
println("Done!")
}
運行後會輸出:
1
2
3
4
5
Done!
Process finished with exit code 0
如果註釋掉channel.close()
就會變成:
1
2
3
4
5
Done沒有被輸出, 程式也沒有退出, 這是因為接受者協程還在一直等待.
不同的Channel類型
庫中定義了多個channel類型, 它們的主要區別在於:
- 內部可以存儲的元素數量;
send
是否可以被掛起.
所有channel類型的receive
方法都是同樣的行為: 如果channel不為空, 接收一個元素, 否則掛起.
Channel的不同類型:
- Rendezvous channel: 0尺寸buffer,
send
和receive
要meet on time, 否則掛起. (預設類型). - Unlimited channel: 無限元素,
send
不被掛起. - Buffered channel: 指定大小, 滿了之後
send
掛起. - Conflated channel: 新元素會覆蓋舊元素, receiver只會得到最新元素,
send
永不掛起.
創建channel:
val rendezvousChannel = Channel<String>()
val bufferedChannel = Channel<String>(10)
val conflatedChannel = Channel<String>(CONFLATED)
val unlimitedChannel = Channel<String>(UNLIMITED)
預設是Rendezvous channel.
練習: 分析代碼輸出
看這段代碼:
fun main() = runBlocking<Unit> {
val channel = Channel<String>()
launch {
channel.send("A1")
channel.send("A2")
log("A done")
}
launch {
channel.send("B1")
log("B done")
}
launch {
repeat(3) {
val x = channel.receive()
log(x)
}
}
}
fun log(message: Any?) {
println("[${Thread.currentThread().name}] $message")
}
這段代碼創建了一個channel, 傳遞String類型的元素.
兩個producder協程, 分別向channel發送不同的字元串, 發送完畢後列印各自的"done".
一個receiver協程, 接收channel中的3個元素並列印.
程式的運行輸出結果會是怎樣呢?
記得在Configurations中加上VM options: -Dkotlinx.coroutines.debug
. 可以看到協程信息.
答案揭曉:
[main @coroutine#4] A1
[main @coroutine#4] B1
[main @coroutine#2] A done
[main @coroutine#3] B done
[main @coroutine#4] A2
答對了嗎?
為什麼會是這樣呢? 原因主要有兩點:
- 這裡創建的channel是預設的Rendezvous類型, 沒有buffer, send和receive必須要meet, 否則掛起.
- 兩個producer和receiver協程都運行在同一個線程上, ready to be resumed也只是加入了一個等待隊列, resume要按順序來.
這個例子在Introduction to Coroutines and Channels中有一個視頻解說.
另外, 官方文檔中還有一個ping-pang的例子, 為了說明Channels are fair.
參考
- 官方文檔: Channels
- Introduction to Coroutines and Channels
- Github: Coroutines Guide
- Kotlin: Diving in to Coroutines and Channels
歡迎關註微信公眾號: 聖騎士Wind