天天看點

深入了解go語言中的Mutext

作者:幹飯人小羽
深入了解go語言中的Mutext

在我們的日常開發中,總會有時候需要對一些變量做并發讀寫,比如 web 應用在同時接到多個請求之後, 需要對一些資源做初始化,而這些資源可能是隻需要初始化一次的,而不是每一個 http 請求都初始化, 在這種情況下,我們需要限制隻能一個協程來做初始化的操作,比如初始化資料庫連接配接等, 這個時候,我們就需要有一種機制,可以限制隻有一個協程來執行這些初始化的代碼。 在 go 語言中,我們可以使用互斥鎖(Mutex)來實作這種功能。

互斥鎖的定義

這裡引用一下維基百科的定義:

互斥鎖(Mutual exclusion,縮寫 Mutex)是一種用于多線程程式設計中,防止兩個線程同時對同一公共資源 (比如全局變量)進行讀寫的機制。該目的通過将代碼切片成一個一個的臨界區域(critical section)達成。 臨界區域指的是一塊對公共資源進行通路的代碼,并非一種機制或是算法。

互斥,顧名思義,也就是隻有一個線程能持有鎖。當然,在 go 中,是隻有一個協程能持有鎖。

下面是一個簡單的例子:

go複制代碼var sum int // 和
var mu sync.Mutex // 互斥鎖

// add 将 sum 加 1
func add() {
    // 擷取鎖,隻能有一個協程擷取到鎖,
    // 其他協程需要阻塞等待鎖釋放才能擷取到鎖。
   mu.Lock()
   // 臨界區域
   sum++
   mu.Unlock()
}

func TestMutex(t *testing.T) {
   // 啟動 1000 個協程
   var wg sync.WaitGroup
   wg.Add(1000)

   for i := 0; i < 1000; i++ {
      go func() {
         // 每個協程裡面調用 add()
         add()
         wg.Done()
      }()
   }

   // 等待所有協程執行完畢
   wg.Wait()
   // 最終 sum 的值應該是 1000
   assert.Equal(t, 1000, sum)
}
           

上面的例子中,我們定義了一個全局變量 sum,用于存儲和,然後定義了一個互斥鎖 mu, 在 add() 函數中,我們使用 mu.Lock() 來加鎖,然後對 sum 進行加 1 操作, 最後使用 mu.Unlock() 來解鎖,這樣就保證了在任意時刻,隻有一個協程能夠對 sum 進行加 1 操作, 進而保證了在并發執行 add() 操作的時候 sum 的值是正确的。

上面這個例子,在我之前的文章中已經作為例子出現過很多次了,這裡不再贅述了。

go Mutex 的基本用法

Mutex 我們一般隻會用到它的兩個方法:

  • Lock:擷取互斥鎖。(隻會有一個協程可以擷取到鎖,通常用在臨界區開始的地方。)
  • Unlock: 釋放互斥鎖。(釋放擷取到的鎖,通常用在臨界區結束的地方。)

Mutex 的模型可以用下圖表示:

深入了解go語言中的Mutext

說明:

  • 同一時刻隻能有一個協程擷取到 Mutex 的使用權,其他協程需要排隊等待(也就是上圖的 G1->G2->Gn)。
  • 擁有鎖的協程從臨界區退出的時候需要使用 Unlock 來釋放鎖,這個時候等待隊列的下一個協程可以擷取到鎖(實際實作比這裡說的複雜很多,後面會細說),進而進入臨界區。
  • 等待的協程會在 Lock 調用處阻塞,Unlock 的時候會使得一個等待的協程解除阻塞的狀态,得以繼續執行。
上面提到的這幾點也是 Mutex 的基本原理。

互斥鎖使用的兩個例子

了解了 go Mutex 基本原理之後,讓我們再來看看 Mutex 的一些使用的例子。

gin Context 中的 Set 方法

一個很常見的場景就是,并發對 map 進行讀寫,熟悉 go 的朋友應該知道,go 中的 map 是不支援并發讀寫的, 如果我們對 map 進行并發讀寫會導緻 panic。

而在 gin 的 Context 結構體中,也有一個 map 類型的字段 Keys,用來在上下文間傳遞鍵值對資料, 是以在通過 Set 來設定鍵值對的時候需要使用 c.mu.Lock() 來先擷取互斥鎖,然後再對 Keys 做設定。

go複制代碼// Set is used to store a new key/value pair exclusively for this context.
// It also lazy initializes  c.Keys if it was not used previously.
func (c *Context) Set(key string, value any) {
    // 擷取鎖
   c.mu.Lock()
    // 如果 Keys 還沒初始化,則進行初始化
   if c.Keys == nil {
      c.Keys = make(map[string]any)
   }

    // 設定鍵值對
   c.Keys[key] = value
    // 釋放鎖
   c.mu.Unlock()
}
           

同樣的,對 Keys 做讀操作的時候也需要使用互斥鎖:

go複制代碼// Get returns the value for the given key, ie: (value, true).
// If the value does not exist it returns (nil, false)
func (c *Context) Get(key string) (value any, exists bool) {
    // 擷取鎖
   c.mu.RLock()
    // 讀取 key
   value, exists = c.Keys[key]
    // 釋放鎖
   c.mu.RUnlock()
   return
}
           
可能會有人覺得奇怪,為什麼從 map 中讀也還需要鎖。這是因為,如果讀的時候沒有鎖保護, 那麼就有可能在 Set 設定的過程中,同時也在進行讀操作,這樣就會 panic 了。

這個例子想要說明的是,像 map 這種資料結構本身就不支援并發讀寫,我們這種情況下隻有使用 Mutex 了。

sync.Pool 中的 pinSlow 方法

在 sync.Pool 的實作中,有一個全局變量記錄了程序内所有的 sync.Pool 對象,那就是 allPools 變量, 另外有一個鎖 allPoolsMu 用來保護對 allPools 的讀寫操作:

go複制代碼var (
   // 保護 allPools 和 oldPools 的互斥鎖。
   allPoolsMu Mutex

   // allPools is the set of pools that have non-empty primary
   // caches. Protected by either 1) allPoolsMu and pinning or 2)
   // STW.
   allPools []*Pool

   // oldPools is the set of pools that may have non-empty victim
   // caches. Protected by STW.
   oldPools []*Pool
)
           

pinSlow 方法中會在 allPoolsMu 的保護下對 allPools 做讀寫操作:

go複制代碼func (p *Pool) pinSlow() (*poolLocal, int) {
   // Retry under the mutex.
   // Can not lock the mutex while pinned.
   runtime_procUnpin()
   allPoolsMu.Lock() // 擷取鎖
   defer allPoolsMu.Unlock() // 函數傳回的時候釋放鎖
   pid := runtime_procPin()
   // poolCleanup won't be called while we are pinned.
   s := p.localSize
   l := p.local
   if uintptr(pid) < s {
      return indexLocal(l, pid), pid
   }
   if p.local == nil {
      allPools = append(allPools, p) // 全局變量修改
   }
   // If GOMAXPROCS changes between GCs, we re-allocate the array and lose the old one.
   size := runtime.GOMAXPROCS(0)
   local := make([]poolLocal, size)
   atomic.StorePointer(&p.local, unsafe.Pointer(&local[0])) // store-release
   runtime_StoreReluintptr(&p.localSize, uintptr(size))     // store-release
   return &local[pid], pid
}
           

這個例子主要是為了說明使用 mu 的另外一種非常常見的場景:并發讀寫全局變量。

互斥鎖使用的注意事項

互斥鎖如果使用不當,可能會導緻死鎖或者出現 panic 的情況,下面是一些常見的錯誤:

  1. 忘記使用 Unlock 釋放鎖。
  2. Lock 之後還沒 Unlock 之前又使用 Lock 擷取鎖。也就是重複上鎖,go 中的 Mutex 不可重入。
  3. 死鎖:位于臨界區内不同的兩個協程都想擷取對方持有的不同的鎖。
  4. 還沒 Lock 之前就 Unlock。這會導緻 panic,因為這是沒有任何意義的。
  5. 複制 Mutex,比如将 Mutex 作為參數傳遞。

對于第 1 點,我們往往可以使用 defer 關鍵字來做釋放鎖的操作。第 2 點不太好發現,隻能在開發的時候多加注意。 第 3 點我們在使用鎖的時候可以考慮盡量避免在臨界區内再去使用别的鎖。 最後,Mutex 是不可以複制的,這個可以在編譯之前通過 go vet 來做檢查。

為什麼 Mutex 不能被複制呢?因為 Mutex 中包含了鎖的狀态,如果複制了,那麼這個狀态也會被複制, 如果在複制前進行 Lock,複制後進行 Unlock,那就意味着 Lock 和 Unlock 操作的其實是兩個不同的狀态, 這樣顯然是不行的,是釋放不了鎖的。

雖然不可以複制,但是我們可以通過傳遞指針類型的參數來傳遞 Mutex。

互斥鎖鎖定的是什麼?

在前一篇文章中,我們提到過,原子操作本質上是變量級的互斥鎖。而互斥鎖本身鎖定的又是什麼呢? 其實互斥鎖本質上是一個信号量,它通過擷取釋放信号量,最終使得協程獲得某一個代碼塊的執行權力。

也就是說,互斥鎖,鎖定的是一塊代碼塊。

我們以 go-zero 裡面的 collection/fifo.go 為例子說明一下:

go複制代碼// Take takes the first element out of q if not empty.
func (q *Queue) Take() (any, bool) {
   // 擷取互斥鎖(隻能有一個協程擷取到鎖)
   q.lock.Lock()
   // 函數傳回的時候釋放互斥鎖(擷取到鎖的協程釋放鎖之後,其他協程才能進行搶占鎖)
   defer q.lock.Unlock()

   // 下面的代碼隻有搶占到(也就是互斥鎖鎖定的代碼塊)
   if q.count == 0 {
      return nil, false
   }

   element := q.elements[q.head]
   q.head = (q.head + 1) % len(q.elements)
   q.count--

   return element, true
}
           

除了鎖定代碼塊的這一個作用,有另外一個比較關鍵的地方也是我們不能忽視的, 那就是 互斥鎖并不保證臨界區内操作的變量不能被其他協程通路。 互斥鎖隻能保證一段代碼隻能一個協程執行,但是對于臨界區内涉及的共享資源, 你在臨界區外也依然是可以對其進行讀寫的。

我們以上面的代碼說明一下:在上面的 Take 函數中,我們對 q.head 和 q.count 都進行了操作, 雖然這些操作代碼位于臨界區内,但是臨界區并不保證持有鎖期間其他協程不會在臨界區外去修改 q.head 和 q.count。

下面就是一個非常典型的錯誤的例子:

go複制代碼import (
   "fmt"
   "sync"
   "testing"
)

var mu sync.Mutex
var sum int

// 在鎖的保護下對 sum 做讀寫操作
func test() {
   mu.Lock()
   sum++
   mu.Unlock()
}

func TestMutex(t *testing.T) {
   var wg sync.WaitGroup
   wg.Add(1000)

   for i := 0; i < 500; i++ {
      go func() {
         test()
         wg.Done()
      }()

      // 位于臨界區外,也依然是可以對 sum 做讀寫操作的。
      sum++
   }

   wg.Wait()

   fmt.Println(sum)
}
           
靠譜的做法是,對于有共享資源的讀寫的操作都使用 Mutex 保護起來。

當然,如果我們隻有一個變量,那麼可能使用原子操作就足夠了。

互斥鎖實作原理

互斥鎖的實作有以下幾個關鍵的地方:

  • 信号量:這是作業系統中的同步對象。
  • 等待隊列:擷取不到互斥鎖的協程,會放入到一個先入先出隊列的隊列尾部。這樣信号量釋放的時候,可以依次對它們喚醒。
  • 原子操作:互斥鎖的實作中,使用了一個字段來記錄了幾種不同的狀态,使用原子操作可以保證幾種狀态可以一次性變更完成。

我們先來看看 Mutex結構體定義:

go複制代碼type Mutex struct {
   state int32 // 狀态字段
   sema  uint32 // 信号量
}
           

其中 state 字段記錄了四種不同的資訊:

深入了解go語言中的Mutext

這四種不同資訊在源碼中定義了不同的常量:

go複制代碼const (
   mutexLocked      = 1 << iota // 表示有 goroutine 擁有鎖
   mutexWoken                   // 喚醒(就是第 2 位)
   mutexStarving                // 饑餓(第 3 位)
   mutexWaiterShift = iota      // 表示第 4 位開始,表示等待者的數量

   starvationThresholdNs = 1e6  // 1ms 進入饑餓模式的等待時間門檻值
)
           

而 sema 的含義比較簡單,就是一個用作不同 goroutine 同步的信号量。

信号量

go 的 Mutex 是基于信号量來實作的,那信号量又是什麼呢?

維基百科:信号量是一個同步對象,用于保持在 0 至指定最大值之間的一個計數值。當線程完成一次對該 semaphore 對象的等待(wait)時,該計數值減一;當線程完成一次對 semaphore 對象的釋放(release)時,計數值加一。

上面這個解釋有點難懂,通俗地說,就是一個數字,調用 wait 的時候,這個數字減去 1,調用 release 的時候,這個數字加上 1。 (還有一個隐含的邏輯是,如果這個數小于 0,那麼調用 wait 的時候會阻塞,直到它大于 0。)

對應到 go 的 Mutex 中,有兩個操作信号量的函數:

  • runtime_Semrelease: 自動遞增信号量并通知等待的 goroutine。
  • runtime_SemacquireMutex: 是一直等到信号量大于 0,然後自動遞減。

我們注意到了,其實 runtime_SemacquireMutex 是有一個前提條件的,那就是等到信号量大于 0。 其實信号量的兩個操作 P/V 就是一個加 1 一個減 1,是以在實際使用的時候,也是需要一個擷取鎖的操作對應一個釋放鎖的操作, 否則,其他協程都無法擷取到鎖,因為信号量一直不滿足。

等待隊列

go 中如果已經有 goroutine 持有互斥鎖,那麼其他的協程會放入一個 FIFO 隊列中,如下圖:

深入了解go語言中的Mutext

說明:

  • G1 表示持有互斥鎖的 goroutine,G2...Gn 表示一個 goroutine 的等待隊列,這是一個先入先出的隊列。
  • G1 先持有鎖,得以進入臨界區,其他想搶占鎖的 goroutine 阻塞在 Lock 調用處。
  • G1 在使用完鎖後,會使用 Unlock 來釋放鎖,本質上是釋放了信号量,然後會喚醒 FIFO 隊列頭部的 goroutine。
  • G2 從 FIFO 隊列中移除,進入臨界區。G2 使用完鎖之後也會使用 Unlock 來釋放鎖。
上面隻是一個大概模型,在實際實作中,比這個複雜很多倍,下面會繼續深入講解。

原子操作

go 的 Mutex 實作中,state 字段是一個 32 位的整數,不同的位記錄了四種不同資訊,在這種情況下, 隻需要通過原子操作就可以保證一次性實作對四種不同狀态資訊的更改,而不需要更多額外的同步機制。

但是毋庸置疑,這種實作會大大降低代碼的可讀性,因為通過一個整數來記錄不同的資訊, 就意味着,需要通過各種位運算來實作對這個整數不同位的修改,比如将上鎖的操作:

go複制代碼new |= mutexLocked
           

當然,這隻是 Mutex 實作中最簡單的一種位運算了。下面以 state 記錄的四種不同資訊為次元來具體講解一下:

  • mutexLocked:這是 state 的最低位,1 表示鎖被占用,0 表示鎖沒有被占用。 new := mutexLocked 新狀态為上鎖狀态
  • mutexWoken: 這是表示是否有協程被喚醒了的狀态 new = (old - 1<<mutexWaiterShift) | mutexWoken 等待者數量減去 1 的同時,設定喚醒辨別 new &^= mutexWoken 清除喚醒辨別
  • mutexStarving:饑餓模式的辨別 new |= mutexStarving 設定饑餓辨別
  • 等待者數量:state >> mutexWaiterShift 就是等待者的數量,也就是上面提到的 FIFO 隊列中 goroutine 的數量 new += 1 << mutexWaiterShift 等待者數量加 1 delta := int32(mutexLocked - 1<<mutexWaiterShift) 上鎖的同時,将等待者數量減 1
這裡并沒有涵蓋 Mutex 中所有的位運算,其他操作在下文講解源碼實作的時候會提到。

在上面做了這一系列的位運算之後,我們會得到一個新的 state 狀态,假設名為 new,那麼我們就可以通過 CAS 操作來将 Mutex 的 state 字段更新:

go複制代碼atomic.CompareAndSwapInt32(&m.state, old, new)
           

通過上面這個原子操作,我們就可以一次性地更新 Mutex 的 state 字段,也就是一次性更新了四種狀态資訊。

這種通過一個整數記錄不同狀态的寫法在 sync 包其他的一些地方也有用到,比如 WaitGroup 中的 state 字段。

最後,對于這種操作,我們需要注意的是,因為我們在執行 CAS 前後是沒有其他什麼鎖或者其他的保護機制的, 這也就意味着上面的這個 CAS 操作是有可能會失敗的,那如果失敗了怎麼辦呢?

如果失敗了,也就意味着肯定有另外一個 goroutine 率先執行了 CAS 操作并且成功了,将 state 修改為了一個新的值。 這個時候,其實我們前面做的一系列位運算得到的結果實際上已經不對了,在這種情況下,我們需要擷取最新的 state,然後再次計算得到一個新的 state。

是以我們會在源碼裡面看到 CAS 操作是寫在 for 循環裡面的。

Mutex 的公平性

在前面,我們提到 goroutien 擷取不到鎖的時候,會進入一個 FIFO 隊列的隊列尾,在實際實作中,其實沒有那麼簡單, 為了獲得更好的性能,在實作的時候會盡量先讓運作狀态的 goroutine 獲得鎖,當然如果隊列中的 goroutine 等待太久(大于 1ms), 那麼就會先讓隊列中的 goroutine 獲得鎖。

下面是文檔中的說明:

Mutex 可以處于兩種操作模式:正常模式和饑餓模式。在正常模式下,等待者按照FIFO(先進先出)的順序排隊,但是被喚醒的等待者不擁有互斥鎖,會與新到達的 Goroutine 競争所有權。新到達的 Goroutine 有優勢——它們已經在 CPU 上運作,數量可能很多,是以被喚醒的等待者有很大的機會失去鎖。在這種情況下,它将排在等待隊列的前面。如果等待者未能在1毫秒内擷取到互斥鎖,則将互斥鎖切換到饑餓模式。 在饑餓模式下,互斥鎖的所有權直接從解鎖 Goroutine 移交給隊列前面的等待者。新到達的 Goroutine 即使看起來未被鎖定,也不會嘗試擷取互斥鎖,也不會嘗試自旋。相反,它們會将自己排隊在等待隊列的末尾。如果等待者獲得互斥鎖的所有權并發現(1)它是隊列中的最後一個等待者,或者(2)它等待時間少于1毫秒,則将互斥鎖切換回正常模式。 正常模式的性能要優于饑餓模式,因為 Goroutine 可以連續多次擷取互斥鎖,即使有被阻塞的等待者。饑餓模式很重要,可以防止尾部延遲的病态情況。

簡單總結:

  • Mutex 有兩種模式:正常模式、饑餓模式。
  • 正常模式下: 被喚醒的 goroutine 和正在運作的 goroutine 競争鎖。這樣可以運作中的協程有機會先擷取到鎖,進而避免了協程切換的開銷。性能更好。
  • 饑餓模式下: 優先讓隊列中的 goroutine 獲得鎖,并且直接放棄時間片,讓給隊列中的 goroutine,運作中的 goroutine 想擷取鎖要到隊尾排隊。更加公平。

Mutex 源碼剖析

Mutex 本身的源碼其實很少,但是複雜程度是非常高的,是以第一次看的時候可能會非常懵逼,但是不妨礙我們去了解它的大概實作原理。

Mutex 中主要有兩個方法,Lock 和 Unlock,使用起來非常的簡單,但是它的實作可不簡單。下面我們就來深入了解一下它的實作。

Lock

Lock 方法的實作如下:

go複制代碼// Lock 擷取鎖。
// 如果鎖已在使用中,則調用 goroutine 将阻塞,直到互斥量可用。
func (m *Mutex) Lock() {
   // Fast path: grab unlocked mutex.
   // 上鎖成功則直接傳回
   if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
      return
   }

   // Slow path (outlined so that the fast path can be inlined)
   // 沒有上鎖成功,這個時候需要做的事情就有點多了。
   m.lockSlow()
}
           

在 Lock 方法中,第一次擷取鎖的時候是非常簡單的,一個簡單的原子操作設定一下 mutexLocked 辨別就完成了。 但是如果這個原子操作失敗了,表示有其他 goroutine 先擷取到了鎖,這個時候就需要調用 lockSlow 來做一些額外的操作了:

go複制代碼// 擷取 mutex 鎖
func (m *Mutex) lockSlow() {
   var waitStartTime int64 // 目前協程開始等待的時間
   starving := false       // 目前協程是否是饑餓模式
   awoke := false          // 喚醒标志(是否目前協程就是被喚醒的協程)
   iter := 0               // 自旋次數(超過一定次數如果還沒能獲得鎖,就進入等待)
   old := m.state          // 舊的狀态,每次 for 循環會重新擷取目前的狀态字段

   for {
      // 自旋:目的是讓正在運作中的 goroutine 盡快擷取到鎖。
      // 兩種情況不會自旋:
      // 1. 饑餓模式:在饑餓模式下,鎖會直接交給等待隊列中的 goroutine,是以不會自旋。
      // 2. 鎖被釋放了:另外如果運作到這裡的時候,發現鎖已經被釋放了,也就不需要自旋了。
      if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
         // 設定 mutexWoken 辨別
         // 如果自旋是有意義的,則會進入到這裡,嘗試設定 mutexWoken 辨別。
         // 設定成功在持有鎖的 goroutine 擷取鎖的時候不會喚醒等待隊列中的 goroutine,下一個擷取鎖的就是目前 goroutine。
         if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
            atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
            // 各個判斷的含義:
            // !awoke 已經被喚醒過一次了,說明目前協程是被從等待隊列中喚醒的協程/又或者已經成功設定 mutexWoken 辨別了,不需要再喚醒了。
            // old&mutexWoken == 0 如果不等于 0 說明有 goroutine 被喚醒了,不會嘗試設定 mutexWoken 辨別
            // old>>mutexWaiterShift != 0 如果等待隊列為空,目前 goroutine 就是下一個搶占鎖的 goroutine
            // 前面的判斷都通過了,才會進行 CAS 操作嘗試設定 mutexWoken 辨別
            awoke = true
         }
         runtime_doSpin() // 自旋
         iter++           // 自旋次數 +1(超過一定次數會停止自旋)
         old = m.state    // 再次擷取鎖的最新狀态,之後會檢查是否鎖被釋放了
         continue         // 繼續下一次檢查
      }

      new := old
      // 饑餓模式下,新到達的 goroutines 必須排隊。
      // 不是饑餓狀态,直接競争鎖。
      if old&mutexStarving == 0 {
         new |= mutexLocked
      }
      // 進入等待隊列的兩種情況:
      // 1. 鎖依然被占用。
      // 2. 進入了饑餓模式。
      if old&(mutexLocked|mutexStarving) != 0 {
         new += 1 << mutexWaiterShift // 等待者數量 +1
      }
       // 已經等待超過了 1ms,且鎖被其他協程占用,則進入饑餓模式
      if starving && old&mutexLocked != 0 {
         new |= mutexStarving
      }
      // 喚醒之後,需要重置喚醒标志。
      // 不管有沒有擷取到鎖,都是要清除這個辨別的:
      // 擷取到鎖肯定要清除,如果擷取到鎖,需要讓其他運作中的 goroutine 來搶占鎖;
      // 如果沒有擷取到鎖,goroutine 會阻塞,這個時候是需要持有鎖的 goroutine 來喚醒的,如果有 mutexWoken 辨別,持有鎖的 goroutine 喚醒不了。
      if awoke {
         if new&mutexWoken == 0 {
            throw("sync: inconsistent mutex state")
         }
         new &^= mutexWoken // 重置喚醒标志
      }

      // 成功設定新狀态
      if atomic.CompareAndSwapInt32(&m.state, old, new) {
         // 原來鎖的狀态已釋放,并且不是饑餓狀态,正常請求到了鎖,傳回
         if old&(mutexLocked|mutexStarving) == 0 { // 這意味着目前的 goroutine 成功擷取了鎖
            break
         }

         // 如果已經被喚醒過,會被加入到等待隊列頭。
         queueLifo := waitStartTime != 0
         if waitStartTime == 0 {
            waitStartTime = runtime_nanotime()
         }
         // 阻塞等待
         // queueLifo 為 true,表示加入到隊列頭。否則,加入到隊列尾。
         // (首次加入隊列加入到隊尾,不是首次加入則加入隊頭,這樣等待最久的 goroutine 優先能夠擷取到鎖。)
         runtime_SemacquireMutex(&m.sema, queueLifo, 1)
         // 從等待隊列中喚醒,檢查鎖是否應該進入饑餓模式。
         starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs

         // 擷取目前的鎖最新狀态
         old = m.state
         // 如果鎖已經處于饑餓狀态,直接搶到鎖,傳回。
         // 饑餓模式下,被喚醒的協程可以直接擷取到鎖。
         // 新來的 goroutine 都需要進入隊列等待。
         if old&mutexStarving != 0 {
            // 如果這個 goroutine 被喚醒并且 Mutex 處于饑餓模式,P 的所有權已經移交給我們,
            // 但 Mutex 處于不一緻的狀态:mutexLocked 未設定,我們仍然被視為等待者。修複這個問題。
            if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
               throw("sync: inconsistent mutex state")
            }
            // 加鎖,并且減少等待者數量。
            // 實際上是兩步操作合成了一步:
            // 1. m.state = m.state + 1 (擷取鎖)
            // 2. m.state = m.state - 1<<mutexWaiterShift(waiter - 1)
            delta := int32(mutexLocked - 1<<mutexWaiterShift)
            // 清除饑餓狀态的兩種情況:
            // 1. 如果不需要進入饑餓模式(目前被喚醒的 goroutine 的等待時間小于 1ms)
            // 2. 原來的等待者數量為 1,說明是最後一個被喚醒的 goroutine。
            if !starving || old>>mutexWaiterShift == 1 {
               // 退出饑餓模式
               delta -= mutexStarving
            }
            // 原子操作,設定新狀态。
            atomic.AddInt32(&m.state, delta)
            break
         }
         // 設定喚醒标記,重新搶占鎖(會與那些運作中的 goroutine 一起競争鎖)
         awoke = true
         iter = 0
      } else {
         // CAS 更新狀态失敗,擷取最新狀态,然後重試
         old = m.state
      }
   }
}
           

我們可以看到,lockSlow 的處理非常的複雜,又要考慮讓運作中的 goroutine 盡快擷取到鎖,又要考慮不能讓等待隊列中的 goroutine 等待太久。

代碼中注釋很多,再簡單總結一下其中的流程:

  1. 為了讓循環中的 goroutine 可以先擷取到鎖,會先讓 goroutine 自旋等待鎖的釋放,這是因為運作中的 goroutine 正在占用 CPU,讓它先擷取到鎖可以避免一些不必要的協程切換,進而獲得更好的性能。
  2. 自旋完畢之後,會嘗試擷取鎖,同時也要根據舊的鎖狀态來更新鎖的不同狀态資訊,比如是否進入饑餓模式等。
  3. 計算得到一個新的 state 後,會進行 CAS 操作嘗試更新 state 狀态。
  4. CAS 失敗會重試上面的流程。
  5. CAS 成功之後會做如下操作:
  • 判斷目前是否已經擷取到鎖,如果是,則傳回,Lock 成功了。
  • 會判斷目前的 goroutine 是否是已經被喚醒過,如果是,會将目前 goroutine 加入到等待隊列頭部。
  • 調用 runtime_SemacquireMutex,進入阻塞狀态,等待下一次喚醒。
  • 喚醒之後,判斷是否需要進入饑餓模式。
  • 最後,如果已經是饑餓模式,目前 goroutine 直接擷取到鎖,退出循環,否則,再進行下一次搶占鎖的循環中。

具體流程我們可以參考一下下面的流程圖:

深入了解go語言中的Mutext
圖中有一些矩形方框描述了 unlockSlow 的關鍵流程。

Unlock

Unlock 方法的實作如下:

go複制代碼// Unlock 釋放互斥鎖。
// 如果 m 在進入 Unlock 時未被鎖定,則會出現運作時錯誤。
func (m *Mutex) Unlock() {
   // Fast path: drop lock bit.
   // unlock 成功
   // unLock 操作實際上是将 state 減去 1。
   new := atomic.AddInt32(&m.state, -mutexLocked)
   if new != 0 { // 等待隊列為空的時候直接傳回了
      // 喚醒一個等待鎖的 goroutine
      m.unlockSlow(new)
   }
}
           

Unlock 做了兩件事:

  1. 釋放目前 goroutine 持有的互斥鎖:也就是将 state 減去 1
  2. 喚醒等待隊列中的下一個 goroutine

如果隻有一個 goroutine 在使用鎖,隻需要簡單地釋放鎖就可以了。 但是如果有其他的 goroutine 在阻塞等待,那麼持有互斥鎖的 goroutine 就有義務去喚醒下一個 goroutine。

喚醒的流程相對複雜一些:

go複制代碼// unlockSlow 喚醒下一個等待鎖的協程。
func (m *Mutex) unlockSlow(new int32) {
   // 如果未加鎖,則會抛出錯誤。
   if (new+mutexLocked)&mutexLocked == 0 {
      fatal("sync: unlock of unlocked mutex")
   }

   // 下面的操作是喚醒一個在等待鎖的協程。
   // 存在兩種情況:
   // 1. 正常模式:
   //  a. 不需要喚醒:沒有等待者、鎖已經被搶占、有其他運作中的協程在嘗試擷取鎖、已經進入了饑餓模式
   //   b. 需要喚醒:其他情況
   // 2. 饑餓模式:喚醒等待隊列頭部的那個協程
   if new&mutexStarving == 0 {
      // 不是饑餓模式
      old := new
      // 自旋
      for {
         // 下面幾種情況不需要喚醒:
         // 1. 沒有等待者了(沒得喚醒)
         // 2. 鎖已經被占用(隻能有一個 goroutine 持有鎖)
         // 3. 有其他運作中的協程已經被喚醒(運作中的 goroutine 通過自旋先搶占到了鎖)
         // 4. 饑餓模式(饑餓模式下,所有新的 goroutine 都要排隊,饑餓模式會直接喚醒等待隊列頭部的 gorutine)
         if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
            return
         }
         // 擷取到喚醒等待者的權力,開始喚醒一個等待者。
         // 下面這一行實際上是兩個操作:
         // 1. waiter 數量 - 1
         // 2. 設定 mutexWoken 标志
         new = (old - 1<<mutexWaiterShift) | mutexWoken
         if atomic.CompareAndSwapInt32(&m.state, old, new) {
            // 正常模式下喚醒了一個 goroutine
            //(第二個參數為 false,表示目前的 goroutine 在釋放信号量後還會繼續執行直到用完時間片)
            runtime_Semrelease(&m.sema, false, 1)
            return
         }
         // 喚醒失敗,進行下一次嘗試。
         old = m.state
      }
   } else {
      // 饑餓模式:将互斥鎖的所有權移交給下一個等待者,并放棄我們的時間片,以便下一個等待者可以立即開始運作。
      // 注意:如果“mutexLocked”未設定,等待者在喚醒後會将其設定。
      // 但是,如果設定了“mutexStarving”,則仍然認為互斥鎖已被鎖定,是以新到來的goroutine不會擷取它。
      //
      // 目前的 goroutine 放棄 CPU 時間片,讓給阻塞在 sema 的 goroutine。
      runtime_Semrelease(&m.sema, true, 1)
   }
}
           

unlockSlow 邏輯相比 lockSlow 要簡單許多,我們可以再結合下面的流程圖來閱讀上面的源碼:

深入了解go語言中的Mutext

runtime_Semrelease 第二個參數的含義

細心的朋友可能注意到了,在 unlockSlow 的實作中,有兩處地方調用了 runtime_Semrelease 這個方法, 這個方法的作用是釋放一個信号量,這樣可以讓阻塞在信号量上的 goroutine 得以繼續執行。 它的第一個參數我們都知道,是信号量,而第二個參數 true 和 false 分别傳遞了一次, 那麼 true 和 false 分别有什麼作用呢?

答案是,設定為 true 的時候,目前的 goroutine 會直接放棄自己的時間片, 将 P 的使用權交給 Mutex 等待隊列中的第一個 goroutine, 這樣的目的是,讓 Mutex 等待隊列中的 goroutine 可以盡快地擷取到鎖。

總結

互斥鎖在并發程式設計中也算是非常常見的一種操作了,使用互斥鎖可以限制隻有一個 goroutine 可以進入臨界區, 這對于并發修改全局變量、初始化等情況非常好用。最後,再總結一下本文所講述的内容:

  • 互斥鎖是一種用于多線程程式設計中,防止兩個線程同時對同一公共資源進行讀寫的機制。go 中的互斥鎖實作是 sync.Mutex。
  • Mutex 的操作隻有兩個: Lock 擷取鎖,同一時刻隻能有一個 goroutine 可以擷取到鎖,其他 goroutine 會先通過自旋搶占鎖,搶不到則阻塞等待。 Unlock 釋放鎖,釋放鎖之前必須有 goroutine 持有鎖。釋放鎖之後,會喚醒等待隊列中的下一個 goroutine。
  • Mutex 常見的使用場景有兩個: 并發讀寫 map:如 gin 中 Context 的 Keys 屬性的讀寫。 并發讀寫全局變量:如 sync.Pool 中對 allPools 的讀寫。
  • 使用 Mutex 需要注意以下幾點: 不要忘記使用 Unlock 釋放鎖 Lock 之後,沒有釋放鎖之前,不能再次使用 Lock 注意不同 goroutine 競争不同鎖的情況,需要考慮一下是否有可能會死鎖 在 Unlock 之前,必須已經調用了 Lock,否則會 panic 在第一次使用 Mutex 之後,不能複制,因為這樣一來 Mutex 的狀态也會被複制。這個可以使用 go vet 來檢查。
  • 互斥鎖可以保護一塊代碼塊隻能有一個 goroutine 執行,但是不保證臨界區内操作的變量不被其他 goroutine 做并發讀寫操作。
  • go 的 Mutex 基于以下技術實作: 信号量:這是作業系統層面的同步機制 隊列:在 goroutine 擷取不到鎖的時候,會将這些 goroutine 放入一個 FIFO 隊列中,下次喚醒會喚醒隊列頭的 goroutine 原子操作:state 字段記錄了四種不同的資訊,通過原子操作就可以保證資料的完整性
  • go Mutex 的公平性: 正在運作的 goroutine 如果需要鎖的話,盡量讓它先擷取到鎖,可以避免不必要的協程上下文切換。會和被喚醒的 goroutine 一起競争鎖。 但是如果等待隊列中的 goroutine 超過了 1ms 還沒有擷取到鎖,那麼會進入饑餓模式
  • go Mutex 的兩種模式: 正常模式:運作中的 goroutine 有一定機會比等待隊列中的 goroutine 先擷取到鎖,這種模式有更好的性能。 饑餓模式:所有後來的 goroutine 都直接進入等待隊列,會依次從等待隊列頭喚醒 goroutine。可以有效避免尾延遲。
  • 饑餓模式下,Unlock 的時候會直接将目前 goroutine 所在 P 的使用權交給等待隊列頭部的 goroutine,放棄原本屬于自己的時間片。

作者:eleven26

連結:https://juejin.cn/post/7216223889488560184

來源:稀土掘金

著作權歸作者所有。商業轉載請聯系作者獲得授權,非商業轉載請注明出處。

繼續閱讀