環境:
Golang: go1.18.2 windows/amd64
gRPC: v1.47.0
Protobuf: v1.28.0
完整代碼:
https://github.com/WanshanTian/GolangLearning
cd GolangLearning/RPC/gRPC-Watch
1. 簡介
用戶端可以通過
Watch
機制來訂閱伺服器上某一節點的資料或狀态,當其發生變化時可以收到相應的通知。
前陣子學習了
gRPC
的服務端流模式,今天我們就用這個流模式來具體實作Watch功能
2. 實踐
現有下面一種場景:服務端儲存着使用者的年齡資訊,用戶端發送含使用者姓名的message可以擷取對應使用者的年齡或者更新對應使用者的年齡(年齡+1);通過Watch功能可以實時監聽使用者年齡的狀态,當有使用者的年齡發生變化時,用戶端收到通知
2.1 proto檔案
2.1.1 建立gRPC-Watch檔案夾,使用go mod init初始化,建立pb檔案夾,建立query.proto檔案
syntax = "proto3";
package pb;
option go_package= ".;pb";
import "google/protobuf/empty.proto";
// 定義查詢服務包含的方法
service Query {
rpc GetAge (userInfo) returns (ageInfo) {};
rpc Update (userInfo) returns (google.protobuf.Empty) {};
rpc Watch (watchTime) returns (stream userInfo){}
}
// 請求用的結構體,包含一個name字段
message userInfo {
string name = 1;
}
// 響應用的結構體,包含一個age字段
message ageInfo {
int32 age = 1;
}
// watch的時間
message watchTime{
int32 time = 1;
}
-
和GetAge
方法分别用于擷取年齡和更新年齡,均采用簡單RPC方式Update
-
方法用于監聽年齡狀态的變化,采用服務端流方式Watch
- 當gRPC的方法不需要請求message或者不需要響應message時,可以先
,然後直接使用import "google/protobuf/empty.proto"
google.protobuf.Empty
2.1.2 在.\gRPC-Watch\pb目錄下使用protoc工具進行編譯,在pb檔案夾下直接生成.pb.go和_grpc.pb.go檔案。關于protoc的詳細使用可以檢視【Golang | gRPC】使用protoc編譯.proto檔案
protoc --go_out=./ --go-grpc_out=./ .\query.proto
2.2 服務端
在gRPC-Watch目錄下建立Server檔案夾,建立main.go檔案
2.2.1 下面我們通過Query這個結構體具體實作QueryServer接口
var userinfo = map[string]int32{
"foo": 18,
"bar": 20,
}
// Query 結構體,實作QueryServer接口
type Query struct {
mu sync.Mutex
ch chan string
pb.UnimplementedQueryServer // 涉及版本相容
}
func (q *Query) GetAge(ctx context.Context, info *pb.UserInfo) (*pb.AgeInfo, error) {
age := userinfo[info.GetName()]
var res = new(pb.AgeInfo)
res.Age = age
return res, nil
}
//Update用于更新使用者年齡,通過sync.Mutex加鎖,如果年齡有更新,則向chan發送對應的使用者名
func (q *Query) Update(ctx context.Context, info *pb.UserInfo) (*emptypb.Empty, error) {
q.mu.Lock()
defer q.mu.Unlock()
name := info.GetName()
userinfo[name] += 1
if q.ch != nil {
q.ch <- name
}
return &emptypb.Empty{}, nil
}
//Watch用于監聽使用者年齡狀态的變化,先執行個體化一個chan,然後通過select方法監聽chan内是否有資料,
//如果有則通過服務端流向用戶端發送message,如果超過指定時間無更新,則退出
func (q *Query) Watch(timeSpecify *pb.WatchTime, serverStream pb.Query_WatchServer) error {
if q.ch != nil {
return errors.New("Watching is running, please stop first")
}
q.ch = make(chan string, 1)
for {
select {
case <-time.After(time.Second * time.Duration(timeSpecify.GetTime())):
close(q.ch)
q.ch = nil
return nil
case nameModify := <-q.ch:
log.Printf("The name of %s is updated\n", nameModify)
serverStream.Send(&pb.UserInfo{Name: nameModify})
}
}
}
-
用于更新使用者年齡,通過sync.Mutex加鎖,防止沖突;如果年齡有更新且watch功能開啟,則向chan發送對應的使用者名Update
-
用于監聽使用者年齡狀态的變化,先執行個體化一個chan,表示開啟watch功能。然後通過Watch
方法監聽chan内是否有資料,如果有則通過服務端流向用戶端發送message,如果超過指定時間年齡無更新,則關閉watch功能并退出select
- 當Watch功能已經開啟時,如果再次開啟會傳回報錯
2.3 用戶端
在gRPC-Watch目錄下建立Client檔案夾,建立main.go檔案
func main() {
//建立無認證的連接配接
conn, err := grpc.Dial(":1234", grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
log.Panic(err)
}
defer conn.Close()
client := pb.NewQueryClient(conn)
//RPC方法調用
ctx := context.Background()
//先擷取更新前的年齡
age, _ := client.GetAge(ctx, &pb.UserInfo{Name: "foo"})
log.Printf("Before updating, the age is %d\n", age.GetAge())
//更新年齡
log.Println("updating")
client.Update(ctx, &pb.UserInfo{Name: "foo"})
//再擷取更新後的年齡
age, _ = client.GetAge(ctx, &pb.UserInfo{Name: "foo"})
log.Printf("After updating, the age is %d\n", age.GetAge())
}
2.4 Watch功能
在gRPC-Watch目錄下建立Watch檔案夾,建立main.go檔案
func main() {
//建立無認證的連接配接
conn, err := grpc.Dial(":1234", grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
log.Panic(err)
}
defer conn.Close()
client := pb.NewQueryClient(conn)
//RPC方法調用
stream, _ := client.Watch(context.Background(), &pb.WatchTime{Time: 10})
for {
userInfoRecv, err := stream.Recv()
if err == io.EOF {
log.Println("end of watch")
break
} else if err != nil {
log.Println(err)
break
}
log.Printf("The name of %s is updated\n", userInfoRecv.GetName())
}
}
2.5 運作
首先開啟服務端,然後開啟Watch功能,在開啟用戶端,有如下輸出結果(當指定時間内沒有年齡更新,Watch自動退出):
當Watch已經開啟,并再次開啟時,會傳回如下自定義的報錯,對應2.2代碼裡的
return errors.New("Watching is running, please stop first")
3. 總結
- Watch功能整體思路就是:服務端執行個體化一個chan,如果監聽對象發生變化,向chan中發送值,用戶端從chan中收到值,如果沒有就一直阻塞