天天看點

golang 系列: mutex 講解

摘要

Go 号稱是為了高并發而生的,在高并發場景下,勢必會涉及到對公共資源的競争。當對應場景發生時,我們經常會使用 mutex 的 Lock() 和 Unlock() 方法來占有或釋放資源。雖然調用簡單,但 mutex 的内部卻涉及挺多的。今天,就讓我們好好研究一下。

mutex 初步認識

mutex 的源碼主要是在

src/sync/mutex.go

檔案裡,它的結構體比較簡單,如下:

type Mutex struct {
    state int32
    sema  uint32
}           

我們可以看到有一個字段 sema,它表示信号量标記位。所謂的信号量是用于 Goroutine 之間阻塞或喚醒的。這有點像作業系統裡的 PV 原語操作,我們先來認識下 PV 原語操作:

PV 原語解釋:

通過操作信号量 S 來處理程序間的同步與互斥的問題。

S>0:表示有 S 個資源可用;S=0 表示無資源可用;S<0 絕對值表示等待隊列或連結清單中的程序個數。信号量 S 的初值應大于等于 0。

P 原語:表示申請一個資源,對 S 原子性的減 1,若 減 1 後仍 S>=0,則該程序繼續執行;若 減 1 後 S<0,表示已無資源可用,需要将自己阻塞起來,放到等待隊列上。

V 原語:表示釋放一個資源,對 S 原子性的加 1;若 加 1 後 S>0,則該程序繼續執行;若 加 1 後 S<=0,表示等待隊列上有等待程序,需要将第一個等待的程序喚醒。

通過上面的解釋,mutex 就可以利用信号量來實作 goroutine 的阻塞和喚起了。

其實 mutex 本質上就是一個關于信号量的阻塞喚起操作。

當 goroutine 不能占有鎖資源的時候會被阻塞挂起,此時不能繼續執行後面的代碼邏輯。

當 mutex 釋放鎖資源時,則會繼續喚起之前的 goroutine 去搶占鎖資源。

至于 mutex 的 state 狀态字段則是用來做狀态流轉的,這些狀态值涉及到了一些概念,下面我們具體來解釋一番。

mutex 狀态标志位

mutex 的 state 有 32 位,它的低 3 位分别表示 3 種狀态:喚醒狀态、上鎖狀态、饑餓狀态,剩下的位數則表示目前阻塞等待的 goroutine 數量。

mutex 會根據目前的 state 狀态來進入正常模式、饑餓模式或者是自旋。

mutex 正常模式

當 mutex 調用 Unlock() 方法釋放鎖資源時,如果發現有等待喚起的 Goroutine 隊列時,則會将隊頭的 Goroutine 喚起。

隊頭的 goroutine 被喚起後,會調用 CAS 方法去嘗試性的修改 state 狀态,如果修改成功,則表示占有鎖資源成功。

(注:CAS 在 Go 裡用 atomic.CompareAndSwapInt32(addr *int32, old, new int32) 方法實作,CAS 類似于樂觀鎖作用,修改前會先判斷位址值是否還是 old 值,隻有還是 old 值,才會繼續修改成 new 值,否則會傳回 false 表示修改失敗。)

mutex 饑餓模式

由于上面的 Goroutine 喚起後并不是直接的占用資源,還需要調用 CAS 方法去嘗試性占有鎖資源。如果此時有新來的 Goroutine,那麼它也會調用 CAS 方法去嘗試性的占有資源。

但對于 Go 的排程機制來講,會比較偏向于 CPU 占有時間較短的 Goroutine 先運作,而這将造成一定的幾率讓新來的 Goroutine 一直擷取到鎖資源,此時隊頭的 Goroutine 将一直占用不到,導緻餓死。

針對這種情況,Go 采用了饑餓模式。即通過判斷隊頭 Goroutine 在超過一定時間後還是得不到資源時,會在 Unlock 釋放鎖資源時,直接将鎖資源交給隊頭 Goroutine,并且将目前狀态改為饑餓模式。

後面如果有新來的 Goroutine 發現是饑餓模式時, 則會直接添加到等待隊列的隊尾。

mutex 自旋

如果 Goroutine 占用鎖資源的時間比較短,那麼每次都調用信号量來阻塞喚起 goroutine,将會很浪費資源。

是以在符合一定條件後,mutex 會讓目前的 Goroutine 去空轉 CPU,在空轉完後再次調用 CAS 方法去嘗試性的占有鎖資源,直到不滿足自旋條件,則最終會加入到等待隊列裡。

自旋的條件如下:

  • 還沒自旋超過 4 次
  • 多核處理器
  • GOMAXPROCS > 1
  • p 上本地 Goroutine 隊列為空

可以看出,自旋條件還是比較嚴格的,畢竟這會消耗 CPU 的運算能力。

mutex 的 Lock() 過程

首先,如果 mutex 的 state = 0,即沒有誰在占有資源,也沒有阻塞等待喚起的 goroutine。則會調用 CAS 方法去嘗試性占有鎖,不做其他動作。

如果不符合 m.state = 0,則進一步判斷是否需要自旋。

當不需要自旋又或者自旋後還是得不到資源時,此時會調用 runtime_SemacquireMutex 信号量函數,将目前的 goroutine 阻塞并加入等待喚起隊列裡。

當有鎖資源釋放,mutex 在喚起了隊頭的 goroutine 後,隊頭 goroutine 會嘗試性的占有鎖資源,而此時也有可能會和新到來的 goroutine 一起競争。

當隊頭 goroutine 一直得不到資源時,則會進入饑餓模式,直接将鎖資源交給隊頭 goroutine,讓新來的 goroutine 阻塞并加入到等待隊列的隊尾裡。

對于饑餓模式将會持續到沒有阻塞等待喚起的 goroutine 隊列時,才會解除。

Unlock 過程

mutex 的 Unlock() 則相對簡單。同樣的,會先進行快速的解鎖,即沒有等待喚起的 goroutine,則不需要繼續做其他動作。

如果目前是正常模式,則簡單的喚起隊頭 Goroutine。如果是饑餓模式,則會直接将鎖交給隊頭 Goroutine,然後喚起隊頭 Goroutine,讓它繼續運作。

mutex 代碼詳解

好了,上面大體流程講完了,下面将會把詳細的代碼流程呈上,讓大家能更詳細的知道 mutex 的 Lock()、Unlock() 方法邏輯。

mutex Lock() 代碼詳解:

// Lock mutex 的鎖方法。
func (m *Mutex) Lock() {
    // 快速上鎖.
    if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
        if race.Enabled {
            race.Acquire(unsafe.Pointer(m))
        }
        return
    }
    // 快速上鎖失敗,将進行操作較多的上鎖動作。
    m.lockSlow()
}

func (m *Mutex) lockSlow() {
  var waitStartTime int64  // 記錄目前 goroutine 的等待時間
  starving := false // 是否饑餓
  awoke := false // 是否被喚醒
  iter := 0 // 自旋次數
  old := m.state // 目前 mutex 的狀态
  for {
    // 目前 mutex 的狀态已上鎖,并且非饑餓模式,并且符合自旋條件
    if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
      // 目前還沒設定過喚醒辨別
      if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
        atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
        awoke = true
      }
      runtime_doSpin()
      iter++
      old = m.state
      continue
    }
    new := old
    // 如果不是饑餓狀态,則嘗試上鎖
    // 如果是饑餓狀态,則不會上鎖,因為目前的 goroutine 将會被阻塞并添加到等待喚起隊列的隊尾
    if old&mutexStarving == 0 {
      new |= mutexLocked
    }
    // 等待隊列數量 + 1
    if old&(mutexLocked|mutexStarving) != 0 {
      new += 1 << mutexWaiterShift
    }
    // 如果 goroutine 之前是饑餓模式,則此次也設定為饑餓模式
    if starving && old&mutexLocked != 0 {
      new |= mutexStarving
    }
    //
    if awoke {
      // 如果狀态不符合預期,則報錯
      if new&mutexWoken == 0 {
        throw("sync: inconsistent mutex state")
      }
      // 新狀态值需要清除喚醒辨別,因為目前 goroutine 将會上鎖或者再次 sleep
      new &^= mutexWoken
    }
    // CAS 嘗試性修改狀态,修改成功則表示擷取到鎖資源
    if atomic.CompareAndSwapInt32(&m.state, old, new) {
      // 非饑餓模式,并且未擷取過鎖,則說明此次的擷取鎖是 ok 的,直接 return
      if old&(mutexLocked|mutexStarving) == 0 {
        break
      }
      // 根據等待時間計算 queueLifo
      queueLifo := waitStartTime != 0
      if waitStartTime == 0 {
        waitStartTime = runtime_nanotime()
      }
      // 到這裡,表示未能上鎖成功
      // queueLife = true, 将會把 goroutine 放到等待隊列隊頭
      // queueLife = false, 将會把 goroutine 放到等待隊列隊尾
      runtime_SemacquireMutex(&m.sema, queueLifo, 1)
      // 計算是否符合饑餓模式,即等待時間是否超過一定的時間
      starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
      old = m.state
      // 上一次是饑餓模式
      if old&mutexStarving != 0 {
        if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
          throw("sync: inconsistent mutex state")
        }
        delta := int32(mutexLocked - 1<<mutexWaiterShift)
        // 此次不是饑餓模式又或者下次沒有要喚起等待隊列的 goroutine 了
        if !starving || old>>mutexWaiterShift == 1 {
          delta -= mutexStarving
        }
        atomic.AddInt32(&m.state, delta)
        break
      }
      // 此處已不再是饑餓模式了,清除自旋次數,重新到 for 循環競争鎖。
      awoke = true
      iter = 0
    } else {
      old = m.state
    }
  }
​
  if race.Enabled {
    race.Acquire(unsafe.Pointer(m))
  }
}           

mutex Unlock() 代碼詳解:

// Unlock 對 mutex 解鎖.
// 如果沒有上過鎖,缺調用此方法解鎖,将會抛出運作時錯誤。
// 它将允許在不同的 Goroutine 上進行上鎖解鎖
func (m *Mutex) Unlock() {
    if race.Enabled {
        _ = m.state
        race.Release(unsafe.Pointer(m))
    }

    // 快速嘗試解鎖
    new := atomic.AddInt32(&m.state, -mutexLocked)
    if new != 0 {
        // 快速解鎖失敗,将進行操作較多的解鎖動作。
        m.unlockSlow(new)
    }
}

func (m *Mutex) unlockSlow(new int32) {
  // 非上鎖狀态,直接抛出異常
  if (new+mutexLocked)&mutexLocked == 0 {
    throw("sync: unlock of unlocked mutex")
  }
  // 正常模式
  if new&mutexStarving == 0 {
    old := new
    for {
      // 沒有需要喚起的等待隊列
      if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
        return
      }
      // 喚起等待隊列并數量-1
      new = (old - 1<<mutexWaiterShift) | mutexWoken
      if atomic.CompareAndSwapInt32(&m.state, old, new) {
        runtime_Semrelease(&m.sema, false, 1)
        return
      }
      old = m.state
    }
  } else {
    //饑餓模式,将鎖直接給等待隊列的隊頭 goroutine
    runtime_Semrelease(&m.sema, true, 1)
  }
}