天天看點

Go 分布式學習利器(20)-- Go并發程式設計之多路選擇和逾時控制,channel的關閉和廣播

Select 多路選擇

基本使用文法如下:

select {
case ret := <-retCh1: //阻塞事件,等待channel1的消息
  t.Logf("result %s \n",ret)
case ret := <-retCh2:
  t.Logf("result %s \n", rest)
default :
  t.Error("return empty")      

關于channel部分其實是阻塞的,也就是select實際執行的過程中會阻塞在對應的channel部分,直到某一個case對應的channel有有效的資料才會執行該case下的邏輯。

實際程式執行的過程中 如果出現兩個channel同時有有效資料,那兩個case内部的執行順序是無法嚴格保證的,隻能由程式員自己來控制。

Select 逾時控制

同樣如上代碼,我們要控制channel擷取資料的時間,防止channel中等待有效資料時間過長,是以可以增加一些逾時控制:

select {
case ret := <-retCh1: //阻塞事件,等待channel1的消息
  t.Logf("result %s \n",ret)

// 等待1s 傳回一個channel的有效資料,且第一個case還未得到有效資料,則輸出逾時
case <- time.After(time.Second * 1): 
  t.Error("time out")      

是以 select 可以用于保證多個協程之間的高可用,防止slow response的出現。

以上兩種select多路選擇用法 的 測試代碼如下:

package select_test

import (
  "fmt"
  "testing"
  "time"
)
func service() string {
  time.Sleep(time.Millisecond * 400)
  return "Service1 is Done"
}

func AsyncService(i int) chan string {
  rech := make(chan string,1) // 聲明一個channel
  var ret string

  go func() {
    if i == 1 {
      ret = service()
    } else {
      ret = service1()
    }
    fmt.Println("resturned result")
    rech <- ret // 向 channel 中放資料
    fmt.Println("service exited")
  }()

  return rech // 傳回channel
}

// 測試逾時機制來避免等待channel時間過長
// ret 這個channel需要等待AsyncService 函數中的routine中的
// service函數傳回結果,才會将資料填充到ret 中
// service需要執行400ms,是以這裡會出現逾時的情況
func TestSelectTimeout(t *testing.T) {
  select {
  case ret := <-AsyncService(1):
    t.Logf("result is %s", ret)
  case <- time.After(time.Millisecond * 100):
    t.Error("time out")
  }
}

func service1() string {
  time.Sleep(time.Millisecond * 500)
  return "Service2 is Done"
}

// 測試多個channel傳回資料,挑選其中一個先準備好的channel來執行
func TestSelect(t *testing.T) {
  select {
  case ret1 := <- AsyncService(1):
    t.Logf("result is %s",ret1)
  case ret2 := <- AsyncService(2):
    t.Logf("result is %s", ret2)
  case <- time.After(time.Millisecond * 600):
    t.Log("Time out")
  }
}      

channel 的關閉和廣播

channel 可以說是Go語言中 協程之間通信的一種機制,支援帶buffer和不帶buffer兩種模式,非常友善得實作不同協程之間的通信過程,但是在具體的通信過程中也會暴露一些問題,如下生産者,消費者代碼:

package close_channel

import (
  "fmt"
  "sync"
  "testing"
)

// 資料生産者
func dataProducer(ch chan int, wg *sync.WaitGroup) {
  go func() {
    for i := 0; i < 10; i ++ {
      ch <- i
    }
    wg.Done()
  }()
}

// 資料消費者
func dataComsumer(ch chan int, wg *sync.WaitGroup) {
  go func() {
    // 沒有辦法準确知道channel中什麼時候沒有資料,這裡保持和生産者相同的填充資料的次數
    for i := 0;i < 10; i++ { 
      data := <-ch
      fmt.Printf("consumer data %d\n",data)
    }
    wg.Done()
  }()
}

func TestProducer(t *testing.T) {
  var wg sync.WaitGroup // wait group
  ch := make(chan int)

  wg.Add(1)
  dataProducer(ch, &wg)

  wg.Add(1)
  dataComsumer(ch, &wg)

  wg.Wait() // 阻塞,直到waitgroup執行完畢,wg的值變為0      

輸出如下:

=== RUN   TestProducer
consumer data 0
consumer data 1
consumer data 2
consumer data 3
consumer data 4
consumer data 5
consumer data 6
consumer data 7
consumer data 8
consumer data 9
--- PASS: TestProducer (0.00s)      

上面代碼中消費者協程在channel buffer 内部沒有資料的時候隻能夠被動阻塞等待,直到channel中資料有效。這個實作導緻生産者消費者之間的代碼耦合度比較高,且當程式中存在多個producer和多個receiver的時候,receivers并不一定能夠确切得知道什麼時候producer才不會生産資料。

還是如上代碼,我們如果啟動多個消費者就能夠很明顯得看到問題,如下測試代碼:

func TestProducer(t *testing.T) {
  var wg sync.WaitGroup
  ch := make(chan int)

  wg.Add(1)
  dataProducer(ch, &wg)

  wg.Add(1)
  dataComsumer(ch, &wg)

  wg.Add(1)
  dataComsumer(ch, &wg)
  wg.Wait()
}      
consumer data 0
consumer data 1
consumer data 2
consumer data 3
consumer data 5
consumer data 6
consumer data 7
consumer data 8
consumer data 9
consumer data 0
consumer data 0
consumer data 4
consumer data 0
consumer data 0
consumer data 0
consumer data 0
consumer data 0      

執行的過程中可以發現消費者消費了0,因為這個時候生産者已經不再生産資料了,再去消費的話會取到channel預設的值即0,且channel沒有關閉,消費者還在等待有效的資料,還會一直阻塞程式運作。

是以channel 也提供了主動關閉的機制,即當生産者不再發送資料的時候可以主動關閉channel,而消費者再次使用channel的時候隻需要确認一下channel的狀态即可。如果channel為不可用,即可傳回。

關于 關閉的channel 需要注意如下幾點:

  • 向關閉的channel 發送資料會導緻panic異常
  • v,ok <- ch; 接受channel的值和狀态,如果ok為true,則表示channel可以接受資料;如果ok 為false,表示channel已經關閉,無法接受資料
  • 所有channel的接受者在channel關閉的時候都會從阻塞等待中傳回上述OK值為false。這個廣播機制可以被用作向多個訂閱者發送信号。

修改測試代碼如下:

package close_channel

import (
  "fmt"
  "sync"
  "testing"
)

func dataProducer(ch chan int, wg *sync.WaitGroup) {
  go func() {
    for i := 0; i < 10; i ++ {
      ch <- i
    }
    close(ch)
    //ch <- 11 // 向關閉後的channel發送資料會報panic錯誤
    wg.Done()
  }()
}

func dataComsumer(ch chan int, wg *sync.WaitGroup) {
  go func() {
    for {
      if data,ok := <-ch; ok {// 接受關閉後channel的廣播,保證channel的輸出結果是有效的
        fmt.Printf("consumer data %d\n",data)
      }else {
        break
      }
    }
    wg.Done()
  }()
}

func TestProducer(t *testing.T) {
  var wg sync.WaitGroup
  ch := make(chan int)

  wg.Add(1)
  dataProducer(ch, &wg)

  wg.Add(1)
  dataComsumer(ch, &wg) // 啟動多個消費者

  wg.Add(1)
  dataComsumer(ch, &wg)
  wg.Wait()
}      
=== RUN   TestProducer
consumer data 0
consumer data 1
consumer data 2
consumer data 3
consumer data 4
consumer data 5
consumer data 6
consumer data 7
consumer data 9
consumer data 8
--- PASS: TestProducer (0.00s)