天天看點

深入golang之---goroutine并發控制與通信通知多個子goroutine退出運作控制并發的方法參考連結

原文首發于我的個人部落格:深入golang之—goroutine并發控制與通信

開發go程式的時候,時常需要使用goroutine并發處理任務,有時候這些goroutine是互相獨立的,而有的時候,多個goroutine之間常常是需要同步與通信的。另一種情況,主goroutine需要控制它所屬的子goroutine,總結起來,實作多個goroutine間的同步與通信大緻有:

- 全局共享變量

- channel通信(CSP模型)

- Context包

本文章通過goroutine同步與通信的一個典型場景-通知子goroutine退出運作,來深入講解下golang的控制并發。

通知多個子goroutine退出運作

goroutine作為go語言的并發利器,不僅性能強勁而且使用友善:隻需要一個關鍵字go即可将普通函數并發執行,且goroutine占用記憶體極小(一個goroutine隻占2KB的記憶體),是以開發go程式的時候很多開發者常常會使用這個并發工具,獨立的并發任務比較簡單,隻需要用go關鍵字修飾函數就可以啟用一個goroutine直接運作;但是,實際的并發場景常常是需要進行協程間的同步與通信,以及精确控制子goroutine開始和結束,其中一個典型場景就是主程序通知名下所有子goroutine優雅退出運作。

由于goroutine的退出機制設計是,goroutine退出隻能由本身控制,不允許從外部強制結束該goroutine。隻有兩種情況例外,那就是main函數結束或者程式崩潰結束運作;是以,要實作主程序控制子goroutine的開始和結束,必須借助其它工具來實作。

控制并發的方法

實作控制并發的方式,大緻可分成以下三類:

- 全局共享變量

- channel通信

- Context包

全局共享變量

這是最簡單的實作控制并發的方式,實作步驟是:

1. 聲明一個全局變量;

2. 所有子goroutine共享這個變量,并不斷輪詢這個變量檢查是否有更新;

3. 在主程序中變更該全局變量;

4. 子goroutine檢測到全局變量更新,執行相應的邏輯。

示例如下:

package main

import (
    "fmt"
    "time"
)

func main() {
    running := true
    f := func() {
        for running {
            fmt.Println("sub proc running...")
            time.Sleep * time.Second)
        }
        fmt.Println("sub proc exit")
    }
    go f()
    go f()
    go f()
    time.Sleep * time.Second)
    running = false
    time.Sleep * time.Second)
    fmt.Println("main proc exit")
}
           

全局變量的優勢是簡單友善,不需要過多繁雜的操作,通過一個變量就可以控制所有子goroutine的開始和結束;缺點是功能有限,由于架構所緻,該全局變量隻能是多讀一寫,否則會出現資料同步問題,當然也可以通過給全局變量加鎖來解決這個問題,但那就增加了複雜度,另外這種方式不适合用于子goroutine間的通信,因為全局變量可以傳遞的資訊很小;還有就是主程序無法等待所有子goroutine退出,因為這種方式隻能是單向通知,是以這種方法隻适用于非常簡單的邏輯且并發量不太大的場景,一旦邏輯稍微複雜一點,這種方法就有點捉襟見肘。

channel通信

另一種更為通用且靈活的實作控制并發的方式是使用channel進行通信。

首先,我們先來了解下什麼是golang中的channel:Channel是Go中的一個核心類型,你可以把它看成一個管道,通過它并發核心單元就可以發送或者接收資料進行通訊(communication)。

要想了解 channel 要先知道 CSP 模型:

CSP 是 Communicating Sequential Process 的簡稱,中文可以叫做通信順序程序,是一種并發程式設計模型,由 Tony Hoare 于 1977 年提出。簡單來說,CSP 模型由并發執行的實體(線程或者程序)所組成,實體之間通過發送消息進行通信,這裡發送消息時使用的就是通道,或者叫 channel。CSP 模型的關鍵是關注 channel,而不關注發送消息的實體。Go 語言實作了 CSP 部分理論,goroutine 對應 CSP 中并發執行的實體,channel 也就對應着 CSP 中的 channel。

也就是說,CSP 描述這樣一種并發模型:多個Process 使用一個 Channel 進行通信, 這個 Channel 連結的 Process 通常是匿名的,消息傳遞通常是同步的(有别于 Actor Model)。

先來看示例代碼:

package main
import (
    "fmt"
    "os"
    "os/signal"
    "sync"
    "syscall"
    "time"
)
func consumer(stop <-chan bool) {
    for {
        select {
        case <-stop:
            fmt.Println("exit sub goroutine")
            return
        default:
            fmt.Println("running...")
            time.Sleep * time.Millisecond)
        }
    }
}
func main() {
        stop := make(chan bool)
        var wg sync.WaitGroup
        // Spawn example consumers
        for i :=; i <; i++ {
            wg.Add)
            go func(stop <-chan bool) {
                defer wg.Done()
                consumer(stop)
            }(stop)
        }
        waitForSignal()
        close(stop)
        fmt.Println("stopping all jobs!")
        wg.Wait()
}
func waitForSignal() {
    sigs := make(chan os.Signal)
    signal.Notify(sigs, os.Interrupt)
    signal.Notify(sigs, syscall.SIGTERM)
    <-sigs
}
           

這裡可以實作優雅等待所有子goroutine完全結束之後主程序才結束退出,借助了标準庫sync裡的Waitgroup,這是一種控制并發的方式,可以實作對多goroutine的等待,官方文檔是這樣描述的:

A WaitGroup waits for a collection of goroutines to finish. The main goroutine calls Add to set the number of goroutines to wait for.

Then each of the goroutines runs and calls Done when finished. At the same time, Wait can be used to block until all goroutines have finished.

簡單來講,它的源碼裡實作了一個類似計數器的結構,記錄每一個在它那裡注冊過的協程,然後每一個協程完成任務之後需要到它那裡登出,然後在主程序那裡可以等待直至所有協程完成任務退出。

使用步驟:

1. 建立一個Waitgroup的執行個體wg;

2. 在每個goroutine啟動的時候,調用wg.Add(1)注冊;

3. 在每個goroutine完成任務後退出之前,調用wg.Done()登出。

4. 在等待所有goroutine的地方調用wg.Wait()阻塞程序,知道所有goroutine都完成任務調用wg.Done()登出之後,Wait()方法會傳回。

該示例程式是一種golang的select+channel的典型用法,我們來稍微深入一點分析一下這種典型用法:

channel

首先了解下channel,可以了解為管道,它的主要功能點是:

  1. 隊列存儲資料
  2. 阻塞和喚醒goroutine

channel 實作集中在檔案 runtime/chan.go 中,channel底層資料結構是這樣的:

type hchan struct {
    qcount   uint           // 隊列中資料個數
    dataqsiz uint           // channel 大小
    buf      unsafe.Pointer // 存放資料的環形數組
    elemsize uint16         // channel 中資料類型的大小
    closed   uint32         // 表示 channel 是否關閉
    elemtype *_type // 元素資料類型
    sendx    uint   // send 的數組索引
    recvx    uint   // recv 的數組索引
    recvq    waitq  // 由 recv 行為(也就是 <-ch)阻塞在 channel 上的 goroutine 隊列
    sendq    waitq  // 由 send 行為 (也就是 ch<-) 阻塞在 channel 上的 goroutine 隊列

    // lock protects all fields in hchan, as well as several
    // fields in sudogs blocked on this channel.
    //
    // Do not change another G's status while holding this lock
    // (in particular, do not ready a G), as this can deadlock
    // with stack shrinking.
    lock mutex
}
           

從源碼可以看出它其實就是一個隊列加一個鎖(輕量),代碼本身不複雜,但涉及到上下文很多細節,故而不易通讀,有興趣的同學可以去看一下,我的建議是,從上面總結的兩個功能點出發,一個是 ring buffer,用于存資料; 一個是存放操作(讀寫)該channel的goroutine 的隊列。

  • buf是一個通用指針,用于存儲資料,看源碼時重點關注對這個變量的讀寫
  • recvq 是讀操作阻塞在 channel 的 goroutine 清單,sendq 是寫操作阻塞在 channel 的 goroutine 清單。清單的實作是 sudog,其實就是一個對 g 的結構的封裝,看源碼時重點關注,是怎樣通過這兩個變量阻塞和喚醒goroutine的

由于涉及源碼較多,這裡就不再深入。

select

然後是select機制,golang 的 select 機制可以了解為是在語言層面實作了和 select, poll, epoll 相似的功能:監聽多個描述符的讀/寫等事件,一旦某個描述符就緒(一般是讀或者寫事件發生了),就能夠将發生的事件通知給關心的應用程式去處理該事件。 golang 的 select 機制是,監聽多個channel,每一個 case 是一個事件,可以是讀事件也可以是寫事件,随機選擇一個執行,可以設定default,它的作用是:當監聽的多個事件都阻塞住會執行default的邏輯。

select的源碼在runtime/select.go ,看的時候建議是重點關注 pollorder 和 lockorder

  • pollorder儲存的是scase的序号,亂序是為了之後執行時的随機性。
  • lockorder儲存了所有case中channel的位址,這裡按照位址大小堆排了一下lockorder對應的這片連續記憶體。對chan排序是為了去重,保證之後對所有channel上鎖時不會重複上鎖。

因為我對這部分源碼研究得也不是很深,故而點到為止即可,有興趣的可以去看看源碼啦!

具體到demo代碼:consumer為協程的具體代碼,裡面是隻有一個不斷輪詢channel變量stop的循環,是以主程序是通過stop來通知子協程何時該結束運作的,在main方法中,close掉stop之後,讀取已關閉的channel會立刻傳回該channel資料類型的零值,是以子goroutine裡的<-stop操作會馬上傳回,然後退出運作。

事實上,通過channel控制子goroutine的方法可以總結為:循環監聽一個channel,一般來說是for循環裡放一個select監聽channel以達到通知子goroutine的效果。再借助Waitgroup,主程序可以等待所有協程優雅退出後再結束自己的運作,這就通過channel實作了優雅控制goroutine并發的開始和結束。

channel通信控制基于CSP模型,相比于傳統的線程與鎖并發模型,避免了大量的加鎖解鎖的性能消耗,而又比Actor模型更加靈活,使用Actor模型時,負責通訊的媒介與執行單元是緊耦合的–每個Actor都有一個信箱。而使用CSP模型,channel是第一對象,可以被獨立地建立,寫入和讀出資料,更容易進行擴充。

殺器Context

Context通常被譯作上下文,它是一個比較抽象的概念。在讨論鍊式調用技術時也經常會提到上下文。一般了解為程式單元的一個運作狀态、現場、快照,而翻譯中上下又很好地诠釋了其本質,上下則是存在上下層的傳遞,上會把内容傳遞給下。在Go語言中,程式單元也就指的是Goroutine。

每個Goroutine在執行之前,都要先知道程式目前的執行狀态,通常将這些執行狀态封裝在一個Context變量中,傳遞給要執行的Goroutine中。上下文則幾乎已經成為傳遞與請求同生存周期變量的标準方法。在網絡程式設計下,當接收到一個網絡請求Request,在處理這個Request的goroutine中,可能需要在目前gorutine繼續開啟多個新的Goroutine來擷取資料與邏輯處理(例如通路資料庫、RPC服務等),即一個請求Request,會需要多個Goroutine中處理。而這些Goroutine可能需要共享Request的一些資訊;同時當Request被取消或者逾時的時候,所有從這個Request建立的所有Goroutine也應該被結束。

context在go1.7之後被引入到标準庫中,1.7之前的go版本使用context需要安裝golang.org/x/net/context包,關于golang context的更詳細說明,可參考官方文檔:context

Context初試

Context的建立和調用關系是層層遞進的,也就是我們通常所說的鍊式調用,類似資料結構裡的樹,從根節點開始,每一次調用就衍生一個葉子節點。首先,生成根節點,使用context.Background方法生成,而後可以進行鍊式調用使用context包裡的各類方法,context包裡的所有方法:

- func Background() Context

- func TODO() Context

- func WithCancel(parent Context) (ctx Context, cancel CancelFunc)

- func WithDeadline(parent Context, deadline time.Time) (Context, CancelFunc)

- func WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc)

- func WithValue(parent Context, key, val interface{}) Context

這裡僅以WithCancel和WithValue方法為例來實作控制并發和通信:

話不多說,上碼:

package main

import (
    "context"
    "crypto/md5"
    "fmt"
    "io/ioutil"
    "net/http"
    "sync"
    "time"
)

type favContextKey string

func main() {
    wg := &sync.WaitGroup{}
    values := []string{"https://www.baidu.com/", "https://www.zhihu.com/"}
    ctx, cancel := context.WithCancel(context.Background())

    for _, url := range values {
        wg.Add)
        subCtx := context.WithValue(ctx, favContextKey("url"), url)
        go reqURL(subCtx, wg)
    }

    go func() {
        time.Sleep(time.Second *)
        cancel()
    }()

    wg.Wait()
    fmt.Println("exit main goroutine")
}

func reqURL(ctx context.Context, wg *sync.WaitGroup) {
    defer wg.Done()
    url, _ := ctx.Value(favContextKey("url")).(string)
    for {
        select {
        case <-ctx.Done():
            fmt.Printf("stop getting url:%s\n", url)
            return
        default:
            r, err := http.Get(url)
            if r.StatusCode == http.StatusOK && err == nil {
                body, _ := ioutil.ReadAll(r.Body)
                subCtx := context.WithValue(ctx, favContextKey("resp"), fmt.Sprintf("%s%x", url, md5.Sum(body)))
                wg.Add)
                go showResp(subCtx, wg)
            }
            r.Body.Close()
            //啟動子goroutine是為了不阻塞目前goroutine,這裡在實際場景中可以去執行其他邏輯,這裡為了友善直接sleep一秒
            // doSometing()
            time.Sleep(time.Second *)
        }
    }
}

func showResp(ctx context.Context, wg *sync.WaitGroup) {
    defer wg.Done()
    for {
        select {
        case <-ctx.Done():
            fmt.Println("stop showing resp")
            return
        default:
            //子goroutine裡一般會處理一些IO任務,如讀寫資料庫或者rpc調用,這裡為了友善直接把資料列印
            fmt.Println("printing ", ctx.Value(favContextKey("resp")))
            time.Sleep(time.Second *)
        }
    }
}
           

前面我們說過Context就是設計用來解決那種多個goroutine處理一個Request且這多個goroutine需要共享Request的一些資訊的場景,以上是一個簡單模拟上述過程的demo。

首先調用context.Background()生成根節點,然後調用withCancel方法,傳入根節點,得到新的子Context以及根節點的cancel方法(通知所有子節點結束運作),這裡要注意:該方法也傳回了一個Context,這是一個新的子節點,與初始傳入的根節點不是同一個執行個體了,但是每一個子節點裡會儲存從最初的根節點到本節點的鍊路資訊 ,才能實作鍊式。

程式的reqURL方法接收一個url,然後通過http請求該url獲得response,然後在目前goroutine裡再啟動一個子groutine把response列印出來,然後從ReqURL開始Context樹往下衍生葉子節點(每一個鍊式調用新産生的ctx),中間每個ctx都可以通過WithValue方式傳值(實作通信),而每一個子goroutine都能通過Value方法從父goroutine取值,實作協程間的通信,每個子ctx可以調用Done方法檢測是否有父節點調用cancel方法通知子節點退出運作,根節點的cancel調用會沿着鍊路通知到每一個子節點,是以實作了強并發控制,流程如圖:

深入golang之---goroutine并發控制與通信通知多個子goroutine退出運作控制并發的方法參考連結

該demo結合前面說的WaitGroup實作了優雅并發控制和通信,關于WaitGroup的原理和使用前文已做解析,這裡便不再贅述,當然,實際的應用場景不會這麼簡單,處理Request的goroutine啟動多個子goroutine大多是處理IO密集的任務如讀寫資料庫或rpc調用,然後在主goroutine中繼續執行其他邏輯,這裡為了友善講解做了最簡單的處理。

Context作為golang中并發控制和通信的大殺器,被廣泛應用,一些使用go開發http服務的同學如果閱讀過這些很多 web framework的源碼就知道,Context在web framework随處可見,因為http請求處理就是一個典型的鍊式過程以及并發場景,是以很多web framework都會借助Context實作鍊式調用的邏輯。有興趣可以讀一下context包的源碼,會發現Context的實作其實是結合了Mutex鎖和channel而實作的,其實并發、同步的很多進階元件萬變不離其宗,都是通過最底層的資料結構組裝起來的,隻要知曉了最基礎的概念,上遊的架構也可以一目了然。

context使用規範

最後,Context雖然是神器,但開發者使用也要遵循基本法,以下是一些Context使用的規範:

- Do not store Contexts inside a struct type; instead, pass a Context explicitly to each function that needs it. The Context should be the first parameter, typically named ctx;不要把Context存在一個結構體當中,顯式地傳入函數。Context變量需要作為第一個參數使用,一般命名為ctx;

  • Do not pass a nil Context, even if a function permits it. Pass context.TODO if you are unsure about which Context to use;即使方法允許,也不要傳入一個nil的Context,如果你不确定你要用什麼Context的時候傳一個context.TODO;
  • Use context Values only for request-scoped data that transits processes and APIs, not for passing optional parameters to functions;使用context的Value相關方法隻應該用于在程式和接口中傳遞的和請求相關的中繼資料,不要用它來傳遞一些可選的參數;
  • The same Context may be passed to functions running in different goroutines; Contexts are safe for simultaneous use by multiple goroutines;同樣的Context可以用來傳遞到不同的goroutine中,Context在多個goroutine中是安全的;

參考連結

  • [1] https://deepzz.com/post/golang-context-package-notes.html
  • [2] http://www.flysnow.org/2017/05/12/go-in-action-go-context.html
  • [3] https://golang.org/pkg/context/
  • [4]http://www.moye.me/2017/05/05/go-concurrency-patterns/