天天看点

golang-grpc-demo 测试 grpc demo

grpc demo

安装

环境变量添加代理

GO111MODULE=on
    GOPROXY=https://goproxy.io,direct

    # 私有仓库不走代理
    GOPRIVATE=*.example.com
           

安装 protobuf

go get -u -v github.com/golang/protobuf/proto
           

测试 protoc 编译器是否安装成功(注意:上面的 go get 只安装 Go 的 protobuf 运行库,protoc 编译器本身需要从 protobuf 的 release 页面或系统包管理器单独安装)

protoc --version
           

安装 protobuf golang 插件

go get -u -v github.com/golang/protobuf/protoc-gen-go
           

安装 grpc

go get -u -v google.golang.org/grpc
           

使用

创建 proto 文件

helloworld.proto

syntax = "proto3";
    // The first line declares the syntax version.

    // Go package path used for the generated code.
    option go_package = "example/greeter";

    // Service name.
    service Greeter {
        // Unary RPC: one request and one reply, similar to an HTTP round trip.
        rpc SayHello (HelloRequest) returns (HelloReply) {}
        // Server streaming: a single request from the client, a stream of replies from the server.
        rpc LotsOfReplies (HelloRequest) returns (stream HelloReply) {}
        // Client streaming: a stream of requests from the client, a single reply from the server.
        rpc LotsOfGreetings (stream HelloRequest1) returns (HelloReply) {}
        // Bidirectional streaming: both client and server send a stream of messages.
        rpc BidiHello(stream HelloRequest1) returns (stream HelloReply) {}
    }
    // Message HelloRequest: request type of SayHello and LotsOfReplies.
    message HelloRequest {
        // Field layout: type, name, field number (identifies the field in the binary encoding).
        string name = 1;
    }
    // Message HelloRequest1: request type of the client-streaming and bidirectional RPCs.
    message HelloRequest1 {
        string name = 1;
        int32 index = 2;
    }
    // Message HelloReply: reply type shared by every Greeter RPC.
    message HelloReply {
        string message = 1;
    }
           

another_service.proto

syntax = "proto3";

    // Go package path used for the generated code.
    option go_package = "example/another";

    // AnotherService declares a single unary RPC.
    service AnotherService {
        // AnotherHello takes an AnotherRequest and returns an AnotherReplay.
        rpc AnotherHello(AnotherRequest) returns (AnotherReplay);
    }

    // Request message of AnotherService.AnotherHello.
    // A message is declared as `message Name { ... }`; there is no `=` after the keyword.
    message AnotherRequest {
        string name = 1;
    }

    // Reply message of AnotherService.AnotherHello.
    // The name must be AnotherReplay: that is the type the AnotherHello rpc
    // returns and the type the Go server/client code uses.
    message AnotherReplay {
        string message = 1;
    }
           

编译 proto 文件

# -I sets the directory in which .proto imports are resolved (a directory, not a file glob)
    # --go_out sets the output directory for the generated code
    # plugins selects the protoc-gen-go plugins to enable (grpc here)
    # the .proto files to compile are passed as the trailing arguments
    protoc -I . --go_out=plugins=grpc:./ ./*.proto
           

可以查看编译后输出的内容

编写 server 端

package main

    import (
        "context"
        "crypto/tls"
        "fmt"
        "google.golang.org/grpc"
        "google.golang.org/grpc/codes"
        "google.golang.org/grpc/credentials"
        "google.golang.org/grpc/keepalive"
        "google.golang.org/grpc/metadata"
        "google.golang.org/grpc/reflection"
        "google.golang.org/grpc/status"
        "io"
        "log"
        "net"
        "strings"
        "test/example/another"
        "test/example/greeter"
        "time"
    )

    // port is the TCP address the gRPC server listens on.
    const port = ":20200"

    // Errors returned by the token-checking interceptor.
    var (
        // errMissingMetadata is returned when the incoming context carries no metadata.
        errMissingMetadata = status.Error(codes.InvalidArgument, "missing metadata")
        // errInvalidToken is returned when the authorization metadata fails validation.
        errInvalidToken = status.Error(codes.Unauthenticated, "invalid token")
    )

    // Server implements the Greeter service; it carries no state.
    type Server struct{}
    // SayHello handles the unary RPC and replies with "Hello " followed by the
    // name from the request.
    func (s *Server) SayHello(ctx context.Context, in *greeter.HelloRequest) (*greeter.HelloReply, error) {
        // A real handler would call a service layer, a database, a cache, etc. here.
        reply := &greeter.HelloReply{Message: "Hello " + in.Name}
        return reply, nil
    }
    // LotsOfReplies handles the server-streaming RPC: for one request it sends
    // ten replies of the form "Hello <name> <i>" on the stream.
    //
    // It returns the first error reported by stream.Send, or nil once all ten
    // replies have been sent.
    func (s *Server) LotsOfReplies(in *greeter.HelloRequest, stream greeter.Greeter_LotsOfRepliesServer) error {
        for i := 0; i < 10; i++ {
            reply := &greeter.HelloReply{Message: fmt.Sprintf("Hello %s %d", in.Name, i)}
            // Stop at the first failed Send instead of ignoring the error and
            // reporting success for a stream that was never fully delivered.
            if err := stream.Send(reply); err != nil {
                return err
            }
        }
        return nil
    }
    // LotsOfGreetings handles the client-streaming RPC. It receives messages
    // until the client closes its side, summing every Index and remembering the
    // most recent Name, then answers once with "Hello <name>, total <sum>".
    // Any receive error other than io.EOF is returned as is.
    func (s *Server) LotsOfGreetings(stream greeter.Greeter_LotsOfGreetingsServer) error {
        var (
            lastName string
            sum      int32
        )
        for {
            msg, err := stream.Recv()
            switch {
            case err == io.EOF:
                // The client finished sending: reply once and close the stream.
                summary := fmt.Sprintf("Hello %s, total %d", lastName, sum)
                return stream.SendAndClose(&greeter.HelloReply{Message: summary})
            case err != nil:
                return err
            }
            lastName = msg.Name
            sum += msg.Index
        }
    }
    // BidiHello handles the bidirectional-streaming RPC. Every received Name is
    // echoed back with each "吗" removed and each "?" turned into "!". The
    // handler returns nil when the client closes its side (io.EOF) and returns
    // any other receive or send error as is.
    func (s *Server) BidiHello(stream greeter.Greeter_BidiHelloServer) error {
        for {
            req, recvErr := stream.Recv()
            if recvErr != nil {
                if recvErr == io.EOF {
                    return nil
                }
                return recvErr
            }
            answer := strings.ReplaceAll(req.Name, "吗", "")
            answer = strings.ReplaceAll(answer, "?", "!")
            if sendErr := stream.Send(&greeter.HelloReply{Message: answer}); sendErr != nil {
                return sendErr
            }
        }
    }
    // AServer implements AnotherService; it carries no state.
    type AServer struct{}
    // AnotherHello handles the unary RPC of AnotherService by echoing the
    // request Name back as the reply Message.
    func (as *AServer) AnotherHello(ctx context.Context, request *another.AnotherRequest) (*another.AnotherReplay, error) {
        reply := &another.AnotherReplay{Message: request.Name}
        return reply, nil
    }

    // main loads the TLS key pair, listens on port, builds a gRPC server with
    // keepalive enforcement, the token interceptor and TLS credentials, registers
    // the Greeter and AnotherService implementations plus server reflection, and
    // serves until Serve returns an error. Every failure terminates the process.
    func main() {
        // Load the TLS certificate and private key.
        cert, err := tls.LoadX509KeyPair("./ssl.crt", "./ssl.key")
        if err != nil {
            // %v prints the error message; %#v would dump the error's Go struct.
            log.Fatalf("failed to load key pair: %v \n", err)
        }
        // Listen on the service port.
        lis, err := net.Listen("tcp", port)
        if err != nil {
            log.Fatalf("failed to listen: %v \n", err)
        }
        // gRPC server options.
        opts := []grpc.ServerOption{
            // Keepalive (heartbeat) enforcement settings.
            grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{
                MinTime:             5 * time.Second,
                PermitWithoutStream: true,
            }),
            // Further keepalive (heartbeat) settings, currently disabled.
            //grpc.KeepaliveParams(keepalive.ServerParameters{
            //	MaxConnectionIdle:     15 * time.Second,
            //	//MaxConnectionAge:      120 * time.Second,
            //	MaxConnectionAgeGrace: 5 * time.Second,
            //	Time: 10 * time.Second,
            //	Timeout: 5 * time.Second,
            //}),
            // Interceptor that validates the token of unary RPCs.
            // NOTE(review): no grpc.StreamInterceptor is registered here, so the
            // streaming RPCs do not pass through ensureValidToken.
            grpc.UnaryInterceptor(ensureValidToken),
            // TLS credentials built from the key pair loaded above.
            grpc.Creds(credentials.NewServerTLSFromCert(&cert)),
        }
        // Create the gRPC server.
        s := grpc.NewServer(opts...)
        // Register the Greeter service.
        greeter.RegisterGreeterServer(s, &Server{})
        // Register the AnotherService service.
        another.RegisterAnotherServiceServer(s, &AServer{})

        reflection.Register(s)
        // Start serving; this is a Serve failure, not a listen failure.
        if err := s.Serve(lis); err != nil {
            log.Fatalf("failed to serve: %v \n", err)
        }
    }

    // valid reports whether the first authorization value carries the expected
    // token. A leading "Bearer " prefix, if present, is stripped before the
    // comparison; an empty or nil slice is never valid.
    func valid(authorization []string) bool {
        if len(authorization) == 0 {
            return false
        }
        // A real service would check the token against a database or a Redis cache.
        return strings.TrimPrefix(authorization[0], "Bearer ") == "bearer-token"
    }

    // ensureValidToken is a unary server interceptor. It rejects the call with
    // errMissingMetadata when the incoming context has no metadata, with
    // errInvalidToken when the "authorization" metadata fails valid, and
    // otherwise passes the request on to handler.
    func ensureValidToken(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
        md, found := metadata.FromIncomingContext(ctx)
        switch {
        case !found:
            return nil, errMissingMetadata
        case !valid(md["authorization"]):
            return nil, errInvalidToken
        }
        return handler(ctx, req)
    }

           

编写 client 端

package main

    import (
        "context"
        "crypto/tls"
        "fmt"
        "golang.org/x/oauth2"
        "google.golang.org/grpc"
        "google.golang.org/grpc/credentials"
        "google.golang.org/grpc/credentials/oauth"
        "google.golang.org/grpc/keepalive"
        "io"
        "log"
        "os"
        "test/example/another"
        "test/example/greeter"
        "time"
    )

    // Connection target and the name used when none is given on the command line.
    const (
        address     = "localhost:20200"
        defaultName = "world"
    )

    // main dials the demo server over TLS with a per-RPC OAuth token and then
    // exercises each RPC in turn: AnotherHello, SayHello, the server-streaming,
    // the client-streaming and finally the bidirectional-streaming call. The
    // bidirectional part sends every word read from standard input until "quit"
    // is entered or standard input ends. The name used in greetings is the first
    // command-line argument, or defaultName when none is given.
    func main() {
        // Obtain the token that is attached to every RPC.
        perRPC := oauth.NewOauthAccess(fetchToken())
        // Dial options for the connection.
        opts := []grpc.DialOption{
            // Keepalive (heartbeat) parameters for the long-lived connection.
            grpc.WithKeepaliveParams(keepalive.ClientParameters{
                //Time:                10 * time.Second,
                //Timeout:             5 * time.Second,
                PermitWithoutStream: true,
            }),
            // Attach the token to each RPC.
            grpc.WithPerRPCCredentials(perRPC),
            // Skip certificate verification (the demo certificate is private).
            grpc.WithTransportCredentials(
                credentials.NewTLS(&tls.Config{InsecureSkipVerify: true}),
            ),
        }
        // Create the connection.
        conn, err := grpc.Dial(address, opts...)
        if err != nil {
            // %v prints the error message; %#v would dump the error's Go struct.
            log.Fatalf("did not connect: %v \n", err)
        }

        defer conn.Close()
        // Create the AnotherService client.
        ac := another.NewAnotherServiceClient(conn)
        // Context with a one-second deadline for the call.
        ctx1, cancel1 := context.WithTimeout(context.Background(), time.Second)
        defer cancel1()
        // Call the service method.
        ar, err := ac.AnotherHello(ctx1, &another.AnotherRequest{Name: "another..."})
        if err != nil {
            log.Fatalf("could not greet: %v \n", err)
        }
        // Print the reply.
        log.Printf("Greeting: %s \n", ar.Message)

        // Create the Greeter client.
        c := greeter.NewGreeterClient(conn)

        name := defaultName

        if len(os.Args) > 1 {
            name = os.Args[1]
        }
        // Context with a one-second deadline shared by the next three calls.
        ctx, cancel := context.WithTimeout(context.Background(), time.Second)
        defer cancel()

        // normal
        log.Printf("=========== normal =========== \n")
        r, err := c.SayHello(ctx, &greeter.HelloRequest{Name: name})
        if err != nil {
            log.Fatalf("could not greet: %v \n", err)
        }
        log.Printf("Greeting: %s \n", r.Message)

        // server side stream
        log.Printf("=========== server side stream =========== \n")
        stream, err := c.LotsOfReplies(ctx, &greeter.HelloRequest{Name: name})
        if err != nil {
            log.Fatalf("could not greet: %v \n", err)
        }
        for {
            replay, err := stream.Recv()
            if err == io.EOF {
                break
            }
            if err != nil {
                log.Fatalf("%#v.LotsOfReplies() = _, %v \n", c, err)
            }
            log.Printf("Greeting: %s \n", replay.Message)
        }
        stream.CloseSend()

        // client side stream
        log.Printf("=========== client side stream =========== \n")
        streamClientSide, err := c.LotsOfGreetings(ctx)
        if err != nil {
            log.Fatalf("could not greet: %v \n", err)
        }
        for i := 0; i < 10; i++ {
            if err := streamClientSide.Send(&greeter.HelloRequest1{
                Name:  name,
                Index: int32(i),
            }); err != nil {
                log.Fatalf("send err: %v \n", err)
            }
        }
        reply, err := streamClientSide.CloseAndRecv()
        if err != nil {
            log.Fatalf("%v.CloseAndRecv() got error %v, want %v", c, err, nil)
        }
        log.Printf("Greeting: %s \n", reply.Message)

        // Bidirectional stream
        start := time.Now().Unix()
        log.Printf("=========== bidirectional stream =========== \n")
        ctxTimeout, cancelTimeout := context.WithTimeout(context.Background(), 60*time.Minute)
        defer cancelTimeout()
        streamBidi, err := c.BidiHello(ctxTimeout)
        if err != nil {
            log.Fatalf("%v.BidiHello() got error %v, want %v", c, err, nil)
        }
        // waitc is closed by the reader goroutine once the server ends the stream.
        waitc := make(chan struct{})
        // Reader: print every reply until the stream ends.
        go func() {
            for {
                in, err := streamBidi.Recv()
                if err == io.EOF {
                    close(waitc)
                    return
                }
                if err != nil {
                    // Report how many seconds the stream lived before failing.
                    end := time.Now().Unix()
                    fmt.Printf("time: %d \n", end-start)
                    log.Fatalf("Failed to receive a note: %#v", err.Error())
                }
                fmt.Printf("AI: %s \n", in.Message)
            }
        }()
        // Writer: send one request per word read from standard input.
        for {
            request := &greeter.HelloRequest1{}
            // Stop when standard input ends; ignoring this error would make the
            // loop spin forever sending empty requests once stdin is closed.
            if _, err := fmt.Scanln(&request.Name); err == io.EOF {
                break
            }
            if request.Name == "quit" {
                break
            }
            if err := streamBidi.Send(request); err != nil {
                log.Fatalf("Failed to send a req: %v", err)
            }
        }

        // Tell the server no more requests follow, then wait for the reader to finish.
        streamBidi.CloseSend()
        <-waitc
    }
    // fetchToken returns the OAuth2 token sent with every RPC. The demo uses a
    // fixed access token; a real client would call an auth API here.
    func fetchToken() *oauth2.Token {
        token := new(oauth2.Token)
        token.AccessToken = "bearer-token"
        return token
    }

           

测试

go run server.go
           
go run client.go
           

查看输出结果

参考

https://blog.didiyun.com/index.php/2018/12/12/grpc-golang-1/

protobuf 关于标识符