grpc dome
安装
环境变量添加代理
GO111MODULE=on
GOPROXY=https://goproxy.io,direct
# 私有仓库不走代理
GOPRIVATE=*.example.com
安装 protobuf
go get -u -v github.com/golang/protobuf/proto
测试是否安装成功
protoc --version
安装 protobuf golang 插件
go get -u -v github.com/golang/protobuf/protoc-gen-go
安装 grpc
go get -u -v google.golang.org/grpc
使用
创建 proto 文件
helloworld.proto
syntax = "proto3";
// 第一行说明语法版本
// 生成 golang 的包名
option go_package = "example/greeter";
// 服务名
service Greeter {
// 接口声明, 类似http 的一来一回短消息
rpc SayHello (HelloRequest) returns (HelloReply) {}
// server stream client端短连接 server端长连接
rpc LotsOfReplies (HelloRequest) returns (stream HelloReply) {}
// client stream server端短连接 client端长连接
rpc LotsOfGreetings (stream HelloRequest1) returns (HelloReply) {}
// Bidirectional streaming client,server全部长连接
rpc BidiHello(stream HelloRequest1) returns (stream HelloReply) {}
}
// 消息结构体 HelloRequest
message HelloRequest {
// 类型 名称 标识符(用于标识字段在消息中的二进制格式)
string name = 1;
}
// 消息结构体 HelloRequest1
message HelloRequest1 {
string name = 1;
int32 index = 2;
}
// 消息结构体 HelloReply
message HelloReply {
string message = 1;
}
another_service.proto
syntax = "proto3";
option go_package = "example/another";
service AnotherService {
rpc AnotherHello(AnotherRequest) returns (AnotherReplay);
}
message = AnotherRequest {
string name = 1;
}
message = AnotherReplqy {
string message = 1;
}
编译 proto 文件
# -I 指定 proto 文件目录
# --go_out 指定编译后输出的目录
# plugins 指定依赖的插件
protoc -I ./*.proto --go_out=plugins=grpc:./
可以查看编译后输出的内容
编写 server 端
package main
import (
"context"
"crypto/tls"
"fmt"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/reflection"
"google.golang.org/grpc/status"
"io"
"log"
"net"
"strings"
"test/example/another"
"test/example/greeter"
"time"
)
// 服务的端口号
const (
port = ":20200"
)
var (
errMissingMetadata = status.Errorf(codes.InvalidArgument, "missing metadata")
errInvalidToken = status.Errorf(codes.Unauthenticated, "invalid token")
)
// 创建一个服务结构体
type Server struct {
}
// 实现服务接口 简单的短连接
func (s *Server) SayHello(ctx context.Context, in *greeter.HelloRequest) (*greeter.HelloReply, error) {
// 调用 service, 链接数据库, 处理缓存等...
return &greeter.HelloReply{Message: "Hello " + in.Name}, nil
}
// 实现服务接口
func (s *Server) LotsOfReplies(in *greeter.HelloRequest, stream greeter.Greeter_LotsOfRepliesServer) error {
// 服务端 长连接发送信息, 发送给客户端大量数据
for i := 0; i < 10; i++ {
stream.Send(&greeter.HelloReply{Message: fmt.Sprintf("Hello %s %d", in.Name, i)})
}
return nil
}
// 实现服务接口
func (s *Server) LotsOfGreetings(stream greeter.Greeter_LotsOfGreetingsServer) error {
// 客户端 长连接 发送连续大量数据
var total int32
var name string
for{
greeting, err := stream.Recv()
if err == io.EOF {
return stream.SendAndClose(&greeter.HelloReply{
Message: fmt.Sprintf("Hello %s, total %d", name, total),
})
}
if err != nil {
return err
}
name = greeting.Name
total += greeting.Index
}
}
// 实现服务接口
func (s *Server)BidiHello(stream greeter.Greeter_BidiHelloServer) error {
// 客户端服务端分别建立长连接, 发送响应的数据
for {
in, err := stream.Recv()
if err == io.EOF {
return nil
}
if err != nil {
return err
}
message := strings.Replace(in.Name, "吗", "", -1)
message = strings.Replace(message, "?", "!", -1)
err = stream.Send(&greeter.HelloReply{Message:message})
if err != nil {
return err
}
}
}
// 声明服务结构体
type AServer struct {
}
// 实现服务接口
func (as *AServer)AnotherHello(ctx context.Context, request *another.AnotherRequest) (*another.AnotherReplay, error) {
return &another.AnotherReplay{Message: request.Name}, nil
}
func main() {
// 加载证书
cert, err := tls.LoadX509KeyPair("./ssl.crt", "./ssl.key")
if err != nil {
log.Fatalf("failed to load key pair: %#v \n", err)
}
// 监听端口
lis, err := net.Listen("tcp", port)
if err != nil {
log.Fatalf("failed to listen: %#v \n", err)
}
// 初始化 grpc 服务设置
opts := []grpc.ServerOption{
// 心跳相关设置
grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{
MinTime: 5 * time.Second,
PermitWithoutStream: true,
}),
// 心跳相关设置
//grpc.KeepaliveParams(keepalive.ServerParameters{
// MaxConnectionIdle: 15 * time.Second,
// //MaxConnectionAge: 120 * time.Second,
// MaxConnectionAgeGrace: 5 * time.Second,
// Time: 10 * time.Second,
// Timeout: 5 * time.Second,
//}),
// 拦截器, token 检验
grpc.UnaryInterceptor(ensureValidToken),
// 加载证书
grpc.Creds(credentials.NewServerTLSFromCert(&cert)),
}
// 创建 rpc 服务
s := grpc.NewServer(opts...)
// 注册 greeter Server 服务
greeter.RegisterGreeterServer(s, &Server{})
// 初测 another Server 服务
another.RegisterAnotherServiceServer(s, &AServer{})
reflection.Register(s)
// 启动服务
if err := s.Serve(lis); err != nil {
log.Fatalf("failed to listen: %#v \n", err)
}
}
// 验证token
func valid(authorization []string) bool {
if len(authorization) < 1 {
return false
}
token := strings.TrimPrefix(authorization[0], "Bearer ")
// 检查数据库 或 redis 缓存
if token != "bearer-token" {
return false
}
return true
}
// grpc 中间件 解析 token
func ensureValidToken(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
md, ok := metadata.FromIncomingContext(ctx)
if !ok {
return nil, errMissingMetadata
}
if !valid(md["authorization"]) {
return nil, errInvalidToken
}
return handler(ctx, req)
}
编写 client 端
package main
import (
"context"
"crypto/tls"
"fmt"
"golang.org/x/oauth2"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/oauth"
"google.golang.org/grpc/keepalive"
"io"
"log"
"os"
"test/example/another"
"test/example/greeter"
"time"
)
const (
address = "localhost:20200"
defaultName = "world"
)
func main() {
// 获取 token
perRPC := oauth.NewOauthAccess(fetchToken())
// 创建 rpc 连接配置
opts := []grpc.DialOption{
// 配置长连接 心跳参数
grpc.WithKeepaliveParams(keepalive.ClientParameters{
//Time: 10 * time.Second,
//Timeout: 5 * time.Second,
PermitWithoutStream: true,
}),
// 加入 token 设置
grpc.WithPerRPCCredentials(perRPC),
// 跳过 证书验证(因为私有)
grpc.WithTransportCredentials(
credentials.NewTLS(&tls.Config{InsecureSkipVerify:true}),
),
}
// 创建连接
conn, err := grpc.Dial(address, opts...)
if err != nil {
log.Fatalf("did not connect: %#v \n", err)
}
defer conn.Close()
// 生成 AnotherService client
ac := another.NewAnotherServiceClient(conn)
// 生成 context
ctx1, cancel1 := context.WithTimeout(context.Background(), time.Second)
defer cancel1()
// 调用 服务的方法
ar, err := ac.AnotherHello(ctx1, &another.AnotherRequest{Name: "another..."})
if err != nil {
log.Fatalf("could not greet: %#v \n", err)
}
// 打印返回
log.Printf("Greeting: %s \n", ar.Message)
// 生成 NewGreeterClient client
c := greeter.NewGreeterClient(conn)
name := defaultName
if len(os.Args) > 1 {
name = os.Args[1]
}
// 生成 context
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
// normal
log.Printf("=========== normal =========== \n")
r, err := c.SayHello(ctx, &greeter.HelloRequest{Name:name})
if err != nil {
log.Fatalf("could not greet: %#v \n", err)
}
log.Printf("Greeting: %s \n", r.Message)
// server side stream
log.Printf("=========== server side stream =========== \n")
stream, err := c.LotsOfReplies(ctx, &greeter.HelloRequest{Name: name})
if err != nil {
log.Fatalf("could not greet: %#v \n", err)
}
for {
replay, err := stream.Recv()
if err == io.EOF {
break
}
if err != nil {
log.Fatalf("%#v.LotsOfReplies() = _, %#v \n", c, err)
}
log.Printf("Greeting: %s \n", replay.Message)
}
stream.CloseSend()
// client side stream
log.Printf("=========== client side stream =========== \n")
streamClientSide, err := c.LotsOfGreetings(ctx)
if err != nil {
log.Fatalf("could not greet: %#v \n", err)
}
for i := 0; i < 10; i++ {
if err := streamClientSide.Send(&greeter.HelloRequest1{
Name: name,
Index: int32(i),
}); err != nil {
log.Fatalf("send err: %#v \n", err)
}
}
reply, err := streamClientSide.CloseAndRecv()
if err != nil {
log.Fatalf("%v.CloseAndRecv() got error %v, want %v", c, err, nil)
}
log.Printf("Greeting: %s \n", reply.Message)
// Bidirectional stream
start := time.Now().Unix()
log.Printf("=========== bidirectional stream =========== \n")
ctxTimeout, cancelTimeout := context.WithTimeout(context.Background(), 60 * time.Minute)
defer cancelTimeout()
streamBidi, err := c.BidiHello(ctxTimeout)
if err != nil {
log.Fatalf("%v.BidiHello() got error %v, want %v", c, err, nil)
}
watic := make(chan struct{})
// 读取
go func() {
for {
in, err := streamBidi.Recv()
if err == io.EOF {
close(watic)
return
}
if err != nil {
end := time.Now().Unix()
fmt.Printf("time: %d \n", end - start)
log.Fatalf("Failed to receive a note: %#v", err.Error())
}
fmt.Printf("AI: %#s \n", in.Message)
}
}()
// 发送
for {
request := &greeter.HelloRequest1{}
fmt.Scanln(&request.Name)
if request.Name == "quit" {
break
}
streamBidi.Trailer()
if err := streamBidi.Send(request); err != nil {
log.Fatalf("Failed to send a req: %#v", err)
}
}
streamBidi.CloseSend()
<- watic
}
// 获取token
func fetchToken() *oauth2.Token {
// 调用 api 接口
return &oauth2.Token{
AccessToken: "bearer-token",
}
}
测试
go run server.go
go run client.go
查看输出结果
参考
https://blog.didiyun.com/index.php/2018/12/12/grpc-golang-1/
protobuf 关于标识符