天天看點

grpc之 普通流 、服務端流、 用戶端流 、雙向流模式

流模式入門(上)、場景:批量查詢使用者積分

為何要用流模式

前面的例子,我們僅僅是傳輸比較小的資料 基本模式是用戶端請求----服務端響應

如果是傳輸較大資料呢?會帶來

1、資料包過大導緻壓力陡增

2、需要等待用戶端包全部發送,才能處理以及響應

1,普通查詢積分方式

服務端:

grpc之 普通流 、服務端流、 用戶端流 、雙向流模式
grpc之 普通流 、服務端流、 用戶端流 、雙向流模式
syntax="proto3";
package services;

import "google/protobuf/timestamp.proto";

message ProdModel{ //商品模型
    int32 prod_id=1;
    string prod_name=2;
    float prod_price=3;
}


message OrderMain{ //主訂單模型
      int32 order_id=1;//訂單ID,數字自增
      string order_no=2; //訂單号
      int32 user_id=3; //購買者ID
      float order_money=4;//商品金額
      google.protobuf.Timestamp order_time=5; //下單時間
      repeated OrderDetail order_details=6;
}

//子訂單模型
message OrderDetail{
    int32 detail_id=1;
    string order_no=2;
    int32 prod_id=3;
    float prod_price=4;
    int32 prod_num=5;
}

//使用者模型
message UserInfo{
    int32 user_id=1;
    int32 user_score=2;
}      

Models.proto

grpc之 普通流 、服務端流、 用戶端流 、雙向流模式
grpc之 普通流 、服務端流、 用戶端流 、雙向流模式
syntax="proto3";
package services;
import "Models.proto";
message UserScoreRequest{
    repeated UserInfo users=1;
}
message UserScoreResponse{
    repeated UserInfo users=1;
}
service UserService{
    rpc GetUserScore(UserScoreRequest) returns (UserScoreResponse);
}      

Users.proto

執行腳本 生成pd.go檔案

cd pbfiles && protoc --go_out=plugins=grpc:../services  Prod.proto
 protoc --go_out=plugins=grpc:../services   Orders.proto
protoc --go_out=plugins=grpc:../services   Users.proto

protoc --go_out=plugins=grpc:../services --validate_out=lang=go:../services   Models.proto

protoc  --grpc-gateway_out=logtostderr=true:../services Prod.proto
protoc  --grpc-gateway_out=logtostderr=true:../services Orders.proto
protoc  --grpc-gateway_out=logtostderr=true:../services Users.proto

cd ..      
grpc之 普通流 、服務端流、 用戶端流 、雙向流模式
grpc之 普通流 、服務端流、 用戶端流 、雙向流模式
package services

import "context"

type UserService struct {
}
func(*UserService) GetUserScore(ctx context.Context, in *UserScoreRequest)  (*UserScoreResponse, error){
    var score int32=101
    users:=make([]*UserInfo,0)
    for _,user:=range in.Users{
        user.UserScore=score
        score++
        users=append(users,user)
    }
    return &UserScoreResponse{Users:users},nil

}      

UserService.go

grpc之 普通流 、服務端流、 用戶端流 、雙向流模式
grpc之 普通流 、服務端流、 用戶端流 、雙向流模式
package main

import (
"google.golang.org/grpc"
"grpcpro/services"
"net"
)

func main()  {
    rpcServer:=grpc.NewServer()
    services.RegisterProdServiceServer(rpcServer,new(services.ProdService))//商品服務
    services.RegisterOrderSerivceServer(rpcServer,new(services.OrdersService))//訂單服務
    services.RegisterUserServiceServer(rpcServer,new(services.UserService))

    lis,_:=net.Listen("tcp",":8081")

    rpcServer.Serve(lis)


}      

server.go

go build server.go

用戶端:

拷貝服務端生成的pd.go檔案到用戶端

grpc之 普通流 、服務端流、 用戶端流 、雙向流模式
func main(){
    conn,err:=grpc.Dial(":8081",grpc.WithInsecure())
    if err!=nil{
        log.Fatal(err)
    }
    defer conn.Close()
    ctx:=context.Background()
    userClient:=services.NewUserServiceClient(conn)
    var i int32
    req:=services.UserScoreRequest{}
    req.Users=make([]*services.UserInfo,0)

    for i=1;i<20;i++{
        req.Users=append(req.Users,&services.UserInfo{UserId:i})
    }
    res,_ := userClient.GetUserScore(ctx,&req)
    fmt.Println(res.Users)

}      

go build maiin.go

列印結果:

[user_id:1 user_score:101  user_id:2 user_score:102  user_id:3 user_score:103  user_id:4 user_score:104  user_id:5 user_score:105  user_id:6 user_score:106  user_id:7 user_score:107  user_id:8 user_score:108  user_id:9 user_score:109  user_id:10 user_score:110  user_id:11 user_score:111  user_id:12 user_score:112  user_id:13 user_score:113  user_id:14 user_score:114  user_id:15 user_score:115  user_id:16 user_score:116  user_id:17 user_score:117  user_id:18 user_score:118  user_id:19 user_score:119 ]

Process finished with exit code 0      

2,服務端流

 假設 用戶端一次性發送6個客戶資料給服務端

再假設 服務端查詢使用者積分 有點慢。是以 采用的政策是 服務端每查詢2個就發送給用戶端

服務端:

修改users.proto

syntax="proto3";
package services;
import "Models.proto";
message UserScoreRequest{
    repeated UserInfo users=1;
}
message UserScoreResponse{
    repeated UserInfo users=1;
}
service UserService{
    rpc GetUserScore(UserScoreRequest) returns (UserScoreResponse);
    rpc GetUserScoreByServerStream(UserScoreRequest) returns (stream UserScoreResponse);
}      

處理方法:

func(*UserService)     GetUserScoreByServerStream(in *UserScoreRequest,stream UserService_GetUserScoreByServerStreamServer) error {
    var score int32=101
    users:=make([]*UserInfo,0)
    for index,user:=range in.Users{
        user.UserScore=score
        score++
        users=append(users,user)

        if (index+1) % 2==0 && index>0{
            err:=stream.Send(&UserScoreResponse{Users:users})
            if err!=nil{
                return err
            }
            users=(users)[0:0]
        }
        time.Sleep(time.Second*1)
    }
    if len(users)>0{
        err:=stream.Send(&UserScoreResponse{Users:users})
        if err!=nil{
            return err
        }
    }
    return nil
}      

用戶端調用:

stream,_:=userClient.GetUserScoreByServerStream(ctx,&req)
        for {
            resp, err := stream.Recv()
            if err == io.EOF {
                break
            }
            if err != nil {
                log.Fatal(err)
            }

            fmt.Println(resp.Users)
        }      

列印出:

[user_id:1 user_score:101  user_id:2 user_score:102 ]
[user_id:3 user_score:103  user_id:4 user_score:104 ]
[user_id:5 user_score:105 ]      

3,用戶端流:

用戶端流模式、場景:分批發送請求

grpc之 普通流 、服務端流、 用戶端流 、雙向流模式
場景:
   用戶端批量查詢使用者積分
1、用戶端一次性把使用者清單發送過去(不是很多,擷取清單很快)
2、服務端查詢積分比較耗時
。 是以查到一部分 就傳回一部分。

   而不是 全部查完再傳回給用戶端      

服務端:

修改users.proto

syntax="proto3";
package services;
import "Models.proto";
message UserScoreRequest{
    repeated UserInfo users=1;
}
message UserScoreResponse{
    repeated UserInfo users=1;
}
service UserService{
    rpc GetUserScore(UserScoreRequest) returns (UserScoreResponse);
    rpc GetUserScoreByServerStream(UserScoreRequest) returns (stream UserScoreResponse);
    rpc GetUserScoreByClientStream(stream UserScoreRequest) returns (UserScoreResponse);
}      

新增service處理方法

func(*UserService)  GetUserScoreByClientStream(stream UserService_GetUserScoreByClientStreamServer) error{
    var score int32=101
    users:=make([]*UserInfo,0)
    for{
        req,err:=stream.Recv()
        if err==io.EOF{ //接收完了
            return stream.SendAndClose(&UserScoreResponse{Users:users})
        }
        if err!=nil{
            return err
        }
        for _,user:=range req.Users{
            user.UserScore=score  //這裡好比是服務端做的業務處理
            score++
            users=append(users,user)
        }
    }
}      

用戶端:

//用戶端流
func main(){
    conn,err:=grpc.Dial(":8081",grpc.WithInsecure())
    if err!=nil{
        log.Fatal(err)
    }
    defer conn.Close()

    ctx:=context.Background()
    userClient:=services.NewUserServiceClient(conn)
    var i int32
    if err!=nil{
        log.Fatal(err)
    }
    stream,err:=userClient.GetUserScoreByClientStream(ctx)
    if err!=nil{
        log.Fatal(err)
    }

    for j:=1;j<=3;j++{
        req:=services.UserScoreRequest{}
        req.Users=make([]*services.UserInfo,0)
        for i=1;i<=5;i++{  //加了5條使用者資訊  假設是一個耗時的過程
            req.Users=append(req.Users,&services.UserInfo{UserId:i})
        }
        err:=stream.Send(&req)
        if err!=nil{
            log.Println(err)
        }
    }
    res,_:=stream.CloseAndRecv()
    fmt.Println(res.Users)
}      

go build server.go

go build main.go

[user_id:1 user_score:101  user_id:2 user_score:102  user_id:3 user_score:103  user_id:4 user_score:104  user_id:5 user_score:105  user_id:1 user_score:106  user_id:2 user_score:107  user_id:3 user_score:108  user_id:4 user_score:109  user_id:5 user_score:110  user_id:1 user_score:111  user_id:2 user_score:112  user_id:3 user_score:113  user_id:4 user_score:114  user_id:5 user_score:115 ]

Process finished with exit code 0      

用戶端分批發送,服務端一次傳回結果

雙向流模式

grpc之 普通流 、服務端流、 用戶端流 、雙向流模式
場景:
   用戶端批量查詢使用者積分
1、用戶端分批把使用者清單發送過去(用戶端擷取清單比較慢)
2、服務端查詢積分也很慢,是以分批發送過去

此時我們可以使用 雙向流模式      

服務端:

修改users.proto

rpc GetUserScoreByTWS(stream UserScoreRequest) returns (stream UserScoreResponse);      
grpc之 普通流 、服務端流、 用戶端流 、雙向流模式
grpc之 普通流 、服務端流、 用戶端流 、雙向流模式
syntax="proto3";
package services;
import "Models.proto";
message UserScoreRequest{
    repeated UserInfo users=1;
}
message UserScoreResponse{
    repeated UserInfo users=1;
}
service UserService{
    rpc GetUserScore(UserScoreRequest) returns (UserScoreResponse);
    rpc GetUserScoreByServerStream(UserScoreRequest) returns (stream UserScoreResponse);
    rpc GetUserScoreByClientStream(stream UserScoreRequest) returns (UserScoreResponse);
    rpc GetUserScoreByTWS(stream UserScoreRequest) returns (stream UserScoreResponse);
}      

View Code

然後生成

cd pbfiles && protoc --go_out=plugins=grpc:../services  Prod.proto
 protoc --go_out=plugins=grpc:../services   Orders.proto
protoc --go_out=plugins=grpc:../services   Users.proto

protoc --go_out=plugins=grpc:../services    Models.proto

protoc  --grpc-gateway_out=logtostderr=true:../services Prod.proto
protoc  --grpc-gateway_out=logtostderr=true:../services Orders.proto
protoc  --grpc-gateway_out=logtostderr=true:../services Users.proto

cd ..      

處理 UserService.go

//雙向流
func(*UserService) GetUserScoreByTWS(stream UserService_GetUserScoreByTWSServer) error  {
    var score int32=101
    users:=make([]*UserInfo,0)
    for{
        req,err:=stream.Recv()
        if err==io.EOF{ //接收完了
            return nil
        }
        if err!=nil{
            return err
        }
        for _,user:=range req.Users{
            user.UserScore=score  //這裡好比是服務端做的業務處理
            score++
            users=append(users,user)
        }
        err=stream.Send(&UserScoreResponse{Users:users})
        if err!=nil{
            log.Println(err)
        }
        users=(users)[0:0]
    }
}      

用戶端:

//雙向流
func main(){
    conn,err:=grpc.Dial(":8081",grpc.WithInsecure())
    if err!=nil{
        log.Fatal(err)
    }
    defer conn.Close()

    ctx:=context.Background()
    userClient:=services.NewUserServiceClient(conn)
    var i int32
    if err!=nil{
        log.Fatal(err)
    }
    stream,err:=userClient.GetUserScoreByTWS(ctx)
    if err!=nil{
        log.Fatal(err)
    }

    var uid int32=1
    for j:=1;j<=3;j++{
        req:=services.UserScoreRequest{}
        req.Users=make([]*services.UserInfo,0)
        for i=1;i<=5;i++{  //加5條使用者資訊  假設是一個耗時的過程
            req.Users=append(req.Users,&services.UserInfo{UserId:uid})
            uid++
        }
        err:=stream.Send(&req)
        if err!=nil{
            log.Println(err)
        }
        res,err:=stream.Recv()
        if err==io.EOF{
            break
        }
        if err!=nil{
            log.Println(err)
        }
        fmt.Println(res.Users)

    }
}      

傳回結果:

[user_id:1 user_score:101  user_id:2 user_score:102  user_id:3 user_score:103  user_id:4 user_score:104  user_id:5 user_score:105 ]
[user_id:6 user_score:106  user_id:7 user_score:107  user_id:8 user_score:108  user_id:9 user_score:109  user_id:10 user_score:110 ]
[user_id:11 user_score:111  user_id:12 user_score:112  user_id:13 user_score:113  user_id:14 user_score:114  user_id:15 user_score:115 ]

Process finished with exit code 0      

 可以看出,當我們生産環境中 用戶端擷取資料耗時并且服務端處理資料耗時,此時運用雙向流模式大大節省任務時間

 源碼位址: