本文索引
- 問題引入
- context包簡介
- 示例
goroutine為我們提供了輕量級的并發實作,作為golang最大的亮點之一更是備受推崇。
goroutine的簡單固然有利于我們的開發,但簡單總是有代價的,考慮如下例子:
func httpDo(req *http.Request, resp *http.Response) {
for {
select {
case <-time.After(5 * time.Second):
// 從req讀取資料然後發送給resp
// 其他的一些邏輯(如果有的話)
}
}
}
func startListener() {
// start http listener
for {
req, resp := HTTPListener.Accept()
go httpDo(req, resp)
}
}
上面的例子中,goroutine
httpDo
每隔5秒讀取一次請求資料并發送給響應連結,
startListener
則每收到一個請求就啟動一個goroutine去處理,雖然是僞代碼,不過你已經發現了這是golang處理請求等并發任務時的慣用模型。
看着不是很簡單嗎,簡單而又強大。确實如此,但有一個小問題。假如我的
startListener
崩潰了或者需要重新啟動,這時前面那些連結都需要斷開重連,那麼我們應該怎麼停止那些goroutine呢?
答案是做不到。原因很簡單,當我們使用
go func()
啟動一個goroutine後,除了
channel
和
sync
包中的同步手段之外,我們沒有任何可以控制goroutine的方法。簡單的說,除非goroutine在函數體内return或者主goroutine終止運作,否則我們是不能通過外部手段幹擾goroutine使其終止的。是以在上述例子中那些goroutine無法終止,這會造成goroutine leak。開頭已經說過,goroutine足夠輕量,通常對于一個函數體不是死循環的goroutine來說我們大可不必關心它的退出操作,然而對于例子中的goroutine來說它會持續運作下去,雖然每個goroutine隻占用很少的資源,但如果數量足夠大的話被浪費的資源是相當驚人的,而一個長時間運作的程式必然因為得不到釋放的資源而出問題。更為緻命的是這種leak的goroutine可能還會造成邏輯上的錯誤進而引發更嚴重的問題。
當然,一點簡單的改造就可以避免問題,這也是goroutine的強大之處。前面我們提到
channel
等同步手段可以間接地控制goroutine,是以我們可以利用一個空
chan
來達到終止所有goroutine的目的:
func httpDo(req *http.Request, resp *http.Response, done <-chan struct{}) {
for {
select {
case <-done:
// 避免goroutine leak
return
case <-time.After(5 * time.Second):
// 從req讀取資料然後發送給resp
// 其他的一些邏輯(如果有的話)
}
}
}
func startListener() {
// start http listener
done := make(chan struct{})
defer close(done)
for {
req, resp := HTTPListener.Accept()
go httpDo(req, resp, done)
}
}
修改過的程式我們使用一個
chan struct{}
變量進行控制,當
startListener
退出時(無論正常結束還是panic)done都會關閉,關閉後的
chan
會傳回對應類型0值,于是goroutine的select會收到done關閉的信号,随後跟着退出,goroutine leak被避免。
當然,這麼做不夠優雅,畢竟當
startListener
這樣的函數增多後我們不得不每次都寫大量重複的代碼,這樣會讓開發變得乏味。
是以golang1.7引入了
context
包用來優雅地退出goroutine。
golang為了實作優雅地退出goroutine,在1.7引入了`context`。雖然名字叫“上下文”(context)不過其實隻是我們在上一節例子的包裝。
context.Context
是一個接口:
type Context interface {
// 傳回逾時時間(duration加上建立context對象時的時間),如果已經逾時ok為true
// 傳回的時間也可以是自己設定的time.Time
Deadline() (deadline time.Time, ok bool)
// done信号,和上一節的做法一樣,這裡進行了一些包裝
Done() <-chan struct{}
// 如果Done未被關閉就傳回nil。
// 否則傳回相應的錯誤,比如調用了cancel()會傳回Canceled;逾時會傳回DeadlineExceeded
Err() error
// 可以給context設定一些值,使用方法和map類似,key需要支援==比較操作,value需要是并發安全的
Value(key interface{}) interface{}
}
實作了Context接口的對象都是并發安全的(如果你自己實作了這個接口也必須確定并發安全)。
context的使用很簡單,首先在需要産生goroutine的函數中建立一個context對象,然後将其作為goroutine的第一個參數傳入,例如
go func(ctx context.Context) {} (ctx)
,如果在goroutine裡還會運作新的goroutine,那麼就繼續傳遞這個context對象。
如此一來最初的那個context對象就被稱為parent, 其餘goroutine中的被稱為關聯context,通過這種關系我們就可以把相關的goroutine聯系在一起。
對于一個作為parent的context對象來說它也必須基于一個parent來建立,是以context提供了兩個建立空context的函數:
func Background() Context
func TODO() Context
兩者都傳回一個空context,一個context不會被取消(cancel),也不會逾時。它們唯一的差別是
TODO
表示你的代碼正在準備使用context但仍然需要一些調整,這回告訴靜态代碼分析工具
go vet
不彙報某些context的使用錯誤,而通常我們應該使用
Background
産生的context來建立我們自己的context對象。
有了parent之後就可以建立我們需要的context對象了,context包提供了三種context,分别是是普通context,逾時context以及帶值的context:
// 普通context,通常這樣調用ctx, cancel := context.WithCancel(context.Background())
func WithCancel(parent Context) (ctx Context, cancel CancelFunc)
// 帶逾時的context,逾時之後會自動close對象的Done,與調用CancelFunc的效果一樣
// WithDeadline 明确地設定一個d指定的系統時鐘時間,如果超過就觸發逾時
// WithTimeout 設定一個相對的逾時時間,也就是deadline設為timeout加上目前的系統時間
// 因為兩者事實上都依賴于系統時鐘,是以可能存在微小的誤差,是以官方不推薦把逾時間隔設定得太小
// 通常這樣調用ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
func WithDeadline(parent Context, d time.Time) (Context, CancelFunc)
func WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc)
// 帶有值的context,沒有CancelFunc,是以它隻用于值的多goroutine傳遞和共享
// 通常這樣調用ctx := context.WithValue(context.Background(), "key", myValue)
func WithValue(parent Context, key, val interface{}) Context
對于會傳回
CancelFunc
的函數,我們必須要使用
defer cancel()
,否則靜态檢查例如
go vet
會報錯,理由是因為如果不用defer來終止context的話不能避免goroutine leak,對于帶有逾時的context來說cancel還可以停止計時器釋放對應的資源。另外多次調用cancel是無害的,是以及時一個context因為逾時而被取消,你依然可以對其使用cancel。是以我們應該把cancel的調用放在defer語句中。
上面是在主goroutine中的處理,對于傳入context的goroutine來說需要做一些結構上的改變:
func coroutine(ctx context.Context, data <-chan int) {
// setup something
for {
select {
case <-ctx.Done():
// 一些清理操作
return
case i := <-data:
go handle(ctx, i)
}
}
}
可以看見goroutine的主要邏輯結構需要由select包裹,首先檢查本次任務有沒有取消,沒有取消或者逾時就從chan裡讀取資料進行處理,如果需要啟動其他goroutine就把ctx傳遞下去。
golang的初學者可能會對這段代碼産生不少疑惑,但是等熟悉了goroutine+chan的使用後就會發現這隻是對既有模型的微調,十分便于遷移和修改。
雖然說了這麼多,實際上還都是些很抽象的概念,是以這一節舉幾個例子輔助了解。
首先是使用逾時context的例子,每個goroutine運作5秒,每隔一秒列印一段資訊,5秒後終止運作:
func coroutine(ctx context.Context, duration time.Duration, id int, wg *sync.WaitGroup) {
for {
select {
case <-ctx.Done():
fmt.Printf("goroutine %d finish\n", id)
wg.Done()
return
case <-time.After(duration):
fmt.Printf("message from goroutine %d\n", id)
}
}
}
func main() {
wg := &sync.WaitGroup{}
ctx, cancel := context.WithTimeout(context.Background(), 5 * time.Second)
defer cancel()
for i := 0; i < 5; i++ {
wg.Add(1)
go coroutine(ctx, 1 * time.Second, i, wg)
}
wg.Wait()
}
我們使用
WaitGroup
等待所有的goroutine執行完畢,在收到
<-ctx.Done()
的終止信号後使wg中需要等待的goroutine數量減一。因為context隻負責取消goroutine,不負責等待goroutine運作,是以需要配合一點輔助手段。如果運作程式你會得到類似如下結果(不同環境運作結果可能不同):
message from goroutine 0
message from goroutine 2
message from goroutine 4
message from goroutine 3
message from goroutine 1
message from goroutine 2
message from goroutine 4
message from goroutine 0
message from goroutine 1
message from goroutine 3
message from goroutine 3
message from goroutine 0
message from goroutine 4
message from goroutine 2
message from goroutine 1
message from goroutine 0
message from goroutine 2
message from goroutine 4
message from goroutine 3
message from goroutine 1
goroutine 0 finish
goroutine 3 finish
goroutine 1 finish
goroutine 2 finish
goroutine 4 finish
上一個例子中示範了逾時控制,下一個例子将會示範如何用普通context取消一個goroutine:
func main() {
// gen是一個生成器,傳回從1開始的遞增數字直到自身被取消
gen := func(ctx context.Context) <-chan int {
dst := make(chan int)
n := 1
go func() {
for {
select {
case <-ctx.Done():
return
case dst <- n:
n++
}
}
}()
return dst
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
for n := range gen(ctx) {
fmt.Println(n)
// 生成到5時終止生成器運作
if n == 5 {
break
}
}
}
運作結果将會輸出1-5的數字,當生成5之後for循環終止,main退出前defer語句生效,終止goroutine的運作。
最後一個例子是如何在goroutine間共享變量的。
因為可能會被多個goroutine同時修改,是以我們的value必須保證并發安全,不過也可以換種思路,隻要保證對value的操作是并發安全的就可以了:
func main() {
var v int64
wg := sync.WaitGroup{}
ctx := context.WithValue(context.Background(), "myKey", &v)
for i := 0; i < 10; i++ {
wg.Add(1)
go func(ctx context.Context, key string) {
// 取出來的是interface{},需要先斷言成我們需要的類型
value := ctx.Value(key).(*int64)
// 原子操作,并發安全
atomic.AddInt64(value, 1)
wg.Done()
}(ctx, "myKey")
}
wg.Wait()
// 類型斷言成*int64然後解引用
fmt.Println(*(ctx.Value("myKey").(*int64)))
}
運作結果會列印出10,因為有10個goroutine分别對v原子地加了一。
當然,引入類型斷言後代碼複雜度有所提升,但資料的共享卻友善了,你可以基于帶值的context為parent繼續建構可以取消或逾時的context,同時可以在其中分發資料而無需将其作為參數傳遞。
context包的使用就是這麼簡單,還有更多對于context的應用,這裡就不一一列舉了,希望各位讀者在以後的開發中能夠多加利用context包,寫出健壯的更優雅的代碼。
參考
context包官方文檔
官方部落格的介紹