七. Flow 線程操作
7.1 更為簡化的線程切換
相對于 RxJava 多線程的學習曲線,Flow 對線程的切換友好地多。
在之前的 Kotlin Coroutines Flow 系列(一) Flow 基本使用 一文中曾經介紹過 Flow 的切換線程,以及 flowOn 操作符。
Flow 隻需使用 flowOn 操作符,而不必像 RxJava 需要去深入了解 observeOn、subscribeOn 之間的差別。
7.2 flowOn VS RxJava 的 observeOn
RxJava 的 observeOn 操作符,接收一個 Scheduler 參數,用來指定下遊操作運作在特定的線程排程器 Scheduler 上。
Flow 的 flowOn 操作符,接收一個 CoroutineContext 參數,影響的是上遊的操作。
例如:
fun main() = runBlocking {
flow {
for (i in 1..5) {
delay(100)
emit(i)
}
}.map {
it * it
}.flowOn(Dispatchers.IO)
.collect {
println("${Thread.currentThread().name}: $it")
}
}
複制
flow builder 和 map 操作符都會受到
flowOn
的影響,并使用 Dispatchers.io 線程池。
再例如:
val customerDispatcher = Executors.newFixedThreadPool(5).asCoroutineDispatcher()
fun main() = runBlocking {
flow {
for (i in 1..5) {
delay(100)
emit(i)
}
}.map {
it * it
}.flowOn(Dispatchers.IO)
.map {
it+1
}
.flowOn(customerDispatcher)
.collect {
println("${Thread.currentThread().name}: $it")
}
}
複制
flow builder 和兩個 map 操作符都會受到兩個
flowOn
的影響,其中 flow builder 和第一個 map 操作符跟上面的例子一樣,第二個 map 操作符會切換到指定的 customerDispatcher 線程池。
7.3 buffer 實作并發操作
在 Kotlin Coroutines Flow 系列(二) Flow VS RxJava2 一文中,曾介紹 buffer 操作符對應 RxJava Backpressure 中的 BUFFER 政策。
事實上 buffer 操作符也可以并發地執行任務,它是除了使用 flowOn 操作符之外的另一種方式,隻是不能顯示地指定 Dispatchers。
例如:
fun main() = runBlocking {
val time = measureTimeMillis {
flow {
for (i in 1..5) {
delay(100)
emit(i)
}
}
.buffer()
.collect { value ->
delay(300)
println(value)
}
}
println("Collected in $time ms")
}
複制
執行結果:
1
2
3
4
5
Collected in 1676 ms
複制
在上述例子中,所有的 delay 所花費的時間是2000ms。然而通過 buffer 操作符
并發
地執行 emit,再順序地執行 collect 函數後,所花費的時間在 1700ms 左右。
如果去掉 buffer 操作符。
fun main() = runBlocking {
val time = measureTimeMillis {
flow {
for (i in 1..5) {
delay(100)
emit(i)
}
}
.collect { value ->
delay(300)
println(value)
}
}
println("Collected in $time ms")
}
複制
執行結果:
1
2
3
4
5
Collected in 2039 ms
複制
所花費的時間比剛才多了300多ms。
7.4 并行操作
在講解并行操作之前,先來了解一下并發和并行的差別。
并發(concurrency):是指一個處理器同時處理多個任務。
并行(parallelism):是多個處理器或者是多核的處理器同時處理多個不同的任務。并行是同時發生的多個并發事件,具有并發的含義,而并發則不一定是并行。
RxJava 可以借助 flatMap 操作符實作并行,亦可以使用 ParallelFlowable 類實作并行操作。
下面,以 flatMap 操作符為例實作 RxJava 的并行:
Observable.range(1,100)
.flatMap(new Function<Integer, ObservableSource<String>>() {
@Override
public ObservableSource<String> apply(Integer integer) throws Exception {
return Observable.just(integer)
.subscribeOn(Schedulers.io())
.map(new Function<Integer, String>() {
@Override
public String apply(Integer integer) throws Exception {
return integer.toString();
}
});
}
})
.subscribe(new Consumer<String>() {
@Override
public void accept(String str) throws Exception {
System.out.println(str);
}
});
複制
Flow 也有相應的操作符 flatMapMerge 可以實作并行。
fun main() = runBlocking {
val result = arrayListOf<Int>()
for (index in 1..100){
result.add(index)
}
result.asFlow()
.flatMapMerge {
flow {
emit(it)
}
.flowOn(Dispatchers.IO)
}
.collect { println("$it") }
}
複制
總體而言,Flow 相比于 RxJava 更加簡潔一些。