Coroutines 協程 最近在總結Kotlin的一些東西, 發現協程這塊確實不容易說清楚. 之前的那篇就寫得不好, 所以決定重寫. 反覆研究了官網文檔和各種教程博客, 本篇內容是最基礎也最主要的內容, 力求小白也能看懂並理解. Coroutines概念 Coroutines(協程), 電腦程式 ...
Coroutines 協程
最近在總結Kotlin的一些東西, 發現協程這塊確實不容易說清楚. 之前的那篇就寫得不好, 所以決定重寫.
反覆研究了官網文檔和各種教程博客, 本篇內容是最基礎也最主要的內容, 力求小白也能看懂並理解.
Coroutines概念
Coroutines(協程), 電腦程式組件, 通過允許任務掛起和恢復執行, 來支持非搶占式的多任務. (見Wiki).
協程主要是為了非同步, 非阻塞的代碼. 這個概念並不是Kotlin特有的, Go, Python等多個語言中都有支持.
Kotlin Coroutines
Kotlin中用協程來做非同步和非阻塞任務, 主要優點是代碼可讀性好, 不用回調函數. (用協程寫的非同步代碼乍一看很像同步代碼.)
Kotlin對協程的支持是在語言級別的, 在標準庫中只提供了最低程度的APIs, 然後把很多功能都代理到庫中.
Kotlin中只加了suspend
作為關鍵字.
async
和await
不是Kotlin的關鍵字, 也不是標準庫的一部分.
比起futures和promises, kotlin中suspending function
的概念為非同步操作提供了一種更安全和不易出錯的抽象.
kotlinx.coroutines
是協程的庫, 為了使用它的核心功能, 項目需要增加kotlinx-coroutines-core
的依賴.
Coroutines Basics: 協程到底是什麼?
先上一段官方的demo:
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
fun main() {
GlobalScope.launch { // launch a new coroutine in background and continue
delay(1000L) // non-blocking delay for 1 second (default time unit is ms)
println("World!") // print after delay
}
println("Hello,") // main thread continues while coroutine is delayed
Thread.sleep(2000L) // block main thread for 2 seconds to keep JVM alive
}
這段代碼的輸出:
先列印Hello, 延遲1s之後, 列印World.
對這段代碼的解釋:
launch
開始了一個計算, 這個計算是可掛起的(suspendable), 它在計算過程中, 釋放了底層的線程, 當協程執行完成, 就會恢復(resume).
這種可掛起的計算就叫做一個協程(coroutine). 所以我們可以簡單地說launch
開始了一個新的協程.
註意, 主線程需要等待協程結束, 如果註釋掉最後一行的Thread.sleep(2000L)
, 則只列印Hello, 沒有World.
協程和線程的關係
coroutine(協程)可以理解為輕量級的線程. 多個協程可以並行運行, 互相等待, 互相通信. 協程和線程的最大區別就是協程非常輕量(cheap), 我們可以創建成千上萬個協程而不必考慮性能.
協程是運行線上程上可以被掛起的運算. 可以被掛起, 意味著運算可以被暫停, 從線程移除, 存儲在記憶體里. 此時, 線程就可以自由做其他事情. 當計算準備好繼續進行時, 它會返回線程(但不一定要是同一個線程).
預設情況下, 協程運行在一個共用的線程池裡, 線程還是存在的, 只是一個線程可以運行多個協程, 所以線程沒必要太多.
調試
在上面的代碼中加上線程的名字:
fun main() {
GlobalScope.launch {
// launch a new coroutine in background and continue
delay(1000L) // non-blocking delay for 1 second (default time unit is ms)
println("World! + ${Thread.currentThread().name}") // print after delay
}
println("Hello, + ${Thread.currentThread().name}") // main thread continues while coroutine is delayed
Thread.sleep(2000L) // block main thread for 2 seconds to keep JVM alive
}
可以在IDE的Edit Configurations中設置VM options: -Dkotlinx.coroutines.debug
, 運行程式, 會在log中列印出代碼運行的協程信息:
Hello, + main
World! + DefaultDispatcher-worker-1 @coroutine#1
suspend function
上面例子中的delay
方法是一個suspend function
.
delay()
和Thread.sleep()
的區別是: delay()
方法可以在不阻塞線程的情況下延遲協程. (It doesn't block a thread, but only suspends the coroutine itself). 而Thread.sleep()
則阻塞了當前線程.
所以, suspend的意思就是協程作用域被掛起了, 但是當前線程中協程作用域之外的代碼不被阻塞.
如果把GlobalScope.launch
替換為thread
, delay方法下麵會出現紅線報錯:
Suspend functions are only allowed to be called from a coroutine or another suspend function
suspend方法只能在協程或者另一個suspend方法中被調用.
在協程等待的過程中, 線程會返回線程池, 當協程等待結束, 協程會線上程池中一個空閑的線程上恢復. (The thread is returned to the pool while the coroutine is waiting, and when the waiting is done, the coroutine resumes on a free thread in the pool.)
啟動協程
啟動一個新的協程, 常用的主要有以下幾種方式:
launch
async
runBlocking
它們被稱為coroutine builders
. 不同的庫可以定義其他更多的構建方式.
runBlocking: 連接blocking和non-blocking的世界
runBlocking
用來連接阻塞和非阻塞的世界.
runBlocking
可以建立一個阻塞當前線程的協程. 所以它主要被用來在main函數中或者測試中使用, 作為連接函數.
比如前面的例子可以改寫成:
fun main() = runBlocking<Unit> {
// start main coroutine
GlobalScope.launch {
// launch a new coroutine in background and continue
delay(1000L)
println("World! + ${Thread.currentThread().name}")
}
println("Hello, + ${Thread.currentThread().name}") // main coroutine continues here immediately
delay(2000L) // delaying for 2 seconds to keep JVM alive
}
最後不再使用Thread.sleep()
, 使用delay()
就可以了.
程式輸出:
Hello, + main @coroutine#1
World! + DefaultDispatcher-worker-1 @coroutine#2
launch: 返回Job
上面的例子delay了一段時間來等待一個協程結束, 不是一個好的方法.
launch
返回Job
, 代表一個協程, 我們可以用Job
的join()
方法來顯式地等待這個協程結束:
fun main() = runBlocking {
val job = GlobalScope.launch {
// launch a new coroutine and keep a reference to its Job
delay(1000L)
println("World! + ${Thread.currentThread().name}")
}
println("Hello, + ${Thread.currentThread().name}")
job.join() // wait until child coroutine completes
}
輸出結果和上面是一樣的.
Job
還有一個重要的用途是cancel()
, 用於取消不再需要的協程任務.
async: 從協程返回值
async
開啟線程, 返回Deferred<T>
, Deferred<T>
是Job
的子類, 有一個await()
函數, 可以返回協程的結果.
await()
也是suspend函數, 只能在協程之內調用.
fun main() = runBlocking {
// @coroutine#1
println(Thread.currentThread().name)
val deferred: Deferred<Int> = async {
// @coroutine#2
loadData()
}
println("waiting..." + Thread.currentThread().name)
println(deferred.await()) // suspend @coroutine#1
}
suspend fun loadData(): Int {
println("loading..." + Thread.currentThread().name)
delay(1000L) // suspend @coroutine#2
println("loaded!" + Thread.currentThread().name)
return 42
}
運行結果:
main @coroutine#1
waiting...main @coroutine#1
loading...main @coroutine#2
loaded!main @coroutine#2
42
Context, Dispatcher和Scope
看一下launch
方法的聲明:
public fun CoroutineScope.launch(
context: CoroutineContext = EmptyCoroutineContext,
start: CoroutineStart = CoroutineStart.DEFAULT,
block: suspend CoroutineScope.() -> Unit
): Job {
...
}
其中有幾個相關概念我們要瞭解一下.
協程總是在一個context下運行, 類型是介面CoroutineContext
. 協程的context是一個索引集合, 其中包含各種元素, 重要元素就有Job
和dispatcher. Job
代表了這個協程, 那麼dispatcher是做什麼的呢?
構建協程的coroutine builder: launch
, async
, 都是CoroutineScope
類型的擴展方法. 查看CoroutineScope
介面, 其中含有CoroutineContext
的引用. scope是什麼? 有什麼作用呢?
下麵我們就來回答這些問題.
Dispatchers和線程
Context中的CoroutineDispatcher
可以指定協程運行在什麼線程上. 可以是一個指定的線程, 線程池, 或者不限.
看一個例子:
fun main() = runBlocking<Unit> {
launch {
// context of the parent, main runBlocking coroutine
println("main runBlocking : I'm working in thread ${Thread.currentThread().name}")
}
launch(Dispatchers.Unconfined) {
// not confined -- will work with main thread
println("Unconfined : I'm working in thread ${Thread.currentThread().name}")
}
launch(Dispatchers.Default) {
// will get dispatched to DefaultDispatcher
println("Default : I'm working in thread ${Thread.currentThread().name}")
}
launch(newSingleThreadContext("MyOwnThread")) {
// will get its own new thread
println("newSingleThreadContext: I'm working in thread ${Thread.currentThread().name}")
}
}
運行後列印出:
Unconfined : I'm working in thread main
Default : I'm working in thread DefaultDispatcher-worker-1
newSingleThreadContext: I'm working in thread MyOwnThread
main runBlocking : I'm working in thread main
API提供了幾種選項:
Dispatchers.Default
代表使用JVM上的共用線程池, 其大小由CPU核數決定, 不過即便是單核也有兩個線程. 通常用來做CPU密集型工作, 比如排序或複雜計算等.Dispatchers.Main
指定主線程, 用來做UI更新相關的事情. (需要添加依賴, 比如kotlinx-coroutines-android
.) 如果我們在主線程上啟動一個新的協程時, 主線程忙碌, 這個協程也會被掛起, 僅當線程有空時會被恢復執行.Dispatchers.IO
: 採用on-demand創建的線程池, 用於網路或者是讀寫文件的工作.Dispatchers.Unconfined
: 不指定特定線程, 這是一個特殊的dispatcher.
如果不明確指定dispatcher, 協程將會繼承它被啟動的那個scope的context(其中包含了dispatcher).
在實踐中, 更推薦使用外部scope的dispatcher, 由調用方決定上下文. 這樣也方便測試.
newSingleThreadContext
創建了一個線程來跑協程, 一個專註的線程算是一種昂貴的資源, 在實際的應用中需要被釋放或者存儲復用.
切換線程還可以用withContext
, 可以在指定的協程context下運行代碼, 掛起直到它結束, 返回結果.
另一種方式是新啟一個協程, 然後用join
明確地掛起等待.
在Android這種UI應用中, 比較常見的做法是, 頂部協程用CoroutineDispatchers.Main
, 當需要在別的線程上做一些事情的時候, 再明確指定一個不同的dispatcher.
Scope是什麼?
當launch
, async
或runBlocking
開啟新協程的時候, 它們自動創建相應的scope. 所有的這些方法都有一個帶receiver的lambda參數, 預設的receiver類型是CoroutineScope
.
IDE會提示this: CoroutineScope
:
launch { /* this: CoroutineScope */
}
當我們在runBlocking
, launch
, 或async
的大括弧裡面再創建一個新的協程的時候, 自動就在這個scope里創建:
fun main() = runBlocking {
/* this: CoroutineScope */
launch { /* ... */ }
// the same as:
this.launch { /* ... */ }
}
因為launch
是一個擴展方法, 所以上面例子中預設的receiver是this
.
這個例子中launch
所啟動的協程被稱作外部協程(runBlocking
啟動的協程)的child. 這種"parent-child"的關係通過scope傳遞: child在parent的scope中啟動.
協程的父子關係:
- 當一個協程在另一個協程的scope中被啟動時, 自動繼承其context, 並且新協程的Job會作為父協程Job的child.
所以, 關於scope目前有兩個關鍵知識點:
- 我們開啟一個協程的時候, 總是在一個
CoroutineScope
里. - Scope用來管理不同協程之間的父子關係和結構.
協程的父子關係有以下兩個特性:
- 父協程被取消時, 所有的子協程都被取消.
- 父協程永遠會等待所有的子協程結束.
值得註意的是, 也可以不啟動協程就創建一個新的scope. 創建scope可以用工廠方法: MainScope()
或CoroutineScope()
.
coroutineScope()
方法也可以創建scope. 當我們需要以結構化的方式在suspend函數內部啟動新的協程, 我們創建的新的scope, 自動成為suspend函數被調用的外部scope的child.
上面的父子關係, 可以進一步抽象到, 沒有parent協程, 由scope來管理其中所有的子協程.
Scope在實際應用中解決什麼問題呢? 如果我們的應用中, 有一個對象是有自己的生命周期的, 但是這個對象又不是協程, 比如Android應用中的Activity, 其中啟動了一些協程來做非同步操作, 更新數據等, 當Activity被銷毀的時候需要取消所有的協程, 來避免記憶體泄漏. 我們就可以利用CoroutineScope
來做這件事: 創建一個CoroutineScope
對象和activity的生命周期綁定, 或者讓activity實現CoroutineScope
介面.
所以, scope的主要作用就是記錄所有的協程, 並且可以取消它們.
A CoroutineScope keeps track of all your coroutines, and it can cancel all of the coroutines started in it.
Structured Concurrency
這種利用scope將協程結構化組織起來的機制, 被稱為"structured concurrency".
好處是:
- scope自動負責子協程, 子協程的生命和scope綁定.
- scope可以自動取消所有的子協程.
- scope自動等待所有的子協程結束. 如果scope和一個parent協程綁定, 父協程會等待這個scope中所有的子協程完成.
通過這種結構化的併發模式: 我們可以在創建top級別的協程時, 指定主要的context一次, 所有嵌套的協程會自動繼承這個context, 只在有需要的時候進行修改即可.
GlobalScope: daemon
GlobalScope
啟動的協程都是獨立的, 它們的生命只受到application的限制. 即GlobalScope
啟動的協程沒有parent, 和它被啟動時所在的外部的scope沒有關係.
launch(Dispatchers.Default) { ... }
和GlobalScope.launch { ... }
用的dispatcher是一樣的.
GlobalScope
啟動的協程並不會保持進程活躍. 它們就像daemon threads(守護線程)一樣, 如果JVM發現沒有其他一般的線程, 就會關閉.
參考
- Coroutine Wiki
- 官方文檔 Overview頁
- 官方文檔 Coroutines Guide
- Asynchronous Programming Techniques
- Your first coroutine with Kotlin
- Introduction to Coroutines and Channels
- Github: Kotlin/kotlinx.coroutines
- Github: Coroutines Guide
- Github: KEEP: Kotlin Coroutines
第三方博客:
- Coroutines on Android (part I): Getting the background
- Async Operations with Kotlin Coroutines — Part 1
- Kotlin Coroutines Tutorial for Android
- Coroutine Context and Scope