天天看點

4、協程上下文及協程排程器

協程上下文和協程排程器(官方文檔)

目錄

1、協程排程器和線程

2、非限定和限定排程器(Unconfined vs confined dispatcher)

3、調式協程和線程

3.1、使用IDEA調式

3.2、使用Log調式

4、線上程之間切換

5、協程上下文的Job

6、子協程

7、父協程的責任

8、調式狀态下的協程名字

9、組合協程上下文元素(Combining context elements)

10、協程作用域(Coroutine scope)

11、線程本地存儲(Thread-local data)

協程通常運作于協程标準庫中定義的CoroutineContext所代表協程上下文中。

協程上下文其實是一系列的不同元素集合,其中最主要的元素就是之前我們用過的協程Job,還有這裡将要介紹的協程排程器。

1、協程排程器和線程

協程上下文中包含一個協程排程器(CoroutineDispatcher)可以用來排程協程的執行,它将決定協程是運作于線程中還是線程池中。協程排程器可以限定一個協程運作在特定的線程中、排程到線程池中運作或者無限制的運作線上程之間。

所有的協程建構器就像launch和async一樣接收一個可選參數CoroutineContext ,這個參數可以用來指定協程排程器以及其他的協程上下文資訊。

你可以運作下面的程式試試:

package com.cool.cleaner.test

import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.launch
import kotlinx.coroutines.newSingleThreadContext
import kotlinx.coroutines.runBlocking

fun main() = runBlocking<Unit> {
    launch {//context of the parent, main runBlocking coroutine
        println("main runBlocking       : I'm working in thread ${Thread.currentThread().name}")
    }
    launch(Dispatchers.Unconfined) {//work with main thread
        println("Unconfined             : I'm working in thread ${Thread.currentThread().name}")
    }
    launch(Dispatchers.Default) {//will get dispatched to DefaultDispatcher
        println("Default                : I'm working in thread ${Thread.currentThread().name}")
    }
    launch(newSingleThreadContext("MyOwnThread")) {//will get its own new thread
        println("newSingleThreadContext : I'm working in thread ${Thread.currentThread().name}")
    }
}
           

輸出如下(可能你運作跑出來的順序和這裡不太一樣):

Unconfined              : I'm working in thread main @coroutine#3
Default                 : I'm working in thread DefaultDispatcher-worker-2 @coroutine#4
main runBlocking        : I'm working in thread main @coroutine#2
newSingleThreadContext  : I'm working in thread MyOwnThread @coroutine#5

Process finished with exit code 0
           

當調用launch { ... }的無參版本時,它将會從啟動它的CoroutineScope 中繼承協程上下文,當然也繼承了協程排程器;在上面的例子中,它從運作于主線程的

runBlocking

 中繼承了協程上下文。

Dispatchers.Unconfined 是一種特殊的排程器而且看起來他也是運作于main線程中,但是它其實是另一種不同的機制,接下來會介紹到。

在GlobalScope中啟動的協程預設使用的協程排程器是Dispatchers.Default,并且使用共享的背景線程池,是以

launch(Dispatchers.Default) { ... }

 和 

GlobalScope.launch { ... }這兩種啟動協程的方式使用的是相同的協程排程器

Dispatchers.Default。

newSingleThreadContext 則建立了一個給協程運作的獨立線程,用一個專門的線程來運作協程是非常耗資源的;在實際的應用中,當你不再使用的時候你必須調用close函數關閉它或者儲存在一個頂級變量中以便在整個應用中可以複用它。

2、非限定和限定排程器(Unconfined vs confined dispatcher)

協程排程器Dispatchers.Unconfined 會在調用它的線程中啟動一個協程,但是僅僅隻是在協程中調用第一個suspend函數之前,在suspend函數調用之後協程可能排程到其他線程中運作,這取決于你調用的suspend函數的實作(意思就是調用suspend函數後協程可能會運作于不同的線程中)。這種非限定的排程器适合于排程那種不大量消耗CPU時間(非CPU密集型的任務)并且也不更新特定線程(比如UI線程)中的共享資料的協程。

另一方面,排程器預設繼承于外層的CoroutineScope ,對于runBlocking 協程來說預設的排程器受到調用runBlocking 的線程的限制,,,是以這種情況下的繼承會限定線程執行于調用runBlocking 的線程中,代碼如下:

package com.cool.cleaner.test

import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

fun main() = runBlocking<Unit> {
    launch(Dispatchers.Unconfined) {//not confined -- will work with main thread
        println("Unconfined       : I'm working in thread ${Thread.currentThread().name}")
        delay(500)
        println("Unconfined       : After delay in thread ${Thread.currentThread().name}")
    }
    launch {//context of the parent, main runBlocking coroutine
        println("main runBlocking : I'm working in thread ${Thread.currentThread().name}")
        delay(1000)
        println("main runBlocking : After delay in thread ${Thread.currentThread().name}")
    }
}
           

輸出如下:

Unconfined       : I'm working in thread main @coroutine#2
main runBlocking : I'm working in thread main @coroutine#3
Unconfined       : After delay in thread kotlinx.coroutines.DefaultExecutor @coroutine#2
main runBlocking : After delay in thread main @coroutine#3

Process finished with exit code 0
           

你可以看到從runBlocking {...}中繼承了協程上下文的協程運作于main線程中,而沒有限定的那個則運作在delay函數使用的預設線程中。

非限定排程器(Dispatchers.Unconfined)是一種進階機制,在極少的情況下是非常有用的,比如:有一些操作需要馬上執行,但之後協程的排程運作并不是那麼重要或者會産生一些不希望的副作用,此時就可以使用它。在一般情況下你是不需要使用它的。

3、調式協程和線程

協程可以在一個線程挂起并在另一個線程恢複,如果你不适用特殊的工具調式的話,即使是單線程排程器也很難明白協程在幹什麼、運作到哪裡了。

3.1、使用IDEA調式

kotlin插件中的Coroutine Debugger可以簡化在IntelliJ IDEA中調式協程。(沒有使用IDEA,是以直接翻譯了)

調式器隻對 

kotlinx-coroutines-core 

1.3.8之後的版本有效

在調式視窗中有一個Coroutines的tab,在這個tab中你可以看到目前處于運作狀态(running)的和挂起的協程(suspended),協程是按照排程他們的排程器來組織的。如下圖:

4、協程上下文及協程排程器

(這是官網圖檔:好像我并麼有看到相應的tab啊)

使用協程調式器,你可以:

  1. 檢視每一個協程的狀态。
  2. 請檢視正在運作和挂起的協程的本地和捕獲變量的值。
  3. 檢視建立協程的所有棧、調用棧,棧中包含了所有變量的值。
  4. 導出一個包含所有協程及相應棧的一個報告,你可以在Coroutines這個tab中右鍵點選然後選擇Get Coroutines Dump。

你隻需要在代碼中設定斷點然後在debug模式下運作程式即可開始調式應用,你也可以點選這裡學習有關協程調式的更多技巧。

3.2、使用Log調式

另一種調式線程的方法是在log中列印出線程的名字,這種方法在log系統中都是普遍支援的;在使用協程的時候,僅僅隻是線程名字并不會包含多少關于線程上下文的資訊,是以庫kotlinx.coroutines中包含了一些工具使得log調式更容易些。

使用JVM參數-Dkotlinx.coroutines.debug運作下面的程式:

package com.cool.cleaner.test

import kotlinx.coroutines.async
import kotlinx.coroutines.runBlocking

fun log(msg: String): Unit {
    println("[${Thread.currentThread().name}] $msg")
}
fun main() = runBlocking<Unit> {
    val a = async {
        log("I'm computing a piece of the answer")
        6
    }
    val b = async {
        log("I'm computing another piece of the answer")
        7
    }
    log("The answer is ${a.await() + b.await()}")
}
           

這裡有三個協程, 位于

runBlocking中的主協程

main coroutine (#1)以及計算a值和b值的另外兩個協程。他們都在

runBlocking

 的協程上下文中執行并且都被限定在主線程中執行,輸出如下:

[main @coroutine#2] I'm computing a piece of the answer
[main @coroutine#3] I'm computing another piece of the answer
[main @coroutine#1] The answer is 42

Process finished with exit code 0
           

log函數會把線程的名字列印在方括号裡面,列印出的名字是main + @coroutine#xxx,當打開調式模式的時候#xxx這部分的取值是連續的。

當使用選項

-ea

 運作JVM的時候也會打開調試模式,你可以在該文檔中檢視關于屬性DEBUG_PROPERTY_NAME 的調試工具。

4、線上程之間切換

使用JVM選項-Dkotlinx.coroutines.debug運作下面的程式:

package com.cool.cleaner.test

import kotlinx.coroutines.newSingleThreadContext
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext

fun log(msg: String): Unit {
    println("[${Thread.currentThread().name}] $msg")
}
fun main() {
    newSingleThreadContext("Ctx1").use { ctx1 ->
        newSingleThreadContext("Ctx2").use { ctx2 ->
            runBlocking(ctx1) {
                log("Started in ctx1")
                withContext(ctx2) {
                    log("Working in ctx2")
                }
                log("Back to ctx1")
            }
        }
    }
}
           

這裡用到了幾個新的技術,首先是為runBlocking顯示指定特定的協程上下文,另一個是使用withContex切換協程上下文,下面是程式輸出:

[Ctx1 @coroutine#1] Started in ctx1
[Ctx2 @coroutine#1] Working in ctx2
[Ctx1 @coroutine#1] Back to ctx1

Process finished with exit code 0
           

這個例子中還使用了标準庫中的use函數釋放由newSingleThreadContext 建立的不再使用的線程。

5、協程上下文的Job

線程的job對象是協程上下文的一部分,你可以使用表達式coroutineContext[Job]擷取它,如下代碼所示:

package com.cool.cleaner.test

import kotlinx.coroutines.Job
import kotlinx.coroutines.runBlocking

fun main() = runBlocking<Unit> {
    println("My job is ${coroutineContext[Job]}")
}
           

在調式模式下的輸出就像下面這樣的:

My job is "coroutine#1":BlockingCoroutine{Active}@6477463f

Process finished with exit code 0
           

請注意,使用 CoroutineScope的isActive其實是表達式coroutineContext[Job]?.isActive == true的一種簡寫。

6、子協程

當一個協程在另一個協程的作用域CoroutineScope 中啟動的時候,它就會通過CoroutineScope.coroutineContext繼承父協程的協程上下文,同時子協程的job會變成父協程job的孩子(job在父子協程之間也有父子關系),當父協程取消的時候所有子協程也會遞歸的取消。

然而,當使用全局作用域GlobalScope啟動一個新協程的時候,這個新協程是沒有父協程的,是以它不會與啟動它的協程作用域綁定(也就是沒有父子關系),并且是獨立運作的。如下代碼所示:

package com.cool.cleaner.test

import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

fun main() = runBlocking<Unit> {
    val request = launch {
        GlobalScope.launch {//獨立運作的,不受父協程取消的限制
            println("job1: I run in GlobaScope and execute independently")
            delay(1000)
            println("job1: I am not affected by cancellation of the request")
        }
        launch {//繼承了父協程的協程上下文,收到父協程取消的限制
            delay(100)
            println("job2: I am a child of the request coroutine")
            delay(1000)
            println("job2: I will not execute this line if my parent request is cancelled")
        }
    }
    delay(500)
    request.cancel()
    delay(1000)
    println("main: who has survived request cancellation ?")
}
           

下面是程式輸出:

job1: I run in GlobaScope and execute independently
job2: I am a child of the request coroutine
job1: I am not affected by cancellation of the request
main: who has survived request cancellation ?

Process finished with exit code 0
           

7、父協程的責任

父協程通常會等待子協程執行完成才會退出,它不需要顯示的跟蹤它啟動的所有子協程,也不需使用Job.join等待子協程的結束,反正一切都是自動的,隻有子協程執行完成了父協程才會退出,代碼如下:

package com.cool.cleaner.test

import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

fun main() = runBlocking<Unit> {
    val request = launch {
        repeat(3) { index ->
            launch {
                delay((index + 1) * 200L)
                println("Coroutine $index is done")
            }
        }
        println("request: I'm done and I don't explicitly join my children that are still active")
    }
    request.join()
    println("Now processing of the request is complete")
}
           

輸出結果如下:

request: I'm done and I don't explicitly join my children that are still active
Coroutine 0 is done
Coroutine 1 is done
Coroutine 2 is done
Now processing of the request is complete

Process finished with exit code 0
           

8、調式狀态下的協程名字

當協程經常記錄列印日志時,自動配置設定協程id也不錯,你檢視日志的時候也隻需要把來自同一個協程的日志關聯在一起就好了;然而當一個協程需要執行特殊的請求或者一些特殊的背景任務的時候,為了調式友善最好是給協程取一個有意義的名字。線程的名字一樣,上下文元素CoroutineName 可以達到你的目的;當調試模式打開的時候它就會關聯到運作此協程的線程名字中。

下面的代碼證明了剛剛的理論:

package com.cool.cleaner.test

import kotlinx.coroutines.CoroutineName
import kotlinx.coroutines.async
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking

fun log(msg: String) = println("[${Thread.currentThread().name}] $msg")

fun main() = runBlocking(CoroutineName("main")) {
    log("Started main coroutine")
    val v1 = async(CoroutineName("v1coroutine")) {
        delay(500)
        log("Computing v1")
        252
    }
    val v2 = async(CoroutineName("v2coroutine")) {
        delay(100)
        log("Computing v2")
        6
    }
    log("The answer for v1 / v2 = ${v1.await() / v2.await()}")
}
           

使用JVM選項-Dkotlinx.coroutines.debug運作上面的程式,輸出如下:

[main @main#1] Started main coroutine
[main @v2coroutine#3] Computing v2
[main @v1coroutine#2] Computing v1
[main @main#1] The answer for v1 / v2 = 42

Process finished with exit code 0
           

9、組合協程上下文元素(Combining context elements)

有時候我們需要為一個協程指定多個上下文元素,此時我們可以使用操作符"+",舉個例子:我們可以在啟動一個協程的時候同時為它指定協程排程器和名字:

package com.cool.cleaner.test

import kotlinx.coroutines.CoroutineName
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

fun main() = runBlocking<Unit> {
    launch(Dispatchers.Default + CoroutineName("test")) {
        println("I'm working in thread ${Thread.currentThread().name}")
    }
}
           

使用JVM參數-Dkotlinx.coroutines.debug運作這段代碼的輸出結果如下:

I'm working in thread DefaultDispatcher-worker-1 @test#2

Process finished with exit code 0
           

10、協程作用域(Coroutine scope)

這裡舉個例子,這個例子将會使用到之前關于上下文、job、父子協程的知識;假如我們的應用程式有個具有生命周期的對象,但是這個對象并不是一個協程;比如我們在寫Android程式的時候會在activity中啟動不同的協程取擷取資料、更新資料,為了避免記憶體洩漏在activity頁面關閉的時候我們就需要取消所有已經啟動的協程,你當然可以自己追蹤協程上下文和job的引用并在适當的時候結束他們,但是協程庫kotlinx.coroutines為我們提供了一個 CoroutineScope的抽象,你應該已經熟悉協程作用域了因為所有的協程建構器都是 CoroutineScope的一個擴充。

我們可以建立一個綁定到activity生命周期的協程作用域 CoroutineScope,然後用它來管理協程的生命周期,一個協程作用域可以使用工廠函數CoroutineScope() 或 MainScope() 來建立,前者建立一個普通的作用域,後者建立一個針對UI線程的作用域并且使用Dispatchers.Main 作為預設的協程排程器,相關模拟代碼如下:

class Activity {
        private val mainScope = MainScope()

        fun destroy(): Unit {
            mainScope.cancel()
        }
    }
           

現在我們可以在這個Activity中使用定義的mainScope啟動協程了,這裡我們啟動了10個協程然後延時不同的時間,代碼如下:

class Activity {
        private val mainScope = MainScope()

        fun destroy(): Unit {
            mainScope.cancel()
        }

        fun doSomething(): Unit {//在mainScope中啟動多個協程
            repeat(10) { index ->
                mainScope.launch {
                    delay((index + 1) * 200L)
                    println("Coroutine $index is done")
                }
            }
        }
    }
           

在主函數裡面,我們建立activity對象,調用

doSomething

 函數,然後500ms後銷毀activity,這将會銷毀所有從

doSomething

 中啟動的協程,你可以看到activity對象銷毀後就沒有更多的輸出了,即使我們等待時間長一點也是一樣,最終代碼如下:

package com.cool.cleaner.test

import kotlinx.coroutines.*

class Activity {
    private val mainScope = MainScope()

    fun destroy(): Unit {
        mainScope.cancel()
    }

    fun doSomething(): Unit {//在mainScope中啟動多個協程
        repeat(10) { index ->
            mainScope.launch {
                delay((index + 1) * 200L)
                println("Coroutine $index is done")
            }
        }
    }
}
fun main() = runBlocking<Unit> {
    val activity = Activity()
    activity.doSomething()
    println("Launched coroutines")
    delay(500)
    println("Destroying activity")
    activity.destroy()
    delay(1000)
}
           

下面是輸出:

Launched coroutines
Coroutine 0 is done
Coroutine 1 is done
Destroying activity!
           

如你所見,隻有前兩個協程輸出了日志,其他的都被Activity.destroy()中的job.cancel()調用取消了。

請注意:我在Android Studio上的測試結果和官網的有出入,你自己也可以試試喔。

11、線程本地存儲(Thread-local data)

有時候在協程之間傳遞一些Thread-local資料會比較有用,然而因為協程并不會綁定到任何一個協程,是以如果你想自己實作的話會比較麻煩。

對于

ThreadLocal來說,擴充函數

asContextElement 可以幫助你解決問題,它會建立一個額外的上下文元素來儲存

ThreadLocal

 的值,然後在協程切換上下文的時候恢複它,如下代碼所示:

package com.cool.cleaner.test

import kotlinx.coroutines.*

val threadLoca = ThreadLocal<String?>()
fun main() = runBlocking<Unit> {
    threadLoca.set("main")
    println("Pre-main, current thread:     ${Thread.currentThread()}, thread local value: '${threadLoca.get()}'")
    val job = launch(Dispatchers.Default + threadLoca.asContextElement(value = "launch")) {
        println("Launch start, current thread: ${Thread.currentThread()}, thread local value: '${threadLoca.get()}'")
        yield()
        println("After yield, current thread:  ${Thread.currentThread()}, thread local value: '${threadLoca.get()}'")
    }
    job.join()
    println("Post-main, current thread:    ${Thread.currentThread()}, thread local value: '${threadLoca.get()}'")
}
           

在這個例子中,我們使用排程器Dispatchers.Default啟動了一個運作于背景線程池的協程,是以它會運作于線程池中的不同線程,但不管協程運作于哪個線程它都會擁有我們用threadLocal.asContextElement(value = "launch")設定的值,輸出如下:

Pre-main, current thread:     Thread[main @coroutine#1,5,main], thread local value: 'main'
Launch start, current thread: Thread[DefaultDispatcher-worker-1 @coroutine#2,5,main], thread local value: 'launch'
After yield, current thread:  Thread[DefaultDispatcher-worker-1 @coroutine#2,5,main], thread local value: 'launch'
Post-main, current thread:    Thread[main @coroutine#1,5,main], thread local value: 'main'

Process finished with exit code 0
           

假如你忘記了設定額外的上下文元素,而運作協程的線程又是不同的,那你從協程從取出來的線程本地存儲值将會是不可意料的(就是可能每次取出來的值都不一樣)。為了避免這種情況,推薦使用ensurePresent和不當使用的時候啟用快速失敗機制。

在kotlinx.coroutines庫中,ThreadLocal得到了首選支援并且可以使用庫提供的任何原語;它的一個不足就是:當一個thread-local資料改變的時候,新值不會傳遞給協程的調用者(因為一個上下文元素無法跟蹤所有ThreadLocal對象的通路),而新值會在下一次挂起的時候丢失。在協程中可以使用函數 withContext 更新thread-local的值,可以在asContextElement這了解更多的資訊。

還有另一種儲存thread-local資料的方式就是把它存放在一個包裝類中比如class Counter(var i: Int),然後把包裝類當作thread-local來存儲,然而這種情況下你就要完全負責包裝類對象的同步通路了(自己進行同步,避免線程競争)。

對于一些進階應用,比如與logging MDC內建、事務性上下文或者其他在内部使用thread-locals傳遞資料的第三方庫,你可以看看關于ThreadContextElement 接口的文檔以便了解哪些接口是應該要實作的。