天天看點

【Golang | gRPC】使用gRPC實作Watch功能1. 簡介2. 實踐3. 總結

環境:

Golang: go1.18.2 windows/amd64

gRPC: v1.47.0

Protobuf: v1.28.0

完整代碼:

https://github.com/WanshanTian/GolangLearning

cd GolangLearning/RPC/gRPC-Watch

1. 簡介

用戶端可以通過

Watch

機制來訂閱伺服器上某一節點的資料或狀态,當其發生變化時可以收到相應的通知。

前陣子學習了

gRPC

的服務端流模式,今天我們就用這個流模式來具體實作Watch功能

2. 實踐

現有下面一種場景:服務端儲存着使用者的年齡資訊,用戶端發送含使用者姓名的message可以擷取對應使用者的年齡或者更新對應使用者的年齡(年齡+1);通過Watch功能可以實時監聽使用者年齡的狀态,當有使用者的年齡發生變化時,用戶端收到通知

2.1 proto檔案

2.1.1 建立gRPC-Watch檔案夾,使用go mod init初始化,建立pb檔案夾,建立query.proto檔案

syntax = "proto3";
package pb;
option go_package= ".;pb";
import "google/protobuf/empty.proto";

// 定義查詢服務包含的方法
service Query {
  rpc GetAge (userInfo) returns (ageInfo) {};
  rpc Update (userInfo) returns (google.protobuf.Empty) {};
  rpc Watch (watchTime) returns (stream userInfo){}
}

// 請求用的結構體,包含一個name字段
message userInfo {
  string name = 1;
}

// 響應用的結構體,包含一個age字段
message ageInfo {
  int32 age = 1;
}

// watch的時間
message watchTime{
  int32 time = 1;
}
           
  • GetAge

    Update

    方法分别用于擷取年齡和更新年齡,均采用簡單RPC方式
  • Watch

    方法用于監聽年齡狀态的變化,采用服務端流方式
  • 當gRPC的方法不需要請求message或者不需要響應message時,可以先

    import "google/protobuf/empty.proto"

    ,然後直接使用

    google.protobuf.Empty

2.1.2 在.\gRPC-Watch\pb目錄下使用protoc工具進行編譯,在pb檔案夾下直接生成.pb.go和_grpc.pb.go檔案。關于protoc的詳細使用可以檢視【Golang | gRPC】使用protoc編譯.proto檔案

protoc --go_out=./ --go-grpc_out=./ .\query.proto
           

2.2 服務端

在gRPC-Watch目錄下建立Server檔案夾,建立main.go檔案

2.2.1 下面我們通過Query這個結構體具體實作QueryServer接口

var userinfo = map[string]int32{
	"foo": 18,
	"bar": 20,
}

// Query 結構體,實作QueryServer接口
type Query struct {
	mu                          sync.Mutex
	ch                          chan string
	pb.UnimplementedQueryServer // 涉及版本相容
}

func (q *Query) GetAge(ctx context.Context, info *pb.UserInfo) (*pb.AgeInfo, error) {
	age := userinfo[info.GetName()]
	var res = new(pb.AgeInfo)
	res.Age = age
	return res, nil
}
//Update用于更新使用者年齡,通過sync.Mutex加鎖,如果年齡有更新,則向chan發送對應的使用者名
func (q *Query) Update(ctx context.Context, info *pb.UserInfo) (*emptypb.Empty, error) {
	q.mu.Lock()
	defer q.mu.Unlock()
	name := info.GetName()
	userinfo[name] += 1
	if q.ch != nil {
		q.ch <- name
	}
	return &emptypb.Empty{}, nil
}
//Watch用于監聽使用者年齡狀态的變化,先執行個體化一個chan,然後通過select方法監聽chan内是否有資料,
//如果有則通過服務端流向用戶端發送message,如果超過指定時間無更新,則退出
func (q *Query) Watch(timeSpecify *pb.WatchTime, serverStream pb.Query_WatchServer) error {
	if q.ch != nil {
		return errors.New("Watching is running, please stop first")
	}
	q.ch = make(chan string, 1)
	for {
		select {
		case <-time.After(time.Second * time.Duration(timeSpecify.GetTime())):
			close(q.ch)
			q.ch = nil
			return nil
		case nameModify := <-q.ch:
			log.Printf("The name of %s is updated\n", nameModify)
			serverStream.Send(&pb.UserInfo{Name: nameModify})
		}
	}
}
           
  • Update

    用于更新使用者年齡,通過sync.Mutex加鎖,防止沖突;如果年齡有更新且watch功能開啟,則向chan發送對應的使用者名
  • Watch

    用于監聽使用者年齡狀态的變化,先執行個體化一個chan,表示開啟watch功能。然後通過

    select

    方法監聽chan内是否有資料,如果有則通過服務端流向用戶端發送message,如果超過指定時間年齡無更新,則關閉watch功能并退出
  • 當Watch功能已經開啟時,如果再次開啟會傳回報錯

2.3 用戶端

在gRPC-Watch目錄下建立Client檔案夾,建立main.go檔案

func main() {
	//建立無認證的連接配接
	conn, err := grpc.Dial(":1234", grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		log.Panic(err)
	}
	defer conn.Close()
	client := pb.NewQueryClient(conn)
	//RPC方法調用
	ctx := context.Background()
	//先擷取更新前的年齡
	age, _ := client.GetAge(ctx, &pb.UserInfo{Name: "foo"})
	log.Printf("Before updating, the age is %d\n", age.GetAge())
	//更新年齡
	log.Println("updating")
	client.Update(ctx, &pb.UserInfo{Name: "foo"})
	//再擷取更新後的年齡
	age, _ = client.GetAge(ctx, &pb.UserInfo{Name: "foo"})
	log.Printf("After updating, the age is %d\n", age.GetAge())
}
           

2.4 Watch功能

在gRPC-Watch目錄下建立Watch檔案夾,建立main.go檔案

func main() {
	//建立無認證的連接配接
	conn, err := grpc.Dial(":1234", grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		log.Panic(err)
	}
	defer conn.Close()
	client := pb.NewQueryClient(conn)
	//RPC方法調用
	stream, _ := client.Watch(context.Background(), &pb.WatchTime{Time: 10})
	for {
		userInfoRecv, err := stream.Recv()
		if err == io.EOF {
			log.Println("end of watch")
			break
		} else if err != nil {
			log.Println(err)
			break
		}
		log.Printf("The name of %s is updated\n", userInfoRecv.GetName())
	}
}
           

2.5 運作

首先開啟服務端,然後開啟Watch功能,在開啟用戶端,有如下輸出結果(當指定時間内沒有年齡更新,Watch自動退出):

【Golang | gRPC】使用gRPC實作Watch功能1. 簡介2. 實踐3. 總結

當Watch已經開啟,并再次開啟時,會傳回如下自定義的報錯,對應2.2代碼裡的

return errors.New("Watching is running, please stop first")

【Golang | gRPC】使用gRPC實作Watch功能1. 簡介2. 實踐3. 總結

3. 總結

  • Watch功能整體思路就是:服務端執行個體化一個chan,如果監聽對象發生變化,向chan中發送值,用戶端從chan中收到值,如果沒有就一直阻塞