天天看點

「後端」Go 高性能程式設計技法

作者:架構思考
代碼的穩健、可讀和高效是我們每一個 coder 的共同追求。本文将結合 Go 語言特性,為書寫效率更高的代碼,從常用資料結構、記憶體管理和并發,三個方面給出相關建議。話不多說,讓我們一起學習 Go 高性能程式設計的技法吧。

常用資料結構

1.反射雖好,切莫貪杯

标準庫 reflect 為 Go 語言提供了運作時動态擷取對象的類型和值以及動态建立對象的能力。反射可以幫助抽象和簡化代碼,提高開發效率。

Go 語言标準庫以及很多開源軟體中都使用了 Go 語言的反射能力,例如用于序列化和反序列化的 json、ORM 架構 gorm、xorm 等。

1.1 優先使用 strconv 而不是 fmt

基本資料類型與字元串之間的轉換,優先使用 strconv 而不是 fmt,因為前者性能更佳。

// Bad
for i := 0; i < b.N; i++ {
 s := fmt.Sprint(rand.Int())
}

BenchmarkFmtSprint-4    143 ns/op    2 allocs/op

// Good
for i := 0; i < b.N; i++ {
 s := strconv.Itoa(rand.Int())
}

BenchmarkStrconv-4    64.2 ns/op    1 allocs/op
           

為什麼性能上會有兩倍多的差距,因為 fmt 實作上利用反射來達到範型的效果,在運作時進行類型的動态判斷,是以帶來了一定的性能損耗。

1.2 少量的重複不比反射差

有時,我們需要一些工具函數。比如從 uint64 切片過濾掉指定的元素。

利用反射,我們可以實作一個類型泛化支援擴充的切片過濾函數。

// DeleteSliceElms 從切片中過濾指定元素。注意:不修改原切片。
func DeleteSliceElms(i interface{}, elms ...interface{}) interface{} {
 // 建構 map set。
 m := make(map[interface{}]struct{}, len(elms))
 for _, v := range elms {
  m[v] = struct{}{}
 }
 // 建立新切片,過濾掉指定元素。
 v := reflect.ValueOf(i)
 t := reflect.MakeSlice(reflect.TypeOf(i), 0, v.Len())
 for i := 0; i < v.Len(); i++ {
  if _, ok := m[v.Index(i).Interface()]; !ok {
   t = reflect.Append(t, v.Index(i))
  }
 }
 return t.Interface()
}
           

很多時候,我們可能隻需要操作一個類型的切片,利用反射實作的類型泛化擴充的能力壓根沒用上。退一步說,如果我們真地需要對 uint64 以外類型的切片進行過濾,拷貝一次代碼又何妨呢?可以肯定的是,絕大部份場景,根本不會對所有類型的切片進行過濾,那麼反射帶來好處我們并沒有充分享受,但卻要為其帶來的性能成本買單。

// DeleteU64liceElms 從 []uint64 過濾指定元素。注意:不修改原切片。
func DeleteU64liceElms(i []uint64, elms ...uint64) []uint64 {
 // 建構 map set。
 m := make(map[uint64]struct{}, len(elms))
 for _, v := range elms {
  m[v] = struct{}{}
 }
 // 建立新切片,過濾掉指定元素。
 t := make([]uint64, 0, len(i))
 for _, v := range i {
  if _, ok := m[v]; !ok {
   t = append(t, v)
  }
 }
 return t
}
           

下面看一下二者的性能對比。

func BenchmarkDeleteSliceElms(b *testing.B) {
 slice := []uint64{1, 2, 3, 4, 5, 6, 7, 8, 9}
 elms := []interface{}{uint64(1), uint64(3), uint64(5), uint64(7), uint64(9)}
 for i := 0; i < b.N; i++ {
  _ = DeleteSliceElms(slice, elms...)
 }
}

func BenchmarkDeleteU64liceElms(b *testing.B) {
 slice := []uint64{1, 2, 3, 4, 5, 6, 7, 8, 9}
 elms := []uint64{1, 3, 5, 7, 9}
 for i := 0; i < b.N; i++ {
  _ = DeleteU64liceElms(slice, elms...)
 }
}
           

運作上面的基準測試。

go test -bench=. -benchmem main/reflect 
goos: darwin
goarch: amd64
pkg: main/reflect
cpu: Intel(R) Core(TM) i7-9750H CPU @ 2.60GHz
BenchmarkDeleteSliceElms-12              1226868               978.2 ns/op           296 B/op         16 allocs/op
BenchmarkDeleteU64liceElms-12            8249469               145.3 ns/op            80 B/op          1 allocs/op
PASS
ok      main/reflect    3.809s
           

可以看到,反射涉及了額外的類型判斷和大量的記憶體配置設定,導緻其對性能的影響非常明顯。随着切片元素的遞增,每一次判斷元素是否在 map 中,因為 map 的 key 是不确定的類型,會發生變量逃逸,觸發堆記憶體的配置設定。是以,可預見的是當元素數量增加時,性能差異會越來大。

當使用反射時,請問一下自己,我真地需要它嗎?

1.3 慎用 binary.Read 和 binary.Write

binary.Read 和 binary.Write 使用反射并且很慢。如果有需要用到這兩個函數的地方,我們應該手動實作這兩個函數的相關功能,而不是直接去使用它們。

encoding/binary 包實作了數字和位元組序列之間的簡單轉換以及 varints 的編碼和解碼。varints 是一種使用可變位元組表示整數的方法。其中數值本身越小,其所占用的位元組數越少。Protocol Buffers 對整數采用的便是這種編碼方式。

其中數字與位元組序列的轉換可以用如下三個函數:

// Read 從結構化二進制資料 r 讀取到 data。data 必須是指向固定大小值的指針或固定大小值的切片。
func Read(r io.Reader, order ByteOrder, data interface{}) error
// Write 将 data 的二進制表示形式寫入 w。data 必須是固定大小的值或固定大小值的切片,或指向此類資料的指針。
func Write(w io.Writer, order ByteOrder, data interface{}) error
// Size 傳回 Wirte 函數将 v 寫入到 w 中的位元組數。
func Size(v interface{}) int
           

下面以我們熟知的 C 标準庫函數 ntohl() 函數為例,看看 Go 利用 binary 包如何實作。

// Ntohl 将網絡位元組序的 uint32 轉為主機位元組序。
func Ntohl(bys []byte) uint32 {
 r := bytes.NewReader(bys)
 err = binary.Read(buf, binary.BigEndian, &num)
}

// 如将 IP 127.0.0.1 網絡位元組序解析到 uint32
fmt.Println(Ntohl([]byte{0x7f, 0, 0, 0x1})) // 2130706433 <nil>
           

如果我們針對 uint32 類型手動實作一個 ntohl() 呢?

func NtohlNotUseBinary(bys []byte) uint32 {
 return uint32(bys[3]) | uint32(bys[2])<<8 | uint32(bys[1])<<16 | uint32(bys[0])<<24
}

// 如将 IP 127.0.0.1 網絡位元組序解析到 uint32
fmt.Println(NtohlNotUseBinary([]byte{0x7f, 0, 0, 0x1})) // 2130706433
           

該函數也是參考了 encoding/binary 包針對大端位元組序将位元組序列轉為 uint32 類型時的實作。

下面看下剝去反射前後二者的性能差異。

func BenchmarkNtohl(b *testing.B) {
 for i := 0; i < b.N; i++ {
  _, _ = Ntohl([]byte{0x7f, 0, 0, 0x1})
 }
}

func BenchmarkNtohlNotUseBinary(b *testing.B) {
 for i := 0; i < b.N; i++ {
  _ = NtohlNotUseBinary([]byte{0x7f, 0, 0, 0x1})
 }
}
           

運作上面的基準測試,結果如下:

go test -bench=BenchmarkNtohl.* -benchmem main/reflect
goos: darwin
goarch: amd64
pkg: main/reflect
cpu: Intel(R) Core(TM) i7-9750H CPU @ 2.60GHz
BenchmarkNtohl-12                       13026195                81.96 ns/op           60 B/op          4 allocs/op
BenchmarkNtohlNotUseBinary-12           1000000000               0.2511 ns/op          0 B/op          0 allocs/op
PASS
ok      main/reflect    1.841s
           

可見使用反射實作的 encoding/binary 包的性能相較于針對具體類型實作的版本,性能差異非常大。

2.避免重複的字元串到位元組切片的轉換

不要反複從固定字元串建立位元組 slice,因為重複的切片初始化會帶來性能損耗。相反,請執行一次轉換并捕獲結果。

// Bad
for i := 0; i < b.N; i++ {
 w.Write([]byte("Hello world"))
}

BenchmarkBad-4   50000000   22.2 ns/op

// Good
data := []byte("Hello world")
for i := 0; i < b.N; i++ {
 w.Write(data)
}

BenchmarkGood-4  500000000   3.25 ns/op
           

3.指定容器容量

盡可能指定容器容量,以便為容器預先配置設定記憶體。這将在後續添加元素時減少通過複制來調整容器大小。

3.1 指定 map 容量提示

在盡可能的情況下,在使用 make() 初始化的時候提供容量資訊。

make(map[T1]T2, hint)
           

向 make() 提供容量提示會在初始化時嘗試調整 map 的大小,這将減少在将元素添加到 map 時為 map 重新配置設定記憶體。

注意,與 slice 不同。map capacity 提示并不保證完全的搶占式配置設定,而是用于估計所需的 hashmap bucket 的數量。是以,在将元素添加到 map 時,甚至在指定 map 容量時,仍可能發生配置設定。

// Bad
m := make(map[string]os.FileInfo)

files, _ := ioutil.ReadDir("./files")
for _, f := range files {
    m[f.Name()] = f
}
// m 是在沒有大小提示的情況下建立的; 在運作時可能會有更多配置設定。

// Good
files, _ := ioutil.ReadDir("./files")

m := make(map[string]os.FileInfo, len(files))
for _, f := range files {
    m[f.Name()] = f
}
// m 是有大小提示建立的;在運作時可能會有更少的配置設定。
           

3.2 指定切片容量

在盡可能的情況下,在使用 make() 初始化切片時提供容量資訊,特别是在追加切片時。

make([]T, length, capacity)
           

與 map 不同,slice capacity 不是一個提示:編譯器将為提供給 make() 的 slice 的容量配置設定足夠的記憶體,這意味着後續的 append() 操作将導緻零配置設定(直到 slice 的長度與容量比對,在此之後,任何 append 都可能調整大小以容納其他元素)。

const size = 1000000

// Bad
for n := 0; n < b.N; n++ {
 data := make([]int, 0)
   for k := 0; k < size; k++ {
     data = append(data, k)
  }
}

BenchmarkBad-4    219    5202179 ns/op

// Good
for n := 0; n < b.N; n++ {
 data := make([]int, 0, size)
   for k := 0; k < size; k++ {
     data = append(data, k)
  }
}

BenchmarkGood-4   706    1528934 ns/op
           

執行基準測試:

go test -bench=^BenchmarkJoinStr -benchmem 
BenchmarkJoinStrWithOperator-8    66930670    17.81 ns/op    0 B/op    0 allocs/op
BenchmarkJoinStrWithSprintf-8      7032921    166.0 ns/op    64 B/op   4 allocs/op
           

4.字元串拼接方式的選擇

4.1 行内拼接字元串推薦使用運算符+

行内拼接字元串為了書寫友善快捷,最常用的兩個方法是:

  • 運算符+
  • fmt.Sprintf()

行内字元串的拼接,主要追求的是代碼的簡潔可讀。fmt.Sprintf() 能夠接收不同類型的入參,通過格式化輸出完成字元串的拼接,使用非常友善。但因其底層實作使用了反射,性能上會有所損耗。

運算符 + 隻能簡單地完成字元串之間的拼接,非字元串類型的變量需要單獨做類型轉換。行内拼接字元串不會産生記憶體配置設定,也不涉及類型地動态轉換,是以性能上優于fmt.Sprintf()。

從性能出發,兼顧易用可讀,如果待拼接的變量不涉及類型轉換且數量較少(<=5),行内拼接字元串推薦使用運算符 +,反之使用 fmt.Sprintf()。

下面看下二者的性能對比。

// Good
func BenchmarkJoinStrWithOperator(b *testing.B) {
 s1, s2, s3 := "foo", "bar", "baz"
 for i := 0; i < b.N; i++ {
  _ = s1 + s2 + s3
 }
}

// Bad
func BenchmarkJoinStrWithSprintf(b *testing.B) {
 s1, s2, s3 := "foo", "bar", "baz"
 for i := 0; i < b.N; i++ {
  _ = fmt.Sprintf("%s%s%s", s1, s2, s3)
 }
}
           

執行基準測試結果如下:

go test -bench=^BenchmarkJoinStr -benchmem .
BenchmarkJoinStrWithOperator-8    70638928    17.53 ns/op     0 B/op    0 allocs/op
BenchmarkJoinStrWithSprintf-8      7520017    157.2 ns/op    64 B/op    4 allocs/op
           

4.2 非行内拼接字元串推薦使用 strings.Builder

字元串拼接還有其他的方式,比如strings.Join()、strings.Builder、bytes.Buffer和byte[],這幾種不适合行内使用。當待拼接字元串數量較多時可考慮使用。

先看下其性能測試的對比。

func BenchmarkJoinStrWithStringsJoin(b *testing.B) {
 s1, s2, s3 := "foo", "bar", "baz"
 for i := 0; i < b.N; i++ {
  _ = strings.Join([]string{s1, s2, s3}, "")
 }
}

func BenchmarkJoinStrWithStringsBuilder(b *testing.B) {
 s1, s2, s3 := "foo", "bar", "baz"
 for i := 0; i < b.N; i++ {
  var builder strings.Builder
  _, _ = builder.WriteString(s1)
  _, _ = builder.WriteString(s2)
  _, _ = builder.WriteString(s3)
 }
}

func BenchmarkJoinStrWithBytesBuffer(b *testing.B) {
 s1, s2, s3 := "foo", "bar", "baz"
 for i := 0; i < b.N; i++ {
  var buffer bytes.Buffer
  _, _ = buffer.WriteString(s1)
  _, _ = buffer.WriteString(s2)
  _, _ = buffer.WriteString(s3)
 }
}

func BenchmarkJoinStrWithByteSlice(b *testing.B) {
 s1, s2, s3 := "foo", "bar", "baz"
 for i := 0; i < b.N; i++ {
  var bys []byte
  bys= append(bys, s1...)
  bys= append(bys, s2...)
  _ = append(bys, s3...)
 }
}

func BenchmarkJoinStrWithByteSlicePreAlloc(b *testing.B) {
 s1, s2, s3 := "foo", "bar", "baz"
 for i := 0; i < b.N; i++ {
  bys:= make([]byte, 0, 9)
  bys= append(bys, s1...)
  bys= append(bys, s2...)
  _ = append(bys, s3...)
 }
}
           

基準測試結果如下:

go test -bench=^BenchmarkJoinStr .
goos: windows
goarch: amd64
pkg: main/perf
cpu: Intel(R) Core(TM) i7-9700 CPU @ 3.00GHz
BenchmarkJoinStrWithStringsJoin-8               31543916                36.39 ns/op
BenchmarkJoinStrWithStringsBuilder-8            30079785                40.60 ns/op
BenchmarkJoinStrWithBytesBuffer-8               31663521                39.58 ns/op
BenchmarkJoinStrWithByteSlice-8                 30748495                37.34 ns/op
BenchmarkJoinStrWithByteSlicePreAlloc-8         665341896               1.813 ns/op
           

從結果可以看出,strings.Join()、strings.Builder、bytes.Buffer和byte[] 的性能相近。如果結果字元串的長度是可預知的,使用 byte[] 且預先配置設定容量的拼接方式性能最佳。

是以如果對性能要求非常嚴格,或待拼接的字元串數量足夠多時,建議使用 byte[] 預先配置設定容量這種方式。

綜合易用性和性能,一般推薦使用strings.Builder來拼接字元串。

string.Builder也提供了預配置設定記憶體的方式 Grow:

func BenchmarkJoinStrWithStringsBuilderPreAlloc(b *testing.B) {
 s1, s2, s3 := "foo", "bar", "baz"
 for i := 0; i < b.N; i++ {
  var builder strings.Builder
  builder.Grow(9)
  _, _ = builder.WriteString(s1)
  _, _ = builder.WriteString(s2)
  _, _ = builder.WriteString(s3)
 }
}
           

使用了 Grow 優化後的版本的性能測試結果如下。可以看出相較于不預先配置設定空間的方式,性能提升了很多。

BenchmarkJoinStrWithStringsBuilderPreAlloc-8    60079003                20.95 ns/op
           

5.周遊 []struct{} 使用下标而不是 range

Go 中周遊切片或數組有兩種方式,一種是通過下标,一種是 range。二者在功能上沒有差別,但是在性能上會有差別嗎?

5.1 []int

首先看一下周遊基本類型切片時二者的性能差别,以 []int 為例。

// genRandomIntSlice 生成指定長度的随機 []int 切片
func genRandomIntSlice(n int) []int {
 rand.Seed(time.Now().UnixNano())
 nums := make([]int, 0, n)
 for i := 0; i < n; i++ {
  nums = append(nums, rand.Int())
 }
 return nums
}

func BenchmarkIndexIntSlice(b *testing.B) {
 nums := genRandomIntSlice(1024)
 for i := 0; i < b.N; i++ {
  var tmp int
  for k := 0; k < len(nums); k++ {
   tmp = nums[k]
  }
  _ = tmp
 }
}

func BenchmarkRangeIntSlice(b *testing.B) {
 nums := genRandomIntSlice(1024)
 for i := 0; i < b.N; i++ {
  var tmp int
  for _, num := range nums {
   tmp = num
  }
  _ = tmp
 }
}
           

運作測試結果如下:

go test -bench=IntSlice$ .
goos: windows
goarch: amd64
pkg: main/perf
cpu: Intel(R) Core(TM) i7-9700 CPU @ 3.00GHz
BenchmarkIndexIntSlice-8         5043324               236.2 ns/op
BenchmarkRangeIntSlice-8         5076255               239.1 ns/op
           

genRandomIntSlice() 函數用于生成指定長度元素類型為 int 的切片。從最終的結果可以看到,周遊 []int 類型的切片,下标與 range 周遊性能幾乎沒有差別。

5.2 []struct{}

那麼對于稍微複雜一點的 []struct 類型呢?

type Item struct {
 id  int
 val [1024]byte
}

func BenchmarkIndexStructSlice(b *testing.B) {
 var items [1024]Item
 for i := 0; i < b.N; i++ {
  var tmp int
  for j := 0; j < len(items); j++ {
   tmp = items[j].id
  }
  _ = tmp
 }
}

func BenchmarkRangeIndexStructSlice(b *testing.B) {
 var items [1024]Item
 for i := 0; i < b.N; i++ {
  var tmp int
  for k := range items {
   tmp = items[k].id
  }
  _ = tmp
 }
}

func BenchmarkRangeStructSlice(b *testing.B) {
 var items [1024]Item
 for i := 0; i < b.N; i++ {
  var tmp int
  for _, item := range items {
   tmp = item.id
  }
  _ = tmp
 }
}
           

運作測試結果如下:

go test -bench=StructSlice$ .
goos: windows
goarch: amd64
pkg: main/perf
cpu: Intel(R) Core(TM) i7-9700 CPU @ 3.00GHz
BenchmarkIndexStructSlice-8              5079468               234.9 ns/op
BenchmarkRangeIndexStructSlice-8         5087448               236.2 ns/op
BenchmarkRangeStructSlice-8                38716               32265 ns/op
           

可以看出,兩種通過 index 周遊 []struct 性能沒有差别,但是 range 周遊 []struct 中元素時,性能非常差。

range 隻周遊 []struct 下标時,性能比 range 周遊 []struct 值好很多。從這裡我們應該能夠知道二者性能差别之大的原因。

Item 是一個結構體類型 ,Item 由兩個字段構成,一個類型是 int,一個是類型是 [1024]byte,如果每次周遊 []Item,都會進行一次值拷貝,是以帶來了性能損耗。

此外,因為 range 時擷取的是值拷貝的副本,是以對副本的修改,是不會影響到原切片。

5.3 []*struct

那如果切片中是指向結構體的指針,而不是結構體呢?

// genItems 生成指定長度 []*Item 切片
func genItems(n int) []*Item {
 items := make([]*Item, 0, n)
 for i := 0; i < n; i++ {
  items = append(items, &Item{id: i})
 }
 return items
}

func BenchmarkIndexPointer(b *testing.B) {
 items := genItems(1024)
 for i := 0; i < b.N; i++ {
  var tmp int
  for k := 0; k < len(items); k++ {
   tmp = items[k].id
  }
  _ = tmp
 }
}

func BenchmarkRangePointer(b *testing.B) {
 items := genItems(1024)
 for i := 0; i < b.N; i++ {
  var tmp int
  for _, item := range items {
   tmp = item.id
  }
  _ = tmp
 }
}
           

執行性能測試結果:

go test -bench=Pointer$ main/perf
goos: windows
goarch: amd64
pkg: main/perf
cpu: Intel(R) Core(TM) i7-9700 CPU @ 3.00GHz
BenchmarkIndexPointer-8           773634              1521 ns/op
BenchmarkRangePointer-8           752077              1514 ns/op
           

切片元素從結構體 Item 替換為指針 *Item 後,for 和 range 的性能幾乎是一樣的。而且使用指針還有另一個好處,可以直接修改指針對應的結構體的值。

5.4 小結

range 在疊代過程中傳回的是元素的拷貝,index 則不存在拷貝。

如果 range 疊代的元素較小,那麼 index 和 range 的性能幾乎一樣,如基本類型的切片 []int。但如果疊代的元素較大,如一個包含很多屬性的 struct 結構體,那麼 index 的性能将顯著地高于 range,有時候甚至會有上千倍的性能差異。對于這種場景,建議使用 index。如果使用 range,建議隻疊代下标,通過下标通路元素,這種使用方式和 index 就沒有差別了。如果想使用 range 同時疊代下标和值,則需要将切片/數組的元素改為指針,才能不影響性能。

記憶體管理

1.使用空結構體節省記憶體

1.1 不占記憶體空間

在 Go 中,我們可以使用 unsafe.Sizeof 計算出一個資料類型執行個體需要占用的位元組數。

package main

import (
 "fmt"
 "unsafe"
)

func main() {
 fmt.Println(unsafe.Sizeof(struct{}{}))
}
           

運作上面的例子将會輸出:

go run main.go
0
           

可以看到,Go 中空結構體 struct{} 是不占用記憶體空間,不像 C/C++ 中空結構體仍占用 1 位元組。

1.2 用法

因為空結構體不占據記憶體空間,是以被廣泛作為各種場景下的占位符使用。一是節省資源,二是空結構體本身就具備很強的語義,即這裡不需要任何值,僅作為占位符,達到的代碼即注釋的效果。

1.2.1 實作集合(Set)

Go 語言标準庫沒有提供 Set 的實作,通常使用 map 來代替。事實上,對于集合來說,隻需要 map 的鍵,而不需要值。即使是将值設定為 bool 類型,也會多占據 1 個位元組,那假設 map 中有一百萬條資料,就會浪費 1MB 的空間。

是以呢,将 map 作為集合(Set)使用時,可以将值類型定義為空結構體,僅作為占位符使用即可。

type Set map[string]struct{}

func (s Set) Has(key string) bool {
 _, ok := s[key]
 return ok
}

func (s Set) Add(key string) {
 s[key] = struct{}{}
}

func (s Set) Delete(key string) {
 delete(s, key)
}

func main() {
 s := make(Set)
 s.Add("foo")
 s.Add("bar")
 fmt.Println(s.Has("foo"))
 fmt.Println(s.Has("bar"))
}
           

如果想使用 Set 的完整功能,如初始化(通過切片建構一個 Set)、Add、Del、Clear、Contains 等操作,可以使用開源庫 golang-set。

1.2.2 不發送資料的信道

func worker(ch chan struct{}) {
 <-ch
 fmt.Println("do something")
}

func main() {
 ch := make(chan struct{})
 go worker(ch)
 ch <- struct{}{}
 close(ch)
}
           

有時候使用 channel 不需要發送任何的資料,隻用來通知子協程(goroutine)執行任務,或隻用來控制協程的并發。這種情況下,使用空結構體作為占位符就非常合适了。

1.2.3 僅包含方法的結構體

type Door struct{}

func (d Door) Open() {
 fmt.Println("Open the door")
}

func (d Door) Close() {
 fmt.Println("Close the door")
}
           

在部分場景下,結構體隻包含方法,不包含任何的字段。例如上面例子中的 Door,在這種情況下,Door 事實上可以用任何的資料結構替代。

type Door int
type Door bool
           

無論是 int 還是 bool 都會浪費額外的記憶體,是以呢,這種情況下,聲明為空結構體最合适。

2. struct 布局要考慮記憶體對齊

2.1 為什麼需要記憶體對齊

CPU 通路記憶體時,并不是逐個位元組通路,而是以字長(word size)為機關通路。比如 32 位的 CPU ,字長為 4 位元組,那麼 CPU 通路記憶體的機關也是 4 位元組。

這麼設計的目的,是減少 CPU 通路記憶體的次數,加大 CPU 通路記憶體的吞吐量。比如同樣讀取 8 個位元組的資料,一次讀取 4 個位元組那麼隻需要讀取 2 次。

CPU 始終以字長通路記憶體,如果不進行記憶體對齊,很可能增加 CPU 通路記憶體的次數,例如:

「後端」Go 高性能程式設計技法

變量 a、b 各占據 3 位元組的空間,記憶體對齊後,a、b 占據 4 位元組空間,CPU 讀取 b 變量的值隻需要進行一次記憶體通路。如果不進行記憶體對齊,CPU 讀取 b 變量的值需要進行 2 次記憶體通路。第一次通路得到 b 變量的第 1 個位元組,第二次通路得到 b 變量的後兩個位元組。

從這個例子中也可以看到,記憶體對齊對實作變量的原子性操作也是有好處的,每次記憶體通路是原子的,如果變量的大小不超過字長,那麼記憶體對齊後,對該變量的通路就是原子的,這個特性在并發場景下至關重要。

簡言之:合理的記憶體對齊可以提高記憶體讀寫的性能,并且便于實作變量操作的原子性。

2.2 Go 記憶體對齊規則

編譯器一般為了減少 CPU 訪存指令周期,提高記憶體的通路效率,會對變量進行記憶體對齊。Go 作為一門追求高性能的背景程式設計語言,當然也不例外。

Go Language Specification 中 Size and alignment guarantees 描述了記憶體對齊的規則。

1.For a variable x of any type: unsafe.Alignof(x) is at least 1. 2.For a variable x of struct type: unsafe.Alignof(x) is the largest of all the values unsafe.Alignof(x.f) for each field f of x, but at least 1. 3.For a variable x of array type: unsafe.Alignof(x) is the same as the alignment of a variable of the array's element type.
  • 對于任意類型的變量 x ,unsafe.Alignof(x) 至少為 1。
  • 對于結構體類型的變量 x,計算 x 每一個字段 f 的 unsafe.Alignof(x.f),unsafe.Alignof(x) 等于其中的最大值。
  • 對于數組類型的變量 x,unsafe.Alignof(x) 等于構成數組的元素類型的對齊系數。

其中函數 unsafe.Alignof 用于擷取變量的對齊系數。對齊系數決定了字段的偏移和變量的大小,兩者必須是對齊系數的整數倍。

2.3 合理的 struct 布局

因為記憶體對齊的存在,合理的 struct 布局可以減少記憶體占用,提高程式性能。

type demo1 struct {
 a int8
 b int16
 c int32
}

type demo2 struct {
 a int8
 c int32
 b int16
}

func main() {
 fmt.Println(unsafe.Sizeof(demo1{})) // 8
 fmt.Println(unsafe.Sizeof(demo2{})) // 12
}
           

可以看到,同樣的字段,因字段排列順序不同,最終會導緻不一樣的結構體大小。

每個字段按照自身的對齊系數來确定在記憶體中的偏移量,一個字段因偏移而浪費的大小也不同。

接下來逐個分析,首先是 demo1:a 是第一個字段,預設是已經對齊的,從第 0 個位置開始占據 1 位元組。b 是第二個字段,對齊系數為 2,是以,必須空出 1 個位元組,偏移量才是 2 的倍數,從第 2 個位置開始占據 2 位元組。c 是第三個字段,對齊倍數為 4,此時,記憶體已經是對齊的,從第 4 個位置開始占據 4 位元組即可。

是以 demo1 的記憶體占用為 8 位元組。

對于 demo2:a 是第一個字段,預設是已經對齊的,從第 0 個位置開始占據 1 位元組。c 是第二個字段,對齊倍數為 4,是以,必須空出 3 個位元組,偏移量才是 4 的倍數,從第 4 個位置開始占據 4 位元組。b 是第三個字段,對齊倍數為 2,從第 8 個位置開始占據 2 位元組。

demo2 的對齊系數由 c 的對齊系數決定,也是 4,是以,demo2 的記憶體占用為 12 位元組。

「後端」Go 高性能程式設計技法

是以,在對記憶體特别敏感的結構體的設計上,我們可以通過調整字段的順序,将字段寬度從小到大由上到下排列,來減少記憶體的占用。

2.4 空結構與空數組對記憶體對齊的影響

空結構與空數組在 Go 中比較特殊。沒有任何字段的空 struct{} 和沒有任何元素的 array 占據的記憶體空間大小為 0。

因為這一點,空 struct{} 或空 array 作為其他 struct 的字段時,一般不需要記憶體對齊。但是有一種情況除外:即當 struct{} 或空 array 作為結構體最後一個字段時,需要記憶體對齊。因為如果有指針指向該字段,傳回的位址将在結構體之外,如果此指針一直存活不釋放對應的記憶體,就會有記憶體洩露的問題(該記憶體不因結構體釋放而釋放)。

type demo3 struct {
 a struct{}
 b int32
}
type demo4 struct {
 b int32
 a struct{}
}

func main() {
 fmt.Println(unsafe.Sizeof(demo3{})) // 4
 fmt.Println(unsafe.Sizeof(demo4{})) // 8
}
           

可以看到,demo3{} 的大小為 4 位元組,與字段 b 占據空間一緻,而 demo4{} 的大小為 8 位元組,即額外填充了 4 位元組的空間。

3.減少逃逸,将變量限制在棧上

變量逃逸一般發生在如下幾種情況:

  • 變量較大
  • 變量大小不确定
  • 變量類型不确定
  • 傳回指針
  • 傳回引用
  • 閉包

知道變量逃逸的原因後,我們可以有意識的控制變量不發生逃逸,将其控制在棧上,減少堆變量的配置設定,降低 GC 成本,提高程式性能。

3.1 小的拷貝好過引用

小的拷貝好過引用,什麼意思呢,就是盡量使用棧變量而不是堆變量。下面舉一個反常識的例子,來證明小的拷貝比在堆上建立引用變量要好。

我們都知道 Go 裡面的 Array 以 pass-by-value 方式傳遞後,再加上其長度不可擴充,考慮到性能我們一般很少使用它。實際上,凡事無絕對。有時使用數組進行拷貝傳遞,比使用切片要好。

// copy/copy.go

const capacity = 1024

func arrayFibonacci() [capacity]int {
 var d [capacity]int
 for i := 0; i < len(d); i++ {
  if i <= 1 {
   d[i] = 1
   continue
  }
  d[i] = d[i-1] + d[i-2]
 }
 return d
}

func sliceFibonacci() []int {
 d := make([]int, capacity)
 for i := 0; i < len(d); i++ {
  if i <= 1 {
   d[i] = 1
   continue
  }
  d[i] = d[i-1] + d[i-2]
 }
 return d
}
           

下面看一下性能對比。

func BenchmarkArray(b *testing.B) {
 for i := 0; i < b.N; i++ {
  _ = arrayFibonacci()
 }
}

func BenchmarkSlice(b *testing.B) {
 for i := 0; i < b.N; i++ {
  _ = sliceFibonacci()
 }
}
           

運作上面的基準測試,将得到如下結果。

go test -bench=. -benchmem -gcflags="-l" main/copy
goos: darwin
goarch: amd64
pkg: main/copy
cpu: Intel(R) Core(TM) i7-9750H CPU @ 2.60GHz
BenchmarkArray-12         692400              1708 ns/op               0 B/op          0 allocs/op
BenchmarkSlice-12         464974              2242 ns/op            8192 B/op          1 allocs/op
PASS
ok      main/copy       3.908s
           

從測試結果可以看出,對數組的拷貝性能卻比使用切片要好。為什麼會這樣呢?

sliceFibonacci() 函數中配置設定的局部變量切片因為要傳回到函數外部,是以發生了逃逸,需要在堆上申請記憶體空間。從測試也過也可以看出,arrayFibonacci() 函數沒有記憶體配置設定,完全在棧上完成數組的建立。這裡說明了對于一些短小的對象,棧上複制的成本遠小于在堆上配置設定和回收操作。

需要注意,運作上面基準測試時,傳遞了禁止内聯的編譯選項 "-l",如果發生内聯,那麼将不會出現變量的逃逸,就不存在堆上配置設定記憶體與回收的操作了,二者将看不出性能差異。

編譯時可以借助選項 -gcflags=-m 檢視編譯器對上面兩個函數的優化決策。

go build  -gcflags=-m copy/copy.go
# command-line-arguments
copy/copy.go:5:6: can inline arrayFibonacci
copy/copy.go:17:6: can inline sliceFibonacci
copy/copy.go:18:11: make([]int, capacity) escapes to heap
           

可以看到,arrayFibonacci() 和 sliceFibonacci() 函數均可内聯。sliceFibonacci() 函數中定義的局部變量切片逃逸到了堆。

那麼多大的變量才算是小變量呢?對 Go 編譯器而言,超過一定大小的局部變量将逃逸到堆上,不同的 Go 版本的大小限制可能不一樣。一般是 <64KB,局部變量将不會逃逸到堆上。

3.2 傳回值 VS 傳回指針

值傳遞會拷貝整個對象,而指針傳遞隻會拷貝位址,指向的對象是同一個。傳回指針可以減少值的拷貝,但是會導緻記憶體配置設定逃逸到堆中,增加垃圾回收(GC)的負擔。在對象頻繁建立和删除的場景下,傳遞指針導緻的 GC 開銷可能會嚴重影響性能。

一般情況下,對于需要修改原對象值,或占用記憶體比較大的結構體,選擇傳回指針。對于隻讀的占用記憶體較小的結構體,直接傳回值能夠獲得更好的性能。

3.3 傳回值使用确定的類型

如果變量類型不确定,那麼将會逃逸到堆上。是以,函數傳回值如果能确定的類型,就不要使用 interface{}。

我們還是以上面斐波那契數列函數為例,看下傳回值為确定類型和 interface{} 的性能差别。

const capacity = 1024

func arrayFibonacci() [capacity]int {
 var d [capacity]int
 for i := 0; i < len(d); i++ {
  if i <= 1 {
   d[i] = 1
   continue
  }
  d[i] = d[i-1] + d[i-2]
 }
 return d
}

func arrayFibonacciIfc() interface{} {
 var d [capacity]int
 for i := 0; i < len(d); i++ {
  if i <= 1 {
   d[i] = 1
   continue
  }
  d[i] = d[i-1] + d[i-2]
 }
 return d
}
           
func BenchmarkArray(b *testing.B) {
 for i := 0; i < b.N; i++ {
  _ = arrayFibonacci()
 }
}

func BenchmarkIfc(b *testing.B) {
 for i := 0; i < b.N; i++ {
  _ = arrayFibonacciIfc()
 }
}
           

運作上面的基準測試結果如下:

go test -bench=. -benchmem main/copy
goos: darwin
goarch: amd64
pkg: main/copy
cpu: Intel(R) Core(TM) i7-9750H CPU @ 2.60GHz
BenchmarkArray-12         832418              1427 ns/op               0 B/op          0 allocs/op
BenchmarkIfc-12           380626              2861 ns/op            8192 B/op          1 allocs/op
PASS
ok      main/copy       3.742s
           

可見,函數傳回值使用 interface{} 傳回時,編譯器無法确定傳回值的具體類型,導緻傳回值逃逸到堆上。當發生了堆上記憶體的申請與回收時,性能會差一點。

4.sync.Pool 複用對象

4.1 簡介

sync.Pool 是 sync 包下的一個元件,可以作為儲存臨時取還對象的一個“池子”。個人覺得它的名字有一定的誤導性,因為 Pool 裡裝的對象可以被無通知地被回收,可能 sync.Cache 是一個更合适的名字。

sync.Pool 是可伸縮的,同時也是并發安全的,其容量僅受限于記憶體的大小。存放在池中的對象如果不活躍了會被自動清理。

4.2 作用

對于很多需要重複配置設定、回收記憶體的地方,sync.Pool 是一個很好的選擇。頻繁地配置設定、回收記憶體會給 GC 帶來一定的負擔,嚴重的時候會引起 CPU 的毛刺,而 sync.Pool 可以将暫時不用的對象緩存起來,待下次需要的時候直接使用,不用再次經過記憶體配置設定,複用對象的記憶體,減輕 GC 的壓力,提升系統的性能。

一句話總結:用來儲存和複用臨時對象,減少記憶體配置設定,降低 GC 壓力。

4.3 如何使用

sync.Pool 的使用方式非常簡單,隻需要實作 New 函數即可。對象池中沒有對象時,将會調用 New 函數建立。

假設我們有一個“學生”結構體,并複用改結構體對象。

type Student struct {
 Name   string
 Age    int32
 Remark [1024]byte
}

var studentPool = sync.Pool{
    New: func() interface{} { 
        return new(Student) 
    },
}
           

然後調用 Pool 的 Get() 和 Put() 方法來擷取和放回池子中。

stu := studentPool.Get().(*Student)
json.Unmarshal(buf, stu)
studentPool.Put(stu)
           
  • Get() 用于從對象池中擷取對象,因為傳回值是 interface{},是以需要類型轉換。
  • Put() 則是在對象使用完畢後,放回到對象池。

4.4 性能差異

我們以 bytes.Buffer 位元組緩沖器為例,利用 sync.Pool 複用 bytes.Buffer 對象,避免重複建立與回收記憶體,來看看對性能的提升效果。

var bufferPool = sync.Pool{
 New: func() interface{} {
  return &bytes.Buffer{}
 },
}

var data = make([]byte, 10000)

func BenchmarkBufferWithPool(b *testing.B) {
 for n := 0; n < b.N; n++ {
  buf := bufferPool.Get().(*bytes.Buffer)
  buf.Write(data)
  buf.Reset()
  bufferPool.Put(buf)
 }
}

func BenchmarkBuffer(b *testing.B) {
 for n := 0; n < b.N; n++ {
  var buf bytes.Buffer
  buf.Write(data)
 }
}
           

測試結果如下:

go test -bench=. -benchmem main/pool
goos: darwin
goarch: amd64
pkg: main/pool
cpu: Intel(R) Core(TM) i7-9750H CPU @ 2.60GHz
BenchmarkBufferWithPool-12      11987966                97.12 ns/op            0 B/op          0 allocs/op
BenchmarkBuffer-12               1246887              1020 ns/op           10240 B/op          1 allocs/op
PASS
ok      main/pool       3.510s
           

這個例子建立了一個 bytes.Buffer 對象池,每次隻執行 Write 操作,及做一次資料拷貝,耗時幾乎可以忽略。而記憶體配置設定和回收的耗時占比較多,是以對程式整體的性能影響更大。從測試結果也可以看出,使用了 Pool 複用對象,每次操作不再有記憶體配置設定。

4.5 在标準庫中的應用

Go 标準庫也大量使用了 sync.Pool,例如 fmt 和 encoding/json。以 fmt 包為例,我們看下其是如何使用 sync.Pool 的。

我們可以看一下最常用的标準格式化輸出函數 Printf() 函數。

// Printf formats according to a format specifier and writes to standard output.
// It returns the number of bytes written and any write error encountered.
func Printf(format string, a ...interface{}) (n int, err error) {
 return Fprintf(os.Stdout, format, a...)
}
           

繼續看 Fprintf() 的定義。

// Fprintf formats according to a format specifier and writes to w.
// It returns the number of bytes written and any write error encountered.
func Fprintf(w io.Writer, format string, a ...interface{}) (n int, err error) {
 p := newPrinter()
 p.doPrintf(format, a)
 n, err = w.Write(p.buf)
 p.free()
 return
}
           

Fprintf() 函數的參數是一個 io.Writer,Printf() 傳的是 os.Stdout,相當于直接輸出到标準輸出。這裡的 newPrinter 用的就是 sync.Pool。

// go version go1.17 darwin/amd64

// pp is used to store a printer's state and is reused with sync.Pool to avoid allocations.
type pp struct {
    buf buffer
    ...
}

var ppFree = sync.Pool{
 New: func() interface{} { return new(pp) },
}

// newPrinter allocates a new pp struct or grabs a cached one.
func newPrinter() *pp {
 p := ppFree.Get().(*pp)
 p.panicking = false
 p.erroring = false
 p.wrapErrs = false
 p.fmt.init(&p.buf)
 return p
}

// free saves used pp structs in ppFree; avoids an allocation per invocation.
func (p *pp) free() {
 // Proper usage of a sync.Pool requires each entry to have approximately
 // the same memory cost. To obtain this property when the stored type
 // contains a variably-sized buffer, we add a hard limit on the maximum buffer
 // to place back in the pool.
 //
 // See https://golang.org/issue/23199
 if cap(p.buf) > 64<<10 {
  return
 }

 p.buf = p.buf[:0]
 p.arg = nil
 p.value = reflect.Value{}
 p.wrappedErr = nil
 ppFree.Put(p)
}
           

fmt.Printf() 的調用是非常頻繁的,利用 sync.Pool 複用 pp 對象能夠極大地提升性能,減少記憶體占用,同時降低 GC 壓力。

并發程式設計

1.關于鎖

1.1 無鎖化

加鎖是為了避免在并發環境下,同時通路共享資源産生的安全問題。那麼,在并發環境下,是否必須加鎖?答案是否定的。并非所有的并發都需要加鎖。适當地降低鎖的粒度,甚至采用無鎖化的設計,更能提升并發能力。

無鎖化主要有兩種實作,無鎖資料結構和串行無鎖。

1.1.1 無鎖資料結構

利用硬體支援的原子操作可以實作無鎖的資料結構,原子操作可以在 lock-free 的情況下保證并發安全,并且它的性能也能做到随 CPU 個數的增多而線性擴充。很多語言都提供 CAS 原子操作(如 Go 中的 atomic 包和 C++11 中的 atomic 庫),可以用于實作無鎖資料結構,如無鎖連結清單。

我們以一個簡單的線程安全單向連結清單的插入操作來看下無鎖程式設計和普通加鎖的差別。

package list

import (
 "fmt"
 "sync"
 "sync/atomic"

 "golang.org/x/sync/errgroup"
)

// Node 連結清單節點
type Node struct {
 Value interface{}
 Next  *Node
}

//
// 有鎖單向連結清單的簡單實作
//

// WithLockList 有鎖單向連結清單
type WithLockList struct {
 Head *Node
 mu   sync.Mutex
}

// Push 将元素插入到連結清單的首部
func (l *WithLockList) Push(v interface{}) {
 l.mu.Lock()
 defer l.mu.Unlock()
 n := &Node{
  Value: v,
  Next:  l.Head,
 }
 l.Head = n
}

// String 有鎖連結清單的字元串形式輸出
func (l WithLockList) String() string {
 s := ""
 cur := l.Head
 for {
  if cur == nil {
   break
  }
  if s != "" {
   s += ","
  }
  s += fmt.Sprintf("%v", cur.Value)
  cur = cur.Next
 }
 return s
}

//
// 無鎖單向連結清單的簡單實作
//

// LockFreeList 無鎖單向連結清單
type LockFreeList struct {
 Head atomic.Value
}

// Push 有鎖
func (l *LockFreeList) Push(v interface{}) {
 for {
  head := l.Head.Load()
  headNode, _ := head.(*Node)
  n := &Node{
   Value: v,
   Next:  headNode,
  }
  if l.Head.CompareAndSwap(head, n) {
   break
  }
 }
}

// String 有鎖連結清單的字元串形式輸出
func (l LockFreeList) String() string {
 s := ""
 cur := l.Head.Load().(*Node)
 for {
  if cur == nil {
   break
  }
  if s != "" {
   s += ","
  }
  s += fmt.Sprintf("%v", cur.Value)
  cur = cur.Next
 }
 return s
}
           

上面的實作有幾點需要注意一下:

(1)無鎖單向連結清單實作時在插入時需要進行 CAS 操作,即調用CompareAndSwap()方法進行插入,如果插入失敗則進行 for 循環多次嘗試,直至成功。

(2)為了友善列印連結清單内容,實作一個String()方法周遊連結清單,且使用值作為接收者,避免列印對象指針時無法生效。

If an operand implements method String() string, that method will be invoked to convert the object to a string, which will then be formatted as required by the verb (if any).

我們分别對兩種連結清單做一個并發寫入的操作驗證一下其功能。

package main

import (
 "fmt"
 
 "main/list"
)

// ConcurWriteWithLockList 并發寫入有鎖連結清單
func ConcurWriteWithLockList(l *WithLockList) {
 var g errgroup.Group
 // 10 個協程并發寫傳入連結表
 for i := 0; i < 10; i++ {
  i := i
  g.Go(func() error {
   l.Push(i)
   return nil
  })
 }
 _ = g.Wait()
}

// ConcurWriteLockFreeList 并發寫入無鎖連結清單
func ConcurWriteLockFreeList(l *LockFreeList) {
 var g errgroup.Group
 // 10 個協程并發寫傳入連結表
 for i := 0; i < 10; i++ {
  i := i
  g.Go(func() error {
   l.Push(i)
   return nil
  })
 }
 _ = g.Wait()
}

func main() {
 // 并發寫入與周遊列印有鎖連結清單
 l1 := &list.WithLockList{}
 list.ConcurWriteWithLockList(l1)
 fmt.Println(l1)

 // 并發寫入與周遊列印無鎖連結清單
 l2 := &list.LockFreeList{}
 list.ConcurWriteLockFreeList(l2)
 fmt.Println(l2)
}
           

注意,多次運作上面的main()函數的結果可能會不相同,因為并發是無序的。

8,7,6,9,5,4,3,1,2,0
9,8,7,6,5,4,3,2,0,1
           

下面再看一下連結清單 Push 操作的基準測試,對比一下有鎖與無鎖的性能差異。

func BenchmarkWriteWithLockList(b *testing.B) {
 l := &WithLockList{}
 for n := 0; n < b.N; n++ {
  l.Push(n)
 }
}
BenchmarkWriteWithLockList-8    14234166                83.58 ns/op

func BenchmarkWriteLockFreeList(b *testing.B) {
 l := &LockFreeList{}
 for n := 0; n < b.N; n++ {
  l.Push(n)
 }
}
BenchmarkWriteLockFreeList-8    15219405                73.15 ns/op
           

可以看出無鎖版本比有鎖版本性能高一些。

1.1.2 串行無鎖

串行無鎖是一種思想,就是避免對共享資源的并發通路,改為每個并發操作通路自己獨占的資源,達到串行通路資源的效果,來避免使用鎖。不同的場景有不同的實作方式。比如網絡 I/O 場景下将單 Reactor 多線程模型改為主從 Reactor 多線程模型,避免對同一個消息隊列鎖讀取。

這裡我介紹的是背景微服務開發經常遇到的一種情況。我們經常需要并發拉取多方面的資訊,彙聚到一個變量上。那麼此時就存在對同一個變量互斥寫入的情況。比如批量并發拉取使用者資訊寫入到一個 map。此時我們可以将每個協程拉取的結果寫入到一個臨時對象,這樣便将并發地協程與同一個變量解綁,然後再将其彙聚到一起,這樣便可以不用使用鎖。即獨立處理,然後合并。

「後端」Go 高性能程式設計技法

為了模拟上面的情況,簡單地寫個示例程式,對比下性能。

import (
 "sync"

 "golang.org/x/sync/errgroup"
)

// ConcurWriteMapWithLock 有鎖并發寫入 map
func ConcurWriteMapWithLock() map[int]int {
 m := make(map[int]int)
 var mu sync.Mutex
 var g errgroup.Group
 // 10 個協程并發寫入 map
 for i := 0; i < 10; i++ {
  i := i
  g.Go(func() error {
   mu.Lock()
   defer mu.Unlock()
   m[i] = i * i
   return nil
  })
 }
 _ = g.Wait()
 return m
}

// ConcurWriteMapLockFree 無鎖并發寫入 map
func ConcurWriteMapLockFree() map[int]int {
 m := make(map[int]int)
 // 每個協程獨占一 value
 values := make([]int, 10)
 // 10 個協程并發寫入 map
 var g errgroup.Group
 for i := 0; i < 10; i++ {
  i := i
  g.Go(func() error {
   values[i] = i * i
   return nil
  })
 }
 _ = g.Wait()
 // 彙聚結果到 map
 for i, v := range values {
  m[i] = v
 }
 return m
}
           

看下二者的性能差異:

func BenchmarkConcurWriteMapWithLock(b *testing.B) {
 for n := 0; n < b.N; n++ {
  _ = ConcurWriteMapWithLock()
 }
}
BenchmarkConcurWriteMapWithLock-8         218673              5089 ns/op

func BenchmarkConcurWriteMapLockFree(b *testing.B) {
 for n := 0; n < b.N; n++ {
  _ = ConcurWriteMapLockFree()
 }
}
BenchmarkConcurWriteMapLockFree-8         316635              4048 ns/op
           

1.2 減少鎖競争

如果加鎖無法避免,則可以采用分片的形式,減少對資源加鎖的次數,這樣也可以提高整體的性能。

比如 Golang 優秀的本地緩存元件 bigcache 、go-cache、freecache 都實作了分片功能,每個分片一把鎖,采用分片存儲的方式減少加鎖的次數進而提高整體性能。

以一個簡單的示例,通過對map[uint64]struct{}分片前後并發寫入的對比,來看下減少鎖競争帶來的性能提升。

var (
 num = 1000000
 m0  = make(map[int]struct{}, num)
 mu0 = sync.RWMutex{}
 m1  = make(map[int]struct{}, num)
 mu1 = sync.RWMutex{}
)

// ConWriteMapNoShard 不分片寫入一個 map。
func ConWriteMapNoShard() {
 g := errgroup.Group{}
 for i := 0; i < num; i++ {
  g.Go(func() error {
   mu0.Lock()
   defer mu0.Unlock()
   m0[i] = struct{}{}
   return nil
  })
 }
 _ = g.Wait()
}

// ConWriteMapTwoShard 分片寫入兩個 map。
func ConWriteMapTwoShard() {
 g := errgroup.Group{}
 for i := 0; i < num; i++ {
  g.Go(func() error {
   if i&1 == 0 {
    mu0.Lock()
    defer mu0.Unlock()
    m0[i] = struct{}{}
    return nil
   }
   mu1.Lock()
   defer mu1.Unlock()
   m1[i] = struct{}{}
   return nil
  })
 }
 _ = g.Wait()
}
           

看下二者的性能差異:

func BenchmarkConWriteMapNoShard(b *testing.B) {
 for i := 0; i < b.N; i++ {
  ConWriteMapNoShard()
 }
}
BenchmarkConWriteMapNoShard-12                 3         472063245 ns/op

func BenchmarkConWriteMapTwoShard(b *testing.B) {
 for i := 0; i < b.N; i++ {
  ConWriteMapTwoShard()
 }
}
BenchmarkConWriteMapTwoShard-12                4         310588155 ns/op
           

可以看到,通過對分共享資源的分片處理,減少了鎖競争,能明顯地提高程式的并發性能。可以預見的是,随着分片粒度地變小,性能差距會越來越大。當然,分片粒度不是越小越好。因為每一個分片都要配一把鎖,那麼會帶來很多額外的不必要的開銷。可以選擇一個不太大的值,在性能和花銷上尋找一個平衡。

1.3 優先使用共享鎖而非互斥鎖

如果并發無法做到無鎖化,優先使用共享鎖而非互斥鎖。

所謂互斥鎖,指鎖隻能被一個 Goroutine 獲得。共享鎖指可以同時被多個 Goroutine 獲得的鎖。

Go 标準庫 sync 提供了兩種鎖,互斥鎖(sync.Mutex)和讀寫鎖(sync.RWMutex),讀寫鎖便是共享鎖的一種具體實作。

1.3.1 sync.Mutex

互斥鎖的作用是保證共享資源同一時刻隻能被一個 Goroutine 占用,一個 Goroutine 占用了,其他的 Goroutine 則阻塞等待。

「後端」Go 高性能程式設計技法

sync.Mutex 提供了兩個導出方法用來使用鎖。

Lock()   // 加鎖
Unlock()   // 釋放鎖
           

我們可以通過在通路共享資源前前用 Lock 方法對資源進行上鎖,在通路共享資源後調用 Unlock 方法來釋放鎖,也可以用 defer 語句來保證互斥鎖一定會被解鎖。在一個 Go 協程調用 Lock 方法獲得鎖後,其他請求鎖的協程都會阻塞在 Lock 方法,直到鎖被釋放。

1.3.2 sync.RWMutex

讀寫鎖是一種共享鎖,也稱之為多讀單寫鎖 (multiple readers, single writer lock)。在使用鎖時,對擷取鎖的目的操作做了區分,一種是讀操作,一種是寫操作。因為同一時刻允許多個 Gorouine 擷取讀鎖,是以是一種共享鎖。但寫鎖是互斥的。

一般來說,有如下幾種情況:

  • 讀鎖之間不互斥,沒有寫鎖的情況下,讀鎖是無阻塞的,多個協程可以同時獲得讀鎖。
  • 寫鎖之間是互斥的,存在寫鎖,其他寫鎖阻塞。
  • 寫鎖與讀鎖是互斥的,如果存在讀鎖,寫鎖阻塞,如果存在寫鎖,讀鎖阻塞。
「後端」Go 高性能程式設計技法

sync.RWMutex 提供了五個導出方法用來使用鎖。

Lock()    // 加寫鎖
Unlock()   // 釋放寫鎖
RLock()    // 加讀鎖
RUnlock()   // 釋放讀鎖
RLocker() Locker // 傳回讀鎖,使用 Lock() 和 Unlock() 進行 RLock() 和 RUnlock()
           

讀寫鎖的存在是為了解決讀多寫少時的性能問題,讀場景較多時,讀寫鎖可有效地減少鎖阻塞的時間。

1.3.3 性能對比

大部分業務場景是讀多寫少,是以使用讀寫鎖可有效提高對共享資料的通路效率。最壞的情況,隻有寫請求,那麼讀寫鎖頂多退化成互斥鎖。是以優先使用讀寫鎖而非互斥鎖,可以提高程式的并發性能。

接下來,我們測試三種情景下,互斥鎖和讀寫鎖的性能差異。

  • 讀多寫少(讀占 80%)
  • 讀寫一緻(各占 50%)
  • 讀少寫多(讀占 20%)

首先根據互斥鎖和讀寫鎖分别實作對共享 map 的并發讀寫。

// OpMapWithMutex 使用互斥鎖讀寫 map。
// rpct 為讀操作占比。
func OpMapWithMutex(rpct int) {
 m := make(map[int]struct{})
 mu := sync.Mutex{}
 var wg sync.WaitGroup
 for i := 0; i < 100; i++ {
  i := i
  wg.Add(1)
  go func() {
   defer wg.Done()
   mu.Lock()
   defer mu.Unlock()
   // 寫操作。
   if i >= rpct {
    m[i] = struct{}{}
    time.Sleep(time.Microsecond)
    return
   }
   // 讀操作。
   _ = m[i]
   time.Sleep(time.Microsecond)
  }()
 }
 wg.Wait()
}

// OpMapWithRWMutex 使用讀寫鎖讀寫 map。
// rpct 為讀操作占比。
func OpMapWithRWMutex(rpct int) {
 m := make(map[int]struct{})
 mu := sync.RWMutex{}
 var wg sync.WaitGroup
 for i := 0; i < 100; i++ {
  i := i
  wg.Add(1)
  go func() {
   defer wg.Done()
   // 寫操作。
   if i >= rpct {
    mu.Lock()
    defer mu.Unlock()
    m[i] = struct{}{}
    time.Sleep(time.Microsecond)
    return
   }
   // 讀操作。
   mu.RLock()
   defer mu.RUnlock()
   _ = m[i]
   time.Sleep(time.Microsecond)
  }()
 }
 wg.Wait()
}
           

入參 rpct 用來調節讀操作的占比,來模拟讀寫占比不同的場景。rpct 設為 80 表示讀多寫少(讀占 80%),rpct 設為 50 表示讀寫一緻(各占 50%),rpct 設為 20 表示讀少寫多(讀占 20%)。

func BenchmarkMutexReadMore(b *testing.B) {
 for i := 0; i < b.N; i++ {
  OpMapWithMutex(80)
 }
}

func BenchmarkRWMutexReadMore(b *testing.B) {
 for i := 0; i < b.N; i++ {
  OpMapWithRWMutex(80)
 }
}

func BenchmarkMutexRWEqual(b *testing.B) {
 for i := 0; i < b.N; i++ {
  OpMapWithMutex(50)
 }
}

func BenchmarkRWMutexRWEqual(b *testing.B) {
 for i := 0; i < b.N; i++ {
  OpMapWithRWMutex(50)
 }
}

func BenchmarkMutexWriteMore(b *testing.B) {
 for i := 0; i < b.N; i++ {
  OpMapWithMutex(20)
 }
}

func BenchmarkRWMutexWriteMore(b *testing.B) {
 for i := 0; i < b.N; i++ {
  OpMapWithRWMutex(20)
 }
}
           

執行目前包下的所有基準測試,結果如下:

dablelv@DABLELV-MB0 mutex % go test -bench=.
goos: darwin
goarch: amd64
pkg: main/mutex
cpu: Intel(R) Core(TM) i7-9750H CPU @ 2.60GHz
BenchmarkMutexReadMore-12                   2462            485917 ns/op
BenchmarkRWMutexReadMore-12                 8074            145690 ns/op
BenchmarkMutexRWEqual-12                    2406            498673 ns/op
BenchmarkRWMutexRWEqual-12                  4124            303693 ns/op
BenchmarkMutexWriteMore-12                  1906            532350 ns/op
BenchmarkRWMutexWriteMore-12                2462            432386 ns/op
PASS
ok      main/mutex      9.532s
           

可見讀多寫少的場景,使用讀寫鎖并發性能會更優。可以預見的是如果寫占比更低,那麼讀寫鎖帶的并發效果會更優。

這裡需要注意的是,因為每次讀寫 map 的操作耗時很短,是以每次睡眠一微秒(百萬分之一秒)來增加耗時,不然對共享資源的通路耗時,小于鎖處理的本身耗時,那麼使用讀寫鎖帶來的性能優化效果将變得不那麼明顯,甚至會降低性能。

2.限制協程數量

2.1 協程數過多的問題

2.1.1 程式崩潰

Go 程(goroutine)是由 Go 運作時管理的輕量級線程。通過它我們可以輕松實作并發程式設計。但是當我們無限開辟協程時,将會遇到緻命的問題。

func main() {
 var wg sync.WaitGroup
 for i := 0; i < math.MaxInt32; i++ {
  wg.Add(1)
  go func(i int) {
   defer wg.Done()
   fmt.Println(i)
   time.Sleep(time.Second)
  }(i)
 }
 wg.Wait()
}
           

這個例子實作了 math.MaxInt32 個協程的并發,2^31 - 1 約為 20 億個,每個協程内部幾乎沒有做什麼事情。正常的情況下呢,這個程式會亂序輸出 0 ~ 2^31-1 個數字。

程式會像預期的那樣順利的運作嗎?

go run main.go
...
108668
1142025
panic: too many concurrent operations on a single file or socket (max 1048575)

goroutine 1158408 [running]:
internal/poll.(*fdMutex).rwlock(0xc0000ae060, 0x0)
        /usr/local/go/src/internal/poll/fd_mutex.go:147 +0x11b
internal/poll.(*FD).writeLock(...)
        /usr/local/go/src/internal/poll/fd_mutex.go:239
internal/poll.(*FD).Write(0xc0000ae060, {0xc12cadf690, 0x8, 0x8})
        /usr/local/go/src/internal/poll/fd_unix.go:262 +0x72
os.(*File).write(...)
        /usr/local/go/src/os/file_posix.go:49
os.(*File).Write(0xc0000ac008, {0xc12cadf690, 0x1, 0xc12ea62f50})
        /usr/local/go/src/os/file.go:176 +0x65
fmt.Fprintln({0x10c00e0, 0xc0000ac008}, {0xc12ea62f90, 0x1, 0x1})
        /usr/local/go/src/fmt/print.go:265 +0x75
fmt.Println(...)
        /usr/local/go/src/fmt/print.go:274
main.main.func1(0x0)
        /Users/dablelv/work/code/test/main.go:16 +0x8f
...
           

運作的結果是程式直接崩潰了,關鍵的報錯資訊是:

panic: too many concurrent operations on a single file or socket (max 1048575)
           

對單個 file/socket 的并發操作個數超過了系統上限,這個報錯是 fmt.Printf 函數引起的,fmt.Printf 将格式化後的字元串列印到螢幕,即标準輸出。在 Linux 系統中,标準輸出也可以視為檔案,核心(Kernel)利用檔案描述符(File Descriptor)來通路檔案,标準輸出的檔案描述符為 1,錯誤輸出檔案描述符為 2,标準輸入的檔案描述符為 0。

簡而言之,系統的資源被耗盡了。

那如果我們将 fmt.Printf 這行代碼去掉呢?那程式很可能會因為記憶體不足而崩潰。這一點更好了解,每個協程至少需要消耗 2KB 的空間,那麼假設計算機的記憶體是 4GB,那麼至多允許 4GB/2KB = 1M 個協程同時存在。那如果協程中還存在着其他需要配置設定記憶體的操作,那麼允許并發執行的協程将會數量級地減少。

2.1.2 協程的代價

前面的例子過于極端,一般情況下程式也不會無限開辟協程,旨在說明協程數量是有限制的,不能無限開辟。

如果我們開辟很多協程,但不會導緻程式崩潰,可以嗎?如果真要這麼做的話,我們應該清楚地知道,協程雖然輕量,但仍有開銷。

Go 的開銷主要是三個方面:建立(占用記憶體)、排程(增加排程器負擔)和删除(增加 GC 壓力)。

  • 記憶體開銷

空間上,一個 Go 程占用約 2K 的記憶體,在源碼 src/runtime/runtime2.go裡面,我們可以找到 Go 程的結構定義type g struct。

  • 排程開銷

時間上,協程排程也會有 CPU 開銷。我們可以利用runntime.Gosched()讓目前協程主動讓出 CPU 去執行另外一個協程,下面看一下協程之間切換的耗時。

const NUM = 10000

func cal() {
 for i := 0; i < NUM; i++ {
  runtime.Gosched()
 }
}

func main() {
 // 隻設定一個 Processor
 runtime.GOMAXPROCS(1)
 start := time.Now().UnixNano()
 go cal()
 for i := 0; i < NUM; i++ {
  runtime.Gosched()
 }
 end := time.Now().UnixNano()
 fmt.Printf("total %vns per %vns", end-start, (end-start)/NUM)
}
           

運作輸出:

total 997200ns per 99ns
           

可見一次協程的切換,耗時大概在 100ns,相對于線程的微秒級耗時切換,性能表現非常優秀,但是仍有開銷。

  • GC 開銷 建立 Go 程到運作結束,占用的記憶體資源是需要由 GC 來回收,如果無休止地建立大量 Go 程後,勢必會造成對 GC 的壓力。
package main

import (
 "fmt"
 "runtime"
 "runtime/debug"
 "sync"
 "time"
)

func createLargeNumGoroutine(num int, wg *sync.WaitGroup) {
 wg.Add(num)
 for i := 0; i < num; i++ {
  go func() {
   defer wg.Done()
  }()
 }
}

func main() {
 // 隻設定一個 Processor 保證 Go 程串行執行
 runtime.GOMAXPROCS(1)
 // 關閉GC改為手動執行
 debug.SetGCPercent(-1)

 var wg sync.WaitGroup
 createLargeNumGoroutine(1000, &wg)
 wg.Wait()
 t := time.Now()
 runtime.GC() // 手動GC
 cost := time.Since(t)
 fmt.Printf("GC cost %v when goroutine num is %v\n", cost, 1000)

 createLargeNumGoroutine(10000, &wg)
 wg.Wait()
 t = time.Now()
 runtime.GC() // 手動GC
 cost = time.Since(t)
 fmt.Printf("GC cost %v when goroutine num is %v\n", cost, 10000)

 createLargeNumGoroutine(100000, &wg)
 wg.Wait()
 t = time.Now()
 runtime.GC() // 手動GC
 cost = time.Since(t)
 fmt.Printf("GC cost %v when goroutine num is %v\n", cost, 100000)
}
           

運作輸出:

GC cost 0s when goroutine num is 1000
GC cost 2.0027ms when goroutine num is 10000
GC cost 30.9523ms when goroutine num is 100000
           

當建立的 Go 程數量越多,GC 耗時越大。

上面的分析目的是為了盡可能地量化 Goroutine 的開銷。雖然官方宣稱用 Golang 寫并發程式的時候随便起個成千上萬的 Goroutine 毫無壓力,但當我們起十萬、百萬甚至千萬個 Goroutine 呢?Goroutine 輕量的開銷将被放大。

2.2 限制協程數量

系統地資源是有限,協程是有代價的,為了保護程式,提高性能,我們應主動限制并發的協程數量。

可以利用信道 channel 的緩沖區大小來實作。

func main() {
 var wg sync.WaitGroup
 ch := make(chan struct{}, 3)
 for i := 0; i < 10; i++ {
  ch <- struct{}{}
  wg.Add(1)
  go func(i int) {
   defer wg.Done()
   log.Println(i)
   time.Sleep(time.Second)
   <-ch
  }(i)
 }
 wg.Wait()
}
           

上例中建立了緩沖區大小為 3 的 channel,在沒有被接收的情況下,至多發送 3 個消息則被阻塞。開啟協程前,調用ch <- struct{}{},若緩存區滿,則阻塞。協程任務結束,調用 <-ch 釋放緩沖區。

sync.WaitGroup 并不是必須的,例如 Http 服務,每個請求天然是并發的,此時使用 channel 控制并發處理的任務數量,就不需要 sync.WaitGroup。

運作結果如下:

2022/03/06 20:37:02 0
2022/03/06 20:37:02 2
2022/03/06 20:37:02 1
2022/03/06 20:37:03 3
2022/03/06 20:37:03 4
2022/03/06 20:37:03 5
2022/03/06 20:37:04 6
2022/03/06 20:37:04 7
2022/03/06 20:37:04 8
2022/03/06 20:37:05 9
           

從日志中可以很容易看到,每秒鐘隻并發執行了 3 個任務,達到了協程并發控制的目的。

2.3 協程池化

上面的例子隻是簡單地限制了協程開辟的數量。在此基礎之上,基于對象複用的思想,我們可以重複利用已開辟的協程,避免協程的重複建立銷毀,達到池化的效果。

協程池化,我們可以自己寫一個協程池,但不推薦這麼做。因為已經有成熟的開源庫可供使用,無需再重複造輪子。目前有很多第三方庫實作了協程池,可以很友善地用來控制協程的并發數量,比較受歡迎的有:

  • Jeffail/tunny
  • panjf2000/ants

下面以 panjf2000/ants 為例,簡單介紹其使用。

ants 是一個簡單易用的高性能 Goroutine 池,實作了對大規模 Goroutine 的排程管理和複用,允許使用者在開發并發程式的時候限制 Goroutine 數量,複用協程,達到更高效執行任務的效果。

package main

import (
 "fmt"
 "time"

 "github.com/panjf2000/ants"
)

func main() {
 // Use the common pool
 for i := 0; i < 10; i++ {
  i := i
  ants.Submit(func() {
   fmt.Println(i)
  })
 }
 time.Sleep(time.Second)
}
           

使用 ants,我們簡單地使用其預設的協程池,直接将任務送出并發執行。預設協程池的預設容量 math.MaxInt32。

如果自定義協程池容量大小,可以調用 NewPool 方法來執行個體化具有給定容量的池,如下所示:

// Set 10000 the size of goroutine pool
p, _ := ants.NewPool(10000)
           

2.4 小結

Golang 為并發而生。Goroutine 是由 Go 運作時管理的輕量級線程,通過它我們可以輕松實作并發程式設計。Go 雖然輕量,但天下沒有免費的午餐,無休止地開辟大量 Go 程勢必會帶來性能影響,甚至程式崩潰。是以,我們應盡可能的控制協程數量,如果有需要,請複用它。

3.使用 sync.Once 避免重複執行

3.1 簡介

sync.Once 是 Go 标準庫提供的使函數隻執行一次的實作,常應用于單例模式,例如初始化配置、保持資料庫連接配接等。作用與 init 函數類似,但有差別。

  • init 函數是當所在的 package 首次被加載時執行,若遲遲未被使用,則既浪費了記憶體,又延長了程式加載時間。
  • sync.Once 可以在代碼的任意位置初始化和調用,是以可以延遲到使用時再執行,并發場景下是線程安全的。

在多數情況下,sync.Once 被用于控制變量的初始化,這個變量的讀寫滿足如下三個條件:

  • 當且僅當第一次通路某個變量時,進行初始化(寫);
  • 變量初始化過程中,所有讀都被阻塞,直到初始化完成;
  • 變量僅初始化一次,初始化完成後駐留在記憶體裡。

3.2 原理

sync.Once 用來保證函數隻執行一次。要達到這個效果,需要做到兩點:

  • 計數器,統計函數執行次數;
  • 線程安全,保障在多 Go 程的情況下,函數仍然隻執行一次,比如鎖。

3.2.1 源碼

下面看一下 sync.Once 結構,其有兩個變量。使用 done 統計函數執行次數,使用鎖 m 實作線程安全。果不其然,和上面的猜想一緻。

// Once is an object that will perform exactly one action.
//
// A Once must not be copied after first use.
type Once struct {
 // done indicates whether the action has been performed.
 // It is first in the struct because it is used in the hot path.
 // The hot path is inlined at every call site.
 // Placing done first allows more compact instructions on some architectures (amd64/386),
 // and fewer instructions (to calculate offset) on other architectures.
 done uint32
 m    Mutex
}
           

sync.Once 僅提供了一個導出方法 Do(),參數 f 是隻會被執行一次的函數,一般為對象初始化函數。

// go version go1.17 darwin/amd64

// Do calls the function f if and only if Do is being called for the
// first time for this instance of Once. In other words, given
//  var once Once
// if once.Do(f) is called multiple times, only the first call will invoke f,
// even if f has a different value in each invocation. A new instance of
// Once is required for each function to execute.
//
// Do is intended for initialization that must be run exactly once. Since f
// is niladic, it may be necessary to use a function literal to capture the
// arguments to a function to be invoked by Do:
//  config.once.Do(func() { config.init(filename) })
//
// Because no call to Do returns until the one call to f returns, if f causes
// Do to be called, it will deadlock.
//
// If f panics, Do considers it to have returned; future calls of Do return
// without calling f.
//
func (o *Once) Do(f func()) {
 // Note: Here is an incorrect implementation of Do:
 //
 // if atomic.CompareAndSwapUint32(&o.done, 0, 1) {
 //  f()
 // }
 //
 // Do guarantees that when it returns, f has finished.
 // This implementation would not implement that guarantee:
 // given two simultaneous calls, the winner of the cas would
 // call f, and the second would return immediately, without
 // waiting for the first's call to f to complete.
 // This is why the slow path falls back to a mutex, and why
 // the atomic.StoreUint32 must be delayed until after f returns.

 if atomic.LoadUint32(&o.done) == 0 {
  // Outlined slow-path to allow inlining of the fast-path.
  o.doSlow(f)
 }
}

func (o *Once) doSlow(f func()) {
 o.m.Lock()
 defer o.m.Unlock()
 if o.done == 0 {
  defer atomic.StoreUint32(&o.done, 1)
  f()
 }
}
           

抛去大段的注釋,可以看到 sync.Once 實作非常簡潔。Do() 函數中,通過對成員變量 done 的判斷,來決定是否執行傳入的任務函數。執行任務函數前,通過鎖保證任務函數的執行和 done 的修改是一個互斥操作。在執行任務函數前,對 done 做一個二次判斷,來保證任務函數隻會被執行一次,done 隻會被修改一次。

3.2.2 done 為什麼是第一個字段

從字段 done 前有一段注釋,說明了done 為什麼是第一個字段。

done 在熱路徑中,done 放在第一個字段,能夠減少 CPU 指令,也就是說,這樣做能夠提升性能。

熱路徑(hot path)是程式非常頻繁執行的一系列指令,sync.Once 絕大部分場景都會通路 o.done,在熱路徑上是比較好了解的。如果 hot path 編譯後的機器碼指令更少,更直接,必然是能夠提升性能的。

為什麼放在第一個字段就能夠減少指令呢?因為結構體第一個字段的位址和結構體的指針是相同的,如果是第一個字段,直接對結構體的指針解引用即可。如果是其他的字段,除了結構體指針外,還需要計算與第一個值的偏移(calculate offset)。在機器碼中,偏移量是随指令傳遞的附加值,CPU 需要做一次偏移值與指針的加法運算,才能擷取要通路的值的位址。因為,通路第一個字段的機器代碼更緊湊,速度更快。

參考 What does “hot path” mean in the context of sync.Once? - StackOverflow

3.3 性能差異

我們以一個簡單示例,來說明使用 sync.Once 保證函數隻會被執行一次和多次執行,二者的性能差異。

考慮一個簡單的場景,函數 ReadConfig 需要讀取環境變量,并轉換為對應的配置。環境變量在程式執行前已經确定,執行過程中不會發生改變。ReadConfig 可能會被多個協程并發調用,為了提升性能(減少執行時間和記憶體占用),使用 sync.Once 是一個比較好的方式。

type Config struct {
 GoRoot string
 GoPath string
}

var (
 once   sync.Once
 config *Config
)

func ReadConfigWithOnce() *Config {
 once.Do(func() {
  config = &Config{
   GoRoot: os.Getenv("GOROOT"),
   GoPath: os.Getenv("GOPATH"),
  }
 })
 return config
}

func ReadConfig() *Config {
 return &Config{
  GoRoot: os.Getenv("GOROOT"),
  GoPath: os.Getenv("GOPATH"),
 }
}
           

我們看下二者的性能差異。

func BenchmarkReadConfigWithOnce(b *testing.B) {
 for i := 0; i < b.N; i++ {
  _ = ReadConfigWithOnce()
 }
}

func BenchmarkReadConfig(b *testing.B) {
 for i := 0; i < b.N; i++ {
  _ = ReadConfig()
 }
}
           

執行測試結果如下:

go test -bench=. main/once
goos: darwin
goarch: amd64
pkg: main/once
cpu: Intel(R) Core(TM) i7-9750H CPU @ 2.60GHz
BenchmarkReadConfigWithOnce-12          670438965                1.732 ns/op
BenchmarkReadConfig-12                  13339154                87.46 ns/op
PASS
ok      main/once       3.006s
           

sync.Once 中保證了 Config 初始化函數僅執行了一次,避免了多次重複初始化,在并發環境下很有用。

4.使用 sync.Cond 通知協程

4.1 簡介

sync.Cond 是基于互斥鎖/讀寫鎖實作的條件變量,用來協調想要通路共享資源的那些 Goroutine,當共享資源的狀态發生變化的時候,sync.Cond 可以用來通知等待條件發生而阻塞的 Goroutine。

sync.Cond 基于互斥鎖/讀寫鎖,它和互斥鎖的差別是什麼呢?

互斥鎖 sync.Mutex 通常用來保護共享的臨界資源,條件變量 sync.Cond 用來協調想要通路共享資源的 Goroutine。當共享資源的狀态發生變化時,sync.Cond 可以用來通知被阻塞的 Goroutine。

4.2 使用場景

sync.Cond 經常用在多個 Goroutine 等待,一個 Goroutine 通知(事件發生)的場景。如果是一個通知,一個等待,使用互斥鎖或 channel 就能搞定了。

我們想象一個非常簡單的場景:

有一個協程在異步地接收資料,剩下的多個協程必須等待這個協程接收完資料,才能讀取到正确的資料。在這種情況下,如果單純使用 chan 或互斥鎖,那麼隻能有一個協程可以等待,并讀取到資料,沒辦法通知其他的協程也讀取資料。

這個時候,就需要有個全局的變量來标志第一個協程資料是否接受完畢,剩下的協程,反複檢查該變量的值,直到滿足要求。或者建立多個 channel,每個協程阻塞在一個 channel 上,由接收資料的協程在資料接收完畢後,逐個通知。總之,需要額外的複雜度來完成這件事。

Go 語言在标準庫 sync 中内置一個 sync.Cond 用來解決這類問題。

4.3 原理

sync.Cond 内部維護了一個等待隊列,隊列中存放的是所有在等待這個 sync.Cond 的 Go 程,即儲存了一個通知清單。sync.Cond 可以用來喚醒一個或所有因等待條件變量而阻塞的 Go 程,以此來實作多個 Go 程間的同步。

sync.Cond 的定義如下:

// Cond implements a condition variable, a rendezvous point
// for goroutines waiting for or announcing the occurrence
// of an event.
//
// Each Cond has an associated Locker L (often a *Mutex or *RWMutex),
// which must be held when changing the condition and
// when calling the Wait method.
//
// A Cond must not be copied after first use.
type Cond struct {
 noCopy noCopy

 // L is held while observing or changing the condition
 L Locker

 notify  notifyList
 checker copyChecker
}
           

每個 Cond 執行個體都會關聯一個鎖 L(互斥鎖 *Mutex,或讀寫鎖 *RWMutex),當修改條件或者調用 Wait 方法時,必須加鎖。

sync.Cond 的四個成員函數定義如下:

// NewCond returns a new Cond with Locker l.
func NewCond(l Locker) *Cond {
 return &Cond{L: l}
}
           

NewCond 建立 Cond 執行個體時,需要關聯一個鎖。

// Wait atomically unlocks c.L and suspends execution
// of the calling goroutine. After later resuming execution,
// Wait locks c.L before returning. Unlike in other systems,
// Wait cannot return unless awoken by Broadcast or Signal.
//
// Because c.L is not locked when Wait first resumes, the caller
// typically cannot assume that the condition is true when
// Wait returns. Instead, the caller should Wait in a loop:
//
//    c.L.Lock()
//    for !condition() {
//        c.Wait()
//    }
//    ... make use of condition ...
//    c.L.Unlock()
//
func (c *Cond) Wait() {
 c.checker.check()
 t := runtime_notifyListAdd(&c.notify)
 c.L.Unlock()
 runtime_notifyListWait(&c.notify, t)
 c.L.Lock()
}
           

Wait 用于阻塞調用者,等待通知。調用 Wait 會自動釋放鎖 c.L,并挂起調用者所在的 goroutine。如果其他協程調用了 Signal 或 Broadcast 喚醒了該協程,那麼 Wait 方法在結束阻塞時,會重新給 c.L 加鎖,并且繼續執行 Wait 後面的代碼。

對條件的檢查,使用了 for !condition() 而非 if,是因為目前協程被喚醒時,條件不一定符合要求,需要再次 Wait 等待下次被喚醒。為了保險起,使用 for 能夠確定條件符合要求後,再執行後續的代碼。

// Signal wakes one goroutine waiting on c, if there is any.
//
// It is allowed but not required for the caller to hold c.L
// during the call.
func (c *Cond) Signal() {
 c.checker.check()
 runtime_notifyListNotifyOne(&c.notify)
}

// Broadcast wakes all goroutines waiting on c.
//
// It is allowed but not required for the caller to hold c.L
// during the call.
func (c *Cond) Broadcast() {
 c.checker.check()
 runtime_notifyListNotifyAll(&c.notify)
}
           

Signal 隻喚醒任意 1 個等待條件變量 c 的 goroutine,無需鎖保護。Broadcast 喚醒所有等待條件變量 c 的 goroutine,無需鎖保護。

4.4 使用示例

我們實作一個簡單的例子,三個協程調用 Wait() 等待,另一個協程調用 Broadcast() 喚醒所有等待的協程。

var done = false

func read(name string, c *sync.Cond) {
 c.L.Lock()
 for !done {
  c.Wait()
 }
 log.Println(name, "starts reading")
 c.L.Unlock()
}

func write(name string, c *sync.Cond) {
 log.Println(name, "starts writing")
 time.Sleep(time.Second)
 done = true
 log.Println(name, "wakes all")
 c.Broadcast()
}

func main() {
 cond := sync.NewCond(&sync.Mutex{})

 go read("reader1", cond)
 go read("reader2", cond)
 go read("reader3", cond)
 write("writer", cond)

 time.Sleep(time.Second * 3)
}
           
  • done 即多個 Goroutine 阻塞等待的條件。
  • read() 調用 Wait() 等待通知,直到 done 為 true。
  • write() 接收資料,接收完成後,将 done 置為 true,調用 Broadcast() 通知所有等待的協程。
  • write() 中的暫停了 1s,一方面是模拟耗時,另一方面是確定前面的 3 個 read 協程都執行到 Wait(),處于等待狀态。main 函數最後暫停了 3s,確定所有操作執行完畢。

運作輸出:

go run main.go
2022/03/07 17:20:09 writer starts writing
2022/03/07 17:20:10 writer wakes all
2022/03/07 17:20:10 reader3 starts reading
2022/03/07 17:20:10 reader1 starts reading
2022/03/07 17:20:10 reader2 starts reading
           

更多關于 sync.Cond 的讨論可參考 How to correctly use sync.Cond? - StackOverflow。

4.5 注意事項

  • sync.Cond 不能被複制

sync.Cond 不能被複制的原因,并不是因為其内部嵌套了 Locker。因為 NewCond 時傳入的 Mutex/RWMutex 指針,對于 Mutex 指針複制是沒有問題的。

主要原因是 sync.Cond 内部是維護着一個 Goroutine 通知隊列 notifyList。如果這個隊列被複制的話,那麼就在并發場景下導緻不同 Goroutine 之間操作的 notifyList.wait、notifyList.notify 并不是同一個,這會導緻出現有些 Goroutine 會一直阻塞。

  • 喚醒順序

從等待隊列中按照順序喚醒,先進入等待隊列,先被喚醒。

  • 調用 Wait() 前要加鎖

調用 Wait() 函數前,需要先獲得條件變量的成員鎖,原因是需要互斥地變更條件變量的等待隊列。在 Wait() 傳回前,會重新上鎖。

文章來源:https://mp.weixin.qq.com/s?__biz=MjM5ODYwMjI2MA==&mid=2649769371&idx=1&sn=2aa88c3a960edeeeac98fbbe741e5207&chksm=beccd6e089bb5ff6e1f9c915f40af886cb00c42668395e1d13bb0fa53d254a712a1cc510eabd&scene=21#wechat_redirect

繼續閱讀