本節介紹通道----Channels(官方文檔)
Deferred values提供了一種在協程之間傳遞單個值的簡便方式,通道(channels)則提供了在協程間傳遞流的方法。
1、Channel basics
熟悉Java的讀者應該都知道阻塞隊列BlockingQueue,而這裡說的通道在概念上則與BlockingQueue非常相似,一個重要的差別是通道使用的是suspend的send方法存儲資料而BlockingQueue使用的是阻塞的put方法,同時在取資料的時候通道使用的是suspend的receive而BlockingQueue使用是阻塞的take,代碼如下:
package com.cool.cleaner.test
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
fun main() = runBlocking<Unit> {
val channel = Channel<Int>()
launch {
/**
* this might be heavy CPU-consuming computation or async logic,
* we'll just send five squares.
*/
for (x in 1..5) channel.send(x * x)
}
// here we print five received integers:
repeat(5) {
println(channel.receive())
}
println("Done!")
}
輸出如下:
1
4
9
16
25
Done!
Process finished with exit code 0
2、通道的關閉及疊代
與隊列不一樣的是通道可以被關閉以表示沒有元素會再被發射出去,在接收端可以使用for循環友善地接收通道裡面的資料。
從概念上講close就像是發射了一個特殊的辨別(token)到通道中,在接收端隻要收到相應的token就會馬上結束疊代,是以就可以保證在token發射之前的所有資料都會被接收到,如下代碼所示:
package com.cool.cleaner.test
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
fun main() = runBlocking<Unit> {
val channel = Channel<Int>()
launch {
for (x in 1..9) {
if (x >= 7) {
channel.close()//we're done sending
break
} else {
channel.send(x * x)
}
}
}
// here we print received values using 'for'
// loop(until the channel is closed)
for (value in channel) {
println(value)
}
println("Done!")
}
3、通道生産者(Building channel producers)
在協程中生成一系列的元素這種模式是非常普遍的,這是在并發程式設計中經常遇到的生産者----消費者模式的一部分;你可以将通道作為參數傳遞給一個函數并封裝成一個生産者,但是這違背了結果必須從函數中傳回的一種認知常識。
在協程中有一個協程建構器produce可以使得建構生産者更容易,而擴充函數consumeEach則作為消費者端替代了for循環的功能,代碼如下:
package com.cool.cleaner.test
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.channels.consumeEach
import kotlinx.coroutines.channels.produce
import kotlinx.coroutines.runBlocking
fun CoroutineScope.produceSquares(): ReceiveChannel<Int> = produce {
for (value in 1..5) send(value * value)
}
fun main() = runBlocking<Unit> {
val squares = produceSquares()
squares.consumeEach { println(it) }
println("Done!")
}
4、Pipelines
一個pipeline就是一個協程産生無限(可能)的值,代碼如下: