天天看點

Go-并發程式設計基礎(goroutine、channel、select等)概念協程 goroutine通道Channeltime與select

目錄

概念

協程 goroutine

goroutine排程-MPG模式

通道Channel

資料結構

聲明&初始化

發送與接收

周遊和關閉

單方向的channel

channel中的channel

常見錯誤

time與select

逾時

時間間隔

概念

  • 并發:指宏觀上在一段時間内能同時運作多個程式,微觀上交替運作。
  • 并行:指同一時刻能運作多個指令。
  • 程序:一段程式的執行過程,是系統進行資源配置設定的基本機關,一個程序至少有一個線程。
  • 線程:作業系統能夠進行運算排程的最小機關,它被包含在程序之中。

協程 goroutine

  • 有獨立的棧空間
  • 共享程式堆空間
  • 排程由使用者控制
  • 主線程是一個實體線程,直接作用在cpu上的,是重量級的,非常耗費cpu資源,
  • 協程從主線程開啟的,是輕量級的線程,是邏輯态,對資源消耗相對小。
  • Golang的協程機制是重要的特點,可以輕松的開啟上萬個協程。其它程式設計語言的并發機制是一般基于線程的,開啟過多的線程,資源耗費大,這裡就突顯Golang在并發上的優勢了

當一個程式啟動時,其主函數即在一個單獨的goroutine中運作,稱之為main goroutine。新的goroutine會用go語句來建立。在文法上,go語句是一個普通的函數或方法調用前加上關鍵字go。go語句會使其語句中的函數在一個新建立的goroutine中運作。而go語句本身會迅速地完成。當主函數傳回時,所有的goroutine都會直接打斷,程式退出。

操作已經就緒,對應的goroutine就會重新配置設定到邏輯處理器上來完成操作。排程器對可以建立的邏輯處理器的數量沒有限制,但語言運作時預設限制每個程式最多建立10000個線程。這個限制值可以通過調用runtime/debug包的SetMaxThreads方法來更改。如果程式試圖使用更多的線程,就會崩潰。

goroutine排程-MPG模式

M(Main Thread):作業系統的主線程(是實體線程),又稱核心線程。

P(Processor):處理器,管理協程,例如協程執行需要的上下文等

G(Goroutine):go協程

舉個例子:

Go-并發程式設計基礎(goroutine、channel、select等)概念協程 goroutine通道Channeltime與select

分成兩個部分來看

原來的情況是M主線程正在執行G0協程,另外有三個協程在隊列等待如果G0協程阻塞,比如讀取檔案或者資料庫等。

這時就會建立M1主線程(也可能是從已有的線程池中取出M1),并且将等待的3個協程挂到M1下開始執行,M0的主線程下的G0仍然執行檔案io的讀寫。等到G0不阻塞了,M0會被放到空閑的主線程繼續執行(從已有的線程池中取),同時G0又會被喚醒。

這樣的MPG排程模式,可以既讓G0執行,同時也不會讓隊列的其它協程一直阻塞,仍然可以并發/并行執行。

進行排程的排程器為Seched,它維護有存儲空閑的M隊列和空閑的P隊列,可運作的G隊列,自由的G隊列以及排程器的一些狀态資訊等。

package main

import (
	"fmt"
	"runtime"
	"time"
)

func routinetest(name string){
	for i:=0;i<3;i++{
		fmt.Println("routinetest",i,":hello,",name)
		time.Sleep(100*time.Millisecond)
	}
}

func main() {
	num :=runtime.NumCPU()
	fmt.Println(num)
	runtime.GOMAXPROCS(num)
	//runtime.GOMAXPROCS(1)
	//--------使用go開啟協程----------
	go routinetest("lady")
	go routinetest("killer")
	time.Sleep(time.Second)
}
           

可以通過runtime.GOMAXPROCS設定cpu數,這裡設定成了8個。

Go-并發程式設計基礎(goroutine、channel、select等)概念協程 goroutine通道Channeltime與select

通道Channel

相對于sync的低水準同步,使用channel可以實作高水準同步,channel是先進先出的。

資料結構

Channel 在運作時的内部表示是 runtime.hchan,該結構體中包含了用于保護成員變量的互斥鎖,從某種程度上說,Channel 是一個用于同步和通信的有鎖隊列,使用互斥鎖解決程式中可能存在的線程競争問題是很常見的,我們能很容易地實作有鎖隊列。

type hchan struct {
	qcount   uint
	dataqsiz uint
	buf      unsafe.Pointer
	elemsize uint16
	closed   uint32
	elemtype *_type
	sendx    uint
	recvx    uint
	recvq    waitq
	sendq    waitq
	lock mutex
}
           

runtime.hchan 結構體中的五個字段 

qcount

dataqsiz

buf

sendx

recv

 建構底層的循環隊列:

  • qcount

     — Channel 中的元素個數;
  • dataqsiz

     — Channel 中的循環隊列的長度;
  • buf

     — Channel 的緩沖區資料指針;
  • sendx

     — Channel 的發送操作處理到的位置;
  • recvx

     — Channel 的接收操作處理到的位置;

除此之外,

elemsize

 和 

elemtype

 分别表示目前 Channel 能夠收發的元素類型和大小;

sendq

 和 

recvq

 存儲了目前 Channel 由于緩沖區空間不足而阻塞的 Goroutine 清單,這些等待隊列使用雙向連結清單 runtime.waitq 表示,連結清單中所有的元素都是 runtime.sudog 結構:

type waitq struct {
	first *sudog
	last  *sudog
}
           

runtime.sudog 表示一個在等待清單中的 Goroutine,該結構中存儲了兩個分别指向前後 runtime.sudog 的指針以構成連結清單。

聲明&初始化

初始化需要使用make(t Type, size ...IntegerType) Type,size為緩存大小

var b chan int
var c = make(chan int)
var d = make(chan int,10)      

b為nil,c為無緩存channel,d為有緩存channel 

發送與接收

發送使用channel<-data,接收使用[var,ok]:=<-channel,當左側沒有變量接收時會直接丢棄掉資料,ok可以辨別channel是否有資料,無資料是,接收變量擷取到的是對應類型的零值。

對于nil的channel,發送和接收都會阻塞,是以不make的channel沒有用,實際程式設計中channel應該都初始化

對于無緩存的channel,發送後會阻塞,直至接收

對于有緩存的channel,滿了後發送會被阻塞,接收無影響

周遊和關閉

close

關閉後無法寫入,隻能讀取,例如

close(d)
           

普通for循環

for j := 0;j<len(c); j++{
		fmt.Println(<-c)
	}
           

若取的時候,沒有其他goroutine寫入的話,會讀出一半。例如,剛開始len(c)是10個,當j為5時,len(c)也是5了,就跳出循環了。

for j := 0;len(c)!=0; j++{
		fmt.Println(<-c)
	}
           

上面這種方法可以 

for range

關閉後可以正常周遊,周遊也是從channel中接收值,大小會變化,例如

for data := range d{
		fmt.Println(data)
	}
           

若不關閉,會一直接收資料,即使目前channel沒有資料了,無goroutine寫入時會block,若是在main routine中,會導緻deadlock錯誤。

channel狀态總結

動作\狀态 nil 非空 空的 滿了 沒滿
接收 阻塞 接收值 阻塞 接收值 接收值
發送         阻塞 發送值 發送值 阻塞 發送值
關閉 panic 關閉成功,讀完資料後傳回零值 關閉成功,傳回零值 關閉成功,讀完資料後傳回零值 關閉成功,讀完資料後傳回零值

單方向的channel

隻發送chan<-int

隻接收

var in =  make(chan <- int)
var out = make(<-chan int,3)
           

channel中的channel

package main

import "fmt"

type Request struct{
	num int
	result chan int
}

func result(r Request)  {
	r.result <- r.num + 1
}

func main() {
	r := Request{1,make(chan int)}
	go result(r)
	fmt.Println(<-r.result)
}
           

常見錯誤

panic: close of nil channel

關閉nil的channel

fatal error: all goroutines are asleep - deadlock!

main routine被永久阻塞,例如,接收一個空的channel,一直沒有goroutine向裡面放資料

panic: send on closed channel

向關閉的channel中發送資料

time與select

select是針對并發特有的控制結構。和switch很像,但每個case不是表達式而是通信,當有多個case可以時,将僞随機選擇一個,是以不能依賴select來做順序通信。

逾時

func After(d Duration) <-chan Time

 到達一定時間後可以從channel接收資料

package main

import (
	"fmt"
	"math/rand"
	"time"
)

func main() {
	timeout := time.After(2*time.Second)
	c := make(chan int)
	go func() {
		for {
			c<-0
			time.Sleep(time.Duration(rand.Intn(500))*time.Millisecond)
		}
	}()
	for {
		select {
		case <-c:
			fmt.Println("I'm working...")
		case <-timeout:
			fmt.Println("time out")
			return
		}
	}
}
           
Go-并發程式設計基礎(goroutine、channel、select等)概念協程 goroutine通道Channeltime與select

時間間隔

func Tick(d Duration) <-chan Time
package main

import (
	"fmt"
	"math/rand"
	"time"
)

func main() {
	timeout := time.After(3*time.Second)
	timetrick := time.Tick(time.Second)
	c := make(chan int)
	go func() {
		for {
			c<-0
			time.Sleep(time.Duration(rand.Intn(500))*time.Millisecond)
		}
	}()
	for {
		select {
		case <-c:
			fmt.Println("I'm working...")
		case <-timetrick:
			fmt.Println("1 second pass")
		case <-timeout:
			fmt.Println("3 second")
			return
		}
	}
}
           
Go-并發程式設計基礎(goroutine、channel、select等)概念協程 goroutine通道Channeltime與select

并發的後序内容檢視:

Go-并發模式1(Basic Examples)

Go-并發模式2(Patterns)

 更多Go相關内容:Go-Golang學習總結筆記

有問題請下方評論,轉載請注明出處,并附有原文連結,謝謝!如有侵權,請及時聯系。如果您感覺有所收獲,自願打賞,可選擇支付寶18833895206(小于),您的支援是我不斷更新的動力。