天天看點

基于記憶體通信的gRPC調用Golang/gRPC對網絡的抽象什麼是pipe如何用pipe作為gRPC的connection總結

Apache Dubbo 有injvm方式的通信,能夠避免網絡帶來的延遲,同時也不占用本地端口,對測試、本地驗證而言,是一種比較友善的RPC通信方式。

最近看到 containerd 的代碼,發現它也有類似的需求,那麼就考察了下gRPC有沒有類似的,基于記憶體的通信方式。發現pipe非常好用,是以記錄了下。

Golang/gRPC對網絡的抽象

首先,我們先看一下gRPC一次調用的架構圖。當然,這個架構圖目前隻關注了網絡抽象分布。

基于記憶體通信的gRPC調用Golang/gRPC對網絡的抽象什麼是pipe如何用pipe作為gRPC的connection總結

我們重點關注網絡部分。

作業系統系統抽象

首先,在網絡包之上,系統抽象出來了

socket

,代表一條虛拟連接配接,對于UDP,這個虛拟連接配接是不可靠的,對于TCP,這個連結是盡力可靠的。

對于網絡程式設計而言,僅僅有連接配接是不夠的,還需要告訴開發者如何建立、關閉連接配接。

對于服務端,系統提供了

accept

方法

,用來接收連接配接。

對于用戶端,系統提供了

connect

,用于和服務端建立連接配接。

Golang抽象

在Golang中,socket對等的概念叫

net.Conn

,代表了一條虛拟連接配接。

接下來,對于服務端,accept這個行為被包裝成了

net.Listener

接口

;對于用戶端,Golang則基于connect提供了

net.Dial

type Listener interface {
  // 接收來自用戶端的網絡連接配接
  Accept() (Conn, error)
  Close() error
  Addr() Addr
}           

gRPC使用

那麼gRPC是怎麼使用Listener和Dial的呢?

對于gRPC服務端,

Serve

接收一個Listener,表示在這個Listener上提供服務。

對于gRPC用戶端,網絡本質上就是一個能夠連接配接到某個地方的東西就可以,是以隻需要一個

dialer func(context.Context, string) (net.Conn, error)

函數就行了。

什麼是pipe

在作業系統層面,

pipe

表示一個資料管道,而這個管道兩端都在本程式中,可以很好的滿足我們的要求:基于記憶體的網絡通信。

Golang也基于pipe提供了

net.Pipe()

函數

建立了一個雙向的、基于記憶體通信的管道,在能力上,能夠很好的滿足gRPC對底層通信的要求。

但是

net.Pipe

僅僅産生了兩個

net.Conn

,即隻産生兩個網絡連接配接,沒有之前提到的Listner,也沒有Dial方法。

于是結合Golang的channel,把

net.Pipe

包裝成了Listner,也提供了Dial方法:

  1. Listener.Accept()

    ,隻需要監聽一個channel,用戶端連接配接過來的時候,把連接配接通過channel傳遞過來即可
  2. Dial方法

    ,調用Pipe,将一端通過channel給服務端(作為服務端連接配接),另一端作為用戶端連接配接

代碼如下:

package main

import (
  "context"
  "errors"
  "net"
  "sync"
  "sync/atomic"
)

var ErrPipeListenerClosed = errors.New(`pipe listener already closed`)

type PipeListener struct {
  ch    chan net.Conn
  close chan struct{}
  done  uint32
  m     sync.Mutex
}

func ListenPipe() *PipeListener {
  return &PipeListener{
    ch:    make(chan net.Conn),
    close: make(chan struct{}),
  }
}

// Accept 等待用戶端連接配接
func (l *PipeListener) Accept() (c net.Conn, e error) {
  select {
  case c = <-l.ch:
  case <-l.close:
    e = ErrPipeListenerClosed
  }
  return
}

// Close 關閉 listener.
func (l *PipeListener) Close() (e error) {
  if atomic.LoadUint32(&l.done) == 0 {
    l.m.Lock()
    defer l.m.Unlock()
    if l.done == 0 {
      defer atomic.StoreUint32(&l.done, 1)
      close(l.close)
      return
    }
  }
  e = ErrPipeListenerClosed
  return
}

// Addr 傳回 listener 的位址
func (l *PipeListener) Addr() net.Addr {
  return pipeAddr(0)
}
func (l *PipeListener) Dial(network, addr string) (net.Conn, error) {
  return l.DialContext(context.Background(), network, addr)
}
func (l *PipeListener) DialContext(ctx context.Context, network, addr string) (conn net.Conn, e error) {
  // PipeListener是否已經關閉
  if atomic.LoadUint32(&l.done) != 0 {
    e = ErrPipeListenerClosed
    return
  }

  // 建立pipe
  c0, c1 := net.Pipe()
  // 等待連接配接傳遞到服務端接收
  select {
  case <-ctx.Done():
    e = ctx.Err()
  case l.ch <- c0:
    conn = c1
  case <-l.close:
    c0.Close()
    c1.Close()
    e = ErrPipeListenerClosed
  }
  return
}

type pipeAddr int

func (pipeAddr) Network() string {
  return `pipe`
}
func (pipeAddr) String() string {
  return `pipe`
}           

如何用pipe作為gRPC的connection

有了上面的包裝,我們就可以基于此建立一個gRPC的伺服器端和用戶端,來進行基于記憶體的RPC通信了。

首先,我們簡單的建立一個服務,包含了四種調用方式:

syntax = "proto3";

option go_package = "google.golang.org/grpc/examples/helloworld/helloworld";
option java_multiple_files = true;
option java_package = "io.grpc.examples.helloworld";
option java_outer_classname = "HelloWorldProto";

package helloworld;

// The greeting service definition.
service Greeter {
  // unary調用
  rpc SayHello(HelloRequest) returns (HelloReply) {}

  // 服務端流式調用
  rpc SayHelloReplyStream(HelloRequest) returns (stream HelloReply);

  // 用戶端流式調用
  rpc SayHelloRequestStream(stream HelloRequest) returns (HelloReply);

  // 雙向流式調用
  rpc SayHelloBiStream(stream HelloRequest) returns (stream HelloReply);
}

// The request message containing the user's name.
message HelloRequest {
  string name = 1;
}

// The response message containing the greetings
message HelloReply {
  string message = 1;
}           

然後生成相關的stub代碼:

protoc --go_out=. --go_opt=paths=source_relative \
  --go-grpc_out=. --go-grpc_opt=paths=source_relative \
  helloworld/helloworld.proto           

然後開始寫服務端代碼,基本邏輯就是實作一個demo版本的服務端就好:

package main

import (
  "context"
  "log"

  "github.com/robberphex/grpc-in-memory/helloworld"
  pb "github.com/robberphex/grpc-in-memory/helloworld"
)

// helloworld.GreeterServer 的實作
type server struct {
  // 為了後面代碼相容,必須聚合UnimplementedGreeterServer
  // 這樣以後在proto檔案中新增加一個方法的時候,這段代碼至少不會報錯
  pb.UnimplementedGreeterServer
}

// unary調用的服務端代碼
func (s *server) SayHello(ctx context.Context, in *pb.HelloRequest) (*pb.HelloReply, error) {
  log.Printf("Received: %v", in.GetName())
  return &pb.HelloReply{Message: "Hello " + in.GetName()}, nil
}

// 用戶端流式調用的服務端代碼
// 接收兩個req,然後傳回一個resp
func (s *server) SayHelloRequestStream(streamServer pb.Greeter_SayHelloRequestStreamServer) error {
  req, err := streamServer.Recv()
  if err != nil {
    log.Printf("error receiving: %v", err)
    return err
  }
  log.Printf("Received: %v", req.GetName())
  req, err = streamServer.Recv()
  if err != nil {
    log.Printf("error receiving: %v", err)
    return err
  }
  log.Printf("Received: %v", req.GetName())
  streamServer.SendAndClose(&pb.HelloReply{Message: "Hello " + req.GetName()})
  return nil
}

// 服務端流式調用的服務端代碼
// 接收一個req,然後發送兩個resp
func (s *server) SayHelloReplyStream(req *pb.HelloRequest, streamServer pb.Greeter_SayHelloReplyStreamServer) error {
  log.Printf("Received: %v", req.GetName())
  err := streamServer.Send(&pb.HelloReply{Message: "Hello " + req.GetName()})
  if err != nil {
    log.Printf("error Send: %+v", err)
    return err
  }
  err = streamServer.Send(&pb.HelloReply{Message: "Hello " + req.GetName() + "_dup"})
  if err != nil {
    log.Printf("error Send: %+v", err)
    return err
  }
  return nil
}

// 雙向流式調用的服務端代碼
func (s *server) SayHelloBiStream(streamServer helloworld.Greeter_SayHelloBiStreamServer) error {
  req, err := streamServer.Recv()
  if err != nil {
    log.Printf("error receiving: %+v", err)
    // 及時将錯誤傳回給用戶端,下同
    return err
  }
  log.Printf("Received: %v", req.GetName())
  err = streamServer.Send(&pb.HelloReply{Message: "Hello " + req.GetName()})
  if err != nil {
    log.Printf("error Send: %+v", err)
    return err
  }
  // 離開這個函數後,streamServer會關閉,是以不推薦在單獨的goroute發送消息
  return nil
}

// 建立一個服務端實作
func NewServerImpl() *server {
  return &server{}
}           

然後我們建立一個基于pipe連接配接的用戶端來調用服務端。

包含如下幾個步驟:

  1. 建立服務端實作
  2. 基于pipe建立listener,然後基于它建立gRPC server
  3. 基于pipe建立用戶端連接配接,然後建立gRPC client,調用服務
package main

import (
  "context"
  "fmt"
  "log"
  "net"

  pb "github.com/robberphex/grpc-in-memory/helloworld"
  "google.golang.org/grpc"
)

// 将一個服務實作轉化為一個用戶端
func serverToClient(svc *server) pb.GreeterClient {
  // 建立一個基于pipe的Listener
  pipe := ListenPipe()

  s := grpc.NewServer()
  // 注冊Greeter服務到gRPC
  pb.RegisterGreeterServer(s, svc)
  if err := s.Serve(pipe); err != nil {
    log.Fatalf("failed to serve: %v", err)
  }
  // 用戶端指定使用pipe作為網絡連接配接
  clientConn, err := grpc.Dial(`pipe`,
    grpc.WithInsecure(),
    grpc.WithContextDialer(func(c context.Context, s string) (net.Conn, error) {
      return pipe.DialContext(c, `pipe`, s)
    }),
  )
  if err != nil {
    log.Fatalf("did not connect: %v", err)
  }
  // 基于pipe連接配接,建立gRPC用戶端
  c := pb.NewGreeterClient(clientConn)
  return c
}

func main() {
  svc := NewServerImpl()
  c := serverToClient(svc)

  ctx := context.Background()

  // unary調用
  for i := 0; i < 5; i++ {
    r, err := c.SayHello(ctx, &pb.HelloRequest{Name: fmt.Sprintf("world_unary_%d", i)})
    if err != nil {
      log.Fatalf("could not greet: %v", err)
    }
    log.Printf("Greeting: %s", r.GetMessage())
  }

  // 用戶端流式調用
  for i := 0; i < 5; i++ {
    streamClient, err := c.SayHelloRequestStream(ctx)
    if err != nil {
      log.Fatalf("could not SayHelloRequestStream: %v", err)
    }
    err = streamClient.Send(&pb.HelloRequest{Name: fmt.Sprintf("SayHelloRequestStream_%d", i)})
    if err != nil {
      log.Fatalf("could not Send: %v", err)
    }
    err = streamClient.Send(&pb.HelloRequest{Name: fmt.Sprintf("SayHelloRequestStream_%d_dup", i)})
    if err != nil {
      log.Fatalf("could not Send: %v", err)
    }
    reply, err := streamClient.CloseAndRecv()
    if err != nil {
      log.Fatalf("could not Recv: %v", err)
    }
    log.Println(reply.GetMessage())
  }

  // 服務端流式調用
  for i := 0; i < 5; i++ {
    streamClient, err := c.SayHelloReplyStream(ctx, &pb.HelloRequest{Name: fmt.Sprintf("SayHelloReplyStream_%d", i)})
    if err != nil {
      log.Fatalf("could not SayHelloReplyStream: %v", err)
    }
    reply, err := streamClient.Recv()
    if err != nil {
      log.Fatalf("could not Recv: %v", err)
    }
    log.Println(reply.GetMessage())
    reply, err = streamClient.Recv()
    if err != nil {
      log.Fatalf("could not Recv: %v", err)
    }
    log.Println(reply.GetMessage())
  }

  // 雙向流式調用
  for i := 0; i < 5; i++ {
    streamClient, err := c.SayHelloBiStream(ctx)
    if err != nil {
      log.Fatalf("could not SayHelloStream: %v", err)
    }
    err = streamClient.Send(&pb.HelloRequest{Name: fmt.Sprintf("world_stream_%d", i)})
    if err != nil {
      log.Fatalf("could not Send: %v", err)
    }
    reply, err := streamClient.Recv()
    if err != nil {
      log.Fatalf("could not Recv: %v", err)
    }
    log.Println(reply.GetMessage())
  }
}           

總結

當然,作為基于記憶體的RPC調用,還可以有更好的方式,比如直接将對象傳遞到服務端,直接通過本地調用方式來通信。

但這種方式破壞了很多約定,比如對象位址、比如gRPC連接配接參數不生效等等。

本文介紹的,基于Pipe的通信方式,除了網絡層走了記憶體傳遞之外,其他都和正常RPC通信行為一緻,比如同樣經曆了序列化、經曆了HTTP/2的流控制等。當然,性能上比原生調用也會差一點,但是好在對于測試、驗證場景,行為上的一緻比較重要些。

本文代碼已經托管到了GitHub

https://github.com/robberphex/grpc-in-memory