天天看點

Go語言系列之并發程式設計

Go語言中的并發程式設計

并發與并行

并發:同一時間段内執行多個任務(宏觀上并行,微觀上并發)。

并行:同一時刻執行多個任務(宏觀和微觀都是并行)。

Go語言的并發通過

goroutine

實作。

goroutine

類似于線程,屬于使用者态的線程,我們可以根據需要建立成千上萬個

goroutine

并發工作。

goroutine

是由Go語言的運作時(runtime)排程完成,而線程是由作業系統排程完成。

Go語言還提供

channel

在多個

goroutine

間進行通信。

goroutine

channel

是 Go 語言秉承的 CSP(Communicating Sequential Process)并發模式的重要實作基礎。

goroutine

在java/c++中我們要實作并發程式設計的時候,我們通常需要自己維護一個線程池,并且需要自己去包裝一個又一個的任務,同時需要自己去排程線程執行任務并維護上下文切換,這一切通常會耗費程式員大量的心智。那麼能不能有一種機制,程式員隻需要定義很多個任務,讓系統去幫助我們把這些任務配置設定到CPU上實作并發執行呢?

Go語言中的

goroutine

就是這樣一種機制,

goroutine

的概念類似于線程,但 

goroutine

是由Go的運作時(runtime)排程和管理的。Go程式會智能地将 goroutine 中的任務合理地配置設定給每個CPU。Go語言之是以被稱為現代化的程式設計語言,就是因為它在語言層面已經内置了排程和上下文切換的機制。

在Go語言程式設計中你不需要去自己寫程序、線程、協程,你的技能包裡隻有一個技能–

goroutine

,當你需要讓某個任務并發執行的時候,你隻需要把這個任務包裝成一個函數,開啟一個

goroutine

去執行這個函數就可以了,就是這麼簡單粗暴。

使用goroutine

Go語言中使用

goroutine

非常簡單,隻需要在調用函數的時候在前面加上

go

關鍵字,就可以為一個函數建立一個

goroutine

一個

goroutine

必定對應一個函數,可以建立多個

goroutine

去執行相同的函數。

啟動單個goroutine

啟動goroutine的方式非常簡單,隻需要在調用的函數(普通函數和匿名函數)前面加上一個

go

關鍵字。

舉個例子如下:

func hello() {
	fmt.Println("Hello Goroutine!")
}
func main() {
	hello()
	fmt.Println("main goroutine done!")
}      

這個示例中hello函數和下面的語句是串行的,執行的結果是列印完

Hello Goroutine!

後列印

main goroutine done!

接下來我們在調用hello函數前面加上關鍵字

go

,也就是啟動一個goroutine去執行hello這個函數。

func main() {
	go hello() // 啟動另外一個goroutine去執行hello函數
	fmt.Println("main goroutine done!")
}      

這一次的執行結果隻列印了

main goroutine done!

,并沒有列印

Hello Goroutine!

。為什麼呢?

在程式啟動時,Go程式就會為

main()

函數建立一個預設的

goroutine

當main()函數傳回的時候該

goroutine

就結束了,所有在

main()

函數中啟動的

goroutine

會一同結束,

main

函數所在的

goroutine

就像是權利的遊戲中的夜王,其他的

goroutine

都是異鬼,夜王一死它轉化的那些異鬼也就全部GG了。

是以我們要想辦法讓main函數等一等hello函數,最簡單粗暴的方式就是

time.Sleep

了。

func main() {
	go hello() // 啟動另外一個goroutine去執行hello函數
	fmt.Println("main goroutine done!")
	time.Sleep(time.Second)
}      

執行上面的代碼你會發現,這一次先列印

main goroutine done!

,然後緊接着列印

Hello Goroutine!

首先為什麼會先列印

main goroutine done!

是因為我們在建立新的goroutine的時候需要花費一些時間,而此時main函數所在的

goroutine

是繼續執行的。

啟動多個goroutine

在Go語言中實作并發就是這樣簡單,我們還可以啟動多個

goroutine

。讓我們再來一個例子: (這裡使用了

sync.WaitGroup

來實作goroutine的同步)

var wg sync.WaitGroup

func hello(i int) {
	defer wg.Done() // goroutine結束就登記-1
	fmt.Println("Hello Goroutine!", i)
}
func main() {

	for i := 0; i < 10; i++ {
		wg.Add(1) // 啟動一個goroutine就登記+1
		go hello(i)
	}
	wg.Wait() // 等待所有登記的goroutine都結束
}      

多次執行上面的代碼,會發現每次列印的數字的順序都不一緻。這是因為10個

goroutine

是并發執行的,而

goroutine

的排程是随機的。

goroutine與線程

可增長的棧

OS線程(作業系統線程)一般都有固定的棧記憶體(通常為2MB),一個

goroutine

的棧在其生命周期開始時隻有很小的棧(典型情況下2KB),

goroutine

的棧不是固定的,他可以按需增大和縮小,

goroutine

的棧大小限制可以達到1GB,雖然極少會用到這麼大。是以在Go語言中一次建立十萬左右的

goroutine

也是可以的。

goroutine排程

GPM

是Go語言運作時(runtime)層面的實作,是go語言自己實作的一套排程系統。差別于作業系統排程OS線程。

  • G

    很好了解,就是個goroutine的,裡面除了存放本goroutine資訊外 還有與所在P的綁定等資訊。
  • P

    管理着一組goroutine隊列,P裡面會存儲目前goroutine運作的上下文環境(函數指針,堆棧位址及位址邊界),P會對自己管理的goroutine隊列做一些排程(比如把占用CPU時間較長的goroutine暫停、運作後續的goroutine等等)當自己的隊列消費完了就去全局隊列裡取,如果全局隊列裡也消費完了會去其他P的隊列裡搶任務。
  • M(machine)

    是Go運作時(runtime)對作業系統核心線程的虛拟, M與核心線程一般是一一映射的關系, 一個groutine最終是要放到M上執行的;

P與M一般也是一一對應的。他們關系是: P管理着一組G挂載在M上運作。當一個G長久阻塞在一個M上時,runtime會建立一個M,阻塞G所在的P會把其他的G 挂載在建立的M上。當舊的G阻塞完成或者認為其已經死掉時 回收舊的M。

P的個數是通過

runtime.GOMAXPROCS

設定(最大256),Go1.5版本之後預設為實體線程數。 在并發量大的時候會增加一些P和M,但不會太多,切換太頻繁的話得不償失。

單從線程排程講,Go語言相比起其他語言的優勢在于OS線程是由OS核心來排程的,

goroutine

則是由Go運作時(runtime)自己的排程器排程的,這個排程器使用一個稱為m:n排程的技術(複用/排程m個goroutine到n個OS線程)。 其一大特點是goroutine的排程是在使用者态下完成的, 不涉及核心态與使用者态之間的頻繁切換,包括記憶體的配置設定與釋放,都是在使用者态維護着一塊大的記憶體池, 不直接調用系統的malloc函數(除非記憶體池需要改變),成本比排程OS線程低很多。 另一方面充分利用了多核的硬體資源,近似的把若幹goroutine均分在實體線程上, 再加上本身goroutine的超輕量,以上種種保證了go排程方面的性能。

點我了解更多

GOMAXPROCS

Go運作時的排程器使用

GOMAXPROCS

參數來确定需要使用多少個OS線程來同時執行Go代碼。預設值是機器上的CPU核心數。例如在一個8核心的機器上,排程器會把Go代碼同時排程到8個OS線程上(GOMAXPROCS是m:n排程中的n)。

Go語言中可以通過

runtime.GOMAXPROCS()

函數設定目前程式并發時占用的CPU邏輯核心數。

Go1.5版本之前,預設使用的是單核心執行。Go1.5版本之後,預設使用全部的CPU邏輯核心數。

我們可以通過将任務配置設定到不同的CPU邏輯核心上實作并行的效果,這裡舉個例子:

func a() {
	for i := 1; i < 10; i++ {
		fmt.Println("A:", i)
	}
}

func b() {
	for i := 1; i < 10; i++ {
		fmt.Println("B:", i)
	}
}

func main() {
	runtime.GOMAXPROCS(1)
	go a()
	go b()
	time.Sleep(time.Second)
}      

兩個任務隻有一個邏輯核心,此時是做完一個任務再做另一個任務。 将邏輯核心數設為2,此時兩個任務并行執行,代碼如下。

func a() {
	for i := 1; i < 10; i++ {
		fmt.Println("A:", i)
	}
}

func b() {
	for i := 1; i < 10; i++ {
		fmt.Println("B:", i)
	}
}

func main() {
	runtime.GOMAXPROCS(2)
	go a()
	go b()
	time.Sleep(time.Second)
}      

Go語言中的作業系統線程和goroutine的關系:

  1. 一個作業系統線程對應使用者态多個goroutine。
  2. go程式可以同時使用多個作業系統線程。
  3. goroutine和OS線程是多對多的關系,即m:n。

channel

單純地将函數并發執行是沒有意義的。函數與函數間需要交換資料才能展現并發執行函數的意義。

雖然可以使用共享記憶體進行資料交換,但是共享記憶體在不同的

goroutine

中容易發生競态問題。為了保證資料交換的正确性,必須使用互斥量對記憶體進行加鎖,這種做法勢必造成性能問題。

Go語言的并發模型是

CSP(Communicating Sequential Processes)

,提倡通過通信共享記憶體而不是通過共享記憶體而實作通信。

如果說

goroutine

是Go程式并發的執行體,

channel

就是它們之間的連接配接。

channel

是可以讓一個

goroutine

發送特定值到另一個

goroutine

的通信機制。

Go 語言中的通道(channel)是一種特殊的類型。通道像一個傳送帶或者隊列,總是遵循先入先出(First In First Out)的規則,保證收發資料的順序。每一個通道都是一個具體類型的導管,也就是聲明channel的時候需要為其指定元素類型。

channel類型

channel

是一種類型,一種引用類型。聲明通道類型的格式如下:

var 變量 chan 元素類型
           

舉幾個例子:

var ch1 chan int   // 聲明一個傳遞整型的通道
var ch2 chan bool  // 聲明一個傳遞布爾型的通道
var ch3 chan []int // 聲明一個傳遞int切片的通道
           

建立channel

通道是引用類型,通道類型的空值是

nil

var ch chan int
fmt.Println(ch) // <nil>
           

聲明的通道後需要使用

make

函數初始化之後才能使用。

建立channel的格式如下:

make(chan 元素類型, [緩沖大小])
           

channel的緩沖大小是可選的。

ch4 := make(chan int)
ch5 := make(chan bool)
ch6 := make(chan []int)      

channel操作

通道有發送(send)、接收(receive)和關閉(close)三種操作。

發送和接收都使用

<-

符号。

現在我們先使用以下語句定義一個通道:

ch := make(chan int)
           

發送

将一個值發送到通道中。

ch <- 10 // 把10發送到ch中
           

接收

從一個通道中接收值。

x := <- ch // 從ch中接收值并指派給變量x
<-ch       // 從ch中接收值,忽略結果
           

關閉

我們通過調用内置的

close

函數來關閉通道。

close(ch)      

關于關閉通道需要注意的事情是,隻有在通知接收方goroutine所有的資料都發送完畢的時候才需要關閉通道。通道是可以被垃圾回收機制回收的,它和關閉檔案是不一樣的,在結束操作之後關閉檔案是必須要做的,但關閉通道不是必須的。

關閉後的通道有以下特點:

  1. 對一個關閉的通道再發送值就會導緻panic。
  2. 對一個關閉的通道進行接收會一直擷取值直到通道為空。
  3. 對一個關閉的并且沒有值的通道執行接收操作會得到對應類型的零值。
  4. 關閉一個已經關閉的通道會導緻panic。

無緩沖的通道

無緩沖的通道又稱為阻塞的通道。我們來看一下下面的代碼:

func main() {
	ch := make(chan int)
	ch <- 10
	fmt.Println("發送成功")
}      

上面這段代碼能夠通過編譯,但是執行的時候會出現以下錯誤:

fatal error: all goroutines are asleep - deadlock!

goroutine 1 [chan send]:
main.main()
        .../src/github.com/Q1mi/studygo/day06/channel02/main.go:8 +0x54      

為什麼會出現

deadlock

錯誤呢?

因為我們使用

ch := make(chan int)

建立的是無緩沖的通道,無緩沖的通道隻有在有人接收值的時候才能發送值。就像你住的小區沒有快遞櫃和代收點,快遞員給你打電話必須要把這個物品送到你的手中,簡單來說就是無緩沖的通道必須有接收才能發送。

上面的代碼會阻塞在

ch <- 10

這一行代碼形成死鎖,那如何解決這個問題呢?

一種方法是啟用一個

goroutine

去接收值,例如:

func recv(c chan int) {
	ret := <-c
	fmt.Println("接收成功", ret)
}
func main() {
	ch := make(chan int)
	go recv(ch) // 啟用goroutine從通道接收值
	ch <- 10
	fmt.Println("發送成功")
}      

無緩沖通道上的發送操作會阻塞,直到另一個

goroutine

在該通道上執行接收操作,這時值才能發送成功,兩個

goroutine

将繼續執行。相反,如果接收操作先執行,接收方的goroutine将阻塞,直到另一個

goroutine

在該通道上發送一個值。

使用無緩沖通道進行通信将導緻發送和接收的

goroutine

同步化。是以,無緩沖通道也被稱為

同步通道

有緩沖的通道

解決上面問題的方法還有一種就是使用有緩沖區的通道。我們可以在使用make函數初始化通道的時候為其指定通道的容量,例如:

func main() {
	ch := make(chan int, 1) // 建立一個容量為1的有緩沖區通道
	ch <- 10
	fmt.Println("發送成功")
}      

隻要通道的容量大于零,那麼該通道就是有緩沖的通道,通道的容量表示通道中能存放元素的數量。就像你小區的快遞櫃隻有那麼個多格子,格子滿了就裝不下了,就阻塞了,等到别人取走一個快遞員就能往裡面放一個。

我們可以使用内置的

len

函數擷取通道内元素的數量,使用

cap

函數擷取通道的容量,雖然我們很少會這麼做。

for range從通道循環取值

當向通道中發送完資料時,我們可以通過

close

當通道被關閉時,再往該通道發送值會引發

panic

,從該通道取值的操作會先取完通道中的值,再然後取到的值一直都是對應類型的零值。那如何判斷一個通道是否被關閉了呢?

我們來看下面這個例子:

// channel 練習
func main() {
	ch1 := make(chan int)
	ch2 := make(chan int)
	// 開啟goroutine将0~100的數發送到ch1中
	go func() {
		for i := 0; i < 100; i++ {
			ch1 <- i
		}
		close(ch1)
	}()
	// 開啟goroutine從ch1中接收值,并将該值的平方發送到ch2中
	go func() {
		for {
			i, ok := <-ch1 // 通道關閉後再取值ok=false
			if !ok {
				break
			}
			ch2 <- i * i
		}
		close(ch2)
	}()
	// 在主goroutine中從ch2中接收值列印
	for i := range ch2 { // 通道關閉後會退出for range循環
		fmt.Println(i)
	}
}      

從上面的例子中我們看到有兩種方式在接收值的時候判斷該通道是否被關閉,不過我們通常使用的是

for range

的方式。使用

for range

周遊通道,當通道被關閉的時候就會退出

for range

單向通道

有的時候我們會将通道作為參數在多個任務函數間傳遞,很多時候我們在不同的任務函數中使用通道都會對其進行限制,比如限制通道在函數中隻能發送或隻能接收。

Go語言中提供了單向通道來處理這種情況。例如,我們把上面的例子改造如下:

func counter(out chan<- int) {
	for i := 0; i < 100; i++ {
		out <- i
	}
	close(out)
}

func squarer(out chan<- int, in <-chan int) {
	for i := range in {
		out <- i * i
	}
	close(out)
}
func printer(in <-chan int) {
	for i := range in {
		fmt.Println(i)
	}
}

func main() {
	ch1 := make(chan int)
	ch2 := make(chan int)
	go counter(ch1)
	go squarer(ch2, ch1)
	printer(ch2)
}      

其中,

  • chan<- int

    是一個隻寫單向通道(隻能對其寫入int類型值),可以對其執行發送操作但是不能執行接收操作;
  • <-chan int

    是一個隻讀單向通道(隻能從其讀取int類型值),可以對其執行接收操作但是不能執行發送操作。

在函數傳參及任何指派操作中可以将雙向通道轉換為單向通道,但反過來是不可以的。

通道總結

channel

常見的異常總結,如下圖:

Go語言系列之并發程式設計

關閉已經關閉的

channel

也會引發

panic

worker pool(goroutine池)

在工作中我們通常會使用可以指定啟動的goroutine數量–

worker pool

模式,控制

goroutine

的數量,防止

goroutine

洩漏和暴漲。

一個簡易的

work pool

示例代碼如下:

func worker(id int, jobs <-chan int, results chan<- int) {
	for j := range jobs {
		fmt.Printf("worker:%d start job:%d\n", id, j)
		time.Sleep(time.Second)
		fmt.Printf("worker:%d end job:%d\n", id, j)
		results <- j * 2
	}
}


func main() {
	jobs := make(chan int, 100)
	results := make(chan int, 100)
	// 開啟3個goroutine
	for w := 1; w <= 3; w++ {
		go worker(w, jobs, results)
	}
	// 5個任務
	for j := 1; j <= 5; j++ {
		jobs <- j
	}
	close(jobs)
	// 輸出結果
	for a := 1; a <= 5; a++ {
		<-results
	}
}      

select多路複用

在某些場景下我們需要同時從多個通道接收資料。通道在接收資料時,如果沒有資料可以接收将會發生阻塞。你也許會寫出如下代碼使用周遊的方式來實作:

for{
    // 嘗試從ch1接收值
    data, ok := <-ch1
    // 嘗試從ch2接收值
    data, ok := <-ch2
    …
}      

這種方式雖然可以實作從多個通道接收值的需求,但是運作性能會差很多。為了應對這種場景,Go内置了

select

關鍵字,可以同時響應多個通道的操作。

select

的使用類似于switch語句,它有一系列case分支和一個預設的分支。每個case會對應一個通道的通信(接收或發送)過程。

select

會一直等待,直到某個

case

的通信操作完成時,就會執行

case

分支對應的語句。具體格式如下:

select{
    case <-ch1:
        ...
    case data := <-ch2:
        ...
    case ch3<-data:
        ...
    default:
        預設操作
}      

舉個小例子來示範下

select

的使用:

func main() {
	ch := make(chan int, 1)
	for i := 0; i < 10; i++ {
		select {
		case x := <-ch:
			fmt.Println(x)
		case ch <- i:
		}
	}
}      

使用

select

語句能提高代碼的可讀性。

  • 可處理一個或多個channel的發送/接收操作。
  • 如果多個

    case

    同時滿足,

    select

    會随機選擇一個。
  • 對于沒有

    case

    select{}

    會一直等待,可用于阻塞main函數。

并發安全和鎖

有時候在Go代碼中可能會存在多個

goroutine

同時操作一個資源(臨界區),這種情況會發生

競态問題

(資料競态)。類比現實生活中的例子有十字路口被各個方向的的汽車競争;還有火車上的衛生間被車廂裡的人競争。

舉個例子:

var x int64
var wg sync.WaitGroup

func add() {
	for i := 0; i < 5000; i++ {
		x = x + 1
	}
	wg.Done()
}
func main() {
	wg.Add(2)
	go add()
	go add()
	wg.Wait()
	fmt.Println(x)
}      

上面的代碼中我們開啟了兩個

goroutine

去累加變量x的值,這兩個

goroutine

在通路和修改

x

變量的時候就會存在資料競争,導緻最後的結果與期待的不符。

互斥鎖

互斥鎖是一種常用的控制共享資源通路的方法,它能夠保證同時隻有一個

goroutine

可以通路共享資源。Go語言中使用

sync

包的

Mutex

類型來實作互斥鎖。 使用互斥鎖來修複上面代碼的問題:

var x int64
var wg sync.WaitGroup
var lock sync.Mutex

func add() {
	for i := 0; i < 5000; i++ {
		lock.Lock() // 加鎖
		x = x + 1
		lock.Unlock() // 解鎖
	}
	wg.Done()
}
func main() {
	wg.Add(2)
	go add()
	go add()
	wg.Wait()
	fmt.Println(x)
}      

使用互斥鎖能夠保證同一時間有且隻有一個

goroutine

進入臨界區,其他的

goroutine

則在等待鎖;當互斥鎖釋放後,等待的

goroutine

才可以擷取鎖進入臨界區,多個

goroutine

同時等待一個鎖時,喚醒的政策是随機的。

讀寫互斥鎖

互斥鎖是完全互斥的,但是有很多實際的場景下是讀多寫少的,當我們并發的去讀取一個資源不涉及資源修改的時候是沒有必要加鎖的,這種場景下使用讀寫鎖是更好的一種選擇。讀寫鎖在Go語言中使用

sync

包中的

RWMutex

類型。

讀寫鎖分為兩種:讀鎖和寫鎖。當一個goroutine擷取讀鎖之後,其他的

goroutine

如果是擷取讀鎖會繼續獲得鎖,如果是擷取寫鎖就會等待;當一個

goroutine

擷取寫鎖之後,其他的

goroutine

無論是擷取讀鎖還是寫鎖都會等待。

讀寫鎖示例:

var (
	x      int64
	wg     sync.WaitGroup
	lock   sync.Mutex
	rwlock sync.RWMutex
)

func write() {
	// lock.Lock()   // 加互斥鎖
	rwlock.Lock() // 加寫鎖
	x = x + 1
	time.Sleep(10 * time.Millisecond) // 假設讀操作耗時10毫秒
	rwlock.Unlock()                   // 解寫鎖
	// lock.Unlock()                     // 解互斥鎖
	wg.Done()
}

func read() {
	// lock.Lock()                  // 加互斥鎖
	rwlock.RLock()               // 加讀鎖
	time.Sleep(time.Millisecond) // 假設讀操作耗時1毫秒
	rwlock.RUnlock()             // 解讀鎖
	// lock.Unlock()                // 解互斥鎖
	wg.Done()
}

func main() {
	start := time.Now()
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go write()
	}

	for i := 0; i < 1000; i++ {
		wg.Add(1)
		go read()
	}

	wg.Wait()
	end := time.Now()
	fmt.Println(end.Sub(start))
}      

需要注意的是讀寫鎖非常适合讀多寫少的場景,如果讀和寫的操作差别不大,讀寫鎖的優勢就發揮不出來。

sync.WaitGroup

在代碼中生硬的使用

time.Sleep

肯定是不合适的,Go語言中可以使用

sync.WaitGroup

來實作并發任務的同步。 

sync.WaitGroup

有以下幾個方法:

方法名 功能
(wg * WaitGroup) Add(delta int) 計數器+delta
(wg *WaitGroup) Done() 計數器-1
(wg *WaitGroup) Wait() 阻塞直到計數器變為0

sync.WaitGroup

内部維護着一個計數器,計數器的值可以增加和減少。例如當我們啟動了N 個并發任務時,就将計數器值增加N。每個任務完成時通過調用Done()方法将計數器減1。通過調用Wait()來等待并發任務執行完,當計數器值為0時,表示所有并發任務已經完成。

我們利用

sync.WaitGroup

将上面的代碼優化一下:

var wg sync.WaitGroup

func hello() {
	defer wg.Done()
	fmt.Println("Hello Goroutine!")
}
func main() {
	wg.Add(1)
	go hello() // 啟動另外一個goroutine去執行hello函數
	fmt.Println("main goroutine done!")
	wg.Wait()
}      

需要注意

sync.WaitGroup

是一個結構體,傳遞的時候要傳遞指針。

sync.Once

說在前面的話:這是一個進階知識點。

在程式設計的很多場景下我們需要確定某些操作在高并發的場景下隻執行一次,例如隻加載一次配置檔案、隻關閉一次通道等。

sync

包中提供了一個針對隻執行一次場景的解決方案–

sync.Once

sync.Once

隻有一個

Do

方法,其簽名如下:

func (o *Once) Do(f func()) {}      

備注:如果要執行的函數

f

需要傳遞參數就需要搭配閉包來使用。

加載配置檔案示例

延遲一個開銷很大的初始化操作到真正用到它的時候再執行是一個很好的實踐。因為預先初始化一個變量(比如在init函數中完成初始化)會增加程式的啟動耗時,而且有可能實際執行過程中這個變量沒有用上,那麼這個初始化操作就不是必須要做的。我們來看一個例子:

var icons map[string]image.Image

func loadIcons() {
	icons = map[string]image.Image{
		"left":  loadIcon("left.png"),
		"up":    loadIcon("up.png"),
		"right": loadIcon("right.png"),
		"down":  loadIcon("down.png"),
	}
}

// Icon 被多個goroutine調用時不是并發安全的
func Icon(name string) image.Image {
	if icons == nil {
		loadIcons()
	}
	return icons[name]
}      

多個

goroutine

并發調用Icon函數時不是并發安全的,現代的編譯器和CPU可能會在保證每個

goroutine

都滿足串行一緻的基礎上自由地重排通路記憶體的順序。loadIcons函數可能會被重排為以下結果:

func loadIcons() {
	icons = make(map[string]image.Image)
	icons["left"] = loadIcon("left.png")
	icons["up"] = loadIcon("up.png")
	icons["right"] = loadIcon("right.png")
	icons["down"] = loadIcon("down.png")
}      

在這種情況下就會出現即使判斷了

icons

不是nil也不意味着變量初始化完成了。考慮到這種情況,我們能想到的辦法就是添加互斥鎖,保證初始化

icons

的時候不會被其他的

goroutine

操作,但是這樣做又會引發性能問題。

sync.Once

改造的示例代碼如下:

var icons map[string]image.Image

var loadIconsOnce sync.Once

func loadIcons() {
	icons = map[string]image.Image{
		"left":  loadIcon("left.png"),
		"up":    loadIcon("up.png"),
		"right": loadIcon("right.png"),
		"down":  loadIcon("down.png"),
	}
}

// Icon 是并發安全的
func Icon(name string) image.Image {
	loadIconsOnce.Do(loadIcons)
	return icons[name]
}      

并發安全的單例模式

下面是借助

sync.Once

實作的并發安全的單例模式:

package singleton

import (
    "sync"
)

type singleton struct {}

var instance *singleton
var once sync.Once

func GetInstance() *singleton {
    once.Do(func() {
        instance = &singleton{}
    })
    return instance
}      

sync.Once

其實内部包含一個互斥鎖和一個布爾值,互斥鎖保證布爾值和資料的安全,而布爾值用來記錄初始化是否完成。這樣設計就能保證初始化操作的時候是并發安全的并且初始化操作也不會被執行多次。

sync.Map

Go語言中内置的map不是并發安全的。請看下面的示例:

var m = make(map[string]int)

func get(key string) int {
	return m[key]
}

func set(key string, value int) {
	m[key] = value
}

func main() {
	wg := sync.WaitGroup{}
	for i := 0; i < 20; i++ {
		wg.Add(1)
		go func(n int) {
			key := strconv.Itoa(n)
			set(key, n)
			fmt.Printf("k=:%v,v:=%v\n", key, get(key))
			wg.Done()
		}(i)
	}
	wg.Wait()
}      

上面的代碼開啟少量幾個

goroutine

的時候可能沒什麼問題,當并發多了之後執行上面的代碼就會報

fatal error: concurrent map writes

錯誤。

像這種場景下就需要為map加鎖來保證并發的安全性了,Go語言的

sync

包中提供了一個開箱即用的并發安全版map–

sync.Map

。開箱即用表示不用像内置的map一樣使用make函數初始化就能直接使用。同時

sync.Map

内置了諸如

Store

Load

LoadOrStore

Delete

Range

等操作方法。

var m = sync.Map{}

func main() {
	wg := sync.WaitGroup{}
	for i := 0; i < 20; i++ {
		wg.Add(1)
		go func(n int) {
			key := strconv.Itoa(n)
			m.Store(key, n)
			value, _ := m.Load(key)
			fmt.Printf("k=:%v,v:=%v\n", key, value)
			wg.Done()
		}(i)
	}
	wg.Wait()
}      

原子操作

atomic包

方法 解釋

func LoadInt32(addr *int32) (val int32)

func LoadInt64(addr *int64) (val int64)

func LoadUint32(addr *uint32) (val uint32)

func LoadUint64(addr *uint64) (val uint64)

func LoadUintptr(addr *uintptr) (val uintptr)

func LoadPointer(addr *unsafe.Pointer) (val unsafe.Pointer)

讀取操作

func StoreInt32(addr *int32, val int32)

func StoreInt64(addr *int64, val int64)

func StoreUint32(addr *uint32, val uint32)

func StoreUint64(addr *uint64, val uint64)

func StoreUintptr(addr *uintptr, val uintptr)

func StorePointer(addr *unsafe.Pointer, val unsafe.Pointer)

寫入操作

func AddInt32(addr *int32, delta int32) (new int32)

func AddInt64(addr *int64, delta int64) (new int64)

func AddUint32(addr *uint32, delta uint32) (new uint32)

func AddUint64(addr *uint64, delta uint64) (new uint64)

func AddUintptr(addr *uintptr, delta uintptr) (new uintptr)

修改操作

func SwapInt32(addr *int32, new int32) (old int32)

func SwapInt64(addr *int64, new int64) (old int64)

func SwapUint32(addr *uint32, new uint32) (old uint32)

func SwapUint64(addr *uint64, new uint64) (old uint64)

func SwapUintptr(addr *uintptr, new uintptr) (old uintptr)

func SwapPointer(addr *unsafe.Pointer, new unsafe.Pointer) (old unsafe.Pointer)

交換操作

func CompareAndSwapInt32(addr *int32, old, new int32) (swapped bool)

func CompareAndSwapInt64(addr *int64, old, new int64) (swapped bool)

func CompareAndSwapUint32(addr *uint32, old, new uint32) (swapped bool)

func CompareAndSwapUint64(addr *uint64, old, new uint64) (swapped bool)

func CompareAndSwapUintptr(addr *uintptr, old, new uintptr) (swapped bool)

func CompareAndSwapPointer(addr *unsafe.Pointer, old, new unsafe.Pointer) (swapped bool)

比較并交換操作

示例

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

type Counter interface {
	Inc()
	Load() int64
}

// 普通版
type CommonCounter struct {
	counter int64
}

func (c CommonCounter) Inc() {
	c.counter++
}

func (c CommonCounter) Load() int64 {
	return c.counter
}

// 互斥鎖版
type MutexCounter struct {
	counter int64
	lock    sync.Mutex
}

func (m *MutexCounter) Inc() {
	m.lock.Lock()
	defer m.lock.Unlock()
	m.counter++
}

func (m *MutexCounter) Load() int64 {
	m.lock.Lock()
	defer m.lock.Unlock()
	return m.counter
}

// 原子操作版
type AtomicCounter struct {
	counter int64
}

func (a *AtomicCounter) Inc() {
	atomic.AddInt64(&a.counter, 1)
}

func (a *AtomicCounter) Load() int64 {
	return atomic.LoadInt64(&a.counter)
}

func test(c Counter) {
	var wg sync.WaitGroup
	start := time.Now()
	for i := 0; i < 1000; i++ {
		wg.Add(1)
		go func() {
			c.Inc()
			wg.Done()
		}()
	}
	wg.Wait()
	end := time.Now()
	fmt.Println(c.Load(), end.Sub(start))
}

func main() {
	c1 := CommonCounter{} // 非并發安全
	test(c1)
	c2 := MutexCounter{} // 使用互斥鎖實作并發安全
	test(&c2)
	c3 := AtomicCounter{} // 并發安全且比互斥鎖效率更高
	test(&c3)
}      
Go語言系列之并發程式設計

作者:張亞飛