天天看點

15.Go語言基礎之并發

并發程式設計在目前軟體領域是一個非常重要的概念,随着CPU等硬體的發展,我們無一例外的想讓我們的程式運作的快一點、再快一點。Go語言在語言層面天生支援并發,充分利用現代CPU的多核優勢,這也是Go語言能夠大範圍流行的一個很重要的原因。

基本概念

首先我們先來了解幾個與并發程式設計相關的基本概念。

串行、并發與并行

串行:我們都是先讀國小,國小畢業後再讀國中,讀完國中再讀高中。

并發:同一時間段内執行多個任務(你在用微信和兩個女朋友聊天)。

并行:同一時刻執行多個任務(你和你朋友都在用微信和女朋友聊天)。

程序、線程和協程

程序(process):程式在作業系統中的一次執行過程,系統進行資源配置設定和排程的一個獨立機關。

線程(thread):作業系統基于程序開啟的輕量級程序,是作業系統排程執行的最小機關。

協程(coroutine):非作業系統提供而是由使用者自行建立和控制的使用者态‘線程’,比線程更輕量級。

并發模型

業界将如何實作并發程式設計總結歸納為各式各樣的并發模型,常見的并發模型有以下幾種:

  • 線程&鎖模型
  • Actor模型
  • CSP模型
  • Fork&Join模型

Go語言中的并發程式主要是通過基于CSP(communicating sequential processes)的goroutine和channel來實作,當然也支援使用傳統的多線程共享記憶體的并發方式。

goroutine

Goroutine 是 Go 語言支援并發的核心,在一個Go程式中同時建立成百上千個goroutine是非常普遍的,一個goroutine會以一個很小的棧開始其生命周期,一般隻需要2KB。差別于作業系統線程由系統核心進行排程, goroutine 是由Go運作時(runtime)負責排程。例如Go運作時會智能地将 m個goroutine 合理地配置設定給n個作業系統線程,實作類似m:n的排程機制,不再需要Go開發者自行在代碼層面維護一個線程池。

Goroutine 是 Go 程式中最基本的并發執行單元。每一個 Go 程式都至少包含一個 goroutine——main goroutine,當 Go 程式啟動時它會自動建立。

在Go語言程式設計中你不需要去自己寫程序、線程、協程,你的技能包裡隻有一個技能——goroutine,當你需要讓某個任務并發執行的時候,你隻需要把這個任務包裝成一個函數,開啟一個 goroutine 去執行這個函數就可以了,就是這麼簡單粗暴。

go關鍵字

Go語言中使用 goroutine 非常簡單,隻需要在函數或方法調用前加上

go

關鍵字就可以建立一個 goroutine ,進而讓該函數或方法在新建立的 goroutine 中執行。

匿名函數也支援使用

go

關鍵字建立 goroutine 去執行。

go func(){
  // ...
}()
           

一個 goroutine 必定對應一個函數/方法,可以建立多個 goroutine 去執行相同的函數/方法。

啟動單個goroutine

啟動 goroutine 的方式非常簡單,隻需要在調用函數(普通函數和匿名函數)前加上一個

go

關鍵字。

我們先來看一個在 main 函數中執行普通函數調用的示例。

package main

import (
	"fmt"
)

func hello() {
	fmt.Println("hello")
}

func main() {
	hello()
	fmt.Println("你好")
}
           

将上面的代碼編譯後執行,得到的結果如下:

hello
你好
           

代碼中 hello 函數和其後面的列印語句是串行的。

15.Go語言基礎之并發

接下來我們在調用 hello 函數前面加上關鍵字

go

,也就是啟動一個 goroutine 去執行 hello 這個函數。

func main() {
	go hello() // 啟動另外一個goroutine去執行hello函數
	fmt.Println("main goroutine done!")
}
           

将上述代碼重新編譯後執行,得到輸出結果如下。

你好
           

這一次的執行結果隻在終端列印了”你好”,并沒有列印

hello

。這是為什麼呢?

其實在 Go 程式啟動時,Go 程式就會為 main 函數建立一個預設的 goroutine 。在上面的代碼中我們在 main 函數中使用 go 關鍵字建立了另外一個 goroutine 去執行 hello 函數,而此時 main goroutine 還在繼續往下執行,我們的程式中此時存在兩個并發執行的 goroutine。當 main 函數結束時整個程式也就結束了,同時 main goroutine 也結束了,所有由 main goroutine 建立的 goroutine 也會一同退出。也就是說我們的 main 函數退出太快,另外一個 goroutine 中的函數還未執行完程式就退出了,導緻未列印出“hello”。

main goroutine 就像是《權利的遊戲》中的夜王,其他的 goroutine 都是夜王轉化出的異鬼,夜王一死它轉化的那些異鬼也就全部GG了。

是以我們要想辦法讓 main 函數‘“等一等”将在另一個 goroutine 中運作的 hello 函數。其中最簡單粗暴的方式就是在 main 函數中“time.Sleep”一秒鐘了(這裡的1秒鐘隻是我們為了保證新的 goroutine 能夠被正常建立和執行而設定的一個值)。

按如下方式修改我們的示例代碼。

package main

import (
	"fmt"
	"time"
)

func hello() {
	fmt.Println("hello")
}

func main() {
	go hello()
	fmt.Println("你好")
	time.Sleep(time.Second)
}
           

将我們的程式重新編譯後再次執行,程式會在終端輸出如下結果,并且會短暫停頓一會兒。

你好
hello
           

為什麼會先列印

你好

呢?

這是因為在程式中建立 goroutine 執行函數需要一定的開銷,而與此同時 main 函數所在的 goroutine 是繼續執行的。

15.Go語言基礎之并發

在上面的程式中使用

time.Sleep

讓 main goroutine 等待 hello goroutine執行結束是不優雅的,當然也是不準确的。

Go 語言中通過

sync

包為我們提供了一些常用的并發原語,我們會在後面的小節單獨介紹

sync

包中的内容。在這一小節,我們會先介紹一下 sync 包中的

WaitGroup

。當你并不關心并發操作的結果或者有其它方式收集并發操作的結果時,

WaitGroup

是實作等待一組并發操作完成的好方法。

下面的示例代碼中我們在 main goroutine 中使用

sync.WaitGroup

來等待 hello goroutine 完成後再退出。

package main

import (
	"fmt"
	"sync"
)

// 聲明全局等待組變量
var wg sync.WaitGroup

func hello() {
	fmt.Println("hello")
	wg.Done() // 告知目前goroutine完成
}

func main() {
	wg.Add(1) // 登記1個goroutine
	go hello()
	fmt.Println("你好")
	wg.Wait() // 阻塞等待登記的goroutine完成
}
           

将代碼編譯後再執行,得到的輸出結果和之前一緻,但是這一次程式不再會有多餘的停頓,hello goroutine 執行完畢後程式直接退出。

啟動多個goroutine

在 Go 語言中實作并發就是這樣簡單,我們還可以啟動多個 goroutine 。讓我們再來看一個新的代碼示例。這裡同樣使用了

sync.WaitGroup

來實作 goroutine 的同步。

package main

import (
	"fmt"
	"sync"
)

var wg sync.WaitGroup

func hello(i int) {
	defer wg.Done() // goroutine結束就登記-1
	fmt.Println("hello", i)
}
func main() {
	for i := 0; i < 10; i++ {
		wg.Add(1) // 啟動一個goroutine就登記+1
		go hello(i)
	}
	wg.Wait() // 等待所有登記的goroutine都結束
}
           

多次執行上面的代碼會發現每次終端上列印數字的順序都不一緻。這是因為10個 goroutine 是并發執行的,而 goroutine 的排程是随機的。

動态棧

作業系統的線程一般都有固定的棧記憶體(通常為2MB),而 Go 語言中的 goroutine 非常輕量級,一個 goroutine 的初始棧空間很小(一般為2KB),是以在 Go 語言中一次建立數萬個 goroutine 也是可能的。并且 goroutine 的棧不是固定的,可以根據需要動态地增大或縮小, Go 的 runtime 會自動為 goroutine 配置設定合适的棧空間。

goroutine排程

作業系統核心在排程時會挂起目前正在執行的線程并将寄存器中的内容儲存到記憶體中,然後選出接下來要執行的線程并從記憶體中恢複該線程的寄存器資訊,然後恢複執行該線程的現場并開始執行線程。從一個線程切換到另一個線程需要完整的上下文切換。因為可能需要多次記憶體通路,索引這個切換上下文的操作開銷較大,會增加運作的cpu周期。

差別于作業系統核心排程作業系統線程,goroutine 的排程是Go語言運作時(runtime)層面的實作,是完全由 Go 語言本身實作的一套排程系統——go scheduler。它的作用是按照一定的規則将所有的 goroutine 排程到作業系統線程上執行。

在經曆數個版本的疊代之後,目前 Go 語言的排程器采用的是

GPM

排程模型。

15.Go語言基礎之并發

其中:

  • G:表示 goroutine,每執行一次

    go f()

    就建立一個 G,包含要執行的函數和上下文資訊。
  • 全局隊列(Global Queue):存放等待運作的 G。
  • P:表示 goroutine 執行所需的資源,最多有 GOMAXPROCS 個。
  • P 的本地隊列:同全局隊列類似,存放的也是等待運作的G,存的數量有限,不超過256個。建立 G 時,G 優先加入到 P 的本地隊列,如果本地隊列滿了會批量移動部分 G 到全局隊列。
  • M:線程想運作任務就得擷取 P,從 P 的本地隊列擷取 G,當 P 的本地隊列為空時,M 也會嘗試從全局隊列或其他 P 的本地隊列擷取 G。M 運作 G,G 執行之後,M 會從 P 擷取下一個 G,不斷重複下去。
  • Goroutine 排程器和作業系統排程器是通過 M 結合起來的,每個 M 都代表了1個核心線程,作業系統排程器負責把核心線程配置設定到 CPU 的核上執行。

單從線程排程講,Go語言相比起其他語言的優勢在于OS線程是由OS核心來排程的, goroutine 則是由Go運作時(runtime)自己的排程器排程的,完全是在使用者态下完成的, 不涉及核心态與使用者态之間的頻繁切換,包括記憶體的配置設定與釋放,都是在使用者态維護着一塊大的記憶體池, 不直接調用系統的malloc函數(除非記憶體池需要改變),成本比排程OS線程低很多。 另一方面充分利用了多核的硬體資源,近似的把若幹goroutine均分在實體線程上, 再加上本身 goroutine 的超輕量級,以上種種特性保證了 goroutine 排程方面的性能。

GOMAXPROCS

Go運作時的排程器使用

GOMAXPROCS

參數來确定需要使用多少個 OS 線程來同時執行 Go 代碼。預設值是機器上的 CPU 核心數。例如在一個 8 核心的機器上,GOMAXPROCS 預設為 8。Go語言中可以通過

runtime.GOMAXPROCS

函數設定目前程式并發時占用的 CPU邏輯核心數。(Go1.5版本之前,預設使用的是單核心執行。Go1.5 版本之後,預設使用全部的CPU 邏輯核心數。)

練習題

  1. 請寫出下面程式的執行結果。
for i := 0; i < 5; i++ {
   	go func() {
   		fmt.Println(i)
   	}()
   }
           

channel

單純地将函數并發執行是沒有意義的。函數與函數間需要交換資料才能展現并發執行函數的意義。

雖然可以使用共享記憶體進行資料交換,但是共享記憶體在不同的 goroutine 中容易發生競态問題。為了保證資料交換的正确性,很多并發模型中必須使用互斥量對記憶體進行加鎖,這種做法勢必造成性能問題。

Go語言采用的并發模型是

CSP(Communicating Sequential Processes)

,提倡通過通信共享記憶體而不是通過共享記憶體而實作通信。

如果說 goroutine 是Go程式并發的執行體,

channel

就是它們之間的連接配接。

channel

是可以讓一個 goroutine 發送特定值到另一個 goroutine 的通信機制。

Go 語言中的通道(channel)是一種特殊的類型。通道像一個傳送帶或者隊列,總是遵循先入先出(First In First Out)的規則,保證收發資料的順序。每一個通道都是一個具體類型的導管,也就是聲明channel的時候需要為其指定元素類型。

channel類型

channel

是 Go 語言中一種特有的類型。聲明通道類型變量的格式如下:

var 變量名稱 chan 元素類型
           

其中:

  • chan:是關鍵字
  • 元素類型:是指通道中傳遞元素的類型

舉幾個例子:

var ch1 chan int   // 聲明一個傳遞整型的通道
var ch2 chan bool  // 聲明一個傳遞布爾型的通道
var ch3 chan []int // 聲明一個傳遞int切片的通道
           

channel零值

未初始化的通道類型變量其預設零值是

nil

var ch chan int
fmt.Println(ch) // <nil>
           

初始化channel

聲明的通道類型變量需要使用内置的

make

函數初始化之後才能使用。具體格式如下:

其中:

  • channel的緩沖大小是可選的。

舉幾個例子:

ch4 := make(chan int)
ch5 := make(chan bool, 1)  // 聲明一個緩沖區大小為1的通道
           

channel操作

通道共有發送(send)、接收(receive)和關閉(close)三種操作。而發送和接收操作都使用

<-

符号。

現在我們先使用以下語句定義一個通道:

發送

将一個值發送到通道中。

接收

從一個通道中接收值。

x := <- ch // 從ch中接收值并指派給變量x
<-ch       // 從ch中接收值,忽略結果
           

關閉

我們通過調用内置的

close

函數來關閉通道。

**注意:**一個通道值是可以被垃圾回收掉的。通道通常由發送方執行關閉操作,并且隻有在接收方明确等待通道關閉的信号時才需要執行關閉操作。它和關閉檔案不一樣,通常在結束操作之後關閉檔案是必須要做的,但關閉通道不是必須的。

關閉後的通道有以下特點:

  1. 對一個關閉的通道再發送值就會導緻 panic。
  2. 對一個關閉的通道進行接收會一直擷取值直到通道為空。
  3. 對一個關閉的并且沒有值的通道執行接收操作會得到對應類型的零值。
  4. 關閉一個已經關閉的通道會導緻 panic。

無緩沖的通道

無緩沖的通道又稱為阻塞的通道。我們來看一下如下代碼片段。

func main() {
	ch := make(chan int)
	ch <- 10
	fmt.Println("發送成功")
}
           

上面這段代碼能夠通過編譯,但是執行的時候會出現以下錯誤:

fatal error: all goroutines are asleep - deadlock!

goroutine 1 [chan send]:
main.main()
        .../main.go:8 +0x54
           

deadlock

表示我們程式中的 goroutine 都被挂起導緻程式死鎖了。為什麼會出現

deadlock

錯誤呢?

因為我們使用

ch := make(chan int)

建立的是無緩沖的通道,無緩沖的通道隻有在有接收方能夠接收值的時候才能發送成功,否則會一直處于等待發送的階段。同理,如果對一個無緩沖通道執行接收操作時,沒有任何向通道中發送值的操作那麼也會導緻接收操作阻塞。就像田徑比賽中的4x100接力賽,想要完成交棒必須有一個能夠接棒的運動員,否則隻能等待。簡單來說就是無緩沖的通道必須有至少一個接收方才能發送成功。

上面的代碼會阻塞在

ch <- 10

這一行代碼形成死鎖,那如何解決這個問題呢?

其中一種可行的方法是建立一個 goroutine 去接收值,例如:

func recv(c chan int) {
	ret := <-c
	fmt.Println("接收成功", ret)
}

func main() {
	ch := make(chan int)
	go recv(ch) // 建立一個 goroutine 從通道接收值
	ch <- 10
	fmt.Println("發送成功")
}
           

首先無緩沖通道

ch

上的發送操作會阻塞,直到另一個 goroutine 在該通道上執行接收操作,這時數字10才能發送成功,兩個 goroutine 将繼續執行。相反,如果接收操作先執行,接收方所在的 goroutine 将阻塞,直到 main goroutine 中向該通道發送數字10。

使用無緩沖通道進行通信将導緻發送和接收的 goroutine 同步化。是以,無緩沖通道也被稱為

同步通道

有緩沖的通道

還有另外一種解決上面死鎖問題的方法,那就是使用有緩沖區的通道。我們可以在使用 make 函數初始化通道時,可以為其指定通道的容量,例如:

func main() {
	ch := make(chan int, 1) // 建立一個容量為1的有緩沖區通道
	ch <- 10
	fmt.Println("發送成功")
}
           

隻要通道的容量大于零,那麼該通道就屬于有緩沖的通道,通道的容量表示通道中最大能存放的元素數量。當通道内已有元素數達到最大容量後,再向通道執行發送操作就會阻塞,除非有從通道執行接收操作。就像你小區的快遞櫃隻有那麼個多格子,格子滿了就裝不下了,就阻塞了,等到别人取走一個快遞員就能往裡面放一個。

我們可以使用内置的

len

函數擷取通道内元素的數量,使用

cap

函數擷取通道的容量,雖然我們很少會這麼做。

多傳回值模式

當向通道中發送完資料時,我們可以通過

close

函數來關閉通道。當一個通道被關閉後,再往該通道發送值會引發

panic

,從該通道取值的操作會先取完通道中的值。通道内的值被接收完後再對通道執行接收操作得到的值會一直都是對應元素類型的零值。那我們如何判斷一個通道是否被關閉了呢?

對一個通道執行接收操作時支援使用如下多傳回值模式。

value, ok := <- ch
           

其中:

  • value:從通道中取出的值,如果通道被關閉則傳回對應類型的零值。
  • ok:通道ch關閉時傳回 false,否則傳回 true。

下面代碼片段中的

f2

函數會循環從通道

ch

中接收所有值,直到通道被關閉後退出。

func f2(ch chan int) {
	for {
		v, ok := <-ch
		if !ok {
			fmt.Println("通道已關閉")
			break
		}
		fmt.Printf("v:%#v ok:%#v\n", v, ok)
	}
}

func main() {
	ch := make(chan int, 2)
	ch <- 1
	ch <- 2
	close(ch)
	f2(ch)
}
           

for range接收值

通常我們會選擇使用

for range

循環從通道中接收值,當通道被關閉後,會在通道内的所有值被接收完畢後會自動退出循環。上面那個示例我們使用

for range

改寫後會很簡潔。

func f3(ch chan int) {
	for v := range ch {
		fmt.Println(v)
	}
}
           

**注意:**目前Go語言中并沒有提供一個不對通道進行讀取操作就能判斷通道是否被關閉的方法。不能簡單的通過

len(ch)

操作來判斷通道是否被關閉。

單向通道

在某些場景下我們可能會将通道作為參數在多個任務函數間進行傳遞,通常我們會選擇在不同的任務函數中對通道的使用進行限制,比如限制通道在某個函數中隻能執行發送或隻能執行接收操作。想象一下,我們現在有

Producer

Consumer

兩個函數,其中

Producer

函數會傳回一個通道,并且會持續将符合條件的資料發送至該通道,并在發送完成後将該通道關閉。而

Consumer

函數的任務是從通道中接收值進行計算,這兩個函數之間通過

Processer

函數傳回的通道進行通信。完整的示例代碼如下。

package main

import (
	"fmt"
)

// Producer 傳回一個通道
// 并持續将符合條件的資料發送至傳回的通道中
// 資料發送完成後會将傳回的通道關閉
func Producer() chan int {
	ch := make(chan int, 2)
	// 建立一個新的goroutine執行發送資料的任務
	go func() {
		for i := 0; i < 10; i++ {
			if i%2 == 1 {
				ch <- i
			}
		}
		close(ch) // 任務完成後關閉通道
	}()

	return ch
}

// Consumer 從通道中接收資料進行計算
func Consumer(ch chan int) int {
	sum := 0
	for v := range ch {
		sum += v
	}
	return sum
}

func main() {
	ch := Producer()

	res := Consumer(ch)
	fmt.Println(res) // 25

}
           

從上面的示例代碼中可以看出正常情況下

Consumer

函數中隻會對通道進行接收操作,但是這不代表不可以在

Consumer

函數中對通道進行發送操作。作為

Producer

函數的提供者,我們在傳回通道的時候可能隻希望調用方拿到傳回的通道後隻能對其進行接收操作。但是我們沒有辦法阻止在

Consumer

函數中對通道進行發送操作。

Go語言中提供了單向通道來處理這種需要限制通道隻能進行某種操作的情況。

<- chan int // 隻接收通道,隻能接收不能發送
chan <- int // 隻發送通道,隻能發送不能接收
           

其中,箭頭

<-

和關鍵字

chan

的相對位置表明了目前通道允許的操作,這種限制将在編譯階段進行檢測。另外對一個隻接收通道執行close也是不允許的,因為預設通道的關閉操作應該由發送方來完成。

我們使用單向通道将上面的示例代碼進行如下改造。

// Producer2 傳回一個接收通道
func Producer2() <-chan int {
	ch := make(chan int, 2)
	// 建立一個新的goroutine執行發送資料的任務
	go func() {
		for i := 0; i < 10; i++ {
			if i%2 == 1 {
				ch <- i
			}
		}
		close(ch) // 任務完成後關閉通道
	}()

	return ch
}

// Consumer2 參數為接收通道
func Consumer2(ch <-chan int) int {
	sum := 0
	for v := range ch {
		sum += v
	}
	return sum
}

func main() {
	ch2 := Producer2()
  
	res2 := Consumer2(ch2)
	fmt.Println(res2) // 25
}
           

這一次,

Producer

函數傳回的是一個隻接收通道,這就從代碼層面限制了該函數傳回的通道隻能進行接收操作,保證了資料安全。很多讀者看到這個示例可能會覺着這樣的限制是多餘的,但是試想一下如果

Producer

函數可以在其他地方被其他人調用,你該如何限制他人不對該通道執行發送操作呢?并且傳回限制操作的單向通道也會讓代碼語義更清晰、更易讀。

在函數傳參及任何指派操作中全向通道(正常通道)可以轉換為單向通道,但是無法反向轉換。

var ch3 = make(chan int, 1)
ch3 <- 10
close(ch3)
Consumer2(ch3) // 函數傳參時将ch3轉為單向通道

var ch4 = make(chan int, 1)
ch4 <- 10
var ch5 <-chan int // 聲明一個隻接收通道ch5
ch5 = ch4          // 變量指派時将ch4轉為單向通道
<-ch5
           

總結

下面的表格中總結了對不同狀态下的通道執行相應操作的結果。

15.Go語言基礎之并發

**注意:**對已經關閉的通道再執行 close 也會引發 panic。

select多路複用

在某些場景下我們可能需要同時從多個通道接收資料。通道在接收資料時,如果沒有資料可以被接收那麼目前 goroutine 将會發生阻塞。你也許會寫出如下代碼嘗試使用周遊的方式來實作從多個通道中接收值。

for{
    // 嘗試從ch1接收值
    data, ok := <-ch1
    // 嘗試從ch2接收值
    data, ok := <-ch2
    …
}
           

這種方式雖然可以實作從多個通道接收值的需求,但是程式的運作性能會差很多。Go 語言内置了

select

關鍵字,使用它可以同時響應多個通道的操作。

Select 的使用方式類似于之前學到的 switch 語句,它也有一系列 case 分支和一個預設的分支。每個 case 分支會對應一個通道的通信(接收或發送)過程。select 會一直等待,直到其中的某個 case 的通信操作完成時,就會執行該 case 分支對應的語句。具體格式如下:

select {
case <-ch1:
	//...
case data := <-ch2:
	//...
case ch3 <- 10:
	//...
default:
	//預設操作
}
           

Select 語句具有以下特點。

  • 可處理一個或多個 channel 的發送/接收操作。
  • 如果多個 case 同時滿足,select 會随機選擇一個執行。
  • 對于沒有 case 的 select 會一直阻塞,可用于阻塞 main 函數,防止退出。

下面的示例代碼能夠在終端列印出10以内的奇數,我們借助這個代碼片段來看一下 select 的具體使用。

package main

import "fmt"

func main() {
	ch := make(chan int, 1)
	for i := 1; i <= 10; i++ {
		select {
		case x := <-ch:
			fmt.Println(x)
		case ch <- i:
		}
	}
}
           

上面的代碼輸出内容如下。

1
3
5
7
9
           

示例中的代碼首先是建立了一個緩沖區大小為1的通道 ch,進入 for 循環後:

  • 第一次循環時 i = 1,select 語句中包含兩個 case 分支,此時由于通道中沒有值可以接收,是以

    x := <-ch

    這個 case 分支不滿足,而

    ch <- i

    這個分支可以執行,會把1發送到通道中,結束本次 for 循環;
  • 第二次 for 循環時,i = 2,由于通道緩沖區已滿,是以

    ch <- i

    這個分支不滿足,而

    x := <-ch

    這個分支可以執行,從通道接收值1并指派給變量 x ,是以會在終端列印出 1;
  • 後續的 for 循環以此類推會依次列印出3、5、7、9。

通道誤用示例

接下來,我們将展示兩個因誤用通道導緻程式出現 bug 的代碼片段,希望能夠加深讀者對通道操作的印象。

示例1

各位讀者可以檢視以下示例代碼,嘗試找出其中存在的問題。

// demo1 通道誤用導緻的bug
func demo1() {
	wg := sync.WaitGroup{}

	ch := make(chan int, 10)
	for i := 0; i < 10; i++ {
		ch <- i
	}
	close(ch)

	wg.Add(3)
	for j := 0; j < 3; j++ {
		go func() {
			for {
				task := <-ch
				// 這裡假設對接收的資料執行某些操作
				fmt.Println(task)
			}
			wg.Done()
		}()
	}
	wg.Wait()
}
           

将上述代碼編譯執行後,匿名函數所在的 goroutine 并不會按照預期在通道被關閉後退出。因為

task := <- ch

的接收操作在通道被關閉後會一直接收到零值,而不會退出。此處的接收操作應該使用

task, ok := <- ch

,通過判斷布爾值

ok

為假時退出;或者使用select 來處理通道。

示例2

各位讀者閱讀下方代碼片段,嘗試找出其中存在的問題。

// demo2 通道誤用導緻的bug
func demo2() {
	ch := make(chan string)
	go func() {
		// 這裡假設執行一些耗時的操作
		time.Sleep(3 * time.Second)
		ch <- "job result"
	}()

	select {
	case result := <-ch:
		fmt.Println(result)
	case <-time.After(time.Second): // 較小的逾時時間
		return
	}
}
           

上述代碼片段可能導緻 goroutine 洩露(goroutine 并未按預期退出并銷毀)。由于 select 命中了逾時邏輯,導緻通道沒有消費者(無接收操作),而其定義的通道為無緩沖通道,是以 goroutine 中的

ch <- "job result"

操作會一直阻塞,最終導緻 goroutine 洩露。

并發安全和鎖

有時候我們的代碼中可能會存在多個 goroutine 同時操作一個資源(臨界區)的情況,這種情況下就會發生

競态問題

(資料競态)。這就好比現實生活中十字路口被各個方向的汽車競争,還有火車上的衛生間被車廂裡的人競争。

我們用下面的代碼示範一個資料競争的示例。

package main

import (
	"fmt"
	"sync"
)

var (
	x int64

	wg sync.WaitGroup // 等待組
)

// add 對全局變量x執行5000次加1操作
func add() {
	for i := 0; i < 5000; i++ {
		x = x + 1
	}
	wg.Done()
}

func main() {
	wg.Add(2)

	go add()
	go add()

	wg.Wait()
	fmt.Println(x)
}
           

我們将上面的代碼編譯後執行,不出意外每次執行都會輸出諸如9537、5865、6527等不同的結果。這是為什麼呢?

在上面的示例代碼片中,我們開啟了兩個 goroutine 分别執行 add 函數,這兩個 goroutine 在通路和修改全局的

x

變量時就會存在資料競争,某個 goroutine 中對全局變量

x

的修改可能會覆寫掉另一個 goroutine 中的操作,是以導緻最後的結果與預期不符。

互斥鎖

互斥鎖是一種常用的控制共享資源通路的方法,它能夠保證同一時間隻有一個 goroutine 可以通路共享資源。Go 語言中使用

sync

包中提供的

Mutex

類型來實作互斥鎖。

sync.Mutex

提供了兩個方法供我們使用。

方法名 功能
func (m *Mutex) Lock() 擷取互斥鎖
func (m *Mutex) Unlock() 釋放互斥鎖

我們在下面的示例代碼中使用互斥鎖限制每次隻有一個 goroutine 才能修改全局變量

x

,進而修複上面代碼中的問題。

package main

import (
	"fmt"
	"sync"
)

// sync.Mutex

var (
	x int64

	wg sync.WaitGroup // 等待組

	m sync.Mutex // 互斥鎖
)

// add 對全局變量x執行5000次加1操作
func add() {
	for i := 0; i < 5000; i++ {
		m.Lock() // 修改x前加鎖
		x = x + 1
		m.Unlock() // 改完解鎖
	}
	wg.Done()
}

func main() {
	wg.Add(2)

	go add()
	go add()

	wg.Wait()
	fmt.Println(x)
}
           

将上面的代碼編譯後多次執行,每一次都會得到預期中的結果——10000。

使用互斥鎖能夠保證同一時間有且隻有一個 goroutine 進入臨界區,其他的 goroutine 則在等待鎖;當互斥鎖釋放後,等待的 goroutine 才可以擷取鎖進入臨界區,多個 goroutine 同時等待一個鎖時,喚醒的政策是随機的。

讀寫互斥鎖

互斥鎖是完全互斥的,但是實際上有很多場景是讀多寫少的,當我們并發的去讀取一個資源而不涉及資源修改的時候是沒有必要加互斥鎖的,這種場景下使用讀寫鎖是更好的一種選擇。讀寫鎖在 Go 語言中使用

sync

包中的

RWMutex

類型。

sync.RWMutex

提供了以下5個方法。

方法名 功能
func (rw *RWMutex) Lock() 擷取寫鎖
func (rw *RWMutex) Unlock() 釋放寫鎖
func (rw *RWMutex) RLock() 擷取讀鎖
func (rw *RWMutex) RUnlock() 釋放讀鎖
func (rw *RWMutex) RLocker() Locker 傳回一個實作Locker接口的讀寫鎖

讀寫鎖分為兩種:讀鎖和寫鎖。當一個 goroutine 擷取到讀鎖之後,其他的 goroutine 如果是擷取讀鎖會繼續獲得鎖,如果是擷取寫鎖就會等待;而當一個 goroutine 擷取寫鎖之後,其他的 goroutine 無論是擷取讀鎖還是寫鎖都會等待。

下面我們使用代碼構造一個讀多寫少的場景,然後分别使用互斥鎖和讀寫鎖檢視它們的性能差異。

var (
	x       int64
	wg      sync.WaitGroup
	mutex   sync.Mutex
	rwMutex sync.RWMutex
)

// writeWithLock 使用互斥鎖的寫操作
func writeWithLock() {
	mutex.Lock() // 加互斥鎖
	x = x + 1
	time.Sleep(10 * time.Millisecond) // 假設讀操作耗時10毫秒
	mutex.Unlock()                    // 解互斥鎖
	wg.Done()
}

// readWithLock 使用互斥鎖的讀操作
func readWithLock() {
	mutex.Lock()                 // 加互斥鎖
	time.Sleep(time.Millisecond) // 假設讀操作耗時1毫秒
	mutex.Unlock()               // 釋放互斥鎖
	wg.Done()
}

// writeWithLock 使用讀寫互斥鎖的寫操作
func writeWithRWLock() {
	rwMutex.Lock() // 加寫鎖
	x = x + 1
	time.Sleep(10 * time.Millisecond) // 假設讀操作耗時10毫秒
	rwMutex.Unlock()                  // 釋放寫鎖
	wg.Done()
}

// readWithRWLock 使用讀寫互斥鎖的讀操作
func readWithRWLock() {
	rwMutex.RLock()              // 加讀鎖
	time.Sleep(time.Millisecond) // 假設讀操作耗時1毫秒
	rwMutex.RUnlock()            // 釋放讀鎖
	wg.Done()
}

func do(wf, rf func(), wc, rc int) {
	start := time.Now()
	// wc個并發寫操作
	for i := 0; i < wc; i++ {
		wg.Add(1)
		go wf()
	}

	//  rc個并發讀操作
	for i := 0; i < rc; i++ {
		wg.Add(1)
		go rf()
	}

	wg.Wait()
	cost := time.Since(start)
	fmt.Printf("x:%v cost:%v\n", x, cost)

}
           

我們假設每一次讀操作都會耗時1ms,而每一次寫操作會耗時10ms,我們分别測試使用互斥鎖和讀寫互斥鎖執行10次并發寫和1000次并發讀的耗時資料。

// 使用互斥鎖,10并發寫,1000并發讀
do(writeWithLock, readWithLock, 10, 1000) // x:10 cost:1.466500951s

// 使用讀寫互斥鎖,10并發寫,1000并發讀
do(writeWithRWLock, readWithRWLock, 10, 1000) // x:10 cost:117.207592ms
           

從最終的執行結果可以看出,使用讀寫互斥鎖在讀多寫少的場景下能夠極大地提高程式的性能。不過需要注意的是如果一個程式中的讀操作和寫操作數量級差别不大,那麼讀寫互斥鎖的優勢就發揮不出來。

sync.WaitGroup

在代碼中生硬的使用

time.Sleep

肯定是不合适的,Go語言中可以使用

sync.WaitGroup

來實作并發任務的同步。

sync.WaitGroup

有以下幾個方法:

方法名 功能
func (wg * WaitGroup) Add(delta int) 計數器+delta
(wg *WaitGroup) Done() 計數器-1
(wg *WaitGroup) Wait() 阻塞直到計數器變為0

sync.WaitGroup

内部維護着一個計數器,計數器的值可以增加和減少。例如當我們啟動了 N 個并發任務時,就将計數器值增加N。每個任務完成時通過調用 Done 方法将計數器減1。通過調用 Wait 來等待并發任務執行完,當計數器值為 0 時,表示所有并發任務已經完成。

我們利用

sync.WaitGroup

将上面的代碼優化一下:

var wg sync.WaitGroup

func hello() {
	defer wg.Done()
	fmt.Println("Hello Goroutine!")
}
func main() {
	wg.Add(1)
	go hello() // 啟動另外一個goroutine去執行hello函數
	fmt.Println("main goroutine done!")
	wg.Wait()
}
           

需要注意

sync.WaitGroup

是一個結構體,進行參數傳遞的時候要傳遞指針。

sync.Once

在某些場景下我們需要確定某些操作即使在高并發的場景下也隻會被執行一次,例如隻加載一次配置檔案等。

Go語言中的

sync

包中提供了一個針對隻執行一次場景的解決方案——

sync.Once

sync.Once

隻有一個

Do

方法,其簽名如下:

**注意:**如果要執行的函數

f

需要傳遞參數就需要搭配閉包來使用。

加載配置檔案示例

延遲一個開銷很大的初始化操作到真正用到它的時候再執行是一個很好的實踐。因為預先初始化一個變量(比如在init函數中完成初始化)會增加程式的啟動耗時,而且有可能實際執行過程中這個變量沒有用上,那麼這個初始化操作就不是必須要做的。我們來看一個例子:

var icons map[string]image.Image

func loadIcons() {
	icons = map[string]image.Image{
		"left":  loadIcon("left.png"),
		"up":    loadIcon("up.png"),
		"right": loadIcon("right.png"),
		"down":  loadIcon("down.png"),
	}
}

// Icon 被多個goroutine調用時不是并發安全的
func Icon(name string) image.Image {
	if icons == nil {
		loadIcons()
	}
	return icons[name]
}
           

多個 goroutine 并發調用Icon函數時不是并發安全的,現代的編譯器和CPU可能會在保證每個 goroutine 都滿足串行一緻的基礎上自由地重排通路記憶體的順序。loadIcons函數可能會被重排為以下結果:

func loadIcons() {
	icons = make(map[string]image.Image)
	icons["left"] = loadIcon("left.png")
	icons["up"] = loadIcon("up.png")
	icons["right"] = loadIcon("right.png")
	icons["down"] = loadIcon("down.png")
}
           

在這種情況下就會出現即使判斷了

icons

不是nil也不意味着變量初始化完成了。考慮到這種情況,我們能想到的辦法就是添加互斥鎖,保證初始化

icons

的時候不會被其他的 goroutine 操作,但是這樣做又會引發性能問題。

使用

sync.Once

改造的示例代碼如下:

var icons map[string]image.Image

var loadIconsOnce sync.Once

func loadIcons() {
	icons = map[string]image.Image{
		"left":  loadIcon("left.png"),
		"up":    loadIcon("up.png"),
		"right": loadIcon("right.png"),
		"down":  loadIcon("down.png"),
	}
}

// Icon 是并發安全的
func Icon(name string) image.Image {
	loadIconsOnce.Do(loadIcons)
	return icons[name]
}
           

并發安全的單例模式

下面是借助

sync.Once

實作的并發安全的單例模式:

package singleton

import (
    "sync"
)

type singleton struct {}

var instance *singleton
var once sync.Once

func GetInstance() *singleton {
    once.Do(func() {
        instance = &singleton{}
    })
    return instance
}
           

sync.Once

其實内部包含一個互斥鎖和一個布爾值,互斥鎖保證布爾值和資料的安全,而布爾值用來記錄初始化是否完成。這樣設計就能保證初始化操作的時候是并發安全的并且初始化操作也不會被執行多次。

sync.Map

Go 語言中内置的 map 不是并發安全的,請看下面這段示例代碼。

package main

import (
	"fmt"
	"strconv"
	"sync"
)

var m = make(map[string]int)

func get(key string) int {
	return m[key]
}

func set(key string, value int) {
	m[key] = value
}

func main() {
	wg := sync.WaitGroup{}
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func(n int) {
			key := strconv.Itoa(n)
			set(key, n)
			fmt.Printf("k=:%v,v:=%v\n", key, get(key))
			wg.Done()
		}(i)
	}
	wg.Wait()
}
           

将上面的代碼編譯後執行,會報出

fatal error: concurrent map writes

錯誤。我們不能在多個 goroutine 中并發對内置的 map 進行讀寫操作,否則會存在資料競争問題。

像這種場景下就需要為 map 加鎖來保證并發的安全性了,Go語言的

sync

包中提供了一個開箱即用的并發安全版 map——

sync.Map

。開箱即用表示其不用像内置的 map 一樣使用 make 函數初始化就能直接使用。同時

sync.Map

内置了諸如

Store

Load

LoadOrStore

Delete

Range

等操作方法。

方法名 功能
func (m *Map) Store(key, value interface{}) 存儲key-value資料
func (m *Map) Load(key interface{}) (value interface{}, ok bool) 查詢key對應的value
func (m *Map) LoadOrStore(key, value interface{}) (actual interface{}, loaded bool) 查詢或存儲key對應的value
func (m *Map) LoadAndDelete(key interface{}) (value interface{}, loaded bool) 查詢并删除key
func (m *Map) Delete(key interface{}) 删除key
func (m *Map) Range(f func(key, value interface{}) bool) 對map中的每個key-value依次調用f

下面的代碼示例示範了并發讀寫

sync.Map

package main

import (
	"fmt"
	"strconv"
	"sync"
)

// 并發安全的map
var m = sync.Map{}

func main() {
	wg := sync.WaitGroup{}
	// 對m執行20個并發的讀寫操作
	for i := 0; i < 20; i++ {
		wg.Add(1)
		go func(n int) {
			key := strconv.Itoa(n)
			m.Store(key, n)         // 存儲key-value
			value, _ := m.Load(key) // 根據key取值
			fmt.Printf("k=:%v,v:=%v\n", key, value)
			wg.Done()
		}(i)
	}
	wg.Wait()
}
           

原子操作

針對整數資料類型(int32、uint32、int64、uint64)我們還可以使用原子操作來保證并發安全,通常直接使用原子操作比使用鎖操作效率更高。Go語言中原子操作由内置的标準庫

sync/atomic

提供。

atomic包

方法 解釋
func LoadInt32(addr *int32) (val int32)
func LoadInt64(addr *int64) (val int64)
func LoadUint32(addr *uint32) (val uint32)
func LoadUint64(addr *uint64) (val uint64)
func LoadUintptr(addr *uintptr) (val uintptr)
func LoadPointer(addr *unsafe.Pointer) (val unsafe.Pointer) 讀取操作
func StoreInt32(addr *int32, val int32)
func StoreInt64(addr *int64, val int64)
func StoreUint32(addr *uint32, val uint32)
func StoreUint64(addr *uint64, val uint64)
func StoreUintptr(addr *uintptr, val uintptr)
func StorePointer(addr *unsafe.Pointer, val unsafe.Pointer) 寫入操作
func AddInt32(addr *int32, delta int32) (new int32)
func AddInt64(addr *int64, delta int64) (new int64)
func AddUint32(addr *uint32, delta uint32) (new uint32)
func AddUint64(addr *uint64, delta uint64) (new uint64)
func AddUintptr(addr *uintptr, delta uintptr) (new uintptr) 修改操作
func SwapInt32(addr *int32, new int32) (old int32)
func SwapInt64(addr *int64, new int64) (old int64)
func SwapUint32(addr *uint32, new uint32) (old uint32)
func SwapUint64(addr *uint64, new uint64) (old uint64)
func SwapUintptr(addr *uintptr, new uintptr) (old uintptr)
func SwapPointer(addr *unsafe.Pointer, new unsafe.Pointer) (old unsafe.Pointer) 交換操作
func CompareAndSwapInt32(addr *int32, old, new int32) (swapped bool)
func CompareAndSwapInt64(addr *int64, old, new int64) (swapped bool)
func CompareAndSwapUint32(addr *uint32, old, new uint32) (swapped bool)
func CompareAndSwapUint64(addr *uint64, old, new uint64) (swapped bool)
func CompareAndSwapUintptr(addr *uintptr, old, new uintptr) (swapped bool)
func CompareAndSwapPointer(addr *unsafe.Pointer, old, new unsafe.Pointer) (swapped bool) 比較并交換操作

示例

我們填寫一個示例來比較下互斥鎖和原子操作的性能。

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

type Counter interface {
	Inc()
	Load() int64
}

// 普通版
type CommonCounter struct {
	counter int64
}

func (c CommonCounter) Inc() {
	c.counter++
}

func (c CommonCounter) Load() int64 {
	return c.counter
}

// 互斥鎖版
type MutexCounter struct {
	counter int64
	lock    sync.Mutex
}

func (m *MutexCounter) Inc() {
	m.lock.Lock()
	defer m.lock.Unlock()
	m.counter++
}

func (m *MutexCounter) Load() int64 {
	m.lock.Lock()
	defer m.lock.Unlock()
	return m.counter
}

// 原子操作版
type AtomicCounter struct {
	counter int64
}

func (a *AtomicCounter) Inc() {
	atomic.AddInt64(&a.counter, 1)
}

func (a *AtomicCounter) Load() int64 {
	return atomic.LoadInt64(&a.counter)
}

func test(c Counter) {
	var wg sync.WaitGroup
	start := time.Now()
	for i := 0; i < 1000; i++ {
		wg.Add(1)
		go func() {
			c.Inc()
			wg.Done()
		}()
	}
	wg.Wait()
	end := time.Now()
	fmt.Println(c.Load(), end.Sub(start))
}

func main() {
	c1 := CommonCounter{} // 非并發安全
	test(c1)
	c2 := MutexCounter{} // 使用互斥鎖實作并發安全
	test(&c2)
	c3 := AtomicCounter{} // 并發安全且比互斥鎖效率更高
	test(&c3)
}
           

atomic

包提供了底層的原子級記憶體操作,對于同步算法的實作很有用。這些函數必須謹慎地保證正确使用。除了某些特殊的底層應用,使用通道或者 sync 包的函數/類型實作同步更好。

練習題

  1. 使用 goroutine 和 channel 實作一個計算int64随機數各位數和的程式,例如生成随機數61345,計算其每個位數上的數字之和為19。
    1. 開啟一個 goroutine 循環生成int64類型的随機數,發送到

      jobChan

    2. 開啟24個 goroutine 從

      jobChan

      中取出随機數計算各位數的和,将結果發送到

      resultChan

    3. 主 goroutine 從

      resultChan

      取出結果并列印到終端輸出

繼續閱讀