通道
goroutine和channel的存在,讓Go語言在并發程式設計很多情況下不需要考慮鎖機制以及由此帶來的各種問題。和Java多線程一樣,Go應用能非常有效的利用多核CPU,并發執行的性能好。
而Python由于全局鎖GIL的原因,多線程的Python程式并不能有效利用多核,單個Python應用隻能寫多程序來利用多核CPU。Python如果用标準庫裡的multiprocessing包又會對監控和管理造成不少的挑戰。部署Python應用的時候通常是每個CPU核部署一個應用,造成不少資源的浪費。
我們來看一下Go語言中的通道。Go語言中通道類型是chan關鍵字,通過make來建立。可以建立帶緩沖的通道和無緩沖的通道。
建立通道
unbuffered := make(chan int) // 無緩沖的整型通道
buffered := make(chan string, 10) // 有緩沖的字元型通道
緩沖通道和無緩沖通道有什麼差別?
無緩沖通道沒有容量,我們想要往通道中發送資料,那麼一定要有另一個操作在往這個通道取資料,否則,發送資料的那段代碼也會被阻塞。
緩沖通道隻是有一段容量可以緩沖一下,當緩沖區未滿時,往通道中發送資料不會阻塞目前操作;當緩沖區滿了時往通道中發送資料,會想無緩沖通道那樣阻塞。
發送資料
buffered <- "Gopher" // 通過通道發送一個字元串
接收資料
received := <- buffered // 通過通道接收一個字元串
<- unbuffered // 接收并丢棄字元串,如果unbuffered通道裡沒資料,則會一直阻塞直到有資料放入
receiver, ok := <- buffered
if !ok {
xxx // 通道被關閉等問題
}
關閉通道
通常來說,我們往通道裡發送完資料後,我們應該關閉通道(一般來說由生産者來關閉通道),這就相當于告訴消費者,我的資料發送完了,你取完了資料之後就離開吧。
我們應該在所有資料都放入通道後再關閉通道,否則,如果先關閉通道,再往裡放資料的話會産生panic。看下面的例子:
close(buffered)
go func() {
for {
j, more := <-jobs // 如果通道已被關閉,并且資料已全被取走,則more為false
if more {
fmt.Println("received job", j)
} else {
fmt.Println("received all jobs")
done <- true
return
}
}
}()
for post := 1; post <= taskLoad; post++ {
tasks <- fmt.Sprintf("Task : %d", post)
}
// 當所有工作都處理完時關閉通道,以便所有goroutine退出
close(tasks)
通道可以用來實作同步功能:
func worker(done chan bool) {
fmt.Print("working...")
time.Sleep(time.Second)
fmt.Println("done")
done <- true
close(done)
}
func main() {
done := make(chan bool)
go worker(done)
<-done // 阻塞,直到通道中有資料
}
單向通道
預設地,我們使用make(ch chan int)建立的通道都是雙向通道,我們可以往裡面發送資料也可以從裡面讀取資料。我們也可以在定義通道的地方指明通道的方向,看下面的例子:
func ping(pings chan<- string, msg string) {
// pings單向通道,函數中隻能往pings發送資料
pings <- msg
}
func pong(pings <-chan string, pongs chan<- string) {
// pings單向通道,函數中隻能從pings接收資料
// pongs單向通道,函數中隻能往pongs發送資料
msg := <-pings
pongs <- msg
}
聲明單向通道最大的好處就是可以限制程式的行為。比如我們在定義接口給其他人調用時,我們就可以指明通道為單向通道,接口使用者隻能往通道發送資料或者隻能從通道讀取資料。
生産者消費者問題
生産者隻往通道裡面發送資料,發送完後關閉通道。消費者隻從通道裡面讀取資料。這可比Java中使用wait/notify來實作生産者消費者友善多了。
func producer(ch chan<- int) {
for i := 1; i <= 10; i++ {
ch <- i
}
close(ch)
}
func consumer(ch <-chan int) {
for i := range ch {
fmt.Println("consumed something: ", i)
}
}
func main() {
ch := make(chan int)
go producer(ch)
go consumer(ch)
time.Sleep(time.Second)
}
goroutine
Go語言中的goroutine,一般也稱作協程,不由OS排程,而是使用者層自行釋放CPU,進而在執行體之間切換排程運作。Go語言在底層進行協助實作。涉及系統調用的地方由Go标準庫協助釋放CPU。總之,協程運作不通過OS進行切換,由Go語言自行切換,系統運作開支大大降低。一個程序内部可以運作多個線程,而每個線程又可以運作很多協程。線程要負責對協程進行排程。當一個協程睡眠時,它将線程的運作權讓給其它的協程來運作。同一個線程内部最多隻會有一個協程正在運作。
建立Goroutine
建立goroutine很友善,建立個匿名函數并調用,然後在前面加上go關鍵字就可以了。
go func(msg string) {
fmt.Println(msg)
}("going")
還可以把函數賦給一個變量,然後建立goroutine運作:
var myFunc = func(msg string) {
fmt.Println(msg)
}
go myFunc("going")
主協程和子協程
Go語言所有代碼都運作在協程中,由Go運作時進行排程,其中main函數所在的協程叫做主協程,其餘的協程叫做子協程。
func main() {
for i := 0; i < 5; i++ {
go func(i int) {
fmt.Print(i, " ")
}(i)
}
time.Sleep(time.Second)
}
// 輸出:0 4 3 1 2
當主協程執行完而子協程未執行完時,所有子協程會随着主協程的結束而退出運作。上面的代碼輸出“0 4 3 1 2 ”,這是因為建立完協程後放入運作時隊列到真正運作有個過程,哪個協程先被放入運作時隊列,哪個就能先執行。
線程的排程是由作業系統負責的,排程算法運作在核心态,而協程的調用是由 Go 語言的運作時負責的,排程算法運作在使用者态。這就使得Go語言的協程比Java的線程更加輕量級。單一的Java應用建立幾十上百個線程并運作就已經非常消耗記憶體和CPU資源了,而單一的Go應用可以建立上千萬的goroutine還正常地運作。
GPM
先回顧一下Java中線程和作業系統核心線程的對應關系。Java中線程模型如下:
Java線程的實作方式是通過作業系統提供的進階程式設計接口——LWP(輕量級程序)來對應到核心線程上的,一個Java線程對應到一個LWP,對應到一個KLT(核心線程)。當Java線程阻塞,需要排程時,由核心排程器來進行切換,把目前KLT-LWP對應的Java線程切換到另一個Java線程,這種切換操作在核心态中進行,這種切換是很耗時的(相對CPU來說)。Java的一個線程對應到一個核心線程,一般建立Java線程占用的記憶體都大于1M。
Go語言使用了使用者級線程的實作方式,Go語言中的goroutine可以了解為使用者态的線程,排程切換goroutine直接在使用者态進行,不用切換到核心态。
Go 排程器模型我們通常叫做GPM 模型,包括 4 個重要結構:
- G:Goroutine,每個 Goroutine 對應一個 G 結構體,我們使用go關鍵字建立goroutine,并非就一定建立了G結構體的執行個體,隻有當沒有可用的G時,才會建立G來裝載我們建立的goroutine,否則,會複用現有可用的G來裝載goroutine。G 存儲 Goroutine 的運作堆棧、狀态以及任務函數,可重用。G 并非執行體,每個 G 需要綁定到 P 才能被排程執行。
- P: Processor,表示邏輯處理器,對 G 來說,P 相當于 CPU 核心,G 隻有綁定到 P 才能被排程。對 M 來說,P 提供了相關的執行環境(Context),如記憶體配置設定狀态(mcache),任務隊列(G)等。P 的數量決定了系統内最大可并行的 G 的數量(前提:實體 CPU 核數 >= P 的數量)。P 的數量由使用者設定的 GOMAXPROCS 決定,但是不論 GOMAXPROCS 設定為多大,P 的數量最大為 256。
- M: Machine,OS 核心線程抽象,代表着真正執行計算的資源,在綁定有效的 P 後,進入 schedule 循環;而 schedule 循環的機制大緻是從 Global 隊列、P 的 Local 隊列以及 wait 隊列中擷取。M 的數量是不定的,由 Go Runtime 調整,為了防止建立過多 OS 線程導緻系統排程不過來,目前預設最大限制為 10000 個。M 并不保留 G 狀态,這是 G 可以跨 M 排程的基礎。
- Sched:Go 排程器,它維護有存儲 M 和 G 的隊列以及排程器的一些狀态資訊等。
GPM和Sched的關系如下:
Go語言排程器Sched擁有全局運作隊列(GPQ)和P的本地運作隊列(LRQ)。GRQ中維護着可以運作但是沒有綁定到P的所有goroutine,LRQ中維護着綁定到P的可運作的所有goroutine。可以看到,上圖中有一個綁定到P的goroutine正在M上運作,M對應到作業系統核心線程。
當正在M上運作的G在某些情況阻塞時,Sched可以直接把P上其他可運作的G排程進來,這種排程操作在使用者态進行,十分輕量級。而且,Go語言中建立的goroutine隻需要大約占用2KB的記憶體,比Java中線程占用空間小多了。是以,Go語言程式可以輕松hold住百萬goroutine的排程運作。
GPM源碼和Sched源碼如下,從源碼中,我們更能清楚GPM之間的關系:
G
G中會儲存goroutine運作函數,goroutine的id,還有被綁定運作的M的指針:
struct G {
uintptr stackguard; // 分段棧的可用空間下界
uintptr stackbase; // 分段棧的棧基址
Gobuf sched; //程序切換時,利用sched域來儲存上下文
uintptr stack0;
FuncVal* fnstart; // goroutine運作的函數
void* param; // 用于傳遞參數,睡眠時其它goroutine設定param,喚醒時此goroutine可以擷取
int16 status; // 狀态Gidle,Grunnable,Grunning,Gsyscall,Gwaiting,Gdead
int64 goid; // goroutine的id号
G* schedlink;
M* m; // for debuggers, but offset not hard-coded
M* lockedm; // G被鎖定隻能在這個m上運作
uintptr gopc; // 建立這個goroutine的go表達式的pc
...
};
P
P中含有本地運作隊列LRQ,P目前的狀态:
struct P {
Lock;
uint32 status; // Pidle或Prunning等
P* link;
uint32 schedtick; // 每次排程時将它加一
M* m; // 連結到它關聯的M (nil if idle)
MCache* mcache;
G* runq[256];
int32 runqhead;
int32 runqtail;
// Available G's (status == Gdead)
G* gfree;
int32 gfreecnt;
byte pad[64];
};
M
M中包含了綁定到M的P的指針,正在M上運作的G的指針,M的狀态等等:
struct M {
G* g0; // 帶有排程棧的goroutine
G* gsignal; // signal-handling G 處理信号的goroutine
void (*mstartfn)(void);
G* curg; // M中目前運作的goroutine
P* p; // 關聯P以執行Go代碼 (如果沒有執行Go代碼則P為nil)
P* nextp;
int32 id;
int32 mallocing; //狀态
int32 throwing;
int32 gcing;
int32 locks;
int32 helpgc; //不為0表示此m在做幫忙gc。helpgc等于n隻是一個編号
bool blockingsyscall;
bool spinning;
Note park;
M* alllink; // 這個域用于連結allm
M* schedlink;
MCache *mcache;
G* lockedg;
M* nextwaitm; // next M waiting for lock
GCStats gcstats;
...
};
Sched
Sched包含目前空閑的Md指針,目前空閑的P的指針,全局運作隊列GRQ等等:
struct Sched {
Lock;
uint64 goidgen;
M* midle; // idle m's waiting for work
int32 nmidle; // number of idle m's waiting for work
int32 nmidlelocked; // number of locked m's waiting for work
int3 mcount; // number of m's that have been created
int32 maxmcount; // maximum number of m's allowed (or die)
P* pidle; // idle P's
uint32 npidle; //idle P的數量
uint32 nmspinning;
// Global runnable queue.
G* runqhead;
G* runqtail;
int32 runqsize;
// Global cache of dead G's.
Lock gflock;
G* gfree;
int32 stopwait;
Note stopnote;
uint32 sysmonwait;
Note sysmonnote;
uint64 lastpoll;
int32 profilehz; // cpu profiling rate
}
喜歡的可以關注WeiXin訂閱号
參考
- Go 為什麼這麼“快”