天天看點

Go語言基礎6 - 并發概述

概述

我們将用幾節來學習Go語言基礎,本文結構如下:

1. 并發
  通過通信共享記憶體
  Go程
  信道
  信道中的信道
  并行化
  可能洩露的緩沖區
2. 錯誤
  Panic
  恢複           

複制

1. 并發

1.1 通過通信共享記憶體

在并發程式設計中,為實作對共享變量的正确通路需要精确的控制,這在多數環境下都很困難。

實際上,多個獨立執行的線程從不會主動共享。Go語言另辟蹊徑,它将共享的值通過信道傳遞, 在任意給定的時間點,隻有一個Go程能夠通路該值,資料競争從設計上就被杜絕了。

例如,引用計數通過為整數變量添加互斥鎖來很好地實作。 取而代之的是,通過信道來控制通路能夠讓你寫出更簡潔的程式。

Go将它簡化為一句口号:

不要通過共享記憶體來通信,而應通過通信來共享記憶體。

1.2 Go程

Go程具有簡單的模型:

  • 它是與其它Go程并發運作在同一位址空間的函數。
  • 它是輕量級的, 消耗幾乎就隻有棧空間的配置設定。
  • 而且棧最開始是非常小的,是以它們很廉價, 僅在需要時才會随着堆空間的配置設定(和釋放)而變化。

Go程在多線程作業系統上可實作多路複用,是以若一個線程阻塞,比如說等待I/O, 那麼其它的線程就會運作。

Go程的設計隐藏了線程建立和管理的諸多複雜性。

在函數或方法前添加 go 關鍵字能夠在新的Go程中調用它。當調用完成後, 該Go程也會安靜地退出,示例:

go list.Sort()  // 并發運作 list.Sort,無需等它結束。           

複制

函數字面在Go程調用中非常有用。

備注:可了解 為匿名函數的調用。下面的方法先聲明了一個匿名方法,然後立即調用。

func Announce(message string, delay time.Duration) {
      go func() {
        time.Sleep(delay)
        fmt.Println(message)
    }()  // 注意括号 - 必須調用該函數。
}           

複制

在Go中,函數字面都是閉包:其實作在保證了函數内引用變量的生命周期與函數的活動時間相同。

1.3 信道( chan )

1.3.1 格式: make(chan int)

信道與映射一樣,也需要通過 make 來配置設定記憶體,make 後的傳回值是對底層資料結構的引用。

若提供了一個可選的整數形參,它就會為該信道設定緩沖區大小。

緩沖區大小的預設值是零,表示不帶緩沖的或同步的信道。

示例:

ci := make(chan int)            // 整數類型的無緩沖信道
cj := make(chan int, 0)         // 整數類型的無緩沖信道
cs := make(chan *os.File, 100)  // 指向檔案指針的帶緩沖信道           

複制

無緩沖信道在通信時會同步交換資料,它能確定(兩個Go程的)計算處于确定狀态。

1.3.2 阻塞等待Go程( 無緩沖區的示例 )

示例:使用 go 程,在背景啟動了排序操作,等待排序完成。

c := make(chan int)  // 配置設定一個信道
// 在Go程中啟動排序。當它完成後,在信道上發送信号。
go func() {
    list.Sort()
    c <- 1  // 發送信号,什麼值無所謂。
}()
doSomethingForAWhile()
<-c   // 等待排序結束,丢棄發來的值。           

複制

  • 接收者在收到資料前會一直阻塞。
  • 若信道是不帶緩沖的,那麼在接收者收到值前, 發送者會一直阻塞;
  • 若信道是帶緩沖的,則發送者僅在值被複制到緩沖區前阻塞;
  • 若緩沖區已滿,發送者會一直等待直到某個接收者取出一個值為止。

1.3.3 控制吞吐量的例子( 帶緩沖的示例 )

帶緩沖的信道可被用作信号量。例如限制吞吐量。

示例:

var sem = make(chan int, MaxOutstanding)

func handle(r *Request) {               
    sem <- 1 // 等待活動隊列清空。            #2  占據,阻塞
    process(r)  // 可能需要很長時間。
    <-sem    // 完成;使下一個請求可以運作。    #3  解除占據
  }

  func Serve(queue chan *Request) {
    for {
        req := <-queue
        go handle(req)  // 無需等待 handle 結束。 #1 每個請求對應一個 Go程
    }
  }           

複制

上面的例子中:

  • 進入的請求 req 會被傳遞給 handle。
  • handle 中 #2 等待一個信号繼續(當緩沖區滿時)
  • handle 中 #3 後,發送信号,使得 被阻塞的另一個 go程 開始進入到process
  • 信道緩沖區的容量決定了同時調用 process 的數量上限

備注:

這個示例一次開始了全部多個go程,然後根據緩沖區大小阻塞等待,當緩沖區可以進入時繼續進行。

1.3.4 繼續改良的例子( 采用匿名方法 )

若請求來得很快, 上面的程式就會無限地消耗資源。為了彌補這種不足,我們可以通過修改 Serve 來限制建立Go程:

func Serve(queue chan *Request) {
    for req := range queue {
        sem <- 1
        go func() {
            process(req) // 這兒有Bug,解釋見下。
            <-sem
        }()
    }
  }           

複制

Bug出現在Go的 for 循環中,該循環變量在每次疊代時會被重用,是以 req 變量會在所有的Go程間共享,這不是我們想要的。我們需要確定 req 對于每個Go程來說都是唯一的。

有一種方法能夠做到,就是将 req 的值作為實參傳入到該Go程的閉包中:

func Serve(queue chan *Request) {

for req := range queue {

sem <- 1

go func(req *Request) {

process(req)

<-sem

}(req)

}

}

閉包的處理

比較前後兩個版本,觀察該閉包聲明和運作中的差别。 另一種解決方案就是以相同的名字建立新的變量,如例中所示:

func Serve(queue chan *Request) {
    for req := range queue {
        req := req // 為該Go程建立 req 的新執行個體。
        sem <- 1
        go func() {
            process(req)
            <-sem
        }()
    }
}           

複制

它的寫法看起來有點奇怪

req := req           

複制

但在Go中這樣做是合法且慣用的。你用相同的名字獲得了該變量的一個新的版本, 以此來局部地刻意屏蔽循環變量,使它對每個Go程保持唯一。

1.3.5 固定資料的go程,同時讀取

另一種管理資源的好方法:

  • 啟動固定數量的 handle Go程,一起從請求信道中讀取資料。
  • Go程的數量限制了同時調用 process 的數量。
  • Serve 同樣會接收一個通知退出的信道, 在啟動所有Go程後,它将阻塞并暫停從信道中接收消息。

    func handle(queue chan *Request) {

    for r := range queue {

    process(r)

    }

    }

    func Serve(clientRequests chan *Request, quit chan bool) {

    // 啟動處理程式,固定數量

    for i := 0; i < MaxOutstanding; i++ {

    go handle(clientRequests)

    }

    <-quit // 等待通知退出。

    }

1.4 信道中的信道

這種特性通常被用來實作安全、并行的多路分解。

在上一節的例子中,handle 是個非常理想化的請求處理程式, 但我們并未定義它所處理的請求類型。若該類型包含一個可用于回複的信道, 那麼每一個用戶端都能為其回應提供自己的路徑。以下為 Request 類型的大概定義。

type Request struct {
    args        []int
    f           func([]int) int
    resultChan  chan int
}           

複制

用戶端提供了一個函數及其實參,此外在請求對象中還有個接收應答的信道。

func sum(a []int) (s int) {
    for _, v := range a {
        s += v
    }
    return
}

request := &Request{[]int{3, 4, 5}, sum, make(chan int)}
// 發送請求
clientRequests <- request
// 等待回應
fmt.Printf("answer: %d\n", <-request.resultChan)           

複制

服務端的處理

On the server side, the handler function is the only thing that changes.

func handle(queue chan *Request) {
    for req := range queue {
        req.resultChan <- req.f(req.args)
    }
}           

複制

1.5 并行化

這些設計的另一個應用是在多CPU核心上實作并行計算。如果計算過程能夠被分為幾塊 可獨立執行的過程,它就可以在每塊計算結束時向信道發送信号,進而實作并行處理。

1.6 可能洩露的緩沖區

--

2. 錯誤

Go語言具有多值傳回特性, 使得它可以在傳回正常的值,和詳細的錯誤描述。

按照約定,錯誤的類型通常為 error,這是一個内建的簡單接口。

type error interface {
    Error() string
}           

複制

庫的編寫者通過更豐富的底層模型可以輕松實作這個接口,這樣不僅能看見錯誤, 還能提供一些上下文。

例如,os.Open 可傳回一個 os.PathError。

/* 定義結構體 */
// PathError 記錄一個錯誤以及産生該錯誤的路徑和操作。
type PathError struct {
    Op string    // "open"、"unlink" 等等。
    Path string  // 相關聯的檔案。
    Err error    // 由系統調用傳回。
}

/* 實作 Error接口 */
func (e *PathError) Error() string {
    return e.Op + " " + e.Path + ": " + e.Err.Error()
}           

複制

這樣,PathError的 Error 會生成如下錯誤資訊:

open /etc/passwx: no such file or directory           

複制

錯誤字元串應盡可能地指明它們的來源,解釋清楚錯誤的情況。

若調用者想知道更多細節,可使用類型選擇或者類型斷言來檢視特定錯誤,和處理。

for try := 0; try < 2; try++ {
    file, err = os.Create(filename)
    if err == nil {
        return
    }
    if e, ok := err.(*os.PathError); ok && e.Err == syscall.ENOSPC {
        deleteTempFiles()  // 恢複一些空間。
        continue
    }
    return
}           

複制

上面的第5行,即第2條 if 是另一種類型斷言。

若它失敗, ok 将為 false,而 e 則為nil. 若它成功, ok 将為 true

2.1 Panic

有時程式就是不能繼續運作。為此,可以使用内建的 panic 函數,它會産生一個運作時錯誤并終止程式。

該函數接受一個任意類型的實參(一般為字元串),并在程式終止時列印輸出。格式:

Panic( 字元串 )           

複制

實際使用中,庫函數應避免 panic。若問題可以被屏蔽或解決, 最好就是讓程式繼續運作,而不是終止。

一個反例的情況就是初始化中: 若某個庫真的不能讓自己工作,那就觸發Panic 吧,比如

var user = os.Getenv("USER")

    func init() {
      if user == "" {
      panic("no value for $USER")
    }
}           

複制

2.2 恢複

當 panic 被調用後, 程式将立刻終止目前函數的執行,并開始回溯Go程的棧,運作任何被推遲的函數。 若回溯到達Go程棧的頂端,程式就會終止。

//  我自己畫的不太嚴謹的圖例,幫助了解。
 //  假如在 main 函數裡調用了 方法1,在 方法1 裡又調用了 方法2
  |                              |
  |                              |
  | #4           方法2            |        // 假如在這裡觸發了 Panic
  | #3           方法2的defer     |    //在 defer 時,仍然有機會調用 recover函數來恢複
  | #2      方法1                |
  | #1   main                    |    //到這裡就程式終止了
  -------------------------------           

複制

不過我們可以用内建的 recover 函數來 取回Go程的控制權限 并使其恢複正常執行。

調用 recover函數 将停止回溯過程,它的傳回值是錯誤資訊(實際是調用 panic 函數時的參數)。

由于在回溯時,隻有被推遲的函數( defer )在運作,是以 recover 隻能在被推遲(defer)的函數中才有效。

在 Go程 内通過 recover 來終止失敗的Go程,而無需讓整個程式崩潰。

先看示例代碼:

func server(workChan <-chan *Work) {
    for work := range workChan {
        go safelyDo(work)
    }
}

func safelyDo(work *Work) {
    defer func() {
        if err := recover(); err != nil {
            log.Println("work failed:", err)
        }
    }()
    do(work)
}           

複制

在此例中,若 do(work) 觸發了Panic,其結果就會被記錄(列印輸出), 而該Go程會被幹淨利落地結束,不會幹擾到其它Go程。我們無需在推遲的閉包中做任何事情, recover 會處理好這一切。