并發(并行),一直以來都是一個程式設計語言裡的核心主題之一,也是被開發者關注最多的話題;Go語言作為一個出道以來就自帶 『高并發』光環的富二代程式設計語言,它的并發(并行)程式設計肯定是值得開發者去探究的,而Go語言中的并發(并行)程式設計是經由goroutine實作的,goroutine是golang最重要的特性之一,具有使用成本低、消耗資源低、能效高等特點,官方宣稱原生goroutine并發成千上萬不成問題,于是它也成為Gopher們經常使用的特性。
一、goroutine簡介
Golang被極度贊揚的是它的異步機制,也就是goroutine。goroutine使用方式非常的簡單,隻需使用go關鍵字即可啟動一個協程, 并且它是處于異步方式運作,你不需要等它運作完成以後再執行以後的代碼。
go func()//通過go關鍵字啟動一個協程來運作函數
除去文法上的簡潔,goroutine是一個協程,也就是比線程更節省資源,一個線程中可以有多個協程,而且goroutine被配置設定到多個CPU上運作,是真正意義上的并發。
go func()//通過go關鍵字啟動一個協程來運作函數
二、goroutine内部原理
在介紹goroutine原理之前,先對一些關鍵概念進行介紹:
關鍵概念
并發
一個cpu上能同時執行多項任務,在很短時間内,cpu來回切換任務執行(在某段很短時間内執行程式a,然後又迅速得切換到程式b去執行),有時間上的重疊(宏觀上是同時的,微觀仍是順序執行),這樣看起來多個任務像是同時執行,這就是并發。
并行
當系統有多個CPU時,每個CPU同一時刻都運作任務,互不搶占自己所在的CPU資源,同時進行,稱為并行。

簡單了解
你吃飯吃到一半,電話來了,你一直到吃完了以後才去接,這就說明你不支援并發也不支援并行。
你吃飯吃到一半,電話來了,你停了下來接了電話,接完後繼續吃飯,這說明你支援并發。
你吃飯吃到一半,電話來了,你一邊打電話一邊吃飯,這說明你支援并行。
并發的關鍵是你有處理多個任務的能力,不一定要同時。
并行的關鍵是你有同時處理多個任務的能力。
在計算機中就是:
是以我認為它們最關鍵的點就是:是否是『同時』。![]()
GO的并發之道-Goroutine排程原理&Channel詳解
程序
cpu在切換程式的時候,如果不儲存上一個程式的狀态(也就是我們常說的context--上下文),直接切換下一個程式,就會丢失上一個程式的一系列狀态,于是引入了程序這個概念,用以劃分好程式運作時所需要的資源。
是以程序就是一個程式運作時候的所需要的基本資源機關(也可以說是程式運作的一個實體)。
線程
cpu切換多個程序的時候,會花費不少的時間,因為切換程序需要切換到核心态,而每次排程需要核心态都需要讀取使用者态的資料,程序一旦多起來,cpu排程會消耗一大堆資源,是以引入了線程的概念,線程本身幾乎不占有資源,他們共享程序裡的資源,核心排程起來不會那麼像程序切換那麼耗費資源。
線程是程序的一個執行實體,是CPU排程和分派的基本機關,它是比程序更小的能獨立運作的基本機關。
NOTE:線程包括三大類,而且goroutine也并非真正地協程。(請檢視:《線程那些事兒》)
有時候為了友善了解可以簡單把goroutine類比成協程,但心裡一定要有個清晰的認知 — goroutine并不等同于協程。
協程
協程擁有自己的寄存器上下文和棧。協程排程切換時,将寄存器上下文和棧儲存到其他地方,在切回來的時候,恢複先前儲存的寄存器上下文和棧。是以,協程能保留上一次調用時的狀态(即所有局部狀态的一個特定組合),每次過程重入時,就相當于進入上一次調用的狀态,換種說法:進入上一次離開時所處邏輯流的位置。線程和程序的操作是由程式觸發系統接口,最後的執行者是系統;協程的操作執行者則是使用者自身程式,goroutine也是協程。
G-P-M排程模型簡介
groutine能擁有強大的并發實作是通過GPM排程模型實作,下面就來解釋下goroutine的排程模型。
Go的排程器内部的三個重要的結構:M,P,G
M:M代表核心級線程,一個M就是一個線程,goroutine就是跑在M之上的;M是一個很大的結構,裡面維護小對象記憶體cache(mcache)、目前執行的goroutine、随機數發生器等等非常多的資訊
G:代表一個goroutine,它有自己的棧,instruction pointer和其他資訊(正在等待的channel等等),用于排程。
P:P全稱是Processor,處理器,它的主要用途就是用來執行goroutine的,是以它也維護了一個goroutine隊列,裡面存儲了所有需要它來執行的goroutine
NOTE:G-P-M模型詳解,請檢視該篇博文。
排程實作
從上圖中看,有2個實體線程M,每一個M都擁有一個處理器P,每一個也都有一個正在運作的goroutine。
P的數量可以通過GOMAXPROCS()來設定,它其實也就代表了真正的并發度,即有多少個goroutine可以同時運作。
圖中灰色的那些goroutine并沒有運作,而是出于ready的就緒态,正在等待被排程。P維護着這個隊列(稱之為runqueue),
Go語言裡,啟動一個goroutine很容易:go function 就行,是以每有一個go語句被執行,runqueue隊列就在其末尾加入一個
goroutine,在下一個排程點,就從runqueue中取出(如何決定取哪個goroutine?)一個goroutine執行。
當一個OS線程M0陷入阻塞時(如下圖),P轉而在運作M1,圖中的M1可能是正被建立,或者從線程緩存中取出。
當MO傳回時,它必須嘗試取得一個P來運作goroutine,一般情況下,它會從其他的OS線程那裡拿一個P過來,
如果沒有拿到的話,它就把goroutine放在一個global runqueue裡,然後自己睡眠(放入線程緩存裡)。所有的P也會周期性的檢查global runqueue并運作其中的goroutine,否則global runqueue上的goroutine永遠無法執行。
另一種情況是P所配置設定的任務G很快就執行完了(配置設定不均),這就導緻了這個處理器P很忙,但是其他的P還有任務,此時如果global runqueue沒有任務G了,那麼P不得不從其他的P裡拿一些G來執行。一般來說,如果P從其他的P那裡要拿任務的話,一般就拿run queue的一半,這就確定了每個OS線程都能充分的使用,如下圖:
參考位址:http://morsmachine.dk/go-scheduler
三、使用goroutine
基本使用
設定goroutine運作的CPU數量,最新版本的go已經預設已經設定了。
num := runtime.NumCPU() //擷取主機的邏輯CPU個數
runtime.GOMAXPROCS(num) //設定可同時執行的最大CPU數
使用示例
package main
import (
"fmt"
"time"
)
func cal(a int , b int ) {
c := a+b
fmt.Printf("%d + %d = %d\n",a,b,c)
}
func main() {
for i :=0 ; i<10 ;i++{
go cal(i,i+1) //啟動10個goroutine 來計算
}
time.Sleep(time.Second * 2) // sleep作用是為了等待所有任務完成
}
//結果
//8 + 9 = 17
//9 + 10 = 19
//4 + 5 = 9
//5 + 6 = 11
//0 + 1 = 1
//1 + 2 = 3
//2 + 3 = 5
//3 + 4 = 7
//7 + 8 = 15
//6 + 7 = 13
View Code
goroutine異常捕捉
當啟動多個goroutine時,如果其中一個goroutine異常了,并且我們并沒有對進行異常處理,那麼整個程式都會終止,是以我們在編寫程式時候最好每個goroutine所運作的函數都做異常處理,異常處理采用recover
package main
import (
"fmt"
"time"
)
func addele(a []int ,i int) {
defer func() { //匿名函數捕獲錯誤
err := recover()
if err != nil {
fmt.Println("add ele fail")
}
}()
a[i]=i
fmt.Println(a)
}
func main() {
Arry := make([]int,4)
for i :=0 ; i<10 ;i++{
go addele(Arry,i)
}
time.Sleep(time.Second * 2)
}
//結果
add ele fail
[0 0 0 0]
[0 1 0 0]
[0 1 2 0]
[0 1 2 3]
add ele fail
add ele fail
add ele fail
add ele fail
add ele fail
同步的goroutine
由于goroutine是異步執行的,那很有可能出現主程式退出時還有goroutine沒有執行完,此時goroutine也會跟着退出。此時如果想等到所有goroutine任務執行完畢才退出,go提供了sync包和channel來解決同步問題,當然如果你能預測每個goroutine執行的時間,你還可以通過time.Sleep方式等待所有的groutine執行完成以後在退出程式(如上面的列子)。
示例一:使用sync包同步goroutine
sync大緻實作方式
WaitGroup 等待一組goroutinue執行完畢. 主程式調用 Add 添加等待的goroutinue數量. 每個goroutinue在執行結束時調用 Done ,此時等待隊列數量減1.,主程式通過Wait阻塞,直到等待隊列為0.
package main
import (
"fmt"
"sync"
)
func cal(a int , b int ,n *sync.WaitGroup) {
c := a+b
fmt.Printf("%d + %d = %d\n",a,b,c)
defer n.Done() //goroutinue完成後, WaitGroup的計數-1
}
func main() {
var go_sync sync.WaitGroup //聲明一個WaitGroup變量
for i :=0 ; i<10 ;i++{
go_sync.Add(1) // WaitGroup的計數加1
go cal(i,i+1,&go_sync)
}
go_sync.Wait() //等待所有goroutine執行完畢
}
//結果
9 + 10 = 19
2 + 3 = 5
3 + 4 = 7
4 + 5 = 9
5 + 6 = 11
1 + 2 = 3
6 + 7 = 13
7 + 8 = 15
0 + 1 = 1
8 + 9 = 17
示例二:通過channel實作goroutine之間的同步。
實作方式:通過channel能在多個groutine之間通訊,當一個goroutine完成時候向channel發送退出信号,等所有goroutine退出時候,利用for循環channe去channel中的信号,若取不到資料會阻塞原理,等待所有goroutine執行完畢,使用該方法有個前提是你已經知道了你啟動了多少個goroutine。
package main
import (
"fmt"
"time"
)
func cal(a int , b int ,Exitchan chan bool) {
c := a+b
fmt.Printf("%d + %d = %d\n",a,b,c)
time.Sleep(time.Second*2)
Exitchan <- true
}
func main() {
Exitchan := make(chan bool,10) //聲明并配置設定管道記憶體
for i :=0 ; i<10 ;i++{
go cal(i,i+1,Exitchan)
}
for j :=0; j<10; j++{
<- Exitchan //取信号資料,如果取不到則會阻塞
}
close(Exitchan) // 關閉管道
}
goroutine之間的通訊
goroutine本質上是協程,可以了解為不受核心排程,而受go排程器管理的線程。goroutine之間可以通過channel進行通信或者說是資料共享,當然你也可以使用全局變量來進行資料共享。
示例:使用channel模拟消費者和生産者模式
package main
import (
"fmt"
"sync"
)
func Productor(mychan chan int,data int,wait *sync.WaitGroup) {
mychan <- data
fmt.Println("product data:",data)
wait.Done()
}
func Consumer(mychan chan int,wait *sync.WaitGroup) {
a := <- mychan
fmt.Println("consumer data:",a)
wait.Done()
}
func main() {
datachan := make(chan int, 100) //通訊資料管道
var wg sync.WaitGroup
for i := 0; i < 10; i++ {
go Productor(datachan, i,&wg) //生産資料
wg.Add(1)
}
for j := 0; j < 10; j++ {
go Consumer(datachan,&wg) //消費資料
wg.Add(1)
}
wg.Wait()
}
//結果
consumer data: 4
product data: 5
product data: 6
product data: 7
product data: 8
product data: 9
consumer data: 1
consumer data: 5
consumer data: 6
consumer data: 7
consumer data: 8
consumer data: 9
product data: 2
consumer data: 2
product data: 3
consumer data: 3
product data: 4
consumer data: 0
product data: 0
product data: 1
四、channel
不同goroutine之間是如何進行通訊的呢?
- 方法一:全局變量和鎖同步
- 方法二:Channel
這裡我們主要注重講解下go中特有的channel,其類似于UNIX中的管道(piple)。
channel概念
channel俗稱管道,用于資料傳遞或資料共享,其本質是一個先進先出的隊列,使用goroutine+channel進行資料通訊簡單高效,同時也線程安全,多個goroutine可同時修改一個channel,不需要加鎖。
channel操作
定義和聲明:
1 var 變量名 chan 類型 //channel是有類型的,一個整數的channel隻能存放整數
2
3 var test chan int
4
5 var test chan map[string]string
6
7 var test chan *stu
channel可分為三種:
隻讀channel:隻能讀channel裡面資料,不可寫入
隻寫channel:隻能寫資料,不可讀
一般channel:可讀可寫
var readOnlyChan <-chan int // 隻讀chan
var writeOnlyChan chan<- int // 隻寫chan
var mychan chan int //讀寫channel
mychannel = make(chan int,10)
//或者
read_only := make (<-chan int,10)//定義隻讀的channel
write_only := make (chan<- int,10)//定義隻寫的channel
read_write := make (chan int,10)//可同時讀寫
定義完成以後需要make來配置設定記憶體空間,不然會deadlock!
GO的并發之道-Goroutine排程原理&Channel詳解
//定義一個結構體類型的channel
package main
type student struct{
name string
}
func main() {
var stuChan chan student
stuChan = make(chan student, 10)
stu := student{name:"syu01"}
stuChan <- stu
}
struct類型channel
讀寫資料
ch <- "wd" //寫資料
a := <- ch //讀取資料
a, ok := <-ch //推薦的讀取資料方法
注意:
- 管道如果未關閉,在讀取逾時會則會引發deadlock異常
- 管道如果關閉進行寫入資料會pannic
- 當管道中沒有資料時候再行讀取或讀取到預設值,如int類型預設值是0
周遊管道
- 使用for range周遊管道,如果管道未關閉會引發deadlock錯誤。
- 如果采用for死循環已經關閉的管道,當管道沒有資料時候,讀取的資料會是管道的預設值,并且循環不會退出。
package main
import (
"fmt"
"time"
)
func main() {
mychannel := make(chan int,10)
for i := 0;i < 10;i++{
mychannel <- i
}
close(mychannel) //關閉管道
fmt.Println("data lenght: ",len(mychannel))
for v := range mychannel { //周遊管道
fmt.Println(v)
}
fmt.Printf("data lenght: %d",len(mychannel))
}
帶緩沖區channe和不帶緩沖區channel
帶緩沖區channel:定義聲明時候制定了緩沖區大小(長度),可以儲存多個資料。
不帶緩沖區channel:隻能存一個資料,并且隻有當該資料被取出時候才能存下一個資料。
ch := make(chan int) //不帶緩沖區
ch := make(chan int ,10) //帶緩沖區
不帶緩沖區示例:
package main
import "fmt"
func test(c chan int) {
for i := 0; i < 10; i++ {
fmt.Println("send ", i)
c <- i
}
}
func main() {
ch := make(chan int)
go test(ch)
for j := 0; j < 10; j++ {
fmt.Println("get ", <-ch)
}
}
//結果:
send 0
send 1
get 0
get 1
send 2
send 3
get 2
get 3
send 4
send 5
get 4
get 5
send 6
send 7
get 6
get 7
send 8
send 9
get 8
get 9
channel實作作業池
我們建立三個channel,一個channel用于接受任務,一個channel用于保持結果,還有個channel用于決定程式退出的時候。
package main
import (
"fmt"
)
func Task(taskch, resch chan int, exitch chan bool) {
defer func() { //異常處理
err := recover()
if err != nil {
fmt.Println("do task error:", err)
return
}
}()
for t := range taskch { // 處理任務
fmt.Println("do task :", t)
resch <- t //
}
exitch <- true //處理完發送退出信号
}
func main() {
taskch := make(chan int, 20) //任務管道
resch := make(chan int, 20) //結果管道
exitch := make(chan bool, 5) //退出管道
go func() {
for i := 0; i < 10; i++ {
taskch <- i
}
close(taskch)
}()
for i := 0; i < 5; i++ { //啟動5個goroutine做任務
go Task(taskch, resch, exitch)
}
go func() { //等5個goroutine結束
for i := 0; i < 5; i++ {
<-exitch
}
close(resch) //任務處理完成關閉結果管道,不然range報錯
close(exitch) //關閉退出管道
}()
for res := range resch{ //列印結果
fmt.Println("task res:",res)
}
}
隻讀channel和隻寫channel
一般定義隻讀和隻寫的管道意義不大,更多時候我們可以在參數傳遞時候指明管道可讀還是可寫,即使目前管道是可讀寫的。
package main
import (
"fmt"
"time"
)
//隻能向chan裡寫資料
func send(c chan<- int) {
for i := 0; i < 10; i++ {
c <- i
}
}
//隻能取channel中的資料
func get(c <-chan int) {
for i := range c {
fmt.Println(i)
}
}
func main() {
c := make(chan int)
go send(c)
go get(c)
time.Sleep(time.Second*1)
}
//結果
0
1
2
3
4
5
6
7
8
9
View Code
select-case實作非阻塞channel
原理通過select+case加入一組管道,當滿足(這裡說的滿足意思是有資料可讀或者可寫)select中的某個case時候,那麼該case傳回,若都不滿足case,則走default分支。
package main
import (
"fmt"
)
func send(c chan int) {
for i :=1 ; i<10 ;i++ {
c <-i
fmt.Println("send data : ",i)
}
}
func main() {
resch := make(chan int,20)
strch := make(chan string,10)
go send(resch)
strch <- "wd"
select {
case a := <-resch:
fmt.Println("get data : ", a)
case b := <-strch:
fmt.Println("get data : ", b)
default:
fmt.Println("no channel actvie")
}
}
//結果:get data : wd
channel中定時器的使用
在對channel進行讀寫的時,可以對讀寫進行頻率控制,通過time.Ticke實作
示例:
package main
import (
"time"
"fmt"
)
func main(){
requests:= make(chan int ,5)
for i:=1;i<5;i++{
requests<-i
}
close(requests)
limiter := time.Tick(time.Second*1)
for req:=range requests{
<-limiter
fmt.Println("requets",req,time.Now()) //執行到這裡,需要隔1秒才繼續往下執行,time.Tick(timer)上面已定義
}
}
//結果:
requets 1 2018-07-06 10:17:35.98056403 +0800 CST m=+1.004248763
requets 2 2018-07-06 10:17:36.978123472 +0800 CST m=+2.001798205
requets 3 2018-07-06 10:17:37.980869517 +0800 CST m=+3.004544250
requets 4 2018-07-06 10:17:38.976868836 +0800 CST m=+4.000533569