天天看點

golang 使用 context 進行并發控制(轉)

1. 前言

context翻譯成中文是”上下文”,即它可以控制一組呈樹狀結構的goroutine,由于goroutine派生出子goroutine,而子goroutine又繼續派生新的goroutine,這種情況下使用WaitGroup就不太容易,因為子goroutine個數不容易确定,甚至如果需要定時取消又怎麼辦呢?。而使用context就可以很容易實作。比如一個網絡請求 Request,每個 Request 都需要開啟一個 goroutine 做一些事情,這些 goroutine 又可能會開啟其他的 goroutine

Context 對象是線程安全的,你可以把一個 Context 對象傳遞給任意個數的 gorotuine,對它執行取消操作時,所有 goroutine 都會接收到取消信号。

context 包主要是用來處理多個 goroutine 之間共享資料,及多個 goroutine 的管理。快速終止所有基于目前 context 派生出來的協程,隻是手動終止和定時終止。

2 Context 實作原理

context實際上隻定義了接口,凡是實作該接口的類都可稱為是一種context,官方包中實作了幾個常用的context,分别可用于不同的場景。

2.1 接口定義

源碼包中src/context/context.go:Context 定義了該接口:

type Context interface {
    Deadline() (deadline time.Time, ok bool)
 
    Done() <-chan struct{}
 
    Err() error
 
    Value(key interface{}) interface{}
}      

基礎的context接口隻定義了4個方法,下面分别簡要說明一下:

Deadline()

該方法擷取設定的截止時間,傳回值 deadline 是截止時間,到了這個時間,Context 會自動發起取消請求,傳回值 ok 表示是否設定了截止時間。

Done() <-chan struct{}

該方法傳回一個channel,需要在select-case語句中使用,如case <-context.Done():。

當context關閉後,Done()傳回一個被關閉的管道,關閉的管道仍然是可讀的,據此goroutine可以收到關閉請求;當context還未關閉時,Done()傳回nil。

Err()

在Done() 之後,傳回context 取消的原因。

Value(key interface{}) interface{}

擷取 Context 上綁定的值,是一個鍵值對,通過 key 來擷取對應的值。

2.2 emptyCtx

context包中定義了一個空的context, 名為emptyCtx,用于context的根節點,空的context隻是簡單的實作了Context,本身不包含任何值,僅用于其他context的父節點。emptyCtx沒有逾時時間,不能取消,也不能存儲任何額外資訊,是以emptyCtx用來作為 context 樹的根節點。

emptyCtx類型定義如下代碼所示:

type emptyCtx int   // 定義一個int類型,通過實作context的四個方法來實作context接口
 
func (*emptyCtx) Deadline() (deadline time.Time, ok bool) {
    return
}
 
func (*emptyCtx) Done() <-chan struct{} {
    return nil
}
 
func (*emptyCtx) Err() error {
    return nil
}
 
func (*emptyCtx) Value(key interface{}) interface{} {
    return nil
}      

context包中定義了兩個公用的 emptCtx 全局變量(background 、todo),分别通過調用 ​

​Background()​

​​ 和 ​

​TODO()​

​ 方法得到,但這兩個 context 在實作上是一樣的,Background() 實作代碼如下所示:

var background = new(emptyCtx)
func Background() Context {
    return background
}      

「Background和TODO方法差別:」

Background和TODO隻是用于不同場景下:Background通常被用于主函數、初始化以及測試中,作為一個頂層的context,也就是說一般我們建立的context都是基于Background;而TODO是在不确定使用什麼context的時候才會使用。

生成樹的函數

可以通過 「context。Background()」 擷取一個根節點 Context。

有了根節點後,再使用以下四個函數來生成 Context 樹:

  • 「WithCancel(parent Context)」:生成一個可取消的 Context。
  • 「WithDeadline(parent Context, d time.Time)」:生成一個可定時取消的 Context,參數 d 為定時取消的具體時間。
  • 「WithTimeout(parent Context, timeout time.Duration)」:生成一個可逾時取消的 Context,參數 timeout 用于設定多久後取消
  • 「WithValue(parent Context, key, val interface{})」:生成一個可攜帶 key-value 鍵值對的 Context。

context包中實作Context接口的struct,除了emptyCtx外,還有cancelCtx、timerCtx和valueCtx三種,正是基于這三種context執行個體,實作了上述4種類型的context。struct cancelCtx、timerCtx、valueCtx都繼承于Context,下面分别介紹這三個struct。

2.3 cancelCtx

源碼包中src/context/context.go:cancelCtx 定義了該類型context:

type cancelCtx struct {
    Context
 
    mu       sync.Mutex            // protects following fields
    done     chan struct{}         // created lazily, closed by first cancel call
    children map[canceler]struct{} // set to nil by the first cancel call
    err      error                 // set to non-nil by the first cancel call
}      

children中記錄了由此context派生的所有child,此context被cancel時會把其中的所有child都cancel掉。

cancelCtx與deadline和value無關,是以隻需要實作Done()和Err()外露接口即可。

2.3.1 Done()接口實作

按照Context定義,Done()接口隻需要傳回一個channel即可,對于cancelCtx來說隻需要傳回成員變量done即可。

這裡直接看下源碼,非常簡單:

func (c *cancelCtx) Done() <-chan struct{} {
    c.mu.Lock()
    if c.done == nil {
        c.done = make(chan struct{})
    }
    d := c.done
    c.mu.Unlock()
    return d
}      

由于cancelCtx沒有指定初始化函數,是以cancelCtx.done可能還未配置設定,是以需要考慮初始化。

cancelCtx.done會在context被cancel時關閉,是以cancelCtx.done的值一般經曆如下三個階段:nil –> chan struct{} –> closed chan。

2.3.2 Err()接口實作

按照Context定義,Err()隻需要傳回一個error告知context被關閉的原因。對于cancelCtx來說隻需要傳回成員變量err即可。

源碼如下:

func (c *cancelCtx) Err() error {
    c.mu.Lock()
    err := c.err
    c.mu.Unlock()
    return err
}      

2.3.3 cancel()接口實作

cancel()内部方法是了解cancelCtx的最關鍵的方法,其作用是關閉自己和其後代,其後代存儲在cancelCtx.children的map中,其中key值即後代對象,value值并沒有意義,這裡使用map隻是為了友善查詢而已。

cancel方法實作僞代碼如下所示:

func (c *cancelCtx) cancel(removeFromParent bool, err error) {
    c.mu.Lock()
 
    c.err = err                          //設定一個error,說明關閉原因
    close(c.done)                     //将channel關閉,以此通知派生的context
 
    for child := range c.children {   //周遊所有children,逐個調用cancel方法
        child.cancel(false, err)
    }
    c.children = nil
    c.mu.Unlock()
 
    if removeFromParent {            //正常情況下,需要将自己從parent删除
        removeChild(c.Context, c)
    }
}      

實際上,WithCancel()傳回的第二個用于cancel context的方法正是此cancel()。

2.3.4 WithCancel()方法實作

WithCancel()方法作了三件事:

1.初始化一個cancelCtx執行個體

2.将cancelCtx執行個體添加到其父節點的children中(如果父節點也可以被cancel的話)

3.傳回cancelCtx執行個體和cancel()方法

其實作源碼如下所示:

func WithCancel(parent Context) (ctx Context, cancel CancelFunc) {
    c := newCancelCtx(parent)
    propagateCancel(parent, &c)   //将自身添加到父節點
    return &c, func() { c.cancel(true, Canceled) }
}      

這裡将自身添加到父節點的過程有必要簡單說明一下:

如果父節點也支援cancel,也就是說其父節點肯定有children成員,那麼把新context添加到children裡即可;

如果父節點不支援cancel,就繼續向上查詢,直到找到一個支援cancel的節點,把新context添加到children裡;

如果所有的父節點均不支援cancel,則啟動一個協程等待父節點結束,然後再把目前context結束。

2.3.5 使用案例

一個典型的使用cancel context的例子如下所示:

package main
 
import (
    "fmt"
    "time"
    "context"
)
 
func HandelRequest(ctx context.Context) {
    go WriteRedis(ctx)                      // 子協程A建立子協程B
    go WriteDatabase(ctx)                   // 子協程A建立子協程C
    
    for {
        select {
        case <-ctx.Done():
            fmt.Println("HandelRequest Done.")
            return
        default:
            fmt.Println("HandelRequest running")
            time.Sleep(2 * time.Second)
        }
    }
}
 
func WriteRedis(ctx context.Context) {
    for {
        select {
        case <-ctx.Done():
            fmt.Println("WriteRedis Done.")
            return
        default:
            fmt.Println("WriteRedis running")
            time.Sleep(2 * time.Second)
        }
    }
}
 
func WriteDatabase(ctx context.Context) {
    for {
        select {
        case <-ctx.Done():
            fmt.Println("WriteDatabase Done.")
            return
        default:
            fmt.Println("WriteDatabase running")
            time.Sleep(2 * time.Second)
        }
    }
}
 
func main() {
    ctx, cancel := context.WithCancel(context.Background())
    go HandelRequest(ctx)      // 主線程下建立子協程A
 
    time.Sleep(5 * time.Second)
    fmt.Println("It's time to stop all sub goroutines!")
    cancel()
 
    //Just for test whether sub goroutines exit or not
    time.Sleep(5 * time.Second)
}      

上面代碼中協程HandelRequest()用于處理某個請求,其又會建立兩個協程:WriteRedis()、WriteDatabase(),main協程建立context,并把context在各子協程間傳遞,main協程在适當的時機可以cancel掉所有子協程。

程式輸出如下所示:

WriteDatabase running
WriteRedis running
HandelRequest running
WriteRedis running
WriteDatabase running
HandelRequest running
HandelRequest running
WriteDatabase running
WriteRedis running
It's time to stop all sub goroutines!
WriteDatabase Done.
WriteRedis Done.
HandelRequest Done.      

2.4 timerCtx

源碼包中src/context/context.go:timerCtx 定義了該類型context:

type timerCtx struct {
    cancelCtx
    timer *time.Timer // Under cancelCtx.mu.
 
    deadline time.Time
}      

timerCtx在cancelCtx基礎上增加了deadline用于标示自動cancel的最終時間,而timer就是一個觸發自動cancel的定時器。

由此,衍生出WithDeadline()和WithTimeout()。實作上這兩種類型實作原理一樣,隻不過使用語境不一樣:

  • deadline: 指定最後期限,比如context将2018.10.20 00:00:00之時自動結束
  • timeout: 指定最長存活時間,比如context将在30s後結束。

對于接口來說,timerCtx在cancelCtx基礎上還需要實作Deadline()和cancel()方法,其中cancel()方法是重寫的。

2.4.1 Deadline()接口實作

Deadline()方法僅僅是傳回timerCtx.deadline而矣。而timerCtx.deadline是WithDeadline()或WithTimeout()方法設定的。

2.4.2 cancel()接口實作

cancel()方法基本繼承cancelCtx,隻需要額外把timer關閉。

timerCtx被關閉後,timerCtx.cancelCtx.err将會存儲關閉原因:

  • 如果deadline到來之前手動關閉,則關閉原因與cancelCtx顯示一緻;
  • 如果deadline到來時自動關閉,則原因為:”context deadline exceeded”

2.4.3 WithDeadline()方法實作

WithDeadline()方法實作步驟如下:

  • 初始化一個timerCtx執行個體
  • 将timerCtx執行個體添加到其父節點的children中(如果父節點也可以被cancel的話)
  • 啟動定時器,定時器到期後會自動cancel本context
  • 傳回timerCtx執行個體和cancel()方法

也就是說,timerCtx類型的context不僅支援手動cancel,也會在定時器到來後自動cancel。

2.4.4 WithTimeout()方法實作

WithTimeout()實際調用了WithDeadline,二者實作原理一緻。

看代碼會非常清晰:

func WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc) {
    return WithDeadline(parent, time.Now().Add(timeout))
}      

2.4.5 典型使用案例

下面例子中使用WithTimeout()獲得一個context并在其子協程中傳遞:

package main
 
import (
    "fmt"
    "time"
    "context"
)
 
func HandelRequest(ctx context.Context) {
    go WriteRedis(ctx)
    go WriteDatabase(ctx)
    for {
        select {
        case <-ctx.Done():
            fmt.Println("HandelRequest Done.")
            return
        default:
            fmt.Println("HandelRequest running")
            time.Sleep(2 * time.Second)
        }
    }
}
 
func WriteRedis(ctx context.Context) {
    for {
        select {
        case <-ctx.Done():
            fmt.Println("WriteRedis Done.")
            return
        default:
            fmt.Println("WriteRedis running")
            time.Sleep(2 * time.Second)
        }
    }
}
 
func WriteDatabase(ctx context.Context) {
    for {
        select {
        case <-ctx.Done():
            fmt.Println("WriteDatabase Done.")
            return
        default:
            fmt.Println("WriteDatabase running")
            time.Sleep(2 * time.Second)
        }
    }
}
 
func main() {
    ctx, _ := context.WithTimeout(context.Background(), 5 * time.Second) // 逾時5S自動cancel
    go HandelRequest(ctx)
 
    time.Sleep(10 * time.Second)
}      

主協程中建立一個10s逾時的context,并将其傳遞給子協程,10s自動關閉context。程式輸出如下:

HandelRequest running
WriteRedis running
WriteDatabase running
HandelRequest running
WriteRedis running
WriteDatabase running
HandelRequest running
WriteRedis running
WriteDatabase running
HandelRequest Done.
WriteDatabase Done.
WriteRedis Done.      

2.5 valueCtx

源碼包中src/context/context.go:valueCtx 定義了該類型context:

type valueCtx struct {
    Context
    key, val interface{}
}      

valueCtx隻是在Context基礎上增加了一個key-value對,用于在各級協程間傳遞一些資料。

由于valueCtx既不需要cancel,也不需要deadline,那麼隻需要實作Value()接口即可。

2.5.1 Value()接口實作

由valueCtx資料結構定義可見,valueCtx.key和valueCtx.val分别代表其key和value值。 實作也很簡單:

func (c *valueCtx) Value(key interface{}) interface{} {
    if c.key == key {
        return c.val
    }
    return c.Context.Value(key)
}      

這裡有個細節需要關注一下,即目前context查找不到key時,會向父節點查找,如果查詢不到則最終傳回interface{}。也就是說,可以通過子context查詢到父的value值。

2.5.2 WithValue()方法實作

WithValue()實作也是非常的簡單, 僞代碼如下:

func WithValue(parent Context, key, val interface{}) Context {
    if key == nil {
        panic("nil key")
    }
    return &valueCtx{parent, key, val}
}      

2.5.3 使用案例

下面示例程式展示valueCtx的用法:

package main
 
import (
    "fmt"
    "time"
    "context"
)
 
func HandelRequest(ctx context.Context) {
    for {
        select {
        case <-ctx.Done():
            fmt.Println("HandelRequest Done.")
            return
        default:
            fmt.Println("HandelRequest running, parameter: ", ctx.Value("parameter"))
            time.Sleep(2 * time.Second)
        }
    }
}
 
func main() {
    ctx := context.WithValue(context.Background(), "parameter", "1")  // 傳遞值
    go HandelRequest(ctx)
 
    time.Sleep(10 * time.Second)
}      

上例main()中通過WithValue()方法獲得一個context,需要指定一個父context、key和value。然後通将該context傳遞給子協程HandelRequest,子協程可以讀取到context的key-value。

注意:本例中子協程無法自動結束,因為context是不支援cancle的,也就是說<-ctx.Done()永遠無法傳回。如果需要傳回,需要在建立context時指定一個可以cancel的context作為父節點,使用父節點的cancel()在适當的時機結束整個context。

3. 總結

Context僅僅是一個接口定義,根據實作的不同,可以衍生出不同的context類型;

cancelCtx實作了Context接口,通過WithCancel()建立cancelCtx執行個體;

timerCtx實作了Context接口,通過WithDeadline()和WithTimeout()建立timerCtx執行個體;

valueCtx實作了Context接口,通過WithValue()建立valueCtx執行個體;

三種context執行個體可互為父節點,進而可以組合成不同的應用形式;

4. Context 使用原則

Context 不要放在結構體中,需要以參數方式傳遞

Context 作為函數參數時,要放在第一位,作為第一個參數

使用 context。Background 函數生成根節點的 Context

Context 要傳值必要的值,不要什麼都傳

Context 是多協程安全的,可以在多個協程中使用

繼續閱讀