目錄:
- 一. 協程的基本概念
- 二. 從異步程式設計開始
- 回調
- CompletableFuture
- RxJava
- 協程
- 三. 協程的基本概念
- suspend funtion
- CoroutineScope
- CoroutineContext
- CoroutineDispatcher
- Job 和 Deffered
- Coroutine builders
一. 協程的基本概念
協程就像非常輕量級的線程。線程是由系統排程的,線程切換或線程阻塞的開銷都比較大。而協程依賴于線程,但是協程挂起時不需要阻塞線程,幾乎是無代價的,協程是由開發者控制的。
二. 從異步程式設計開始
我們先從一個例子說起,發送一個帶有認證的 post 請求,需要以下三個步驟,首先用戶端向服務其發送一個得到token的請求,然後構造一個 Post 請求,最後将 Post 請求發出去。這三個請求都是耗時操作,而且請求和請求之間有着依賴的關系。
fun requestToken(): Token {
delay(500L) // 模拟請求過程
return token
}
fun createPost(token: Token, item: Item): Post {
delay(500L) // 模拟構造過程
return post
}
fun processPost(post: Post) {
delay(500L) // 模拟請求過程
}
方法一:使用回調的方式
操作2依賴于操作1,是以把操作2作為回調放在操作1的參數内,由操作1決定回調時機。
fun requestTokenAsync(cb: (Token) -> Unit) { ... }
fun createPostAsync(token: Token, item: Item, cb: (Post) -> Unit) { ... }
fun processPost(post: Post) { ... }
fun postItem(item: Item) {
requestTokenAsync { token ->
createPostAsync(token, item) { post ->
processPost(post)
}
}
}
這種多層嵌套的方式比較複雜,而且不友善處理異常情況。
方法二:CompletableFuture
Java 8 引入的 CompletableFuture 可以将多個任務串聯起來,可以避免多層嵌套的問題。
可以簡單看一下API,具體的使用方法參考文章:
CompletableFuture 使用詳解
方法 | 作用 |
---|---|
runAsync | 建立一個異步操作,不支援傳回值 |
supplyAsync | 建立一個異步操作,支援傳回值 |
whenComplete | 計算結果完成的回調方法 |
exceptionally | 計算結果出現異常的回調方法 |
thenApply | 當一個線程依賴另一個線程時,可以使用 thenApply 方法來把這兩個線程串行化。 |
handle | 與thenApply相似,handle還可以處理異常任務 |
thenAccept | 與thenApply相似,但是沒有傳回值 |
thenRun | 與thenAccept相似,但是得不到上面任務的處理結果 |
thenCombine | 合并任務,有傳回值 |
thenAcceptBoth | 合并任務,無傳回值 |
applyToEither | 兩個任務用哪個結果 |
acceptEither | 誰傳回的結果快使用那個結果 |
runAfterEither | 任何一個完成都會執行下一步操作 |
runAfterBoth | 都完成了才會執行下一步操作 |
thenCompose | 允許你對兩個 CompletionStage 進行流水線操作,第一個操作完成時,将其結果作為參數傳遞給第二個操作。 |
知道了API後就可以這麼寫
fun requestTokenAsync(): CompletableFuture<Token> { ... }
fun createPostAsync(token: Token, item: Item): CompletableFuture<Post> { ... }
fun processPost(post: Post) { ... }
fun postItem(item: Item) {
requestTokenAsync()
.thenCompose { token -> createPostAsync(token, item) }
.thenAccept { post -> processPost(post) }
.exceptionally { e ->
e.printStackTrace()
null
}
}
方法三: RxJava
RxJava的用法跟CompletableFuture鍊式調用比較類似,這也是比較簡潔,比較多人使用的方式:
fun requestToken(): Token { ... }
fun createPost(token: Token, item: Item): Post { ... }
fun processPost(post: Post) { ... }
fun postItem(item: Item) {
Single.fromCallable { requestToken() }
.map { token -> createPost(token, item) }
.subscribe(
{ post -> processPost(post) }, // onSuccess
{ e -> e.printStackTrace() } // onError
)
}
方法四:協程的方式
suspend fun requestToken(): Token { ... } // 挂起函數
suspend fun createPost(token: Token, item: Item): Post { ... } // 挂起函數
fun processPost(post: Post) { ... }
fun postItem(item: Item) {
GlobalScope.launch {
val token = requestToken()
val post = createPost(token, item)
processPost(post)
// 需要異常處理,直接加上 try/catch 語句即可
}
}
協程可以讓我們使用順序的方式去寫異步代碼,而且并不會阻塞UI線程。
三. 協程的基本概念
1. suspend funtion
我們寫的有兩個方法是挂起的函數(suspend function)
suspend fun requestToken(): Token { ... }
suspend fun createPost(token: Token, item: Item): Post { ... }
首先要知道的是,挂起函數挂起協程的時候,并不會阻塞線程。
然後一個 suspend function 隻能在一個協程或一個 suspend function 中使用,但是suspend function和普通函數使用方法一樣,有自己的參數,有自己的傳回值,那麼為什麼要使用suspend funtion呢?
我們可以看到delay函數是一個挂起函數 , Thread.sleep()是一個阻塞函數,如果我們在一個A函數可能會挂起協程,比如調用delay()方法,因為 delay() 是suspend function ,隻能在一個協程或一個suspend function中使用,是以A函數也必須是suspend function。是以使用suspend funtion的标準是該函數有無挂起操作。
public suspend fun delay(timeMillis: Long) {
if (timeMillis <= 0) return // don't delay
return suspendCancellableCoroutine [email protected] { cont: CancellableContinuation<Unit> ->
cont.context.delay.scheduleResumeAfterDelay(timeMillis, cont)
}
}
2. CoroutineScope
CoroutineScope為協程的作用域,可以管理其域内的所有協程。一個CoroutineScope可以有許多的子scope。
建立子scope的方式有許多種, 常見的方式有:
方法一:使用lauch, async 等builder建立一個新的子協程。
我們來看一下CoroutineScop接口
// 每個Coroutine作用域都有一個Coroutine上下文
public interface CoroutineScope {
// Scope 的 Context
public val coroutineContext: CoroutineContext
}
是以 CoroutineScope 隻是定義了一個新 Coroutine 的 coroutineContext,其實每個 coroutine builder(launch
,async) 都是 CoroutineScope 的擴充函數,并且自動的繼承了目前 Scope 的 coroutineContext 和取消操作。我們以launch方法為例:
public fun CoroutineScope.launch(
context: CoroutineContext = EmptyCoroutineContext,
start: CoroutineStart = CoroutineStart.DEFAULT,
block: suspend CoroutineScope.() -> Unit
): Job {
val newContext = newCoroutineContext(context)
val coroutine = if (start.isLazy)
LazyStandaloneCoroutine(newContext, block) else
StandaloneCoroutine(newContext, active = true)
coroutine.start(start, coroutine, block)
return coroutine
}
- 第一個參數 context,預設 launch 所建立的 Coroutine 會自動繼承目前 Coroutine 的 context,如果有額外的 conetxt 需要傳遞給所建立的 Coroutine 則可以通過第一個參數來設定。
-
第二個參數 start 為 CoroutineStart 枚舉類型,用來指定 Coroutine 啟動的選項。有如下幾個取值:
- DEFAULT (預設值)立刻安排執行該Coroutine執行個體
- LAZY 延遲執行,隻有當用到的時候才執行
- ATOMIC 類似 DEFAULT,差別是當Coroutine還沒有開始執行的時候無法取消
- UNDISPATCHED 如果使用 Dispatchers.Unconfined dispatcher,則立刻在目前線程執行直到遇到第一個suspension point。然後當 Coroutine 恢複的時候,在繼續在 suspension的 context 中設定的 CoroutineDispatcher 中執行。
- 第三個參數 block 為一個 suspending function,這個就是 Coroutine 中要執行的代碼塊,在實際使用過程中通常使用 lambda 表達式,也稱之為 Coroutine 代碼塊。需要注意的是,這個 block 函數定義為 CoroutineScope 的擴充函數,是以在代碼塊中可以直接通路 CoroutineScope 對象(也就是 this 對象)
結論:launch方法實際上就是new了一個LazyStandaloneCoroutine協程(isLazy屬性為false),協程自動的繼承了目前 Scope(this代表的協程scope) 的 coroutineContext 和取消操作。
方法二:使用coroutineScope Api建立新scope:
public suspend fun <R> coroutineScope(block: suspend CoroutineScope.() -> R): R
這個api主要用于友善地建立一個子域(相當于建立一個局部作用域),并且管理域中的所有子協程。注意這個方法隻有在所有 block中建立的子協程全部執行完畢後,才會退出。
// print輸出的結果順序将會是 1, 2, 3, 4
coroutineScope {
delay(1000)
println("1")
launch {
delay(6000)
println("3")
}
println("2")
[email protected]
}
println("4")
方法三:繼承CoroutineScope.這也是比較推薦的做法,用于處理具有生命周期的對象。
在 Android 環境中,通常每個界面(Activity、Fragment 等)啟動的 Coroutine 隻在該界面有意義,如果使用者在等待 Coroutine 執行的時候退出了這個界面,則再繼續執行這個 Coroutine 可能是沒必要的。那麼我們怎麼讓activity管理好其内的 Coroutine 呢?
我們來看下面的例子:
class ScopedActivity : Activity(), CoroutineScope {
lateinit var job: Job
// CoroutineScope 的實作
override val coroutineContext: CoroutineContext
get() = Dispatchers.Main + job
override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
job = Job()
}
override fun onDestroy() {
super.onDestroy()
// 當 Activity 銷毀的時候取消該 Scope 管理的 job。
// 這樣在該 Scope 内建立的子 Coroutine 都會被自動的取消。
job.cancel()
}
/*
* 注意 coroutine builder 的 scope, 如果 activity 被銷毀了或者該函數内建立的 Coroutine
* 抛出異常了,則所有子 Coroutines 都會被自動取消。不需要手工去取消。
*/
fun loadDataFromUI() = launch { // <- 自動繼承目前 activity 的 scope context,是以在 UI 線程執行
val ioData = async(Dispatchers.IO) { // <- launch scope 的擴充函數,指定了 IO dispatcher,是以在 IO 線程運作
// 在這裡執行阻塞的 I/O 耗時操作
}
// 和上面的并非 I/O 同時執行的其他操作
val data = ioData.await() // 等待阻塞 I/O 操作的傳回結果
draw(data) // 在 UI 線程顯示執行的結果
}
}
解釋一下這個地方:
get() = Dispatchers.Main + job
一個上下文(context)可以是多個上下文的組合。組合的上下文需要是不同的類型。是以,你需要做兩件事情:
- 一個 dispatcher: 用于指定協程預設使用的 dispatcher;
- 一個 job: 用于在任何需要的時候取消協程;
操作符号 + 用于組合上下文。如果兩種不同類型的上下文相組合,會生成一個組合的上下文(CombinedContext),這個新的上下文會同時擁有被組合上下文的特性。因為:
get() = Dispatchers.Main + job
,是以launch方法實際上是在Dispatchers.Main,也就是在UI線程中執行的。
3. CoroutineContext
CoroutineScope 可以了解為一個協程,裡面有一個協程的上下文:CoroutineContext,這個協程上下文包含很多該協程的資訊,比如:Job, ContinuationInterceptor, CoroutineName 和CoroutineId。在CoroutineContext中,是用map來存這些資訊的, map的鍵是這些類的伴生對象,值是這些類的一個執行個體,你可以這樣子取得context的資訊:
val job = context[Job]
val continuationInterceptor = context[ContinuationInterceptor]
4. CoroutineDispatcher
CoroutineDispatcher,協程排程器,決定協程所在的線程或線程池。它可以指定協程運作于特定的一個線程、一個線程池或者不指定任何線程(這樣協程就會運作于目前線程)。coroutines-core中 CoroutineDispatcher 有四種标準實作Dispatchers.Default、Dispatchers. IO,Dispatchers.Main 和 Dispatchers.Unconfined,Unconfined 就是不指定線程。
- Dispatchers.Default: 如果建立 Coroutine 的時候沒有指定 dispatcher,則一般預設使用這個作為預設值。Default dispatcher 使用一個共享的背景線程池來運作裡面的任務。
- Dispatchers. IO: 顧名思義這是用來執行阻塞 IO 操作的,也是用一個共享的線程池來執行裡面的任務。根據同時運作的任務數量,在需要的時候會建立額外的線程,當任務執行完畢後會釋放不需要的線程。通過系統 property kotlinx.coroutines.io.parallelism 可以配置最多可以建立多少線程,在 Android 環境中我們一般不需要做任何額外配置。
- Dispatchers.Unconfined: 立刻在啟動 Coroutine 的線程開始執行該 Coroutine直到遇到第一個 suspension point。也就是說,coroutine builder 函數在遇到第一個 suspension point 的時候才會傳回。而 Coroutine 恢複的線程取決于 suspension function 所在的線程。 一般而言我們不使用 Unconfined。
- Dispatchers.Main: 是在 Android 的 UI 線程執行。
- 通過 newSingleThreadContext 和 newFixedThreadPoolContext 函數可以建立在私有的線程池中運作的 Dispatcher。由于建立線程比較消耗系統資源,是以對于臨時建立的線程池在使用完畢後需要通過 close 函數來關閉線程池并釋放資源。
5. Job 和 Deffered
CoroutineScope.launch 函數傳回一個 Job 對象,該對象代表了這個剛剛建立的 Coroutine執行個體,job 對象有不同的狀态(剛建立的狀态、活躍的狀态、執行完畢的狀态、取消狀态等),通過這個 job 對象可以控制這個 Coroutine 執行個體,比如調用 cancel 函數可以取消執行。Job對象持有所有的子job執行個體,可以取消所有子job的運作。Job的join方法會等待自己以及所有子job的執行, 是以Job給予了CoroutineScope一個管理自己所有子協程的能力。
CoroutineScope.async 函數也是三個參數,參數類型和 launch 一樣,唯一的差別是第三個block參數會傳回一個值,而 async 函數的傳回值為 Deferred 類型。可以通過 Deferred 對象擷取異步代碼塊(block)傳回的值。Deferred 繼承了 Job,它有個 await() 方法。
// Awaits for completion of this value without blocking a thread and resumes when deferred computation is complete,
// returning the resulting value or throwing the corresponding exception if the deferred was cancelled.
public suspend fun await(): T
6. Coroutine builders
- CoroutineScope.launch : 不阻塞目前線程,在背景建立一個新協程,也可以指定協程排程器,無傳回值
- CoroutineScope.async : 在背景建立一個新協程,有傳回值
- runBlocking :
建立一個新的協程來阻塞目前線程,直到 runBlocking 代碼塊執行完成。通常它不會用于協程中,因為在協程中寫一個阻塞的代碼塊實在太别扭,可以通過挂起操作取代。它通常作為一個擴充卡,将 main 線程轉換成一個 main 協程,我們也就持有了一個 main 協程的 coroutineContext 上下文對象,就可以随心所欲用(this)使用 coroutineContext 的擴充方法,随心所欲使用 suspend 方法 ( suspend 方法隻能用于 suspend 方法和協程中)。是以 runBlocking 一般用在 test 函數和 main 函數中。
- withContext :
withContext 不會建立一個新的協程,在指定的協程上運作代碼塊,并挂起該協程直到代碼塊運作完成。通常是用于切換協程的上下文。
例如:
// 使用 withContext 切換協程,上面的例子就是先在 IO 線程裡執行,然後切換到主線程。
GlobalScope.launch(Dispatchers.IO) {
...
withContext(Dispatchers.Main) {
...
}
}