天天看点

Go 分布式学习利器(20)-- Go并发编程之多路选择和超时控制,channel的关闭和广播

Select 多路选择

基本使用语法如下:

select {
case ret := <-retCh1: //阻塞事件,等待channel1的消息
  t.Logf("result %s \n",ret)
case ret := <-retCh2:
  t.Logf("result %s \n", rest)
default :
  t.Error("return empty")      

关于channel部分其实是阻塞的,也就是select实际执行的过程中会阻塞在对应的channel部分,直到某一个case对应的channel有有效的数据才会执行该case下的逻辑。

实际程序执行的过程中 如果出现两个channel同时有有效数据,那两个case内部的执行顺序是无法严格保证的,只能由程序员自己来控制。

Select 超时控制

同样如上代码,我们要控制channel获取数据的时间,防止channel中等待有效数据时间过长,所以可以增加一些超时控制:

select {
case ret := <-retCh1: //阻塞事件,等待channel1的消息
  t.Logf("result %s \n",ret)

// 等待1s 返回一个channel的有效数据,且第一个case还未得到有效数据,则输出超时
case <- time.After(time.Second * 1): 
  t.Error("time out")      

所以 select 可以用于保证多个协程之间的高可用,防止slow response的出现。

以上两种select多路选择用法 的 测试代码如下:

package select_test

import (
  "fmt"
  "testing"
  "time"
)
func service() string {
  time.Sleep(time.Millisecond * 400)
  return "Service1 is Done"
}

func AsyncService(i int) chan string {
  rech := make(chan string,1) // 声明一个channel
  var ret string

  go func() {
    if i == 1 {
      ret = service()
    } else {
      ret = service1()
    }
    fmt.Println("resturned result")
    rech <- ret // 向 channel 中放数据
    fmt.Println("service exited")
  }()

  return rech // 返回channel
}

// 测试超时机制来避免等待channel时间过长
// ret 这个channel需要等待AsyncService 函数中的routine中的
// service函数返回结果,才会将数据填充到ret 中
// service需要执行400ms,所以这里会出现超时的情况
func TestSelectTimeout(t *testing.T) {
  select {
  case ret := <-AsyncService(1):
    t.Logf("result is %s", ret)
  case <- time.After(time.Millisecond * 100):
    t.Error("time out")
  }
}

func service1() string {
  time.Sleep(time.Millisecond * 500)
  return "Service2 is Done"
}

// 测试多个channel返回数据,挑选其中一个先准备好的channel来执行
func TestSelect(t *testing.T) {
  select {
  case ret1 := <- AsyncService(1):
    t.Logf("result is %s",ret1)
  case ret2 := <- AsyncService(2):
    t.Logf("result is %s", ret2)
  case <- time.After(time.Millisecond * 600):
    t.Log("Time out")
  }
}      

channel 的关闭和广播

channel 可以说是Go语言中 协程之间通信的一种机制,支持带buffer和不带buffer两种模式,非常方便得实现不同协程之间的通信过程,但是在具体的通信过程中也会暴露一些问题,如下生产者,消费者代码:

package close_channel

import (
  "fmt"
  "sync"
  "testing"
)

// 数据生产者
func dataProducer(ch chan int, wg *sync.WaitGroup) {
  go func() {
    for i := 0; i < 10; i ++ {
      ch <- i
    }
    wg.Done()
  }()
}

// 数据消费者
func dataComsumer(ch chan int, wg *sync.WaitGroup) {
  go func() {
    // 没有办法准确知道channel中什么时候没有数据,这里保持和生产者相同的填充数据的次数
    for i := 0;i < 10; i++ { 
      data := <-ch
      fmt.Printf("consumer data %d\n",data)
    }
    wg.Done()
  }()
}

func TestProducer(t *testing.T) {
  var wg sync.WaitGroup // wait group
  ch := make(chan int)

  wg.Add(1)
  dataProducer(ch, &wg)

  wg.Add(1)
  dataComsumer(ch, &wg)

  wg.Wait() // 阻塞,直到waitgroup执行完毕,wg的值变为0      

输出如下:

=== RUN   TestProducer
consumer data 0
consumer data 1
consumer data 2
consumer data 3
consumer data 4
consumer data 5
consumer data 6
consumer data 7
consumer data 8
consumer data 9
--- PASS: TestProducer (0.00s)      

上面代码中消费者协程在channel buffer 内部没有数据的时候只能够被动阻塞等待,直到channel中数据有效。这个实现导致生产者消费者之间的代码耦合度比较高,且当程序中存在多个producer和多个receiver的时候,receivers并不一定能够确切得知道什么时候producer才不会生产数据。

还是如上代码,我们如果启动多个消费者就能够很明显得看到问题,如下测试代码:

func TestProducer(t *testing.T) {
  var wg sync.WaitGroup
  ch := make(chan int)

  wg.Add(1)
  dataProducer(ch, &wg)

  wg.Add(1)
  dataComsumer(ch, &wg)

  wg.Add(1)
  dataComsumer(ch, &wg)
  wg.Wait()
}      
consumer data 0
consumer data 1
consumer data 2
consumer data 3
consumer data 5
consumer data 6
consumer data 7
consumer data 8
consumer data 9
consumer data 0
consumer data 0
consumer data 4
consumer data 0
consumer data 0
consumer data 0
consumer data 0
consumer data 0      

执行的过程中可以发现消费者消费了0,因为这个时候生产者已经不再生产数据了,再去消费的话会取到channel默认的值即0,且channel没有关闭,消费者还在等待有效的数据,还会一直阻塞程序运行。

所以channel 也提供了主动关闭的机制,即当生产者不再发送数据的时候可以主动关闭channel,而消费者再次使用channel的时候只需要确认一下channel的状态即可。如果channel为不可用,即可返回。

关于 关闭的channel 需要注意如下几点:

  • 向关闭的channel 发送数据会导致panic异常
  • v,ok <- ch; 接受channel的值和状态,如果ok为true,则表示channel可以接受数据;如果ok 为false,表示channel已经关闭,无法接受数据
  • 所有channel的接受者在channel关闭的时候都会从阻塞等待中返回上述OK值为false。这个广播机制可以被用作向多个订阅者发送信号。

修改测试代码如下:

package close_channel

import (
  "fmt"
  "sync"
  "testing"
)

func dataProducer(ch chan int, wg *sync.WaitGroup) {
  go func() {
    for i := 0; i < 10; i ++ {
      ch <- i
    }
    close(ch)
    //ch <- 11 // 向关闭后的channel发送数据会报panic错误
    wg.Done()
  }()
}

func dataComsumer(ch chan int, wg *sync.WaitGroup) {
  go func() {
    for {
      if data,ok := <-ch; ok {// 接受关闭后channel的广播,保证channel的输出结果是有效的
        fmt.Printf("consumer data %d\n",data)
      }else {
        break
      }
    }
    wg.Done()
  }()
}

func TestProducer(t *testing.T) {
  var wg sync.WaitGroup
  ch := make(chan int)

  wg.Add(1)
  dataProducer(ch, &wg)

  wg.Add(1)
  dataComsumer(ch, &wg) // 启动多个消费者

  wg.Add(1)
  dataComsumer(ch, &wg)
  wg.Wait()
}      
=== RUN   TestProducer
consumer data 0
consumer data 1
consumer data 2
consumer data 3
consumer data 4
consumer data 5
consumer data 6
consumer data 7
consumer data 9
consumer data 8
--- PASS: TestProducer (0.00s)