天天看點

Go推薦包「八」: 并發安全 (sync)

1.介紹

sync包提供了互斥鎖。除了Once和WaitGroup類型,其餘多數适用于低水準的程式,多數情況下,高水準的同步使用channel通信性能會更優一些。

2.并發等待組(WaitGroup)

WaitGroup,即等待一組Goroutine結束。父Goroutine調用Add()方法來設定應等待Goroutine的數量。每個被等待的Goroutine在結束時應該調用Done()方法。與此同時,主Goroutine可調用Wait()方法阻塞至所有Goroutine結束。

2.1 WaitGroup結構

// A WaitGroup must not be copied after first use.
type WaitGroup struct {
 noCopy noCopy
 state1 [3]uint32
}
           

2.2 方法清單

方法名 功能
(wg *WaitGroup) Add(delta int) 等待組的計數器 +1
(wg *WaitGroup) Done() 等待組的計數器 -1
(wg *WaitGroup) Wait() 當等待組計數器不等于0時,阻塞直到0

2.3 Add參數取值範圍

等待組内部擁有一個計數器,計數器的值可以通過Add(delta int)方法調用實作計數器的增加和減少。該方法應該在建立新的Goroutine之前調用。

參數值x取值

取值 描述
delta < 0 x小于0時,但會報錯: panic: sync: negative WaitGroup counter
delta = 0 x等于0時,會釋放Wait()方法阻塞等待的所有Goroutine
delta > 0 x大于0時,Wait()方法會阻塞Goroutine直到WaitGroup計數減為0

2.4 使用示例

1. 不使用WaitGroup示例

package main
import (
 "fmt"
 "time"
)
func main() {
 // 建立通道
 intChan := make(chan int)

 // 計算1-50的和
 go func(intChan chan int) {
  sum := 0
  for i := 1; i<= 50 ; i++  {
   sum += i
  }
  intChan <- sum
 }(intChan)
 // 計算51-100的和
 go func(intChan chan int) {
  sum := 0
  for i := 51; i<= 100 ; i++  {
   sum += i
  }
  intChan <- sum
 }(intChan)
 // 另外建立個channle聚合結果
 go func(intChan chan int) {
  sum1 := <- intChan
  sum2 := <- intChan
  fmt.Printf("sum1 = %d sum2 = %d  \nsum1 + sum2 = %d \n",sum1,sum2,sum1+sum2)
 }(intChan)

 // 注意,需求手動sleep
 time.Sleep( time.Second)
 fmt.Println("運作結束")
}
/** 輸出:
  sum1 = 1275 sum2 = 3775  
  sum1 + sum2 = 5050 
  運作結束
*/
           

2.使用WaitGroup示例

package main
import (
 "fmt"
 "sync"
)
func main() {
 // 聲明等待組
 var wg sync.WaitGroup
 // 設定,需要等待3個協程執行完成
 wg.Add(3)
 // 建立通道
 intChan := make(chan int)
 // 計算1-50的和
 go func(intChan chan int, wg *sync.WaitGroup) {
  sum := 0
  for i := 1; i <= 50; i++ {
   sum += i
  }
  intChan <- sum
  // 計數器減一
  wg.Done()
 }(intChan, &wg)
 // 計算51-100的和
 go func(intChan chan int, wg *sync.WaitGroup) {
  sum := 0
  for i := 51; i <= 100; i++ {
   sum += i
  }
  intChan <- sum
  // 計數器減一
  wg.Done()
 }(intChan, &wg)
 // 另外建立個channle聚合結果
 go func(intChan chan int,wg *sync.WaitGroup) {
  sum1 := <-intChan
  sum2 := <-intChan
  fmt.Printf("sum1 = %d sum2 = %d  \nsum1 + sum2 = %d \n", sum1, sum2, sum1+sum2)
  // 計數器減一
  wg.Done()
 }(intChan,&wg)
 // 阻塞,直到等待組的計數器等于0
 wg.Wait()
 fmt.Println("運作結束")
}
/**輸出
  sum1 = 3775 sum2 = 1275  
  sum1 + sum2 = 5050 
  運作結束
*/
           

3.互斥鎖(Mutex)

Mutex是一個互斥鎖,保證同時隻有一個Goroutine可以通路共享資源。Mutex類型的鎖和Goroutine無關,可以由不同的Goroutine加鎖和解鎖。也可以為其他結構體的字段,零值為解鎖狀态。

3.1 結構介紹

type Mutex struct {
 state int32 // state 表示目前互斥鎖的狀态
 sema  uint32 // sema 是用于控制鎖狀态的信号量
}
           

3.2 方法清單

方法名 描述
(m *Mutex) Lock() 方法鎖住m,如果m 已經加鎖,則阻塞直到 m 解鎖。
(m *Mutex) Unlock() 解鎖 m,如果 m 未加鎖會導緻運作時錯誤。

3.3 使用(售票)

需求:模拟多個視窗售票

1.不作為結構體屬性使用

package main
import (
 "fmt"
 "sync"
 "time"
)
// 聲明全局等待組
var wg sync.WaitGroup
// 聲明全局鎖
var mutex sync.Mutex
// 聲明全局餘票
var ticket int = 10
func main() {
 // 設定等待組計數器
 wg.Add(3)
 // 視窗賣票
 go saleTicket("視窗A",&wg)
 go saleTicket("視窗B",&wg)
 go saleTicket("視窗C",&wg)
 wg.Wait()
 fmt.Println("運作結束!")
}
// 賣票流程
func saleTicket(windowName string, wg *sync.WaitGroup)  {
 // 賣票流程結束後關閉
 defer wg.Done()
 for {
  // 加鎖
  mutex.Lock()
  if  ticket > 0{
   time.Sleep(10 * time.Millisecond)
   ticket--
   fmt.Printf("%s 賣出一張票,餘票: %d \n",windowName,ticket)
  } else  {
   fmt.Printf("%s 票已賣完! \n",windowName)
   // 解鎖
   mutex.Unlock()
   break
  }
  // 解鎖
  mutex.Unlock()
 }
}
/**輸出
  視窗C 賣出一張票,餘票: 9 
  視窗C 賣出一張票,餘票: 8 
  視窗B 賣出一張票,餘票: 7 
  視窗A 賣出一張票,餘票: 6 
  視窗C 賣出一張票,餘票: 5 
  視窗B 賣出一張票,餘票: 4 
  視窗A 賣出一張票,餘票: 3 
  視窗C 賣出一張票,餘票: 2 
  視窗B 賣出一張票,餘票: 1 
  視窗A 賣出一張票,餘票: 0 
  視窗C 票已賣完! 
  視窗B 票已賣完! 
  視窗A 票已賣完! 
  運作結束!
*/
           

2.作為結構體屬性使用

package main
import (
 "fmt"
 "strconv"
 "sync"
 "time"
)
// 聲明一個票池
type ticketPool struct {
 over int
 lock sync.Mutex
 wg   sync.WaitGroup
}
// 定義售票方法
func (t *ticketPool) sellTicket(windowName string) {
 // 等待組減一
 defer t.wg.Done()
 for {
  // 加鎖
  t.lock.Lock()
  if t.over > 0 {
   time.Sleep(10 * time.Millisecond)
   t.over--
   fmt.Printf("%s 賣出一張票,餘票: %d \n", windowName, t.over)
  } else {
   // 無票,跳無限循環并解鎖
   t.lock.Unlock()
   fmt.Printf("%s 票已賣完! \n", windowName)
   break
  }
  // 正常售票流程解鎖
  t.lock.Unlock()
 }
}
func main() {
 // 建立一個票池
 ticketP := ticketPool{over: 10}
 fmt.Printf("T:%T v: %v \n", ticketP, ticketP)
 // 設定視窗數量
 windowNum := 3
 // 設定等待組計數器
 ticketP.wg.Add(windowNum)
 // 定義3個視窗售票
 for i:= 1 ; i <= windowNum; i++ {
  go ticketP.sellTicket("視窗" + strconv.Itoa(i))
 }
 ticketP.wg.Wait()
 fmt.Println("運作結束!")
}
/**輸出
  視窗3 賣出一張票,餘票: 9 
  視窗3 賣出一張票,餘票: 8 
  視窗1 賣出一張票,餘票: 7 
  視窗2 賣出一張票,餘票: 6 
  視窗3 賣出一張票,餘票: 5 
  視窗1 賣出一張票,餘票: 4 
  視窗2 賣出一張票,餘票: 3 
  視窗3 賣出一張票,餘票: 2 
  視窗1 賣出一張票,餘票: 1 
  視窗2 賣出一張票,餘票: 0 
  視窗1 票已賣完! 
  視窗2 票已賣完! 
  視窗3 票已賣完! 
  運作結束!
*/
           

4.讀寫鎖(RWMutex)

4.1 結構介紹

RWMutex是讀寫互斥鎖,簡稱讀寫鎖。該鎖可以同時被多個讀取者持有或被唯一個寫入者持有。RWMutex類型鎖跟Goroutine無關,可以由不同的Goroutine加鎖、解鎖。RWMutex也可以建立為其他結構體的字段;零值為解鎖狀态。

1. RWMutex鎖結構

type RWMutex struct {
    w  Mutex //用于控制多個寫鎖,獲得寫鎖首先要擷取該鎖,如果有一個寫鎖在進行,那麼再到來的寫鎖将會阻塞于此
    writerSem  uint32 //寫阻塞等待的信号量,最後一個讀者釋放鎖時會釋放信号量
    readerSem  uint32 //讀阻塞的協程等待的信号量,持有寫鎖的協程釋放鎖後會釋放信号量
    readerCount int32  //記錄讀者個數
    readerWait  int32  //記錄寫阻塞時讀者個數
}
           

2. 讀寫鎖堵塞場景

  1. 寫鎖需要阻塞寫鎖:一個協程擁有寫鎖時,其他協程寫鎖需要阻塞
  2. 寫鎖需要阻塞讀鎖:一個協程擁有寫鎖時,其他協程讀鎖需要阻塞
  3. 讀鎖需要阻塞寫鎖:一個協程擁有讀鎖時,其他協程寫鎖需要阻塞
  4. 讀鎖不能阻塞讀鎖:一個協程擁有讀鎖時,其他協程也可以擁有讀鎖

4.2 方法清單

方法名 描述
(rw *RWMutex) RLock() 擷取讀鎖,當一個協程擁有讀鎖時,其他協程寫鎖需要阻塞。
(rw *RWMutex) RUnlock() 釋放讀鎖。
(rw *RWMutex) Lock() 擷取寫鎖,與Mutex完全一緻; 當一個協程擁有寫鎖時,其他協程讀寫鎖都需要阻塞
(rw *RWMutex) Unlock() 釋放寫鎖

4.3 使用(讀寫檔案)

1.不作為結構體屬性使用

package main
import (
 "fmt"
 "strconv"
 "sync"
)
// 聲明全局變量,檔案内容
var fileContext string
// 聲明全局讀寫互斥鎖
var rxMutex sync.RWMutex
// 聲明全局等待組
var wg sync.WaitGroup
func main() {
 // 設定計數器
 wg.Add(5)
 for i := 1; i <= 5; i++ {
  name := "同學-" + strconv.Itoa(i)
  if i%2 == 0 {
   go readFile(name)
  } else {
   go writeFile(name, strconv.Itoa(i))
  }
 }
 // 等待所有計數器執行完成
 wg.Wait()
 fmt.Println("運作結束!")
}

// 讀檔案
func readFile(name string) {
 // 釋放讀鎖
 defer rxMutex.RUnlock()
 // 擷取讀鎖
 rxMutex.RLock()
 // 列印讀取内容
 fmt.Printf("%s 擷取讀鎖,讀取内容為: %s \n", name, fileContext)
 // 計數器減一
 wg.Done()
}
// 寫檔案
func writeFile(name, s string) {
 // 釋放寫鎖
 defer rxMutex.Unlock()
 // 擷取寫鎖
 rxMutex.Lock()
 // 寫入内容
 fileContext = fileContext + " " + s
 fmt.Printf("%s 擷取寫鎖,寫入内容: %s。 檔案内容變成: %s \n", name, s, fileContext)
 // 計數器減一
 wg.Done()
}

/**輸出
  同學-1 擷取寫鎖,寫入内容: 1。 檔案内容變成:  1 
  同學-4 擷取讀鎖,讀取内容為:  1 
  同學-2 擷取讀鎖,讀取内容為:  1 
  同學-5 擷取寫鎖,寫入内容: 5。 檔案内容變成:  1 5 
  同學-3 擷取寫鎖,寫入内容: 3。 檔案内容變成:  1 5 3 
  運作結束!
*/
           

2.作為結構體屬性使用

package main
import (
 "fmt"
 "strconv"
 "sync"
 "time"
)
// 定義一個檔案結構體
type fileResource struct {
 content string
 wg sync.WaitGroup
 rwLock sync.RWMutex
}

// 讀檔案
func (f *fileResource)readFile(name string) {
 // 釋放讀鎖
 defer f.rwLock.RUnlock()
 // 擷取讀鎖
 f.rwLock.RLock()
 // 列印讀取内容
 time.Sleep(time.Second)
 fmt.Printf("%s 擷取讀鎖,讀取内容為: %s \n", name, f.content)
 // 計數器減一
 f.wg.Done()
}

// 寫檔案
func (f *fileResource)writeFile(name, s string) {
 // 釋放寫鎖
 defer f.rwLock.Unlock()
 // 擷取寫鎖
 f.rwLock.Lock()
 // 寫入内容
 time.Sleep(time.Second)
 f.content = f.content + " " + s
 fmt.Printf("%s 擷取寫鎖,寫入内容: %s。 檔案内容變成: %s \n", name, s, f.content)
 // 計數器減一
 f.wg.Done()
}
func main() {
 // 聲明結構體
 var file fileResource
 // 設定計數器
 file.wg.Add(5)
 for i := 1; i <= 5; i++ {
  name := "同學-" + strconv.Itoa(i)
  if i%2 == 0 {
   go file.readFile(name)
  } else {
   go file.writeFile(name, strconv.Itoa(i))
  }
 }
 // 等待所有計數器執行完成
 file.wg.Wait()
 fmt.Println("運作結束!")
}
/**輸出
  同學-5 擷取寫鎖,寫入内容: 5。 檔案内容變成:  5 
  同學-1 擷取寫鎖,寫入内容: 1。 檔案内容變成:  5 1 
  同學-2 擷取讀鎖,讀取内容為:  5 1 
  同學-3 擷取寫鎖,寫入内容: 3。 檔案内容變成:  5 1 3 
  同學-4 擷取讀鎖,讀取内容為:  5 1 3 
  運作結束!
*/
           

5.條件變量(Cond)

5.1 介紹

與互斥鎖不同,條件變量的作用并不是保證在同一時刻僅有一個線程通路某一個共享資料,而是在對應的共享資料的狀态發生變化時,通知其他是以而被阻塞的線程。條件變量總是與互斥鎖組合使用,互斥鎖為共享資料的通路提供互斥支援,而條件變量可以就共享資料的狀态的變化向相關線程發出通知。

使用場景: 我需要完成一項任務,但是這項任務需要滿足一定條件才可以執行,否則我就等着。

5.2 方法清單

方法名 描述
NewCond(l Locker) *Cond 生成一個cond,需要傳入實作Locker接口的變量。 一般是*Mutex或*RWMutex類型的值。
(c *Cond) Wait() 等待通知
(c *Cond) Signal() 發送單個通知
(c *Cond) Broadcast() 廣播(多個通知)

5.3 使用示例

package main
import (
 "fmt"
 "sync"
 "time"
)
func main() {
 // 聲明互斥鎖
 var mutex sync.Mutex
 // 聲明條件變量
 cond := sync.NewCond(&mutex)
 for i := 1; i <= 10; i++ {
  go func(i int) {
   // 擷取鎖
   cond.L.Lock()
   // 釋放鎖
   defer cond.L.Unlock()
   // 等待通知,阻塞目前協程
   cond.Wait()
   // 等待通知後列印輸出
   fmt.Printf("輸出:%d ! \n", i)
  }(i)
 }
 // 單個通知
 time.Sleep(time.Second)
 fmt.Println("單個通知A!")
 cond.Signal()
 time.Sleep(time.Second)
 fmt.Println("單個通知B!")
 cond.Signal()

 // 廣播通知
 time.Sleep(time.Second)
 fmt.Println("廣播通知!并睡眠1秒,等待其他協程輸出!")
 cond.Broadcast()
 // 等待其他協程處理完
 time.Sleep(time.Second)
 fmt.Println("運作結束!")
}
/**輸出
  單個通知A!
  輸出:1 ! 
  單個通知B!
  輸出:4 ! 
  廣播通知!并睡眠1秒,等待其他協程輸出!
  輸出:10 ! 
  輸出:2 ! 
  輸出:3 ! 
  輸出:8 ! 
  輸出:9 ! 
  輸出:6 ! 
  輸出:5 ! 
  輸出:7 ! 
  運作結束!
*/
           

6.一次(Once)

sync.Once 是使Go方法隻執行一次的對象實作,作用與 init 函數類似,但也有所不同。差別如下:

  • init 函數是在檔案包首次被加載的時候執行,且隻執行一次
  • sync.Onc 是在代碼運作中需要的時候執行,且隻執行一次

6.1 方法介紹

方法名 描述
(o *Once) Do(f func()) 函數隻會執行一次,并保證在傳回時,傳入Do的函數已經執行完成。 多個 goroutine 同時執行 once.Do 的時候,可以保證搶占到 once.Do 執行權的 goroutine 執行完 once.Do 後,其他goroutine才能得到傳回 。

6.2 使用示例

示例1: 重複調用隻執行一次

package main
import (
 "fmt"
 "strconv"
 "sync"
 "time"
)
func main() {
 echo := func() {
  t := time.Now().Unix()
  fmt.Printf("輸出時間 %v ",strconv.FormatInt(t,10))
 }
 var one sync.Once
  // 雖然周遊調用,但是隻會執行一次
 for i := 1; i< 10 ; i++  {
  go func(a,b int) {
   one.Do(echo)
  }(i,i+1)
 }
 time.Sleep(3 * time.Second)
 fmt.Println("運作結束!")
}
/**輸出
  輸出時間 1608083525 
  運作結束!
*/
           

7.對象池(Pool)

7.1 為什麼使用?

Go語言是支援垃圾自動回收的。對于一些暫時用不到但是後續會用到的對象,為了提升性能,可以先暫存起來,這雖然會占用一些記憶體,但是比起銷毀了再建立,要節省運作時間。Go語言專門提供了暫存對象的工具,就是sync.Pool。

sync.Pool是一個對象池,它是并發安全的,而且大小是可伸縮的,僅受限于記憶體。當需要使用對象的時候可以從對象池中直接取出使用。

7.2 資料結構

type Pool struct {
 noCopy noCopy //禁止複制
 local     unsafe.Pointer //本地緩沖池指針,每個處理器配置設定一個;其類型是[P]poolLocal數組
 localSize uintptr //數組大小

 victim     unsafe.Pointer // local from previous cycle
 victimSize uintptr        // size of victims array

 // 緩存池沒有對象時,調用此方法建立
 New func() interface{}
}
           

7.3 方法清單

sync.Pool提供以下兩個公共方法,用來操作對象池。

方法名 描述
(p *Pool) Put(x interface{}) 向池中添加對象
(p *Pool) Get() interface{} 從池中擷取對象
Get方法是從池中擷取對象,如果沒有對象則調用New方法建立生成,如果未設定New則傳回nil。

7.4 使用示例

package main
import (
 "fmt"
 "sync"
)
func main() {
 // 建立對象池
 pool := sync.Pool{
  New: func() interface{}{
   return make([]string,5)
  },
 }
 // 首次擷取
 fmt.Printf("不設定直接擷取: %v\n",pool.Get())
 // 設定後擷取
 pool.Put([]string{"Hello","Word"})
 // 設定後擷取
 fmt.Printf("設定後,第一次擷取: %v\n",pool.Get())
 fmt.Printf("設定後,第二次擷取: %v\n",pool.Get())
}
           

7.5 注意事項

存入sync.Pool的對象可能會在不通知的情況下被釋放,這一點一定要注意。比如一些socket長連接配接就不适合存入sync.Pool内。

8.sync.Map

如果要緩存的資料量不大,可以考慮使用sync.Map(Go 1.9+版本支援)。在1.6版本以前,Go語言自帶标準的map類型是并發讀安全的,但是并發寫不安全。

8.1 查詢和新增

a.查找方法:

  • Load: 通過參數key查詢對應的value,如果不存在則傳回nil;ok表示是否找到對應的值。

b.新增方法:

  • Store: 對sync.Map的更新或新增,參數是鍵值對
  • LoadOrStore: 參數為key和value。根據參數key查找對應的value,如果找到,則不修改原來的值并通過actual傳回,并且loaded為true;如果未找到,則存儲key-value并且将存儲的value通過actual傳回,loaded為false。

c.使用示例:

package main
import (
 "fmt"
 "sync"
)
func main() {
 // 定義map 類型
 var syncMap sync.Map
 // 新增
 syncMap.Store("name","張三")
 load, _ := syncMap.Load("name")
 fmt.Printf("Store新增->name:%v\n",load)
 // 找到則不更新,傳回舊值
 store, loaded := syncMap.LoadOrStore("name", "李四")
 fmt.Printf("找到則傳回舊值-> name:%v loaded:%v \n",store,loaded)
 // 找不到則新增
 age, loaded := syncMap.LoadOrStore("age", 20)
 fmt.Printf("找不到則新增-> age:%v loaded:%v \n",age,loaded)
}
/**輸出:
Store新增->name:張三
找到則傳回舊值-> name:張三 loaded:true 
找不到則新增-> age:20 loaded:false 
*/
           

8.2 删除

a.方法清單:

  • LoadAndDelete: 根據參數key删除對應的value,如果找到則删除,并通過value傳回删除的值,并設定loaded為true;如果未找到,則value傳回nil,loaded為false。
  • Delete:根據參數key删除對應的value。

b.使用示例:

package main
import (
 "fmt"
 "sync"
)
func main() {
 // 定義map 類型
 var syncMap sync.Map
 // 新增
 syncMap.Store("name","張三")
 syncMap.Store("age",20)

  // 找到情況
 andDelete, loaded := syncMap.LoadAndDelete("name")
 fmt.Printf("找到-> val:%v loaded:%v \n",andDelete,loaded)
 search, ok := syncMap.Load("name")
 fmt.Printf("删除name後查找-> search:%v ok:%v \n",search,ok)

 // 找不到情況
 andDelete2, loaded := syncMap.LoadAndDelete("name2")
 fmt.Printf("找不到-> val:%v loaded:%v \n",andDelete2,loaded)

 syncMap.Delete("age")
 searchAge, ok := syncMap.Load("name")
 fmt.Printf("删除age後查找-> searchAge:%v ok:%v \n",searchAge,ok)
}
/** 輸出
找到-> val:張三 loaded:true 
删除name後查找-> search:<nil> ok:false 
找不到-> val:<nil> loaded:false 
删除age後查找-> searchAge:<nil> ok:false 
*/
           

8.3 周遊

sync.Map不能通過for...range周遊,隻能通過包提供的方法Range進行周遊。

package main
import (
 "fmt"
 "sync"
)
func main() {
 // 定義map 類型
 var syncMap sync.Map
 // 新增
 syncMap.Store("name", "張三")
 syncMap.Store("age", 20)
 syncMap.Store("home", "天津永和大區")
 syncMap.Range(func(key, value interface{}) bool {
  fmt.Printf("key: %v value: %v \n", key, value)
  return true
 })
}
/**輸出
key: name value: 張三 
key: age value: 20 
key: home value: 天津永和大區 
*/
           

微信搜尋關注【猿碼記】檢視更多文章。

繼續閱讀