Select 多路选择
基本使用语法如下:
select {
case ret := <-retCh1: //阻塞事件,等待channel1的消息
t.Logf("result %s \n",ret)
case ret := <-retCh2:
t.Logf("result %s \n", rest)
default :
t.Error("return empty")
关于channel部分其实是阻塞的,也就是select实际执行的过程中会阻塞在对应的channel部分,直到某一个case对应的channel有有效的数据才会执行该case下的逻辑。
实际程序执行的过程中 如果出现两个channel同时有有效数据,那两个case内部的执行顺序是无法严格保证的,只能由程序员自己来控制。
Select 超时控制
同样如上代码,我们要控制channel获取数据的时间,防止channel中等待有效数据时间过长,所以可以增加一些超时控制:
select {
case ret := <-retCh1: //阻塞事件,等待channel1的消息
t.Logf("result %s \n",ret)
// 等待1s 返回一个channel的有效数据,且第一个case还未得到有效数据,则输出超时
case <- time.After(time.Second * 1):
t.Error("time out")
所以 select 可以用于保证多个协程之间的高可用,防止slow response的出现。
以上两种select多路选择用法 的 测试代码如下:
package select_test
import (
"fmt"
"testing"
"time"
)
func service() string {
time.Sleep(time.Millisecond * 400)
return "Service1 is Done"
}
func AsyncService(i int) chan string {
rech := make(chan string,1) // 声明一个channel
var ret string
go func() {
if i == 1 {
ret = service()
} else {
ret = service1()
}
fmt.Println("resturned result")
rech <- ret // 向 channel 中放数据
fmt.Println("service exited")
}()
return rech // 返回channel
}
// 测试超时机制来避免等待channel时间过长
// ret 这个channel需要等待AsyncService 函数中的routine中的
// service函数返回结果,才会将数据填充到ret 中
// service需要执行400ms,所以这里会出现超时的情况
func TestSelectTimeout(t *testing.T) {
select {
case ret := <-AsyncService(1):
t.Logf("result is %s", ret)
case <- time.After(time.Millisecond * 100):
t.Error("time out")
}
}
func service1() string {
time.Sleep(time.Millisecond * 500)
return "Service2 is Done"
}
// 测试多个channel返回数据,挑选其中一个先准备好的channel来执行
func TestSelect(t *testing.T) {
select {
case ret1 := <- AsyncService(1):
t.Logf("result is %s",ret1)
case ret2 := <- AsyncService(2):
t.Logf("result is %s", ret2)
case <- time.After(time.Millisecond * 600):
t.Log("Time out")
}
}
channel 的关闭和广播
channel 可以说是Go语言中 协程之间通信的一种机制,支持带buffer和不带buffer两种模式,非常方便得实现不同协程之间的通信过程,但是在具体的通信过程中也会暴露一些问题,如下生产者,消费者代码:
package close_channel
import (
"fmt"
"sync"
"testing"
)
// 数据生产者
func dataProducer(ch chan int, wg *sync.WaitGroup) {
go func() {
for i := 0; i < 10; i ++ {
ch <- i
}
wg.Done()
}()
}
// 数据消费者
func dataComsumer(ch chan int, wg *sync.WaitGroup) {
go func() {
// 没有办法准确知道channel中什么时候没有数据,这里保持和生产者相同的填充数据的次数
for i := 0;i < 10; i++ {
data := <-ch
fmt.Printf("consumer data %d\n",data)
}
wg.Done()
}()
}
func TestProducer(t *testing.T) {
var wg sync.WaitGroup // wait group
ch := make(chan int)
wg.Add(1)
dataProducer(ch, &wg)
wg.Add(1)
dataComsumer(ch, &wg)
wg.Wait() // 阻塞,直到waitgroup执行完毕,wg的值变为0
输出如下:
=== RUN TestProducer
consumer data 0
consumer data 1
consumer data 2
consumer data 3
consumer data 4
consumer data 5
consumer data 6
consumer data 7
consumer data 8
consumer data 9
--- PASS: TestProducer (0.00s)
上面代码中消费者协程在channel buffer 内部没有数据的时候只能够被动阻塞等待,直到channel中数据有效。这个实现导致生产者消费者之间的代码耦合度比较高,且当程序中存在多个producer和多个receiver的时候,receivers并不一定能够确切得知道什么时候producer才不会生产数据。
还是如上代码,我们如果启动多个消费者就能够很明显得看到问题,如下测试代码:
func TestProducer(t *testing.T) {
var wg sync.WaitGroup
ch := make(chan int)
wg.Add(1)
dataProducer(ch, &wg)
wg.Add(1)
dataComsumer(ch, &wg)
wg.Add(1)
dataComsumer(ch, &wg)
wg.Wait()
}
consumer data 0
consumer data 1
consumer data 2
consumer data 3
consumer data 5
consumer data 6
consumer data 7
consumer data 8
consumer data 9
consumer data 0
consumer data 0
consumer data 4
consumer data 0
consumer data 0
consumer data 0
consumer data 0
consumer data 0
执行的过程中可以发现消费者消费了0,因为这个时候生产者已经不再生产数据了,再去消费的话会取到channel默认的值即0,且channel没有关闭,消费者还在等待有效的数据,还会一直阻塞程序运行。
所以channel 也提供了主动关闭的机制,即当生产者不再发送数据的时候可以主动关闭channel,而消费者再次使用channel的时候只需要确认一下channel的状态即可。如果channel为不可用,即可返回。
关于 关闭的channel 需要注意如下几点:
- 向关闭的channel 发送数据会导致panic异常
- v,ok <- ch; 接受channel的值和状态,如果ok为true,则表示channel可以接受数据;如果ok 为false,表示channel已经关闭,无法接受数据
- 所有channel的接受者在channel关闭的时候都会从阻塞等待中返回上述OK值为false。这个广播机制可以被用作向多个订阅者发送信号。
修改测试代码如下:
package close_channel
import (
"fmt"
"sync"
"testing"
)
func dataProducer(ch chan int, wg *sync.WaitGroup) {
go func() {
for i := 0; i < 10; i ++ {
ch <- i
}
close(ch)
//ch <- 11 // 向关闭后的channel发送数据会报panic错误
wg.Done()
}()
}
func dataComsumer(ch chan int, wg *sync.WaitGroup) {
go func() {
for {
if data,ok := <-ch; ok {// 接受关闭后channel的广播,保证channel的输出结果是有效的
fmt.Printf("consumer data %d\n",data)
}else {
break
}
}
wg.Done()
}()
}
func TestProducer(t *testing.T) {
var wg sync.WaitGroup
ch := make(chan int)
wg.Add(1)
dataProducer(ch, &wg)
wg.Add(1)
dataComsumer(ch, &wg) // 启动多个消费者
wg.Add(1)
dataComsumer(ch, &wg)
wg.Wait()
}
=== RUN TestProducer
consumer data 0
consumer data 1
consumer data 2
consumer data 3
consumer data 4
consumer data 5
consumer data 6
consumer data 7
consumer data 9
consumer data 8
--- PASS: TestProducer (0.00s)