天天看點

Kotlin Flow(一)基本使用

Kotlin協程中使用挂起函數(Suspend函數)可以異步地傳回單個計算結果,但是如果有多個計算結果希望通過協程的方式異步傳回,這時可以使用Flows(基于kotlin v. 1.3.61)。

傳回多個值的方式

使用Collections

一般我們可以使用集合類存儲多個值,例如​foo()​傳回一個list,包含3各成員,可以通過foreach對其周遊并列印結果

fun foo(): List<Int> = listOf(1, 2, 3)

fun main() {
    foo().forEach { value -> println(value) } 
}

           

輸出:

1
2
3
           

使用Sequences

除了list以外,我們可以使用squences對單個數值計算後逐條傳回,但是計算過程是同步的

fun foo(): Sequence<Int> = sequence { // sequence builder
    for (i in 1..3) {
        Thread.sleep(100) // pretend we are computing it
        yield(i) // yield next value
    }
}
​
fun main() {
    foo().forEach { value -> println(value) } 
}

           

列印結果相同,每100ms輸出一個

使用挂起函數 Suspending functions

上面的計算會阻塞UI線程,此時我們會考慮使用協程挂起的方式傳回結果:

suspend fun foo(): List<Int> {
    delay(1000) // pretend we are doing something asynchronous here
    return listOf(1, 2, 3)
}

fun main() = runBlocking<Unit> {
    foo().forEach { value -> println(value) } 
}

           

1s後結果被一次性輸出

使用Flow

使用 ​List<Int>​ 意味着我們的結果隻能一次性傳回,此時還有一個選擇就是使用Flow<Int>,可以像Sequence<Int>一樣逐條計算後傳回流式結果,同時計算可以在異步完成,不會阻塞UI

fun foo(): Flow<Int> = flow { // flow builder
    for (i in 1..3) {
        delay(100) // pretend we are doing something useful here
        emit(i) // emit next value
    }
}

fun main() = runBlocking<Unit> {
    // Launch a concurrent coroutine to check if the main thread is blocked
    launch {
        for (k in 1..3) {
            println("I'm not blocked $k")
            delay(100)
        }
    }
    // Collect the flow
    foo().collect { value -> println(value) } 
}
           

數值和“I'm not blocked”并行列印,說明collect并沒有阻塞UI

I'm not blocked 1
1
I'm not blocked 2
2
I'm not blocked 3
3
           

通過上面例子可以看到,Flow有以下特征:

  • ​flow{ ... }​ 建構一個Flow類型
  • ​flow { ... }​内可以使用suspend函數.
  • foo()​不需要是suspend函數
  • emit方法用來發射資料
  • collect方法用來周遊結果

Flow 是冷流

Flows像Sequences一樣是冷流,即在調用collect之前,flow{ ... }中的代碼不會執行

fun foo(): Flow<Int> = flow { 
    println("Flow started")
    for (i in 1..3) {
        delay(100)
        emit(i)
    }
}

fun main() = runBlocking<Unit> {
    println("Calling foo...")
    val flow = foo()
    println("Calling collect...")
    flow.collect { value -> println(value) } 
    println("Calling collect again...")
    flow.collect { value -> println(value) } 
}
           

輸出結果:

Calling foo...
Calling collect...
Flow started
1
2
3
Calling collect again...
Flow started
1
2
3
           

這也是 ​foo()​ 不需要标記為suspend的原因,因為​foo()​中的執行不需要挂起,可以很快傳回結果,等到調用collect的時候才開始執行。

Flow 的取消

Flow建立後并不傳回可以cancel的句柄,但是一個flow的collect是suspend的,是以可以像取消一個suspend方法一樣取消flow的collection。就好像RxJava中沒有subscribe之前僅僅建立一個Observable是不需要unsubscribe的 

fun foo(): Flow<Int> = flow { 
    for (i in 1..3) {
        delay(100)          
        println("Emitting $i")
        emit(i)
    }
}

fun main() = runBlocking<Unit> {
    withTimeoutOrNull(250) { // Timeout after 250ms 
        foo().collect { value -> println(value) } 
    }
    println("Done")
}
           

輸出結果:

Emitting 1
1
Emitting 2
2
Done
           

Flow 的建立

從前面的例子中我們知道可以通過 ​flow { ... }​ 建立Flow。除此之外還有以下兩種方式

  • flowOf 建立一個保護固定數量的flow,類似listOf
  • 任意集合類或者squence通過​.asFlow()​轉成一個flow

例如可以将一個IntRange轉成一個flow

(1..3).asFlow().collect { value -> println(value) }
           

繼續閱讀