Service Mesh gRPC Practice in Multiple Programming Languages - Part 2: Implementing the gRPC Examples

1 Generated code

First, install grpc and protobuf in the local development environment. Taking macOS as an example:

brew install grpc protobuf

No matter which programming language is used to implement a gRPC service, the protobuf definitions must first be converted into code for that language.

  • Java: the Maven build provides the automatic conversion plugin protobuf-maven-plugin; running mvn package generates the gRPC template code via protoc-gen-grpc-java. See hello-grpc-java/pom.xml.

  • Go: run go get github.com/golang/protobuf/protoc-gen-go to install the plugin, then generate the gRPC code with the protoc command. See hello-grpc-go/proto2go.sh.

  • Node.js: run npm install -g grpc-tools to install grpc_tools_node_protoc, then generate the gRPC code with it. See hello-grpc-nodejs/proto2js.sh.

  • Python: run pip install grpcio-tools to install grpcio-tools, then generate the gRPC code with protoc. See hello-grpc-python/proto2py.sh.
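All four language toolchains above consume the same contract. As a reference, here is a minimal sketch of what proto/landing.proto might look like, reconstructed from the method and message names used in the code snippets below; the actual file in the repository may use different field numbers, a different package name, and additional fields:

```protobuf
syntax = "proto3";

package org.feuyeux.grpc;

service LandingService {
  // unary RPC
  rpc Talk (TalkRequest) returns (TalkResponse) {}
  // server-streaming RPC
  rpc TalkOneAnswerMore (TalkRequest) returns (stream TalkResponse) {}
  // client-streaming RPC
  rpc TalkMoreAnswerOne (stream TalkRequest) returns (TalkResponse) {}
  // bidirectional-streaming RPC
  rpc TalkBidirectional (stream TalkRequest) returns (stream TalkResponse) {}
}

message TalkRequest {
  string data = 1;
}

message TalkResult {
  int64 id = 1;
  map<string, string> kv = 2;
}

message TalkResponse {
  int32 status = 1;
  repeated TalkResult results = 2;
}
```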

In the sample projects, each language's code directory contains a proto directory whose landing.proto file is a symbolic link to proto/landing.proto in the repository root. This makes it easy to update the protobuf definition in a single place.
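Such a link can be created with ln -s. A minimal demo of the layout, run in a temporary directory (hello-grpc-go stands in for any of the language directories):

```shell
# Demonstrate the shared-proto layout in a throwaway directory.
cd "$(mktemp -d)"
mkdir -p proto hello-grpc-go/proto
echo 'syntax = "proto3";' > proto/landing.proto
# The per-language copy is a symlink back to the shared definition.
ln -sf ../../proto/landing.proto hello-grpc-go/proto/landing.proto
readlink hello-grpc-go/proto/landing.proto
```

Editing proto/landing.proto in the root is then immediately visible in every language directory.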

2 Implementing the communication

The hello array

// Java
private final List<String> HELLO_LIST = Arrays.asList("Hello", "Bonjour", "Hola", "こんにちは", "Ciao", "안녕하세요");
kv.put("data", HELLO_LIST.get(index));

// Go
var helloList = []string{"Hello", "Bonjour", "Hola", "こんにちは", "Ciao", "안녕하세요"}
kv["data"] = helloList[index]

// Node.js
let hellos = ["Hello", "Bonjour", "Hola", "こんにちは", "Ciao", "안녕하세요"]
kv.set("data", hellos[index])

# Python
hellos = ["Hello", "Bonjour", "Hola", "こんにちは", "Ciao", "안녕하세요"]
result.kv["data"] = hellos[index]

talk

The unary RPC implementation is the most classic: a blocking, one-request one-response exchange.

// Java
// Communicate with the server through the blockingStub
public TalkResponse talk(TalkRequest talkRequest) {
    return blockingStub.talk(talkRequest);
}
// After processing the request, the server fires two events on the
// StreamObserver instance: onNext and onCompleted
public void talk(TalkRequest request, StreamObserver<TalkResponse> responseObserver) {
    ...
    responseObserver.onNext(response);
    responseObserver.onCompleted();
}
// Go
func talk(client pb.LandingServiceClient, request *pb.TalkRequest) {
    r, err := client.Talk(context.Background(), request)
}

func (s *ProtoServer) Talk(ctx context.Context, request *pb.TalkRequest) (*pb.TalkResponse, error) {
    return &pb.TalkResponse{
        Status:  200,
        Results: []*pb.TalkResult{s.buildResult(request.Data)},
    }, nil
}           
// Node.js
function talk(client, request) {
    client.talk(request, function (err, response) {
            ...
    })
}
                
function talk(call, callback) {
    const talkResult = buildResult(call.request.getData())
    ...
    callback(null, response)
}                           
# Python
def talk(stub):
    response = stub.talk(request)
    
def talk(self, request, context):
    result = build_result(request.data)
    ...
    return response           

talkOneAnswerMore

The key to implementing the server-streaming RPC is how the client handles the streamed response data after sending its request, and how the server streams the responses back.

// Java
public List<TalkResponse> talkOneAnswerMore(TalkRequest request) {
    Iterator<TalkResponse> talkResponses = blockingStub.talkOneAnswerMore(request);
    talkResponses.forEachRemaining(talkResponseList::add);
    return talkResponseList;
}

public void talkOneAnswerMore(TalkRequest request, StreamObserver<TalkResponse> responseObserver) {
    String[] datas = request.getData().split(",");
    for (String data : datas) {...}
    talkResponses.forEach(responseObserver::onNext);
    responseObserver.onCompleted();
}           
// Go
func talkOneAnswerMore(client pb.LandingServiceClient, request *pb.TalkRequest) {
    stream, err := client.TalkOneAnswerMore(context.Background(), request)
    for {
        r, err := stream.Recv()
        if err == io.EOF {
            break
        }
    ...
    }
}

func (s *ProtoServer) TalkOneAnswerMore(request *pb.TalkRequest, stream pb.LandingService_TalkOneAnswerMoreServer) error {
    datas := strings.Split(request.Data, ",")
    for _, d := range datas {
        stream.Send(&pb.TalkResponse{...})
    }
    return nil
}
// Node.js
function talkOneAnswerMore(client, request) {
    let call = client.talkOneAnswerMore(request)
    call.on('data', function (response) {
        ...
    })
}

function talkOneAnswerMore(call) {
    let datas = call.request.getData().split(",")
    for (const data in datas) {
        ...
        call.write(response)
    }
    call.end()
}           
# Python
def talk_one_answer_more(stub):
    responses = stub.talkOneAnswerMore(request)
    for response in responses:
        logger.info(response)

def talkOneAnswerMore(self, request, context):
    datas = request.data.split(",")
    for data in datas:
        yield response           

talkMoreAnswerOne

The key to implementing the client-streaming RPC is that the client, after streaming its requests, tells the server it has finished; the server processes each request as it arrives and returns a single response once it receives the end of the stream.

// Java
public void talkMoreAnswerOne(List<TalkRequest> requests) throws InterruptedException {
    final CountDownLatch finishLatch = new CountDownLatch(1);
    StreamObserver<TalkResponse> responseObserver = new StreamObserver<TalkResponse>() {
        @Override
        public void onNext(TalkResponse talkResponse) {
            log.info("Response=\n{}", talkResponse);
        }
        @Override
        public void onCompleted() {
            finishLatch.countDown();
        }
    };
    final StreamObserver<TalkRequest> requestObserver = asyncStub.talkMoreAnswerOne(responseObserver);
    requests.forEach(request -> {
        if (finishLatch.getCount() > 0) {
            requestObserver.onNext(request);
        }
    });
    requestObserver.onCompleted();
}
                         
public StreamObserver<TalkRequest> talkMoreAnswerOne(StreamObserver<TalkResponse> responseObserver) {
    return new StreamObserver<TalkRequest>() {
        @Override
        public void onNext(TalkRequest request) {
            talkRequests.add(request);
        }
        @Override
        public void onCompleted() {
            responseObserver.onNext(buildResponse(talkRequests));
            responseObserver.onCompleted();
        }
    };
}           
// Go
func talkMoreAnswerOne(client pb.LandingServiceClient, requests []*pb.TalkRequest) {
    stream, err := client.TalkMoreAnswerOne(context.Background())
    for _, request := range requests {
        stream.Send(request)
    }
    r, err := stream.CloseAndRecv()
}

func (s *ProtoServer) TalkMoreAnswerOne(stream pb.LandingService_TalkMoreAnswerOneServer) error {
    var rs []*pb.TalkResult
    for {
        in, err := stream.Recv()
        if err == io.EOF {
            talkResponse := &pb.TalkResponse{
                Status:  200,
                Results: rs,
            }
            stream.SendAndClose(talkResponse)
            return nil
        }
        rs = append(rs, s.buildResult(in.Data))
    }
}           
// Node.js
function talkMoreAnswerOne(client, requests) {
    let call = client.talkMoreAnswerOne(function (err, response) {
        ...
    })
    requests.forEach(request => {
        call.write(request)
    })
    call.end()
}

function talkMoreAnswerOne(call, callback) {
    let talkResults = []
    call.on('data', function (request) {
        talkResults.push(buildResult(request.getData()))
    })
    call.on('end', function () {
        let response = new messages.TalkResponse()
        response.setStatus(200)
        response.setResultsList(talkResults)
        callback(null, response)
    })
}           
# Python
def talk_more_answer_one(stub):
    response_summary = stub.talkMoreAnswerOne(request_iterator)

def generate_request():
    for _ in range(0, 3):
        yield request
        
def talkMoreAnswerOne(self, request_iterator, context):
    for request in request_iterator:
        response.results.append(build_result(request.data))
    return response             

talkBidirectional

The key to implementing the bidirectional-streaming RPC is that the client streams its requests and then signals completion; the server returns a result after each request and, upon receiving the end of the stream, tells the client it is finished.

// Java
public void talkBidirectional(List<TalkRequest> requests) throws InterruptedException {
    final CountDownLatch finishLatch = new CountDownLatch(1);
    StreamObserver<TalkResponse> responseObserver = new StreamObserver<TalkResponse>() {
        @Override
        public void onNext(TalkResponse talkResponse) {
            log.info("Response=\n{}", talkResponse);
        }
        @Override
        public void onCompleted() {
            finishLatch.countDown();
        }
    };
    final StreamObserver<TalkRequest> requestObserver = asyncStub.talkBidirectional(responseObserver);
    requests.forEach(request -> {
        if (finishLatch.getCount() > 0) {
            requestObserver.onNext(request);
        }
    });
    requestObserver.onCompleted();
}

public StreamObserver<TalkRequest> talkBidirectional(StreamObserver<TalkResponse> responseObserver) {
    return new StreamObserver<TalkRequest>() {
        @Override
        public void onNext(TalkRequest request) {
            responseObserver.onNext(TalkResponse.newBuilder()
                    .setStatus(200)
                    .addResults(buildResult(request.getData())).build());
        }
        @Override
        public void onCompleted() {
            responseObserver.onCompleted();
        }
    };
}           
// Go
func talkBidirectional(client pb.LandingServiceClient, requests []*pb.TalkRequest) {
    stream, err := client.TalkBidirectional(context.Background())
    waitc := make(chan struct{})
    go func() {
        for {
            r, err := stream.Recv()
            if err == io.EOF {
                // read done.
                close(waitc)
                return
            }
        }
    }()
    for _, request := range requests {
        stream.Send(request)
    }
    stream.CloseSend()
    <-waitc
}

func (s *ProtoServer) TalkBidirectional(stream pb.LandingService_TalkBidirectionalServer) error {
    for {
        in, err := stream.Recv()
        if err == io.EOF {
            return nil
        }
        talkResponse := &pb.TalkResponse{
            Status:  200,
            Results: []*pb.TalkResult{s.buildResult(in.Data)},
        }
        stream.Send(talkResponse)
    }
}           
// Node.js
function talkBidirectional(client, requests) {
    let call = client.talkBidirectional()
    call.on('data', function (response) {
        ...
    })
    requests.forEach(request => {
        call.write(request)
    })
    call.end()
}

function talkBidirectional(call) {
    call.on('data', function (request) {
        let response = new messages.TalkResponse()
        response.setStatus(200)
        response.setResultsList([buildResult(request.getData())])
        call.write(response)
    })
    call.on('end', function () {
        call.end()
    })
}
# Python
def talk_bidirectional(stub):
    responses = stub.talkBidirectional(request_iterator)
    for response in responses:
        logger.info(response)

def talkBidirectional(self, request_iterator, context):
    for request in request_iterator:
        yield response           

3 Key implementation details

Environment variables

// Java
private static String getGrcServer() {
    String server = System.getenv("GRPC_SERVER");
    if (server == null) {
        return "localhost";
    }
    return server;
}           
// Go
func grpcServer() string {
    server := os.Getenv("GRPC_SERVER")
    if len(server) == 0 {
        return "localhost"
    } else {
        return server
    }
}           
// Node.js
function grpcServer() {
    let server = process.env.GRPC_SERVER;
    if (typeof server !== 'undefined' && server !== null) {
        return server
    } else {
        return "localhost"
    }
}           
# Python
def grpc_server():
    server = os.getenv("GRPC_SERVER")
    if server:
        return server
    else:
        return "localhost"           

Random numbers

// Java
public static String getRandomId() {
    return String.valueOf(random.nextInt(5));
}

// Go
func randomId(max int) string {
    return strconv.Itoa(rand.Intn(max))
}

// Node.js
function randomId(max) {
    return Math.floor(Math.random() * Math.floor(max)).toString()
}

# Python
def random_id(end):
    return str(random.randint(0, end))

Timestamps

// Java
TalkResult.newBuilder().setId(System.nanoTime())

// Go
result.Id = time.Now().UnixNano()

// Node.js
result.setId(Math.round(Date.now() / 1000))

# Python
result.id = int(time.time())

UUID

// Java
kv.put("id", UUID.randomUUID().toString());

// Go
import (
    "github.com/google/uuid"
)
kv["id"] = uuid.New().String()

// Node.js
kv.set("id", uuid.v1())

# Python
result.kv["id"] = str(uuid.uuid1())

Sleep

// Java
TimeUnit.SECONDS.sleep(1);

// Go
time.Sleep(2 * time.Millisecond)

// Node.js
let sleep = require('sleep')
sleep.msleep(2)

# Python
time.sleep(random.uniform(0.5, 1.5))

4 Verification

Functional verification

After development is complete, we start the gRPC server in one terminal and the client in another; the client calls each of the four communication interfaces in turn.

java
mvn exec:java -Dexec.mainClass="org.feuyeux.grpc.server.ProtoServer"
mvn exec:java -Dexec.mainClass="org.feuyeux.grpc.client.ProtoClient"

go
go run server.go
go run client/proto_client.go

nodejs
node proto_server.js
node proto_client.js

python
python server/protoServer.py
python client/protoClient.py

Cross-language communication

Building on the functional verification, we start the server written in any one of the languages and then verify it with the clients of all four languages, to ensure that the gRPC communication behavior of the different language implementations is identical. This step is the foundation for the subsequent containerization and mesh work: each language's server will be released as a Deployment of one version of the same Kubernetes Service, and their behavior must be consistent so that requests routed to different versions yield identical results.
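The cross-test matrix above can be sketched as a loop; the echo stands in for actually starting each server and then running each client against it with GRPC_SERVER set (for example, GRPC_SERVER=localhost go run client/proto_client.go, using the commands from the functional-verification step):

```shell
# Enumerate the 4 x 4 = 16 server/client combinations to verify.
for server in java go node python; do
  for client in java go node python; do
    echo "server=$server client=$client"
  done
done
```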

5 Build and distribution

Build

Once the communication functions are verified, the next step is to compile, build, and package the four projects. The output of this step is the input for building the images.

Build the server and client jars separately and copy them to the docker directory for later use.

mvn clean install -DskipTests -f server_pom
cp target/hello-grpc-java.jar ../docker/

mvn clean install -DskipTests -f client_pom
cp target/hello-grpc-java.jar ../docker/           

Binaries produced by the Go compiler are platform-specific, and since the final deployment target is Linux, the build commands are as follows. The binaries are then copied to the docker directory for later use.

env GOOS=linux GOARCH=amd64 go build -o proto_server server.go
mv proto_server ../docker/

env GOOS=linux GOARCH=amd64 go build -o proto_client client/proto_client.go
mv proto_client ../docker/           

Node.js must be built inside the Docker image in order to satisfy the various C++ dependencies it needs at runtime, so this step mainly copies the files for later use.

cp ../hello-grpc-nodejs/proto_server.js node
cp ../hello-grpc-nodejs/package.json node
cp -R ../hello-grpc-nodejs/common node
cp -R ../proto node
cp ../hello-grpc-nodejs/*_client.js node           

Python requires no compilation; simply copy the files for later use.

cp -R ../hello-grpc-python/server py
cp ../hello-grpc-python/start_server.sh py
cp -R ../proto py
cp ../hello-grpc-python/proto2py.sh py
cp -R ../hello-grpc-python/client py
cp ../hello-grpc-python/start_client.sh py           

dockerfile

After the builds, the docker directory holds all the files the dockerfiles need. The key points of the dockerfiles are explained below.

  • For base images we choose alpine whenever possible, because it is the smallest. Note that the Python base image should be the 2.x image python:2, because the examples use Python 2; if you need Python 3, adapt the sample code accordingly.
  • Node.js requires installing C++ and the make build tool, and the grpc-tools npm package.

Here the Node.js server image is used as an example to explain the image build process.

grpc-server-node.dockerfile
FROM node:14.11-alpine
RUN apk add --update \
      python \
      make \
      g++ \
  && rm -rf /var/cache/apk/*
RUN npm config set registry https://registry.npm.taobao.org && npm install -g node-pre-gyp grpc-tools --unsafe-perm
COPY node/package.json .
RUN npm install --unsafe-perm
COPY node .
ENTRYPOINT ["node","proto_server.js"]           
docker build
docker build -f grpc-server-node.dockerfile -t registry.cn-beijing.aliyuncs.com/asm_repo/grpc_server_node:1.0.0 .           

Image list

In the end we build eight images and distribute them to the Alibaba Cloud ACR registry with the push command, as the foundation for the Kubernetes deployment in the next article.

  • registry.cn-beijing.aliyuncs.com/asm_repo/grpc_server_java:1.0.0
  • registry.cn-beijing.aliyuncs.com/asm_repo/grpc_client_java:1.0.0
  • registry.cn-beijing.aliyuncs.com/asm_repo/grpc_server_go:1.0.0
  • registry.cn-beijing.aliyuncs.com/asm_repo/grpc_client_go:1.0.0
  • registry.cn-beijing.aliyuncs.com/asm_repo/grpc_server_node:1.0.0
  • registry.cn-beijing.aliyuncs.com/asm_repo/grpc_client_node:1.0.0
  • registry.cn-beijing.aliyuncs.com/asm_repo/grpc_server_python:1.0.0
  • registry.cn-beijing.aliyuncs.com/asm_repo/grpc_client_python:1.0.0
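The eight builds and pushes follow one naming pattern. A sketch of a loop that prints the commands, assuming the other seven dockerfiles are named like the grpc-server-node.dockerfile example (grpc-<role>-<lang>.dockerfile, which is an assumption); drop the echos to actually run it:

```shell
# Print the build/push commands for all 8 images (4 languages x 2 roles).
REPO=registry.cn-beijing.aliyuncs.com/asm_repo
for lang in java go node python; do
  for role in server client; do
    # dockerfile naming assumed from the grpc-server-node.dockerfile example
    echo docker build -f "grpc-${role}-${lang}.dockerfile" -t "${REPO}/grpc_${role}_${lang}:1.0.0" .
    echo docker push "${REPO}/grpc_${role}_${lang}:1.0.0"
  done
done
```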