Select 多路選擇
基本使用文法如下:
select {
case ret := <-retCh1: //阻塞事件,等待channel1的消息
t.Logf("result %s \n",ret)
case ret := <-retCh2:
t.Logf("result %s \n", rest)
default :
t.Error("return empty")
關于channel部分其實是阻塞的,也就是select實際執行的過程中會阻塞在對應的channel部分,直到某一個case對應的channel有有效的資料才會執行該case下的邏輯。
實際程式執行的過程中 如果出現兩個channel同時有有效資料,那兩個case内部的執行順序是無法嚴格保證的,隻能由程式員自己來控制。
Select 逾時控制
同樣如上代碼,我們要控制channel擷取資料的時間,防止channel中等待有效資料時間過長,是以可以增加一些逾時控制:
select {
case ret := <-retCh1: //阻塞事件,等待channel1的消息
t.Logf("result %s \n",ret)
// 等待1s 傳回一個channel的有效資料,且第一個case還未得到有效資料,則輸出逾時
case <- time.After(time.Second * 1):
t.Error("time out")
是以 select 可以用于保證多個協程之間的高可用,防止slow response的出現。
以上兩種select多路選擇用法 的 測試代碼如下:
package select_test
import (
"fmt"
"testing"
"time"
)
func service() string {
time.Sleep(time.Millisecond * 400)
return "Service1 is Done"
}
func AsyncService(i int) chan string {
rech := make(chan string,1) // 聲明一個channel
var ret string
go func() {
if i == 1 {
ret = service()
} else {
ret = service1()
}
fmt.Println("resturned result")
rech <- ret // 向 channel 中放資料
fmt.Println("service exited")
}()
return rech // 傳回channel
}
// 測試逾時機制來避免等待channel時間過長
// ret 這個channel需要等待AsyncService 函數中的routine中的
// service函數傳回結果,才會将資料填充到ret 中
// service需要執行400ms,是以這裡會出現逾時的情況
func TestSelectTimeout(t *testing.T) {
select {
case ret := <-AsyncService(1):
t.Logf("result is %s", ret)
case <- time.After(time.Millisecond * 100):
t.Error("time out")
}
}
func service1() string {
time.Sleep(time.Millisecond * 500)
return "Service2 is Done"
}
// 測試多個channel傳回資料,挑選其中一個先準備好的channel來執行
func TestSelect(t *testing.T) {
select {
case ret1 := <- AsyncService(1):
t.Logf("result is %s",ret1)
case ret2 := <- AsyncService(2):
t.Logf("result is %s", ret2)
case <- time.After(time.Millisecond * 600):
t.Log("Time out")
}
}
channel 的關閉和廣播
channel 可以說是Go語言中 協程之間通信的一種機制,支援帶buffer和不帶buffer兩種模式,非常友善得實作不同協程之間的通信過程,但是在具體的通信過程中也會暴露一些問題,如下生産者,消費者代碼:
package close_channel
import (
"fmt"
"sync"
"testing"
)
// 資料生産者
func dataProducer(ch chan int, wg *sync.WaitGroup) {
go func() {
for i := 0; i < 10; i ++ {
ch <- i
}
wg.Done()
}()
}
// 資料消費者
func dataComsumer(ch chan int, wg *sync.WaitGroup) {
go func() {
// 沒有辦法準确知道channel中什麼時候沒有資料,這裡保持和生産者相同的填充資料的次數
for i := 0;i < 10; i++ {
data := <-ch
fmt.Printf("consumer data %d\n",data)
}
wg.Done()
}()
}
func TestProducer(t *testing.T) {
var wg sync.WaitGroup // wait group
ch := make(chan int)
wg.Add(1)
dataProducer(ch, &wg)
wg.Add(1)
dataComsumer(ch, &wg)
wg.Wait() // 阻塞,直到waitgroup執行完畢,wg的值變為0
輸出如下:
=== RUN TestProducer
consumer data 0
consumer data 1
consumer data 2
consumer data 3
consumer data 4
consumer data 5
consumer data 6
consumer data 7
consumer data 8
consumer data 9
--- PASS: TestProducer (0.00s)
上面代碼中消費者協程在channel buffer 内部沒有資料的時候隻能夠被動阻塞等待,直到channel中資料有效。這個實作導緻生産者消費者之間的代碼耦合度比較高,且當程式中存在多個producer和多個receiver的時候,receivers并不一定能夠确切得知道什麼時候producer才不會生産資料。
還是如上代碼,我們如果啟動多個消費者就能夠很明顯得看到問題,如下測試代碼:
func TestProducer(t *testing.T) {
var wg sync.WaitGroup
ch := make(chan int)
wg.Add(1)
dataProducer(ch, &wg)
wg.Add(1)
dataComsumer(ch, &wg)
wg.Add(1)
dataComsumer(ch, &wg)
wg.Wait()
}
consumer data 0
consumer data 1
consumer data 2
consumer data 3
consumer data 5
consumer data 6
consumer data 7
consumer data 8
consumer data 9
consumer data 0
consumer data 0
consumer data 4
consumer data 0
consumer data 0
consumer data 0
consumer data 0
consumer data 0
執行的過程中可以發現消費者消費了0,因為這個時候生産者已經不再生産資料了,再去消費的話會取到channel預設的值即0,且channel沒有關閉,消費者還在等待有效的資料,還會一直阻塞程式運作。
是以channel 也提供了主動關閉的機制,即當生産者不再發送資料的時候可以主動關閉channel,而消費者再次使用channel的時候隻需要确認一下channel的狀态即可。如果channel為不可用,即可傳回。
關于 關閉的channel 需要注意如下幾點:
- 向關閉的channel 發送資料會導緻panic異常
- v,ok <- ch; 接受channel的值和狀态,如果ok為true,則表示channel可以接受資料;如果ok 為false,表示channel已經關閉,無法接受資料
- 所有channel的接受者在channel關閉的時候都會從阻塞等待中傳回上述OK值為false。這個廣播機制可以被用作向多個訂閱者發送信号。
修改測試代碼如下:
package close_channel
import (
"fmt"
"sync"
"testing"
)
func dataProducer(ch chan int, wg *sync.WaitGroup) {
go func() {
for i := 0; i < 10; i ++ {
ch <- i
}
close(ch)
//ch <- 11 // 向關閉後的channel發送資料會報panic錯誤
wg.Done()
}()
}
func dataComsumer(ch chan int, wg *sync.WaitGroup) {
go func() {
for {
if data,ok := <-ch; ok {// 接受關閉後channel的廣播,保證channel的輸出結果是有效的
fmt.Printf("consumer data %d\n",data)
}else {
break
}
}
wg.Done()
}()
}
func TestProducer(t *testing.T) {
var wg sync.WaitGroup
ch := make(chan int)
wg.Add(1)
dataProducer(ch, &wg)
wg.Add(1)
dataComsumer(ch, &wg) // 啟動多個消費者
wg.Add(1)
dataComsumer(ch, &wg)
wg.Wait()
}
=== RUN TestProducer
consumer data 0
consumer data 1
consumer data 2
consumer data 3
consumer data 4
consumer data 5
consumer data 6
consumer data 7
consumer data 9
consumer data 8
--- PASS: TestProducer (0.00s)