天天看點

goroutine排程

1、程序

一個程序包含可以由任何程序配置設定的公共資源。這些資源包括但不限于記憶體位址空間、檔案句柄、裝置和線程。

一個程序會包含下面一些屬性:

  • Process ID:程序ID
  • Process State:程序狀态
  • Process Priority:程序優先級
  • Program Counter:程式計數器
  • General purpose register:通用寄存器
  • List of open files:打開的檔案清單
  • List of open devices:打開的裝置清單
  • Protection information:保護資訊
  • List of the child process:子程序清單
  • Pending alarms:待定警告
  • Signals and signal handlers:信号和信号處理程式
  • Accounting information:記賬資訊

2、線程

線程是輕量級的程序,一個線程将在程序内的所有線程之間共享程序的資源,如代碼、資料、全局變量、檔案和記憶體位址空間。但是棧和寄存器不會共享,每個線程都有自己的棧和寄存器

線程的優點:

  • 提高系統的吞吐量
  • 提高響應能力
  • 由于屬性更少,上下文切換更快
  • 多核CPU的有效利用
  • 資源共享(代碼、資料、位址空間、檔案、全局變量)

3、使用者級線程

使用者級線程也稱為綠色線程,如:C 中的coroutine、Go 中的 goroutine 和 Ruby 中的 Fiber

goroutine排程

該程序維護一個記憶體位址空間,處理檔案,以及正在運作的應用程式的裝置和線程。作業系統排程程式決定哪些線程将在任何給定的 CPU 上接收時間

是以,與耗時和資源密集型的程序建立相比,在一個程序中建立多個使用者線程(goroutine)效率更高。

4、goroutine

在Go中使用者級線程被稱作Goroutine,在建立goroutine時需要做到:

  • 易于建立
  • 輕量級
  • 并發執行
  • 可擴充
  • 無限堆棧(最大堆棧大小在 64 位上為 1 GB,在 32 位上為 250 MB。)
  • 處理阻塞調用
  • 高效 (work stealing)

其中阻塞調用可能是下面一些原因:

  • 在channel中收發資料
  • 網絡IO調用
  • 阻塞的系統調用
  • 計時器
  • 互斥操作(Mutex)

為什麼go需要排程goroutine?

Go 使用稱為 goroutine 的使用者級線程,它比核心級線程更輕且更便宜。 例如,建立一個初始 goroutine 将占用 2KB 的堆棧大小,而核心級線程将占用 8KB 的堆棧大小。 還有,goroutine 比核心線程有更快的建立、銷毀和上下文切換,是以 go 排程器 需要退出來排程 goroutine。OS 不能排程使用者級線程,OS 隻知道核心級線程。 Go 排程器 将 goroutine 多路複用到核心級線程,這些線程将在不同的 CPU 核心上運作

什麼時候會排程goroutine?

如果有任何操作應該或将會影響 goroutine 的執行,比如 goroutine 的啟動、等待執行和阻塞調用等……

go排程 如何将 goroutine 多路複用到核心線程中?

1、1:1排程(1個線程對應一個goroutine)

  • 并行執行(每個線程可以在不同的核心上運作)
  • 可以工作但是代價太高
  • 記憶體至少〜32k(使用者堆棧和核心堆棧的記憶體)
  • 性能問題(系統調用)
  • 沒有無限堆棧

2、N:1排程(在單個核心線程上多路複用所有 goroutine)

  • 沒有并行性(即使有更多 CPU 核心可用,也隻能使用單個 CPU 核心)

我們看下下面的例子,隻為go配置設定了1個processer去處理2個goroutine:

package main

import (
    "fmt"
    "runtime"
    "sync"
    "time"
)

func main() {
    // 配置設定 1 個邏輯處理器供排程程式使用
    runtime.GOMAXPROCS(1)
    var wg sync.WaitGroup
    wg.Add(2)

    fmt.Println("Starting Goroutines")

    // 開一個go協程列印字母
    go func() {
        defer wg.Done()
        time.Sleep(time.Second)
        // 列印3次字母
        for count := 0; count < 3; count++ {
            for ch := 'a'; ch < 'a'+26; ch++ {
                fmt.Printf("%c ", ch)
            }
            fmt.Println()
        }
    }()

    // 開一個go協程列印數字
    go func() {
        defer wg.Done()
        // 列印3次數字
        for count := 0; count < 3; count++ {
            for n := 1; n <= 26; n++ {
                fmt.Printf("%d ", n)
            }
            fmt.Println()
        }
    }()

    // 等待傳回
    fmt.Println("Waiting To Finish")
    wg.Wait()
    fmt.Println("\nTerminating Program")
}      

看下結果:

go run main.go
Starting Goroutines
Waiting To Finish
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 
a b c d e f g h i j k l m n o p q r s t u v w x y z 
a b c d e f g h i j k l m n o p q r s t u v w x y z 
a b c d e f g h i j k l m n o p q r s t u v      

可以看到這倆個goroutine是串行執行的,要麼先完成第一個goroutine,要麼先完成第二個goroutine,并不是并發執行的。

那如何去實作并發執行呢?

我們同樣設定runtime.GOMAXPROCS為1,但是在goroutine中我們在不同的時機加入阻塞goroutine的時間函數time.Sleep,我們看下會有什麼不同的結果。

package main

import (
    "fmt"
    "runtime"
    "sync"
    "time"
)

func main() {
    // 配置設定 1 個邏輯處理器供排程程式使用
    runtime.GOMAXPROCS(1)
    var wg sync.WaitGroup
    wg.Add(2)

    fmt.Println("Starting Goroutines")

    // 開一個go協程列印字母
    go func() {
        defer wg.Done()
        time.Sleep(time.Second)
        // 列印3次字母
        for count := 0; count < 3; count++ {
            for ch := 'a'; ch < 'a'+26; ch++ {
                if count == 0 {
                    time.Sleep(10 * time.Millisecond)
                }
                if count == 1 {
                    time.Sleep(30 * time.Millisecond)
                }
                if count == 2 {
                    time.Sleep(50 * time.Millisecond)
                }
                fmt.Printf("%c ", ch)
            }
            fmt.Println()
        }
    }()

    // 開一個go協程列印數字
    go func() {
        defer wg.Done()
        // 列印3次數字
        for count := 0; count < 3; count++ {
            for n := 1; n <= 26; n++ {
                if count == 0 {
                    time.Sleep(20 * time.Millisecond)
                }
                if count == 1 {
                    time.Sleep(40 * time.Millisecond)
                }
                if count == 2 {
                    time.Sleep(60 * time.Millisecond)
                }
                fmt.Printf("%d ", n)
            }
            fmt.Println()
        }
    }()

    // 等待傳回
    fmt.Println("Waiting To Finish")
    wg.Wait()
    fmt.Println("\nTerminating Program")
}      

看下結果:

go run main.go
Starting Goroutines
Waiting To Finish
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 
1 2 3 4 5 6 7 8 9 10 11 a 12 b c d e 13 f g h i 14 j k l m 15 n o p 16 q r s t 17 u v w x 18 y z 
19 a b 20 c 21 d 22 e f 23 g 24 h 25 i j 26 
k l 1 m n 2 o p 3 q r 4 s t 5 u v 6 w x 7 y z 
8 a 9 b 10 c 11 d 12 e f 13 g 14 h 15 i 16 j 17 k l 18 m 19 n 20 o 21 p 22 q r 23 s 24 t 25 u 26      

通過上面的結果我們可以看到,當goroutine1阻塞時,go排程器會排程goroutine2執行。

我們可以得出:

  • 即使我們将 runtime.GOMAXPROCS(1) 設定為 1,程式也在并發運作
  • Running 狀态的 Goroutine 數量最大為 1,Block Goroutine 可以多于一個,其他所有 Goroutine 都處于 Runnable 狀态

3、線程池

  • 在需要時建立一個線程,這意味着如果有 goroutine 要運作但所有其他線程都忙,則建立一個線程
  • 一旦線程完成其執行而不是銷毀重用它
  • 這可以更快的建立goroutine,因為我們可以重用線程
  • 但是還有更多的記憶體消耗,性能問題,并且沒有無限堆棧。

4、M: N 線程共享運作隊列排程(GMP)

  • M代表系統線程的數量
  • N代表goroutine的數量
  • goroutine 的建立成本很低,我們可以完全控制 goroutine 的整個生命周期,因為它是在使用者空間中建立的
  • 建立一個作業系統線程很昂貴,我們無法控制它,但是使用多個線程我們可以實作并行
  • 在這個模型中,多個 goroutine 被多路複用到核心線程中

我們上面提到過導緻goroutine阻塞調用可能是下面一些原因:

  • 在channel中收發資料
  • 網絡IO調用
  • 阻塞的系統調用
  • 計時器
  • 互斥操作(Mutex)

下面看一些goroutine阻塞的例子:

package main

 import (
     "time"
     "fmt"
     "sync"
     "os"
     "net/http"
     "io/ioutil"
 )

 // 全局變量
 var worker int

 func writeToFile(wg *sync.WaitGroup,){
     defer wg.Done()
    
     file, _ := os.OpenFile("file.txt", os.O_RDWR|os.O_CREATE, 0755)             // 系統調用阻塞
     resp, _ := http.Get("https://blog.waterflow.link/articles/1662706601117") // 網絡IO阻塞
     body, _ := ioutil.ReadAll(resp.Body)                                        // 系統調用阻塞

     file.WriteString(string(body))
 }

 func workerCount(wg *sync.WaitGroup, m *sync.Mutex, ch chan string) { 
     // Lock() 給共享資源上鎖
     // 獨占通路狀态, 
     // 增加worker的值,
     // Unlock() 釋放鎖
     m.Lock()                                                                    // Mutex阻塞
     worker = worker + 1
     ch <- fmt.Sprintf("Worker %d is ready",worker)
     m.Unlock()
  
     // 傳回, 通知WaitGroup完成
     wg.Done()
 }

 func printWorker(wg *sync.WaitGroup, done chan bool, ch chan string){
    
     for i:=0;i<100;i++{
         fmt.Println(<-ch)                                               // Channel阻塞
     }
     wg.Done()
     done <-true
 }

 func main() {
    
     ch :=make(chan string)
     done :=make(chan bool)
    
     var mu sync.Mutex
    
     var wg sync.WaitGroup
    
     for i:=1;i<=100;i++{
         wg.Add(1)
         go workerCount(&wg,&mu,ch)
     }
    
     wg.Add(2)
     go writeToFile(&wg)
     go printWorker(&wg,done,ch)
    
     wg.Wait()
    
     <-done                                                             // Channel阻塞
    
     <-time.After(1*time.Second)                                        // Timer阻塞
     close(ch)
     close(done)
 }      

下面我們看看go排程器在上面這些例子中是如何工作的:

  • 如果一個 goroutine 在通道上被阻塞,則通道有等待隊列,所有阻塞的 goroutine 都列在等待隊列中,并且很容易跟蹤。 在阻塞調用之後,它們将被放入 schedular 的全局運作隊列中,OS Thread 将再次按照 FIFO 的順序選擇 goroutine。
  • goroutine排程
  1. M1,M2,M3嘗試從全局G隊列中擷取G
  2. M1擷取鎖并拿到G1,然後釋放鎖
  3. M3擷取鎖拿到G2,然後釋放鎖
  4. M2擷取鎖拿到G3,然後釋放鎖
  5. G1在ch1的channel中阻塞,然後添加到ch1的等待隊列。導緻M1空閑
  6. M1不能閑着,從全局隊列擷取鎖拿到G4,然後釋放鎖
  7. G3阻塞在ch2的channel中,然後被放到ch2的等待隊列。導緻M2空閑
  8. M2擷取鎖拿到G5,然後釋放鎖
  9. 此時G3在ch2結束阻塞,被放到全局隊列尾部等待執行
  10. G1在ch1結束阻塞,被放到全局隊列尾部等待執行
  11. G4,G5,G2執行完成
  12. M1,M2,M3重複步驟1-4
  • 互斥鎖、定時器和網絡 IO 使用相同的機制
  • 如果一個 goroutine 在系統調用中被阻塞,那麼情況就不同了,因為我們不知道核心空間發生了什麼。 通道是在使用者空間中建立的,是以我們可以完全控制它們,但在系統調用的情況下,我們沒法控制它們。
  • 阻塞系統調用不僅會阻塞 goroutine 還會阻塞核心線程。
  • 假設一個 goroutine 被安排在一個核心線程上的系統調用,當一個核心線程完成執行時,它将喚醒另一個核心線程(線程重用),該線程将拾取另一個 goroutine 并開始執行它。 這是一個理想的場景,但在實際情況下,我們不知道系統調用将花費多少時間,是以我們不能依賴核心線程來喚醒另一個線程,我們需要一些代碼級邏輯來決定何時 在系統調用的情況下喚醒另一個線程。 這個邏輯在 golang 中實作為 runtime·entersyscall()和 runtime·exitsyscall()。 這意味着核心線程的數量可以超過核心的數量。
  • 當對核心進行系統調用時,它有兩個關鍵點,一個是進入時機,另一個是退出時機。
  1. M1,M2試着從全局隊列拿G
  2. M1擷取鎖并拿到G1,然後釋放鎖
  3. M2擷取鎖并拿到G2,然後釋放鎖
  4. M2阻塞在系統調用,沒有可用的核心線程,是以go排程器建立一個新的線程M3
  5. M3擷取鎖并拿到G3,然後釋放鎖
  6. 此時M2結束阻塞狀态,重新把G2放到全局隊列(G2由阻塞變為可執行狀态)。M2雖然是空閑狀态,但是go排程器不會銷毀它,而是自旋發現新的可執行的goroutine。
  7. G1,G3執行結束
  8. M1,M3重複步驟1-3

作業系統可以支援多少核心線程?

在 Linux 核心中,此參數在檔案 /proc/sys/kernel/threads-max 中定義,該檔案用于特定核心。

​sh:~$ cat /proc/sys/kernel/threads-max 94751​

​ 這裡輸出94751表示核心最多可以執行94751個線程

每個 Go 程式可以支援多少個 goroutine?

排程中沒有内置對 goroutine 數量的限制。

每個 GO程式 可以支援多少個核心線程?

預設情況下,運作時将每個程式限制為最多 10,000 個線程。可以通過調用 runtime/debug 包中的 SetMaxThreads 函數來更改此值。

總結:

  1. 核心線程數可以多于核心數
  2. 輕量級 goroutine
  3. 處理 IO 和系統調用
  4. goroutine并行執行
  5. 不可擴充(所有核心級線程都嘗試使用互斥鎖通路全局運作隊列。是以,由于競争,這不容易擴充)

5、M:N 線程分布式運作隊列排程器

為了解決每個線程同時嘗試通路互斥鎖的可擴充問題,維護每個線程的本地運作隊列

  • 每個線程狀态(本地運作隊列)
  • 仍然有一個全局運作隊列
  • goroutine排程
  1. M1,M2,M3,M4掃描本地可運作隊列
  2. M1,M2,M3,M4從各自的本地隊列取出G4,G6,G1,G3

從上面的動圖可以看到:

  • 從本地隊列拿G是不需要加鎖的
  • 可運作 goroutine 的全局隊列需要鎖

結論:

  1. 輕量級 goroutine
  2. 處理 IO 和 SystemCalls
  3. goroutine 并行執行
  4. 可擴充
  5. 高效

如果線程數大于核心數,那麼會有什麼問題呢?

在分布式運作隊列排程中,我們知道每個線程都有自己的本地運作隊列,其中包含有關接下來将執行哪個 goroutine 的資訊。 同樣由于系統調用,線程數會增加,并且大多數時候它們的本地運作隊列是空的。 是以,如果線程數大于核心數,則每個線程必須掃描所有線程本地運作隊列,并且大部分時間它們是空的,是以如果線程過多,這個過程是耗時的并且解決方案 效率不高,是以我們需要将線程掃描限制為使用 M:P:N 線程模型求解的常數。

6、M:P: N 線程

  • P 代表處理器,它是運作 go 代碼所需的資源。 處理器結構詳細資訊 https://github.com/golang/go/blob/63e129ba1c458db23f0752d106ed088a2cf38360/src/runtime/runtime2.go#L601
  • M 代表工作線程或機器。 機器線程結構詳細資訊 https://github.com/golang/go/blob/63e129ba1c458db23f0752d106ed088a2cf38360/src/runtime/runtime2.go#L519
  • G 代表 goroutine。 Goroutine 結構細節 https://github.com/golang/go/blob/63e129ba1c458db23f0752d106ed088a2cf38360/src/runtime/runtime2.go#L407
  • 通常,P的數量與邏輯處理器的數量相同
  • 邏輯處理器與實體處理器不同(比如我的mac邏輯處理器是8,無力處理器是4)
  • 在啟動main goroutine之前建立P

如何檢查邏輯處理器的數量?

package main

 import (  
     "fmt"
     "runtime"
 )

 func main() {
     fmt.Println(runtime.NumCPU())
 }      

分布式 M:P:N 排程例子

goroutine排程
  1. M1,M2各自掃描P1,P2的隊列
  2. M1,M2從各自的P1,P2中取出G3,G1執行

在系統調用期間執行P的切換

goroutine排程
  1. M1,M2各自掃描P1,P2的隊列
  2. M1,M2從各自的P1,P2中取出G3,G1執行
  3. G1即将進入系統調用,是以在這之前G1會喚醒另一個線程M3,并将P2切換到M3
  4. M3掃描P2并取出G2運作
  5. 一旦G1變為非阻塞,它将被推送到全局隊列等待運作

在work-stealing期間,隻需要掃描固定數量的隊列,因為邏輯處理器的數量是有限的。

如何選擇下一個要運作的 goroutine ?

Go 排程器 将按以下順序檢查以選擇下一個要執行的 goroutine

  • 本地運作隊列
  • goroutine排程
  • 全局運作隊列
  1. M1,M2,M3各自掃描本地隊列P1,P2,P3
  2. M1,M2,M3各自從P1,P2,P3取出G3,G1,G5
  3. G5完成,M3掃描本地隊列P3發現空,然後掃描全局隊列
  4. M3将從全局隊列擷取一定數量的G(G6,G7),儲存到本地隊列P3
  5. 現在M3從本地隊列P3取出G6執行
  • Network poller
  1. M1,M2,M3各自掃描本地隊列P1,P2,P3
  2. M1,M2,M3各自從P1,P2,P3取出G3,G1,G6
  3. G6執行完成,M3掃描P3發現是空的,然後掃描全局隊列
  4. 但是全局隊列也是空的,然後就檢查網絡輪詢中已就緒的G
  5. 網絡輪詢中有一個已就緒的G2,是以M3取出G2并執行
  • Work Stealing
  1. M1,M2,M3各自掃描本地隊列P1,P2,P3
  2. M1,M2,M3各自從P1,P2,P3取出G3,G1,G6
  3. G6執行完成,M3掃描P3發現是空的,然後掃描全局隊列
  4. 但是全局隊列也是空的,然後就檢查網絡輪詢中已就緒的G
  5. 但是網絡輪詢中沒有已就緒的G,是以M3随機的從其他P中竊取一半的G到P3
  6. 如果随機選中的P中沒有要執行的G,就會重試4次,從其他P擷取

總結:

  • 輕量級 goroutine
  • 處理 IO 和系統調用
  • goroutine 的并行執行
  • 可擴充
  • 高效/工作竊取

Go 排程的局限性

  • FIFO 對局部性原則不利
  • 沒有 goroutine 優先級的概念(不像 Linux 核心)
  • 沒有強搶占 -> 沒有強公平或延遲保證
  • 它沒有意識到系統拓撲 -> 沒有真實的位置。有一個舊的 NUMA 感覺排程程式提案。此外,建議使用 LIFO 隊列,這樣 CPU 核心緩存中更有可能有資料。