// 1. Channels: a way to transfer a stream of values between coroutines
/**
 * Sends 2, 4 and 6 through a [Channel] from a child coroutine and prints each
 * value as it is received, followed by "end ..".
 */
fun main_channel() = runBlocking {
    val numbers = Channel<Int>() // channel created with default arguments
    launch {
        (1..3).forEach { n -> numbers.send(n + n) } // sends 1+1, 2+2, 3+3
        // Closing happens only after all three sends have completed.
        numbers.close()
    }
    // Exactly three values are sent; `for (y in numbers) println(y)` would work as well.
    repeat(3) {
        val received = numbers.receive()
        println("接收到的是$received")
    }
    println("end ..")
}
// 2. Building a channel producer
/**
 * Starts a producer coroutine in this scope that sends 2, 4 and 6.
 *
 * @return the receiving side of the channel the values are sent to
 */
fun CoroutineScope.producer(): ReceiveChannel<Int> = produce {
    (1..3).forEach { n -> send(n + n) }
}
/** Prints every value emitted by [producer], one per line, and then "end ..". */
fun main_produce() = runBlocking {
    // Iterate over everything the producer sends.
    producer().consumeEach { value -> println("$value") }
    println("end ..")
}
// 3. Pipelines
/**
 * Starts a producer that sends consecutive integers starting at 1.
 * The loop never ends by itself, so the coroutine runs until it is cancelled.
 *
 * @return the receiving side of the channel the numbers are sent to
 */
fun CoroutineScope.produceNumbers(): ReceiveChannel<Int> = produce<Int> {
    var next = 1
    while (true) {
        send(next)
        next++
    }
}
/**
 * Pipeline stage: receives each number from [numbers], squares it and sends the
 * result downstream.
 *
 * @param numbers upstream channel to read from
 * @return the receiving side of the channel carrying the squared values
 */
fun CoroutineScope.square(numbers: ReceiveChannel<Int>): ReceiveChannel<Int> =
    produce {
        // Multiply, not add: `x + x` doubled the value instead of squaring it.
        for (x in numbers) send(x * x)
    }
/**
 * Wires [produceNumbers] into [square], prints the first three values coming out
 * of the pipeline, prints "end ..", and then cancels the child coroutines.
 */
fun main_pip() = runBlocking {
    // produceNumbers() generates the input; square() receives, processes and re-sends it.
    val squares = square(produceNumbers())
    repeat(3) {
        val value = squares.receive()
        println(value)
    }
    println("end ..") // done
    // The producers loop forever, so cancel the children to let runBlocking return.
    coroutineContext.cancelChildren()
}
// 4. Prime numbers with a pipeline
/**
 * Prints ten numbers, each prefixed by a space. The stream starts at 2 via
 * [numberssend]; after each number is received, a new [filters] stage for that
 * number is appended to the pipeline.
 */
fun main_filters() = runBlocking {
    var stage = numberssend(2) // 2, 3, 4, ...
    repeat(10) {
        val prime = stage.receive()
        println(" $prime")
        // Extend the pipeline with a filter that drops multiples of the number just printed.
        stage = filters(stage, prime)
    }
    coroutineContext.cancelChildren() // cancel all children to let main finish
}
/**
 * Starts a producer that sends consecutive integers beginning at [start].
 * The loop never ends by itself, so the coroutine runs until it is cancelled.
 */
fun CoroutineScope.numberssend(start: Int) = produce<Int> {
    var next = start
    while (true) {
        send(next)
        next++
    }
}
// Starting from a stream of numbers beginning at 2, a prime is taken from the current
// channel and a new pipeline stage is launched for every prime that is found.
/**
 * Pipeline stage that forwards only those values from [numbers] that are NOT
 * divisible by [prime].
 */
fun CoroutineScope.filters(numbers: ReceiveChannel<Int>, prime: Int) = produce<Int> {
    for (candidate in numbers) {
        if (candidate % prime == 0) continue // multiple of prime: drop it
        send(candidate)
    }
}
// 5. Fan-out
/**
 * Starts five processors (ids 0..4) that all read from the same [produceNumber]
 * channel, waits 950 ms, and then cancels the producer.
 */
fun mainFanout() = runBlocking<Unit> {
    val source = produceNumber()
    for (id in 0 until 5) {
        launchProcessor(id, source)
    }
    delay(950)
    source.cancel() // stop producing
}
/**
 * Starts a producer that sends 1, 2, 3, ... and waits 100 ms after each send.
 * The loop never ends by itself.
 */
fun CoroutineScope.produceNumber() = produce<Int> {
    var next = 1
    while (true) {
        send(next) // starts at 1, grows by 1 each round
        next += 1
        delay(100)
    }
}
/**
 * Launches a coroutine that prints every message it receives from [channel],
 * tagged with the processor [id].
 */
fun CoroutineScope.launchProcessor(id: Int, channel: ReceiveChannel<Int>) = launch {
    // Plain `for` iteration is kept on purpose; the channel is shared by several processors.
    for (message in channel) println("Processor #$id received $message")
}
// 6. Fan-in
/**
 * Launches two coroutines that send strings into one shared channel via
 * [sendString], prints the first six strings received, then cancels the senders.
 */
fun main_in() = runBlocking {
    val shared = Channel<String>()
    // Two coroutines send strings, each at its own interval.
    listOf("foo" to 200L, "BAR!" to 500L).forEach { (text, interval) ->
        launch { sendString(shared, text, interval) }
    }
    repeat(6) { // receive first six
        val text = shared.receive()
        println(text)
    }
    coroutineContext.cancelChildren() // cancel all children to let main finish
}
/**
 * Endlessly waits [time] milliseconds and then sends [s] to [channel].
 * The loop has no exit condition of its own.
 */
suspend fun sendString(channel: SendChannel<String>, s: String, time: Long) {
    do {
        delay(time)
        channel.send(s)
    } while (true)
}
// 7. Buffered channels
/**
 * Launches a sender that tries to push 0..9 into a channel of capacity 4 that
 * nobody reads from, then cancels the sender after 100 ms.
 */
fun main_Buffered() = runBlocking<Unit> {
    val buffered = Channel<Int>(4) // capacity of 4
    val sender = launch {
        for (value in 0 until 10) {
            println("發送的資料 $value") // expected to print 0-4 before the sender suspends
            buffered.send(value)
        }
    }
    delay(100) // wait 100 ms
    sender.cancel() // cancel the sender coroutine
}
// 8. Channels are fair
/** Ball passed between players; [hits] is incremented each time a player receives it. */
data class Ball(
    var hits: Int
)
/**
 * Starts two [player] coroutines, "ping" and "pong", on one shared channel, serves a
 * fresh [Ball], lets them play for one second, and then cancels both players.
 */
fun main_fair() = runBlocking {
    val table = Channel<Ball>() // a shared table
    listOf("ping", "pong").forEach { name ->
        launch { player(name, table) }
    }
    table.send(Ball(0)) // serve the ball
    delay(1000) // delay 1 second
    coroutineContext.cancelChildren() // game over, cancel them
}
/**
 * Repeatedly receives a [Ball] from [table], increments its hit counter, prints
 * "[name] ball", waits 300 ms, and sends the same ball back to [table].
 */
suspend fun player(name: String, table: Channel<Ball>) {
    for (ball in table) { // receive the ball in a loop
        ball.hits += 1
        println("$name $ball")
        delay(300) // wait a bit
        table.send(ball) // send the ball back
    }
}
// 9. Ticker channels
/**
 * Polls a ticker channel (100 ms period, no initial delay) with different
 * timeouts and prints whether an element arrived (`kotlin.Unit`) or the wait
 * timed out (`null`). The channel is cancelled at the end.
 */
fun main_Ticker() = runBlocking<Unit> {
    val tickerChannel = ticker(delayMillis = 100, initialDelayMillis = 0) // create ticker channel

    // Initial delay is zero, so the first element is expected right away.
    val initial = withTimeoutOrNull(1) { tickerChannel.receive() }
    println("Initial element is available immediately: $initial")

    // All subsequent elements have a 100 ms delay, so 50 ms is too short.
    val tooEarly = withTimeoutOrNull(50) { tickerChannel.receive() }
    println("Next element is not ready in 50 ms: $tooEarly")

    // Another 60 ms brings the total wait past 100 ms.
    val onTime = withTimeoutOrNull(60) { tickerChannel.receive() }
    println("Next element is ready in 100 ms: $onTime")

    // Emulate large consumption delays.
    println("Consumer pauses for 150ms")
    delay(150)

    // Next element is available immediately.
    val afterPause = withTimeoutOrNull(1) { tickerChannel.receive() }
    println("Next element is available immediately after large consumer delay: $afterPause")

    // The pause between `receive` calls is taken into account, so the next element arrives faster.
    val catchUp = withTimeoutOrNull(60) { tickerChannel.receive() }
    println("Next element is ready in 50ms after consumer pause in 150ms: $catchUp")

    tickerChannel.cancel() // indicate that no more elements are needed
}