天天看點

go| 感受并發程式設計的樂趣 後篇

這篇 blog 緊接我的上篇 blog - go| 感受并發程式設計的樂趣 前篇

.

學習了

ccmouse - googl工程師 慕課網 - 搭建并行處理管道,感受GO語言魅力

, 獲益匪淺, 也想把這份程式設計的快樂傳遞給大家.

強烈推薦一下ccmouse大大的課程, 總能讓我生出 Google工程師果然就是不一樣 之感, 每次都能從簡單的 hello world 開始, 一步步 coding 到教程的主題, 并在過程中給予充分的理由 -- 為什麼要一步步變複雜. 同時也會親身 踩坑 示範, 幹貨滿滿.

内容提要:

  • go 實作完整外部排序
  • go 實作叢集版(web版)外部排序

另外, ccmouse大大關于語言學習的方法也值得借鑒:

  • 首先, 學習一下語言文法的要點
  • 立刻找一個不那麼簡單的項目來做, 邊做邊查文檔/stackoverflow

先來看看完整外部排序的設計圖:

go| 感受并發程式設計的樂趣 後篇

涉及到的功能大部分在上一章都有講到, 整體流程:

  • 從檔案中讀取資料: 注意這裡使用了 chunk 的設計, 将檔案進行分塊讀取, 而且 chunkSize 的設計很巧妙, 同時支援 全文讀取 和 chunk讀取
  • 對讀取到的 chunk 的資料進行記憶體排序(快排)
  • 通過遞歸, 對排序後的 chunk 進行二路歸并
  • 将歸并後的資料寫入到檔案中

從協程的角度來看待整個流程:

  • goroutine1 進行 chunk讀取, 寫入 channel
  • goroutine2 進行 記憶體排序, 排序後資料寫入 channel
  • goroutine3 進行 二路歸并, 歸并的過程中, 資料不斷寫入到 channel
  • goroutine4 進行 檔案寫入, 将 channel 中的資料寫入到檔案

注意這裡:

  • goroutine1-4 可能是多個協程, 可能某一時刻是同一個協程, go 底層會有任務隊列(runq)進行協程排程
  • 可以通過資料流的角度來思考這個問題: 資料是怎麼在 檔案/channel/協程 之間進行流轉的.
  • 測試很重要, 示例中就先使用了small 資料進行測試, 檢查程式的正确性, 再調整到 large 資料
  • 日志很重要, 可以幫助我們擷取到程式的更多資訊, 比如 debug/性能調優

關于性能:

  • 并行 最終受限于 cpu 核數, 即 N 核cpu最多同時運作 N 個線程
  • 協程間的搶占會帶來性能損耗, 同理還有 程序/線程 的排程
  • 協程+channel的機制友善并發程式設計擴充, 相對于單機記憶體操作自然性能要低一些
package main

import (
    "io"
    "encoding/binary"
    "os"
    "bufio"
    "sort"
    "fmt"
    "time"
)

var startTime time.Time

func main() {
    fileIn := "small.in"
    fileOut := "small.out"
    p := createPipeline(fileIn, 512, 4) // 按照cpu核數設定節點數, 減少協程間搶占帶來性能損耗
    writeToFile(p, fileOut)
    printFile(fileOut, -1)

    startTime = time.Now() // 添加日志
    fileIn = "large.in"
    fileOut = "large.out"
    p = createPipeline(fileIn, 800000000, 4)
    writeToFile(p, fileOut)
    printFile(fileOut, 100)
}

func createPipeline(filename string, fileSize, chunkCount int) <-chan int {
    chunkSize := fileSize / chunkCount // fileSize/8/chunkCount = int/chunk, 這裡簡單處理, 設定為可以整除的參數
    sortResults := []<-chan int{}      // 傳遞給 mergeN() 的已排序切片
    for i := 0; i < chunkCount; i++ {
        file, err := os.Open(filename) // 為什麼沒有用 defer file.close() ? 因為需要在函數外去關閉掉, 比較麻煩, 這裡暫時省略
        if err != nil {
            panic(err)
        }
        file.Seek(int64(i*chunkSize), 0) // 定位到每個 chunk 的起始位置
        s := readerChunk(bufio.NewReader(file), chunkSize)
        sortResults = append(sortResults, memSort(s))
    }
    return mergeN(sortResults...)
}

func writeToFile(ch <-chan int, filename string) {
    file, err := os.Create(filename)
    if err != nil {
        panic(err)
    }
    defer file.Close()
    writer := bufio.NewWriter(file)
    defer writer.Flush() // defer 是 LIFO

    for v := range ch {
        buffer := make([]byte, 8)
        binary.BigEndian.PutUint64(buffer, uint64(v))
        writer.Write(buffer)
    }
}

func printFile(filename string, count int) {
    file, err := os.Open(filename)
    if err != nil {
        panic(err)
    }
    defer file.Close()
    p := readerChunk(file, -1) // -1 的作用展現出來了, 這裡就可以讀取全部檔案
    if count == -1 {
        for v := range p {
            fmt.Println(v)
        }
    } else {
        n := 0
        for v := range p {
            fmt.Println(v)
            n++
            if n >= count {
                break
            }
        }
    }
}

// 遞歸解決兩兩歸并
func mergeN(ins ...<-chan int) <-chan int {
    if len(ins) == 1 {
        return ins[0]
    }
    m := len(ins) / 2
    // ins[0..m) + ins[m..end)
    return merge(mergeN(ins[:m]...),
        mergeN(ins[m:]...))
}

func merge(in1, in2 <-chan int) <-chan int {
    out := make(chan int, 1024) // 性能優化, 給 channel 添加 buffer, 而不是收一個就發一個
    go func() {
        // 歸并的過程要處理某個通道可能沒有資料的情況, 代碼非常值得一讀
        v1, ok1 := <-in1
        v2, ok2 := <-in2
        for ok1 || ok2 {
            if !ok2 || (ok1 && v1 <= v2) {
                out <- v1
                v1, ok1 = <-in1
            } else {
                out <- v2
                v2, ok2 = <-in2
            }
        }
        close(out)
        fmt.Println("merge done: ", time.Now().Sub(startTime))
    }()
    return out
}

// 添加 chunk 來讀取檔案,
func readerChunk(reader io.Reader, chunkSize int) <-chan int {
    out := make(chan int, 1024) // 性能優化, 給 channel 添加 buffer, 而不是收一個就發一個
    bytesRead := 0
    go func() {
        buffer := make([]byte, 8) // int: 64bit -> 8byte
        for {
            n, err := reader.Read(buffer)
            bytesRead += n
            if n > 0 { // 可能資料不足 8byte
                v := int(binary.BigEndian.Uint64(buffer))
                out <- v
            }
            // 使用 -1 表示不添加 chunk 大小限制
            // 使用是 >=, 讀取區間是 [0, chunkSize)
            if err != nil || (chunkSize != -1 && bytesRead >= chunkSize) {
                break
            }
        }
        close(out)
    }()
    return out
}

func memSort(in <-chan int) <-chan int {
    out := make(chan int, 1024) // 性能優化, 給 channel 添加 buffer, 而不是收一個就發一個
    go func() {
        // read into memory
        a := []int{}
        for v := range in {
            a = append(a, v)
        }
        fmt.Println("read into memory: ", time.Now().Sub(startTime))
        // sort
        sort.Ints(a)
        fmt.Println("sort done: ", time.Now().Sub(startTime))
        // output
        for _, v := range a {
            out <- v
        }
        close(out)
    }()
    return out
}           

網絡版的設計:

go| 感受并發程式設計的樂趣 後篇

網絡版隻是在完整外排序的版本上, 新增了從網絡讀寫資料, 并相應修改 pipeline 即可

package main

import (
    "net"
    "bufio"
    "encoding/binary"
    "os"
    "strconv"
    "time"
    "fmt"
    "sort"
    "io"
)

var startTime time.Time

func main() {
    startTime = time.Now()

    // 測試 net server
    //netPipeline("small.in", 512, 4) // 按照cpu核數設定節點數, 減少協程間搶占帶來性能損耗
    //time.Sleep(time.Hour)

    // 測試 small
    p := netPipeline("small.in", 512, 4)
    writeToFile(p, "small.out")
    printFile("small.out", -1)

    // 測試 large
    //p := netPipeline("small.in", 512, 4)
    //writeToFile(p, "small.out")
    //printFile("small.out", -1)
}

func netPipeline(filename string, fileSize, chunkCount int) <-chan int {
    chunkSize := fileSize / chunkCount // fileSize/8/chunkCount = int/chunk, 這裡簡單處理, 設定為可以整除的參數
    sortAddr := []string{}
    for i := 0; i < chunkCount; i++ {
        file, err := os.Open(filename) // 為什麼沒有用 defer file.close() ? 因為需要在函數外去關閉掉, 比較麻煩, 這裡暫時省略
        if err != nil {
            panic(err)
        }
        file.Seek(int64(i*chunkSize), 0) // 定位到每個 chunk 的起始位置
        s := readerChunk(bufio.NewReader(file), chunkSize)

        addr := ":" + strconv.Itoa(7000 + i) // 設定不同端口号來設定不同的 server
        netSink(addr, memSort(s)) // 注意 pipeline 的設計思路是建立執行流程, 真正開始執行在 pipeline 建立之後
        sortAddr = append(sortAddr, addr)
    }

    //return nil // 測試 net server

    sortResults := []<-chan int{}
    for _,addr := range sortAddr {
        sortResults = append(sortResults, netSource(addr))
    }
    return mergeN(sortResults...)
}

func writeToFile(ch <-chan int, filename string) {
    file, err := os.Create(filename)
    if err != nil {
        panic(err)
    }
    defer file.Close()
    writer := bufio.NewWriter(file)
    defer writer.Flush() // defer 是 LIFO

    for v := range ch {
        buffer := make([]byte, 8)
        binary.BigEndian.PutUint64(buffer, uint64(v))
        writer.Write(buffer)
    }
}

func printFile(filename string, count int) {
    file, err := os.Open(filename)
    if err != nil {
        panic(err)
    }
    defer file.Close()
    p := readerChunk(file, -1) // -1 的作用展現出來了, 這裡就可以讀取全部檔案
    if count == -1 {
        for v := range p {
            fmt.Println(v)
        }
    } else {
        n := 0
        for v := range p {
            fmt.Println(v)
            n++
            if n >= count {
                break
            }
        }
    }
}

func netSink(addr string, in <-chan int) {
    listener, err := net.Listen("tcp", addr)
    if err != nil {
        panic(err)
    }
    go func() {
        defer listener.Close()
        conn, err := listener.Accept() // 通常 accept() 要放到 for{} 中不斷的接收請求, 這裡隻處理一次就關閉了
        if err != nil {
            panic(err)
        }
        defer conn.Close()
        writer := bufio.NewWriter(conn)
        defer writer.Flush() // 别忘了 flush buffer
        for v := range in {
            buffer := make([]byte, 8)
            binary.BigEndian.PutUint64(buffer, uint64(v))
            writer.Write(buffer)
        }
    }()
}

func netSource(addr string) <-chan int {
    out := make(chan int)
    go func() {
        conn, err := net.Dial("tcp", addr)
        if err != nil {
            panic(err)
        }
        defer conn.Close()
        r := readerChunk(bufio.NewReader(conn), -1)
        for v := range r {
            out <- v
        }
        close(out)
    }()
    return out
}

// 遞歸解決兩兩歸并
func mergeN(ins ...<-chan int) <-chan int {
    if len(ins) == 1 {
        return ins[0]
    }
    m := len(ins) / 2
    // ins[0..m) + ins[m..end)
    return merge(mergeN(ins[:m]...),
        mergeN(ins[m:]...))
}

func merge(in1, in2 <-chan int) <-chan int {
    out := make(chan int, 1024)
    go func() {
        // 歸并的過程要處理某個通道可能沒有資料的情況, 代碼非常值得一讀
        v1, ok1 := <-in1
        v2, ok2 := <-in2
        for ok1 || ok2 {
            if !ok2 || (ok1 && v1 <= v2) {
                out <- v1
                v1, ok1 = <-in1
            } else {
                out <- v2
                v2, ok2 = <-in2
            }
        }
        close(out)
        fmt.Println("merge done: ", time.Now().Sub(startTime))
    }()
    return out
}

// 添加 chunk 來讀取檔案,
func readerChunk(reader io.Reader, chunkSize int) <-chan int {
    out := make(chan int, 1024) // 性能優化, 給 channel 添加 buffer, 而不是收一個就發一個
    bytesRead := 0
    go func() {
        buffer := make([]byte, 8) // int: 64bit -> 8byte
        for {
            n, err := reader.Read(buffer)
            bytesRead += n
            if n > 0 { // 可能資料不足 8byte
                v := int(binary.BigEndian.Uint64(buffer))
                out <- v
            }
            // 使用 -1 表示不添加 chunk 大小限制
            // 使用是 >=, 讀取區間是 [0, chunkSize)
            if err != nil || (chunkSize != -1 && bytesRead >= chunkSize) {
                break
            }
        }
        close(out)
    }()
    return out
}

func memSort(in <-chan int) <-chan int {
    out := make(chan int, 1024)
    go func() {
        // read into memory
        a := []int{}
        for v := range in {
            a = append(a, v)
        }
        fmt.Println("read into memory: ", time.Now().Sub(startTime))
        // sort
        sort.Ints(a)
        fmt.Println("sort done: ", time.Now().Sub(startTime))
        // output
        for _, v := range a {
            out <- v
        }
        close(out)
    }()
    return out
}           

寫在最後

go 的「強制」在程式設計方面感覺優點大于缺點:

  • 強制代碼風格: 讀/寫代碼都輕松了不少
  • 強制類型檢查: 出錯時的錯誤提示非常友好

書寫過程中, 基本根據編譯器提示, 就可以把大部分 bug 清理掉.

go語言三大特色:

  • 面向接口, 比如示例中的 Reader/Writer, 進而可以輕松添加 buffer 進行性能優化
  • 函數式, go語言中函數式一等公民
  • 并發程式設計: go + channel

再次推薦一下 go, 給想要寫 并發程式設計 的程式汪, 就如 ccmouse大大的教程所說:

感受并發程式設計的樂趣

資源推薦:

  • 首推 ccmouse大大的視訊教程:
  • 同時推薦看完一本書 go并發程式設計 : 講解并發程式設計相關知識和 go并發程式設計原理 上面非常透徹, 幾個實戰的項目也适合 ccmouse大大推薦的學習方式 -- 不那麼簡單的項目來練手