天天看點

基于google protobuf的gRPC實作

1.Protobuf簡介

Protobuf(Google Protocol Buffers)提供一種靈活、高效、自動化的機制,用于序列化結構資料。Protobuf僅需自定義一次所需要的資料格式,然後我們就可以使用Protobuf編譯器自動生成各種語言的源碼,友善我們讀寫自定義的格式化資料。另外Protobuf的使用與平台和語言無關,可以在不破壞原資料格式的基礎上,擴充新的資料。

我們可以将Protobuf與XML進行對比,但Protobuf更小、更快、更加簡單。總結來說具有一下特點:

  • 性能好、效率高。Protobuf作用與XML、json類似,但它是二進制格式,是以性能更好。但同時因為是二進制格式,是以缺點也就是可讀性差。
  • 支援多種語言,例C++、C#、Go、Java、Python等。
  • 代碼生成機制,易于使用。
  • 向前相容,向後相容。
  • 解析速度快。

2.Protobuf安裝

Mac使用者可以使用brew進行安裝,指令如下所示。

brew install protobuf           

複制

如需要安裝特定版本,可以先進行搜尋有哪些版本,指令如下所示。搜尋完成之後,采用上述brew安裝方法,安裝特定版本即可。

brew search protobuf           

複制

安裝完成之後,可以通過protoc --version檢視是否安裝成功。

protoc --version
libprotoc 3.6.0           

複制

另外可以通過which protoc指令檢視protoc安裝所在的位置。

which protoc
/usr/local/bin/protoc           

複制

3.Protobuf執行個體

3.1編譯.proto檔案

首先我們需要建立一個以.proto結尾的檔案,可以在其中定義message來指定所需要序列化的資料格式。每一個message都是一個小的資訊邏輯單元,包含一系列的name-value值對。以官網上的示例,我們建立一個addressbook.proto檔案,内容如下所示。

syntax = "proto2";

package tutorial;

message Person {
  required string name = 1;
  required int32 id = 2;
  optional string email = 3;

  enum PhoneType {
    MOBILE = 0;
    HOME = 1;
    WORK = 2;
  }

  message PhoneNumber {
    required string number = 1;
    optional PhoneType type = 2 [default = HOME];
  }

  repeated PhoneNumber phones = 4;
}

message AddressBook {
  repeated Person people = 1;
}           

複制

  • syntax=”proto2”代表版本,目前支援proto2和proto3,不寫預設proto2。
  • package類似于C++中的namespace概念。
  • message是包含了各種類型字段的聚集,相當于struct,并且可以嵌套。
  • proto3版本去掉了required和optional類型,保留了repeated(數組)。其中“=1”,“=2”表示每個元素的辨別号,它會用在二進制編碼中對域的辨別,[1,15]之内的标志符在使用時占用一個位元組,[16,2047]之内的辨別号則占用2個位元組,是以從最優化角度考慮,可以将[1,15]使用在一些較常用或repeated的元素上。同時為了考慮将來可能會增加新的标志符,我們要事先預留一些标志符。

建構好addressbook.proto檔案後,運作Protobuf編譯器編譯.proto檔案,運作方法如下所示。其中-I表示.protoc所在的路徑,--python_out表示指定生成的目标檔案存在的路徑,最後的參數表示要編譯的.proto檔案。

protoc -I=$SRC_DIR --python_out=$DST_DIR $SRC_DIR/addressbook.proto           

複制

其中SRC_DIR為目錄,如果處于目前目錄的話,可通過如下所示指令來編譯.proto檔案。

protoc -I=. --python_out=. addressbook.proto           

複制

編譯完成之後會生成addressbook_pb2.py檔案,裡面包含序列化和反序列化等方法。

3.2序列化

import addressbook_pb2
import sys

def PromptForAddress(person):
  person.id = int(raw_input("Enter person ID number: "))
  person.name = raw_input("Enter name: ")

  email = raw_input("Enter email address (blank for none): ")
  if email != "":
    person.email = email

  while True:
    number = raw_input("Enter a phone number (or leave blank to finish): ")
    if number == "":
        break

    phone_number = person.phones.add()
    phone_number.number = number

    type = raw_input("Is this a mobile, home, or work phone? ")
    if type == "mobile":
        phone_number.type = addressbook_pb2.Person.MOBILE
    elif type == "home":
        phone_number.type = addressbook_pb2.Person.HOME
    elif type == "work":
        phone_number.type = addressbook_pb2.Person.WORK
    else:
        print "Unknown phone type; leaving as default value."


if len(sys.argv) != 2:
  print "Usage:", sys.argv[0], "ADDRESS_BOOK_FILE"
  sys.exit(-1)

address_book = addressbook_pb2.AddressBook()

# Read the existing address book.
try:
  f = open(sys.argv[1], "rb")
  address_book.ParseFromString(f.read())
  f.close()
except IOError:
  print sys.argv[1] + ": Could not open file.  Creating a new one."

# Add an address.
PromptForAddress(address_book.people.add())

# Write the new address book back to disk.
f = open(sys.argv[1], "wb")
f.write(address_book.SerializeToString())
f.close()           

複制

建立add_person.py檔案,代碼如上所示,然後通過SerializeToString()方法來進行序列化addressbook.proto中所定義的資訊。如果想要運作上述代碼的話,我們首先需要建立一個輸入檔案,例如命名為input.txt,不需輸入值。然後采用

python add_person input.txt

,便可進行序列化所輸入的資料。如果運作

python add_person

的話,不指定輸入檔案,則會報錯。

Enter person ID number: 1001
Enter name: 1001
Enter email address (blank for none): [email protected]
Enter a phone number (or leave blank to finish): 10010
Is this a mobile, home, or work phone? work
Enter a phone number (or leave blank to finish):           

複制

3.3反序列化

#! /usr/bin/python
import addressbook_pb2
import sys

# Iterates though all people in the AddressBook and prints info about them.
def ListPeople(address_book):
  for person in address_book.people:
    print "Person ID:", person.id
    print "  Name:", person.name
    if person.HasField('email'):
      print "  E-mail address:", person.email

    for phone_number in person.phones:
      if phone_number.type == addressbook_pb2.Person.MOBILE:
        print "  Mobile phone #: ",
      elif phone_number.type == addressbook_pb2.Person.HOME:
        print "  Home phone #: ",
      elif phone_number.type == addressbook_pb2.Person.WORK:
        print "  Work phone #: ",
      print phone_number.number

# Main procedure:  Reads the entire address book from a file and prints all
#   the information inside.

if len(sys.argv) != 2:
  print "Usage:", sys.argv[0], "ADDRESS_BOOK_FILE"
  sys.exit(-1)

address_book = addressbook_pb2.AddressBook()

# Read the existing address book.
f = open(sys.argv[1], "rb")
address_book.ParseFromString(f.read())
f.close()

ListPeople(address_book)           

複制

建立list_person.py檔案來進行反序列化,代碼如上所示。通過

python list_person.py input.txt

指令來執行上述代碼,輸出結果如下所示。

Person ID: 1001
Name: 1001
E-mail address: [email protected]
Work phone #:  10010           

複制

4.RPC簡介

這裡引用知乎使用者用心閣關于誰能用通俗的語言解釋一下什麼是 RPC 架構?的問題答案來解釋什麼是RPC。RPC(Remote Procedure Call)是指遠端過程調用,也就是說兩台伺服器A、B,一個應用部署在A伺服器上,想要調用B伺服器上應用提供的函數/方法,由于不在一個記憶體空間上,不能直接調用,需要通過網絡來表達調用的語義和傳達調用的資料。如果需要實作RPC,那麼需要解決如下幾個問題。

  • 通訊:主要是通過在用戶端和伺服器之間建立TCP連接配接,遠端過程調用的所有交換的資料都在這個連接配接裡傳輸。連接配接可以是按需連接配接,調用結束後就斷掉,也可以是長連接配接,多個遠端過程調用共享同一個連接配接。
  • 尋址:A伺服器上的應用怎麼告訴底層的RPC架構,如何連接配接到B伺服器(如主機或IP位址)以及特定的端口,方法的名稱名稱是什麼。
  • 序列化:當A伺服器上的應用發起遠端過程調用時,方法的參數需要通過底層的網絡協定,如TCP傳遞到B伺服器。由于網絡協定是基于二進制的,記憶體中的參數值要序列化成二進制的形式,也就是序列化(Serialize)或編組(marshal),通過尋址和傳輸将序列化的二進制發送給B伺服器。 B伺服器收到請求後,需要對參數進行反序列化,恢複為記憶體中的表達方式,然後找到對應的方法進行本地調用,然後得到傳回值。 傳回值還要發送回伺服器A上的應用,也要經過序列化的方式發送,伺服器A接到後,再反序列化,恢複為記憶體中的表達方式,交給A伺服器上的應用 。
基于google protobuf的gRPC實作

總結來說,RPC提供一種透明調用機制讓使用者不必顯示區分本地調用還是遠端調用。如上圖所示,客戶方像調用本地方法一樣去調用遠端接口方法,RPC 架構提供接口的代理實作,實際的調用将委托給代理

RpcProxy

。代理封裝調用資訊并将調用轉交給

RpcInvoker

去實際執行。在用戶端的

RpcInvoker

通過連接配接器

RpcConnector

去維持與服務端的通道

RpcChannel

,并使用

RpcProtocol

執行協定編碼(encode)并将編碼後的請求消息通過通道發送給服務方。RPC 服務端接收器

RpcAcceptor

接收用戶端的調用請求,同樣使用

RpcProtocol

執行協定解碼(decode)。解碼後的調用資訊傳遞給

RpcProcessor

去控制處理調用過程,最後再委托調用給

RpcInvoker

去實際執行并傳回調用結果。

5.基于google protobuf的gRPC實作

我們可以利用protobuf實作序列化和反序列化,但如何實作RPC通信呢。為簡單起見,我們先介紹gRPC,gRPC是google建構的RPC架構,這樣我們就不再考慮如何寫通信方法。

5.1gRPC安裝

首先安裝gRPC,安裝指令如下所示。

pip install grpcio           

複制

然後安裝protobuf相關的依賴庫。

pip install protobuf           

複制

然後安裝python gRPC相關的protobuf相關檔案。

pip install grpcio-tools           

複制

5.2gRPC執行個體

建立三個檔案夾,名稱為example、server、client,裡面内容如下所示,具體含義在後面解釋。

example
  __init__.py
  data.proto
  data_pb2.py
  data_pb2_grpc.py
server
  server.py
client
  client.py           

複制

5.2.1 example

example主要用于編寫.proto檔案并生成data接口,其中__init__.py的作用是友善其他檔案夾引用example檔案夾中檔案,data.proto檔案内容如下所示。

syntax="proto3";
package example;

message Data{
    string text=1;
}

service FormatData{
    rpc DoFormat(Data) returns (Data) {}
}           

複制

然後在example目錄下利用下述指令生成data_pb2.py和data_pb2_grpc.py檔案。data_pb2.py用于序列化資訊,data_pb2_grpc.py用于通信。

python -m grpc_tools.protoc -I. --python_out=. --grpc_python_out=. ./data.proto           

複制

5.2.2 server

server為伺服器端,server.py實作接受用戶端發送的資料,并對資料進行處理後傳回給用戶端。FormatData的作用是将伺服器端傳過來的資料轉換為大寫,具體含義見相關代碼和注釋。

#! /usr/bin/env python
# -*- coding: utf-8 -*-
import grpc
import time
from concurrent import futures #具有線程池和程序池、管理并行程式設計任務、處理非确定性的執行流程、程序/線程同步等功能
from example import data_pb2
from example import data_pb2_grpc

_ONE_DAY_IN_SECONDS = 60*60*24
_HOST='localhost'
_PORT='8080'

class FormatData(data_pb2_grpc.FormatDataServicer):
    def DoFormat(self,request,context):
        str=request.text
        return data_pb2.Data(text=str.upper())

def serve():
    grpcServer=grpc.server(futures.ThreadPoolExecutor(max_workers=4))#最多有多少work并行執行任務
    data_pb2_grpc.add_FormatDataServicer_to_server(FormatData(),grpcServer)# 添加函數方法和伺服器,伺服器端會進行反序列化。
    grpcServer.add_insecure_port(_HOST+':'+_PORT) #建立伺服器和端口
    grpcServer.start()# 啟動服務端
    try:
        while True:
            time.sleep(_ONE_DAY_IN_SECONDS)
    except KeyboardInterrupt:
        grpcServer.stop(0)


if __name__=='__main__':
    serve()           

複制

5.2.3 client

clinet為用戶端,client.py實作用戶端發送資料,并接受server處理後傳回的資料,具體含義見相關代碼和注釋。

#! /usr/bin/env python
# -*- coding: utf-8 -*-
import grpc
from example import data_pb2,data_pb2_grpc

_HOST='localhost'
_PORT='8080'

def run():
    conn=grpc.insecure_channel(_HOST+':'+_PORT)# 伺服器資訊
    client=data_pb2_grpc.FormatDataStub(channel=conn) #用戶端建立連接配接
    for i in range(0,5):
        respnse = client.DoFormat(data_pb2.Data(text='hello,world!'))  # 序列化資料傳遞過去
        print("received: " + respnse.text)

if __name__=='__main__':
    run()           

複制

接下來運作server.py來啟動伺服器,然後運作client.py便可以得到結果,可以看到所有資料均已大寫。最後需要關閉伺服器端,否則一直會處于運作狀态。

received: HELLO,WORLD!
received: HELLO,WORLD!
received: HELLO,WORLD!
received: HELLO,WORLD!
received: HELLO,WORLD!           

複制

6.基于google protobuf的RPC實作

因為RPC需要我們實作通信,是以會有一定難度,代碼量很大程度上也有增加,不友善在文中展現出來。是以我把代碼放到了github上面,位址在https://github.com/weizhixiaoyi/google-protobuf-service,有興趣的可以看下。

總的來說,protobuf RPC定義了一個抽象的RPC架構,RpcServiceStub和RpcService類是protobuf編譯器根據proto定義生成的類,RpcService定義了服務端暴露給用戶端的函數接口,具體實作需要使用者自己繼承這個類來實作。RpcServiceStub定義了服務端暴露函數的描述,并将用戶端對RpcServiceStub中函數的調用統一轉換到調用RpcChannel中的CallMethod方法,CallMethod通過RpcServiceStub傳過來的函數描述符和函數參數對該次rpc調用進行encode,最終通過RpcConnecor發送給服務方。對方以用戶端相反的過程最終調用RpcSerivice中定義的函數。

基于google protobuf的gRPC實作

事實上,protobuf rpc的架構隻是RpcChannel中定義了空的CallMethod,是以具體怎樣進行encode和調用RpcConnector都要自己實作。RpcConnector在protobuf中沒有定義,是以這個完成由使用者自己實作,它的作用就是收發rpc消息包。在服務端,RpcChannel通過調用RpcService中的CallMethod來具體調用RpcService中暴露給用戶端的函數。

參考

用心閣-誰能用通俗的語言解釋一下什麼是 RPC 架構?

在于思考-python通過protobuf實作rpc