天天看點

go語言的并發原理(goroutine)

串行、并發與并行

串行:我們都是先讀國小,國小畢業後再讀國中,讀完國中再讀高中。

并發:同一時間段内執行多個任務(你在用微信和兩個女朋友聊天)。

并行:同一時刻執行多個任務(你和你朋友都在用微信和女朋友聊天)。

業界将如何實作并發程式設計總結歸納為各式各樣的并發模型,常見的并發模型有以下幾種:

線程&鎖模型

Actor模型

CSP模型

Fork&Join模型

Go語言中的并發程式主要是通過基于CSP(communicating sequential processes)的goroutine和channel來實作,當然也支援使用傳統的多線程共享記憶體的并發方式。

goroutine

在Go語言程式設計中你不需要去自己寫程序、線程、協程,你的技能包裡隻有一個技能——goroutine,當你需要讓某個任務并發執行的時候,你隻需要把這個任務包裝成一個函數,開啟一個 goroutine 去執行這個函數就可以了,就是這麼簡單粗暴。

go關鍵字

Go語言中使用 goroutine 非常簡單,隻需要在函數或方法調用前加上go關鍵字就可以建立一個 goroutine ,進而讓該函數或方法在新建立的 goroutine 中執行。

go func(){
  // ...
}()
           

啟動單個goroutine

啟動 goroutine 的方式非常簡單,隻需要在調用函數(普通函數和匿名函數)前加上一個go關鍵字。

package main

import (
	"fmt"
)

func hello() {
	fmt.Println("hello")
}

func main() {
	go hello() // 啟動另外一個goroutine去執行hello函數
	fmt.Println("你好")
}
           

将上述代碼重新編譯後執行,得到輸出結果如下。

你好

這一次的執行結果隻在終端列印了”你好”,并沒有列印 hello。這是為什麼呢?

其實在 Go 程式啟動時,Go 程式就會為 main 函數建立一個預設的 goroutine 。在上面的代碼中我們在 main 函數中使用 go 關鍵字建立了另外一個 goroutine 去執行 hello 函數,而此時 main goroutine 還在繼續往下執行,我們的程式中此時存在兩個并發執行的 goroutine。當 main 函數結束時整個程式也就結束了,同時 main goroutine 也結束了,所有由 main goroutine 建立的 goroutine 也會一同退出。也就是說我們的 main 函數退出太快,另外一個 goroutine 中的函數還未執行完程式就退出了,導緻未列印出“hello”。

那麼如何讓他完整執行呢。

//在mian函數的最後加上Sleep函數,休眠一秒鐘
time.Sleep(time.Second)
}
           

在上面的程式中使用time.Sleep讓 main goroutine 等待 hello goroutine執行結束是不優雅的,當然也是不準确的。

Go 語言中通過sync包為我們提供了一些常用的并發原語, sync 包中的WaitGroup。當你并不關心并發操作的結果或者有其它方式收集并發操作的結果時,WaitGroup是實作等待一組并發操作完成的好方法。

package main

import (
	"fmt"
	"sync"
)

// 聲明全局等待組變量
var wg sync.WaitGroup

func hello() {
	fmt.Println("hello")
	wg.Done() // 告知目前goroutine完成
}

func main() {
	wg.Add(1) // 登記1個goroutine
	go hello()
	fmt.Println("你好")
	wg.Wait() // 阻塞等待登記的goroutine完成,等待ag減為0
}
           

goroutine排程

goroutine 的排程是Go語言運作時(runtime)層面的實作,是完全由 Go 語言本身實作的一套排程系統——go scheduler。它的作用是按照一定的規則将所有的 goroutine 排程到作業系統線程上執行。

在經曆數個版本的疊代之後,目前 Go 語言的排程器采用的是 GPM 排程模型

go語言的并發原理(goroutine)

其中:

G:表示 goroutine,每執行一次go f()就建立一個 G,包含要執行的函數和上下文資訊。

全局隊列(Global Queue):存放等待運作的 G。

P:表示 goroutine 執行所需的資源,最多有 GOMAXPROCS 個。

P 的本地隊列:同全局隊列類似,存放的也是等待運作的G,存的數量有限,不超過256個。建立 G 時,G 優先加入到 P 的本地隊列,如果本地隊列滿了會批量移動部分 G 到全局隊列。

M:線程想運作任務就得擷取 P,從 P 的本地隊列擷取 G,當 P 的本地隊列為空時,M 也會嘗試從全局隊列或其他 P 的本地隊列擷取 G。M 運作 G,G 執行之後,M 會從 P 擷取下一個 G,不斷重複下去。

Goroutine 排程器和作業系統排程器是通過 M 結合起來的,每個 M 都代表了1個核心線程,作業系統排程器負責把核心線程配置設定到 CPU 的核上執行。

單從線程排程講,Go語言相比起其他語言的優勢在于OS線程是由OS核心來排程的, goroutine 則是由Go運作時(runtime)自己的排程器排程的,完全是在使用者态下完成的, 不涉及核心态與使用者态之間的頻繁切換,包括記憶體的配置設定與釋放,都是在使用者态維護着一塊大的記憶體池, 不直接調用系統的malloc函數(除非記憶體池需要改變),成本比排程OS線程低很多。 另一方面充分利用了多核的硬體資源,近似的把若幹goroutine均分在實體線程上, 再加上本身 goroutine 的超輕量級,以上種種特性保證了 goroutine 排程方面的性能。

M:N:把m個goroutine配置設定給n個作業系統線程去執行

goroutine的初始棧大小是2K,能夠輕松建立上萬個goroutine

channel

雖然可以使用共享記憶體進行資料交換,但是共享記憶體在不同的 goroutine 中容易發生競态問題。為了保證資料交換的正确性,很多并發模型中必須使用互斥量對記憶體進行加鎖,這種做法勢必造成性能問題。

Go語言采用的并發模型是CSP(Communicating Sequential Processes),提倡通過通信共享記憶體而不是通過共享記憶體而實作通信。

如果說 goroutine 是Go程式并發的執行體,channel就是它們之間的連接配接。channel是可以讓一個 goroutine 發送特定值到另一個 goroutine 的通信機制。

Go 語言中的通道(channel)是一種特殊的類型。通道像一個傳送帶或者隊列,總是遵循先入先出(First In First Out)的規則,保證收發資料的順序。每一個通道都是一個具體類型的導管,也就是聲明channel的時候需要為其指定元素類型。

channel類型

var 變量名稱 chan 元素類型
           

chan:是關鍵字

元素類型:是指通道中傳遞元素的類型

var ch1 chan int   // 聲明一個傳遞整型的通道
var ch2 chan bool  // 聲明一個傳遞布爾型的通道
var ch3 chan []int // 聲明一個傳遞int切片的通道
           

初始化channel

聲明的通道類型變量需要使用内置的make函數初始化之後才能使用。具體格式如下:

其中:

channel的緩沖大小是可選的。

ch4 := make(chan int)
ch5 := make(chan bool, 1)  // 聲明一個緩沖區大小為1的通道
           

channel操作

通道共有發送(send)、接收(receive)和關閉(close)三種操作。而發送和接收操作都使用**<-**符号。

發送

将一個值發送到通道中。

接收

從一個通道中接收值。

x := <- ch // 從ch中接收值并指派給變量x
<-ch       // 從ch中接收值,忽略結果
           

關閉

我們通過調用内置的close函數來關閉通道。

close(ch)

注意:一個通道值是可以被垃圾回收掉的。通道通常由發送方執行關閉操作,并且隻有在接收方明确等待通道關閉的信号時才需要執行關閉操作。它和關閉檔案不一樣,通常在結束操作之後關閉檔案是必須要做的,但關閉通道不是必須的。

關閉後的通道有以下特點:

對一個關閉的通道再發送值就會導緻 panic。

對一個關閉的通道進行接收會一直擷取值直到通道為空。

對一個關閉的并且沒有值的通道執行接收操作會得到對應類型的零值。

關閉一個已經關閉的通道會導緻 panic。

無緩沖的通道

func main() {
	ch := make(chan int)
	ch <- 10
	fmt.Println("發送成功")
}
           

上面這段代碼能夠通過編譯,但是執行的時候會出現以下錯誤:

fatal error: all goroutines are asleep - deadlock!

goroutine 1 [chan send]:

main.main()

…/main.go:8 +0x54

deadlock表示我們程式中的 goroutine 都被挂起導緻程式死鎖了。為什麼會出現deadlock錯誤呢?

因為我們使用ch := make(chan int)建立的是無緩沖的通道,**無緩沖的通道隻有在有接收方能夠接收值的時候才能發送成功,否則會一直處于等待發送的階段。**同理,如果對一個無緩沖通道執行接收操作時,沒有任何向通道中發送值的操作那麼也會導緻接收操作阻塞。就像田徑比賽中的4x100接力賽,想要完成交棒必須有一個能夠接棒的運動員,否則隻能等待。簡單來說就是無緩沖的通道必須有至少一個接收方才能發送成功。

上面的代碼會阻塞在ch <- 10這一行代碼形成死鎖,那如何解決這個問題呢?

其中一種可行的方法是建立一個 goroutine 去接收值,例如:

func recv(c chan int) {
	ret := <-c
	fmt.Println("接收成功", ret)
}
           
func main() {
	ch := make(chan int)
	go recv(ch) // 建立一個 goroutine 從通道接收值
	ch <- 10
	fmt.Println("發送成功")
}
           

首先無緩沖通道ch上的發送操作會阻塞,直到另一個 goroutine 在該通道上執行接收操作,這時數字10才能發送成功,兩個 goroutine 将繼續執行。相反,如果接收操作先執行,接收方所在的 goroutine 将阻塞,直到 main goroutine 中向該通道發送數字10。

使用無緩沖通道進行通信将導緻發送和接收的 goroutine 同步化。是以,無緩沖通道也被稱為同步通道。

有緩沖的通道

還有另外一種解決上面死鎖問題的方法,那就是使用有緩沖區的通道。我們可以在使用 make 函數初始化通道時,可以為其指定通道的容量,例如:

func main() {
	ch := make(chan int, 1) // 建立一個容量為1的有緩沖區通道
	ch <- 10
	fmt.Println("發送成功")
}
           

隻要通道的容量大于零,那麼該通道就屬于有緩沖的通道,通道的容量表示通道中最大能存放的元素數量。當通道内已有元素數達到最大容量後,再向通道執行發送操作就會阻塞,除非有從通道執行接收操作。就像你小區的快遞櫃隻有那麼個多格子,格子滿了就裝不下了,就阻塞了,等到别人取走一個快遞員就能往裡面放一個。

我們可以使用内置的len函數擷取通道内元素的數量,使用cap函數擷取通道的容量,雖然我們很少會這麼做。

多傳回值模式

當向通道中發送完資料時,我們可以通過close函數來關閉通道。當一個通道被關閉後,再往該通道發送值會引發panic,從該通道取值的操作會先取完通道中的值。通道内的值被接收完後再對通道執行接收操作得到的值會一直都是對應元素類型的零值。那我們如何判斷一個通道是否被關閉了呢?

對一個通道執行接收操作時支援使用如下多傳回值模式。

value, ok := <- ch

其中:

value:從通道中取出的值,如果通道被關閉則傳回對應類型的零值。

ok:通道ch關閉時傳回 false,否則傳回 true。

下面代碼片段中的f2函數會循環從通道ch中接收所有值,直到通道被關閉後退出。

func f2(ch chan int) {
	for {
		v, ok := <-ch
		if !ok {
			fmt.Println("通道已關閉")
			break
		}
		fmt.Printf("v:%#v ok:%#v\n", v, ok)
	}
}

func main() {
	ch := make(chan int, 2)
	ch <- 1
	ch <- 2
	close(ch)
	f2(ch)
}
           
for range接收值

通常我們會選擇使用for range循環從通道中接收值,當通道被關閉後,會在通道内的所有值被接收完畢後會自動退出循環。

func f3(ch chan int) {
	for v := range ch {
		fmt.Println(v)
	}
}
           

注意:目前Go語言中并沒有提供一個不對通道進行讀取操作就能判斷通道是否被關閉的方法。不能簡單的通過len(ch)操作來判斷通道是否被關閉。

單向通道

在某些場景下我們可能會将通道作為參數在多個任務函數間進行傳遞,通常我們會選擇在不同的任務函數中對通道的使用進行限制,比如限制通道在某個函數中隻能執行發送或隻能執行接收操作。想象一下,我們現在有Producer和Consumer兩個函數,其中Producer函數會傳回一個通道,并且會持續将符合條件的資料發送至該通道,并在發送完成後将該通道關閉。而Consumer函數的任務是從通道中接收值進行計算,這兩個函數之間通過Processer函數傳回的通道進行通信。完整的示例代碼如下。

package main

import (
	"fmt"
)

// Producer 傳回一個通道
// 并持續将符合條件的資料發送至傳回的通道中
// 資料發送完成後會将傳回的通道關閉
func Producer() chan int {
	ch := make(chan int, 2)
	// 建立一個新的goroutine執行發送資料的任務
	go func() {
		for i := 0; i < 10; i++ {
			if i%2 == 1 {
				ch <- i
			}
		}
		close(ch) // 任務完成後關閉通道
	}()

	return ch
}

// Consumer 從通道中接收資料進行計算
func Consumer(ch chan int) int {
	sum := 0
	for v := range ch {
		sum += v
	}
	return sum
}

func main() {
	ch := Producer()

	res := Consumer(ch)
	fmt.Println(res) // 25

}
           

從上面的示例代碼中可以看出正常情況下Consumer函數中隻會對通道進行接收操作,但是這不代表不可以在Consumer函數中對通道進行發送操作。作為Producer函數的提供者,我們在傳回通道的時候可能隻希望調用方拿到傳回的通道後隻能對其進行接收操作。但是我們沒有辦法阻止在Consumer函數中對通道進行發送操作。

Go語言中提供了單向通道來處理這種需要限制通道隻能進行某種操作的情況。

<- chan int // 隻接收通道,隻能接收不能發送
chan <- int // 隻發送通道,隻能發送不能接收
           

其中,箭頭<-和關鍵字chan的相對位置表明了目前通道允許的操作,這種限制将在編譯階段進行檢測。另外對一個隻接收通道執行close也是不允許的,因為預設通道的關閉操作應該由發送方來完成。

我們使用單向通道将上面的示例代碼進行如下改造。

// Producer2 傳回一個接收通道
func Producer2() <-chan int {
	ch := make(chan int, 2)
	// 建立一個新的goroutine執行發送資料的任務
	go func() {
		for i := 0; i < 10; i++ {
			if i%2 == 1 {
				ch <- i
			}
		}
		close(ch) // 任務完成後關閉通道
	}()

	return ch
}

// Consumer2 參數為接收通道
func Consumer2(ch <-chan int) int {
	sum := 0
	for v := range ch {
		sum += v
	}
	return sum
}

func main() {
	ch2 := Producer2()
  
	res2 := Consumer2(ch2)
	fmt.Println(res2) // 25
}
           

這一次,Producer函數傳回的是一個隻接收通道,這就從代碼層面限制了該函數傳回的通道隻能進行接收操作,保證了資料安全。很多讀者看到這個示例可能會覺着這樣的限制是多餘的,但是試想一下如果Producer函數可以在其他地方被其他人調用,你該如何限制他人不對該通道執行發送操作呢?并且傳回限制操作的單向通道也會讓代碼語義更清晰、更易讀。

在函數傳參及任何指派操作中全向通道(正常通道)可以轉換為單向通道,但是無法反向轉換。

var ch3 = make(chan int, 1)
ch3 <- 10
close(ch3)
Consumer2(ch3) // 函數傳參時将ch3轉為單向通道

var ch4 = make(chan int, 1)
ch4 <- 10
var ch5 <-chan int // 聲明一個隻接收通道ch5
ch5 = ch4          // 變量指派時将ch4轉為單向通道
<-ch5
           
go語言的并發原理(goroutine)

worker pool(goroutine池)

使用workerpool模式,控制goroutine的數量,防止goroutine洩露和暴漲

go語言的并發原理(goroutine)

繼續閱讀