天天看點

Kotlin協程簡介(一)

目錄:

  • 一. 協程的基本概念
  • 二. 從異步程式設計開始
    • 回調
    • CompletableFuture
    • RxJava
    • 協程
  • 三. 協程的基本概念
    • suspend funtion
    • CoroutineScope
    • CoroutineContext
    • CoroutineDispatcher
    • Job 和 Deffered
    • Coroutine builders

一. 協程的基本概念

協程就像非常輕量級的線程。線程是由系統排程的,線程切換或線程阻塞的開銷都比較大。而協程依賴于線程,但是協程挂起時不需要阻塞線程,幾乎是無代價的,協程是由開發者控制的。

二. 從異步程式設計開始

我們先從一個例子說起,發送一個帶有認證的 post 請求,需要以下三個步驟,首先用戶端向服務其發送一個得到token的請求,然後構造一個 Post 請求,最後将 Post 請求發出去。這三個請求都是耗時操作,而且請求和請求之間有着依賴的關系。

fun requestToken(): Token {
    delay(500L) // 模拟請求過程
    return token 
}

fun createPost(token: Token, item: Item): Post {
    delay(500L) // 模拟構造過程
    return post  
}

fun processPost(post: Post) {
    delay(500L) // 模拟請求過程
}
           

方法一:使用回調的方式

操作2依賴于操作1,是以把操作2作為回調放在操作1的參數内,由操作1決定回調時機。

fun requestTokenAsync(cb: (Token) -> Unit) { ... }
fun createPostAsync(token: Token, item: Item, cb: (Post) -> Unit) { ... }
fun processPost(post: Post) { ... }

fun postItem(item: Item) {
    requestTokenAsync { token ->
        createPostAsync(token, item) { post ->
            processPost(post)
        }
    }
}
           

這種多層嵌套的方式比較複雜,而且不友善處理異常情況。

方法二:CompletableFuture

Java 8 引入的 CompletableFuture 可以将多個任務串聯起來,可以避免多層嵌套的問題。

可以簡單看一下API,具體的使用方法參考文章:

CompletableFuture 使用詳解

方法 作用
runAsync 建立一個異步操作,不支援傳回值
supplyAsync 建立一個異步操作,支援傳回值
whenComplete 計算結果完成的回調方法
exceptionally 計算結果出現異常的回調方法
thenApply 當一個線程依賴另一個線程時,可以使用 thenApply 方法來把這兩個線程串行化。
handle 與thenApply相似,handle還可以處理異常任務
thenAccept 與thenApply相似,但是沒有傳回值
thenRun 與thenAccept相似,但是得不到上面任務的處理結果
thenCombine 合并任務,有傳回值
thenAcceptBoth 合并任務,無傳回值
applyToEither 兩個任務用哪個結果
acceptEither 誰傳回的結果快使用那個結果
runAfterEither 任何一個完成都會執行下一步操作
runAfterBoth 都完成了才會執行下一步操作
thenCompose 允許你對兩個 CompletionStage 進行流水線操作,第一個操作完成時,将其結果作為參數傳遞給第二個操作。

知道了API後就可以這麼寫

fun requestTokenAsync(): CompletableFuture<Token> { ... }
fun createPostAsync(token: Token, item: Item): CompletableFuture<Post> { ... }
fun processPost(post: Post) { ... }

fun postItem(item: Item) {
    requestTokenAsync()
            .thenCompose { token -> createPostAsync(token, item) }
            .thenAccept { post -> processPost(post) }
            .exceptionally { e ->
                e.printStackTrace()
                null
            }
}
           

方法三: RxJava

RxJava的用法跟CompletableFuture鍊式調用比較類似,這也是比較簡潔,比較多人使用的方式:

fun requestToken(): Token { ... }
fun createPost(token: Token, item: Item): Post { ... }
fun processPost(post: Post) { ... }

fun postItem(item: Item) {
    Single.fromCallable { requestToken() }
            .map { token -> createPost(token, item) }
            .subscribe(
                    { post -> processPost(post) }, // onSuccess
                    { e -> e.printStackTrace() } // onError
            )
}
           

方法四:協程的方式

suspend fun requestToken(): Token { ... }   // 挂起函數
suspend fun createPost(token: Token, item: Item): Post { ... }  // 挂起函數
fun processPost(post: Post) { ... }

fun postItem(item: Item) {
    GlobalScope.launch {
        val token = requestToken()
        val post = createPost(token, item)
        processPost(post)
        // 需要異常處理,直接加上 try/catch 語句即可
    }
}
           

協程可以讓我們使用順序的方式去寫異步代碼,而且并不會阻塞UI線程。

三. 協程的基本概念

1. suspend funtion

我們寫的有兩個方法是挂起的函數(suspend function)

suspend fun requestToken(): Token { ... }
suspend fun createPost(token: Token, item: Item): Post { ... } 
           

首先要知道的是,挂起函數挂起協程的時候,并不會阻塞線程。

然後一個 suspend function 隻能在一個協程或一個 suspend function 中使用,但是suspend function和普通函數使用方法一樣,有自己的參數,有自己的傳回值,那麼為什麼要使用suspend funtion呢?

我們可以看到delay函數是一個挂起函數 , Thread.sleep()是一個阻塞函數,如果我們在一個A函數可能會挂起協程,比如調用delay()方法,因為 delay() 是suspend function ,隻能在一個協程或一個suspend function中使用,是以A函數也必須是suspend function。是以使用suspend funtion的标準是該函數有無挂起操作。

public suspend fun delay(timeMillis: Long) {
    if (timeMillis <= 0) return // don't delay
    return suspendCancellableCoroutine [email protected] { cont: CancellableContinuation<Unit> ->
        cont.context.delay.scheduleResumeAfterDelay(timeMillis, cont)
    }
}
           

2. CoroutineScope

CoroutineScope為協程的作用域,可以管理其域内的所有協程。一個CoroutineScope可以有許多的子scope。

建立子scope的方式有許多種, 常見的方式有:

方法一:使用lauch, async 等builder建立一個新的子協程。

我們來看一下CoroutineScop接口

// 每個Coroutine作用域都有一個Coroutine上下文
public interface CoroutineScope {
    // Scope 的 Context
    public val coroutineContext: CoroutineContext
}
           

是以 CoroutineScope 隻是定義了一個新 Coroutine 的 coroutineContext,其實每個 coroutine builder(launch

,async) 都是 CoroutineScope 的擴充函數,并且自動的繼承了目前 Scope 的 coroutineContext 和取消操作。我們以launch方法為例:

public fun CoroutineScope.launch(
    context: CoroutineContext = EmptyCoroutineContext,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope.() -> Unit
): Job {
    val newContext = newCoroutineContext(context)
    val coroutine = if (start.isLazy)
        LazyStandaloneCoroutine(newContext, block) else
        StandaloneCoroutine(newContext, active = true)
    coroutine.start(start, coroutine, block)
    return coroutine
}
           
  • 第一個參數 context,預設 launch 所建立的 Coroutine 會自動繼承目前 Coroutine 的 context,如果有額外的 conetxt 需要傳遞給所建立的 Coroutine 則可以通過第一個參數來設定。
  • 第二個參數 start 為 CoroutineStart 枚舉類型,用來指定 Coroutine 啟動的選項。有如下幾個取值:

    - DEFAULT (預設值)立刻安排執行該Coroutine執行個體

    - LAZY 延遲執行,隻有當用到的時候才執行

    - ATOMIC 類似 DEFAULT,差別是當Coroutine還沒有開始執行的時候無法取消

    - UNDISPATCHED 如果使用 Dispatchers.Unconfined dispatcher,則立刻在目前線程執行直到遇到第一個suspension point。然後當 Coroutine 恢複的時候,在繼續在 suspension的 context 中設定的 CoroutineDispatcher 中執行。

  • 第三個參數 block 為一個 suspending function,這個就是 Coroutine 中要執行的代碼塊,在實際使用過程中通常使用 lambda 表達式,也稱之為 Coroutine 代碼塊。需要注意的是,這個 block 函數定義為 CoroutineScope 的擴充函數,是以在代碼塊中可以直接通路 CoroutineScope 對象(也就是 this 對象)

結論:launch方法實際上就是new了一個LazyStandaloneCoroutine協程(isLazy屬性為false),協程自動的繼承了目前 Scope(this代表的協程scope) 的 coroutineContext 和取消操作。

方法二:使用coroutineScope Api建立新scope:

public suspend fun <R> coroutineScope(block: suspend CoroutineScope.() -> R): R
           

這個api主要用于友善地建立一個子域(相當于建立一個局部作用域),并且管理域中的所有子協程。注意這個方法隻有在所有 block中建立的子協程全部執行完畢後,才會退出。

// print輸出的結果順序将會是 1, 2, 3, 4
coroutineScope {
          delay(1000)
          println("1")
          launch { 
              delay(6000) 
              println("3")
          }
          println("2")
          [email protected]
      }
      println("4")
           

方法三:繼承CoroutineScope.這也是比較推薦的做法,用于處理具有生命周期的對象。

在 Android 環境中,通常每個界面(Activity、Fragment 等)啟動的 Coroutine 隻在該界面有意義,如果使用者在等待 Coroutine 執行的時候退出了這個界面,則再繼續執行這個 Coroutine 可能是沒必要的。那麼我們怎麼讓activity管理好其内的 Coroutine 呢?

我們來看下面的例子:

class ScopedActivity : Activity(), CoroutineScope {
    lateinit var job: Job
    // CoroutineScope 的實作
    override val coroutineContext: CoroutineContext
        get() = Dispatchers.Main + job

    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        job = Job()
    }

    override fun onDestroy() {
        super.onDestroy()
        // 當 Activity 銷毀的時候取消該 Scope 管理的 job。
        // 這樣在該 Scope 内建立的子 Coroutine 都會被自動的取消。
        job.cancel()
    }

    /*
     * 注意 coroutine builder 的 scope, 如果 activity 被銷毀了或者該函數内建立的 Coroutine
     * 抛出異常了,則所有子 Coroutines 都會被自動取消。不需要手工去取消。
     */
    fun loadDataFromUI() = launch { // <- 自動繼承目前 activity 的 scope context,是以在 UI 線程執行
        val ioData = async(Dispatchers.IO) { // <- launch scope 的擴充函數,指定了 IO dispatcher,是以在 IO 線程運作
            // 在這裡執行阻塞的 I/O 耗時操作
        }
        // 和上面的并非 I/O 同時執行的其他操作
        val data = ioData.await() // 等待阻塞 I/O 操作的傳回結果
        draw(data) // 在 UI 線程顯示執行的結果
    }
}
           

解釋一下這個地方:

get() = Dispatchers.Main + job

一個上下文(context)可以是多個上下文的組合。組合的上下文需要是不同的類型。是以,你需要做兩件事情:

  • 一個 dispatcher: 用于指定協程預設使用的 dispatcher;
  • 一個 job: 用于在任何需要的時候取消協程;

操作符号 + 用于組合上下文。如果兩種不同類型的上下文相組合,會生成一個組合的上下文(CombinedContext),這個新的上下文會同時擁有被組合上下文的特性。因為:

get() = Dispatchers.Main + job

,是以launch方法實際上是在Dispatchers.Main,也就是在UI線程中執行的。

3. CoroutineContext

CoroutineScope 可以了解為一個協程,裡面有一個協程的上下文:CoroutineContext,這個協程上下文包含很多該協程的資訊,比如:Job, ContinuationInterceptor, CoroutineName 和CoroutineId。在CoroutineContext中,是用map來存這些資訊的, map的鍵是這些類的伴生對象,值是這些類的一個執行個體,你可以這樣子取得context的資訊:

val job = context[Job]
val continuationInterceptor = context[ContinuationInterceptor]
           

4. CoroutineDispatcher

CoroutineDispatcher,協程排程器,決定協程所在的線程或線程池。它可以指定協程運作于特定的一個線程、一個線程池或者不指定任何線程(這樣協程就會運作于目前線程)。coroutines-core中 CoroutineDispatcher 有四種标準實作Dispatchers.Default、Dispatchers. IO,Dispatchers.Main 和 Dispatchers.Unconfined,Unconfined 就是不指定線程。

  • Dispatchers.Default: 如果建立 Coroutine 的時候沒有指定 dispatcher,則一般預設使用這個作為預設值。Default dispatcher 使用一個共享的背景線程池來運作裡面的任務。
  • Dispatchers. IO: 顧名思義這是用來執行阻塞 IO 操作的,也是用一個共享的線程池來執行裡面的任務。根據同時運作的任務數量,在需要的時候會建立額外的線程,當任務執行完畢後會釋放不需要的線程。通過系統 property kotlinx.coroutines.io.parallelism 可以配置最多可以建立多少線程,在 Android 環境中我們一般不需要做任何額外配置。
  • Dispatchers.Unconfined: 立刻在啟動 Coroutine 的線程開始執行該 Coroutine直到遇到第一個 suspension point。也就是說,coroutine builder 函數在遇到第一個 suspension point 的時候才會傳回。而 Coroutine 恢複的線程取決于 suspension function 所在的線程。 一般而言我們不使用 Unconfined。
  • Dispatchers.Main: 是在 Android 的 UI 線程執行。
  • 通過 newSingleThreadContext 和 newFixedThreadPoolContext 函數可以建立在私有的線程池中運作的 Dispatcher。由于建立線程比較消耗系統資源,是以對于臨時建立的線程池在使用完畢後需要通過 close 函數來關閉線程池并釋放資源。

5. Job 和 Deffered

CoroutineScope.launch 函數傳回一個 Job 對象,該對象代表了這個剛剛建立的 Coroutine執行個體,job 對象有不同的狀态(剛建立的狀态、活躍的狀态、執行完畢的狀态、取消狀态等),通過這個 job 對象可以控制這個 Coroutine 執行個體,比如調用 cancel 函數可以取消執行。Job對象持有所有的子job執行個體,可以取消所有子job的運作。Job的join方法會等待自己以及所有子job的執行, 是以Job給予了CoroutineScope一個管理自己所有子協程的能力。

CoroutineScope.async 函數也是三個參數,參數類型和 launch 一樣,唯一的差別是第三個block參數會傳回一個值,而 async 函數的傳回值為 Deferred 類型。可以通過 Deferred 對象擷取異步代碼塊(block)傳回的值。Deferred 繼承了 Job,它有個 await() 方法。

// Awaits for completion of this value without blocking a thread and resumes when deferred computation is complete,
// returning the resulting value or throwing the corresponding exception if the deferred was cancelled.
public suspend fun await(): T
           

6. Coroutine builders

  1. CoroutineScope.launch : 不阻塞目前線程,在背景建立一個新協程,也可以指定協程排程器,無傳回值
  2. CoroutineScope.async : 在背景建立一個新協程,有傳回值
  3. runBlocking :

建立一個新的協程來阻塞目前線程,直到 runBlocking 代碼塊執行完成。通常它不會用于協程中,因為在協程中寫一個阻塞的代碼塊實在太别扭,可以通過挂起操作取代。它通常作為一個擴充卡,将 main 線程轉換成一個 main 協程,我們也就持有了一個 main 協程的 coroutineContext 上下文對象,就可以随心所欲用(this)使用 coroutineContext 的擴充方法,随心所欲使用 suspend 方法 ( suspend 方法隻能用于 suspend 方法和協程中)。是以 runBlocking 一般用在 test 函數和 main 函數中。

  1. withContext :

withContext 不會建立一個新的協程,在指定的協程上運作代碼塊,并挂起該協程直到代碼塊運作完成。通常是用于切換協程的上下文。

例如:

// 使用 withContext 切換協程,上面的例子就是先在 IO 線程裡執行,然後切換到主線程。
GlobalScope.launch(Dispatchers.IO) {
    ...
    withContext(Dispatchers.Main) {
        ...
    }
}
           

繼續閱讀