天天看點

使用go語言開發一個簡單的rpc

作者:幹飯人小羽
使用go語言開發一個簡單的rpc

1、什麼是grpc

在 gRPC 中,用戶端應用程式可以直接調用不同機器上的伺服器應用程式上的方法,就像它是本地對象一樣,使您更容易建立分布式應用程式和服務。 與許多 RPC 系統一樣,gRPC 基于定義服務的思想,指定可以遠端調用的方法及其參數和傳回類型。 在服務端,服務端實作這個接口并運作一個 gRPC 伺服器來處理用戶端調用。 在用戶端,用戶端有一個stub(在某些語言中僅稱為用戶端),它提供與伺服器相同的方法。

使用go語言開發一個簡單的rpc

是以grpc是跨語言的。

2、什麼是Protocol Buffers

Protocol Buffers提供了一種語言中立、平台中立、可擴充的機制,用于以向前相容和向後相容的方式序列化結構化資料。 它類似于 JSON,隻是它更小更快,并且生成本地語言綁定。

可以通過 .proto定義資料結構,然後就可以使用Protocol Buffers編譯器 protoc 從. proto 定義中生成我們喜歡的語言的資料通路類。 它們為每個字段提供簡單的通路器,如 name() 和 set_name(),以及将整個結構序列化/解析到原始位元組/從原始位元組中提取的方法。

3、grpc服務端

1、首先我們需要下載下傳go對應的protoc插件

go install google.golang.org/protobuf/cmd/[email protected]
go install google.golang.org/grpc/cmd/[email protected]
           

然後把GOPATH放到PATH

export PATH="$PATH:$(go env GOPATH)/bin"
           

接着列印下看看有沒有進去

echo $PATH
/usr/local/bin:/usr/bin:/bin:/usr/sbin:/sbin:$GOPATH/bin:/usr/local/go/bin
           

接着開新視窗,執行下面指令看下protoc是否安裝成功

protoc --version
libprotoc 3.19.1
           

2、接着我們建立一個hello.proto

内容如下(不懂結構的可自行百度)

syntax = "proto3";

package helloservice;

option go_package = ".;helloservice"; // 指定包名

message String {
  string value = 1;
}

service HelloService {
  rpc Hello(String) returns (String); // 一進制方法
  rpc Channel (stream String) returns (stream String); // 流式方法
}
           

目錄結構如下

.
├── go.mod
├── go.sum
├── helloclient
│   └── main.go
├── helloservice
│   ├── hello.proto
           

3、接着指令行生成對應語言的類代碼

cd helloservice
 protoc --go_out=./ --go-grpc_out=./ hello.proto
           

我們可以看下現在的目錄結構(其他檔案目錄可忽略,後面會建立,現在隻需要關注hello_grpc.pb.go)

.
├── go.mod
├── go.sum
├── helloclient
│   └── main.go
├── helloservice
│   ├── hello.pb.go
│   ├── hello.proto
│   ├── hello_grpc.pb.go
│   ├── hello_service.go
│   └── main
│       └── main.go
           

4、實作自己的hello service

在上面生成的hello_grpc.pb.go中我們可以看到這樣的接口

// HelloServiceServer is the server API for HelloService service.
// All implementations must embed UnimplementedHelloServiceServer
// for forward compatibility
type HelloServiceServer interface {
	Hello(context.Context, *String) (*String, error)
	Channel(HelloService_ChannelServer) error
	mustEmbedUnimplementedHelloServiceServer()
}
           

翻譯一下就是

// HelloServiceServer 是 HelloService 服務的服務端 API。

// 所有實作都必須嵌入 UnimplementedHelloServiceServer

// 為了向前相容

是以我們在helloservice中建立一個hello_service.go檔案,用來實作上面的接口

package helloservice

import (
	"context"
	"io"
	"time"
)

type HelloService struct {
}

func (h HelloService) mustEmbedUnimplementedHelloServiceServer() {
	panic("implement me")
}

func (h HelloService) Hello(ctx context.Context, args *String) (*String, error) {
	time.Sleep(time.Second)
	reply := &String{Value: "hello:" + args.GetValue()}
	return reply, nil
}

func (h HelloService) Channel(stream HelloService_ChannelServer) error {
	for {
		recv, err := stream.Recv()
		if err != nil {
			if err == io.EOF {
				return nil
			}
			return err
		}

		reply := &String{Value: "hello:" + recv.Value}
		err = stream.Send(reply)
		if err != nil {
			return err
		}
	}
}
           

上面的方法和簡單,就是列印我們自定義的字元串。

然後我們在main中編寫下服務啟動的代碼

package main

import (
	"google.golang.org/grpc"
	"grpcdemo/helloservice"
	"log"
	"net"
)

func main() {
  // NewServer 建立一個 gRPC 伺服器,它沒有注冊服務,也沒有開始接受請求。
	grpcServer := grpc.NewServer()
  // 注冊服務
	helloservice.RegisterHelloServiceServer(grpcServer, new(helloservice.HelloService))

  // 開啟一個tcp監聽
	listen, err := net.Listen("tcp", ":1234")
	if err != nil {
		log.Fatal(err)
	}
	log.Println("server started...")
  // 在監聽器 listen 上接受傳入的連接配接,建立一個新的ServerTransport 和 service goroutine。 服務 goroutine讀取 gRPC 請求,然後調用注冊的處理程式來回複它們。
	log.Fatal(grpcServer.Serve(listen))
}
           

然後我們啟動下看下效果

go run helloservice/main/main.go
2022/10/13 23:07:46 server started...
           

4、grpc用戶端

接着我們編寫用戶端的代碼helloclient/main.go

package main

import (
	"context"
	"fmt"
	"google.golang.org/grpc"
	"grpcdemo/helloservice"
	"io"
	"log"
	"time"
)

func main() {
	// 連接配接grpc服務端
	conn, err := grpc.Dial("localhost:1234", grpc.WithInsecure())
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()

  // 一進制rpc
	unaryRpc(conn)
  // 流式rpc
	streamRpc(conn)

}


func unaryRpc(conn *grpc.ClientConn) {
  // 建立grpc用戶端
	client := helloservice.NewHelloServiceClient(conn)
  // 發送請求
	reply, err := client.Hello(context.Background(), &helloservice.String{Value: "hello"})
	if err != nil {
		log.Fatal(err)
	}
	log.Println("unaryRpc recv: ", reply.Value)
}

func streamRpc(conn *grpc.ClientConn) {
  // 建立grpc用戶端
	client := helloservice.NewHelloServiceClient(conn)
  // 生成ClientStream
	stream, err := client.Channel(context.Background())
	if err != nil {
		log.Fatal(err)
	}

	go func() {
		for {
      // 發送消息
			if err := stream.Send(&helloservice.String{Value: "hi"}); err != nil {
				log.Fatal(err)
			}
			time.Sleep(time.Second)
		}
	}()

	for {
    // 接收消息
		recv, err := stream.Recv()
		if err != nil {
			if err == io.EOF {
				break
			}
			log.Fatal(err)
		}

		fmt.Println("streamRpc recv: ", recv.Value)

	}
}           

繼續閱讀