天天看點

再談協程之suspend到底挂起了啥

Kotlin編譯器會給每一個suspend函數生成一個狀态機來管理協程的執行。

Coroutines簡化了Android上的異步操作。正如文檔中所解釋的,我們可以用它們來管理異步任務,否則可能會阻塞主線程,導緻你的應用程式Crash。

Coroutines也有助于用指令式的代碼取代基于回調的API。

作為例子,我們先看看這個使用回調的異步代碼。

// Simplified code that only considers the happy path
fun loginUser(userId: String, password: String, userResult: Callback<User>) {
// Async callbacks
userRemoteDataSource.logUserIn { user ->
// Successful network request
userLocalDataSource.logUserIn(user) { userDb ->
// Result saved in DB
userResult.success(userDb)
    }
  }
}      

這些回調可以使用coroutines轉換為順序的函數調用。

suspend fun loginUser(userId: String, password: String): User {
val user = userRemoteDataSource.logUserIn(userId, password)
val userDb = userLocalDataSource.logUserIn(user)
return userDb
}      

在coroutines代碼中,我們給函數添加了suspend修飾符。這将告訴編譯器,這個函數需要在一個coroutine内執行。作為一個開發者,你可以把suspend函數看作是一個普通的函數,但它的執行可能被挂起,并在某個時候恢複。

簡而言之,suspend就是一種編譯器生成的回調。

與回調不同的是,coroutines提供了一種線上程之間切換和處理異常的簡單方法。

但是,當我們把函數标記為suspend時,編譯器實際上在幕後做了什麼?

Suspend到底做了什麼

回到loginUser的suspend函數,注意它調用的其他函數也是suspend函數。

suspend fun loginUser(userId: String, password: String): User {
val user = userRemoteDataSource.logUserIn(userId, password)
val userDb = userLocalDataSource.logUserIn(user)
return userDb
}
// UserRemoteDataSource.kt
suspend fun logUserIn(userId: String, password: String): User
// UserLocalDataSource.kt
suspend fun logUserIn(userId: String): UserDb      

簡而言之,Kotlin編譯器将使用有限狀态機(我們将在後面介紹)把suspend函數轉換為優化版本的回調實作。你說對了,編譯器會幫你寫這些回調,它們的本質,依然是回調!

Continuation的真面目

suspend函數之間的通信方式是使用Continuation對象。一個Continuation隻是一個帶有一些額外資訊的通用回調接口。正如我們稍後将看到的,它将代表一個suspend函數的生成狀态機。

讓我們看一下它的定義。

interface Continuation<in T> {
public val context: CoroutineContext
public fun resumeWith(value: Result<T>)
}      
  • context是在continuation中使用的CoroutineContext。
  • resumeWith用一個Result來恢複Coroutine的執行,這個Result可以包含一個導緻suspend的計算結果的值或者是一個異常。
注意:從Kotlin 1.3開始,你還可以使用擴充函數resume(value: T)和resumeWithException(exception: Throwable),它們是resumeWith調用的特殊版本。

編譯器将使用函數簽名中的額外參數completion(Continuation類型)替換suspend修飾符,該參數将用于将suspend函數的結果傳達給調用它的coroutine。

fun loginUser(userId: String, password: String, completion: Continuation<Any?>) {
val user = userRemoteDataSource.logUserIn(userId, password)
val userDb = userLocalDataSource.logUserIn(user)
completion.resume(userDb)
}      

為了簡單起見,我們的例子将傳回Unit而不是User。User對象将在添加的Continuation參數中被 "傳回"。

suspend函數的位元組碼實際上傳回 Any? 因為它是 (T | COROUTINE_SUSPENDED)的聯合類型。這允許函數在可以時同步傳回。

注意:如果你用suspend修飾符标記一個不調用其他suspend函數的函數,編譯器也會添加額外的Continuation參數,但不會對它做任何事情,函數體的位元組碼看起來就像一個普通函數。

你也可以在其他地方看到Continuation接口。

  • 當使用suspendCoroutine或suspendCancellableCoroutine将基于回調的API轉換為coroutine時(你應該總是傾向于使用這種方法),你直接與Continuation對象互動,以恢複在運作時被suspend的作為參數傳遞的代碼塊。
  • 你可以使用suspend函數上的startCoroutine擴充函數來啟動一個coroutine。它接收一個Continuation對象作為參數,當新的coroutine完成時,無論是結果還是異常,都會被調用。

切換不同的Dispatchers

你可以在不同的Dispatchers之間進行交換,在不同的線程上執行計算。那麼Kotlin如何知道在哪裡恢複一個暫停的計算?

Continuation有一個子類型,叫做DispatchedContinuation,它的resume函數可以對CoroutineContext中可用的Dispatcher進行排程調用。除了Dispatchers.Unconfined的isDispatchNeeded函數覆寫(在dispatch之前調用)總是傳回false,所有Dispatcher都會調用dispatch。

在協程中,有個不成文的約定,那就是,suspend函數預設是不阻塞線程的,也就是說,suspend函數的調用者,不用為suspend函數運作在哪個線程而擔心,suspend函數會自己處理它工作的線程,不大部分時候,都是通過withContext來進行切換的。

生成狀态機

免責聲明:文章其餘部分所展示的代碼将不完全符合編譯器所生成的位元組碼。它将是足夠準确的Kotlin代碼,使你能夠了解内部真正發生的事情。這種表示法是由Coroutines 1.3.3版本生成的,在該庫的未來版本中可能會發生變化。

Kotlin編譯器将識别函數何時可以在内部suspend。每個suspend point都将被表示為有限狀态機中的一個狀态。這些狀态由編譯器用标簽表示,前面示例中的suspend函數在編譯後,會産生類似下面的僞代碼。

fun loginUser(userId: String, password: String, completion: Continuation<Any?>) {
// Label 0 -> first execution
val user = userRemoteDataSource.logUserIn(userId, password)
// Label 1 -> resumes from userRemoteDataSource
val userDb = userLocalDataSource.logUserIn(user)
// Label 2 -> resumes from userLocalDataSource
completion.resume(userDb)
}      

為了更好地表示狀态機,編譯器将使用一個when語句來實作不同的狀态。

fun loginUser(userId: String, password: String, completion: Continuation<Any?>) {
when(label) {
0 -> { // Label 0 -> first execution
userRemoteDataSource.logUserIn(userId, password)
    }
1 -> { // Label 1 -> resumes from userRemoteDataSource
userLocalDataSource.logUserIn(user)
    }
2 -> { // Label 2 -> resumes from userLocalDataSource
completion.resume(userDb)
    }
else -> throw IllegalStateException(...)
  }
}      
編譯器将suspend函數編譯成帶有Continuation參數的方法叫做CPS(Continuation-Passing-Style)變換。

這段代碼是不完整的,因為不同的狀态沒有辦法分享資訊。編譯器會在函數中使用相同的Continuation對象來做這件事。這就是為什麼Continuation的泛型是Any? 而不是原始函數的傳回類型(即User)。

此外,編譯器将建立一個私有類,1)持有所需的資料,2)遞歸地調用loginUser函數以恢複執行。你可以看看下面這個生成的類的近似值。

免責聲明:注釋不是由編譯器生成的。我添加它們是為了解釋它們的作用,并使跟随代碼更容易了解。
fun loginUser(userId: String?, password: String?, completion: Continuation<Any?>) {
class LoginUserStateMachine(
// completion parameter is the callback to the function 
// that called loginUser
completion: Continuation<Any?>
  ): CoroutineImpl(completion) {
// Local variables of the suspend function
var user: User? = null
var userDb: UserDb? = null
// Common objects for all CoroutineImpls
var result: Any? = null
var label: Int = 0
// this function calls the loginUser again to trigger the
// state machine (label will be already in the next state) and
// result will be the result of the previous state's computation
override fun invokeSuspend(result: Any?) {
this.result = result
loginUser(null, null, this)
    }
  }
...
}      

由于invokeSuspend将僅用Continuation對象的資訊來再次調用loginUser,loginUser函數簽名中的其餘參數都變成了空值。在這一點上,編譯器隻需要添加如何在狀态之間轉移的資訊。

它需要做的第一件事是知道1)這是函數第一次被調用,或者2)函數已經從之前的狀态恢複。它通過檢查傳入的continuation是否是LoginUserStateMachine類型來實作。

fun loginUser(userId: String?, password: String?, completion: Continuation<Any?>) {
...
val continuation = completion as? LoginUserStateMachine ?: LoginUserStateMachine(completion)
...
}      

如果是第一次,它将建立一個新的LoginUserStateMachine執行個體,并将收到的完成執行個體作為一個參數存儲起來,這樣它就能記住如何恢複調用這個執行個體的函數。如果不是這樣,它将隻是繼續執行狀态機(suspend函數)。

現在,讓我們看看編譯器為在狀态間移動和在狀态間共享資訊而生成的代碼。

/* Copyright 2019 Google LLC. 
SPDX-License-Identifier: Apache-2.0 */
fun loginUser(userId: String?, password: String?, completion: Continuation<Any?>) {
...

val continuation = completion as? LoginUserStateMachine ?: LoginUserStateMachine(completion)

when(continuation.label) {
0 -> {
// Checks for failures
throwOnFailure(continuation.result)
// Next time this continuation is called, it should go to state 1
continuation.label = 1
// The continuation object is passed to logUserIn to resume 
// this state machine's execution when it finishes
userRemoteDataSource.logUserIn(userId!!, password!!, continuation)
        }
1 -> {
// Checks for failures
throwOnFailure(continuation.result)
// Gets the result of the previous state
continuation.user = continuation.result as User
// Next time this continuation is called, it should go to state 2
continuation.label = 2
// The continuation object is passed to logUserIn to resume 
// this state machine's execution when it finishes
userLocalDataSource.logUserIn(continuation.user, continuation)
        }
... // leaving out the last state on purpose
    }
}      

花點時間浏覽一下上面的代碼,看看你是否能發現與前面的代碼片斷的不同之處。讓我們看看編譯器生成了什麼。

  • when語句的參數是LoginUserStateMachine執行個體中的Label。
  • 每次處理一個新的狀态時,都會有一個檢查,以防這個函數suspend時發生異常。
  • 在調用下一個suspend函數(即logUserIn)之前,LoginUserStateMachine執行個體的Label将被更新為下一個狀态。
  • 當在這個狀态機内部有一個對另一個suspend函數的調用時,continuation的執行個體(LoginUserStateMachine類型)被作為一個參數傳遞。要調用的suspend函數也已經被編譯器轉化了,它是另一個像這樣的狀态機,它把一個continuation對象也作為參數!當那個suspend函數的狀态機完成後,它将恢複這個狀态機的執行。

最後一個狀态是不同的,因為它必須恢複調用這個函數的執行,正如你在代碼中看到的,它對存儲在LoginUserStateMachine中的cont變量(在構造時)調用resume。

/* Copyright 2019 Google LLC. 
SPDX-License-Identifier: Apache-2.0 */
fun loginUser(userId: String?, password: String?, completion: Continuation<Any?>) {
...

val continuation = completion as? LoginUserStateMachine ?: LoginUserStateMachine(completion)

when(continuation.label) {
...
2 -> {
// Checks for failures
throwOnFailure(continuation.result)
// Gets the result of the previous state
continuation.userDb = continuation.result as UserDb
// Resumes the execution of the function that called this one
continuation.cont.resume(continuation.userDb)
        }
else -> throw IllegalStateException(...)
    }
}      

正如你所看到的,Kotlin編譯器為我們做了很多事情!從這個suspend函數功能來舉例。

suspend fun loginUser(userId: String, password: String): User {
val user = userRemoteDataSource.logUserIn(userId, password)
val userDb = userLocalDataSource.logUserIn(user)
return userDb
}      

編譯器為我們生成了下面這一切。

/* Copyright 2019 Google LLC. 
SPDX-License-Identifier: Apache-2.0 */
fun loginUser(userId: String?, password: String?, completion: Continuation<Any?>) {

class LoginUserStateMachine(
// completion parameter is the callback to the function that called loginUser
completion: Continuation<Any?>
    ): CoroutineImpl(completion) {
// objects to store across the suspend function
var user: User? = null
var userDb: UserDb? = null

// Common objects for all CoroutineImpl
var result: Any? = null
var label: Int = 0

// this function calls the loginUser again to trigger the 
// state machine (label will be already in the next state) and 
// result will be the result of the previous state's computation
override fun invokeSuspend(result: Any?) {
this.result = result
loginUser(null, null, this)
        }
    }

val continuation = completion as? LoginUserStateMachine ?: LoginUserStateMachine(completion)

when(continuation.label) {
0 -> {
// Checks for failures
throwOnFailure(continuation.result)
// Next time this continuation is called, it should go to state 1
continuation.label = 1
// The continuation object is passed to logUserIn to resume 
// this state machine's execution when it finishes
userRemoteDataSource.logUserIn(userId!!, password!!, continuation)
        }
1 -> {
// Checks for failures
throwOnFailure(continuation.result)
// Gets the result of the previous state
continuation.user = continuation.result as User
// Next time this continuation is called, it should go to state 2
continuation.label = 2
// The continuation object is passed to logUserIn to resume 
// this state machine's execution when it finishes
userLocalDataSource.logUserIn(continuation.user, continuation)
        }
2 -> {
// Checks for failures
throwOnFailure(continuation.result)
// Gets the result of the previous state
continuation.userDb = continuation.result as UserDb
// Resumes the execution of the function that called this one
continuation.cont.resume(continuation.userDb)
        }
else -> throw IllegalStateException(...)
    }
}      

Kotlin編譯器将每個suspend函數轉化為一個狀态機,在每次函數需要suspend時使用回調進行優化。

現在你知道了編譯器在編譯時到底做了什麼,你就可以更好地了解為什麼一個suspend函數在它執行完所有工作之前不會傳回。另外,你也會知道,代碼是如何在不阻塞線程的情況下進行suspend的——這是因為,當函數恢複時需要執行的資訊被存儲在Continuation對象中!

作者:徐宜生