天天看點

5、異步流(Asynchronous Flow)

異步流(官方文檔)

目錄

1、多個值的表示

1.1、使用Sequences

1.2、使用suspend函數

1.3、使用Flow

2、Flow是延遲執行的

3、取消flow的基本使用

4、flow建構器

5、中間流操作符

5.1、變換算子(Transform operator)

5.2、容量限制操作符(Size-limiting operators)

6、終結操作符(Terminal flow operators)

7.flow流是順序的(Flows are sequential)

8、flow上下文(Flow context)

8.1、錯誤的使用withContext(Wrong emission withContext)

8.2、flowOn操作符

9、緩沖(Buffering)

9.1、合并(Conflation)

9.2、處理最新的值(Processing the latest value)

10、組合fow(Composing multiple flows)

10.1、Zip

10.2、Combine

11、Flattening flows

11.1、flatMapConcat

11.2、flatMapMerge

11.3、flatMapLatest

12、Flow的異常處理

12.1、由Collector端處理異常

12.2、Everyingthing is caught

13、Exception transparency

13.1、Transparent catch

13.2、Catching declaratively

14、Flow completion

14.1、指令式的try/finally

14.2、聲明式的執行

14.3、正常結束(Successful completion)

15、Imperative versus declarative

16、Launching flow

17、檢查flow的取消

17.1、Making busy flow cancellable

18、Flow及響應式流

一個suspend函數異步的傳回一個值,但是我們怎麼傳回多個異步計算的值呢?這就是kotlin流(Flows)存在的理由。

1、多個值的表示

在kotlin中可以使用 集合collections來表示多個值,比如我們有個simple函數用于傳回一個包含3個值的List,然後使用 forEach把他們列印出來,代碼如下:

package com.cool.cleaner.test

fun simple(): List<Int> =  listOf(1, 2, 3)

fun main(): Unit {
    simple().forEach { value -> println(value) }
}
           

下面是輸出:

1
2
3

Process finished with exit code 0
           

1.1、使用Sequences

假如我們要擷取的值是需要耗時(每個100ms)計算的,那我們可以使用 Sequence來包裝我們的值,代碼如下:

package com.cool.cleaner.test

fun simple(): Sequence<Int> =  sequence {
    for (i in 1..3) {
        Thread.sleep(1000)
        yield(i)//yield next value
    }
}

fun main(): Unit {
    simple().forEach { value -> println(value) }
    println("done")
}
           

輸出結果和上面一樣,但是在輸出每個數字之前都會有1000ms的延時。

1.2、使用suspend函數

不過上面使用Sequence的方式是會阻塞主線程的(很容易看到上面的例子隻有simple執行完後才會列印出"done"),當這些值都是異步計算的時候我們可以把simple函數改寫成suspend的,然後它就可以非阻塞的計算然後把結果作為一個list傳回,代碼如下:

package com.cool.cleaner.test

import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking

suspend fun simple(): List<Int> {
    delay(1000)
    return listOf(1, 2, 3)
}

fun main() = runBlocking<Unit> {
    simple().forEach { value -> println(value) }
    println("done")
}
           

1秒後輸出3個數字。

1.3、使用Flow

使用傳回類型List<Int>意味着我們隻能一次傳回所有的值,使用

Flow<Int>

 可以表示數值流是異步計算的,就像我們使用Sequence<Int>表示同步計算一樣,代碼如下:

package com.cool.cleaner.test

import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

fun simple(): Flow<Int> = flow {
    for (i in 1..3) {
        delay(1000)
        emit(i)//emit next value
    }
}

fun main() = runBlocking<Unit> {
    // Launch a concurrent coroutine to check if the main thread is blocked
    launch {
        for (k in 1..3) {
            println("I'm not blocked $k")
            delay(1000)
        }
    }
    simple().collect { value -> println(value) }
    println("done")
}
           

每隔1000ms列印出一個數值,并且沒有阻塞主線程,同時通過運作于主線程中的一個協程每隔1000ms列印 "I'm not blocked"證明确實沒有阻塞主線程,輸出如下:

I'm not blocked 1
1
I'm not blocked 2
2
I'm not blocked 3
3
done

Process finished with exit code 0
           

下面是在代碼中使用 Flow與之前的例子不同的地方:

  1.  Flow 類型的建構函數是 flow。
  2. 位于

    flow { ... }

     中的代碼可以suspend。
  3. simple

     函數不再标記為suspend的。
  4. 從flow中發射資料使用emit函數。
  5. 從flow從收集資料使用collect 函數。
在simple函數的代碼塊

flow { ... }

 中,我們可以把delay替換成Thread.sleep,此時你将看到main線程已經阻塞。

2、Flow是延遲執行的

Flow就像sequences一樣,位于flow 建構者内部的代碼隻有在flow上調用collect的時候才會執行,下面的代碼很容易證明這點:

package com.cool.cleaner.test

import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.runBlocking

fun simple(): Flow<Int> = flow {
    println("Flow started")
    for (i in 1..3) {
        delay(1000)
        emit(i)//emit next value
    }
}

fun main() = runBlocking<Unit> {
    println("Calling simple function...")
    val flow = simple()
    println("Calling collect...")
    flow.collect { value -> println(value) }
    println("Calling collect again...")
    flow.collect { value -> println(value) }
}
           

下面是輸出:

Calling simple function...
Calling collect...
Flow started
1
2
3
Calling collect again...
Flow started
1
2
3

Process finished with exit code 0
           

這是simple函數(傳回一個flow流)沒有用suspend修飾的原因,simple()本身很快就會傳回而不用等待任何東西;每次在flow上調用collect的時候flow都會開始執行,這就是為什麼當我們再次調用collect的時候會看到輸出 "Flow started"。

3、取消flow的基本使用

flow會與協程的協作式取消綁定在一起,通常情況下當flow挂起在(suspend)一個可以取消的suspend函數(比如 delay)中的時候,flow集合也是可以取消的。下面的代碼展示了當位于withTimeoutOrNull 中的代碼因為逾時而阻塞并且停止執行的時候它是如何取消flow的執行的:

package com.cool.cleaner.test

import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withTimeoutOrNull

fun simple(): Flow<Int> = flow {
    for (i in 1..3) {
        delay(100)
        println("Emitting $i")
        emit(i)//emit next value
    }
}

fun main() = runBlocking<Unit> {
    withTimeoutOrNull(250) {//250ms後逾時傳回
        simple().collect { value -> println(value) }
    }
    println("Done")
}
           

請注意在simple函數中僅有2個數字從flow中發射出來(其實我隻看到輸出了1,你可以試試),輸出如下:

Emitting 1
1
Emitting 2
2
Done
           

4、flow建構器

上面使用的flow { ... }建構器是最基本的一個,還有其他更簡單的方式可以聲明異步流flows:

  1. flowOf 建構器發射固定的一些值。
  2. 不同的集合(collection)和序列(sequence)可以調用擴充函數.asFlow轉換成flow。

是以從一個flow中列印出1到3的例子可以如下編寫:

package com.cool.cleaner.test

import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.runBlocking

fun main() = runBlocking<Unit> {
    (1..3).asFlow().collect { value -> println(value) }
}
           

5、中間流操作符

flow可以通過操作符來轉換,就像你将要使用的collection和sequence一樣。中間操作符會作用于一個上遊的流并傳回一個下遊的流,這些中間操作符就像flow一樣都是惰性的。對這樣一個操作符的調用本身不屬于挂起函數,它很快傳回一個經過變換的flow。

基本的操作符都有一些熟悉的名字,比如 map 和 filter,它和sequence的最重要的差別就是這些操作符裡面可以調用suspend函數。

比如,即使一個要映射到的請求是由suspend實作的耗時操作,flow也可以把進來的請求映射到這樣的一系列耗時請求上,代碼如下:

package com.cool.cleaner.test

import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.runBlocking

suspend fun performRequest(request: Int): String {
    delay(1000)
    return "response $request"
}

fun main() = runBlocking<Unit> {
    (1..3).asFlow()
        .map { request -> performRequest(request) }
        .collect { response -> println(response) }
}
           

每隔一秒輸出一行,輸出如下:

response 1
response 2
response 3

Process finished with exit code 0
           

5.1、變換算子(Transform operator)

在flow變換的一系列操作符中,其中最基礎的就是transform,它可以用來模仿一些簡單的變換,比如: map 和 filter,當然也可以用來實作一些複雜的變換;使用

transform

 操作符我們可以對任意值發射任意次數。

舉個例子,使用

transform

 我們可以在運作異步耗時任務請求之前發射一個字元串,然後才是發射耗時任務的執行結果,代碼如下:

package com.cool.cleaner.test

import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.transform
import kotlinx.coroutines.runBlocking

suspend fun performRequest(request: Int): String {
    delay(1000)
    return "response $request"
}

fun main() = runBlocking<Unit> {
    (1..3).asFlow()
        .transform { request ->
            emit("Making request $request")
            emit(performRequest(request))
        }
        .collect { response -> println(response) }
}
           

下面是輸出結果:

Making request 1
response 1
Making request 2
response 2
Making request 3
response 3

Process finished with exit code 0
           

5.2、容量限制操作符(Size-limiting operators)

類似于take 的容量限制操作符會在達到相應門檻值時取消flow的執行,由于協程中的取消是用跑異常來表示的,是以你可以在取消操作的情況下使用所有的資源管理功能操作(比如代碼塊try { ... } finally { ... }),代碼如下:

package com.cool.cleaner.test

import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.runBlocking

fun numbers(): Flow<Int> = flow {
    try {
        emit(1)
        emit(2)
        println("This line will not execute")
        emit(3)
    } finally {
        println("Finally in numbers")
    }
}

fun main() = runBlocking<Unit> {
    numbers()
        .take(2)
        .collect { number -> println(number) }
}
           

從輸出結果中可以看出在函數numbers()中的代碼塊flow { ... }在發射兩個資料後就停止發射資料了,輸出如下:

1
2
Finally in numbers

Process finished with exit code 0
           

6、終結操作符(Terminal flow operators)

在flow上的終結符操作符都是suspend函數,它會開始在flow上執行collection;操作符 collect是最基本的一個終結操作符,但是還有其他一些更友善的終結操作符,比如:

  1. 轉化為其它集合的方法比如:toList、toSet。
  2. 擷取第一個元素的first 操作符以及確定隻發射一個元素的single操作符。
  3. reduce一個flow的操作符比如 reduce 和 fold。

舉個例子,代碼如下:

package com.cool.cleaner.test

import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.reduce
import kotlinx.coroutines.runBlocking

fun main() = runBlocking<Unit> {
    val sum = (1..5)
        .asFlow()
        .map { it * it }
        .reduce { a, b -> a + b }
    println(sum)
}
           

隻列印出一個結果,如下:

55

Process finished with exit code 0
           

7.flow流是順序的(Flows are sequential)

除非使用了操作多個flow的操作符否則每一次的collection動作都是順序的(處理了flow中的第一個元素然後第二個元素,依次下去),預設情況下collection操作運作在調用終結操作符的協程,而不會啟動新的協程。每一個從flow中發射出來的資料都會從上遊到下遊經過中間操作符的處理最後傳送給終結操作符。

請看下面的例子,首頁過濾掉奇數,然後再映射到字元串:

package com.cool.cleaner.test

import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.runBlocking

fun main() = runBlocking<Unit> {
    (1..5).asFlow()
        .filter {
            println("Filter $it")
            it % 2 == 0
        }.map {
            println("Map $it")
            "string $it"
        }.collect {
            println("Collect $it")
        }
}
           

輸出如下:

Filter 1
Filter 2
Map 2
Collect string 2
Filter 3
Filter 4
Map 4
Collect string 4
Filter 5

Process finished with exit code 0
           

8、flow上下文(Flow context)

collection的執行一般都是在調用collect動作的協程中,比如:假如有一個simple的flow,下面的代碼将會執行在withContext指定的上下文中而不管simple是如何實作的。

withContext(context) {
    simple().collect { value ->
        println(value) // run in the specified context 
    }
}
           

這個屬性叫做上下文保持。

是以,預設情況下代碼塊flow { ... }中的代碼運作于flow的收集者指定的上下文中。舉個例子,下面的simple()函數會列印出運作的線程并且發射3個資料:

package com.cool.cleaner.test

import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.runBlocking

fun log(msg: String) = println("[${Thread.currentThread().name}] $msg")

fun simple(): Flow<Int> = flow {
    log("Started simple flow")
    for (i in 1..3) {
        emit(i)
    }
}

fun main() = runBlocking<Unit> {
    simple().collect {value -> log("Collected $value")}
}
           

輸出如下:

[main @coroutine#1] Started simple flow
[main @coroutine#1] Collected 1
[main @coroutine#1] Collected 2
[main @coroutine#1] Collected 3

Process finished with exit code 0
           

因為simple().collect是從主線程中調用的,是以simple的flow也是運作于主線程中,這對于那些不關心運作上下文且不阻塞調用者的快速運作代碼和異步代碼來說,這種預設設定非常不錯。

8.1、錯誤的使用withContext(Wrong emission withContext)

耗時的CPU密集型任務可以需要在協程上下文Dispatchers.Default 中運作而UI更新相關的需要在 Dispatchers.Main中執行,在kotlin協程中withContext是用來切換上下文的,但是建構器flow { ... }又得遵循“上下文保持”這個屬性是以不允許從不同的上下文發射資料。

你可以運作下面的代碼試試:

package com.cool.cleaner.test

import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext

fun log(msg: String) = println("[${Thread.currentThread().name}] $msg")

fun simple(): Flow<Int> = flow {
    withContext(Dispatchers.Default) {
        log("Started simple flow")
        for (i in 1..3) {
            emit(i)
        }
    }
}

fun main() = runBlocking<Unit> {
    simple().collect {value -> log("Collected $value")}
}
           

你會發現産生如下異常:

[DefaultDispatcher-worker-1 @coroutine#1] Started simple flow
Exception in thread "main" java.lang.IllegalStateException: Flow invariant is violated:
		Flow was collected in [CoroutineId(1), "coroutine#1":BlockingCoroutine{Active}@31265a9d, [email protected]],
		but emission happened in [CoroutineId(1), "coroutine#1":DispatchedCoroutine{Active}@68d8d324, Dispatchers.Default].
		Please refer to 'flow' documentation or use 'flowOn' instead
	at kotlinx.coroutines.flow.internal.SafeCollector_commonKt.checkContext(SafeCollector.common.kt:84)
	at kotlinx.coroutines.flow.internal.SafeCollector.checkContext(SafeCollector.kt:88)
	at kotlinx.coroutines.flow.internal.SafeCollector.emit(SafeCollector.kt:74)
	at kotlinx.coroutines.flow.internal.SafeCollector.emit(SafeCollector.kt:59)
	at com.cool.cleaner.test.KotTestKt$simple$1$1.invokeSuspend(KotTest.kt:16)
	(Coroutine boundary)
	at com.cool.cleaner.test.KotTestKt$simple$1.invokeSuspend(KotTest.kt:13)
	at kotlinx.coroutines.flow.AbstractFlow.collect(Flow.kt:212)
	at com.cool.cleaner.test.KotTestKt$main$1.invokeSuspend(KotTest.kt:24)
Caused by: java.lang.IllegalStateException: Flow invariant is violated:
		Flow was collected in [CoroutineId(1), "coroutine#1":BlockingCoroutine{Active}@31265a9d, [email protected]],
		but emission happened in [CoroutineId(1), "coroutine#1":DispatchedCoroutine{Active}@68d8d324, Dispatchers.Default].
		Please refer to 'flow' documentation or use 'flowOn' instead
	at kotlinx.coroutines.flow.internal.SafeCollector_commonKt.checkContext(SafeCollector.common.kt:84)
	at kotlinx.coroutines.flow.internal.SafeCollector.checkContext(SafeCollector.kt:88)
	at kotlinx.coroutines.flow.internal.SafeCollector.emit(SafeCollector.kt:74)
	at kotlinx.coroutines.flow.internal.SafeCollector.emit(SafeCollector.kt:59)
	at com.cool.cleaner.test.KotTestKt$simple$1$1.invokeSuspend(KotTest.kt:16)
	at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
	at kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:106)
	at kotlinx.coroutines.scheduling.CoroutineScheduler.runSafely(CoroutineScheduler.kt:571)
	at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.executeTask(CoroutineScheduler.kt:750)
	at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.runWorker(CoroutineScheduler.kt:678)
	at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.run(CoroutineScheduler.kt:665)
           

8.2、flowOn操作符

異常指出了應該使用flowOn函數來改變發射flow的協程上下文,正确改變協程上下文的例子如下:

package com.cool.cleaner.test

import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOn
import kotlinx.coroutines.runBlocking

fun log(msg: String) = println("[${Thread.currentThread().name}] $msg")

fun simple(): Flow<Int> = flow {
    log("Started simple flow")
    for (i in 1..3) {
        delay(1000)
        log("Emitting $i")
        emit(i)
    }
}.flowOn(Dispatchers.Default)

fun main() = runBlocking<Unit> {
    simple().collect {value -> log("Collected $value")}
}
           

此時collection發生在主線程而flow { ... }代碼則運作在背景線程。

另外一個需要主意的問題是 flowOn 操作符改變了flow的預設有序性,現在collection運作在 ("coroutine#1") 而發射而運作在另外一個協程 ("coroutine#2") ,而這兩個協程則同時運作在不同的線程。當一個上遊的流需要改變CoroutineDispatcher 的時候flowOn操作符會為它建立一個協程。

9、緩沖(Buffering)

從整體運作時間來看,把一個flow的的不同部分運作在不同的協程是非常有用的,特别是當運作長時間的異步任務時。舉個例子,當simple發射flow很慢,比如需要100ms,而collector也很慢,需要花300ms處理一個元素,現在看看收集一個有3個元素的flow需要花多長時間,代碼如下:

package com.cool.cleaner.test

import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.runBlocking
import kotlin.system.measureTimeMillis

fun simple(): Flow<Int> = flow {
    for (i in 1..3) {
        delay(1000)
        emit(i)
    }
}

fun main() = runBlocking<Unit> {
    val time = measureTimeMillis {
        simple().collect { value ->
            delay(3000)
            println(value)
        }
    }
    println("Collected in $time ms")
}
           

輸出如下,所有的處理時間大約12000ms(3個數字,每個花4000ms):

1
2
3
Collected in 12082 ms

Process finished with exit code 0
           

我們可以在flow上使用 buffer 操作符使得simple流的發射代碼與收集的代碼并發的運作,而不是順序執行:

package com.cool.cleaner.test

import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.buffer
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.runBlocking
import kotlin.system.measureTimeMillis

fun simple(): Flow<Int> = flow {
    for (i in 1..3) {
        delay(1000)
        emit(i)
    }
}

fun main() = runBlocking<Unit> {
    val time = measureTimeMillis {
        simple()
            .buffer()
            .collect { value ->
            delay(3000)
            println(value)
        }
    }
    println("Collected in $time ms")
}
           

輸出和上面一緻,不過由于我們建立了一個高效的處理管道,是以整體上運作得更快;花1000ms等待第一個數字的到來,然後花3000處理每個資料,這樣運作時間總共是10000ms左右:

1
2
3
Collected in 10346 ms

Process finished with exit code 0
           
請注意,flowOn操作符會在需要切換CoroutineDispatcher的時候使用同樣的緩沖機制,但是這裡我們顯示的請求緩沖而沒有改變協程運作上下文。

9.1、合并(Conflation)

當flow代表部分的操作結果或者狀态更新時,此時可能不需要處理所有的資料,而隻需要處理最新的資料即可。在這種情況下,如果collector處理過慢那就就可以使用conflate操作符來跳過中間的值,示例如下:

package com.cool.cleaner.test

import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.conflate
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.runBlocking
import kotlin.system.measureTimeMillis

fun simple(): Flow<Int> = flow {
    for (i in 1..3) {
        delay(1000)
        emit(i)
    }
}

fun main() = runBlocking<Unit> {
    val time = measureTimeMillis {
        simple()
            .conflate()
            .collect { value ->
            delay(3000)
            println(value)
        }
    }
    println("Collected in $time ms")
}
           

我們可以看到當第一個資料還在被處理的時候,第2個、第3個資料就已經産生了,是以第2個資料就被合并了且隻有最新(第3個)的被傳遞給了collector:

1
3
Collected in 7347 ms

Process finished with exit code 0
           

9.2、處理最新的值(Processing the latest value)

當發送者和處理者都比較慢的時候conflation是一種加快處理的方式,它是通過丢棄發射的資料來達到目的的;另外一種方法是當一個新資料到達的時候重新開機一個慢的資料處理者。有一類xxxLatest操作符,它們執行與xxx操作符相同的基本邏輯,但在新值上取消其塊中的代碼執行,例子如下:

package com.cool.cleaner.test

import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.collectLatest
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.runBlocking
import kotlin.system.measureTimeMillis

fun simple(): Flow<Int> = flow {
    for (i in 1..3) {
        delay(1000)
        emit(i)
    }
}

fun main() = runBlocking<Unit> {
    val time = measureTimeMillis {
        simple()
            .collectLatest { value ->
                println("Collecting $value")
                delay(3000)
                println("Done $value")
            }
    }
    println("Collected in $time ms")
}
           

 collectLatest 中的代碼需要3000ms來處理資料,但是新資料每1000ms就會到來,你可以看到 collectLatest 塊中的代碼在新值到來時都會執行但是隻有最後一個是完整執行的。

Collecting 1
Collecting 2
Collecting 3
Done 3
Collected in 6451 ms

Process finished with exit code 0
           

10、組合fow(Composing multiple flows)

有很多方法可以組合多個flow。

10.1、Zip

就像kotlin标準庫中的Sequence.zip 擴充函數一樣,flow也有一個zip操作符可以用來合并兩個flow中的相應資料,代碼如下:

package com.cool.cleaner.test

import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.zip
import kotlinx.coroutines.runBlocking

fun main() = runBlocking<Unit> {
    val nums = (1..3).asFlow()
    val strs = flowOf("one", "two", "three")
    nums.zip(strs) {a, b -> "$a -> $b"}
        .collect { println(it) }
}
           

下面是輸出:

1 -> one
2 -> two
3 -> three

Process finished with exit code 0
           

10.2、Combine

當flow代表最新的值或者操作的的時候,如果上遊發射了一個值那就需要根據最新的值重新計算,處理這種類型的相應操作符叫做combine。

比如上面的例子,每300ms産生一個數字,但是每400ms才産生一個字元,那使用zip操作符将會始終産生相同的結果,每400ms輸出一個兩兩結合的值。

在這個例子中我們使用操作符 onEach 延時每一個元素,這樣使得發射一個flow更清晰和簡潔。
package com.cool.cleaner.test

import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking


fun main() = runBlocking<Unit> {
    val nums = (1..3).asFlow().onEach { delay(300) }
    val strs = flowOf("one", "two", "three").onEach { delay(400) }
    val startTime = System.currentTimeMillis();
    nums.zip(strs) { a, b -> "$a -> $b" }
        .collect { value ->
            println("$value at ${System.currentTimeMillis() - startTime} ms from start")
        }
}
           

然而如果使用 combine操作符,代碼如下:

package com.cool.cleaner.test

import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking


fun main() = runBlocking<Unit> {
    val nums = (1..3).asFlow().onEach { delay(300) }
    val strs = flowOf("one", "two", "three").onEach { delay(400) }
    val startTime = System.currentTimeMillis();
    nums.combine(strs) { a, b -> "$a -> $b" }
        .collect { value ->
            println("$value at ${System.currentTimeMillis() - startTime} ms from start")
        }
}
           

此時将會得到完全不同的輸出,當nums或者strs發射一個元素的時候都會得到一行新的輸出,結果如下:

1 -> one at 727 ms from start
2 -> one at 943 ms from start
2 -> two at 1129 ms from start
3 -> two at 1244 ms from start
3 -> three at 1532 ms from start

Process finished with exit code 0
           

11、Flattening flows

Flow代表一系列異步收到的值,是以你很可能遇到一個值又觸發産生另一個Flow的情況,比如我們可以使用下面的函數每隔500ms産生一個字元串:

fun requestFlow(i: Int): Flow<String> = flow {
    emit("$i: First") 
    delay(500) // wait 500 ms
    emit("$i: Second")    
}
           

假如現在我們有一個會發射三個值的Flow,并且像下面這樣調用requestFlow:

(1..3).asFlow().map { requestFlow(it) }
           

此時我們就會得到一個嵌套的Flow (

Flow<Flow<String>>

),而此時為了友善進一步的處理就需要把它擴充(flatten)成一個單一的Flow。 Collections 和sequences 中的操作符flatten及flatMap就是應對這種情況的;然而由于flow的異步性,将會存在不同形式的flatten,同樣對于flow也有一系列的flatten操作符。

11.1、flatMapConcat

連接配接模式可以使用操作符flatMapConcat 及flattenConcat ,他們會在開始下一個flow之前等待内部的flow結束,如下例子所示:

package com.cool.cleaner.test

import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

fun requestFlow(i: Int): Flow<String> = flow {
    emit("$i: First")
    delay(500)
    emit("$i: Second")
}

fun main() = runBlocking<Unit> {
    val startTime = System.currentTimeMillis()
    (1..3).asFlow()
        .onEach { delay(100) }
        .flatMapConcat { requestFlow(it) }
        .collect { value ->
            println("$value at ${System.currentTimeMillis() - startTime} ms from start")
        }
}
           

從輸出很容易看出 flatMapConcat操作符的有序性:

1: First at 201 ms from start
1: Second at 703 ms from start
2: First at 804 ms from start
2: Second at 1305 ms from start
3: First at 1406 ms from start
3: Second at 1907 ms from start

Process finished with exit code 0
           

11.2、flatMapMerge

另一種flatten模式是并發的收集flow,然後把他們的值合并到一個flow中,使得flow中的資料可以盡快的發射出去。這種情況可以使用操作符flatMapMerge 和flattenMerge;他們都接收一個可選的

concurrency

 (預設值為DEFAULT_CONCURRENCY )參數,該參數用來限制同時可以收集的flow的最大數量:

package com.cool.cleaner.test

import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

fun requestFlow(i: Int): Flow<String> = flow {
    emit("$i: First")
    delay(500)
    emit("$i: Second")
}

fun main() = runBlocking<Unit> {
    val startTime = System.currentTimeMillis()
    (1..3).asFlow()
        .onEach { delay(100) }
        .flatMapMerge { requestFlow(it) }
        .collect { value ->
            println("$value at ${System.currentTimeMillis() - startTime} ms from start")
        }
}
           

操作符flatMapMerge 的并發特性可以很明顯的從輸出中看出來:

1: First at 315 ms from start
2: First at 388 ms from start
3: First at 491 ms from start
1: Second at 816 ms from start
2: Second at 890 ms from start
3: Second at 996 ms from start

Process finished with exit code 0
           
請注意:本例中flatMapMerge 調用的代碼塊(

{ requestFlow(it) }

 in this example)是順序的,但是結果的收集是并發的;等價于先調用 

map{requestFlow(it)}然後再調用

flattenMerge

11.3、flatMapLatest

與在處理最新的值(9.2)使用的collectLatest 操作類似,當新的flow發射值的時候取消前面的flow這種情況也有相應的操作符flatMapLatest,代碼如下:

package com.cool.cleaner.test

import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

fun requestFlow(i: Int): Flow<String> = flow {
    emit("$i: First")
    delay(500)
    emit("$i: Second")
}

fun main() = runBlocking<Unit> {
    val startTime = System.currentTimeMillis()
    (1..3).asFlow()
        .onEach { delay(100) }
        .flatMapLatest { requestFlow(it) }
        .collect { value ->
            println("$value at ${System.currentTimeMillis() - startTime} ms from start")
        }
}
           

從結果輸出中很容易看出 flatMapLatest 是如何工作的。

1: First at 530 ms from start
2: First at 911 ms from start
3: First at 1015 ms from start
3: Second at 1516 ms from start

Process finished with exit code 0
           
請注意: flatMapLatest 在新值到來的時候會取消所有的代碼塊的執行(本例中的

{ requestFlow(it) }

 )

12、Flow的異常處理

當操作符中的發射器或者代碼發生異常的時候,flow的收集就出現異常,不過你有多種方法可以處理這些異常。

12.1、由Collector端處理異常

在collector端可以使用

try/catch

 處理異常,代碼如下:

package com.cool.cleaner.test

import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.runBlocking

fun simple(): Flow<Int> = flow {
    for (index in 1..3) {
        println("Emitting $index")
        emit(index)//emit next value
    }
}

fun main() = runBlocking<Unit> {
    try {
        simple().collect { value ->
            println(value)
            check(value <= 1) {
                "Collected $value"
            }
        }
    } catch (e: Throwable) {
        println("Caught $e")
    }
}
           

上面的代碼成功的捕獲了異常,正如你所見捕獲之後就沒值再輸出了:

Emitting 1
1
Emitting 2
2
Caught java.lang.IllegalStateException: Collected 2

Process finished with exit code 0
           

12.2、Everyingthing is caught

上面的例子的确捕獲了發射器、中間操作符或者終結操作符中抛出的異常;比如下面下面的例子,把int值map到string上,但是會産生相應的異常:

package com.cool.cleaner.test

import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.runBlocking

fun simple(): Flow<String> =
    flow {
        for (index in 1..3) {
            println("Emitting $index")
            emit(index)//emit next value
        }
    }.map { value ->
        check(value <= 1) {
            "Crashed on $value"
        }
        "string $value"
    }

fun main() = runBlocking<Unit> {
    try {
        simple().collect { value -> println(value)}
    } catch (e: Throwable) {
        println("Caught $e")
    }
}
           

這個異常将會被捕獲并且停止收集,下面是相應輸出:

Emitting 1
string 1
Emitting 2
Caught java.lang.IllegalStateException: Crashed on 2

Process finished with exit code 0
           

13、Exception transparency

但是現在發射器如何封裝異常捕獲的代碼呢?

流必須對異常透明,而在flow { ... }代碼塊中使用try/catch并發射資料是違背異常透明規則的;就像前面的例子一樣,這樣可以保證在collector端使用try/catch能夠正常的捕獲到異常。

發射器可以使用 catch操作符處理異常,該操作遵循異常透明規則并且允許封裝異常處理;在

catch

 代碼塊中可以根據不異常類型作不同的處理:

  • 可以使用throw重新抛出異常。
  • 可以在catch塊中使用emit把異常轉變為發射資料。
  • 可以忽略異常、列印日志或者作其它處理。

比如,我們可以在捕獲異常的時候把異常資訊當成資料發射出去,代碼如下:

package com.cool.cleaner.test

import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

fun simple(): Flow<String> =
    flow {
        for (index in 1..3) {
            println("Emitting $index")
            emit(index)//emit next value
        }
    }.map { value ->
        check(value <= 1) {
            "Crashed on $value"
        }
        "string $value"
    }

fun main() = runBlocking<Unit> {
    try {
        simple()
            .catch { e -> emit("Caught $e") }//emit on exception
            .collect { value -> println(value)}
    } catch (e: Throwable) {
        println("Caught $e")
    }
}
           

以下是輸出結果,即使不使用try/catch得到的輸出結果也和上面的一樣:

Emitting 1
string 1
Emitting 2
Caught java.lang.IllegalStateException: Crashed on 2

Process finished with exit code 0
           

13.1、Transparent catch

中間操作符catch遵循透明異正常則,是以它隻捕獲上遊的異常(在catch之上的操作符抛出的異常),假如collect { ... }(在catch的下面)中抛出了異常,此異常将不會被捕獲,如下代碼所示:

package com.cool.cleaner.test

import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.runBlocking

fun simple(): Flow<Int> = flow {
    for (index in 1..3) {
        println("Emitting $index")
        emit(index)//emit next value
    }
}

fun main() = runBlocking<Unit> {
    simple()
        .catch { e -> println("Caught $e") }//emit on exception
        .collect { value ->
            check(value <= 1) { "Collected $value" }
            println(value)
        }
}
           

上面的代碼并不能捕獲到下遊抛出的異常,輸出如下:

Emitting 1
1
Emitting 2
Exception in thread "main" java.lang.IllegalStateException: Collected 2
	at com.cool.cleaner.test.KotTestKt$main$1$invokeSuspend$$inlined$collect$1.emit(Collect.kt:133)
	at kotlinx.coroutines.flow.FlowKt__ErrorsKt$catchImpl$$inlined$collect$1.emit(Collect.kt:134)
	at kotlinx.coroutines.flow.internal.SafeCollectorKt$emitFun$1.invoke(SafeCollector.kt:15)
	at kotlinx.coroutines.flow.internal.SafeCollectorKt$emitFun$1.invoke(SafeCollector.kt)
	at kotlinx.coroutines.flow.internal.SafeCollector.emit(SafeCollector.kt:77)
	at kotlinx.coroutines.flow.internal.SafeCollector.emit(SafeCollector.kt:59)
	at com.cool.cleaner.test.KotTestKt$simple$1.invokeSuspend(KotTest.kt:12)
	at com.cool.cleaner.test.KotTestKt$simple$1.invoke(KotTest.kt)
	at kotlinx.coroutines.flow.SafeFlow.collectSafely(Builders.kt:61)
	at kotlinx.coroutines.flow.AbstractFlow.collect(Flow.kt:212)
	at kotlinx.coroutines.flow.FlowKt__ErrorsKt.catchImpl(Errors.kt:230)
	at kotlinx.coroutines.flow.FlowKt.catchImpl(Unknown Source)
	at kotlinx.coroutines.flow.FlowKt__ErrorsKt$catch$$inlined$unsafeFlow$1.collect(SafeCollector.common.kt:113)
	at com.cool.cleaner.test.KotTestKt$main$1.invokeSuspend(KotTest.kt:80)
	at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
	at kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:106)
	at kotlinx.coroutines.EventLoopImplBase.processNextEvent(EventLoop.common.kt:274)
	at kotlinx.coroutines.BlockingCoroutine.joinBlocking(Builders.kt:84)
	at kotlinx.coroutines.BuildersKt__BuildersKt.runBlocking(Builders.kt:59)
	at kotlinx.coroutines.BuildersKt.runBlocking(Unknown Source)
	at kotlinx.coroutines.BuildersKt__BuildersKt.runBlocking$default(Builders.kt:38)
	at kotlinx.coroutines.BuildersKt.runBlocking$default(Unknown Source)
	at com.cool.cleaner.test.KotTestKt.main(KotTest.kt:16)
	at com.cool.cleaner.test.KotTestKt.main(KotTest.kt)
           

13.2、Catching declaratively

我們可以在想要處理異常的時候結合使用catch操作符,如何做呢?你隻需要把collect操作符中的代碼移到onEach中并放在catch操作符之前即可;當然收集這樣的flow必須調用無參數版本的collect函數,代碼如下:

package com.cool.cleaner.test

import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

fun simple(): Flow<Int> = flow {
    for (index in 1..3) {
        println("Emitting $index")
        emit(index)//emit next value
    }
}

fun main() = runBlocking<Unit> {
    simple()
        .onEach { value ->
            check(value <= 1) { "Collected $value" }
            println(value)
        }
        .catch { e -> println("Caught $e") }
        .collect()
}
           

通過下面的輸出結果我們知道異常已經得到處理,而你并不需要顯示的使用try/catch代碼塊:

Emitting 1
1
Emitting 2
Caught java.lang.IllegalStateException: Collected 2

Process finished with exit code 0
           

14、Flow completion

當Fow結束(正常結束或者異常結束)的時候我們可能需要執行一個動作,正如你所見,你可以用聲明式或者指令式的方式來執行你想要執行的動作。

14.1、指令式的try/finally

除了

try

/

catch以外,collector還可以使用finally來執行你想要執行的動作,代碼如下:

package com.cool.cleaner.test

import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.runBlocking

fun simple(): Flow<Int> = (1..3).asFlow()

fun main() = runBlocking<Unit> {
    try {
        simple().collect{ value -> println(value)}
    } finally {
        println("Done")
    }
}
           

輸出如下:

2
3
Done

Process finished with exit code 0
           

14.2、聲明式的執行

對于聲明式的執行方式,flow 有一個操作符onCompletion ,該操作符會在flow完成的時候調用,前面的例子可以使用onCompletion 重寫,輸出結果也是一樣的,代碼如下:

package com.cool.cleaner.test

import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.onCompletion
import kotlinx.coroutines.runBlocking

fun simple(): Flow<Int> = (1..3).asFlow()

fun main() = runBlocking<Unit> {
    simple()
        .onCompletion { println("Done") }
        .collect{ value -> println(value)}
}
           

使用onCompletion 操作符的優勢是它包含一個可為null的

Throwable

 參數,可以根據這個參數判斷是否發生了異常,下面的例子中,simple flow在發射一個資料1之後就會抛出一個異常:

package com.cool.cleaner.test

import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

fun simple(): Flow<Int> = flow {
    emit(1)
    throw RuntimeException()
}

fun main() = runBlocking<Unit> {
    simple()
        .onCompletion { cause ->
            if (null != cause) {
                println("Flow completed exceptionally")
            }
        }
        .catch { cause -> println("Caught exception") }
        .collect{ value -> println(value)}
}
           

正如你所期望的,下面是輸出:

1
Flow completed exceptionally
Caught exception

Process finished with exit code 0
           

onCompletion 操作符并不像 catch操作符,它不會處理異常,正如上面的代碼所看到的,異常還是繼續向下遊傳播,當傳播到到操作符

onCompletion

 之後還會繼續向後傳播最後被catch操作符處理。

14.3、正常結束(Successful completion)

catch操作符與onCompletion操作符另一個不同是onCompletion會接收到所有的異常并且隻有在flow正常結束(沒有取消或者失敗)的情況下才會接收到null的異常,如下代碼所示:

package com.cool.cleaner.test

import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.onCompletion
import kotlinx.coroutines.runBlocking

fun simple(): Flow<Int> = (1..3).asFlow()

fun main() = runBlocking<Unit> {
    simple()
        .onCompletion { cause ->
            println("Flow completed with $cause")
        }
        .collect { value ->
            check(value <= 1) { "Collected $value" }
            println("$value")
        }
}
           

我們可以看到完成原因并不會空,是以flow的下遊産生了一個異常,下面是輸出:

1
Flow completed with java.lang.IllegalStateException: Collected 2
Exception in thread "main" java.lang.IllegalStateException: Collected 2
	at com.cool.cleaner.test.KotTestKt$main$1$invokeSuspend$$inlined$collect$1.emit(Collect.kt:133)
	at kotlinx.coroutines.flow.FlowKt__BuildersKt$asFlow$$inlined$unsafeFlow$9.collect(SafeCollector.common.kt:115)
	at kotlinx.coroutines.flow.FlowKt__EmittersKt$onCompletion$$inlined$unsafeFlow$1.collect(SafeCollector.common.kt:114)
	at com.cool.cleaner.test.KotTestKt$main$1.invokeSuspend(KotTest.kt:77)
	at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
	at kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:106)
	at kotlinx.coroutines.EventLoopImplBase.processNextEvent(EventLoop.common.kt:274)
	at kotlinx.coroutines.BlockingCoroutine.joinBlocking(Builders.kt:84)
	at kotlinx.coroutines.BuildersKt__BuildersKt.runBlocking(Builders.kt:59)
	at kotlinx.coroutines.BuildersKt.runBlocking(Unknown Source)
	at kotlinx.coroutines.BuildersKt__BuildersKt.runBlocking$default(Builders.kt:38)
	at kotlinx.coroutines.BuildersKt.runBlocking$default(Unknown Source)
	at com.cool.cleaner.test.KotTestKt.main(KotTest.kt:11)
	at com.cool.cleaner.test.KotTestKt.main(KotTest.kt)

Process finished with exit code 1
           

15、Imperative versus declarative

現在我們知道如何collect flow,接收資料發射完成通知,以聲明式或者指令式的方式處理異常;是以現在問題是哪一種處理異常的方式更好呢?作為一個庫,我們不推薦哪一種方式更好,并且相信兩種方式都是正确的,而你應該根據你的喜歡和代碼風格來選擇他們。

16、Launching flow

我們可以使用flow來代表一些從某個源發射的一些異步事件,在這 種情況下我們需要使用類似于

addEventListener

 的函數注冊一個監聽,當事件到來的時候執行我們的監聽器并繼續執行其它工作。onEach操作符正是為此而生;然而onEach是一個中間操作符,我們需要一個終結操作符來收集flow,否則僅僅調用onEach是沒有效果的。

如果在onEach之後使用終結操作符collect,那麼它後面的代碼隻有在flow收集完成之後才會開始執行,如下代碼所示:

package com.cool.cleaner.test

import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.runBlocking

fun events(): Flow<Int> = (1..3).asFlow().onEach {number ->
    delay(100)
}

fun main() = runBlocking<Unit> {
    events().onEach { event -> println("Event: $event") }
        .collect()//<----- Collecting the flow waits
    println("Done")
}
           

下面是輸出:

Event: 1
Event: 2
Event: 3
Done
           

launch操作符在這種情況下就可以使用了,使用launchIn替換collect使得我們可以在一個新的協程中收集flow,是以後面的代碼就可以馬上執行而不用像collect一樣需要等待collect執行完成才會執行後面的代碼,如下所示:

package com.cool.cleaner.test

import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.runBlocking

fun events(): Flow<Int> = (1..3).asFlow().onEach {number ->
    delay(100)
}

fun main() = runBlocking<Unit> {
    events().onEach { event -> println("Event: $event") }
        .launchIn(this)//<----- Launching the flow in a separate coroutine
    println("Done")
}
           

下面是輸出:

Done
Event: 1
Event: 2
Event: 3
           

launchIn的參數指定需要在哪個CoroutineScope 中啟動協程來執行收集動作,在上面的例子中這來自協程建構器runBlocking中,是以當flow還沒收集完成的時候, runBlocking作用域将會等待它所有的子協程完成并阻止主函數傳回然後退出。

在一個實際應用中作用域将會來自一個有生命周期的實體。隻要實體的生命周期結束那麼相應的協程作用域也會被取消,相應flow的收集也會 被取消;從這種使用方式上來說

onEach { ... }.launchIn(scope)

 就類似于

addEventListener,隻是在結構化并發中并不需要調用相應的removeEventListener

 。

請注意,launchIn還傳回一個Job,I不過它隻能用于取消相應的流集合協程,而不會取消整個範圍或聯接它。

17、檢查flow的取消

為了操作上的友善,flow建構器會在發射 每一個值的時候調用ensureActive以檢查flow是否已經被取消,這意味着在代碼塊flow { ... }中的循環是可取消的,如下代碼所示:

package com.cool.cleaner.test

import kotlinx.coroutines.cancel
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.runBlocking

fun foo(): Flow<Int> = flow {
    for (i in 1..5) {
        println("Emitting $i")
        emit(i)
    }
}

fun main() = runBlocking<Unit> {
    foo().collect { value ->
        if (3 == value) {
            cancel()
        }
        println(value)
    }
}
           

輸出如下,我們隻看到輸出了前三個數字,當嘗試發射4的時候就會抛出CancellationException的異常:

Emitting 1
1
Emitting 2
2
Emitting 3
3
Emitting 4
Exception in thread "main" kotlinx.coroutines.JobCancellationException: BlockingCoroutine was cancelled; job="coroutine#1":BlockingCoroutine{Cancelled}@123a439b
	at kotlinx.coroutines.JobSupport.cancel(JobSupport.kt:1579)
	at kotlinx.coroutines.CoroutineScopeKt.cancel(CoroutineScope.kt:217)
	at kotlinx.coroutines.CoroutineScopeKt.cancel$default(CoroutineScope.kt:215)
	at com.cool.cleaner.test.KotTestKt$main$1$invokeSuspend$$inlined$collect$1.emit(Collect.kt:134)
	at kotlinx.coroutines.flow.internal.SafeCollectorKt$emitFun$1.invoke(SafeCollector.kt:15)
	at kotlinx.coroutines.flow.internal.SafeCollectorKt$emitFun$1.invoke(SafeCollector.kt)
	at kotlinx.coroutines.flow.internal.SafeCollector.emit(SafeCollector.kt:77)
	at kotlinx.coroutines.flow.internal.SafeCollector.emit(SafeCollector.kt:59)
	at com.cool.cleaner.test.KotTestKt$foo$1.invokeSuspend(KotTest.kt:12)
	at com.cool.cleaner.test.KotTestKt$foo$1.invoke(KotTest.kt)
	at kotlinx.coroutines.flow.SafeFlow.collectSafely(Builders.kt:61)
	at kotlinx.coroutines.flow.AbstractFlow.collect(Flow.kt:212)
	at com.cool.cleaner.test.KotTestKt$main$1.invokeSuspend(KotTest.kt:80)
	at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
	at kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:106)
	at kotlinx.coroutines.EventLoopImplBase.processNextEvent(EventLoop.common.kt:274)
	at kotlinx.coroutines.BlockingCoroutine.joinBlocking(Builders.kt:84)
	at kotlinx.coroutines.BuildersKt__BuildersKt.runBlocking(Builders.kt:59)
	at kotlinx.coroutines.BuildersKt.runBlocking(Unknown Source)
	at kotlinx.coroutines.BuildersKt__BuildersKt.runBlocking$default(Builders.kt:38)
	at kotlinx.coroutines.BuildersKt.runBlocking$default(Unknown Source)
	at com.cool.cleaner.test.KotTestKt.main(KotTest.kt:16)
	at com.cool.cleaner.test.KotTestKt.main(KotTest.kt)

Process finished with exit code 1
           

然而大部分flow操作符由于性能的原因是沒有檢查有沒有取消協程的,如果你用 IntRange.asFlow去實作一個循環且沒有調用suspend函數,那麼是不會執行是否已經取消這個檢查動作的,代碼如下:

package com.cool.cleaner.test

import kotlinx.coroutines.cancel
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.runBlocking


fun main() = runBlocking<Unit> {
    (1..5).asFlow().collect { value ->
        if (3 == value) cancel()
        println(value)
    }
}
           

從1到5的所有數字都會輸出,而隻有在從runBlocking傳回之前才檢查到取消操作,輸出如下:

1
2
3
4
5
Exception in thread "main" kotlinx.coroutines.JobCancellationException: BlockingCoroutine was cancelled; job="coroutine#1":BlockingCoroutine{Cancelled}@aec6354
	at kotlinx.coroutines.JobSupport.cancel(JobSupport.kt:1579)
	at kotlinx.coroutines.CoroutineScopeKt.cancel(CoroutineScope.kt:217)
	at kotlinx.coroutines.CoroutineScopeKt.cancel$default(CoroutineScope.kt:215)
	at com.cool.cleaner.test.KotTestKt$main$1$invokeSuspend$$inlined$collect$1.emit(Collect.kt:133)
	at kotlinx.coroutines.flow.FlowKt__BuildersKt$asFlow$$inlined$unsafeFlow$9.collect(SafeCollector.common.kt:115)
	at com.cool.cleaner.test.KotTestKt$main$1.invokeSuspend(KotTest.kt:71)
	at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
	at kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:106)
	at kotlinx.coroutines.EventLoopImplBase.processNextEvent(EventLoop.common.kt:274)
	at kotlinx.coroutines.BlockingCoroutine.joinBlocking(Builders.kt:84)
	at kotlinx.coroutines.BuildersKt__BuildersKt.runBlocking(Builders.kt:59)
	at kotlinx.coroutines.BuildersKt.runBlocking(Unknown Source)
	at kotlinx.coroutines.BuildersKt__BuildersKt.runBlocking$default(Builders.kt:38)
	at kotlinx.coroutines.BuildersKt.runBlocking$default(Unknown Source)
	at com.cool.cleaner.test.KotTestKt.main(KotTest.kt:9)
	at com.cool.cleaner.test.KotTestKt.main(KotTest.kt)

Process finished with exit code 1
           

17.1、Making busy flow cancellable

在協程中當你有一個循環的時候,你必須顯示的執行檢查是否取消了協程這個動作,你可以調用.onEach { currentCoroutineContext().ensureActive() },但是還有一個可取消的操作符,代碼如下:

package com.cool.cleaner.test

import kotlinx.coroutines.cancel
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.cancellable
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.runBlocking


fun main() = runBlocking<Unit> {
    (1..5).asFlow()
        .cancellable()
        .collect { value ->
        if (3 == value) cancel()
        println(value)
    }
}
           

從輸出可以看到使用cancellable操作符後隻有1到3的數字輸出了,如下:

1
2
3
Exception in thread "main" kotlinx.coroutines.JobCancellationException: BlockingCoroutine was cancelled; job="coroutine#1":BlockingCoroutine{Cancelled}@7fbe847c
	at kotlinx.coroutines.JobSupport.cancel(JobSupport.kt:1579)
	at kotlinx.coroutines.CoroutineScopeKt.cancel(CoroutineScope.kt:217)
	at kotlinx.coroutines.CoroutineScopeKt.cancel$default(CoroutineScope.kt:215)
	at com.cool.cleaner.test.KotTestKt$main$1$invokeSuspend$$inlined$collect$1.emit(Collect.kt:133)
	at kotlinx.coroutines.flow.CancellableFlowImpl$collect$$inlined$collect$1.emit(Collect.kt:135)
	at kotlinx.coroutines.flow.FlowKt__BuildersKt$asFlow$$inlined$unsafeFlow$9.collect(SafeCollector.common.kt:115)
	at kotlinx.coroutines.flow.CancellableFlowImpl.collect(Context.kt:342)
	at com.cool.cleaner.test.KotTestKt$main$1.invokeSuspend(KotTest.kt:74)
	at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
	at kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:106)
	at kotlinx.coroutines.EventLoopImplBase.processNextEvent(EventLoop.common.kt:274)
	at kotlinx.coroutines.BlockingCoroutine.joinBlocking(Builders.kt:84)
	at kotlinx.coroutines.BuildersKt__BuildersKt.runBlocking(Builders.kt:59)
	at kotlinx.coroutines.BuildersKt.runBlocking(Unknown Source)
	at kotlinx.coroutines.BuildersKt__BuildersKt.runBlocking$default(Builders.kt:38)
	at kotlinx.coroutines.BuildersKt.runBlocking$default(Unknown Source)
	at com.cool.cleaner.test.KotTestKt.main(KotTest.kt:10)
	at com.cool.cleaner.test.KotTestKt.main(KotTest.kt)

Process finished with exit code 1
           

18、Flow及響應式流

對于熟悉響應式流的讀者來說會覺得Flow非常熟悉。

實際上Flow的設計就是受到響應式流的啟發而設計的,但是Flow的設計是盡可能的簡單,而要達到這個目标是離不開響應式程式設計開發者們的支援的,在這裡你可能了解到相應的故事。