針對Golang 1.9的sync.Mutex進行分析,與Golang 1.10基本一樣除了将 panic throw
改為了
之外其他的都一樣。
源代碼位置:
sync\mutex.go
。
可以看到注釋如下:
1Mutex can be in 2 modes of operations: normal and starvation.
2
3 In normal mode waiters are queued in FIFO order, but a woken up waiter does not own the mutex and competes with new arriving goroutines over the ownership. New arriving goroutines have an advantage -- they are already running on CPU and there can be lots of them, so a woken up waiter has good chances of losing. In such case it is queued at front of the wait queue. If a waiter fails to acquire the mutex for more than 1ms, it switches mutex to the starvation mode.
4
5
6
7In starvation mode ownership of the mutex is directly handed off from the unlocking goroutine to the waiter at the front of the queue. New arriving goroutines don't try to acquire the mutex even if it appears to be unlocked, and don't try to spin. Instead they queue themselves at the tail of the wait queue.
8
9
10
11If a waiter receives ownership of the mutex and sees that either (1) it is the last waiter in the queue, or (2) it waited for less than 1 ms, it switches mutex back to normal operation mode.
12
13
14
15 Normal mode has considerably better performance as a goroutine can acquire a mutex several times in a row even if there are blocked waiters.
16
17Starvation mode is important to prevent pathological cases of tail latency.
部落客英文很爛,就粗略翻譯一下,僅供參考:
1互斥量可分為兩種操作模式:正常和饑餓。
2
3在正常模式下,等待的goroutines按照FIFO(先進先出)順序排隊,但是goroutine被喚醒之後并不能立即得到mutex鎖,它需要與新到達的goroutine争奪mutex鎖。
4
5因為新到達的goroutine已經在CPU上運作了,是以被喚醒的goroutine很大機率是争奪mutex鎖是失敗的。出現這樣的情況時候,被喚醒的goroutine需要排隊在隊列的前面。
6
7如果被喚醒的goroutine有超過1ms沒有擷取到mutex鎖,那麼它就會變為饑餓模式。
8
9在饑餓模式中,mutex鎖直接從解鎖的goroutine交給隊列前面的goroutine。新達到的goroutine也不會去争奪mutex鎖(即使沒有鎖,也不能去自旋),而是到等待隊列尾部排隊。
10
11在饑餓模式下,有一個goroutine擷取到mutex鎖了,如果它滿足下條件中的任意一個,mutex将會切換回去正常模式:
12
131. 是等待隊列中的最後一個goroutine
14
152. 它的等待時間不超過1ms。
16
17正常模式有更好的性能,因為goroutine可以連續多次獲得mutex鎖;
18
19饑餓模式對于預防隊列尾部goroutine一緻無法擷取mutex鎖的問題。
看了這段解釋,那麼基本的業務邏輯也就了解了,可以整理一下衣裝,準備看代碼。
打開
mutex.go
看到如下代碼:
1type Mutex struct {
2
3 state int32 // 将一個32位整數拆分為 目前阻塞的goroutine數(29位)|饑餓狀态(1位)|喚醒狀态(1位)|鎖狀态(1位) 的形式,來台灣字段設計
4
5 sema uint32 // 信号量
6
7}
8
9
10
11const (
12
13 mutexLocked = 1 << iota // 1 0001 含義:用最後一位表示目前對象鎖的狀态,0-未鎖住 1-已鎖住
14
15 mutexWoken // 2 0010 含義:用倒數第二位表示目前對象是否被喚醒 0-喚醒 1-未喚醒
16
17 mutexStarving // 4 0100 含義:用倒數第三位表示目前對象是否為饑餓模式,0為正常模式,1為饑餓模式。
18
19 mutexWaiterShift = iota // 3,從倒數第四位往前的bit位表示在排隊等待的goroutine數
20
21 starvationThresholdNs = 1e6 // 1ms
22
23)
可以看到Mutex中含有:
● 一個非負數信号量sema;
● state表示Mutex的狀态。
常量:
● mutexLocked表示鎖是否可用(0可用,1被别的goroutine占用)
● mutexWoken=2表示mutex是否被喚醒
● mutexWaiterShift=4表示統計阻塞在該mutex上的goroutine數目需要移位的數值。
将3個常量映射到state上就是
1state: |32|31|...| |3|2|1|
2
3 \__________/ | | |
4
5 | | | |
6
7 | | | mutex的占用狀态(1被占用,0可用)
8
9 | | |
10
11 | | mutex的目前goroutine是否被喚醒
12
13 | |
14
15 | 饑餓位,0正常,1饑餓
16
17 |
18
19 等待喚醒以嘗試鎖定的goroutine的計數,0表示沒有等待者
20
如果同學們熟悉Java的鎖,就會發現與AQS的設計是類似,隻是沒有AQS設計的那麼精緻,不得不感歎,
JAVA
的牛逼。
有同學是否會有疑問為什麼使用的是int32而不是int64呢,因為32位原子性操作更好,當然也滿足的需求。
Mutex在1.9版本中就兩個函數
Lock()
和
Unlock()
下面我們先來分析最難的
Lock()
函數:
1func (m *Mutex) Lock() {
2
3 // 如果m.state=0,說明目前的對象還沒有被鎖住,進行原子性指派操作設定為mutexLocked狀态,CompareAnSwapInt32傳回true
4
5 // 否則說明對象已被其他goroutine鎖住,不會進行原子指派操作設定,CopareAndSwapInt32傳回false
6
7 if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked)
8
9 if race.Enabled {
10
11 race.Acquire(unsafe.Pointer(m))
12
13 }
14
15 return
16
17 }
18
19
20
21 // 開始等待時間戳
22
23 var waitStartTime int64
24
25 // 饑餓模式辨別
26
27 starving := false
28
29 // 喚醒辨別
30
31 awoke := false
32
33 // 自旋次數
34
35 iter := 0
36
37 // 儲存目前對象鎖狀态
38
39 old := m.state
40
41 // 看到這個for {}說明使用了cas算法
42
43 for {
44
45 // 相當于xxxx...x0xx & 0101 = 01,目前對象鎖被使用
46
47 if old&(mutexLocked|mutexStarving) == mutexLocked &&
48
49 // 判斷目前goroutine是否可以進入自旋鎖
50
51 runtime_canSpin(iter) {
52
53
54
55 // 主動旋轉是有意義的。試着設定mutexwake标志,告知解鎖,不要喚醒其他阻塞的goroutines。
56
57 if !awoke &&
58
59 // 再次确定是否被喚醒: xxxx...xx0x & 0010 = 0
60
61 old&mutexWoken == 0 &&
62
63 // 檢視是否有goroution在排隊
64
65 old>>mutexWaiterShift != 0 &&
66
67 // 将對象鎖改為喚醒狀态:xxxx...xx0x | 0010 = xxxx...xx1x
68
69 atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
70
71 awoke = true
72
73 }//END_IF_Lock
74
75
76
77 // 進入自旋鎖後目前goroutine并不挂起,仍然在占用cpu資源,是以重試一定次數後,不會再進入自旋鎖邏輯
78
79 runtime_doSpin()
80
81 // 自加,表示自旋次數
82
83 iter++
84
85 // 儲存mutex對象即将被設定成的狀态
86
87 old = m.state
88
89 continue
90
91 }// END_IF_spin
92
93
94
95 // 以下代碼是不使用**自旋**的情況
96
97 new := old
98
99
100
101 // 不要試圖獲得饑餓的互斥,新來的goroutines必須排隊。
102
103 // 對象鎖饑餓位被改變,說明處于饑餓模式
104
105 // xxxx...x0xx & 0100 = 0xxxx...x0xx
106
107 if old&mutexStarving == 0 {
108
109 // xxxx...x0xx | 0001 = xxxx...x0x1,辨別對象鎖被鎖住
110
111 new |= mutexLocked
112
113 }
114
115 // xxxx...x1x1 & (0001 | 0100) => xxxx...x1x1 & 0101 != 0;目前mutex處于饑餓模式并且鎖已被占用,新加入進來的goroutine放到隊列後面
116
117 if old&(mutexLocked|mutexStarving) != 0 {
118
119 // 更新阻塞goroutine的數量,表示mutex的等待goroutine數目加1
120
121 new += 1 << mutexWaiterShift
122
123 }
124
125
126
127 // 目前的goroutine将互斥鎖轉換為饑餓模式。但是,如果互斥鎖目前沒有解鎖,就不要打開開關,設定mutex狀态為饑餓模式。Unlock預期有饑餓的goroutine
128
129 if starving &&
130
131 // xxxx...xxx1 & 0001 != 0;鎖已經被占用
132
133 old&mutexLocked != 0 {
134
135 // xxxx...xxx | 0101 => xxxx...x1x1,辨別對象鎖被鎖住
136
137 new |= mutexStarving
138
139 }
140
141
142
143 // goroutine已經被喚醒,是以需要在兩種情況下重設标志
144
145 if awoke {
146
147 // xxxx...xx1x & 0010 = 0,如果喚醒标志為與awoke不相協調就panic
148
149 if new&mutexWoken == 0 {
150
151 panic("sync: inconsistent mutex state")
152
153 }
154
155 // new & (^mutexWoken) => xxxx...xxxx & (^0010) => xxxx...xxxx & 1101 = xxxx...xx0x :設定喚醒狀态位0,被喚醒
156
157 new &^= mutexWoken
158
159 }
160
161 // 擷取鎖成功
162
163 if atomic.CompareAndSwapInt32(&m.state, old, new) {
164
165 // xxxx...x0x0 & 0101 = 0,已經擷取對象鎖
166
167 if old&(mutexLocked|mutexStarving) == 0 {
168
169 // 結束cas
170
171 break
172
173 }
174
175 // 以下的操作都是為了判斷是否從饑餓模式中恢複為正常模式
176
177 // 判斷處于FIFO還是LIFO模式
178
179 queueLifo := waitStartTime != 0
180
181 if waitStartTime == 0 {
182
183 waitStartTime = runtime_nanotime()
184
185 }
186
187 runtime_SemacquireMutex(&m.sema, queueLifo)
188
189 starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
190
191 old = m.state
192
193 // xxxx...x1xx & 0100 != 0
194
195 if old&mutexStarving != 0 {
196
197 // xxxx...xx11 & 0011 != 0
198
199 if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
200
201 panic("sync: inconsistent mutex state")
202
203 }
204
205 delta := int32(mutexLocked - 1<<mutexWaiterShift)
206
207 if !starving || old>>mutexWaiterShift == 1 {
208
209 delta -= mutexStarving
210
211 }
212
213 atomic.AddInt32(&m.state, delta)
214
215 break
216
217 }
218
219 awoke = true
220
221 iter = 0
222
223 } else {
224
225 // 儲存mutex對象狀态
226
227 old = m.state
228
229 }
230
231 }// cas結束
232
233
234
235 if race.Enabled {
236
237 race.Acquire(unsafe.Pointer(m))
238
239 }
240
241}
看了
Lock()
函數之後是不是覺得一片懵逼狀态,告訴大家一個方法,看
Lock()
函數時候需要想着如何Unlock。下面就開始看看
Unlock()
函數。
1func (m *Mutex) Unlock() {
2
3 if race.Enabled {
4
5 _ = m.state
6
7 race.Release(unsafe.Pointer(m))
8
9 }
10
11
12
13 // state-1辨別解鎖
14
15 new := atomic.AddInt32(&m.state, -mutexLocked)
16
17 // 驗證鎖狀态是否符合
18
19 if (new+mutexLocked)&mutexLocked == 0 {
20
21 panic("sync: unlock of unlocked mutex")
22
23 }
24
25 // xxxx...x0xx & 0100 = 0 ;判斷是否處于正常模式
26
27 if new&mutexStarving == 0 {
28
29 old := new
30
31 for {
32
33 // 如果沒有等待的goroutine或goroutine已經解鎖完成
34
35 if old>>mutexWaiterShift == 0 ||
36
37 // xxxx...x0xx & (0001 | 0010 | 0100) => xxxx...x0xx & 0111 != 0
38
39 old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
40
41 return
42
43 }
44
45 // Grab the right to wake someone.
46
47 new = (old - 1<<mutexWaiterShift) | mutexWoken
48
49 if atomic.CompareAndSwapInt32(&m.state, old, new) {
50
51 runtime_Semrelease(&m.sema, false)
52
53 return
54
55 }
56
57 old = m.state
58
59 }
60
61 } else {
62
63 // 饑餓模式:将mutex所有權移交給下一個等待的goroutine
64
65 // 注意:mutexlock沒有設定,goroutine會在喚醒後設定。
66
67 // 但是互斥鎖仍然被認為是鎖定的,如果互斥對象被設定,是以新來的goroutines不會得到它
68
69 runtime_Semrelease(&m.sema, true)
70
71 }
72
73}
原文釋出時間為:2018-11-24
本文作者:freelang
本文來自雲栖社群合作夥伴“
Golang語言社群”,了解相關資訊可以關注“
”。