天天看點

Go基礎系列:channel入門

channel基礎

channel用于goroutines之間的通信,讓它們之間可以進行資料交換。像管道一樣,一個goroutine_A向channel_A中放資料,另一個goroutine_B從channel_A取資料。

channel是指針類型的資料類型,通過make來配置設定記憶體。例如:

1ch := make(chan int)
           

這表示建立一個channel,這個channel中隻能儲存int類型的資料。也就是說一端隻能向此channel中放進int類型的值,另一端隻能從此channel中讀出int類型的值。

需要注意,

chan TYPE

才表示channel的類型。是以其作為參數或傳回值時,需指定為

xxx chan int

類似的格式。

向ch這個channel放資料的操作形式為:

1ch <- VALUE
           

從ch這個channel讀資料的操作形式為:

1<-ch // 從ch中讀取一個值val = <-chval := <-ch // 從ch中讀取一個值并儲存到val變量中val,ok = <-ch // 從ch讀取一個值,判斷是否讀取成功,如果成功則儲存到val變量中
           

其實很簡單,當ch出現在

<-

的左邊表示send,當ch出現在

<-

的右邊表示recv。

例如:

1package mainimport ( "fmt"
 2 "time")func main() {
 3 ch := make(chan string) go sender(ch) // sender goroutine
 4 go recver(ch) // recver goroutine
 5 time.Sleep(1e9)
 6}func sender(ch chan string) {
 7 ch <- "malongshuai"
 8 ch <- "gaoxiaofang"
 9 ch <- "wugui"
10 ch <- "tuner"}func recver(ch chan string) { var recv string
11 for {
12 recv = <-ch
13 fmt.Println(recv)
14 }
15}
           

輸出結果:

1malongshuaigaoxiaofang
2wugui
3tuner
           

上面激活了一個goroutine用于執行sender()函數,該函數每次向channel ch中發送一個字元串。同時還激活了另一個goroutine用于執行recver()函數,該函數每次從channel ch中讀取一個字元串。

注意上面的

recv = <-ch

,當channel中沒有資料可讀時,recver goroutine将會阻塞在此行。由于recver中讀取channel的操作放在了無限for循環中,表示recver goroutine将一直阻塞,直到從channel ch中讀取到資料,讀取到資料後進入下一輪循環由被阻塞在

recv = <-ch

上。直到main中的time.Sleep()指定的時間到了,main程式終止,所有的goroutine将全部被強制終止。

因為receiver要不斷從channel中讀取可能存在的資料,是以receiver一般都使用一個無限循環來讀取channel,避免sender發送的資料被丢棄。

channel的屬性和分類

每個channel都有3種操作:send、receive和close

 ●  send:表示sender端的goroutine向channel中投放資料

 ●  receive:表示receiver端的goroutine從channel中讀取資料

 ●  close:表示關閉channel

     ●  關閉channel後,send操作将導緻painc

     ●  關閉channel後,recv操作将傳回對應類型的0值以及一個狀态碼false

     ●  close并非強制需要使用close(ch)來關閉channel,在某些時候可以自動被關閉

     ●  如果使用close(),建議條件允許的情況下加上defer

     ●  隻在sender端上顯式使用close()關閉channel。因為關閉通道意味着沒有資料再需要發送

例如,判斷channel是否被關閉:

1val, ok := <-counterif ok {
2 fmt.Println(val)
3}
           

channel分為兩種:unbuffered channel和buffered channel

 ●  unbuffered channel:阻塞、同步模式

 ●  sender端向channel中send一個資料,然後阻塞,直到receiver端将此資料receive

 ●  receiver端一直阻塞,直到sender端向channel發送了一個資料

 ●  buffered channel:非阻塞、異步模式

 ●  sender端可以向channel中send多個資料(隻要channel容量未滿),容量滿之前不會阻塞

 ●  receiver端按照隊列的方式(FIFO,先進先出)從buffered channel中按序receive其中資料

buffered channel有兩個屬性:容量和長度:和slice的capacity和length的概念是一樣的

 ●  capacity:表示bufffered channel最多可以緩沖多少個資料

 ●  length:表示buffered channel目前已緩沖多少個資料

 ●  建立buffered channel的方式為

make(chan TYPE,CAP)

unbuffered channel可以認為是容量為0的buffered channel,是以每發送一個資料就被阻塞。注意,不是容量為1的buffered channel,因為容量為1的channel,是在channel中已有一個資料,并發送第二個資料的時候才被阻塞。

換句話說,send被阻塞的時候,其實是沒有發送成功的,隻有被另一端讀走一個資料之後才算是send成功。對于unbuffered channel來說,這是send/recv的同步模式。

實際上,當向一個channel進行send的時候,先關閉了channel,再讀取channel時會發現錯誤在send,而不是recv。它會提示向已經關閉了的channel發送資料。

1func main() {
2 counter := make(chan int) go func() {
3 counter <- 32
4 }() close(counter)
5 fmt.Println(<-counter)
6}
           

輸出報錯:

1panic: send on closed channel
           

是以,在Go的内部行為中,send和recv是一個整體行為,資料未讀就表示未send成功。

死鎖(deadlock)

當channel的某一端(sender/receiver)期待另一端的(receiver/sender)操作,另一端正好在期待本端的操作時,也就是說兩端都因為對方而使得自己目前處于阻塞狀态,這時将會出現死鎖問題。

比如,在main函數中,它有一個預設的goroutine,如果在此goroutine中建立一個unbuffered channel,并在main goroutine中向此channel中發送資料并直接receive資料,将會出現死鎖:

1package main 
2
3import ( "fmt")func main (){
4 goo(32)
5}func goo(s int) {
6 counter := make(chan int)
7 counter <- s
8 fmt.Println(<-counter)
9}
           

在上面的示例中,向unbuffered channel中send資料的操作

counter <- s

是在main goroutine中進行的,從此channel中recv的操作

<-counter

也是在main goroutine中進行的。send的時候會直接阻塞main goroutine,使得recv操作無法被執行,go将探測到此問題,并報錯:

1fatal error: all goroutines are asleep - deadlock!goroutine 1 [chan send]:
           

要修複此問題,隻需将send操作放在另一個goroutine中執行即可:

1package mainimport ( "fmt")func main() {
2 goo(32)
3}func goo(s int) {
4 counter := make(chan int) go func() {
5 counter <- s
6 }()
7 fmt.Println(<-counter)
8}
           

或者,将counter設定為一個容量為1的buffered channel:

1counter := make(chan int,1)
           

這樣放完一個資料後send不會阻塞(被recv之前放第二個資料才會阻塞),可以執行到recv操作。

unbuffered channel同步通信示例

下面通過sync.WaitGroup類型來等待程式的結束,分析多個goroutine之間通信時狀态的轉換。因為建立的channel是unbuffered類型的,是以send和recv都是阻塞的。

1package mainimport ( "fmt"
 2 "sync")// wg用于等待程式執行完成var wg sync.WaitGroupfunc main() {
 3 count := make(chan int) // 增加兩個待等待的goroutines
 4 wg.Add(2)
 5 fmt.Println("Start Goroutines") // 激活一個goroutine,label:"Goroutine-1"
 6 go printCounts("Goroutine-1", count) // 激活另一個goroutine,label:"Goroutine-2"
 7 go printCounts("Goroutine-2", count)
 8
 9 fmt.Println("Communication of channel begins") // 向channel中發送初始資料
10 count <- 1
11
12 // 等待goroutines都執行完成
13 fmt.Println("Waiting To Finish")
14 wg.Wait()
15 fmt.Println("\nTerminating the Program")
16}func printCounts(label string, count chan int) { // goroutine執行完成時,wg的計數器減1
17 defer wg.Done() for { // 從channel中接收資料
18 // 如果無資料可recv,則goroutine阻塞在此
19 val, ok := <-count if !ok {
20 fmt.Println("Channel was closed:",label) return
21 }
22 fmt.Printf("Count: %d received from %s \n", val, label) if val == 10 {
23 fmt.Printf("Channel Closed from %s \n", label) // Close the channel
24 close(count) return
25 } // 輸出接收到的資料後,加1,并重新将其send到channel中
26 val++
27 count <- val
28 }
29}
           

上面的程式中,激活了兩個goroutine,激活這兩個goroutine後,向channel中發送一個初始資料值1,然後main goroutine将因為wg.Wait()等待2個goroutine都執行完成而被阻塞。

再看這兩個goroutine,這兩個goroutine執行完全一樣的函數代碼,它們都接收count這個channel的資料,但可能是goroutine1先接收到channel中的初始值1,也可能是goroutine2先接收到初始值1。接收到資料後輸出值,并在輸出後對資料加1,然後将加1後的資料再次send到channel,每次send都會将自己這個goroutine阻塞(因為unbuffered channel),此時另一個goroutine因為等待recv而執行。當加1後發送給channel的資料為10之後,某goroutine将關閉count channel,該goroutine将退出,wg的計數器減1,另一個goroutine因等待recv而阻塞的狀态将因為channel的關閉而失敗,ok狀态碼将讓該goroutine退出,于是wg的計數器減為0,main goroutine因為wg.Wait()而繼續執行後面的代碼。

使用for range疊代channel

前面都是在for無限循環中讀取channel中的資料,但也可以使用range來疊代channel,它會傳回每次疊代過程中所讀取的資料,直到channel被關閉。

例如,将上面示例中的printCounts()改為for-range的循環形式。

1func printCounts(label string, count chan int) { defer wg.Done() for val := range count {
2 fmt.Printf("Count: %d received from %s \n", val, label) if val == 10 {
3 fmt.Printf("Channel Closed from %s \n", label) close(count) return
4 }
5 val++
6 count <- val
7 }
8}
           
多個"管道":輸出作為輸入

channel是goroutine與goroutine之間通信的基礎,一邊産生資料放進channel,另一邊從channel讀取放進來的資料。可以借此實作多個goroutine之間的資料交換,例如

goroutine_1->goroutine_2->goroutine_3

,就像bash的管道一樣,上一個指令的輸出可以不斷傳遞給下一個指令的輸入,隻不過golang借助channel可以在多個goroutine(如函數的執行)之間傳,而bash是在指令之間傳。

以下是一個示例,第一個函數getRandNum()用于生成随機整數,并将生成的整數放進第一個channel ch1中,第二個函數addRandNum()用于接收ch1中的資料(來自第一個函數),将其輸出,然後對接收的值加1後放進第二個channel ch2中,第三個函數printRes接收ch2中的資料并将其輸出。

如果将函數認為是Linux的指令,則類似于下面的指令行:ch1相當于第一個管道,ch2相當于第二個管道

1getRandNum | addRandNum | printRes
           

以下是代碼部分:

1package mainimport ( "fmt"
 2 "math/rand"
 3 "sync")var wg sync.WaitGroupfunc main() {
 4 wg.Add(3) // 建立兩個channel
 5 ch1 := make(chan int)
 6 ch2 := make(chan int) // 3個goroutine并行
 7 go getRandNum(ch1) go addRandNum(ch1, ch2) go printRes(ch2)
 8
 9 wg.Wait()
10}func getRandNum(out chan int) { // defer the wg.Done()
11 defer wg.Done() var random int
12 // 總共生成10個随機數
13 for i := 0; i < 10; i++ { // 生成[0,30)之間的随機整數并放進channel out
14 random = rand.Intn(30)
15 out <- random
16 } close(out)
17}func addRandNum(in,out chan int) { defer wg.Done() for v := range in { // 輸出從第一個channel中讀取到的資料
18 // 并将值+1後放進第二個channel中
19 fmt.Println("before +1:",v)
20 out <- (v + 1)
21 } close(out)
22}func printRes(in chan int){ defer wg.Done() for v := range in {
23 fmt.Println("after +1:",v)
24 }
25}           
指定channel的方向

上面通過兩個channel将3個goroutine連接配接起來,其中起連接配接作用的是第二個函數addRandNum()。在這個函數中使用了兩個channel作為參數:一個channel用于接收、一個channel用于發送。

其實channel類的參數變量可以指定資料流向: ●  

in <-chan int

:表示channel in通道隻用于接收資料

out chan<- int

:表示channel out通道隻用于發送資料

Go基礎系列:channel入門

隻用于接收資料的通道

<-chan

不可被關閉,因為關閉通道是針對發送資料而言的,表示無資料再需發送。對于recv來說,關閉通道是沒有意義的。

是以,上面示例中三個函數可改寫為:

1func getRandNum(out chan<- int) {
2 ...
3}func addRandNum(in <-chan int, out chan<- int) {
4 ...
5}func printRes(in <-chan int){
6 ...
7}
           
buffered channel異步隊列請求示例

下面是使用buffered channel實作異步處理請求的示例。

在此示例中:

 ●  有(最多)3個worker,每個worker是一個goroutine,它們有worker ID。

 ●  每個worker都從一個buffered channel中取出待執行的任務,每個任務是一個struct結構,包含了任務id(JobID),目前任務的隊列号(ID)以及任務的狀态(worker是否執行完成該任務)。

 ●  在main goroutine中将每個任務struct發送到buffered channel中,這個buffered channel的容量為10,也就是最多隻允許10個任務進行排隊。

 ●  worker每次取出任務後,輸出任務号,然後執行任務(run),最後輸出任務id已完成。

 ●  每個worker執行任務的方式很簡單:随機睡眠0-1秒鐘,并将任務标記為完成。

1package mainimport ( "fmt"
 2 "math/rand"
 3 "sync"
 4 "time")type Task struct {
 5 ID int
 6 JobID int
 7 Status string
 8 CreateTime time.Time
 9}func (t *Task) run() {
10 sleep := rand.Intn(1000)
11 time.Sleep(time.Duration(sleep) * time.Millisecond)
12 t.Status = "Completed"}var wg sync.WaitGroup// worker的數量,即使用多少goroutine執行任務const workerNum = 3func main() {
13 wg.Add(workerNum) // 建立容量為10的buffered channel
14 taskQueue := make(chan *Task, 10) // 激活goroutine,執行任務
15 for workID := 0; workID <= workerNum; workID++ { go worker(taskQueue, workID)
16 } // 将待執行任務放進buffered channel,共15個任務
17 for i := 1; i <= 15; i++ {
18 taskQueue <- &Task{
19 ID: i,
20 JobID: 100 + i,
21 CreateTime: time.Now(),
22 }
23 } close(taskQueue)
24 wg.Wait()
25}// 從buffered channel中讀取任務,并執行任務func worker(in <-chan *Task, workID int) { defer wg.Done() for v := range in {
26 fmt.Printf("Worker%d: recv a request: TaskID:%d, JobID:%d\n", workID, v.ID, v.JobID)
27 v.run()
28 fmt.Printf("Worker%d: Completed for TaskID:%d, JobID:%d\n", workID, v.ID, v.JobID)
29 }
30}           
select多路監聽

很多時候想要同時操作多個channel,比如從ch1、ch2讀資料。Go提供了一個select語句塊,它像switch一樣工作,裡面放一些case語句塊,用來輪詢每個case語句塊的send或recv情況。

select

用法格式示例:

1select { // ch1有資料時,讀取到v1變量中
2 case v1 := <-ch1:
3 ... // ch2有資料時,讀取到v2變量中
4 case v2 := <-ch2:
5 ... // 所有case都不滿足條件時,執行default
6 default:
7 ...
8}
           

defalut語句是可選的,不允許fall through行為,但允許case語句塊為空塊。select會被return、break關鍵字中斷。

select的行為模式主要是對channel是否可讀進行輪詢,但也可以用來向channel發送資料。它的行為如下:

 ●  如果所有的case語句塊都被阻塞,則阻塞直到某個語句塊可以被處理

 ●  如果多個case同時滿足條件,則随機選擇一個進行處理

 ●  如果存在default且其它case都不滿足條件,則執行default。是以default必須要可執行而不能阻塞

需要注意的是,如果在select中執行send操作,則可能會永遠被send阻塞。是以,在使用send的時候,應該也使用defalut語句塊,保證send不會被阻塞。

一般來說,select會放在一個無限循環語句中,一直輪詢channel的可讀事件。

下面是一個示例,pump1()和pump2()都用于産生資料(一個産生偶數,一個産生奇數),并将資料分别放進ch1和ch2兩個通道,suck()則從ch1和ch2中讀取資料。然後在無限循環中使用select輪詢這兩個通道是否可讀,最後main goroutine在1秒後強制中斷所有goroutine。

1package mainimport ( "fmt"
 2 "time")func main() {
 3 ch1 := make(chan int)
 4 ch2 := make(chan int) go pump1(ch1) go pump2(ch2) go suck(ch1, ch2)
 5 time.Sleep(1e9)
 6}func pump1(ch chan int) { for i := 0; i <= 30; i++ { if i%2 == 0 {
 7 ch <- i
 8 }
 9 }
10}func pump2(ch chan int) { for i := 0; i <= 30; i++ { if i%2 == 1 {
11 ch <- i
12 }
13 }
14}func suck(ch1 chan int, ch2 chan int) { for { select { case v := <-ch1:
15 fmt.Printf("Recv on ch1: %d\n", v) case v := <-ch2:
16 fmt.Printf("Recv on ch2: %d\n", v)
17 }
18 }
19}           

原文釋出時間為:2018-11-20

本文作者:xxx

本文來自雲栖社群合作夥伴“

Golang語言社群

”,了解相關資訊可以關注“

”。