Apache Dubbo 有injvm方式的通信,能夠避免網絡帶來的延遲,同時也不占用本地端口,對測試、本地驗證而言,是一種比較友善的RPC通信方式。
最近看到 containerd 的代碼,發現它也有類似的需求,那麼就考察了下gRPC有沒有類似的,基于記憶體的通信方式。發現pipe非常好用,是以記錄了下。
Golang/gRPC對網絡的抽象
首先,我們先看一下gRPC一次調用的架構圖。當然,這個架構圖目前隻關注了網絡抽象分布。

我們重點關注網絡部分。
作業系統系統抽象
首先,在網絡包之上,系統抽象出來了
socket,代表一條虛拟連接配接,對于UDP,這個虛拟連接配接是不可靠的,對于TCP,這個連結是盡力可靠的。
對于網絡程式設計而言,僅僅有連接配接是不夠的,還需要告訴開發者如何建立、關閉連接配接。
對于服務端,系統提供了
accept
方法 ,用來接收連接配接。
對于用戶端,系統提供了
connect
,用于和服務端建立連接配接。
Golang抽象
在Golang中,socket對等的概念叫
net.Conn
,代表了一條虛拟連接配接。
接下來,對于服務端,accept這個行為被包裝成了
net.Listener
接口 ;對于用戶端,Golang則基于connect提供了
net.Dial
。
type Listener interface {
// 接收來自用戶端的網絡連接配接
Accept() (Conn, error)
Close() error
Addr() Addr
}
gRPC使用
那麼gRPC是怎麼使用Listener和Dial的呢?
對于gRPC服務端,
Serve
接收一個Listener,表示在這個Listener上提供服務。
對于gRPC用戶端,網絡本質上就是一個能夠連接配接到某個地方的東西就可以,是以隻需要一個
dialer func(context.Context, string) (net.Conn, error)
函數就行了。
什麼是pipe
在作業系統層面,
pipe
表示一個資料管道,而這個管道兩端都在本程式中,可以很好的滿足我們的要求:基于記憶體的網絡通信。
Golang也基于pipe提供了
net.Pipe()
函數 建立了一個雙向的、基于記憶體通信的管道,在能力上,能夠很好的滿足gRPC對底層通信的要求。
但是
net.Pipe
僅僅産生了兩個
net.Conn
,即隻産生兩個網絡連接配接,沒有之前提到的Listner,也沒有Dial方法。
于是結合Golang的channel,把
net.Pipe
包裝成了Listner,也提供了Dial方法:
-
,隻需要監聽一個channel,用戶端連接配接過來的時候,把連接配接通過channel傳遞過來即可Listener.Accept()
-
,調用Pipe,将一端通過channel給服務端(作為服務端連接配接),另一端作為用戶端連接配接Dial方法
代碼如下:
package main
import (
"context"
"errors"
"net"
"sync"
"sync/atomic"
)
var ErrPipeListenerClosed = errors.New(`pipe listener already closed`)
type PipeListener struct {
ch chan net.Conn
close chan struct{}
done uint32
m sync.Mutex
}
func ListenPipe() *PipeListener {
return &PipeListener{
ch: make(chan net.Conn),
close: make(chan struct{}),
}
}
// Accept 等待用戶端連接配接
func (l *PipeListener) Accept() (c net.Conn, e error) {
select {
case c = <-l.ch:
case <-l.close:
e = ErrPipeListenerClosed
}
return
}
// Close 關閉 listener.
func (l *PipeListener) Close() (e error) {
if atomic.LoadUint32(&l.done) == 0 {
l.m.Lock()
defer l.m.Unlock()
if l.done == 0 {
defer atomic.StoreUint32(&l.done, 1)
close(l.close)
return
}
}
e = ErrPipeListenerClosed
return
}
// Addr 傳回 listener 的位址
func (l *PipeListener) Addr() net.Addr {
return pipeAddr(0)
}
func (l *PipeListener) Dial(network, addr string) (net.Conn, error) {
return l.DialContext(context.Background(), network, addr)
}
func (l *PipeListener) DialContext(ctx context.Context, network, addr string) (conn net.Conn, e error) {
// PipeListener是否已經關閉
if atomic.LoadUint32(&l.done) != 0 {
e = ErrPipeListenerClosed
return
}
// 建立pipe
c0, c1 := net.Pipe()
// 等待連接配接傳遞到服務端接收
select {
case <-ctx.Done():
e = ctx.Err()
case l.ch <- c0:
conn = c1
case <-l.close:
c0.Close()
c1.Close()
e = ErrPipeListenerClosed
}
return
}
type pipeAddr int
func (pipeAddr) Network() string {
return `pipe`
}
func (pipeAddr) String() string {
return `pipe`
}
如何用pipe作為gRPC的connection
有了上面的包裝,我們就可以基于此建立一個gRPC的伺服器端和用戶端,來進行基于記憶體的RPC通信了。
首先,我們簡單的建立一個服務,包含了四種調用方式:
syntax = "proto3";
option go_package = "google.golang.org/grpc/examples/helloworld/helloworld";
option java_multiple_files = true;
option java_package = "io.grpc.examples.helloworld";
option java_outer_classname = "HelloWorldProto";
package helloworld;
// The greeting service definition.
service Greeter {
// unary調用
rpc SayHello(HelloRequest) returns (HelloReply) {}
// 服務端流式調用
rpc SayHelloReplyStream(HelloRequest) returns (stream HelloReply);
// 用戶端流式調用
rpc SayHelloRequestStream(stream HelloRequest) returns (HelloReply);
// 雙向流式調用
rpc SayHelloBiStream(stream HelloRequest) returns (stream HelloReply);
}
// The request message containing the user's name.
message HelloRequest {
string name = 1;
}
// The response message containing the greetings
message HelloReply {
string message = 1;
}
然後生成相關的stub代碼:
protoc --go_out=. --go_opt=paths=source_relative \
--go-grpc_out=. --go-grpc_opt=paths=source_relative \
helloworld/helloworld.proto
然後開始寫服務端代碼,基本邏輯就是實作一個demo版本的服務端就好:
package main
import (
"context"
"log"
"github.com/robberphex/grpc-in-memory/helloworld"
pb "github.com/robberphex/grpc-in-memory/helloworld"
)
// helloworld.GreeterServer 的實作
type server struct {
// 為了後面代碼相容,必須聚合UnimplementedGreeterServer
// 這樣以後在proto檔案中新增加一個方法的時候,這段代碼至少不會報錯
pb.UnimplementedGreeterServer
}
// unary調用的服務端代碼
func (s *server) SayHello(ctx context.Context, in *pb.HelloRequest) (*pb.HelloReply, error) {
log.Printf("Received: %v", in.GetName())
return &pb.HelloReply{Message: "Hello " + in.GetName()}, nil
}
// 用戶端流式調用的服務端代碼
// 接收兩個req,然後傳回一個resp
func (s *server) SayHelloRequestStream(streamServer pb.Greeter_SayHelloRequestStreamServer) error {
req, err := streamServer.Recv()
if err != nil {
log.Printf("error receiving: %v", err)
return err
}
log.Printf("Received: %v", req.GetName())
req, err = streamServer.Recv()
if err != nil {
log.Printf("error receiving: %v", err)
return err
}
log.Printf("Received: %v", req.GetName())
streamServer.SendAndClose(&pb.HelloReply{Message: "Hello " + req.GetName()})
return nil
}
// 服務端流式調用的服務端代碼
// 接收一個req,然後發送兩個resp
func (s *server) SayHelloReplyStream(req *pb.HelloRequest, streamServer pb.Greeter_SayHelloReplyStreamServer) error {
log.Printf("Received: %v", req.GetName())
err := streamServer.Send(&pb.HelloReply{Message: "Hello " + req.GetName()})
if err != nil {
log.Printf("error Send: %+v", err)
return err
}
err = streamServer.Send(&pb.HelloReply{Message: "Hello " + req.GetName() + "_dup"})
if err != nil {
log.Printf("error Send: %+v", err)
return err
}
return nil
}
// 雙向流式調用的服務端代碼
func (s *server) SayHelloBiStream(streamServer helloworld.Greeter_SayHelloBiStreamServer) error {
req, err := streamServer.Recv()
if err != nil {
log.Printf("error receiving: %+v", err)
// 及時将錯誤傳回給用戶端,下同
return err
}
log.Printf("Received: %v", req.GetName())
err = streamServer.Send(&pb.HelloReply{Message: "Hello " + req.GetName()})
if err != nil {
log.Printf("error Send: %+v", err)
return err
}
// 離開這個函數後,streamServer會關閉,是以不推薦在單獨的goroute發送消息
return nil
}
// 建立一個服務端實作
func NewServerImpl() *server {
return &server{}
}
然後我們建立一個基于pipe連接配接的用戶端來調用服務端。
包含如下幾個步驟:
- 建立服務端實作
- 基于pipe建立listener,然後基于它建立gRPC server
- 基于pipe建立用戶端連接配接,然後建立gRPC client,調用服務
package main
import (
"context"
"fmt"
"log"
"net"
pb "github.com/robberphex/grpc-in-memory/helloworld"
"google.golang.org/grpc"
)
// 将一個服務實作轉化為一個用戶端
func serverToClient(svc *server) pb.GreeterClient {
// 建立一個基于pipe的Listener
pipe := ListenPipe()
s := grpc.NewServer()
// 注冊Greeter服務到gRPC
pb.RegisterGreeterServer(s, svc)
if err := s.Serve(pipe); err != nil {
log.Fatalf("failed to serve: %v", err)
}
// 用戶端指定使用pipe作為網絡連接配接
clientConn, err := grpc.Dial(`pipe`,
grpc.WithInsecure(),
grpc.WithContextDialer(func(c context.Context, s string) (net.Conn, error) {
return pipe.DialContext(c, `pipe`, s)
}),
)
if err != nil {
log.Fatalf("did not connect: %v", err)
}
// 基于pipe連接配接,建立gRPC用戶端
c := pb.NewGreeterClient(clientConn)
return c
}
func main() {
svc := NewServerImpl()
c := serverToClient(svc)
ctx := context.Background()
// unary調用
for i := 0; i < 5; i++ {
r, err := c.SayHello(ctx, &pb.HelloRequest{Name: fmt.Sprintf("world_unary_%d", i)})
if err != nil {
log.Fatalf("could not greet: %v", err)
}
log.Printf("Greeting: %s", r.GetMessage())
}
// 用戶端流式調用
for i := 0; i < 5; i++ {
streamClient, err := c.SayHelloRequestStream(ctx)
if err != nil {
log.Fatalf("could not SayHelloRequestStream: %v", err)
}
err = streamClient.Send(&pb.HelloRequest{Name: fmt.Sprintf("SayHelloRequestStream_%d", i)})
if err != nil {
log.Fatalf("could not Send: %v", err)
}
err = streamClient.Send(&pb.HelloRequest{Name: fmt.Sprintf("SayHelloRequestStream_%d_dup", i)})
if err != nil {
log.Fatalf("could not Send: %v", err)
}
reply, err := streamClient.CloseAndRecv()
if err != nil {
log.Fatalf("could not Recv: %v", err)
}
log.Println(reply.GetMessage())
}
// 服務端流式調用
for i := 0; i < 5; i++ {
streamClient, err := c.SayHelloReplyStream(ctx, &pb.HelloRequest{Name: fmt.Sprintf("SayHelloReplyStream_%d", i)})
if err != nil {
log.Fatalf("could not SayHelloReplyStream: %v", err)
}
reply, err := streamClient.Recv()
if err != nil {
log.Fatalf("could not Recv: %v", err)
}
log.Println(reply.GetMessage())
reply, err = streamClient.Recv()
if err != nil {
log.Fatalf("could not Recv: %v", err)
}
log.Println(reply.GetMessage())
}
// 雙向流式調用
for i := 0; i < 5; i++ {
streamClient, err := c.SayHelloBiStream(ctx)
if err != nil {
log.Fatalf("could not SayHelloStream: %v", err)
}
err = streamClient.Send(&pb.HelloRequest{Name: fmt.Sprintf("world_stream_%d", i)})
if err != nil {
log.Fatalf("could not Send: %v", err)
}
reply, err := streamClient.Recv()
if err != nil {
log.Fatalf("could not Recv: %v", err)
}
log.Println(reply.GetMessage())
}
}
總結
當然,作為基于記憶體的RPC調用,還可以有更好的方式,比如直接将對象傳遞到服務端,直接通過本地調用方式來通信。
但這種方式破壞了很多約定,比如對象位址、比如gRPC連接配接參數不生效等等。
本文介紹的,基于Pipe的通信方式,除了網絡層走了記憶體傳遞之外,其他都和正常RPC通信行為一緻,比如同樣經曆了序列化、經曆了HTTP/2的流控制等。當然,性能上比原生調用也會差一點,但是好在對于測試、驗證場景,行為上的一緻比較重要些。
本文代碼已經托管到了GitHub
https://github.com/robberphex/grpc-in-memory