天天看點

深入 Go 并發原語 — Channel 底層實作

作為 Go 并發原語的第一篇文章,一定繞不開 Go 的并發哲學。從 Tony Hoare 寫的 Communicating Sequential Processes 這篇文章說起,這篇經典論文算是 Go 語言并發原語的根基。

一. What is CSP

CSP 的全程是 Communicating Sequential Processes,直譯,通信順序程序。這一概念起源自 1978 年 ACM 期刊中 Charles Antony Richard Hoare 寫的經典同名論文。感興趣的讀者可以看 Reference 中的第一個連結看原文。在這篇文章中,Hoare 在文中用 CSP 來描述通信順序程序能力,姑且認為這是一個虛構的程式設計語言。該語言描述了并發過程之間的互動作用。從曆史上看,軟體的進步主要依靠硬體的改進,這些改進可以使 CPU 更快,記憶體更大。Hoare 認識到,想通過硬體提高使得代碼運作速度快 10 倍,需要付出 10 倍以上的機器資源。這并沒有從根本改善問題。

1. 術語和一些例子

盡管并發相對于傳統的順序程式設計具有許多優勢,但由于其會出錯的性質,它未能獲得廣泛的歡迎。Hoare 借助 CSP 引入了一種精确的理論,可以在數學上保證程式擺脫并發的常見問題。Hoare 在他的 Learning CSP(這是計算機科學中引用第三多的神書!)一書中,使用“程序微積分”來表明可以處理死鎖和不确定性,就像它們是普通程序中的最終事件一樣。程序微積分是一種對并發系統進行數學化模組化的方式,并且提供了代數法則來進行這些系統的變換來分析它們的不同屬性,并發和效率。

為了防止資料被多線程破壞,Hoare 提出了臨界區的概念。程序進入臨界區後可以獲得對共享資料的通路。在進入臨界區之前,所有其他的程序必須驗證和更新這一共享變量的值。退出臨界區時,程序必須再次驗證所有程序具有相同的值。

保持資料完整性的另一種技術是通過使用互斥信号量或互斥量。互斥鎖是信号量的特定子類,它僅允許一個程序一次通路該變量。信号量是一個受限制的通路變量,它是防止并發中競争的經典解決方案。其他嘗試通路該互斥鎖的程序将被阻止,并且必須等待直到目前程序釋放該互斥鎖。釋放互斥鎖後,隻有一個等待的程序可以通路該變量,所有其他程序繼續等待。

1970年代初期,Hoare 基于互斥量的概念開發了一種稱為螢幕的概念。根據 IBM 編寫的 Java 程式設計語言 CSP 教程:

“A monitor is a body of code whose access is guarded by a mutex. Any process wishing to execute this code must acquire the associated mutex at the top of the code block and release it at the bottom. Because only one thread can own a mutex at a given time, this effectively ensures that only the owing thread can execute a monitor block of code.”

monitor 可以幫助防止資料被破壞和線程死鎖。在 CSP 論文中為了說明清楚程序之間的通信,Hoare 利用 ?和 !号代表了輸入和輸出。!代表發送輸入到一個程序,?号代表讀取一個程序的輸出。每個指令需要指定具體是一個輸出變量(從一個程序中讀取一個變量的情況),還是目的地(将輸入發送到一個程序的情況)。一個程序的輸出應該直接流向另一個程序的輸入。

深入 Go 并發原語 — Channel 底層實作

上圖是從 CSP 文章中截圖的一些例子,Hoare 簡單的舉了下面這個例子:

Go

[c:character; west?c ~ east!c]      

上述代碼的意思是讀取 west 輸出的所有字元,然後把它們一個個的輸出到 east 中。這個過程不斷的重複,直到 west 終止。從描述上看,這一特性完完全全是 channel 的雛形。

2. 哲學家問題

文章的最後,回到了經典的哲學家問題。

深入 Go 并發原語 — Channel 底層實作

在哲學家問題中,Hoare 将 philosopher 的行為描述如下:

Go

PHIL = *[... during ith lifetime ... --->,
THINK;
room!enter( );
fork(0!pickup( ); fork((/+ 1) rood 5)!pickup( );
EAT;
fork(i)!putdown( ); fork((/+ 1) mod 5)!putdown( );
room!exit( )
]      

每個叉子由坐在兩邊的哲學家使用或者放下:

Go

FORK =
*[phil(0?pickup( )--* phil(0?putdown( )
0phil((i - 1)rood 5)?pickup( ) --* phil((/- l) raod 5)?putdown( )
]      

整個哲學家在房間中的行為可以描述為:

Go

ROOM = occupancy:integer; occupancy .--- 0;
,[(i:0..4)phil(0?enter ( ) --* occupancy .--- occupancy + l
11(i:0..4)phil(0?exit ( ) --~ occupancy .--- occupancy - l
]      

決定如何向等待的程序配置設定資源的任務稱為排程。Hoare 将排程分為兩個事件:

  • processes 請求資源
  • 将資源配置設定給 processes

那麼這個哲學家問題可以轉換成 PHIL 和 FORK 這兩個元件并發的過程:

Go

[room::ROOM I [fork( i:0..4)::FORK I Iphil( i:0..4)::PHIL].      

從請求到授予資源的時間就是等待時間。在 CSP 中,有幾種技術可以防止無限的等待時間。

  • 限制資源使用并提高資源可用性。
  • 先進先出(FIFO)将資源配置設定給等待時間最長的程序。
  • 面包店算法​​Carnegie Melon. Bakery Algorithm​​

3. 缺陷

在确定性程式中,如果環境恒定,結果将是相同的。 由于并發基于非确定性,是以環境不會影響程式。給定所選的路徑,程式則可以運作幾次并收到不同的結果。為了確定并發程式的準确性,程式員必須能夠在整體水準上考慮其程式的執行。

但是,盡管 Hoare 引入了正式的方法,但仍然缺少任何驗證正确程式的證明方法。CSP 隻能發現已知問題,而不能發現未知問題。雖然基于 CSP 的商業應用程式(例如ConAn)可以檢測到錯誤的存在,但是不能檢測沒有錯誤的情況,(無法驗證正确性)。盡管 CSP 為我們提供了編寫可以避免常見并發錯誤的程式的工具,但是正确程式的證明仍然是 CSP 中尚未解決的領域。

4. 未來

CSP 在生物學和化學領域具有巨大的潛力,可以對自然界中的複雜系統進行模組化。 由于該行業面臨許多現存的邏輯問題,是以尚未在行業中廣泛使用。在關于 CSP 開發 25 周年的會議上,Hoare 指出,盡管有許多由 Microsoft 資助的研究項目,但比爾·蓋茨(Bill Gates)忽略了 Microsoft 何時能夠将 CSP 的研究成果商業化的​​問題​​。

Hoare 提醒他的聽衆,動态過程領域仍然需要更多的研究。目前,計算機科學界陷入了順序思維的範式。随着 Hoare 建立正式的并發方法的基礎,科學界已做好準備成為并行程式設計的下一個革命。

5. Go 并發哲學

在 Go 語言釋出之前,很少有語言從底層為并發原語提供支援。大多數語言還是支援共享和記憶體通路同步到 CSP 的消息傳遞方法。Go 語言算是最早将 CSP 原則納入其核心的語言之一。記憶體通路同步的方式并不是不好,隻是在高并發的場景下有時候難以正确的使用,特别是在超大型,巨型的程式中。基于此,并發能力被認為是 Go 語言天生優勢之一。追其根本,還是因為 Go 基于 CSP 創造出來的一系列易讀,友善編寫的并發原語。

Go 語言除了 CSP 并發原語以外,還支援通過記憶體通路同步。sync 與其他包中的結構體與方法可以讓開發者建立 WaitGroup,互斥鎖和讀寫鎖,cond,once,sync.Pool。在 Go 語言的官方 FAQ 中,描述了如何選擇這些并發原語:

為了尊重 mutex,sync 包實作了 mutex,但是我們希望 Go 語言的程式設計風格将會激勵人們嘗試更高等級的技巧。尤其是考慮建構你的程式,以便一次隻有一個 goroutine 負責某個特定的資料。

不要通過共享記憶體進行通信。建議,通過通信來共享記憶體。(Do not communicate by sharing memory; instead, share memory by communicating)這是 Go 語言并發的哲學座右銘。相對于使用 sync.Mutex 這樣的并發原語。雖然大多數鎖的問題可以通過 channel 或者傳統的鎖兩種方式之一解決,但是 Go 語言核心團隊更加推薦使用 CSP 的方式。

深入 Go 并發原語 — Channel 底層實作

關于如何選擇并發原語的問題,本文作為第一篇文章必然需要解釋清楚。Go 中的并發原語主要分為 2 大類,一個是 sync 包裡面的,另一個是 channel。sync 包裡面主要是 WaitGroup,互斥鎖和讀寫鎖,cond,once,sync.Pool 這一類。在 2 種情況下推薦使用 sync 包:

  • 對性能要求極高的臨界區
  • 保護某個結構内部狀态和完整性

關于保護某個結構内部的狀态和完整性。例如 Go 源碼中如下代碼:

Go

var sum struct {
  sync.Mutex
  i int
}

//export Add
func Add(x int) {
  defer func() {
    recover()
  }()
  sum.Lock()
  sum.i += x
  sum.Unlock()
  var p *int
  *p = 2
}      

sum 這個結構體不想将内部的變量暴露在結構體之外,是以使用 sync.Mutex 來保護線程安全。

相對于 sync 包,channel 也有 2 種情況:

  • 輸出資料給其他使用方
  • 組合多個邏輯

輸出資料給其他使用方的目的是轉移資料的使用權。并發安全的實質是保證同時隻有一個并發上下文擁有資料的所有權。channel 可以很友善的将資料所有權轉給其他使用方。另一個優勢是組合型。如果使用 sync 裡面的鎖,想實作組合多個邏輯并且保證并發安全,是比較困難的。但是使用 channel + select 實作組合邏輯實在太友善了。以上就是 CSP 的基本概念和何時選擇 channel 的時機。下一章從 channel 基本資料結構開始詳細分析 channel 底層源碼實作。

以下代碼基于 Go 1.16

二. 基本資料結構

channel 的底層源碼和相關實作在 src/runtime/chan.go 中。

Go

type hchan struct {
  qcount   uint           // 隊列中所有資料總數
  dataqsiz uint           // 環形隊列的 size
  buf      unsafe.Pointer // 指向 dataqsiz 長度的數組
  elemsize uint16         // 元素大小
  closed   uint32
  elemtype *_type         // 元素類型
  sendx    uint           // 已發送的元素在環形隊列中的位置
  recvx    uint           // 已接收的元素在環形隊列中的位置
  recvq    waitq          // 接收者的等待隊列
  sendq    waitq          // 發送者的等待隊列

  lock mutex
}      

lock 鎖保護 hchan 中的所有字段,以及此通道上被阻塞的 sudogs 中的多個字段。持有 lock 的時候,禁止更改另一個 G 的狀态(特别是不要使 G 狀态變成ready),因為這會因為堆棧 shrinking 而發生死鎖。

深入 Go 并發原語 — Channel 底層實作

recvq 和 sendq 是等待隊列,waitq 是一個雙向連結清單:

Go

type waitq struct {
  first *sudog
  last  *sudog
}      

channel 最核心的資料結構是 sudog。sudog 代表了一個在等待隊列中的 g。sudog 是 Go 中非常重要的資料結構,因為 g 與同步對象關系是多對多的。一個 g 可以出現在許多等待隊列上,是以一個 g 可能有很多sudog。并且多個 g 可能正在等待同一個同步對象,是以一個對象可能有許多 sudog。sudog 是從特殊池中配置設定出來的。使用 acquireSudog 和 releaseSudog 配置設定和釋放它們。

Go

type sudog struct {

  g *g

  next *sudog
  prev *sudog
  elem unsafe.Pointer // 指向資料 (可能指向棧)

  acquiretime int64
  releasetime int64
  ticket      uint32

  isSelect bool
  success bool

  parent   *sudog     // semaRoot 二叉樹
  waitlink *sudog     // g.waiting 清單或者 semaRoot
  waittail *sudog     // semaRoot
  c        *hchan     // channel
}      

sudog 中所有字段都受 hchan.lock 保護。acquiretime、releasetime、ticket 這三個字段永遠不會被同時通路。對 channel 來說,waitlink 隻由 g 使用。對 semaphores 來說,隻有在持有 semaRoot 鎖的時候才能通路這三個字段。isSelect 表示 g 是否被選擇,g.selectDone 必須進行 CAS 才能在被喚醒的競争中勝出。success 表示 channel c 上的通信是否成功。如果 goroutine 在 channel c 上傳了一個值而被喚醒,則為 true;如果因為 c 關閉而被喚醒,則為 false。

三. 建立 Channel

建立 channel 常見代碼:

Go

ch := make(chan int)      

編譯器編譯上述代碼,在檢查 ir 節點時,根據節點 op 不同類型,進行不同的檢查,如下源碼:

Go

func walkExpr1(n ir.Node, init *ir.Nodes) ir.Node {
  switch n.Op() {
  default:
    ir.Dump("walk", n)
    base.Fatalf("walkExpr: switch 1 unknown op %+v", n.Op())
    panic("unreachable")

  case ir.OMAKECHAN:
    n := n.(*ir.MakeExpr)
    return walkMakeChan(n, init)

  ......
}      

編譯器會檢查每一種類型,walkExpr1() 的實作就是一個 switch-case,函數末尾沒有 return,因為每一個 case 都會 return 或者傳回 panic。這樣做是為了與存在類型斷言的情況中傳回的内容做區分。walk 具體處理 OMAKECHAN 類型節點的邏輯如下:

Go

func walkMakeChan(n *ir.MakeExpr, init *ir.Nodes) ir.Node {
  size := n.Len
  fnname := "makechan64"
  argtype := types.Types[types.TINT64]

  if size.Type().IsKind(types.TIDEAL) || size.Type().Size() <= types.Types[types.TUINT].Size() {
    fnname = "makechan"
    argtype = types.Types[types.TINT]
  }

  return mkcall1(chanfn(fnname, 1, n.Type()), n.Type(), init, reflectdata.TypePtr(n.Type()), typecheck.Conv(size, argtype))
}      

上述代碼預設調用 makechan64() 函數。類型檢查時如果 TIDEAL 大小在 int 範圍内。将 TUINT 或 TUINTPTR 轉換為 TINT 時出現大小溢出的情況,将在運作時在 makechan 中進行檢查。如果在 make 函數中傳入的 channel size 大小在 int 範圍内,推薦使用 makechan()。因為 makechan() 在 32 位的平台上更快,用的記憶體更少。

makechan64() 和 makechan() 函數方法原型如下:

Go

func makechan64(chanType *byte, size int64) (hchan chan any)
func makechan(chanType *byte, size int) (hchan chan any)      

makechan64() 方法隻是判斷一下傳入的入參 size 是否還在 int 範圍之内:

Go

func makechan64(t *chantype, size int64) *hchan {
  if int64(int(size)) != size {
    panic(plainError("makechan: size out of range"))
  }

  return makechan(t, int(size))
}      

建立 channel 的主要實作在 makechan() 函數中:

Go

func makechan(t *chantype, size int) *hchan {
    elem := t.elem

    // 編譯器檢查資料項大小不能超過 64KB
    if elem.size >= 1<<16 {
        throw("makechan: invalid channel element type")
    }
    // 檢查對齊是否正确
    if hchanSize%maxAlign != 0 || elem.align > maxAlign {
        throw("makechan: bad alignment")
    }
    // 緩沖區大小檢查,判斷是否溢出
    mem, overflow := math.MulUintptr(elem.size, uintptr(size))
    if overflow || mem > maxAlloc-hchanSize || size < 0 {
        panic(plainError("makechan: size out of range"))
    }

    var c *hchan
    switch {
    case mem == 0:
        // 隊列或者元素大小為 zero 時
        c = (*hchan)(mallocgc(hchanSize, nil, true))
        // Race 競争檢查利用這個位址來進行同步操作
        c.buf = c.raceaddr()
    case elem.ptrdata == 0:
        // 元素不包含指針時。一次配置設定 hchan 和 buf 的記憶體。
        c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
        c.buf = add(unsafe.Pointer(c), hchanSize)
    default:
        // 元素包含指針時
        c = new(hchan)
        c.buf = mallocgc(mem, elem, true)
    }

    // 設定屬性
    c.elemsize = uint16(elem.size)
    c.elemtype = elem
    c.dataqsiz = uint(size)
    lockInit(&c.lock, lockRankHchan)

    if debugChan {
        print("makechan: chan=", c, "; elemsize=", elem.size, "; dataqsiz=", size, "\n")
    }
    return c
}      

上面這段 makechan() 代碼主要目的是生成 *hchan 對象。重點關注 switch-case 中的 3 種情況:

  • 當隊列或者元素大小為 0 時,調用 mallocgc() 在堆上為 channel 開辟一段大小為 hchanSize 的記憶體空間。
  • 當元素類型不是指針類型時,調用 mallocgc() 在堆上開辟為 channel 和底層 buf 緩沖區數組開辟一段大小為 hchanSize + mem 連續的記憶體空間。
  • 預設情況元素類型中有指針類型,調用 mallocgc() 在堆上分别為 channel 和 buf 緩沖區配置設定記憶體。

完成第一步的記憶體配置設定之後,再就是 hchan 資料結構其他字段的初始化和 lock 的初始化。值得說明的一點是,當存儲在 buf 中的元素不包含指針時,Hchan 中也不包含 GC 關心的指針。buf 指向一段相同元素類型的記憶體,elemtype 固定不變。SudoG 是從它們自己的線程中引用的,是以垃圾回收的時候無法回收它們。受到垃圾回收器的限制,指針類型的緩沖 buf 需要單獨配置設定記憶體。官方在這裡加了一個 TODO,垃圾回收的時候這段代碼邏輯需要重新考慮。

就是因為 channel 的建立全部調用的 mallocgc(),在堆上開辟的記憶體空間,channel 本身會被 GC 自動回收。有了這一性質,是以才有了下文關閉 channel 中優雅關閉的方法。

四. 發送資料

向 channel 中發送資料常見代碼:

Go

ch <- 1      

編譯器編譯上述代碼,在檢查 ir 節點時,根據節點 op 不同類型,進行不同的檢查,如下源碼:

Go

func walkExpr1(n ir.Node, init *ir.Nodes) ir.Node {
  switch n.Op() {
  default:
    ir.Dump("walk", n)
    base.Fatalf("walkExpr: switch 1 unknown op %+v", n.Op())
    panic("unreachable")

  case ir.OSEND:
    n := n.(*ir.SendStmt)
    return walkSend(n, init)

  ......
}      

walkExpr1() 函數在建立 channel 提到了,這裡不再贅述。操作類型是 OSEND,對應調用 walkSend() 函數:

Go

func walkSend(n *ir.SendStmt, init *ir.Nodes) ir.Node {
  n1 := n.Value
  n1 = typecheck.AssignConv(n1, n.Chan.Type().Elem(), "chan send")
  n1 = walkExpr(n1, init)
  n1 = typecheck.NodAddr(n1)
  return mkcall1(chanfn("chansend1", 2, n.Chan.Type()), nil, init, n.Chan, n1)
}

// entry point for c <- x from compiled code
//go:nosplit
func chansend1(c *hchan, elem unsafe.Pointer) {
  chansend(c, elem, true, getcallerpc())
}      

walkSend() 函數中主要邏輯調用了 chansend1(),而 chansend1() 隻是 chansend() 的“外殼”。是以 channel 發送資料的核心實作在 chansend() 中。根據 channel 的阻塞和喚醒,又可以分為 2 部分邏輯代碼。接下來筆者講 chansend() 代碼拆成 4 部分詳細分析。

1. 異常檢查

chansend() 函數一開始先進行異常檢查:

Go

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
    // 判斷 channel 是否為 nil
    if c == nil {
        if !block {
            return false
        }
        gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
        throw("unreachable")
    }

    if debugChan {
        print("chansend: chan=", c, "\n")
    }

    if raceenabled {
        racereadpc(c.raceaddr(), callerpc, funcPC(chansend))
    }

    // 簡易快速的檢查
    if !block && c.closed == 0 && full(c) {
        return false
    }
......
}      

chansend() 一上來對 channel 進行檢查,如果被 GC 回收了會變為 nil。朝一個為 nil 的 channel 發送資料會發生阻塞。gopark 會引發以 waitReasonChanSendNilChan 為原因的休眠,并抛出 unreachable 的 fatal error。當 channel 不為 nil,再開始檢查在沒有擷取鎖的情況下會導緻發送失敗的非阻塞操作。

當 channel 不為 nil,并且 channel 沒有 close 時,還需要檢查此時 channel 是否做好發送的準備,即判斷 full(c)

Go

func full(c *hchan) bool {
  if c.dataqsiz == 0 {
    // 假設指針讀取是近似原子性的
    return c.recvq.first == nil
  }
  // 假設讀取 uint 是近似原子性的
  return c.qcount == c.dataqsiz
}      

full() 方法作用是判斷在 channel 上發送是否會阻塞(即通道已滿)。它讀取單個位元組大小的可變狀态(recvq.first 和 qcount),盡管答案可能在一瞬間是 true,但在調用函數收到傳回值時,正确的結果可能發生了更改。值得注意的是 dataqsiz 字段,它在建立完 channel 以後是不可變的,是以它可以安全的在任意時刻讀取。

回到 chansend() 異常檢查中。一個已經 close 的 channel 是不可能從“準備發送”的狀态變為“未準備好發送”的狀态。是以在檢查完 channel 是否 close 以後,就算 channel close 了,也不影響此處檢查的結果。可能有讀者疑惑,“能不能把檢查順序倒一倒?先檢查是否 full(),再檢查是否 close?”。這樣倒過來确實能保證檢查 full() 的時候,channel 沒有 close。但是這種做法也沒有實質性的改變。channel 依舊可以在檢查完 close 以後再關閉。其實我們依賴的是 chanrecv() 和 closechan() 這兩個方法在鎖釋放後,它們更新這個線程 c.close 和 full() 的結果視圖。

2. 同步發送

channel 異常狀态檢查以後,接下來的代碼是發送的邏輯。

Go

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
......

  lock(&c.lock)

  if c.closed != 0 {
    unlock(&c.lock)
    panic(plainError("send on closed channel"))
  }

  if sg := c.recvq.dequeue(); sg != nil {
    send(c, sg, ep, func() { unlock(&c.lock) }, 3)
    return true
  }

......

}      

在發送之前,先上鎖,保證線程安全。并再一次檢查 channel 是否關閉。如果關閉則抛出 panic。加鎖成功并且 channel 未關閉,開始發送。如果有正在阻塞等待的接收方,通過 dequeue() 取出頭部第一個非空的 sudog,調用 send() 函數:

Go

func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
  if sg.elem != nil {
    sendDirect(c.elemtype, sg, ep)
    sg.elem = nil
  }
  gp := sg.g
  unlockf()
  gp.param = unsafe.Pointer(sg)
  sg.success = true
  if sg.releasetime != 0 {
    sg.releasetime = cputicks()
  }
  goready(gp, skip+1)
}      

send() 函數主要完成了 2 件事:

  1. 調用 sendDirect() 函數将資料拷貝到了接收變量的記憶體位址上
  2. 調用 goready() 将等待接收的阻塞 goroutine 的狀态從 Gwaiting 或者 Gscanwaiting 改變成 Grunnable。下一輪排程時會喚醒這個接收的 goroutine。
深入 Go 并發原語 — Channel 底層實作

這裡重點說說 goready() 的實作。了解了它的源碼,就能明白為什麼往 channel 中發送資料并非立即可以從接收方擷取到。

Go

func goready(gp *g, traceskip int) {
  systemstack(func() {
    ready(gp, traceskip, true)
  })
}

func ready(gp *g, traceskip int, next bool) {
......

  casgstatus(gp, _Gwaiting, _Grunnable)
  runqput(_g_.m.p.ptr(), gp, next)
  wakep()
  releasem(mp)
}      

在 runqput() 函數的作用是把 g 綁定到本地可運作的隊列中。此處 next 傳入的是 true,将 g 插入到 runnext 插槽中,等待下次排程便立即運作。因為這一點導緻了雖然 goroutine 保證了線程安全,但是在讀取資料方面比數組慢了幾百納秒。

Read Channel Slice
Time x * 100 * nanosecond
Thread safe Yes No

是以在寫測試用例的某些時候,需要考慮到這個微弱的延遲,可以适當加 sleep()。再比如刷 LeetCode 題目的時候,并非無腦使用 goroutine 就能帶來 runtime 的提升,例如 ​​509. Fibonacci Number​​​,感興趣的同學可以用 goroutine 來寫一寫這道題,筆者這裡實作了​​goroutine 解法​​,性能方面完全不如數組的解法。

3. 異步發送

如果初始化 channel 時建立的帶緩沖區的異步 Channel,當接收者隊列為空時,這是會進入到異步發送邏輯:

Go

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
......

  if c.qcount < c.dataqsiz {
    qp := chanbuf(c, c.sendx)
    if raceenabled {
      racenotify(c, c.sendx, nil)
    }
    typedmemmove(c.elemtype, qp, ep)
    c.sendx++
    if c.sendx == c.dataqsiz {
      c.sendx = 0
    }
    c.qcount++
    unlock(&c.lock)
    return true
  }
  
......
}      

如果 qcount 還沒有滿,則調用 chanbuf() 擷取 sendx 索引的元素指針值。調用 typedmemmove() 方法将發送的值拷貝到緩沖區 buf 中。拷貝完成,需要維護 sendx 索引下标值和 qcount 個數。這裡将 buf 緩沖區設計成環形的,索引值如果到了隊尾,下一個位置重新回到隊頭。

深入 Go 并發原語 — Channel 底層實作

至此,兩種直接發送的邏輯分析完了,接下來是發送時 channel 阻塞的情況。

4. 阻塞發送

當 channel 處于打開狀态,但是沒有接收者,并且沒有 buf 緩沖隊列或者 buf 隊列已滿,這時 channel 會進入阻塞發送。

Go

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
......

  if !block {
    unlock(&c.lock)
    return false
  }
  
  gp := getg()
  mysg := acquireSudog()
  mysg.releasetime = 0
  if t0 != 0 {
    mysg.releasetime = -1
  }
  mysg.elem = ep
  mysg.waitlink = nil
  mysg.g = gp
  mysg.isSelect = false
  mysg.c = c
  gp.waiting = mysg
  gp.param = nil
  c.sendq.enqueue(mysg)
  atomic.Store8(&gp.parkingOnChan, 1)
  gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)
  KeepAlive(ep)
......
}      
  • 調用 getg() 方法擷取目前 goroutine 的指針,用于綁定給一個 sudog。
  • 調用 acquireSudog() 方法擷取一個 sudog,可能是建立的 sudog,也有可能是從緩存中擷取的。設定好 sudog 要發送的資料和狀态。比如發送的 Channel、是否在 select 中和待發送資料的記憶體位址等等。
  • 調用 c.sendq.enqueue 方法将配置好的 sudog 加入待發送的等待隊列。
  • 設定原子信号。當棧要 shrink 收縮時,這個标記代表目前 goroutine 還 parking 停在某個 channel 中。在 g 狀态變更與設定 activeStackChans 狀态這兩個時間點之間的時間視窗進行棧 shrink 收縮是不安全的,是以需要設定這個原子信号。
  • 調用 gopark 方法挂起目前 goroutine,狀态為 waitReasonChanSend,阻塞等待 channel。
  • 最後,KeepAlive() 確定發送的值保持活動狀态,直到接收者将其複制出來。 sudog 具有指向堆棧對象的指針,但 sudog 不能被當做堆棧跟蹤器的 root。發送的數值是配置設定在堆上,這樣可以避免被 GC 回收。
深入 Go 并發原語 — Channel 底層實作

這裡提一下 sudog 的二級緩存複用體系。在 acquireSudog() 方法中:

Go

func acquireSudog() *sudog {
  mp := acquirem()
  pp := mp.p.ptr()
  // 如果本地緩存為空
  if len(pp.sudogcache) == 0 {
    lock(&sched.sudoglock)
    // 首先嘗試将全局中央緩存存一部分到本地
    for len(pp.sudogcache) < cap(pp.sudogcache)/2 && sched.sudogcache != nil {
      s := sched.sudogcache
      sched.sudogcache = s.next
      s.next = nil
      pp.sudogcache = append(pp.sudogcache, s)
    }
    unlock(&sched.sudoglock)
    // 如果全局中央緩存是空的,則 allocate 一個新的
    if len(pp.sudogcache) == 0 {
      pp.sudogcache = append(pp.sudogcache, new(sudog))
    }
  }
  // 從尾部提取,并調整本地緩存
  n := len(pp.sudogcache)
  s := pp.sudogcache[n-1]
  pp.sudogcache[n-1] = nil
  pp.sudogcache = pp.sudogcache[:n-1]
  if s.elem != nil {
    throw("acquireSudog: found s.elem != nil in cache")
  }
  releasem(mp)
  return s
}      

上述代碼涉及到 2 個新的重要的結構體,由于這 2 個結構體特别複雜,暫時此處隻展示和 acquireSudog() 有關的部分:

Go

type p struct {
......
  sudogcache []*sudog
  sudogbuf   [128]*sudog
......
}

type schedt struct {
......
  sudoglock  mutex
  sudogcache *sudog
......
}      

sched.sudogcache 是全局中央緩存,可以認為它是“一級緩存”,它會在 GC 垃圾回收執行 clearpools 被清理。p.sudogcache 可以認為它是“二級緩存”,是本地緩存不會被 GC 清理掉。

chansend 最後的代碼邏輯是當 goroutine 喚醒以後,解除阻塞的狀态:

Go

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
......

  if mysg != gp.waiting {
    throw("G waiting list is corrupted")
  }
  gp.waiting = nil
  gp.activeStackChans = false
  closed := !mysg.success
  gp.param = nil
  if mysg.releasetime > 0 {
    blockevent(mysg.releasetime-t0, 2)
  }
  mysg.c = nil
  releaseSudog(mysg)
  if closed {
    if c.closed == 0 {
      throw("chansend: spurious wakeup")
    }
    panic(plainError("send on closed channel"))
  }
  return true
}      

sudog 算是對 g 的一種封裝,裡面包含了 g,要發送的資料以及相關的狀态。goroutine 被喚醒後會完成 channel 的阻塞資料發送。發送完最後進行基本的參數檢查,解除 channel 的綁定并釋放 sudog。

Go

func releaseSudog(s *sudog) {
  if s.elem != nil {
    throw("runtime: sudog with non-nil elem")
  }
  if s.isSelect {
    throw("runtime: sudog with non-false isSelect")
  }
  if s.next != nil {
    throw("runtime: sudog with non-nil next")
  }
  if s.prev != nil {
    throw("runtime: sudog with non-nil prev")
  }
  if s.waitlink != nil {
    throw("runtime: sudog with non-nil waitlink")
  }
  if s.c != nil {
    throw("runtime: sudog with non-nil c")
  }
  gp := getg()
  if gp.param != nil {
    throw("runtime: releaseSudog with non-nil gp.param")
  }
  // 防止 rescheduling 到了其他的 P
  mp := acquirem() 
  pp := mp.p.ptr()
  // 如果本地緩存已滿
  if len(pp.sudogcache) == cap(pp.sudogcache) {
    // 轉移一半本地緩存到全局中央緩存中
    var first, last *sudog
    for len(pp.sudogcache) > cap(pp.sudogcache)/2 {
      n := len(pp.sudogcache)
      p := pp.sudogcache[n-1]
      pp.sudogcache[n-1] = nil
      pp.sudogcache = pp.sudogcache[:n-1]
      if first == nil {
        first = p
      } else {
        last.next = p
      }
      last = p
    }
    lock(&sched.sudoglock)
    // 将提取的連結清單挂載到全局中央緩存中
    last.next = sched.sudogcache
    sched.sudogcache = first
    unlock(&sched.sudoglock)
  }
  pp.sudogcache = append(pp.sudogcache, s)
  releasem(mp)
}      

releaseSudog() 雖然釋放了 sudog 的記憶體,但是它會被 p.sudogcache 這個“二級緩存”緩存起來。

chansend() 函數最後傳回 true 表示成功向 Channel 發送了資料。

5. 小結

關于 channel 發送的源碼實作已經分析完了,針對 channel 各個狀态做一個小結。

Channel Status Result
Write nil 阻塞
Write 打開但填滿 阻塞
Write 打開但未滿 成功寫入值
Write 關閉 panic
Write 隻讀 Compile Error

channel 發送過程中包含 2 次有關 goroutine 排程過程:

  1. 當接收隊列中存在 sudog 可以直接發送資料時,執行 ​

    ​goready()​

    ​将 g 插入 runnext 插槽中,狀态從 Gwaiting 或者 Gscanwaiting 改變成 Grunnable,等待下次排程便立即運作。
  2. 當 channel 阻塞時,執行 ​

    ​gopark()​

    ​ 将 g 阻塞,讓出 cpu 的使用權。

需要強調的是,通道并不提供跨 goroutine 的資料通路保護機制。如果通過通道傳輸資料的一份副本,那麼每個 goroutine 都持有一份副本,各自對自己的副本做修改是安全的。當傳輸的是指向資料的指針時,如果讀和寫是由不同的 goroutine 完成的,那麼每個 goroutine 依舊需要額外的同步操作。

五. 接收資料

從 channel 中接收資料常見代碼:

Go

tmp := <-ch
tmp, ok := <-ch      

先看等号左邊指派一個值的情況,編譯器編譯上述代碼,在檢查 ir 節點時,根據節點 op 不同類型,進行不同的檢查,如下源碼:

Go

// walkAssign walks an OAS (AssignExpr) or OASOP (AssignOpExpr) node.
func walkAssign(init *ir.Nodes, n ir.Node) ir.Node {
......

  switch as.Y.Op() {
  default:
    as.Y = walkExpr(as.Y, init)

  case ir.ORECV:
    // x = <-c; as.Left is x, as.Right.Left is c.
    // order.stmt made sure x is addressable.
    recv := as.Y.(*ir.UnaryExpr)
    recv.X = walkExpr(recv.X, init)

    n1 := typecheck.NodAddr(as.X)
    r := recv.X // the channel
    return mkcall1(chanfn("chanrecv1", 2, r.Type()), nil, init, r, n1)
    
......
}      

as 是入參 ir 節點強制轉化成 AssignStmt 類型。AssignStmt 這個類型是指派的一個說明:

Go

type AssignStmt struct {
  miniStmt
  X   Node
  Def bool
  Y   Node
}      

Y 是等号右邊的值,它是 Node 類型,裡面包含 op 類型。walkAssign 是檢查指派語句,如果 Y.Op() 是 ir.ORECV 類型,說明是 channel 接收的過程。調用 chanrecv1() 函數。as.X 是指派語句左邊的元素,它是接收 channel 中的值,是以它必須是可尋址的。

當從 channel 中讀取資料等号左邊是 2 個值的時候,編譯器在 walkExpr1 中檢查這個指派語句:

Go

func walkExpr1(n ir.Node, init *ir.Nodes) ir.Node {
  switch n.Op() {
  default:
    ir.Dump("walk", n)
    base.Fatalf("walkExpr: switch 1 unknown op %+v", n.Op())
    panic("unreachable")
......

  case ir.OAS2RECV:
    n := n.(*ir.AssignListStmt)
    return walkAssignRecv(init, n)
    
......
}      

n.Op() 是 ir.OAS2RECV 類型,将 n 強轉成 AssignListStmt 類型:

Go

type AssignListStmt struct {
  miniStmt
  Lhs Nodes
  Def bool
  Rhs Nodes
}      

AssignListStmt 和 AssignStmt 作用一樣,隻是 AssignListStmt 表示等号兩邊指派語句不再是一個對象,而是多個。回到 walkExpr1() 中,如果是 ir.OAS2RECV 類型,調用 walkAssignRecv() 繼續檢查。

Go

func walkAssignRecv(init *ir.Nodes, n *ir.AssignListStmt) ir.Node {
  init.Append(ir.TakeInit(n)...)
  r := n.Rhs[0].(*ir.UnaryExpr) // recv
  walkExprListSafe(n.Lhs, init)
  r.X = walkExpr(r.X, init)
  var n1 ir.Node
  if ir.IsBlank(n.Lhs[0]) {
    n1 = typecheck.NodNil()
  } else {
    n1 = typecheck.NodAddr(n.Lhs[0])
  }
  fn := chanfn("chanrecv2", 2, r.X.Type())
  ok := n.Lhs[1]
  call := mkcall1(fn, types.Types[types.TBOOL], init, r.X, n1)
  return typecheck.Stmt(ir.NewAssignStmt(base.Pos, ok, call))
}      

Lhs[0] 是實際接收 channel 值的對象,Lhs[1] 是指派語句左邊第二個 bool 值。指派語句右邊由于隻有一個 channel,是以這裡 Rhs 也隻用到了 Rhs[0]。

Go

//go:nosplit
func chanrecv1(c *hchan, elem unsafe.Pointer) {
  chanrecv(c, elem, true)
}

//go:nosplit
func chanrecv2(c *hchan, elem unsafe.Pointer) (received bool) {
  _, received = chanrecv(c, elem, true)
  return
}      

綜合上述的分析,2 種不同的 channel 接收方式會轉換成 runtime.chanrecv1 和 runtime.chanrecv2 兩種不同函數的調用,但是最終核心邏輯還是在 runtime.chanrecv 中。

1. 異常檢查

chanrecv() 函數一開始先進行異常檢查:

Go

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
  if debugChan {
    print("chanrecv: chan=", c, "\n")
  }

  if c == nil {
    if !block {
      return
    }
    gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
    throw("unreachable")
  }

  // 簡易快速的檢查
  if !block && empty(c) {
    if atomic.Load(&c.closed) == 0 {
      return
    }
    if empty(c) {
      // channel 不可逆的關閉了且為空
      if raceenabled {
        raceacquire(c.raceaddr())
      }
      if ep != nil {
        typedmemclr(c.elemtype, ep)
      }
      return true, false
    }
  }      

chanrecv() 一上來對 channel 進行檢查,如果被 GC 回收了會變為 nil。從一個為 nil 的 channel 中接收資料會發生阻塞。gopark 會引發以 waitReasonChanReceiveNilChan 為原因的休眠,并抛出 unreachable 的 fatal error。當 channel 不為 nil,再開始檢查在沒有擷取鎖的情況下會導緻接收失敗的非阻塞操作。

這裡進行的簡易快速的檢查,檢查中狀态不能發生變化。這一點和 chansend() 函數有差別。在 chansend() 簡易快速的檢查中,改變順序對檢查結果無太大影響,但是此處如果檢查過程中狀态發生變化,如果發生了 racing,檢查結果會出現完全相反的錯誤的結果。例如以下這種情況:channel 在第一個和第二個 if 檢查時是打開的且非空,于是在第二個 if 裡面 return。但是 return 的瞬間, channel 關閉且空。這樣判斷出來認為 channel 是打開的且非空。明顯是錯誤的結果,實際上 channel 是關閉且空的。同理檢查是否為空的時候也會發生狀态反轉。為了防止錯誤的檢查結果,c.closed 和 empty() 都必須使用原子檢查。

Go

func empty(c *hchan) bool {
  // c.dataqsiz 是不可變的
  if c.dataqsiz == 0 {
    return atomic.Loadp(unsafe.Pointer(&c.sendq.first)) == nil
  }
  return atomic.Loaduint(&c.qcount) == 0
}      

這裡總共檢查了 2 次 empty(),因為第一次檢查時, channel 可能還沒有關閉,但是第二次檢查的時候關閉了,在 2 次檢查之間可能有待接收的資料到達了。是以需要 2 次 empty() 檢查。

不過就算按照上述源碼檢查,細心的讀者可能還會舉出一個反例,例如,關閉一個已經阻塞的同步的 channel,最開始的 !block && empty(c) 為 false,會跳過這個檢查。這種情況不能算在正常 chanrecv() 裡面。上述是不擷取鎖的情況檢查會接收失敗的情況。接下來在擷取鎖的情況下再次檢查一遍異常情況。

Go

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
......
  lock(&c.lock)

  if c.closed != 0 && c.qcount == 0 {
    if raceenabled {
      raceacquire(c.raceaddr())
    }
    unlock(&c.lock)
    if ep != nil {
      typedmemclr(c.elemtype, ep)
    }
    return true, false
  }
......      

如果 channel 已經關閉且不存在緩存資料了,則清理 ep 指針中的資料并傳回。這裡也是從已經關閉的 channel 中讀資料,讀出來的是該類型零值的原因。

2. 同步接收

同 chansend 邏輯類似,檢查完異常情況,緊接着是同步接收。

Go

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
......

  if sg := c.sendq.dequeue(); sg != nil {
    recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
    return true, true
  }
......      

在 channel 的發送隊列中找到了等待發送的 goroutine。取出隊頭等待的 goroutine。如果緩沖區的大小為 0,則直接從發送方接收值。否則,對應緩沖區滿的情況,從隊列的頭部接收資料,發送者的值添加到隊列的末尾(此時隊列已滿,是以兩者都映射到緩沖區中的同一個下标)。同步接收的核心邏輯見下面 recv() 函數:

Go

func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
  if c.dataqsiz == 0 {
    if raceenabled {
      racesync(c, sg)
    }
    if ep != nil {
      // 從 sender 裡面拷貝資料
      recvDirect(c.elemtype, sg, ep)
    }
  } else {
      // 這裡對應 buf 滿的情況
    qp := chanbuf(c, c.recvx)
    if raceenabled {
      racenotify(c, c.recvx, nil)
      racenotify(c, c.recvx, sg)
    }
    // 将資料從 buf 中拷貝到接收者記憶體位址中
    if ep != nil {
      typedmemmove(c.elemtype, ep, qp)
    }
    // 将資料從 sender 中拷貝到 buf 中
    typedmemmove(c.elemtype, qp, sg.elem)
    c.recvx++
    if c.recvx == c.dataqsiz {
      c.recvx = 0
    }
    c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
  }
  sg.elem = nil
  gp := sg.g
  unlockf()
  gp.param = unsafe.Pointer(sg)
  sg.success = true
  if sg.releasetime != 0 {
    sg.releasetime = cputicks()
  }
  goready(gp, skip+1)
}      

需要注意的是由于有發送者在等待,是以如果存在緩沖區,那麼緩沖區一定是滿的。這個情況對應發送階段阻塞發送的情況,如果緩沖區還有空位,發送的資料直接放入緩沖區,隻有當緩沖區滿了,才會打包成 sudog,插入到 sendq 隊列中等待排程。注意了解這一情況。

接收時主要分為 2 種情況,有緩沖且 buf 滿和無緩沖的情況:

  • 無緩沖。ep 發送資料不為 nil,調用 recvDirect() 将發送隊列中 sudog 存儲的 ep 資料直接拷貝到接收者的記憶體位址中。
深入 Go 并發原語 — Channel 底層實作
  • 有緩沖并且 buf 滿。有 2 次 copy 操作,先将隊列中 recvx 索引下标的資料拷貝到接收方的記憶體位址,再将發送隊列頭的資料拷貝到緩沖區中,釋放一個 sudog 阻塞的 goroutine。

    有緩沖且 buf 滿的情況需要注意,取資料從緩沖隊列頭取出,發送的資料放在隊列尾部,由于 buf 裝滿,取出的 recvx 指針和發送的 sendx 指針指向相同的下标。

深入 Go 并發原語 — Channel 底層實作

最後調用 goready() 将等待接收的阻塞 goroutine 的狀态從 Gwaiting 或者 Gscanwaiting 改變成 Grunnable。下一輪排程時會喚醒這個發送的 goroutine。這部分邏輯和同步發送中一緻,關于 goready() 底層實作的代碼不在贅述。

3. 異步接收

如果 Channel 的緩沖區中包含一些資料時,從 Channel 中接收資料會直接從緩沖區中 recvx 的索引位置中取出資料進行處理:

Go

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
......

  if c.qcount > 0 {
    // 直接從隊列中接收
    qp := chanbuf(c, c.recvx)
    if raceenabled {
      racenotify(c, c.recvx, nil)
    }
    if ep != nil {
      typedmemmove(c.elemtype, ep, qp)
    }
    typedmemclr(c.elemtype, qp)
    c.recvx++
    if c.recvx == c.dataqsiz {
      c.recvx = 0
    }
    c.qcount--
    unlock(&c.lock)
    return true, true
  }

  if !block {
    unlock(&c.lock)
    return false, false
  }
......      

上述代碼比較簡單,如果接收資料的記憶體位址 ep 不為空,則調用 runtime.typedmemmove() 将緩沖區内的資料拷貝到記憶體中,并通過 typedmemclr() 清除隊列中的資料。

深入 Go 并發原語 — Channel 底層實作

維護 recvx 下标,如果移動到了環形隊列的隊尾,下标需要回到隊頭。最後減少 qcount 計數器并釋放持有 Channel 的鎖。

4. 阻塞接收

如果 channel 發送隊列上沒有待發送的 goroutine,并且緩沖區也沒有資料時,将會進入到最後一個階段阻塞接收:

Go

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
......

  gp := getg()
  mysg := acquireSudog()
  mysg.releasetime = 0
  if t0 != 0 {
    mysg.releasetime = -1
  }
  mysg.elem = ep
  mysg.waitlink = nil
  gp.waiting = mysg
  mysg.g = gp
  mysg.isSelect = false
  mysg.c = c
  gp.param = nil
  c.recvq.enqueue(mysg)
  atomic.Store8(&gp.parkingOnChan, 1)
  gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2)
......      
  • 調用 getg() 方法擷取目前 goroutine 的指針,用于綁定給一個 sudog。
  • 調用 acquireSudog() 方法擷取一個 sudog,可能是建立的 sudog,也有可能是從緩存中擷取的。設定好 sudog 要發送的資料和狀态。比如發送的 Channel、是否在 select 中和待發送資料的記憶體位址等等。
  • 調用 c.recvq.enqueue 方法将配置好的 sudog 加入待發送的等待隊列。
  • 設定原子信号。當棧要 shrink 收縮時,這個标記代表目前 goroutine 還 parking 停在某個 channel 中。在 g 狀态變更與設定 activeStackChans 狀态這兩個時間點之間的時間視窗進行棧 shrink 收縮是不安全的,是以需要設定這個原子信号。
  • 調用 gopark 方法挂起目前 goroutine,狀态為 waitReasonChanReceive,阻塞等待 channel。
深入 Go 并發原語 — Channel 底層實作

上面這段代碼與 chansend() 中阻塞發送幾乎完全一緻,差別在于最後一步沒有 KeepAlive(ep)。

Go

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
......

  // 被喚醒
  if mysg != gp.waiting {
    throw("G waiting list is corrupted")
  }
  gp.waiting = nil
  gp.activeStackChans = false
  if mysg.releasetime > 0 {
    blockevent(mysg.releasetime-t0, 2)
  }
  success := mysg.success
  gp.param = nil
  mysg.c = nil
  releaseSudog(mysg)
  return true, success
}      

goroutine 被喚醒後會完成 channel 的阻塞資料接收。接收完最後進行基本的參數檢查,解除 channel 的綁定并釋放 sudog。

5. 小結

關于 channel 接收的源碼實作已經分析完了,針對 channel 各個狀态做一個小結。

Channel status Result
Read nil 阻塞
Read 打開且非空 讀取到值
Read 打開但為空 阻塞
Read 關閉 <預設值>, false
Read 隻讀 Compile Error

chanrecv 的傳回值有幾種情況:

Go

tmp, ok := <-ch      
Channel status Selected Received
nil false false
打開且非空 true true
打開但為空 false false
關閉且傳回值是零值 true false

received 值會傳遞給讀取 channel 外部的 bool 值 ok,selected 值不會被外部使用。

channel 接收過程中包含 2 次有關 goroutine 排程過程:

  1. 當 channel 為 nil 時,執行 gopark() 挂起目前的 goroutine。
  2. 當發送隊列中存在 sudog 可以直接接收資料時,執行 goready()将 g 插入 runnext 插槽中,狀态從 Gwaiting 或者 Gscanwaiting 改變成 Grunnable,等待下次排程便立即運作。
  3. 當 channel 緩沖區為空,且沒有發送者時,這時 channel 阻塞,執行 gopark() 将 g 阻塞,讓出 cpu 的使用權并等待排程器的排程。

六. 關閉 Channel

關于 channel 常見代碼:

Go

close(ch)      

編譯器會将其轉換為 runtime.closechan() 方法。

1. 異常檢查

Go

func closechan(c *hchan) {
  if c == nil {
    panic(plainError("close of nil channel"))
  }

  lock(&c.lock)
  if c.closed != 0 {
    unlock(&c.lock)
    panic(plainError("close of closed channel"))
  }

  if raceenabled {
    callerpc := getcallerpc()
    racewritepc(c.raceaddr(), callerpc, funcPC(closechan))
    racerelease(c.raceaddr())
  }
  
  c.closed = 1
......
}      

關閉一個 channel 有 2 點需要注意,當 Channel 是一個 nil 空指針或者關閉一個已經關閉的 channel 時,Go 語言運作時都會直接 panic。上述 2 種情況都不存在時,标記 channel 狀态為 close。

2. 釋放所有 readers 和 writers

關閉 channel 的主要工作是釋放所有的 readers 和 writers。

Go

func closechan(c *hchan) {
......
  var glist gList

  for {
    sg := c.recvq.dequeue()
    if sg == nil {
      break
    }
    if sg.elem != nil {
      typedmemclr(c.elemtype, sg.elem)
      sg.elem = nil
    }
    if sg.releasetime != 0 {
      sg.releasetime = cputicks()
    }
    gp := sg.g
    gp.param = unsafe.Pointer(sg)
    sg.success = false
    if raceenabled {
      raceacquireg(gp, c.raceaddr())
    }
    glist.push(gp)
  }
......
}      

上述代碼是回收接收者的 sudog。将所有的接收者 readers 的 sudog 等待隊列(recvq)加入到待清除隊列 glist 中。注意這裡是先回收接收者。就算從一個 close 的 channel 中讀取值,不會發生 panic,頂多讀到一個預設零值。

Go

func closechan(c *hchan) {
......

  for {
    sg := c.sendq.dequeue()
    if sg == nil {
      break
    }
    sg.elem = nil
    if sg.releasetime != 0 {
      sg.releasetime = cputicks()
    }
    gp := sg.g
    gp.param = unsafe.Pointer(sg)
    sg.success = false
    if raceenabled {
      raceacquireg(gp, c.raceaddr())
    }
    glist.push(gp)
  }
  unlock(&c.lock)
......
}      

再回收發送者 writers。回收步驟和回收接收者是完全一緻的,将發送者的等待隊列 sendq 中的 sudog 放入待清除隊列 glist 中。注意這裡可能會産生 panic。在第四章發送資料中分析過,往一個 close 的 channel 中發送資料,會産生 panic,這裡不再贅述。

深入 Go 并發原語 — Channel 底層實作

3. 協程排程

最後一步更改 goroutine 的狀态。

Go

func closechan(c *hchan) {
......
  for !glist.empty() {
    gp := glist.pop()
    gp.schedlink = 0
    goready(gp, 3)
  }
......
}      

最後會為所有被阻塞的 goroutine 調用 goready 觸發排程。将所有 glist 中的 goroutine 狀态從 _Gwaiting 設定為 _Grunnable 狀态,等待排程器的排程。

4. 優雅關閉

“Channel 有幾種優雅的關閉方法?” 這種問題常常出現在面試題中,究其原因是因為 Channel 建立容易,但是關閉“不易”:

  • 在不改變 Channel 自身狀态的條件下,無法知道它是否已經關閉。“不易”之一,關閉時機未知。
  • 如果一個 Channel 已經關閉,重複關閉 Channel 會導緻 panic。“不易”之二,不能無腦關閉。
  • 往一個 close 的 Channel 内寫資料,也會導緻 panic。“不易”之三,寫資料之前也需要關注是否 close 的狀态。
Channel Status Result
close nil panic
close 打開且非空 關閉 Channel;讀取成功,直到 Channel 耗盡資料,然後讀取産生值的預設值
close 打開但為空 關閉 Channel;讀到生産者的預設值
close 關閉 panic
close 隻讀 Compile Error

那究竟什麼時候關閉 Channel 呢?由上面三個“不易”,可以濃縮為 2 點:

  • 不能簡單的從消費者側關閉 Channel。
  • 如果有多個生産者,它們不能關閉 Channel。

解釋一下這 2 個問題。第一個問題,消費者不知道 Channel 何時該關閉。如果關閉了已經關閉的 Channel 會導緻 panic。而且分布式應用通常有多個消費者,每個消費者的行為一緻,這麼多消費者都嘗試關閉 Channel 必然會導緻 panic。第二個問題,如果有多個生産者往 Channel 内寫入資料,這些生産者的行為邏輯也都一緻,如果其中一個生産者關閉了 Channel,其他的生産者還在往裡寫,這個時候會 panic。是以為了防止 panic,必須解決上面這 2 個問題。

關閉 Channel 的方式就 2 種:

  • Context
  • done channel

Context 的方式在本篇文章不詳細展開,詳細的可以檢視筆者 Context 的那篇文章。本節聊聊 done channel 的做法。假設有多個生産者,有多個消費者。在生産者和消費者之間增加一個額外的輔助控制 channel,用來傳遞關閉信号。

Go

type session struct {
  done     chan struct{}
  doneOnce sync.Once
  data     chan int
}

func (sess *session) Serve() {
  go sess.loopRead()
  sess.loopWrite()
}

func (sess *session) loopRead() {
  defer func() {
    if err := recover(); err != nil {
      sess.doneOnce.Do(func() { close(sess.done) })
    }
  }()

  var err error
  for {
    select {
    case <-sess.done:
      return
    default:
    }

    if err == io.ErrUnexpectedEOF || err == io.EOF {
      goto failed
    }
  }
failed:
  sess.doneOnce.Do(func() { close(sess.done) })
}

func (sess *session) loopWrite() {
  defer func() {
    if err := recover(); err != nil {
      sess.doneOnce.Do(func() { close(sess.done) })
    }
  }()

  var err error
  for {
    select {
    case <-sess.done:
      return
    case sess.data <- rand.Intn(100):
    }
    
    if err != nil {
      goto done
    }
  }
done:
  if err != nil {
    log("sess: loop write failed: %v, %s", err, sess)
  }
}

func (sess *session) ForceClose() {
  sess.doneOnce.Do(func() { close(sess.done) })
}      

消費者側發送關閉 done channel,由于消費者有多個,如果每一個都關閉 done channel,會導緻 panic。是以這裡用 doneOnce.Do() 保證隻會關閉 done channel 一次。這解決了第一個問題。生産者收到 done channel 的信号以後自動退出。多個生産者退出時間不同,但是最終肯定都會退出。當生産者全部退出以後,data channel 最終沒有引用,會被 gc 回收。這也解決了第二個問題,生産者不會去關閉 data channel,防止出現 panic。

深入 Go 并發原語 — Channel 底層實作