這篇 blog 緊接我的上篇 blog - go| 感受并發程式設計的樂趣 前篇
.
學習了
ccmouse - googl工程師 在 慕課網 - 搭建并行處理管道,感受GO語言魅力, 獲益匪淺, 也想把這份程式設計的快樂傳遞給大家.
強烈推薦一下ccmouse大大的課程, 總能讓我生出 Google工程師果然就是不一樣 之感, 每次都能從簡單的 hello world 開始, 一步步 coding 到教程的主題, 并在過程中給予充分的理由 -- 為什麼要一步步變複雜. 同時也會親身 踩坑 示範, 幹貨滿滿.
内容提要:
- go 實作完整外部排序
- go 實作叢集版(web版)外部排序
另外, ccmouse大大關于語言學習的方法也值得借鑒:
- 首先, 學習一下語言文法的要點
- 立刻找一個不那麼簡單的項目來做, 邊做邊查文檔/stackoverflow
先來看看完整外部排序的設計圖:
![](https://img.laitimes.com/img/9ZDMuAjOiMmIsIjOiQnIsICM0ITMvw1dvwlMvwlM3VWaWV2Zh1WaDdTJwlmc0N3LcRnbllmcv1yb0VXYvwlMyd2bNV2Zh1Wa-cGcq5SN4MjNhJzNwczYxQGMmlzMtkTOzcjN18CXzV2Zh1WafRWYvxGc19CXvlmL1h2cuFWaq5ycldWYtlWLkF2bsBXdvw1LcpDc0RHaiojIsJye.jpg)
涉及到的功能大部分在上一章都有講到, 整體流程:
- 從檔案中讀取資料: 注意這裡使用了 chunk 的設計, 将檔案進行分塊讀取, 而且 chunkSize 的設計很巧妙, 同時支援 全文讀取 和 chunk讀取
- 對讀取到的 chunk 的資料進行記憶體排序(快排)
- 通過遞歸, 對排序後的 chunk 進行二路歸并
- 将歸并後的資料寫入到檔案中
從協程的角度來看待整個流程:
- goroutine1 進行 chunk讀取, 寫入 channel
- goroutine2 進行 記憶體排序, 排序後資料寫入 channel
- goroutine3 進行 二路歸并, 歸并的過程中, 資料不斷寫入到 channel
- goroutine4 進行 檔案寫入, 将 channel 中的資料寫入到檔案
注意這裡:
- goroutine1-4 可能是多個協程, 可能某一時刻是同一個協程, go 底層會有任務隊列(runq)進行協程排程
- 可以通過資料流的角度來思考這個問題: 資料是怎麼在 檔案/channel/協程 之間進行流轉的.
- 測試很重要, 示例中就先使用了small 資料進行測試, 檢查程式的正确性, 再調整到 large 資料
- 日志很重要, 可以幫助我們擷取到程式的更多資訊, 比如 debug/性能調優
關于性能:
- 并行 最終受限于 cpu 核數, 即 N 核cpu最多同時運作 N 個線程
- 協程間的搶占會帶來性能損耗, 同理還有 程序/線程 的排程
- 協程+channel的機制友善并發程式設計擴充, 相對于單機記憶體操作自然性能要低一些
package main
import (
"io"
"encoding/binary"
"os"
"bufio"
"sort"
"fmt"
"time"
)
var startTime time.Time
func main() {
fileIn := "small.in"
fileOut := "small.out"
p := createPipeline(fileIn, 512, 4) // 按照cpu核數設定節點數, 減少協程間搶占帶來性能損耗
writeToFile(p, fileOut)
printFile(fileOut, -1)
startTime = time.Now() // 添加日志
fileIn = "large.in"
fileOut = "large.out"
p = createPipeline(fileIn, 800000000, 4)
writeToFile(p, fileOut)
printFile(fileOut, 100)
}
func createPipeline(filename string, fileSize, chunkCount int) <-chan int {
chunkSize := fileSize / chunkCount // fileSize/8/chunkCount = int/chunk, 這裡簡單處理, 設定為可以整除的參數
sortResults := []<-chan int{} // 傳遞給 mergeN() 的已排序切片
for i := 0; i < chunkCount; i++ {
file, err := os.Open(filename) // 為什麼沒有用 defer file.close() ? 因為需要在函數外去關閉掉, 比較麻煩, 這裡暫時省略
if err != nil {
panic(err)
}
file.Seek(int64(i*chunkSize), 0) // 定位到每個 chunk 的起始位置
s := readerChunk(bufio.NewReader(file), chunkSize)
sortResults = append(sortResults, memSort(s))
}
return mergeN(sortResults...)
}
func writeToFile(ch <-chan int, filename string) {
file, err := os.Create(filename)
if err != nil {
panic(err)
}
defer file.Close()
writer := bufio.NewWriter(file)
defer writer.Flush() // defer 是 LIFO
for v := range ch {
buffer := make([]byte, 8)
binary.BigEndian.PutUint64(buffer, uint64(v))
writer.Write(buffer)
}
}
func printFile(filename string, count int) {
file, err := os.Open(filename)
if err != nil {
panic(err)
}
defer file.Close()
p := readerChunk(file, -1) // -1 的作用展現出來了, 這裡就可以讀取全部檔案
if count == -1 {
for v := range p {
fmt.Println(v)
}
} else {
n := 0
for v := range p {
fmt.Println(v)
n++
if n >= count {
break
}
}
}
}
// 遞歸解決兩兩歸并
func mergeN(ins ...<-chan int) <-chan int {
if len(ins) == 1 {
return ins[0]
}
m := len(ins) / 2
// ins[0..m) + ins[m..end)
return merge(mergeN(ins[:m]...),
mergeN(ins[m:]...))
}
func merge(in1, in2 <-chan int) <-chan int {
out := make(chan int, 1024) // 性能優化, 給 channel 添加 buffer, 而不是收一個就發一個
go func() {
// 歸并的過程要處理某個通道可能沒有資料的情況, 代碼非常值得一讀
v1, ok1 := <-in1
v2, ok2 := <-in2
for ok1 || ok2 {
if !ok2 || (ok1 && v1 <= v2) {
out <- v1
v1, ok1 = <-in1
} else {
out <- v2
v2, ok2 = <-in2
}
}
close(out)
fmt.Println("merge done: ", time.Now().Sub(startTime))
}()
return out
}
// 添加 chunk 來讀取檔案,
func readerChunk(reader io.Reader, chunkSize int) <-chan int {
out := make(chan int, 1024) // 性能優化, 給 channel 添加 buffer, 而不是收一個就發一個
bytesRead := 0
go func() {
buffer := make([]byte, 8) // int: 64bit -> 8byte
for {
n, err := reader.Read(buffer)
bytesRead += n
if n > 0 { // 可能資料不足 8byte
v := int(binary.BigEndian.Uint64(buffer))
out <- v
}
// 使用 -1 表示不添加 chunk 大小限制
// 使用是 >=, 讀取區間是 [0, chunkSize)
if err != nil || (chunkSize != -1 && bytesRead >= chunkSize) {
break
}
}
close(out)
}()
return out
}
func memSort(in <-chan int) <-chan int {
out := make(chan int, 1024) // 性能優化, 給 channel 添加 buffer, 而不是收一個就發一個
go func() {
// read into memory
a := []int{}
for v := range in {
a = append(a, v)
}
fmt.Println("read into memory: ", time.Now().Sub(startTime))
// sort
sort.Ints(a)
fmt.Println("sort done: ", time.Now().Sub(startTime))
// output
for _, v := range a {
out <- v
}
close(out)
}()
return out
}
網絡版的設計:
網絡版隻是在完整外排序的版本上, 新增了從網絡讀寫資料, 并相應修改 pipeline 即可
package main
import (
"net"
"bufio"
"encoding/binary"
"os"
"strconv"
"time"
"fmt"
"sort"
"io"
)
var startTime time.Time
func main() {
startTime = time.Now()
// 測試 net server
//netPipeline("small.in", 512, 4) // 按照cpu核數設定節點數, 減少協程間搶占帶來性能損耗
//time.Sleep(time.Hour)
// 測試 small
p := netPipeline("small.in", 512, 4)
writeToFile(p, "small.out")
printFile("small.out", -1)
// 測試 large
//p := netPipeline("small.in", 512, 4)
//writeToFile(p, "small.out")
//printFile("small.out", -1)
}
func netPipeline(filename string, fileSize, chunkCount int) <-chan int {
chunkSize := fileSize / chunkCount // fileSize/8/chunkCount = int/chunk, 這裡簡單處理, 設定為可以整除的參數
sortAddr := []string{}
for i := 0; i < chunkCount; i++ {
file, err := os.Open(filename) // 為什麼沒有用 defer file.close() ? 因為需要在函數外去關閉掉, 比較麻煩, 這裡暫時省略
if err != nil {
panic(err)
}
file.Seek(int64(i*chunkSize), 0) // 定位到每個 chunk 的起始位置
s := readerChunk(bufio.NewReader(file), chunkSize)
addr := ":" + strconv.Itoa(7000 + i) // 設定不同端口号來設定不同的 server
netSink(addr, memSort(s)) // 注意 pipeline 的設計思路是建立執行流程, 真正開始執行在 pipeline 建立之後
sortAddr = append(sortAddr, addr)
}
//return nil // 測試 net server
sortResults := []<-chan int{}
for _,addr := range sortAddr {
sortResults = append(sortResults, netSource(addr))
}
return mergeN(sortResults...)
}
func writeToFile(ch <-chan int, filename string) {
file, err := os.Create(filename)
if err != nil {
panic(err)
}
defer file.Close()
writer := bufio.NewWriter(file)
defer writer.Flush() // defer 是 LIFO
for v := range ch {
buffer := make([]byte, 8)
binary.BigEndian.PutUint64(buffer, uint64(v))
writer.Write(buffer)
}
}
func printFile(filename string, count int) {
file, err := os.Open(filename)
if err != nil {
panic(err)
}
defer file.Close()
p := readerChunk(file, -1) // -1 的作用展現出來了, 這裡就可以讀取全部檔案
if count == -1 {
for v := range p {
fmt.Println(v)
}
} else {
n := 0
for v := range p {
fmt.Println(v)
n++
if n >= count {
break
}
}
}
}
func netSink(addr string, in <-chan int) {
listener, err := net.Listen("tcp", addr)
if err != nil {
panic(err)
}
go func() {
defer listener.Close()
conn, err := listener.Accept() // 通常 accept() 要放到 for{} 中不斷的接收請求, 這裡隻處理一次就關閉了
if err != nil {
panic(err)
}
defer conn.Close()
writer := bufio.NewWriter(conn)
defer writer.Flush() // 别忘了 flush buffer
for v := range in {
buffer := make([]byte, 8)
binary.BigEndian.PutUint64(buffer, uint64(v))
writer.Write(buffer)
}
}()
}
func netSource(addr string) <-chan int {
out := make(chan int)
go func() {
conn, err := net.Dial("tcp", addr)
if err != nil {
panic(err)
}
defer conn.Close()
r := readerChunk(bufio.NewReader(conn), -1)
for v := range r {
out <- v
}
close(out)
}()
return out
}
// 遞歸解決兩兩歸并
func mergeN(ins ...<-chan int) <-chan int {
if len(ins) == 1 {
return ins[0]
}
m := len(ins) / 2
// ins[0..m) + ins[m..end)
return merge(mergeN(ins[:m]...),
mergeN(ins[m:]...))
}
func merge(in1, in2 <-chan int) <-chan int {
out := make(chan int, 1024)
go func() {
// 歸并的過程要處理某個通道可能沒有資料的情況, 代碼非常值得一讀
v1, ok1 := <-in1
v2, ok2 := <-in2
for ok1 || ok2 {
if !ok2 || (ok1 && v1 <= v2) {
out <- v1
v1, ok1 = <-in1
} else {
out <- v2
v2, ok2 = <-in2
}
}
close(out)
fmt.Println("merge done: ", time.Now().Sub(startTime))
}()
return out
}
// 添加 chunk 來讀取檔案,
func readerChunk(reader io.Reader, chunkSize int) <-chan int {
out := make(chan int, 1024) // 性能優化, 給 channel 添加 buffer, 而不是收一個就發一個
bytesRead := 0
go func() {
buffer := make([]byte, 8) // int: 64bit -> 8byte
for {
n, err := reader.Read(buffer)
bytesRead += n
if n > 0 { // 可能資料不足 8byte
v := int(binary.BigEndian.Uint64(buffer))
out <- v
}
// 使用 -1 表示不添加 chunk 大小限制
// 使用是 >=, 讀取區間是 [0, chunkSize)
if err != nil || (chunkSize != -1 && bytesRead >= chunkSize) {
break
}
}
close(out)
}()
return out
}
func memSort(in <-chan int) <-chan int {
out := make(chan int, 1024)
go func() {
// read into memory
a := []int{}
for v := range in {
a = append(a, v)
}
fmt.Println("read into memory: ", time.Now().Sub(startTime))
// sort
sort.Ints(a)
fmt.Println("sort done: ", time.Now().Sub(startTime))
// output
for _, v := range a {
out <- v
}
close(out)
}()
return out
}
寫在最後
go 的「強制」在程式設計方面感覺優點大于缺點:
- 強制代碼風格: 讀/寫代碼都輕松了不少
- 強制類型檢查: 出錯時的錯誤提示非常友好
書寫過程中, 基本根據編譯器提示, 就可以把大部分 bug 清理掉.
go語言三大特色:
- 面向接口, 比如示例中的 Reader/Writer, 進而可以輕松添加 buffer 進行性能優化
- 函數式, go語言中函數式一等公民
- 并發程式設計: go + channel
再次推薦一下 go, 給想要寫 并發程式設計 的程式汪, 就如 ccmouse大大的教程所說:
感受并發程式設計的樂趣
資源推薦:
- 首推 ccmouse大大的視訊教程:
- 同時推薦看完一本書 go并發程式設計 : 講解并發程式設計相關知識和 go并發程式設計原理 上面非常透徹, 幾個實戰的項目也适合 ccmouse大大推薦的學習方式 -- 不那麼簡單的項目來練手