- 并發模型
- 什麼是channel
- channel原理
//無緩沖channel
func ch() {
var ch = make(chan int)
//無緩沖區,會阻塞等待消費
go func(ch chan int) {
ch <- 1
ch <- 2
ch <- 3
fmt.Println("send finished")
}(ch)
for {
select {
case i := <-ch:
fmt.Println("receive", i)
case <-time.After(time.Second):
fmt.Println("Time out")
os.Exit(1) //程式退出
// return
}
}
}
//單向隻寫channel
func chLimit() {
var ch = make(chan int)
go func(ch chan<- int) {
ch <- 1
ch <- 2
ch <- 3
fmt.Println("send finished")
//invalid operation: cannot receive from send-only channel ch (variable of type chan<- int)
//r := <-ch
}(ch)
for {
select {
case i := <-ch:
fmt.Println("receive", i)
case <-time.After(time.Second):
fmt.Println("Time out")
os.Exit(1) //程式退出
// return
}
}
}
//close channel
func chClose() {
var ch = make(chan int)
go func(ch chan<- int) {
ch <- 1
ch <- 2
ch <- 3
fmt.Println("send finished")
close(ch)
}(ch)
for {
select {
case i, ok := <-ch:
if ok {
fmt.Println("receive", i)
} else {
fmt.Println("channel close")
os.Exit(0)
}
case <-time.After(time.Second):
fmt.Println("Time out")
os.Exit(1) //程式退出
}
}
}
//關閉ch出現的問題
func chCloseErr() {
var ch = make(chan int)
go func(ch chan<- int) {
ch <- 1
ch <- 2
ch <- 3
fmt.Println("send finished")
close(ch) //i,ok:=<-ch; 關閉ch,ok傳回值為false,i為ch的預設值
}(ch)
for {
select {
//如果不判斷,那麼i就會一直得到chan類型的預設值,如int為0,則永遠不會停止
case i := <-ch:
fmt.Println("receive", i)
case <-time.After(time.Millisecond * 100): //ch永遠會傳回預設值,是以定時不會被執行
fmt.Println("Time out")
os.Exit(1) //程式退出
}
}
}
//異步任務排程
func chTask() {
var doneCh = make(chan struct{})
var errCh = make(chan error)
go func(doneCh chan<- struct{}, errCh chan<- error) {
if time.Now().Unix()%2 == 0 {
doneCh <- struct{}{}
} else {
errCh <- errors.New("unix time is an odd")
}
}(doneCh, errCh)
select {
case <-doneCh:
fmt.Println("done")
case err := <-errCh:
fmt.Println("get an error:", err)
case <-time.After(time.Second):
fmt.Println("time out")
}
}
//有緩沖區channel,buf不滿
func chBuf() {
var ch = make(chan int, 3)
//有緩沖區,相當于一個消息隊列,這裡不會阻塞,隊列滿會阻塞
go func(ch chan int) {
ch <- 1
ch <- 2
ch <- 3
fmt.Println("send finished") //ch不阻塞,會先列印這句話
}(ch)
for {
select {
case i := <-ch:
fmt.Println("receive", i)
case <-time.After(time.Second):
fmt.Println("Time out")
os.Exit(1) //程式退出
// return
}
}
}
//有緩沖區channel,buf滿
func chBufs() {
var ch = make(chan int, 3)
//有緩沖區,相當于一個消息隊列,這裡不會阻塞,隊列滿會阻塞
go func(ch chan int) {
ch <- 1
ch <- 2
ch <- 3
ch <- 4
ch <- 5
ch <- 6
ch <- 7
ch <- 8
ch <- 9
ch <- 10
fmt.Println("send finished") //ch阻塞時,不會執行到這
}(ch)
for {
select {
case i := <-ch:
fmt.Println("receive", i)
time.Sleep(time.Second)
case <-time.After(time.Second * 10):
fmt.Println("Time out")
os.Exit(1) //程式退出
// return
}
}
}
//for range讀取ch
func chBufRange() {
var ch = make(chan int, 3)
//有緩沖區,相當于一個消息隊列,這裡不會阻塞,隊列滿會阻塞
go func(ch chan int) {
ch <- 1
ch <- 2
ch <- 3
fmt.Println("send finished") //ch不阻塞,會先列印這句話
close(ch)
}(ch)
//如果不關閉ch或無資料,則會阻塞等待資料
for i := range ch {
fmt.Println("receive:", i)
}
}
//channel 是環型隊列,好處是記憶體重複使用
//ex1
type Ball struct {
hits int
}
func passBall() {
table := make(chan *Ball)
go player("ping", table)
go player("pong", table)
//Tip:核心邏輯:往channel裡放入資料,作為啟動信号;從channel讀出資料,作為關閉信号
table <- new(Ball)
time.Sleep(time.Second)
<-table
close(table)
time.Sleep(time.Second * 5)
fmt.Println("passBall exit")
}
func player(name string, table chan *Ball) {
for ball := range table {
//Tip:剛進goroutine時,先阻塞在這裡
// ball := <-table
ball.hits++
fmt.Println(name, ball.hits)
time.Sleep(time.Millisecond * 100)
//Tip:運作到這裡時,另一個goroutine在收資料,是以能準确送達
table <- ball
}
fmt.Println("player exit")
}
//ex2,channel嵌套
type sub struct {
//Tip 把chan error看作一個整體,作為關閉的通道
closing chan chan error
updates chan string
}
func (s *sub) Close() error {
//Tip:核心邏輯:兩層通知,第一層作為準備關閉的通知,第二層作為關閉結果的傳回
errc := make(chan error)
//Tip 第一步:要關閉時,先傳一個chan error過去,通知要關閉了
s.closing <- errc
//Tip 第三步:從chan error中讀取錯誤,阻塞等待,等待gorutine退出前回複
return <-errc
}
func (s *sub) loop() {
var err error
for {
select {
case errc := <-s.closing:
//Tip 第二步:收到關閉後,進行處理,處理後把error傳回去,通知發送關閉的gorutine
errc <- err
close(s.updates)
return
case s, ok := <-s.updates:
if ok {
fmt.Println("update:", s)
} else {
fmt.Println("update exit")
return
}
}
}
}
func m() {
var s sub = sub{
closing: make(chan chan error),
updates: make(chan string),
}
go s.loop()
s.updates <- "1"
s.updates <- "2"
s.updates <- "3"
time.Sleep(time.Second * 2)
err := s.Close()
fmt.Println("err:", err)
}