天天看點

golang中context詳解三(應用場景)

作者:幹飯人小羽
golang中context詳解三(應用場景)

golang中Context的使用場景

context在Go1.7之後就進入标準庫中了。它主要的用處如果用一句話來說,是在于控制goroutine的生命周期。當一個計算任務被goroutine承接了之後,由于某種原因(逾時,或者強制退出)我們希望中止這個goroutine的計算任務,那麼就用得到這個Context了。

關于Context的四種結構,CancelContext,TimeoutContext,DeadLineContext,ValueContext的使用在這一篇快速掌握 Golang context 包已經說的很明白了。

本文主要來盤一盤golang中context的一些使用場景:

場景一:RPC調用

在主goroutine上有4個RPC,RPC2/3/4是并行請求的,我們這裡希望在RPC2請求失敗之後,直接傳回錯誤,并且讓RPC3/4停止繼續計算。這個時候,就使用的到Context。

這個的具體實作如下面的代碼。

package main

import (
	"context"
	"github.com/pkg/errors"
	"sync"
)

func Rpc(ctx context.Context, url string) error {
	result := make(chan int)
	err := make(chan error)

	go func() {
		// 進行RPC調用,并且傳回是否成功,成功通過result傳遞成功資訊,錯誤通過error傳遞錯誤資訊
		isSuccess := true
		if isSuccess {
			result <- 1
		} else {
			err <- errors.New("some error happen")
		}
	}()

	select {
		case <- ctx.Done():
			// 其他RPC調用調用失敗
			return ctx.Err()
		case e := <- err:
			// 本RPC調用失敗,傳回錯誤資訊
			return e
		case <- result:
			// 本RPC調用成功,不傳回錯誤資訊
			return nil
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	// RPC1調用
	err := Rpc(ctx, "http://rpc_1_url")
	if err != nil {
		return
	}

	wg := sync.WaitGroup{}

	// RPC2調用
	wg.Add(1)
	go func(){
		defer wg.Done()
		err := Rpc(ctx, "http://rpc_2_url")
		if err != nil {
			cancel()
		}
	}()

	// RPC3調用
	wg.Add(1)
	go func(){
		defer wg.Done()
		err := Rpc(ctx, "http://rpc_3_url")
		if err != nil {
			cancel()
		}
	}()

	// RPC4調用
	wg.Add(1)
	go func(){
		defer wg.Done()
		err := Rpc(ctx, "http://rpc_4_url")
		if err != nil {
			cancel()
		}
	}()

	wg.Wait()
}

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778           

當然我這裡使用了waitGroup來保證main函數在所有RPC調用完成之後才退出。

在Rpc函數中,第一個參數是一個CancelContext, 這個Context形象的說,就是一個傳話筒,在建立CancelContext的時候,傳回了一個聽聲器(ctx)和話筒(cancel函數)。所有的goroutine都拿着這個聽聲器(ctx),當主goroutine想要告訴所有goroutine要結束的時候,通過cancel函數把結束的資訊告訴給所有的goroutine。當然所有的goroutine都需要内置處理這個聽聲器結束信号的邏輯(ctx->Done())。我們可以看Rpc函數内部,通過一個select來判斷ctx的done和目前的rpc調用哪個先結束。

這個waitGroup和其中一個RPC調用就通知所有RPC的邏輯,其實有一個包已經幫我們做好了。errorGroup。具體這個errorGroup包的使用可以看這個包的test例子。

有人可能會擔心我們這裡的cancel()會被多次調用,context包的cancel調用是幂等的。可以放心多次調用。

我們這裡不妨品一下,這裡的Rpc函數,實際上我們的這個例子裡面是一個“阻塞式”的請求,這個請求如果是使用http.Get或者http.Post來實作,實際上Rpc函數的Goroutine結束了,内部的那個實際的http.Get卻沒有結束。是以,需要了解下,這裡的函數最好是“非阻塞”的,比如是http.Do,然後可以通過某種方式進行中斷。比如像這篇文章Cancel http.Request using Context中的這個例子:

func httpRequest(
  ctx context.Context,
  client *http.Client,
  req *http.Request,
  respChan chan []byte,
  errChan chan error
) {
  req = req.WithContext(ctx)
  tr := &http.Transport{}
  client.Transport = tr
  go func() {
    resp, err := client.Do(req)
    if err != nil {
      errChan <- err
    }
    if resp != nil {
      defer resp.Body.Close()
      respData, err := ioutil.ReadAll(resp.Body)
      if err != nil {
        errChan <- err
      }
      respChan <- respData
    } else {
      errChan <- errors.New("HTTP request failed")
    }
  }()
  for {
    select {
    case <-ctx.Done():
      tr.CancelRequest(req)
      errChan <- errors.New("HTTP request cancelled")
      return
    case <-errChan:
      tr.CancelRequest(req)
      return
    }
  }
}

123456789101112131415161718192021222324252627282930313233343536373839           

它使用了http.Client.Do,然後接收到ctx.Done的時候,通過調用transport.CancelRequest來進行結束。

我們還可以參考net/dail/DialContext

換而言之,如果你希望你實作的包是“可中止/可控制”的,那麼你在你包實作的函數裡面,最好是能接收一個Context函數,并且處理了Context.Done。

場景二:PipeLine

pipeline模式就是流水線模型,流水線上的幾個勞工,有n個産品,一個一個産品進行組裝。其實pipeline模型的實作和Context并無關系,沒有context我們也能用chan實作pipeline模型。但是對于整條流水線的控制,則是需要使用上Context的。這篇文章Pipeline Patterns in Go的例子是非常好的說明。這裡就大緻對這個代碼進行下說明。

runSimplePipeline的流水線勞工有三個,lineListSource負責将參數一個個分割進行傳輸,lineParser負責将字元串處理成int64,sink根據具體的值判斷這個資料是否可用。他們所有的傳回值基本上都有兩個chan,一個用于傳遞資料,一個用于傳遞錯誤。(<-chan string, <-chan error)輸入基本上也都有兩個值,一個是Context,用于傳聲控制的,一個是(in <-chan)輸入産品的。

我們可以看到,這三個勞工的具體函數裡面,都使用switch處理了case <-ctx.Done()。這個就是生産線上的指令控制。

func lineParser(ctx context.Context, base int, in <-chan string) (
	<-chan int64, <-chan error, error) {
	...
	go func() {
		defer close(out)
		defer close(errc)

		for line := range in {

			n, err := strconv.ParseInt(line, base, 64)
			if err != nil {
				errc <- err
				return
			}

			select {
			case out <- n:
			case <-ctx.Done():
				return
			}
		}
	}()
	return out, errc, nil
}

12345678910111213141516171819202122232425           

場景三:逾時請求

我們發送RPC請求的時候,往往希望對這個請求進行一個逾時的限制。當一個RPC請求超過10s的請求,自動斷開。當然我們使用CancelContext,也能實作這個功能(開啟一個新的goroutine,這個goroutine拿着cancel函數,當時間到了,就調用cancel函數)。

鑒于這個需求是非常常見的,context包也實作了這個需求:timerCtx。具體執行個體化的方法是 WithDeadline 和 WithTimeout。

具體的timerCtx裡面的邏輯也就是通過time.AfterFunc來調用ctx.cancel的。

官方的例子:

package main

import (
    "context"
    "fmt"
    "time"
)

func main() {
    ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
    defer cancel()

    select {
    case <-time.After(1 * time.Second):
        fmt.Println("overslept")
    case <-ctx.Done():
        fmt.Println(ctx.Err()) // prints "context deadline exceeded"
    }
}

1234567891011121314151617181920           

在http的用戶端裡面加上timeout也是一個常見的辦法

uri := "https://httpbin.org/delay/3"
req, err := http.NewRequest("GET", uri, nil)
if err != nil {
	log.Fatalf("http.NewRequest() failed with '%s'\n", err)
}

ctx, _ := context.WithTimeout(context.Background(), time.Millisecond*100)
req = req.WithContext(ctx)

resp, err := http.DefaultClient.Do(req)
if err != nil {
	log.Fatalf("http.DefaultClient.Do() failed with:\n'%s'\n", err)
}
defer resp.Body.Close()
1234567891011121314           

在http服務端設定一個timeout如何做呢?

package main

import (
	"net/http"
	"time"
)

func test(w http.ResponseWriter, r *http.Request) {
	time.Sleep(20 * time.Second)
	w.Write([]byte("test"))
}


func main() {
	http.HandleFunc("/", test)
	timeoutHandler := http.TimeoutHandler(http.DefaultServeMux, 5 * time.Second, "timeout")
	http.ListenAndServe(":8080", timeoutHandler)
}

12345678910111213141516171819           

我們看看TimeoutHandler的内部,本質上也是通過context.WithTimeout來做處理。

func (h *timeoutHandler) ServeHTTP(w ResponseWriter, r *Request) {
  ...
		ctx, cancelCtx = context.WithTimeout(r.Context(), h.dt)
		defer cancelCtx()
	...
	go func() {
    ...
		h.handler.ServeHTTP(tw, r)
	}()
	select {
    ...
	case <-ctx.Done():
		...
	}
}

12345678910111213141516           

場景四:HTTP伺服器的request互相傳遞資料

context還提供了valueCtx的資料結構。

這個valueCtx最經常使用的場景就是在一個http伺服器中,在request中傳遞一個特定值,比如有一個中間件,做cookie驗證,然後把驗證後的使用者名存放在request中。

我們可以看到,官方的request裡面是包含了Context的,并且提供了WithContext的方法進行context的替換。

package main

import (
	"net/http"
	"context"
)

type FooKey string

var UserName = FooKey("user-name")
var UserId = FooKey("user-id")

func foo(next http.HandlerFunc) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		ctx := context.WithValue(r.Context(), UserId, "1")
		ctx2 := context.WithValue(ctx, UserName, "yejianfeng")
		next(w, r.WithContext(ctx2))
	}
}

func GetUserName(context context.Context) string {
	if ret, ok := context.Value(UserName).(string); ok {
		return ret
	}
	return ""
}

func GetUserId(context context.Context) string {
	if ret, ok := context.Value(UserId).(string); ok {
		return ret
	}
	return ""
}

func test(w http.ResponseWriter, r *http.Request) {
	w.Write([]byte("welcome: "))
	w.Write([]byte(GetUserId(r.Context())))
	w.Write([]byte(" "))
	w.Write([]byte(GetUserName(r.Context())))
}

func main() {
	http.Handle("/", foo(test))
	http.ListenAndServe(":8080", nil)
}

12345678910111213141516171819202122232425262728293031323334353637383940414243444546           

在使用ValueCtx的時候需要注意一點,這裡的key不應該設定成為普通的String或者Int類型,為了防止不同的中間件對這個key的覆寫。最好的情況是每個中間件使用一個自定義的key類型,比如這裡的FooKey,而且擷取Value的邏輯盡量也抽取出來作為一個函數,放在這個middleware的同包中。這樣,就會有效避免不同包設定相同的key的沖突問題了。