天天看點

golang标準庫 context的使用

本文索引

  • 問題引入
  • context包簡介
  • 示例

goroutine為我們提供了輕量級的并發實作,作為golang最大的亮點之一更是備受推崇。

goroutine的簡單固然有利于我們的開發,但簡單總是有代價的,考慮如下例子:

func httpDo(req *http.Request, resp *http.Response) {
  for {
    select {
    case <-time.After(5 * time.Second):
      // 從req讀取資料然後發送給resp

    // 其他的一些邏輯(如果有的話)
    }
  }
}

func startListener() {
  // start http listener
  for {
    req, resp := HTTPListener.Accept()
    go httpDo(req, resp)
  }
}
           

上面的例子中,goroutine

httpDo

每隔5秒讀取一次請求資料并發送給響應連結,

startListener

則每收到一個請求就啟動一個goroutine去處理,雖然是僞代碼,不過你已經發現了這是golang處理請求等并發任務時的慣用模型。

看着不是很簡單嗎,簡單而又強大。确實如此,但有一個小問題。假如我的

startListener

崩潰了或者需要重新啟動,這時前面那些連結都需要斷開重連,那麼我們應該怎麼停止那些goroutine呢?

答案是做不到。原因很簡單,當我們使用

go func()

啟動一個goroutine後,除了

channel

sync

包中的同步手段之外,我們沒有任何可以控制goroutine的方法。簡單的說,除非goroutine在函數體内return或者主goroutine終止運作,否則我們是不能通過外部手段幹擾goroutine使其終止的。是以在上述例子中那些goroutine無法終止,這會造成goroutine leak。開頭已經說過,goroutine足夠輕量,通常對于一個函數體不是死循環的goroutine來說我們大可不必關心它的退出操作,然而對于例子中的goroutine來說它會持續運作下去,雖然每個goroutine隻占用很少的資源,但如果數量足夠大的話被浪費的資源是相當驚人的,而一個長時間運作的程式必然因為得不到釋放的資源而出問題。更為緻命的是這種leak的goroutine可能還會造成邏輯上的錯誤進而引發更嚴重的問題。

當然,一點簡單的改造就可以避免問題,這也是goroutine的強大之處。前面我們提到

channel

等同步手段可以間接地控制goroutine,是以我們可以利用一個空

chan

來達到終止所有goroutine的目的:

func httpDo(req *http.Request, resp *http.Response, done <-chan struct{}) {
  for {
    select {
    case <-done:
      // 避免goroutine leak
      return
    case <-time.After(5 * time.Second):
      // 從req讀取資料然後發送給resp

    // 其他的一些邏輯(如果有的話)
    }
  }
}

func startListener() {
  // start http listener

  done := make(chan struct{})
  defer close(done)
  for {
    req, resp := HTTPListener.Accept()
    go httpDo(req, resp, done)
  }
}
           

修改過的程式我們使用一個

chan struct{}

變量進行控制,當

startListener

退出時(無論正常結束還是panic)done都會關閉,關閉後的

chan

會傳回對應類型0值,于是goroutine的select會收到done關閉的信号,随後跟着退出,goroutine leak被避免。

當然,這麼做不夠優雅,畢竟當

startListener

這樣的函數增多後我們不得不每次都寫大量重複的代碼,這樣會讓開發變得乏味。

是以golang1.7引入了

context

包用來優雅地退出goroutine。

golang為了實作優雅地退出goroutine,在1.7引入了`context`。雖然名字叫“上下文”(context)不過其實隻是我們在上一節例子的包裝。

context.Context

是一個接口:

type Context interface {
	// 傳回逾時時間(duration加上建立context對象時的時間),如果已經逾時ok為true
	// 傳回的時間也可以是自己設定的time.Time
	Deadline() (deadline time.Time, ok bool)

	// done信号,和上一節的做法一樣,這裡進行了一些包裝
	Done() <-chan struct{}

	// 如果Done未被關閉就傳回nil。
	// 否則傳回相應的錯誤,比如調用了cancel()會傳回Canceled;逾時會傳回DeadlineExceeded
	Err() error

	// 可以給context設定一些值,使用方法和map類似,key需要支援==比較操作,value需要是并發安全的
	Value(key interface{}) interface{}
}
           

實作了Context接口的對象都是并發安全的(如果你自己實作了這個接口也必須確定并發安全)。

context的使用很簡單,首先在需要産生goroutine的函數中建立一個context對象,然後将其作為goroutine的第一個參數傳入,例如

go func(ctx context.Context) {} (ctx)

,如果在goroutine裡還會運作新的goroutine,那麼就繼續傳遞這個context對象。

如此一來最初的那個context對象就被稱為parent, 其餘goroutine中的被稱為關聯context,通過這種關系我們就可以把相關的goroutine聯系在一起。

對于一個作為parent的context對象來說它也必須基于一個parent來建立,是以context提供了兩個建立空context的函數:

func Background() Context
func TODO() Context
           

兩者都傳回一個空context,一個context不會被取消(cancel),也不會逾時。它們唯一的差別是

TODO

表示你的代碼正在準備使用context但仍然需要一些調整,這回告訴靜态代碼分析工具

go vet

不彙報某些context的使用錯誤,而通常我們應該使用

Background

産生的context來建立我們自己的context對象。

有了parent之後就可以建立我們需要的context對象了,context包提供了三種context,分别是是普通context,逾時context以及帶值的context:

// 普通context,通常這樣調用ctx, cancel := context.WithCancel(context.Background())
func WithCancel(parent Context) (ctx Context, cancel CancelFunc)

// 帶逾時的context,逾時之後會自動close對象的Done,與調用CancelFunc的效果一樣
// WithDeadline 明确地設定一個d指定的系統時鐘時間,如果超過就觸發逾時
// WithTimeout 設定一個相對的逾時時間,也就是deadline設為timeout加上目前的系統時間
// 因為兩者事實上都依賴于系統時鐘,是以可能存在微小的誤差,是以官方不推薦把逾時間隔設定得太小
// 通常這樣調用ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
func WithDeadline(parent Context, d time.Time) (Context, CancelFunc)
func WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc)

// 帶有值的context,沒有CancelFunc,是以它隻用于值的多goroutine傳遞和共享
// 通常這樣調用ctx := context.WithValue(context.Background(), "key", myValue)
func WithValue(parent Context, key, val interface{}) Context
           

對于會傳回

CancelFunc

的函數,我們必須要使用

defer cancel()

,否則靜态檢查例如

go vet

會報錯,理由是因為如果不用defer來終止context的話不能避免goroutine leak,對于帶有逾時的context來說cancel還可以停止計時器釋放對應的資源。另外多次調用cancel是無害的,是以及時一個context因為逾時而被取消,你依然可以對其使用cancel。是以我們應該把cancel的調用放在defer語句中。

上面是在主goroutine中的處理,對于傳入context的goroutine來說需要做一些結構上的改變:

func coroutine(ctx context.Context, data <-chan int) {
  // setup something
  for {
    select {
    case <-ctx.Done():
      // 一些清理操作
      return
    case i := <-data:
      go handle(ctx, i)
    }
  }
}
           

可以看見goroutine的主要邏輯結構需要由select包裹,首先檢查本次任務有沒有取消,沒有取消或者逾時就從chan裡讀取資料進行處理,如果需要啟動其他goroutine就把ctx傳遞下去。

golang的初學者可能會對這段代碼産生不少疑惑,但是等熟悉了goroutine+chan的使用後就會發現這隻是對既有模型的微調,十分便于遷移和修改。

雖然說了這麼多,實際上還都是些很抽象的概念,是以這一節舉幾個例子輔助了解。

首先是使用逾時context的例子,每個goroutine運作5秒,每隔一秒列印一段資訊,5秒後終止運作:

func coroutine(ctx context.Context, duration time.Duration, id int, wg *sync.WaitGroup) {
	for {
		select {
		case <-ctx.Done():
			fmt.Printf("goroutine %d finish\n", id)
			wg.Done()
			return
		case <-time.After(duration):
			fmt.Printf("message from goroutine %d\n", id)
		}
	}
}

func main() {
	wg := &sync.WaitGroup{}
	ctx, cancel := context.WithTimeout(context.Background(), 5 * time.Second)
	defer cancel()

	for i := 0; i < 5; i++ {
		wg.Add(1)
		go coroutine(ctx, 1 * time.Second, i, wg)
	}

	wg.Wait()
}
           

我們使用

WaitGroup

等待所有的goroutine執行完畢,在收到

<-ctx.Done()

的終止信号後使wg中需要等待的goroutine數量減一。因為context隻負責取消goroutine,不負責等待goroutine運作,是以需要配合一點輔助手段。如果運作程式你會得到類似如下結果(不同環境運作結果可能不同):

message from goroutine 0
message from goroutine 2
message from goroutine 4
message from goroutine 3
message from goroutine 1
message from goroutine 2
message from goroutine 4
message from goroutine 0
message from goroutine 1
message from goroutine 3
message from goroutine 3
message from goroutine 0
message from goroutine 4
message from goroutine 2
message from goroutine 1
message from goroutine 0
message from goroutine 2
message from goroutine 4
message from goroutine 3
message from goroutine 1
goroutine 0 finish
goroutine 3 finish
goroutine 1 finish
goroutine 2 finish
goroutine 4 finish
           

上一個例子中示範了逾時控制,下一個例子将會示範如何用普通context取消一個goroutine:

func main() {
	// gen是一個生成器,傳回從1開始的遞增數字直到自身被取消
	gen := func(ctx context.Context) <-chan int {
		dst := make(chan int)
		n := 1
		go func() {
			for {
				select {
				case <-ctx.Done():
					return
				case dst <- n:
					n++
				}
			}
		}()
		return dst
	}

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	for n := range gen(ctx) {
		fmt.Println(n)
		// 生成到5時終止生成器運作
		if n == 5 {
			break
		}
	}
}
           

運作結果将會輸出1-5的數字,當生成5之後for循環終止,main退出前defer語句生效,終止goroutine的運作。

最後一個例子是如何在goroutine間共享變量的。

因為可能會被多個goroutine同時修改,是以我們的value必須保證并發安全,不過也可以換種思路,隻要保證對value的操作是并發安全的就可以了:

func main() {
	var v int64
	wg := sync.WaitGroup{}
	ctx := context.WithValue(context.Background(), "myKey", &v)

	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func(ctx context.Context, key string) {
			// 取出來的是interface{},需要先斷言成我們需要的類型
			value := ctx.Value(key).(*int64)
			// 原子操作,并發安全
			atomic.AddInt64(value, 1)
			wg.Done()
		}(ctx, "myKey")
	}

	wg.Wait()
	// 類型斷言成*int64然後解引用
	fmt.Println(*(ctx.Value("myKey").(*int64)))
}
           

運作結果會列印出10,因為有10個goroutine分别對v原子地加了一。

當然,引入類型斷言後代碼複雜度有所提升,但資料的共享卻友善了,你可以基于帶值的context為parent繼續建構可以取消或逾時的context,同時可以在其中分發資料而無需将其作為參數傳遞。

context包的使用就是這麼簡單,還有更多對于context的應用,這裡就不一一列舉了,希望各位讀者在以後的開發中能夠多加利用context包,寫出健壯的更優雅的代碼。

參考

context包官方文檔

官方部落格的介紹