流模式入門(上)、場景:批量查詢使用者積分
為何要用流模式
前面的例子,我們僅僅是傳輸比較小的資料,基本模式是用戶端請求----服務端響應
如果是傳輸較大資料呢?會帶來
1、資料包過大導致壓力陡增
2、需要等待用戶端包全部發送,才能處理以及響應
1,普通查詢積分方式
服務端:
![](https://img.laitimes.com/img/__Qf2AjLwojIjJCLyojI0JCLiYWan5SO0gTMxATOhZWO1UjMzUTNyYzXzMzMwYTMyEzLcRDMyIDMy8CXn9Gbi9CXzV2Zh1WavwVbvNmLvR3YxUjLyM3Lc9CX6MHc0RHaiojIsJye.gif)
syntax="proto3";
package services;
import "google/protobuf/timestamp.proto";
message ProdModel{ // product model
int32 prod_id=1;
string prod_name=2;
float prod_price=3;
}
message OrderMain{ // main (parent) order model
int32 order_id=1;// order ID, auto-incrementing number
string order_no=2; // order number
int32 user_id=3; // buyer's user ID
float order_money=4;// order amount
google.protobuf.Timestamp order_time=5; // time the order was placed
repeated OrderDetail order_details=6; // line items belonging to this order
}
// order line-item (child order) model
message OrderDetail{
int32 detail_id=1;
string order_no=2;
int32 prod_id=3;
float prod_price=4;
int32 prod_num=5;
}
// user model: the ID sent by the client plus the score the service fills in
message UserInfo{
int32 user_id=1;
int32 user_score=2;
}
Models.proto
![](https://img.laitimes.com/img/__Qf2AjLwojIjJCLyojI0JCLiYWan5SO0gTMxATOhZWO1UjMzUTNyYzXzMzMwYTMyEzLcRDMyIDMy8CXn9Gbi9CXzV2Zh1WavwVbvNmLvR3YxUjLyM3Lc9CX6MHc0RHaiojIsJye.gif)
syntax="proto3";
package services;
import "Models.proto";
// Batch score request: the users whose scores should be looked up.
message UserScoreRequest{
repeated UserInfo users=1;
}
// Batch score response: the same users with user_score populated.
message UserScoreResponse{
repeated UserInfo users=1;
}
service UserService{
// Plain unary call: one request in, one response out.
rpc GetUserScore(UserScoreRequest) returns (UserScoreResponse);
}
Users.proto
執行腳本 生成pd.go檔案
# Generate the gRPC Go stubs (*.pb.go) into ../services.
cd pbfiles && protoc --go_out=plugins=grpc:../services Prod.proto
protoc --go_out=plugins=grpc:../services Orders.proto
protoc --go_out=plugins=grpc:../services Users.proto
# Models.proto additionally gets validation code via protoc-gen-validate.
protoc --go_out=plugins=grpc:../services --validate_out=lang=go:../services Models.proto
# Generate grpc-gateway (REST reverse-proxy) stubs for each service.
protoc --grpc-gateway_out=logtostderr=true:../services Prod.proto
protoc --grpc-gateway_out=logtostderr=true:../services Orders.proto
protoc --grpc-gateway_out=logtostderr=true:../services Users.proto
cd ..
![](https://img.laitimes.com/img/__Qf2AjLwojIjJCLyojI0JCLiYWan5SO0gTMxATOhZWO1UjMzUTNyYzXzMzMwYTMyEzLcRDMyIDMy8CXn9Gbi9CXzV2Zh1WavwVbvNmLvR3YxUjLyM3Lc9CX6MHc0RHaiojIsJye.gif)
package services
import "context"
// UserService implements the user-score RPCs declared in Users.proto.
type UserService struct {
}

// GetUserScore is the plain unary RPC: it assigns each requested user a
// sequential score starting at 101 and returns all of them in one response.
func (*UserService) GetUserScore(ctx context.Context, in *UserScoreRequest) (*UserScoreResponse, error) {
	next := int32(101)
	scored := make([]*UserInfo, 0, len(in.Users))
	for _, u := range in.Users {
		u.UserScore = next // stand-in for a real score lookup
		next++
		scored = append(scored, u)
	}
	return &UserScoreResponse{Users: scored}, nil
}
UserService.go
![](https://img.laitimes.com/img/__Qf2AjLwojIjJCLyojI0JCLiYWan5SO0gTMxATOhZWO1UjMzUTNyYzXzMzMwYTMyEzLcRDMyIDMy8CXn9Gbi9CXzV2Zh1WavwVbvNmLvR3YxUjLyM3Lc9CX6MHc0RHaiojIsJye.gif)
package main
import (
"google.golang.org/grpc"
"grpcpro/services"
"net"
)
// main registers the product, order and user services on one gRPC server
// and serves them on port 8081.
func main() {
	server := grpc.NewServer()
	services.RegisterProdServiceServer(server, new(services.ProdService))     // product service
	services.RegisterOrderSerivceServer(server, new(services.OrdersService))  // order service
	services.RegisterUserServiceServer(server, new(services.UserService))     // user-score service
	listener, _ := net.Listen("tcp", ":8081")
	server.Serve(listener)
}
server.go
go build server.go
用戶端:
拷貝服務端生成的pb.go檔案到用戶端
// main dials the gRPC server, asks for scores of users 1..19 in a single
// unary call, and prints the scored users.
func main() {
	conn, err := grpc.Dial(":8081", grpc.WithInsecure())
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()
	ctx := context.Background()
	userClient := services.NewUserServiceClient(conn)
	req := services.UserScoreRequest{}
	req.Users = make([]*services.UserInfo, 0, 19)
	var i int32
	for i = 1; i < 20; i++ {
		req.Users = append(req.Users, &services.UserInfo{UserId: i})
	}
	res, err := userClient.GetUserScore(ctx, &req)
	if err != nil {
		// BUG FIX: the original discarded this error with `res,_ :=` and then
		// dereferenced res.Users, which panics on a nil response if the RPC fails.
		log.Fatal(err)
	}
	fmt.Println(res.Users)
}
go build main.go
列印結果:
[user_id:1 user_score:101 user_id:2 user_score:102 user_id:3 user_score:103 user_id:4 user_score:104 user_id:5 user_score:105 user_id:6 user_score:106 user_id:7 user_score:107 user_id:8 user_score:108 user_id:9 user_score:109 user_id:10 user_score:110 user_id:11 user_score:111 user_id:12 user_score:112 user_id:13 user_score:113 user_id:14 user_score:114 user_id:15 user_score:115 user_id:16 user_score:116 user_id:17 user_score:117 user_id:18 user_score:118 user_id:19 user_score:119 ]
Process finished with exit code 0
2,服務端流
假設 用戶端一次性發送6個客戶資料給服務端
再假設 服務端查詢使用者積分 有點慢。是以 采用的政策是 服務端每查詢2個就發送給用戶端
服務端:
修改users.proto
syntax="proto3";
package services;
import "Models.proto";
// Batch score request: the users whose scores should be looked up.
message UserScoreRequest{
repeated UserInfo users=1;
}
// Batch score response: the same users with user_score populated.
message UserScoreResponse{
repeated UserInfo users=1;
}
service UserService{
// Unary call: one request in, one response out.
rpc GetUserScore(UserScoreRequest) returns (UserScoreResponse);
// Server-streaming call: one request in, a stream of partial responses out.
rpc GetUserScoreByServerStream(UserScoreRequest) returns (stream UserScoreResponse);
}
處理方法:
// GetUserScoreByServerStream scores the requested users and streams the
// results back in batches of two, so the client can start consuming before
// the whole (slow) lookup has finished.
func (*UserService) GetUserScoreByServerStream(in *UserScoreRequest, stream UserService_GetUserScoreByServerStreamServer) error {
	var score int32 = 101
	users := make([]*UserInfo, 0)
	for index, user := range in.Users {
		user.UserScore = score // stand-in for the real (slow) score lookup
		score++
		users = append(users, user)
		// Flush a batch to the client after every second user.
		// (The original also tested index>0, which is redundant:
		// (index+1)%2==0 already implies index >= 1.)
		if (index+1)%2 == 0 {
			if err := stream.Send(&UserScoreResponse{Users: users}); err != nil {
				return err
			}
			users = users[:0] // reuse the backing array for the next batch
		}
		time.Sleep(time.Second * 1) // simulate per-user latency
	}
	// Send any leftover users that didn't fill a complete batch.
	if len(users) > 0 {
		if err := stream.Send(&UserScoreResponse{Users: users}); err != nil {
			return err
		}
	}
	return nil
}
用戶端調用:
// Call the server-streaming RPC and print each batch as it arrives.
stream, err := userClient.GetUserScoreByServerStream(ctx, &req)
if err != nil {
	// BUG FIX: the original discarded this error with `stream,_:=` and would
	// then call Recv on a nil stream.
	log.Fatal(err)
}
for {
	resp, err := stream.Recv()
	if err == io.EOF { // server closed the stream: all batches received
		break
	}
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(resp.Users)
}
列印出:
[user_id:1 user_score:101 user_id:2 user_score:102 ]
[user_id:3 user_score:103 user_id:4 user_score:104 ]
[user_id:5 user_score:105 ]
3,用戶端流:
用戶端流模式、場景:分批發送請求
場景:
用戶端批量查詢使用者積分
1、用戶端一次性把使用者清單發送過去(不是很多,擷取清單很快)
2、服務端查詢積分比較耗時
。 是以查到一部分 就傳回一部分。
而不是 全部查完再傳回給用戶端
服務端:
修改users.proto
syntax="proto3";
package services;
import "Models.proto";
// Batch score request: the users whose scores should be looked up.
message UserScoreRequest{
repeated UserInfo users=1;
}
// Batch score response: the same users with user_score populated.
message UserScoreResponse{
repeated UserInfo users=1;
}
service UserService{
// Unary call: one request in, one response out.
rpc GetUserScore(UserScoreRequest) returns (UserScoreResponse);
// Server-streaming call: one request in, a stream of partial responses out.
rpc GetUserScoreByServerStream(UserScoreRequest) returns (stream UserScoreResponse);
// Client-streaming call: a stream of requests in, one combined response out.
rpc GetUserScoreByClientStream(stream UserScoreRequest) returns (UserScoreResponse);
}
新增service處理方法
// GetUserScoreByClientStream receives user batches from the client until it
// closes its send side (io.EOF), scores every user, and answers with a
// single combined response.
func (*UserService) GetUserScoreByClientStream(stream UserService_GetUserScoreByClientStreamServer) error {
	next := int32(101)
	scored := make([]*UserInfo, 0)
	for {
		req, err := stream.Recv()
		if err == io.EOF {
			// Client finished sending: reply once with everything scored.
			return stream.SendAndClose(&UserScoreResponse{Users: scored})
		}
		if err != nil {
			return err
		}
		for _, u := range req.Users {
			u.UserScore = next // stand-in for real server-side work
			next++
			scored = append(scored, u)
		}
	}
}
用戶端:
// Client-streaming demo: send three batches of five users each, close the
// send side, then print the single combined response from the server.
func main() {
	conn, err := grpc.Dial(":8081", grpc.WithInsecure())
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()
	ctx := context.Background()
	userClient := services.NewUserServiceClient(conn)
	// NOTE: the original re-checked err here before it was ever reassigned —
	// dead code removed.
	stream, err := userClient.GetUserScoreByClientStream(ctx)
	if err != nil {
		log.Fatal(err)
	}
	var i int32
	for j := 1; j <= 3; j++ {
		req := services.UserScoreRequest{}
		req.Users = make([]*services.UserInfo, 0, 5)
		for i = 1; i <= 5; i++ { // build one batch of 5; stands in for a slow producer
			req.Users = append(req.Users, &services.UserInfo{UserId: i})
		}
		if err := stream.Send(&req); err != nil {
			log.Println(err)
		}
	}
	res, err := stream.CloseAndRecv()
	if err != nil {
		// BUG FIX: the original discarded this error with `res,_:=` and then
		// dereferenced res.Users, which panics on a nil response if the RPC fails.
		log.Fatal(err)
	}
	fmt.Println(res.Users)
}
go build server.go
go build main.go
[user_id:1 user_score:101 user_id:2 user_score:102 user_id:3 user_score:103 user_id:4 user_score:104 user_id:5 user_score:105 user_id:1 user_score:106 user_id:2 user_score:107 user_id:3 user_score:108 user_id:4 user_score:109 user_id:5 user_score:110 user_id:1 user_score:111 user_id:2 user_score:112 user_id:3 user_score:113 user_id:4 user_score:114 user_id:5 user_score:115 ]
Process finished with exit code 0
用戶端分批發送,服務端一次傳回結果
雙向流模式
場景:
用戶端批量查詢使用者積分
1、用戶端分批把使用者清單發送過去(用戶端擷取清單比較慢)
2、服務端查詢積分也很慢,是以分批發送過去
此時我們可以使用 雙向流模式
服務端:
修改users.proto
rpc GetUserScoreByTWS(stream UserScoreRequest) returns (stream UserScoreResponse);
![](https://img.laitimes.com/img/__Qf2AjLwojIjJCLyojI0JCLiYWan5SO0gTMxATOhZWO1UjMzUTNyYzXzMzMwYTMyEzLcRDMyIDMy8CXn9Gbi9CXzV2Zh1WavwVbvNmLvR3YxUjLyM3Lc9CX6MHc0RHaiojIsJye.gif)
syntax="proto3";
package services;
import "Models.proto";
// Batch score request: the users whose scores should be looked up.
message UserScoreRequest{
repeated UserInfo users=1;
}
// Batch score response: the same users with user_score populated.
message UserScoreResponse{
repeated UserInfo users=1;
}
service UserService{
// Unary call: one request in, one response out.
rpc GetUserScore(UserScoreRequest) returns (UserScoreResponse);
// Server-streaming call: one request in, a stream of partial responses out.
rpc GetUserScoreByServerStream(UserScoreRequest) returns (stream UserScoreResponse);
// Client-streaming call: a stream of requests in, one combined response out.
rpc GetUserScoreByClientStream(stream UserScoreRequest) returns (UserScoreResponse);
// Bidirectional-streaming call: both sides stream independently.
rpc GetUserScoreByTWS(stream UserScoreRequest) returns (stream UserScoreResponse);
}
View Code
然後生成
# Regenerate the gRPC Go stubs after adding the bidirectional RPC.
cd pbfiles && protoc --go_out=plugins=grpc:../services Prod.proto
protoc --go_out=plugins=grpc:../services Orders.proto
protoc --go_out=plugins=grpc:../services Users.proto
protoc --go_out=plugins=grpc:../services Models.proto
# Regenerate grpc-gateway (REST reverse-proxy) stubs for each service.
protoc --grpc-gateway_out=logtostderr=true:../services Prod.proto
protoc --grpc-gateway_out=logtostderr=true:../services Orders.proto
protoc --grpc-gateway_out=logtostderr=true:../services Users.proto
cd ..
處理 UserService.go
// GetUserScoreByTWS is the bidirectional-streaming variant: for every
// request batch received it scores the users and immediately streams that
// batch back, until the client closes its send side.
func (*UserService) GetUserScoreByTWS(stream UserService_GetUserScoreByTWSServer) error {
	var score int32 = 101
	users := make([]*UserInfo, 0)
	for {
		req, err := stream.Recv()
		if err == io.EOF { // client closed its send side: we are done
			return nil
		}
		if err != nil {
			return err
		}
		for _, user := range req.Users {
			user.UserScore = score // stand-in for real server-side work
			score++
			users = append(users, user)
		}
		// BUG FIX: the original only logged a Send failure and kept looping;
		// on a broken stream every later Send fails too, so propagate the
		// error and terminate the RPC instead.
		if err := stream.Send(&UserScoreResponse{Users: users}); err != nil {
			return err
		}
		users = users[:0] // reuse the buffer for the next batch
	}
}
用戶端:
// Bidirectional-stream demo: alternately send one batch of five users and
// receive the scored batch, three times, then close the send side.
func main() {
	conn, err := grpc.Dial(":8081", grpc.WithInsecure())
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()
	ctx := context.Background()
	userClient := services.NewUserServiceClient(conn)
	stream, err := userClient.GetUserScoreByTWS(ctx)
	if err != nil {
		log.Fatal(err)
	}
	var i int32
	var uid int32 = 1
	for j := 1; j <= 3; j++ {
		req := services.UserScoreRequest{}
		req.Users = make([]*services.UserInfo, 0, 5)
		for i = 1; i <= 5; i++ { // build one batch of 5; stands in for a slow producer
			req.Users = append(req.Users, &services.UserInfo{UserId: uid})
			uid++
		}
		if err := stream.Send(&req); err != nil {
			log.Println(err)
		}
		res, err := stream.Recv()
		if err == io.EOF {
			break
		}
		if err != nil {
			// BUG FIX: the original only logged here and then dereferenced the
			// nil response (res.Users), which panics; bail out instead.
			log.Fatal(err)
		}
		fmt.Println(res.Users)
	}
	// Signal the server that we are done sending so its Recv sees io.EOF.
	stream.CloseSend()
}
傳回結果:
[user_id:1 user_score:101 user_id:2 user_score:102 user_id:3 user_score:103 user_id:4 user_score:104 user_id:5 user_score:105 ]
[user_id:6 user_score:106 user_id:7 user_score:107 user_id:8 user_score:108 user_id:9 user_score:109 user_id:10 user_score:110 ]
[user_id:11 user_score:111 user_id:12 user_score:112 user_id:13 user_score:113 user_id:14 user_score:114 user_id:15 user_score:115 ]
Process finished with exit code 0
可以看出,當我們生産環境中 用戶端擷取資料耗時并且服務端處理資料耗時,此時運用雙向流模式大大節省任務時間
源碼位址: