同步
一、鎖的使用
- go語言除了提供并發程式設計模型還提供了同步工具(sync,sync/atomic)包括互斥鎖,讀寫鎖,
- 互斥鎖
I. 互斥鎖 是并發程式對共享資(臨界區)源進行通路控制的主要手段,由标準庫sync.Mutex類型提供。
II. sync.Mutex零值表示未被鎖定的互斥量,提供倆個方法 Lock和Unlock,前者用于鎖定,後者用于解鎖
III.//案例1
var mx sync.Mutex
mx.Lock()
defer mx.Unlock()
//案例2
func main() {
var mx sync.Mutex
log.Println("Lock the lock (G0)")
mx.Lock()
log.Println("the lock is locked.(G0)")
for i:=1;i<=3;i++ {
go func(i int) {
log.Printf("lock the lock (G%d)\n",i)
mx.Lock()
log.Printf("the lock is locked (G%d)\n",i)
}(i)
}
time.Sleep(time.Second)
log.Println("unlock the (G0)")
mx.Unlock()
log.Println("the lock is unlocked.(G0)")
time.Sleep(time.Second)
}
/**
E:\source\go\src\exam>go run main.go
2022/10/08 14:11:46 Lock the lock (G0)
2022/10/08 14:11:46 the lock is locked.(G0)
2022/10/08 14:11:46 lock the lock (G3)
2022/10/08 14:11:46 lock the lock (G2)
2022/10/08 14:11:46 lock the lock (G1)
2022/10/08 14:11:47 unlock the (G0)
2022/10/08 14:11:47 the lock is unlocked.(G0)
2022/10/08 14:11:47 the lock is locked (G3)
*/
IV. main函數的goroutine 稱G0,另外又開了3個goroutine 分别稱G1,G2,G3.在啟用這3個Goroutine之前已經對互斥鎖mx進行了鎖定。
VI. 我們看到G1,G2,G3 在G0 鎖定後,都被阻塞了。原因是該互斥鎖已經處于鎖定狀态。
VII. 随後我們釋放了G0,之後G3獲得了互斥鎖的鎖定,其餘依然處于鎖定狀态。
VIII. 其他語言可能會忘記鎖定的互斥量,忘記解鎖。然而Go語言提供了defer mx.Unlock 語句 極大地降低了可能性
IX. 互斥鎖的鎖定操作和解鎖操作必須成對出現,如果對于一個已經鎖定的互斥量,重複鎖定将會被阻塞,直到該互斥鎖回到解鎖狀态。
X. 雖然互斥鎖可以在多個goroutine共享,但我們強烈建議把同一個互斥鎖的成對鎖定與解鎖操作放在同一個層次中。
- 讀寫鎖
I. 讀寫鎖針對于讀寫操作的互斥鎖,與普通的互斥鎖最大的不同在于 可以分别針對讀操作和寫操作進行 鎖定和解鎖操作。
II. 讀寫鎖允許任意個讀操作同時進行,同一時刻隻允許有一個寫操作在進行;在寫操作時,讀操作也不被允許。
III.
var mx sync.RWMutex
mx.Lock()
mx.Unlock()
mx.RLock()
mx.RUnlock()
IV. 前者對應寫操作的鎖定和解鎖,後者對應讀操作的鎖定和解鎖;寫解鎖會喚醒被阻塞的讀鎖goroutine 。讀解鎖在确定無其他讀鎖時,會喚醒被阻塞的寫鎖goroutine。
VI. 對于未被寫鎖定的讀寫鎖 進行寫解鎖,會引發運作時恐慌;對于未被讀鎖定的讀寫鎖 進行 讀解鎖不會。
- 鎖的案例
I. 建立一個檔案來存放資料
II. 同一時刻允許有多個goroutine分别對檔案寫操作和讀操作,寫入操作不能穿插,并且彼此獨立。讀操作要互相獨立,每次讀取的資料塊不能重複,且按順序讀取。
III.package main
import (
"errors"
"io"
"os"
"sync"
)
type Data []byte
type DataFile interface {
//一次讀取一個資料塊
Read() (rsn int64,d Data,err error)
//一次寫入一個資料塊
Write(d Data) (wsn int64,err error)
//傳回最後讀取資料塊的序号
Rsn() int64
//傳回最後寫入資料塊的序号
Wsn() int64
//寫入或讀取資料塊的 長度
DataLen() int64
}
type myDataFile struct {
//檔案描述符
f *os.File
//讀寫鎖
fMutex sync.RWMutex
//寫入偏移量
wOffset int64
//讀入偏移量
rOffset int64
//寫偏移量 互斥鎖
wMutex sync.Mutex
//讀偏移量互斥鎖
rMutex sync.Mutex
//資料塊長度
dataLen uint32
}
func NewDataFile(path string,dataLen uint32) (DataFile,error) {
f,err := os.Create(path)
if err != nil {
return nil, err
}
if dataLen ==0 {
return nil, errors.New("invalid data length")
}
df := &myDataFile{f: f,dataLen: dataLen}
return df,nil
}
/**
1. 擷取并更新讀偏移量
2. 根據偏移量 從檔案中讀取指定len一塊資料
3. 把資料塊封裝成Data類型,
*/
func (f *myDataFile) Read() (rsn int64,d Data,err error) {
//1. 擷取并更新讀偏移量
var offset int64
f.rMutex.Lock()
offset = f.rOffset
f.rOffset += int64(f.dataLen)
f.rMutex.Unlock()
//2. 根據偏移量 從檔案中讀取指定len一塊資料
rsn = offset/int64(f.dataLen)
bytes := make([]byte,f.dataLen)
// 3. 把資料塊封裝成Data類型,
for {
f.fMutex.RLock()
_,err := f.f.WriteAt(bytes,offset)
if err != nil {
if err == io.EOF {
f.fMutex.RUnlock()
continue
}
f.fMutex.RUnlock()
return 0, nil, err
}
}
d = bytes
return
}
func (f *myDataFile) Write(d Data) (wsn int64,err error) {
//1. 讀入寫入偏移量并修改
var offset int64
f.wMutex.Lock()
offset = f.wOffset
f.wOffset += int64(f.dataLen)
f.wMutex.Unlock()
//寫入資料塊
wsn = offset/int64(f.dataLen)
var bytes []byte
if len(d) > int(f.dataLen) {
bytes = d[:f.dataLen]
} else {
bytes = d
}
f.fMutex.Lock()
defer f.fMutex.Unlock()
_,err = f.f.WriteAt(bytes,offset)
return wsn, err
}
/**
*/
func (f *myDataFile) Rsn() int64 {
f.rMutex.Lock()
defer f.rMutex.Unlock()
return f.rOffset/int64(f.dataLen)
}
/**
1. 互斥鎖 鎖定操作
2. 同時通過defer及時解鎖
*/
func (f *myDataFile) Wsn() int64 {
f.wMutex.Lock()
defer f.wMutex.Unlock()
return f.wOffset/int64(f.dataLen)
}
func (f *myDataFile) DataLen() int64 {
return 0
}
IV. 需要設定三個鎖,對檔案的讀寫鎖,分别
對讀偏移量,寫偏移量的兩個互斥鎖。
VI. 但在讀的goroutine > 寫goroutine時,寫goroutine阻塞,導緻寫操作沒有機會。最終緻使沒有資料可讀/io.EOF,需要針對這種邊界做特别處理
二、條件變量
- 需要借助sync.NewCond函數建立 sync.Cond條件變量類型。而sync.Cond有三個方法Wait,Signal,BroadCast方法。分别代表了等待通知,單發通知,廣播通知。
- Wait方法會自動對與該條件變量關聯的鎖進行解鎖,同時使調用方所在goroutine被阻塞。一旦該方法收到通知,就會嘗試再次鎖定該鎖。如果鎖定成功,就會喚醒被阻塞的goroutine
- Signal,BroadCast作用都是 發送 通知 來喚醒 被Wait阻塞的goroutine,不同的是,前者是單一通知。後者是廣播通知
-
/**
1. 擷取并更新讀偏移量
2. 根據偏移量 從檔案中讀取指定len一塊資料
3. 把資料塊封裝成Data類型,
*/
func (f *myDataFile) Read() (rsn int64,d Data,err error) {
//1. 擷取并更新讀偏移量
var offset int64
f.rMutex.Lock()
offset = f.rOffset
f.rOffset += int64(f.dataLen)
f.rMutex.Unlock()
//2. 根據偏移量 從檔案中讀取指定len一塊資料
f.cond= sync.NewCond(&f.fMutex)
rsn = offset/int64(f.dataLen)
bytes := make([]byte,f.dataLen)
// 3. 把資料塊封裝成Data類型,
f.fMutex.RLock()
defer f.fMutex.Unlock()
for {
_,err := f.f.WriteAt(bytes,offset)
if err != nil {
if err == io.EOF {
f.cond.Wait() //通過條件變量阻塞目前 goroutine 讓其他goroutine 有寫的機會
continue
}
f.fMutex.RUnlock()
return 0, nil, err
}
}
d = bytes
return
}
- 檔案内容讀操作造成EOF錯誤時,通過條件變量讓目前 goroutine暫時放棄對fmutex 讀鎖,阻塞目前攜程,并等待通知的到來。
- 在寫操作完成後,應該及時向條件變量發送signal通知喚醒讀goroutine
func (f *myDataFile) Write(d Data) (wsn int64,err error) {
//1. 讀入寫入偏移量并修改
var offset int64
f.wMutex.Lock()
offset = f.wOffset
f.wOffset += int64(f.dataLen)
f.wMutex.Unlock()
//寫入資料塊
wsn = offset/int64(f.dataLen)
var bytes []byte
if len(d) > int(f.dataLen) {
bytes = d[:f.dataLen]
} else {
bytes = d
}
f.fMutex.Lock()
defer f.fMutex.Unlock()
_,err = f.f.WriteAt(bytes,offset)
f.cond.Signal() //通知讀鎖定獲得鎖
return wsn, err
}
三、原子操作
- 原子操作是過程不能被中斷的操作,針對某個值的原子操作在進行中,cpu不會再進行其他針對該值的操作
- go語言提供int32,int64,uint32,uint64,uintptr,unsafe.Pointer,類型,對應的操作有 增/減/比較并交換/載入/存儲/交換
- 原子增減操作即可實作對被操作值得增加或減小。atomic.AddInt64(&i,3)
- 比較并交換,atomic.CompareAndSwapInt32(&value,old,new).僅當old與value相等時,new會替換掉old值。
- 載入,atomic.LoadInt32(&value),原子讀取變量value
- 存儲,原子存儲某個值得過程中,任何cpu都不會進行針對同一個值得讀或寫入操作。
四、執行一次
- sync.Once 類型變量 接受一個無參數,無結果的函數值。該方法一旦被調用,就會調用由參數傳進來的那個函數。無論我們調用該方法多少次,無論我們多次調用傳遞給它的參數值是否相同,都僅有第一次調用有效。
-
package main
import (
"fmt"
"sync"
"time"
)
var once sync.Once
func main() {
var num int
sign := make(chan bool)
f := func(ii int ) func() {
return func() {
num = num + ii *2
sign <- true
}
}
for i:=0;i<3;i++ {
fi := f(i+1)
go once.Do(fi)
}
for j:=0;j<3;j++ {
select {
case <-sign:
fmt.Println("received a signal")
case <-time.After(time.Millisecond* 100):
fmt.Println("timeout.")
}
}
fmt.Printf("Num:%d\n",num)
}
/**
2022/10/09 11:27:44 0
2022/10/09 11:27:44 received a signal
2022/10/09 11:27:44 timeout.
2022/10/09 11:27:44 timeout.
2022/10/09 11:27:44 Num:2
*/
五、WaitGroup
- var wg sync.WaitGroup 是一個結構體類型,其中計數值是0;可以通過wg.Add方法增加或減少計數值,但千萬别讓計數值為負數,會産生運作恐慌。也可以通過wg.Done方法減少計數值。
- 調用wg.Wait方法會檢查計數值是否為0;如果不為0,就會阻塞目前goroutine。
-
package main
import "sync"
func main() {
var wg sync.WaitGroup
wg.Add(3)
go func() { // G2
wg.Done()
}()
go func() { // G3
wg.Done()
}()
go func() { // G4
wg.Done()
}()
wg.Wait()
}
六、臨時對象池
-
package main
import (
"log"
"runtime"
"runtime/debug"
"sync"
"sync/atomic"
)
func main() {
defer debug.SetGCPercent(debug.SetGCPercent(-1))
var count int32
newFunc := func() interface{} {
return atomic.AddInt32(&count,1)
}
pool := sync.Pool{New: newFunc}
v1 := pool.Get()
log.Printf("v1:%v\n",v1)
//臨時對象池的存儲
pool.Put(newFunc())
//pool.Put(newFunc())
//pool.Put(newFunc())
v2 := pool.Get()
log.Printf("v2:%v\n",v2)
//垃圾回收對臨時對象池的影響
debug.SetGCPercent(100)
runtime.GC()
v3 := pool.Get()
log.Printf("v3:%v\n",v3)
//pool.New = nil
v4 := pool.Get()
log.Printf("v4:%v\n",v4)
}
/**
E:\source\go\src\exam>go run main.go
2022/10/09 12:04:42 v1:1
2022/10/09 12:04:42 v2:2
2022/10/09 12:04:42 v3:3
2022/10/09 12:04:42 v4:4
*/
- 臨時對象池的對象值可能在任何時候被移除,并不會通知池的調用方。在垃圾回收器開始回收記憶體垃圾的時候。
- 被指派給New字段的函數被臨時用來建立對象