天天看點

Channel

  • 并發模型
    • 并發與并行
    • 什麼是CSP
  • 什麼是channel
    • channel實作CSP
  • channel原理
//無緩沖channel
func ch() {
	var ch = make(chan int)
	//無緩沖區,會阻塞等待消費
	go func(ch chan int) {
		ch <- 1
		ch <- 2
		ch <- 3
		fmt.Println("send finished")
	}(ch)

	for {
		select {
		case i := <-ch:
			fmt.Println("receive", i)
		case <-time.After(time.Second):
			fmt.Println("Time out")
			os.Exit(1) //程式退出
			// return
		}
	}
}

//單向隻寫channel
func chLimit() {
	var ch = make(chan int)

	go func(ch chan<- int) {
		ch <- 1
		ch <- 2
		ch <- 3
		fmt.Println("send finished")
		//invalid operation: cannot receive from send-only channel ch (variable of type chan<- int)
		//r := <-ch
	}(ch)

	for {
		select {
		case i := <-ch:
			fmt.Println("receive", i)
		case <-time.After(time.Second):
			fmt.Println("Time out")
			os.Exit(1) //程式退出
			// return
		}
	}
}

//close channel
func chClose() {
	var ch = make(chan int)

	go func(ch chan<- int) {
		ch <- 1
		ch <- 2
		ch <- 3
		fmt.Println("send finished")
		close(ch)
	}(ch)

	for {
		select {
		case i, ok := <-ch:
			if ok {
				fmt.Println("receive", i)
			} else {
				fmt.Println("channel close")
				os.Exit(0)
			}

		case <-time.After(time.Second):
			fmt.Println("Time out")
			os.Exit(1) //程式退出
		}
	}
}

//關閉ch出現的問題
func chCloseErr() {
	var ch = make(chan int)

	go func(ch chan<- int) {
		ch <- 1
		ch <- 2
		ch <- 3
		fmt.Println("send finished")
		close(ch) //i,ok:=<-ch; 關閉ch,ok傳回值為false,i為ch的預設值
	}(ch)

	for {
		select {
		//如果不判斷,那麼i就會一直得到chan類型的預設值,如int為0,則永遠不會停止
		case i := <-ch:
			fmt.Println("receive", i)
		case <-time.After(time.Millisecond * 100): //ch永遠會傳回預設值,是以定時不會被執行
			fmt.Println("Time out")
			os.Exit(1) //程式退出
		}
	}
}

//異步任務排程
func chTask() {
	var doneCh = make(chan struct{})
	var errCh = make(chan error)

	go func(doneCh chan<- struct{}, errCh chan<- error) {
		if time.Now().Unix()%2 == 0 {
			doneCh <- struct{}{}
		} else {
			errCh <- errors.New("unix time is an odd")
		}
	}(doneCh, errCh)

	select {
	case <-doneCh:
		fmt.Println("done")
	case err := <-errCh:
		fmt.Println("get an error:", err)
	case <-time.After(time.Second):
		fmt.Println("time out")
	}
}

//有緩沖區channel,buf不滿
func chBuf() {
	var ch = make(chan int, 3)
	//有緩沖區,相當于一個消息隊列,這裡不會阻塞,隊列滿會阻塞
	go func(ch chan int) {
		ch <- 1
		ch <- 2
		ch <- 3
		fmt.Println("send finished") //ch不阻塞,會先列印這句話
	}(ch)

	for {
		select {
		case i := <-ch:
			fmt.Println("receive", i)
		case <-time.After(time.Second):
			fmt.Println("Time out")
			os.Exit(1) //程式退出
			// return
		}
	}
}

//有緩沖區channel,buf滿
func chBufs() {
	var ch = make(chan int, 3)
	//有緩沖區,相當于一個消息隊列,這裡不會阻塞,隊列滿會阻塞
	go func(ch chan int) {
		ch <- 1
		ch <- 2
		ch <- 3
		ch <- 4
		ch <- 5
		ch <- 6
		ch <- 7
		ch <- 8
		ch <- 9
		ch <- 10
		fmt.Println("send finished") //ch阻塞時,不會執行到這
	}(ch)

	for {
		select {
		case i := <-ch:
			fmt.Println("receive", i)
			time.Sleep(time.Second)
		case <-time.After(time.Second * 10):
			fmt.Println("Time out")
			os.Exit(1) //程式退出
			// return
		}
	}
}

//for range讀取ch
func chBufRange() {
	var ch = make(chan int, 3)
	//有緩沖區,相當于一個消息隊列,這裡不會阻塞,隊列滿會阻塞
	go func(ch chan int) {
		ch <- 1
		ch <- 2
		ch <- 3
		fmt.Println("send finished") //ch不阻塞,會先列印這句話
		close(ch)
	}(ch)

	//如果不關閉ch或無資料,則會阻塞等待資料
	for i := range ch {
		fmt.Println("receive:", i)
	}
}

//channel  是環型隊列,好處是記憶體重複使用

//ex1
type Ball struct {
	hits int
}

func passBall() {
	table := make(chan *Ball)
	go player("ping", table)
	go player("pong", table)
	//Tip:核心邏輯:往channel裡放入資料,作為啟動信号;從channel讀出資料,作為關閉信号
	table <- new(Ball)
	time.Sleep(time.Second)
	<-table
	close(table)
	time.Sleep(time.Second * 5)
	fmt.Println("passBall exit")
}
func player(name string, table chan *Ball) {
	for ball := range table {
		//Tip:剛進goroutine時,先阻塞在這裡
		// ball := <-table
		ball.hits++
		fmt.Println(name, ball.hits)
		time.Sleep(time.Millisecond * 100)
		//Tip:運作到這裡時,另一個goroutine在收資料,是以能準确送達
		table <- ball
	}
	fmt.Println("player exit")
}

//ex2,channel嵌套
type sub struct {
	//Tip 把chan error看作一個整體,作為關閉的通道
	closing chan chan error
	updates chan string
}

func (s *sub) Close() error {
	//Tip:核心邏輯:兩層通知,第一層作為準備關閉的通知,第二層作為關閉結果的傳回
	errc := make(chan error)
	//Tip 第一步:要關閉時,先傳一個chan error過去,通知要關閉了
	s.closing <- errc
	//Tip 第三步:從chan error中讀取錯誤,阻塞等待,等待gorutine退出前回複
	return <-errc
}
func (s *sub) loop() {
	var err error
	for {
		select {
		case errc := <-s.closing:
			//Tip 第二步:收到關閉後,進行處理,處理後把error傳回去,通知發送關閉的gorutine
			errc <- err
			close(s.updates)
			return
		case s, ok := <-s.updates:
			if ok {
				fmt.Println("update:", s)
			} else {
				fmt.Println("update exit")
				return
			}
		}
	}
}
func m() {
	var s sub = sub{
		closing: make(chan chan error),
		updates: make(chan string),
	}
	go s.loop()
	s.updates <- "1"
	s.updates <- "2"
	s.updates <- "3"
	time.Sleep(time.Second * 2)
	err := s.Close()
	fmt.Println("err:", err)
}