Kotlin協程中使用挂起函數(Suspend函數)可以異步地傳回單個計算結果,但是如果有多個計算結果希望通過協程的方式異步傳回,這時可以使用Flows(基于kotlin v. 1.3.61)。
傳回多個值的方式
使用Collections
一般我們可以使用集合類存儲多個值,例如foo()傳回一個list,包含3各成員,可以通過foreach對其周遊并列印結果
fun foo(): List<Int> = listOf(1, 2, 3)
fun main() {
foo().forEach { value -> println(value) }
}
輸出:
1
2
3
使用Sequences
除了list以外,我們可以使用squences對單個數值計算後逐條傳回,但是計算過程是同步的
fun foo(): Sequence<Int> = sequence { // sequence builder
for (i in 1..3) {
Thread.sleep(100) // pretend we are computing it
yield(i) // yield next value
}
}
fun main() {
foo().forEach { value -> println(value) }
}
列印結果相同,每100ms輸出一個
使用挂起函數 Suspending functions
上面的計算會阻塞UI線程,此時我們會考慮使用協程挂起的方式傳回結果:
suspend fun foo(): List<Int> {
delay(1000) // pretend we are doing something asynchronous here
return listOf(1, 2, 3)
}
fun main() = runBlocking<Unit> {
foo().forEach { value -> println(value) }
}
1s後結果被一次性輸出
使用Flow
使用 List<Int> 意味着我們的結果隻能一次性傳回,此時還有一個選擇就是使用Flow<Int>,可以像Sequence<Int>一樣逐條計算後傳回流式結果,同時計算可以在異步完成,不會阻塞UI
fun foo(): Flow<Int> = flow { // flow builder
for (i in 1..3) {
delay(100) // pretend we are doing something useful here
emit(i) // emit next value
}
}
fun main() = runBlocking<Unit> {
// Launch a concurrent coroutine to check if the main thread is blocked
launch {
for (k in 1..3) {
println("I'm not blocked $k")
delay(100)
}
}
// Collect the flow
foo().collect { value -> println(value) }
}
數值和“I'm not blocked”并行列印,說明collect并沒有阻塞UI
I'm not blocked 1
1
I'm not blocked 2
2
I'm not blocked 3
3
通過上面例子可以看到,Flow有以下特征:
- flow{ ... } 建構一個Flow類型
- flow { ... }内可以使用suspend函數.
- foo()不需要是suspend函數
- emit方法用來發射資料
- collect方法用來周遊結果
Flow 是冷流
Flows像Sequences一樣是冷流,即在調用collect之前,flow{ ... }中的代碼不會執行
fun foo(): Flow<Int> = flow {
println("Flow started")
for (i in 1..3) {
delay(100)
emit(i)
}
}
fun main() = runBlocking<Unit> {
println("Calling foo...")
val flow = foo()
println("Calling collect...")
flow.collect { value -> println(value) }
println("Calling collect again...")
flow.collect { value -> println(value) }
}
輸出結果:
Calling foo...
Calling collect...
Flow started
1
2
3
Calling collect again...
Flow started
1
2
3
這也是 foo() 不需要标記為suspend的原因,因為foo()中的執行不需要挂起,可以很快傳回結果,等到調用collect的時候才開始執行。
Flow 的取消
Flow建立後并不傳回可以cancel的句柄,但是一個flow的collect是suspend的,是以可以像取消一個suspend方法一樣取消flow的collection。就好像RxJava中沒有subscribe之前僅僅建立一個Observable是不需要unsubscribe的
fun foo(): Flow<Int> = flow {
for (i in 1..3) {
delay(100)
println("Emitting $i")
emit(i)
}
}
fun main() = runBlocking<Unit> {
withTimeoutOrNull(250) { // Timeout after 250ms
foo().collect { value -> println(value) }
}
println("Done")
}
輸出結果:
Emitting 1
1
Emitting 2
2
Done
Flow 的建立
從前面的例子中我們知道可以通過 flow { ... } 建立Flow。除此之外還有以下兩種方式
- flowOf 建立一個保護固定數量的flow,類似listOf
- 任意集合類或者squence通過.asFlow()轉成一個flow
例如可以将一個IntRange轉成一個flow
(1..3).asFlow().collect { value -> println(value) }