天天看點

6、通道(Channels)

本節介紹通道----Channels(官方文檔)

Deferred values提供了一種在協程之間傳遞單個值的簡便方式,通道(channels)則提供了在協程間傳遞流的方法。

1、Channel basics

熟悉Java的讀者應該都知道阻塞隊列BlockingQueue,而這裡說的通道在概念上則與BlockingQueue非常相似,一個重要的差別是通道使用的是suspend的send方法存儲資料而BlockingQueue使用的是阻塞的put方法,同時在取資料的時候通道使用的是suspend的receive而BlockingQueue使用是阻塞的take,代碼如下:

package com.cool.cleaner.test

import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking


fun main() = runBlocking<Unit> {
    val channel = Channel<Int>()
    launch {
        /**
         * this might be heavy CPU-consuming computation or async logic,
         * we'll just send five squares.
         */
        for (x in 1..5) channel.send(x * x)
    }
    // here we print five received integers:
    repeat(5) {
        println(channel.receive())
    }
    println("Done!")
}
           

輸出如下:

1
4
9
16
25
Done!

Process finished with exit code 0
           

2、通道的關閉及疊代

與隊列不一樣的是通道可以被關閉以表示沒有元素會再被發射出去,在接收端可以使用for循環友善地接收通道裡面的資料。

從概念上講close就像是發射了一個特殊的辨別(token)到通道中,在接收端隻要收到相應的token就會馬上結束疊代,是以就可以保證在token發射之前的所有資料都會被接收到,如下代碼所示:

package com.cool.cleaner.test

import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking


fun main() = runBlocking<Unit> {
    val channel = Channel<Int>()
    launch {
        for (x in 1..9) {
            if (x >= 7) {
                channel.close()//we're done sending
                break
            } else {
                channel.send(x * x)
            }
        }
    }
    // here we print received values using 'for'
    // loop(until the channel is closed)
    for (value in channel) {
        println(value)
    }
    println("Done!")
}
           

3、通道生産者(Building channel producers)

在協程中生成一系列的元素這種模式是非常普遍的,這是在并發程式設計中經常遇到的生産者----消費者模式的一部分;你可以将通道作為參數傳遞給一個函數并封裝成一個生産者,但是這違背了結果必須從函數中傳回的一種認知常識。

在協程中有一個協程建構器produce可以使得建構生産者更容易,而擴充函數consumeEach則作為消費者端替代了for循環的功能,代碼如下:

package com.cool.cleaner.test

import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.channels.consumeEach
import kotlinx.coroutines.channels.produce
import kotlinx.coroutines.runBlocking

fun CoroutineScope.produceSquares(): ReceiveChannel<Int> = produce {
    for (value in 1..5) send(value * value)
}
fun main() = runBlocking<Unit> {
    val squares = produceSquares()
    squares.consumeEach { println(it) }
    println("Done!")
}
           

4、Pipelines

一個pipeline就是一個協程産生無限(可能)的值,代碼如下:

繼續閱讀