天天看點

grpc應用詳解gRPC分享特性HTTP2.0 特性多路複用 (Multiplexing)二進制幀首部壓縮(Header Compression)服務端推送(Server Push)主動重置連結與其他rpc比較使用gRPC的公司或項目:使用Thrift的公司或項目:gRPC優缺點:grpc坑:gRPC通信方式gRPC有四種通信方式:服務定義及ProtoBufgolang中使用gRPCgRPC服務發現與服務治理的方案服務發現負載均衡實作

gRPC分享

概述

gRPC 一開始由 google 開發,是一款語言中立、平台中立、開源的遠端過程調用(RPC)系統。

在 gRPC 裡用戶端應用可以像調用本地對象一樣直接調用另一台不同的機器上服務端應用的方法,使得您能夠更容易地建立分布式應用和服務。與許多 RPC 系統類似,gRPC 也是基于以下理念:定義一個服務,指定其能夠被遠端調用的方法(包含參數和傳回類型)。在服務端實作這個接口,并運作一個 gRPC 伺服器來處理用戶端調用。在用戶端擁有一個存根能夠像服務端一樣的方法。

grpc應用詳解gRPC分享特性HTTP2.0 特性多路複用 (Multiplexing)二進制幀首部壓縮(Header Compression)服務端推送(Server Push)主動重置連結與其他rpc比較使用gRPC的公司或項目:使用Thrift的公司或項目:gRPC優缺點:grpc坑:gRPC通信方式gRPC有四種通信方式:服務定義及ProtoBufgolang中使用gRPCgRPC服務發現與服務治理的方案服務發現負載均衡實作

特性

基于HTTP/2

HTTP/2 提供了連接配接多路複用、雙向流、伺服器推送、請求優先級、首部壓縮等機制。可以節省帶寬、降低TCP連結次數、節省CPU,幫助移動裝置延長電池壽命等。gRPC 的協定設計上使用了HTTP2 現有的語義,請求和響應的資料使用HTTP Body 發送,其他的控制資訊則用Header 表示。

IDL使用ProtoBuf

gRPC使用ProtoBuf來定義服務,ProtoBuf是由Google開發的一種資料序列化協定(類似于XML、JSON、hessian)。ProtoBuf能夠将資料進行序列化,并廣泛應用在資料存儲、通信協定等方面。壓縮和傳輸效率高,文法簡單,表達力強。

多語言支援(C, C++, Python, PHP, Nodejs, C#, Objective-C、Golang、Java)

gRPC支援多種語言,并能夠基于語言自動生成用戶端和服務端功能庫。目前已提供了C版本grpc、Java版本grpc-java 和 Go版本grpc-go,其它語言的版本正在積極開發中,其中,grpc支援C、C++、Node.js、Python、Ruby、Objective-C、PHP和C#等語言,grpc-java已經支援Android開發。

gRPC已經應用在Google的雲服務和對外提供的API中,其主要應用場景如下:

  • 低延遲、高擴充性、分布式的系統
  • 同雲伺服器進行通信的移動應用用戶端
  • 設計語言獨立、高效、精确的新協定
  • 便于各方面擴充的分層設計,如認證、負載均衡、日志記錄、監控等

HTTP2.0 特性

HTTP/2,也就是超文本傳輸協定第2版,不論是1還是2,HTTP的基本語義是不變的,比如方法語義(GET/PUST/PUT/DELETE),狀态碼(200/404/500等),Range Request,Cacheing,Authentication、URL路徑, 不同的主要是下面幾點:

多路複用 (Multiplexing)

在 HTTP/1.1 協定中 「浏覽器用戶端在同一時間,針對同一域名下的請求有一定數量限制。超過限制數目的請求會被阻塞」。

HTTP/2 的多路複用(Multiplexing) 則允許同時通過單一的 HTTP/2 連接配接發起多重的請求-響應消息。

是以 HTTP/2 可以很容易的去實作多流并行而不用依賴建立多個 TCP 連接配接,HTTP/2 把 HTTP 協定通信的基本機關縮小為一個一個的幀,這些幀對應着邏輯流中的消息。并行地在同一個 TCP 連接配接上雙向交換消息。

grpc應用詳解gRPC分享特性HTTP2.0 特性多路複用 (Multiplexing)二進制幀首部壓縮(Header Compression)服務端推送(Server Push)主動重置連結與其他rpc比較使用gRPC的公司或項目:使用Thrift的公司或項目:gRPC優缺點:grpc坑:gRPC通信方式gRPC有四種通信方式:服務定義及ProtoBufgolang中使用gRPCgRPC服務發現與服務治理的方案服務發現負載均衡實作

二進制幀

HTTP/2 傳輸的資料是二進制的。相比 HTTP/1.1 的純文字資料,二進制資料一個顯而易見的好處是:更小的傳輸體積。這就意味着更低的負載。二進制的幀也更易于解析而且不易出錯,純文字幀在解析的時候還要考慮處理空格、大小寫、空行和換行等問題,而二進制幀就不存在這個問題。

grpc應用詳解gRPC分享特性HTTP2.0 特性多路複用 (Multiplexing)二進制幀首部壓縮(Header Compression)服務端推送(Server Push)主動重置連結與其他rpc比較使用gRPC的公司或項目:使用Thrift的公司或項目:gRPC優缺點:grpc坑:gRPC通信方式gRPC有四種通信方式:服務定義及ProtoBufgolang中使用gRPCgRPC服務發現與服務治理的方案服務發現負載均衡實作

首部壓縮(Header Compression)

HTTP是無狀态協定。簡而言之,這意味着每個請求必須要攜帶伺服器需要的所有細節,而不是讓伺服器儲存住之前請求的中繼資料。因為http2沒有改變這個範式,是以它也需要這樣(攜帶所有細節),是以 HTTP 請求的頭部需要包含用于辨別身份的資料比如 cookies,而這些資料的量也在随着時間增長。每一個請求的頭部都包含這些大量的重複資料,無疑是一種很大的負擔。對請求頭部進行壓縮,将會大大減輕這種負擔,尤其對移動端來說,性能提高非常明顯。

HTTP/2 使用的壓縮方式是 HPACK。 http://http2.github.io/http2-spec/compression.html

HTTP2.0在用戶端和伺服器端使用“首部表”來跟蹤和存儲之前發送的鍵-值對,對于相同的資料,不再通過每次請求和響應發送;通信期間幾乎不會改變的通用鍵-值對(使用者代理、可接受的媒體類型,等等)隻需發送一次。

grpc應用詳解gRPC分享特性HTTP2.0 特性多路複用 (Multiplexing)二進制幀首部壓縮(Header Compression)服務端推送(Server Push)主動重置連結與其他rpc比較使用gRPC的公司或項目:使用Thrift的公司或項目:gRPC優缺點:grpc坑:gRPC通信方式gRPC有四種通信方式:服務定義及ProtoBufgolang中使用gRPCgRPC服務發現與服務治理的方案服務發現負載均衡實作

事實上,如果請求中不包含首部(例如對同一資源的輪詢請求),那麼首部開銷就是零位元組。此時所有首部都自動使用之前請求發送的首部。

如果首部發生變化了,那麼隻需要發送變化了資料在Headers幀裡面,新增或修改的首部幀會被追加到“首部表”。首部表在 HTTP2.0的連接配接存續期内始終存在,由用戶端和伺服器共同漸進地更新。

服務端推送(Server Push)

HTTP/2 的伺服器推送所作的工作就是,伺服器在收到用戶端對某個資源的請求時,會判斷用戶端十有八九還要請求其他的什麼資源,然後一同把這些資源都發送給用戶端,即便用戶端還沒有明确表示它需要這些資源。

用戶端可以選擇把額外的資源放入緩存中(是以這個特點也叫 Cache push),也可以選擇發送一個 RST_STREAM frame 拒絕任何它不想要的資源。

grpc應用詳解gRPC分享特性HTTP2.0 特性多路複用 (Multiplexing)二進制幀首部壓縮(Header Compression)服務端推送(Server Push)主動重置連結與其他rpc比較使用gRPC的公司或項目:使用Thrift的公司或項目:gRPC優缺點:grpc坑:gRPC通信方式gRPC有四種通信方式:服務定義及ProtoBufgolang中使用gRPCgRPC服務發現與服務治理的方案服務發現負載均衡實作

主動重置連結

Length的HTTP消息被送出之後,我們就很難中斷它了。當然,通常我們可以斷開整個TCP連結(但也不總是可以這樣),但這樣導緻的代價就是需要重新通過三次握手建立一個新的TCP連接配接。

HTTP/2 引入了一個 RST_STREAM frame 來讓用戶端在已有的連接配接中發送重置請求,進而中斷或者放棄響應。當浏覽器進行頁面跳轉或者使用者取消下載下傳時,它可以防止建立新連接配接,避免浪費所有帶寬。

與其他rpc比較

與thrift,dubbo,motan等比較

* Motan Dubbox thrift gRPC rpcx
開發語言 Java Java 跨語言 跨語言 go
分布式服務治理 Y Y 可以配合zookeeper, Eureka等實作 可以配合etcd(go),zookeeper,consul等實作 自帶服務注冊中心,也支援zookerper,etcd等發現方式
底層協定 motan協定,使用tcp長連接配接 Dubbo 協定、 Rmi 協定、 Hessian 協定、 HTTP 協定、 WebService 協定、Dubbo Thrift 協定、Memcached 協定 tpc/http/frame http2 tcp長連結
消息序列化 hessian2,json hessian2,json,resr,kyro,FST等,可擴充protobuf等 thrift protobuf Gob、Json、MessagePack、gencode、ProtoBuf等
跨語言程式設計 N(支援php client和c server) N Y Y N
負載均衡 ActiveWeight 、Random 、 RoundRobin 、LocalFirst 、 Consistent 、ConfigurableWeight Random 、RoundRobin 、ConsistentHash 、 LeastActive Haproxy, zookerper+用戶端負載均衡等方案 負載均衡軟體HaProxy等 支援随機請求、輪詢、低并發優先、一緻性 Hash等
容錯 Failover 失效切換、Failfast 快速失敗 Failover 、 Failfast 、Failsafe 、 Failback 、 Forking、 Broadcast Failover 具有 Failover 失效切換的容錯政策 失敗重試(Failover)、快速失敗(Failfast)
注冊中心 consul zookeeper zookeeper etcd,zookeeper,consul zookerper,etcd
性能 ★★ ★★ ★★★★ 比grpc快2-5倍 ★★★ 比dubbox,motan快 ★★★★★ 比thrift快1-1.5倍
側重優勢 服務管理 服務管理 跨語言,性能++ 跨語言,性能 性能++,服務治理
用戶端異步調用方案 使用thrift IDL “oneway” 關鍵字(無傳回結果),+callback tcp異步請求 - thrift IDL參數不支援函數或服務 stream傳輸,雙向通信
服務端異步處理 1、TNonblockingServer(java/c++,php);THsHaServer(java/c++); TThreadpoolServer(java/c++);TThreadSelectorServer(java/c++);2、結合消息隊列或中間件;3、swoole/goroutine等多任務支援 同上,使用stream傳輸。Stream對象在傳輸過程中會被當做集合,用Iterator來周遊處理

grpc vs thrift:

grpc應用詳解gRPC分享特性HTTP2.0 特性多路複用 (Multiplexing)二進制幀首部壓縮(Header Compression)服務端推送(Server Push)主動重置連結與其他rpc比較使用gRPC的公司或項目:使用Thrift的公司或項目:gRPC優缺點:grpc坑:gRPC通信方式gRPC有四種通信方式:服務定義及ProtoBufgolang中使用gRPCgRPC服務發現與服務治理的方案服務發現負載均衡實作

使用gRPC的公司或項目:

Google

Mochi中國

阿裡OTS

騰訊部分部門

Tensorflow項目中使用了grpc

CoreOS — Production API for etcd v3 is entirely gRPC. etcd v3的接口全部使用grpc

Square — replacement for all of their internal RPC. one of the very first adopters and contributors to gRPC.

ngrok — all 20+ internal services communicate via gRPC 一個内網轉發産品

Netflix

Yik Yak

VSCO

Cockroach

使用Thrift的公司或項目:

Facebook

雪球

餓了麼

今日頭條

evernote

友盟

小米

美團

Quora

Twitter

Pinterest

Foursquare

Maxeler Technologies

gRPC優缺點:

優點:

  • protobuf二進制消息,性能好/效率高(空間和時間效率都很不錯)
  • proto檔案生成目标代碼,簡單易用
  • 序列化反序列化直接對應程式中的資料類,不需要解析後在進行映射(XML,JSON都是這種方式)
  • 支援向前相容(新加字段采用預設值)和向後相容(忽略新加字段),簡化更新
  • 支援多種語言(可以把proto檔案看做IDL檔案)
  • Netty等一些架構內建

缺點:

  • 1)GRPC尚未提供連接配接池,需要自行實作
  • 2)尚未提供“服務發現”、“負載均衡”機制
  • 3)因為基于HTTP2,絕大部多數HTTP Server、Nginx都尚不支援,即Nginx不能将GRPC請求作為HTTP請求來負載均衡,而是作為普通的TCP請求。(nginx1.9版本已支援)
  • 4) Protobuf二進制可讀性差(貌似提供了Text_Fromat功能)
  • 預設不具備動态特性(可以通過動态定義生成消息類型或者動态編譯支援)

grpc坑:

來自https://news.ycombinator.com/item?id=12345223的網友:

http2隻允許單個連結傳輸10億流資料。原因在于:

htt2使用31位整形标示流,服務端使用奇數,用戶端使用偶數,是以總共10億可用。

HTTP/2.0 uses an unsigned 31-bit integer to identity individual streams over a connection.
Server-initiated streams must use even identifiers. 
Client-initiated streams must use odd identifiers.
           

解決思路:超過一定數量的流,需要重新開機連結。

gRPC通信方式

gRPC有四種通信方式:

1、 Simple RPC

簡單rpc

這就是一般的rpc調用,一個請求對象對應一個傳回對象

proto文法:

rpc simpleHello(Person) returns (Result) {}
           

2、 Server-side streaming RPC

服務端流式rpc

一個請求對象,服務端可以傳回多個結果對象

proto文法

rpc serverStreamHello(Person) returns (stream Result) {}
           

3、 Client-side streaming RPC

用戶端流式rpc

用戶端傳入多個請求對象,服務端傳回一個響應結果

proto文法

rpc clientStreamHello(stream Person) returns (Result) {}
           

4、 Bidirectional streaming RPC

雙向流式rpc

結合用戶端流式rpc和服務端流式rpc,可以傳入多個對象,傳回多個響應對象

proto文法

rpc biStreamHello(stream Person) returns (stream Result) {}
           

服務定義及ProtoBuf

gRPC使用ProtoBuf定義服務, 我們可以一次性的在一個 .proto 檔案中定義服務并使用任何支援它的語言去實作用戶端和伺服器,反過來,它們可以在各種環境中,從雲伺服器到你自己的平闆電腦—— gRPC 幫你解決了不同語言及環境間通信的複雜性。使用 protocol buffers 還能獲得其他好處,包括高效的序列号,簡單的 IDL 以及容易進行接口更新。

protoc編譯工具

protoc工具可在https://github.com/google/protobuf/releases 下載下傳到源碼。

linux下安裝

protobuf文法

1、syntax = “proto3”;

檔案的第一行指定了你使用的是proto3的文法:如果你不指定,protocol buffer 編譯器就會認為你使用的是proto2的文法。這個語句必須出現在.proto檔案的非空非注釋的第一行。

2、message SearchRequest {……}

message 定義實體,c/c++/go中的結構體,php中類

3、基本資料類型

grpc應用詳解gRPC分享特性HTTP2.0 特性多路複用 (Multiplexing)二進制幀首部壓縮(Header Compression)服務端推送(Server Push)主動重置連結與其他rpc比較使用gRPC的公司或項目:使用Thrift的公司或項目:gRPC優缺點:grpc坑:gRPC通信方式gRPC有四種通信方式:服務定義及ProtoBufgolang中使用gRPCgRPC服務發現與服務治理的方案服務發現負載均衡實作

4、注釋符号: 雙斜線,如://xxxxxxxxxxxxxxxxxxx

5、字段唯一數字辨別(用于在二進制格式中識别各個字段,上線後不宜再變動):Tags

1到15使用一個位元組來編碼,包括辨別數字和字段類型(你可以在Protocol Buffer 編碼中檢視更多詳細);16到2047占用兩個位元組。是以定義proto檔案時應該保留1到15,用作出現最頻繁的消息類型的辨別。記得為将來會繼續增加并可能頻繁出現的元素留一點兒辨別區間,也就是說,不要一下子把1—15全部用完,為将來留一點兒。

辨別數字的合法範圍:最小是1,最大是 229 - 1,或者536,870,911。

另外,不能使用19000 到 19999之間的數字(FieldDescriptor::kFirstReservedNumber through FieldDescriptor::kLastReservedNumber),因為它們被Protocol Buffers保留使用

6、字段修飾符:

required:值不可為空

optional:可選字段

singular:符合文法規則的消息包含零個或者一個這樣的字段(最多一個)

repeated:一個字段在合法的消息中可以重複出現一定次數(包括零次)。重複出現的值的次序将被保留。在proto3中,重複出現的值類型字段預設采用壓縮編碼。你可以在這裡找到更多關于壓縮編碼的東西: Protocol Buffer Encoding。

預設值: optional PhoneType type = 2 [default = HOME];

proto3中,省略required,optional,singular,由protoc自動選擇。

7、代理類生成

1)、C++, 每一個.proto 檔案可以生成一個 .h 檔案和一個 .cc 檔案

2)、Java, 每一個.proto檔案可以生成一個 .java 檔案

3)、Python, 每一個.proto檔案生成一個子產品,其中為每一個消息類型生成一個靜态的描述器,在運作時,和一個metaclass一起使用來建立必要的Python資料通路類

4)、Go, 每一個.proto生成一個 .pb.go 檔案

5)、Ruby, 每一個.proto生成一個 .rb 檔案

6)、Objective-C, 每一個.proto 檔案可以生成一個 pbobjc.h 和一個pbobjc.m 檔案

7)、C#, 每一個.proto檔案可以生成一個.cs檔案.

8)、php, 每一個message消息體生成一個.php類檔案,并在GPBMetadata目錄生成一個對應包名的.php類檔案,用于儲存.proto的二進制中繼資料。

8、字段預設值

  • strings, 預設值是空字元串(empty string)
  • bytes, 預設值是空bytes(empty bytes)
  • bools, 預設值是false
  • numeric, 預設值是0
  • enums, 預設值是第一個枚舉值(value必須為0)
  • message fields, the field is not set. Its exact value is langauge-dependent. See the generated code guide for details.
  • repeated fields,預設值為empty,通常是一個空list

    9、枚舉

// 枚舉類型,必須從0開始,序号可跨越。同一包下不能重名,是以加字首來差別
enum WshExportInstStatus {
    INST_INITED = 0;
    INST_RUNNING = 1;
    INST_FINISH = 2;
    INST_FAILED = 3;
}
           

10、Maps字段類型

map<key_type, value_type> map_field = N;
           

其中key_type可以是任意Integer或者string類型(是以,除了floating和bytes的任意标量類型都是可以的)value_type可以是任意類型。

例如,如果你希望建立一個project的映射,每個Projecct使用一個string作為key,你可以像下面這樣定義:

map<string, Project> projects = 3;
           

Map的字段可以是repeated。

序列化後的順序和map疊代器的順序是不确定的,是以你不要期望以固定順序處理Map

當為.proto檔案産生生成文本格式的時候,map會按照key 的順序排序,數值化的key會按照數值排序。

從序列化中解析或者融合時,如果有重複的key則後一個key不會被使用,當從文本格式中解析map時,如果存在重複的key。

11、預設值

字元串類型預設為空字元串

位元組類型預設為空位元組

布爾類型預設false

數值類型預設為0值

enums類型預設為第一個定義的枚舉值,必須是0

12、服務

服務使用service{}包起來,每個方法使用rpc起一行申明,一個方法包含一個請求消息體和一個傳回消息體

service HelloService {
  rpc SayHello (HelloRequest) returns (HelloResponse);
}
message HelloRequest {
  string greeting = 1;
}

message HelloResponse {
  string reply = 1;
}
           

更多protobuf參考(google) 

更多protobuf參考(csdn)

golang中使用gRPC

前期準備

go get -u "google.golang.org/grpc"
go get -u "google.golang.org/grpc/reflection"
           

例如我定義的檔案exporttask.proto:

// 微生活導出服務
// 導出功能接口
/*
    1、 建立任務模闆
    2、 查詢、列出任務模闆
    3、 删除任務模闆
    4、 添加導出任務執行個體
    5、 查詢任務狀态
*/
syntax = "proto3";

// java 文法特别選項
option java_multiple_files = true;
option java_package = "io.grpc.welife.WshExportTask";
option java_outer_classname = "WshExportTask";
// 包名 golang包名,php中namespase,
package exporttask;

// 導出任務服務定義
service ExportTask {
  // 建立任務模闆
  rpc CreateTpl(WshExportTaskCreateTplReq) returns (WshExportTaskCreateTplRes) {}
  // 查詢任務模闆
  rpc ListTpl(WshExportTaskListTplReq) returns (WshExportTaskListTplRes) {}
}

// 枚舉類型,必須從0開始,序号可跨越。同一包下不能重名,是以加字首來差別
enum WshExportTplStatus {
    TPL_INITED = 0;
    TPL_NORMAL = 1;
    TPL_DELETED = 9;
}

enum WshExportFormat {
    FMT_DEFAULT = 0;
    FMT_CSV = 1;
    FMT_XLS = 2;
}

message WshExportTpl {
    string etplName = 1;
    string etplTag = 2;
    WshExportFormat etplOutputFormat = 3;
    string etplOutputColumns = 4;
    string etplExpr = 5;
    int32 etplId = 6;
    int32 etplExecTimes = 7;
    int32 etplExecOkTimes = 8;
    int32 etplStatus = 9;
    string etplCreated = 10;
    string etplUpdated = 11;
    string etplDeleted = 12;
    int32 operatorId = 13;
}

message WshExportTaskCreateTplReq {
    string etplName = 1;
    string etplTag = 2;
    string etplExpr = 3;
    string etplOutputColumns = 4;
    WshExportFormat etplOutputFormat = 5;
    int32 operatorId = 6;
}

message WshExportTaskCreateTplRes {
    string errCode = 1;
    string errMsg = 2;
    WshExportTpl data = 3;
}

message WshExportTaskListTplReq {
    int32 etplId = 1;
    string etplName = 2;
    string etplTag = 3;
}

// repeated 表示數組
message WshExportTaskListTplRes {
    string errCode = 1;
    string errMsg = 2;
    repeated WshExportTpl data = 3;
}
           

使用protoc指令生成golang對應的rpc代碼:

#格式 protoc --go_out=plugins=grpc:{go代碼輸出路徑} {proto檔案}
protoc --go_out=plugins=grpc:./ ./exporttask.proto
           

生成對應當exporttask.pb.go

// Code generated by protoc-gen-go. DO NOT EDIT.
// source: exporttask.proto

/*
Package exporttask is a generated protocol buffer package.

包名 golang包名,php中namespase,

It is generated from these files:
    exporttask.proto

It has these top-level messages:
    WshExportTpl
    WshExportTaskCreateTplReq
    WshExportTaskCreateTplRes
    WshExportTaskListTplReq
    WshExportTaskListTplRes
*/
package exporttask

import proto "github.com/golang/protobuf/proto"
import fmt "fmt"
import math "math"

import (
    context "golang.org/x/net/context"
    grpc "google.golang.org/grpc"
)

// Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal
var _ = fmt.Errorf
var _ = math.Inf

// This is a compile-time assertion to ensure that this generated file
// is compatible with the proto package it is being compiled against.
// A compilation error at this line likely means your copy of the
// proto package needs to be updated.
const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package

// 枚舉類型,必須從0開始,序号可跨越。同一包下不能重名,是以加字首來差別
type WshExportTplStatus int32

const (
    WshExportTplStatus_TPL_INITED  WshExportTplStatus = 0
    WshExportTplStatus_TPL_NORMAL  WshExportTplStatus = 1
    WshExportTplStatus_TPL_DELETED WshExportTplStatus = 9
)

var WshExportTplStatus_name = map[int32]string{
    0: "TPL_INITED",
    1: "TPL_NORMAL",
    9: "TPL_DELETED",
}
var WshExportTplStatus_value = map[string]int32{
    "TPL_INITED":  0,
    "TPL_NORMAL":  1,
    "TPL_DELETED": 9,
}

func (x WshExportTplStatus) String() string {
    return proto.EnumName(WshExportTplStatus_name, int32(x))
}
func (WshExportTplStatus) EnumDescriptor() ([]byte, []int) { return fileDescriptor0, []int{0} }

type WshExportFormat int32

const (
    WshExportFormat_FMT_DEFAULT WshExportFormat = 0
    WshExportFormat_FMT_CSV     WshExportFormat = 1
    WshExportFormat_FMT_XLS     WshExportFormat = 2
)

var WshExportFormat_name = map[int32]string{
    0: "FMT_DEFAULT",
    1: "FMT_CSV",
    2: "FMT_XLS",
}
var WshExportFormat_value = map[string]int32{
    "FMT_DEFAULT": 0,
    "FMT_CSV":     1,
    "FMT_XLS":     2,
}

func (x WshExportFormat) String() string {
    return proto.EnumName(WshExportFormat_name, int32(x))
}
func (WshExportFormat) EnumDescriptor() ([]byte, []int) { return fileDescriptor0, []int{1} }

type WshExportTpl struct {
    EtplName          string          `protobuf:"bytes,1,opt,name=etplName" json:"etplName,omitempty"`
    EtplTag           string          `protobuf:"bytes,2,opt,name=etplTag" json:"etplTag,omitempty"`
    EtplOutputFormat  WshExportFormat `protobuf:"varint,3,opt,name=etplOutputFormat,enum=exporttask.WshExportFormat" json:"etplOutputFormat,omitempty"`
    EtplOutputColumns string          `protobuf:"bytes,4,opt,name=etplOutputColumns" json:"etplOutputColumns,omitempty"`
    EtplExpr          string          `protobuf:"bytes,5,opt,name=etplExpr" json:"etplExpr,omitempty"`
    EtplId            int32           `protobuf:"varint,6,opt,name=etplId" json:"etplId,omitempty"`
    EtplExecTimes     int32           `protobuf:"varint,7,opt,name=etplExecTimes" json:"etplExecTimes,omitempty"`
    EtplExecOkTimes   int32           `protobuf:"varint,8,opt,name=etplExecOkTimes" json:"etplExecOkTimes,omitempty"`
    EtplStatus        int32           `protobuf:"varint,9,opt,name=etplStatus" json:"etplStatus,omitempty"`
    EtplCreated       string          `protobuf:"bytes,10,opt,name=etplCreated" json:"etplCreated,omitempty"`
    EtplUpdated       string          `protobuf:"bytes,11,opt,name=etplUpdated" json:"etplUpdated,omitempty"`
    EtplDeleted       string          `protobuf:"bytes,12,opt,name=etplDeleted" json:"etplDeleted,omitempty"`
    OperatorId        int32           `protobuf:"varint,13,opt,name=operatorId" json:"operatorId,omitempty"`
}

func (m *WshExportTpl) Reset()                    { *m = WshExportTpl{} }
func (m *WshExportTpl) String() string            { return proto.CompactTextString(m) }
func (*WshExportTpl) ProtoMessage()               {}
func (*WshExportTpl) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{0} }

func (m *WshExportTpl) GetEtplName() string {
    if m != nil {
        return m.EtplName
    }
    return ""
}

func (m *WshExportTpl) GetEtplTag() string {
    if m != nil {
        return m.EtplTag
    }
    return ""
}

func (m *WshExportTpl) GetEtplOutputFormat() WshExportFormat {
    if m != nil {
        return m.EtplOutputFormat
    }
    return WshExportFormat_FMT_DEFAULT
}

func (m *WshExportTpl) GetEtplOutputColumns() string {
    if m != nil {
        return m.EtplOutputColumns
    }
    return ""
}

func (m *WshExportTpl) GetEtplExpr() string {
    if m != nil {
        return m.EtplExpr
    }
    return ""
}

func (m *WshExportTpl) GetEtplId() int32 {
    if m != nil {
        return m.EtplId
    }
    return 0
}

func (m *WshExportTpl) GetEtplExecTimes() int32 {
    if m != nil {
        return m.EtplExecTimes
    }
    return 0
}

func (m *WshExportTpl) GetEtplExecOkTimes() int32 {
    if m != nil {
        return m.EtplExecOkTimes
    }
    return 0
}

func (m *WshExportTpl) GetEtplStatus() int32 {
    if m != nil {
        return m.EtplStatus
    }
    return 0
}

func (m *WshExportTpl) GetEtplCreated() string {
    if m != nil {
        return m.EtplCreated
    }
    return ""
}

func (m *WshExportTpl) GetEtplUpdated() string {
    if m != nil {
        return m.EtplUpdated
    }
    return ""
}

func (m *WshExportTpl) GetEtplDeleted() string {
    if m != nil {
        return m.EtplDeleted
    }
    return ""
}

func (m *WshExportTpl) GetOperatorId() int32 {
    if m != nil {
        return m.OperatorId
    }
    return 0
}

type WshExportTaskCreateTplReq struct {
    EtplName          string          `protobuf:"bytes,1,opt,name=etplName" json:"etplName,omitempty"`
    EtplTag           string          `protobuf:"bytes,2,opt,name=etplTag" json:"etplTag,omitempty"`
    EtplExpr          string          `protobuf:"bytes,3,opt,name=etplExpr" json:"etplExpr,omitempty"`
    EtplOutputColumns string          `protobuf:"bytes,4,opt,name=etplOutputColumns" json:"etplOutputColumns,omitempty"`
    EtplOutputFormat  WshExportFormat `protobuf:"varint,5,opt,name=etplOutputFormat,enum=exporttask.WshExportFormat" json:"etplOutputFormat,omitempty"`
    OperatorId        int32           `protobuf:"varint,6,opt,name=operatorId" json:"operatorId,omitempty"`
}

func (m *WshExportTaskCreateTplReq) Reset()                    { *m = WshExportTaskCreateTplReq{} }
func (m *WshExportTaskCreateTplReq) String() string            { return proto.CompactTextString(m) }
func (*WshExportTaskCreateTplReq) ProtoMessage()               {}
func (*WshExportTaskCreateTplReq) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{1} }

func (m *WshExportTaskCreateTplReq) GetEtplName() string {
    if m != nil {
        return m.EtplName
    }
    return ""
}

func (m *WshExportTaskCreateTplReq) GetEtplTag() string {
    if m != nil {
        return m.EtplTag
    }
    return ""
}

func (m *WshExportTaskCreateTplReq) GetEtplExpr() string {
    if m != nil {
        return m.EtplExpr
    }
    return ""
}

func (m *WshExportTaskCreateTplReq) GetEtplOutputColumns() string {
    if m != nil {
        return m.EtplOutputColumns
    }
    return ""
}

func (m *WshExportTaskCreateTplReq) GetEtplOutputFormat() WshExportFormat {
    if m != nil {
        return m.EtplOutputFormat
    }
    return WshExportFormat_FMT_DEFAULT
}

func (m *WshExportTaskCreateTplReq) GetOperatorId() int32 {
    if m != nil {
        return m.OperatorId
    }
    return 0
}

type WshExportTaskCreateTplRes struct {
    ErrCode string        `protobuf:"bytes,1,opt,name=errCode" json:"errCode,omitempty"`
    ErrMsg  string        `protobuf:"bytes,2,opt,name=errMsg" json:"errMsg,omitempty"`
    Data    *WshExportTpl `protobuf:"bytes,3,opt,name=data" json:"data,omitempty"`
}

func (m *WshExportTaskCreateTplRes) Reset()                    { *m = WshExportTaskCreateTplRes{} }
func (m *WshExportTaskCreateTplRes) String() string            { return proto.CompactTextString(m) }
func (*WshExportTaskCreateTplRes) ProtoMessage()               {}
func (*WshExportTaskCreateTplRes) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{2} }

func (m *WshExportTaskCreateTplRes) GetErrCode() string {
    if m != nil {
        return m.ErrCode
    }
    return ""
}

func (m *WshExportTaskCreateTplRes) GetErrMsg() string {
    if m != nil {
        return m.ErrMsg
    }
    return ""
}

func (m *WshExportTaskCreateTplRes) GetData() *WshExportTpl {
    if m != nil {
        return m.Data
    }
    return nil
}

type WshExportTaskListTplReq struct {
    EtplId   int32  `protobuf:"varint,1,opt,name=etplId" json:"etplId,omitempty"`
    EtplName string `protobuf:"bytes,2,opt,name=etplName" json:"etplName,omitempty"`
    EtplTag  string `protobuf:"bytes,3,opt,name=etplTag" json:"etplTag,omitempty"`
}

func (m *WshExportTaskListTplReq) Reset()                    { *m = WshExportTaskListTplReq{} }
func (m *WshExportTaskListTplReq) String() string            { return proto.CompactTextString(m) }
func (*WshExportTaskListTplReq) ProtoMessage()               {}
func (*WshExportTaskListTplReq) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{3} }

func (m *WshExportTaskListTplReq) GetEtplId() int32 {
    if m != nil {
        return m.EtplId
    }
    return 0
}

func (m *WshExportTaskListTplReq) GetEtplName() string {
    if m != nil {
        return m.EtplName
    }
    return ""
}

func (m *WshExportTaskListTplReq) GetEtplTag() string {
    if m != nil {
        return m.EtplTag
    }
    return ""
}

// repeated 表示數組
type WshExportTaskListTplRes struct {
    ErrCode string          `protobuf:"bytes,1,opt,name=errCode" json:"errCode,omitempty"`
    ErrMsg  string          `protobuf:"bytes,2,opt,name=errMsg" json:"errMsg,omitempty"`
    Data    []*WshExportTpl `protobuf:"bytes,3,rep,name=data" json:"data,omitempty"`
}

func (m *WshExportTaskListTplRes) Reset()                    { *m = WshExportTaskListTplRes{} }
func (m *WshExportTaskListTplRes) String() string            { return proto.CompactTextString(m) }
func (*WshExportTaskListTplRes) ProtoMessage()               {}
func (*WshExportTaskListTplRes) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{4} }

func (m *WshExportTaskListTplRes) GetErrCode() string {
    if m != nil {
        return m.ErrCode
    }
    return ""
}

func (m *WshExportTaskListTplRes) GetErrMsg() string {
    if m != nil {
        return m.ErrMsg
    }
    return ""
}

func (m *WshExportTaskListTplRes) GetData() []*WshExportTpl {
    if m != nil {
        return m.Data
    }
    return nil
}

func init() {
    proto.RegisterType((*WshExportTpl)(nil), "exporttask.WshExportTpl")
    proto.RegisterType((*WshExportTaskCreateTplReq)(nil), "exporttask.WshExportTaskCreateTplReq")
    proto.RegisterType((*WshExportTaskCreateTplRes)(nil), "exporttask.WshExportTaskCreateTplRes")
    proto.RegisterType((*WshExportTaskListTplReq)(nil), "exporttask.WshExportTaskListTplReq")
    proto.RegisterType((*WshExportTaskListTplRes)(nil), "exporttask.WshExportTaskListTplRes")
    proto.RegisterEnum("exporttask.WshExportTplStatus", WshExportTplStatus_name, WshExportTplStatus_value)
    proto.RegisterEnum("exporttask.WshExportFormat", WshExportFormat_name, WshExportFormat_value)
}

// Reference imports to suppress errors if they are not otherwise used.
var _ context.Context
var _ grpc.ClientConn

// This is a compile-time assertion to ensure that this generated file
// is compatible with the grpc package it is being compiled against.
const _ = grpc.SupportPackageIsVersion4

// Client API for ExportTask service

type ExportTaskClient interface {
    // 建立任務模闆
    CreateTpl(ctx context.Context, in *WshExportTaskCreateTplReq, opts ...grpc.CallOption) (*WshExportTaskCreateTplRes, error)
    // 查詢任務模闆
    ListTpl(ctx context.Context, in *WshExportTaskListTplReq, opts ...grpc.CallOption) (*WshExportTaskListTplRes, error)
}

type exportTaskClient struct {
    cc *grpc.ClientConn
}

func NewExportTaskClient(cc *grpc.ClientConn) ExportTaskClient {
    return &exportTaskClient{cc}
}

func (c *exportTaskClient) CreateTpl(ctx context.Context, in *WshExportTaskCreateTplReq, opts ...grpc.CallOption) (*WshExportTaskCreateTplRes, error) {
    out := new(WshExportTaskCreateTplRes)
    err := grpc.Invoke(ctx, "/exporttask.ExportTask/CreateTpl", in, out, c.cc, opts...)
    if err != nil {
        return nil, err
    }
    return out, nil
}

func (c *exportTaskClient) ListTpl(ctx context.Context, in *WshExportTaskListTplReq, opts ...grpc.CallOption) (*WshExportTaskListTplRes, error) {
    out := new(WshExportTaskListTplRes)
    err := grpc.Invoke(ctx, "/exporttask.ExportTask/ListTpl", in, out, c.cc, opts...)
    if err != nil {
        return nil, err
    }
    return out, nil
}

// Server API for ExportTask service

type ExportTaskServer interface {
    // 建立任務模闆
    CreateTpl(context.Context, *WshExportTaskCreateTplReq) (*WshExportTaskCreateTplRes, error)
    // 查詢任務模闆
    ListTpl(context.Context, *WshExportTaskListTplReq) (*WshExportTaskListTplRes, error)
}

func RegisterExportTaskServer(s *grpc.Server, srv ExportTaskServer) {
    s.RegisterService(&_ExportTask_serviceDesc, srv)
}

func _ExportTask_CreateTpl_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
    in := new(WshExportTaskCreateTplReq)
    if err := dec(in); err != nil {
        return nil, err
    }
    if interceptor == nil {
        return srv.(ExportTaskServer).CreateTpl(ctx, in)
    }
    info := &grpc.UnaryServerInfo{
        Server:     srv,
        FullMethod: "/exporttask.ExportTask/CreateTpl",
    }
    handler := func(ctx context.Context, req interface{}) (interface{}, error) {
        return srv.(ExportTaskServer).CreateTpl(ctx, req.(*WshExportTaskCreateTplReq))
    }
    return interceptor(ctx, in, info, handler)
}

func _ExportTask_ListTpl_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
    in := new(WshExportTaskListTplReq)
    if err := dec(in); err != nil {
        return nil, err
    }
    if interceptor == nil {
        return srv.(ExportTaskServer).ListTpl(ctx, in)
    }
    info := &grpc.UnaryServerInfo{
        Server:     srv,
        FullMethod: "/exporttask.ExportTask/ListTpl",
    }
    handler := func(ctx context.Context, req interface{}) (interface{}, error) {
        return srv.(ExportTaskServer).ListTpl(ctx, req.(*WshExportTaskListTplReq))
    }
    return interceptor(ctx, in, info, handler)
}

var _ExportTask_serviceDesc = grpc.ServiceDesc{
    ServiceName: "exporttask.ExportTask",
    HandlerType: (*ExportTaskServer)(nil),
    Methods: []grpc.MethodDesc{
        {
            MethodName: "CreateTpl",
            Handler:    _ExportTask_CreateTpl_Handler,
        },
        {
            MethodName: "ListTpl",
            Handler:    _ExportTask_ListTpl_Handler,
        },
    },
    Streams:  []grpc.StreamDesc{},
    Metadata: "exporttask.proto",
}

func init() { proto.RegisterFile("exporttask.proto", fileDescriptor0) }

var fileDescriptor0 = []byte{
    // 550 bytes of a gzipped FileDescriptorProto
    0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0xa4, 0x54, 0x5f, 0x8f, 0xd2, 0x4e,
    0x14, 0xa5, 0xb0, 0xc0, 0x72, 0x59, 0x96, 0xfe, 0xe6, 0xe1, 0xe7, 0x88, 0xc6, 0x90, 0xaa, 0x09,
    0xd9, 0x6c, 0x30, 0xc1, 0x57, 0x5f, 0x76, 0xa1, 0x18, 0x92, 0x02, 0x1b, 0x28, 0x6a, 0xe2, 0xc3,
    0xa6, 0x6e, 0x47, 0x24, 0x14, 0x5b, 0x67, 0x86, 0xb8, 0xc6, 0x57, 0x3f, 0x98, 0x5f, 0xc6, 0xef,
    0x61, 0x66, 0xda, 0x61, 0x5a, 0xfe, 0xc4, 0x75, 0x7d, 0x9b, 0x73, 0xe6, 0x94, 0x7b, 0xef, 0x39,
    0x77, 0x00, 0x93, 0xdc, 0x46, 0x21, 0xe5, 0xdc, 0x63, 0xcb, 0x76, 0x44, 0x43, 0x1e, 0x22, 0xd0,
    0x8c, 0xf5, 0xab, 0x00, 0x27, 0x6f, 0xd9, 0x27, 0x5b, 0x32, 0x6e, 0x14, 0xa0, 0x06, 0x1c, 0x13,
    0x1e, 0x05, 0x23, 0x6f, 0x45, 0xb0, 0xd1, 0x34, 0x5a, 0x95, 0xc9, 0x06, 0x23, 0x0c, 0x65, 0x71,
    0x76, 0xbd, 0x39, 0xce, 0xcb, 0x2b, 0x05, 0xd1, 0x6b, 0x30, 0xc5, 0x71, 0xbc, 0xe6, 0xd1, 0x9a,
    0xf7, 0x43, 0xba, 0xf2, 0x38, 0x2e, 0x34, 0x8d, 0xd6, 0x69, 0xe7, 0x51, 0x3b, 0x55, 0x7f, 0x53,
    0x29, 0x96, 0x4c, 0x76, 0x3e, 0x42, 0xe7, 0xf0, 0x9f, 0xe6, 0xba, 0x61, 0xb0, 0x5e, 0x7d, 0x66,
    0xf8, 0x48, 0x16, 0xdb, 0xbd, 0x50, 0xcd, 0xda, 0xb7, 0x11, 0xc5, 0x45, 0xdd, 0xac, 0xc0, 0xe8,
    0x7f, 0x28, 0x89, 0xf3, 0xc0, 0xc7, 0xa5, 0xa6, 0xd1, 0x2a, 0x4e, 0x12, 0x84, 0x9e, 0x41, 0x2d,
    0xd6, 0x90, 0x1b, 0x77, 0xb1, 0x22, 0x0c, 0x97, 0xe5, 0x75, 0x96, 0x44, 0x2d, 0xa8, 0x2b, 0x62,
    0xbc, 0x8c, 0x75, 0xc7, 0x52, 0xb7, 0x4d, 0xa3, 0x27, 0x00, 0x82, 0x9a, 0x72, 0x8f, 0xaf, 0x19,
    0xae, 0x48, 0x51, 0x8a, 0x41, 0x4d, 0xa8, 0x0a, 0xd4, 0xa5, 0xc4, 0xe3, 0xc4, 0xc7, 0x20, 0xdb,
    0x4c, 0x53, 0x4a, 0x31, 0x8b, 0x7c, 0xa9, 0xa8, 0x6a, 0x45, 0x42, 0x29, 0x45, 0x8f, 0x04, 0x44,
    0x28, 0x4e, 0xb4, 0x22, 0xa1, 0x44, 0x17, 0x61, 0x44, 0xa8, 0xc7, 0x43, 0x3a, 0xf0, 0x71, 0x2d,
    0xee, 0x42, 0x33, 0xd6, 0x8f, 0x3c, 0x3c, 0xd4, 0x39, 0x7b, 0x6c, 0x19, 0x17, 0x77, 0xa3, 0x60,
    0x42, 0xbe, 0xdc, 0x33, 0xf4, 0xb4, 0xfb, 0x85, 0x2d, 0xf7, 0xff, 0x2e, 0xc7, 0x7d, 0xeb, 0x53,
    0xbc, 0xcf, 0xfa, 0x64, 0x6d, 0x28, 0xed, 0xd8, 0xf0, 0xfd, 0xb0, 0x0b, 0x4c, 0x4e, 0x4a, 0x69,
    0x37, 0xf4, 0x95, 0x09, 0x0a, 0xca, 0x5d, 0xa2, 0x74, 0xc8, 0x94, 0x05, 0x09, 0x42, 0xe7, 0x70,
    0xe4, 0x7b, 0xdc, 0x93, 0xd3, 0x57, 0x3b, 0x78, 0x6f, 0xaf, 0xe2, 0xc7, 0xa5, 0xca, 0x9a, 0xc3,
    0x83, 0x4c, 0x71, 0x67, 0xc1, 0x78, 0x12, 0x80, 0x5e, 0x56, 0x23, 0xb3, 0xac, 0xe9, 0x60, 0xf2,
    0x87, 0x83, 0x29, 0x64, 0x82, 0xb1, 0xbe, 0x1d, 0x2a, 0xf4, 0x6f, 0x33, 0x16, 0xfe, 0x3c, 0xe3,
    0x99, 0x0d, 0x28, 0xcd, 0x26, 0x6f, 0xe0, 0x14, 0xc0, 0xbd, 0x72, 0xae, 0x07, 0xa3, 0x81, 0x6b,
    0xf7, 0xcc, 0x9c, 0xc2, 0xa3, 0xf1, 0x64, 0x78, 0xe1, 0x98, 0x06, 0xaa, 0x43, 0x55, 0xe0, 0x9e,
    0xed, 0xd8, 0x42, 0x50, 0x39, 0x7b, 0x05, 0xf5, 0xad, 0xb0, 0x85, 0xa6, 0x3f, 0x74, 0xaf, 0x7b,
    0x76, 0xff, 0x62, 0xe6, 0xb8, 0x66, 0x0e, 0x55, 0xa1, 0x2c, 0x88, 0xee, 0xf4, 0x8d, 0x69, 0x28,
    0xf0, 0xce, 0x99, 0x9a, 0xf9, 0xce, 0x4f, 0x03, 0x40, 0x4f, 0x8f, 0xde, 0x43, 0x65, 0x93, 0x33,
    0x7a, 0xbe, 0x7f, 0x80, 0xad, 0x17, 0xd1, 0xb8, 0x93, 0x8c, 0x59, 0x39, 0x34, 0x83, 0x72, 0x62,
    0x2f, 0x7a, 0x7a, 0xf0, 0x1b, 0x9d, 0x74, 0xe3, 0x0e, 0x22, 0x66, 0xe5, 0x2e, 0x5f, 0xc0, 0xe3,
    0x45, 0xd8, 0x9e, 0xd3, 0xe8, 0xa6, 0xfd, 0x95, 0x04, 0x8b, 0x8f, 0x24, 0xab, 0xbd, 0xac, 0x65,
    0xe0, 0x95, 0xf1, 0xa1, 0x24, 0xff, 0xdb, 0x5f, 0xfe, 0x0e, 0x00, 0x00, 0xff, 0xff, 0x60, 0x9b,
    0xec, 0x89, 0xef, 0x05, 0x00, 0x00,
}
           

服務端實作proto中的接口:

package main

import (
    "log"
    //"net"

    svcExport "../../common/exporttask" // 包含上面的pb生成的go包
    "./model"
    _ "github.com/go-sql-driver/mysql"
    "golang.org/x/net/context"
    //"google.golang.org/grpc"
    //"google.golang.org/grpc/reflection"
)

// server 這個對象來實作 exporttask 包中的pb定義的rpc服務
// 實作的方式是将服務轉化成本地的資料庫操作
type server struct{}

func (s *server) CreateTpl(ctx context.Context, in *svcExport.WshExportTaskCreateTplReq) (res *svcExport.WshExportTaskCreateTplRes, err error) {
    res = new(svcExport.WshExportTaskCreateTplRes)
    res.Data = new(svcExport.WshExportTpl)

    var etplId int32 = 0

    etplId, err = model.CreateTpl(in.EtplName, in.EtplTag, in.EtplExpr, in.EtplOutputColumns, int32(in.EtplOutputFormat), in.OperatorId)
    //res.Data, err = model.GetTpl(etplId)
    res.Data.EtplId = etplId
    return res, err
}

func (s *server) ListTpl(ctx context.Context, in *svcExport.WshExportTaskListTplReq) (*svcExport.WshExportTaskListTplRes, error) {
    res := new(svcExport.WshExportTaskListTplRes)
    entList, err := model.ListTpl(in.EtplId, in.EtplName, in.EtplTag)

    if err != nil {
        res.ErrMsg = err.Error()
        res.ErrCode = "2"
    }

    for _, ent := range entList {
        t := new(svcExport.WshExportTpl)
        ent.CopyToPb(t)
        res.Data = append(res.Data, t)
    }

    return res, err
}
           

服務端main啟動服務main.go

/**
 * exporttask server main
 * $ go build exporttask.go
 */
package main

import (
    "log"
    "net"

    svcExport "../../common/exporttask"
    //"./model"
    _ "github.com/go-sql-driver/mysql"
    //"golang.org/x/net/context"
    "google.golang.org/grpc"
    "google.golang.org/grpc/reflection"
)

const (
    port = ":50051"
)

func main() {
    lis, err := net.Listen("tcp", port)
    if err != nil {
        log.Fatalf("failed to listen: %v", err)
    }
    // 生成一個rpc伺服器
    s := grpc.NewServer()
    // 使用pb包調用注冊已實作的rpc接口類server
    svcExport.RegisterExportTaskServer(s, &server{})
    // Register reflection service on gRPC server.
    reflection.Register(s)
    if err := s.Serve(lis); err != nil {
        log.Fatalf("failed to serve: %v", err)
    }
}
           

Golang gRPC用戶端

用戶端的代碼相對比較簡單

package main

import (
    "flag"
    "log"
    "os"

    svcExport "../../common/exporttask"
    "golang.org/x/net/context"
    "google.golang.org/grpc"
)

const (
    address     = "127.0.0.1:50052"
    defaultName = "world"
)

func main() {

    // 發起連結
    conn, err := grpc.Dial(address, grpc.WithInsecure())
    if err != nil {
        log.Fatalf("did not connect: %v", err)
    }
    defer conn.Close()

    // 建立pb包的用戶端
    c := svcExport.NewExportTaskClient(conn)

    name := defaultName
    if len(os.Args) > 1 {
        name = os.Args[1]
    }

    // 發起請求
    var r2 *svcExport.WshExportTaskCreateTplRes
    req := svcExport.WshExportTaskCreateTplReq{EtplName: name, EtplTag: "mall", EtplExpr: "select EtplName from welife_export_tpl", EtplOutputFormat: svcExport.WshExportFormat_FMT_CSV, EtplOutputColumns: ""}
    r2, err = c.CreateTpl(context.Background(), &req)
    // 列印結果
    log.Println("create tpl: r=", r2, err)
}
           

php中使用gRPC

php需要安裝grpc擴充。

使用protoc指令生成對應的php代碼:

protoc --php_out=plugins=grpc:./ exporttask.proto
           

生成代碼包括:

Exporttask/
  |-WshExportFormat.php
  |-WshExportTaskCreateTplReq.php
  |-WshExportTaskCreateTplRes.php
  |-WshExportTaskListTplReq.php
  |-WshExportTaskListTplRes.php
  |-WshExportTpl.php
  \-WshExportTplStatus.php
GPBMetadata/
  \-Exporttask.php
           

每一個message對應生成一個類,在Exporttask命名空間下。

這裡就不都貼出來了,隻貼一個WshExportTpl.php:

<?php
# Generated by the protocol buffer compiler.  DO NOT EDIT!
# source: exporttask.proto

namespace Exporttask;

use Google\Protobuf\Internal\GPBType;
use Google\Protobuf\Internal\RepeatedField;
use Google\Protobuf\Internal\GPBUtil;

/**
 * Protobuf type <code>exporttask.WshExportTpl</code>
 */
class WshExportTpl extends \Google\Protobuf\Internal\Message
{
    /**
     * <code>string etplName = 1;</code>
     */
    private $etplName = '';
    /**
     * <code>string etplTag = 2;</code>
     */
    private $etplTag = '';
    /**
     * <code>.exporttask.WshExportFormat etplOutputFormat = 3;</code>
     */
    private $etplOutputFormat = 0;
    /**
     * <code>string etplOutputColumns = 4;</code>
     */
    private $etplOutputColumns = '';
    /**
     * <code>string etplExpr = 5;</code>
     */
    private $etplExpr = '';
    /**
     * <code>int32 etplId = 6;</code>
     */
    private $etplId = 0;
    /**
     * <code>int32 etplExecTimes = 7;</code>
     */
    private $etplExecTimes = 0;
    /**
     * <code>int32 etplExecOkTimes = 8;</code>
     */
    private $etplExecOkTimes = 0;
    /**
     * <code>int32 etplStatus = 9;</code>
     */
    private $etplStatus = 0;
    /**
     * <code>string etplCreated = 10;</code>
     */
    private $etplCreated = '';
    /**
     * <code>string etplUpdated = 11;</code>
     */
    private $etplUpdated = '';
    /**
     * <code>string etplDeleted = 12;</code>
     */
    private $etplDeleted = '';
    /**
     * <code>int32 operatorId = 13;</code>
     */
    private $operatorId = 0;

    public function __construct() {
        \GPBMetadata\Exporttask::initOnce();
        parent::__construct();
    }

    /**
     * <code>string etplName = 1;</code>
     */
    public function getEtplName()
    {
        return $this->etplName;
    }

    /**
     * <code>string etplName = 1;</code>
     */
    public function setEtplName($var)
    {
        GPBUtil::checkString($var, True);
        $this->etplName = $var;
    }
    // ... 其他省略
           
<?php
  $client = new \Exporttask\GreeterClient('127.0.0.1:50051', [
        'credentials' => \Grpc\ChannelCredentials::createInsecure(),
    ]);
  $request = new Exporttask\WshExportTaskCreateTplReq();
  $request->setEtplName($name);
  list($reply, $status) = $client->createTpl($request)->wait();
  $message = $reply->getMessage();
  var_dump($message);
           

gRPC服務發現與服務治理的方案

目前gRPC主流分布式方案有這麼幾種: etcd, zookeeper, consul.

1、集中式LB(Proxy Model)

grpc應用詳解gRPC分享特性HTTP2.0 特性多路複用 (Multiplexing)二進制幀首部壓縮(Header Compression)服務端推送(Server Push)主動重置連結與其他rpc比較使用gRPC的公司或項目:使用Thrift的公司或項目:gRPC優缺點:grpc坑:gRPC通信方式gRPC有四種通信方式:服務定義及ProtoBufgolang中使用gRPCgRPC服務發現與服務治理的方案服務發現負載均衡實作

在服務消費者和服務提供者之間有一個獨立的LB,通常是專門的硬體裝置如 F5,或者基于軟體如 LVS,HAproxy等實作。LB上有所有服務的位址映射表,通常由運維配置注冊,當服務消費方調用某個目标服務時,它向LB發起請求,由LB以某種政策,比如輪詢(Round-Robin)做負載均衡後将請求轉發到目标服務。LB一般具備健康檢查能力,能自動摘除不健康的服務執行個體。 該方案主要問題:

  • 1、 單點問題,所有服務調用流量都經過LB,當服務數量和調用量大的時候,LB容易成為瓶頸,且一旦LB發生故障影響整個系統;
  • 2、服務消費方、提供方之間增加了一級,有一定性能開銷。

2、程序内LB(Balancing-aware Client)

grpc應用詳解gRPC分享特性HTTP2.0 特性多路複用 (Multiplexing)二進制幀首部壓縮(Header Compression)服務端推送(Server Push)主動重置連結與其他rpc比較使用gRPC的公司或項目:使用Thrift的公司或項目:gRPC優缺點:grpc坑:gRPC通信方式gRPC有四種通信方式:服務定義及ProtoBufgolang中使用gRPCgRPC服務發現與服務治理的方案服務發現負載均衡實作

針對第一個方案的不足,此方案将LB的功能內建到服務消費方程序裡,也被稱為軟負載或者用戶端負載方案。服務提供方啟動時,首先将服務位址注冊到服務系統資料庫,同時定期報心跳到服務系統資料庫以表明服務的存活狀态,相當于健康檢查,服務消費方要通路某個服務時,它通過内置的LB元件向服務系統資料庫查詢,同時緩存并定期重新整理目标服務位址清單,然後以某種負載均衡政策選擇一個目标服務位址,最後向目标服務發起請求。LB和服務發現能力被分散到每一個服務消費者的程序内部,同時服務消費方和服務提供方之間是直接調用,沒有額外開銷,性能比較好。該方案主要問題:

  • 1、開發成本,該方案将服務調用方內建到用戶端的程序裡頭,如果有多種不同的語言棧,就要配合開發多種不同的用戶端,有一定的研發和維護成本;
  • 2、另外生産環境中,後續如果要對客戶庫進行更新,勢必要求服務調用方修改代碼并重新釋出,更新較複雜。

3、獨立 LB 程序(External Load Balancing Service)

grpc應用詳解gRPC分享特性HTTP2.0 特性多路複用 (Multiplexing)二進制幀首部壓縮(Header Compression)服務端推送(Server Push)主動重置連結與其他rpc比較使用gRPC的公司或項目:使用Thrift的公司或項目:gRPC優缺點:grpc坑:gRPC通信方式gRPC有四種通信方式:服務定義及ProtoBufgolang中使用gRPCgRPC服務發現與服務治理的方案服務發現負載均衡實作

該方案是針對第二種方案的不足而提出的一種折中方案,原理和第二種方案基本類似。

不同之處是将LB和服務發現功能從程序内移出來,變成主機上的一個獨立程序。主機上的一個或者多個服務要通路目标服務時,他們都通過同一主機上的獨立LB程序做服務發現和負載均衡。該方案也是一種分布式方案沒有單點問題,一個LB程序挂了隻影響該主機上的服務調用方,服務調用方和LB之間是程序内調用性能好,同時該方案還簡化了服務調用方,不需要為不同語言開發客戶庫,LB的更新不需要服務調用方改代碼。

該方案主要問題:部署較複雜,環節多,出錯調試排查問題不友善。

服務發現負載均衡實作

gRPC開源元件官方并未直接提供服務注冊與發現的功能實作,但其設計文檔已提供實作的思路,并在不同語言的gRPC代碼API中已提供了命名解析和負載均衡接口供擴充。

grpc應用詳解gRPC分享特性HTTP2.0 特性多路複用 (Multiplexing)二進制幀首部壓縮(Header Compression)服務端推送(Server Push)主動重置連結與其他rpc比較使用gRPC的公司或項目:使用Thrift的公司或項目:gRPC優缺點:grpc坑:gRPC通信方式gRPC有四種通信方式:服務定義及ProtoBufgolang中使用gRPCgRPC服務發現與服務治理的方案服務發現負載均衡實作
  • 1、服務啟動後gRPC用戶端向命名伺服器發出名稱解析請求,名稱将解析為一個或多個IP位址,每個IP位址标示它是伺服器位址還是負載均衡器位址,以及标示要使用那個用戶端負載均衡政策或服務配置。
  • 2、用戶端執行個體化負載均衡政策,如果解析傳回的位址是負載均衡器位址,則用戶端将使用grpclb政策,否則用戶端使用服務配置請求的負載均衡政策。
  • 3、負載均衡政策為每個伺服器位址建立一個子通道(channel)。
  • 4、當有rpc請求時,負載均衡政策決定那個子通道即grpc伺服器将接收請求,當可用伺服器為空時用戶端的請求将被阻塞。

1)命名解析實作:resolver.go

package etcdv3

import (
    "errors"
    "fmt"
    "strings"

    etcd3 "github.com/coreos/etcd/clientv3"
    "google.golang.org/grpc/naming"
)

// resolver is the implementaion of grpc.naming.Resolver
type resolver struct {
    serviceName string // service name to resolve
}

// NewResolver return resolver with service name
func NewResolver(serviceName string) *resolver {
    return &resolver{serviceName: serviceName}
}

// Resolve to resolve the service from etcd, target is the dial address of etcd
// target example: "http://127.0.0.1:2379,http://127.0.0.1:12379,http://127.0.0.1:22379"
func (re *resolver) Resolve(target string) (naming.Watcher, error) {
    if re.serviceName == "" {
        return nil, errors.New("grpclb: no service name provided")
    }

    // generate etcd client
    client, err := etcd3.New(etcd3.Config{
        Endpoints: strings.Split(target, ","),
    })
    if err != nil {
        return nil, fmt.Errorf("grpclb: creat etcd3 client failed: %s", err.Error())
    }

    // Return watcher
    return &watcher{re: re, client: *client}, nil
}
           

2)服務發現實作:watcher.go

package etcdv3

import (
    "fmt"
    etcd3 "github.com/coreos/etcd/clientv3"
    "golang.org/x/net/context"
    "google.golang.org/grpc/naming"
    "github.com/coreos/etcd/mvcc/mvccpb"
)

// watcher is the implementaion of grpc.naming.Watcher
type watcher struct {
    re            *resolver // re: Etcd Resolver
    client        etcd3.Client
    isInitialized bool
}

// Close do nothing
func (w *watcher) Close() {
}

// Next to return the updates
func (w *watcher) Next() ([]*naming.Update, error) {
    // prefix is the etcd prefix/value to watch
    prefix := fmt.Sprintf("/%s/%s/", Prefix, w.re.serviceName)

    // check if is initialized
    if !w.isInitialized {
        // query addresses from etcd
        resp, err := w.client.Get(context.Background(), prefix, etcd3.WithPrefix())
        w.isInitialized = true
        if err == nil {
            addrs := extractAddrs(resp)
            //if not empty, return the updates or watcher new dir
            if l := len(addrs); l != 0 {
                updates := make([]*naming.Update, l)
                for i := range addrs {
                    updates[i] = &naming.Update{Op: naming.Add, Addr: addrs[i]}
                }
                return updates, nil
            }
        }
    }

    // generate etcd Watcher
    rch := w.client.Watch(context.Background(), prefix, etcd3.WithPrefix())
    for wresp := range rch {
        for _, ev := range wresp.Events {
            switch ev.Type {
            case mvccpb.PUT:
                return []*naming.Update{{Op: naming.Add, Addr: string(ev.Kv.Value)}}, nil
            case mvccpb.DELETE:
                return []*naming.Update{{Op: naming.Delete, Addr: string(ev.Kv.Value)}}, nil
            }
        }
    }
    return nil, nil
}

func extractAddrs(resp *etcd3.GetResponse) []string {
    addrs := []string{}

    if resp == nil || resp.Kvs == nil {
        return addrs
    }

    for i := range resp.Kvs {
        if v := resp.Kvs[i].Value; v != nil {
            addrs = append(addrs, string(v))
        }
    }

    return addrs
}
           

3)服務注冊實作:register.go

package etcdv3

import (
    "fmt"
    "log"
    "strings"
    "time"

    etcd3 "github.com/coreos/etcd/clientv3"
    "golang.org/x/net/context"
    "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
)

// Prefix should start and end with no slash
var Prefix = "etcd3_naming"
var client etcd3.Client
var serviceKey string

var stopSignal = make(chan bool, 1)

// Register
func Register(name string, host string, port int, target string, interval time.Duration, ttl int) error {
    serviceValue := fmt.Sprintf("%s:%d", host, port)
    serviceKey = fmt.Sprintf("/%s/%s/%s", Prefix, name, serviceValue)

    // get endpoints for register dial address
    var err error
    client, err := etcd3.New(etcd3.Config{
        Endpoints: strings.Split(target, ","),
    })
    if err != nil {
        return fmt.Errorf("grpclb: create etcd3 client failed: %v", err)
    }

    go func() {
        // invoke self-register with ticker
        ticker := time.NewTicker(interval)
        for {
            // minimum lease TTL is ttl-second
            resp, _ := client.Grant(context.TODO(), int64(ttl))
            // should get first, if not exist, set it
            _, err := client.Get(context.Background(), serviceKey)
            if err != nil {
                if err == rpctypes.ErrKeyNotFound {
                    if _, err := client.Put(context.TODO(), serviceKey, serviceValue, etcd3.WithLease(resp.ID)); err != nil {
                        log.Printf("grpclb: set service '%s' with ttl to etcd3 failed: %s", name, err.Error())
                    }
                } else {
                    log.Printf("grpclb: service '%s' connect to etcd3 failed: %s", name, err.Error())
                }
            } else {
                // refresh set to true for not notifying the watcher
                if _, err := client.Put(context.Background(), serviceKey, serviceValue, etcd3.WithLease(resp.ID)); err != nil {
                    log.Printf("grpclb: refresh service '%s' with ttl to etcd3 failed: %s", name, err.Error())
                }
            }
            select {
            case <-stopSignal:
                return
            case <-ticker.C:
            }
        }
    }()

    return nil
}

// UnRegister delete registered service from etcd
func UnRegister() error {
    stopSignal <- true
    stopSignal = make(chan bool, 1) // just a hack to avoid multi UnRegister deadlock
    var err error;
    if _, err := client.Delete(context.Background(), serviceKey); err != nil {
        log.Printf("grpclb: deregister '%s' failed: %s", serviceKey, err.Error())
    } else {
        log.Printf("grpclb: deregister '%s' ok.", serviceKey)
    }
    return err
}
           

4)接口描述檔案:helloworld.proto

syntax = "proto3";

option java_multiple_files = true;
option java_package = "com.midea.jr.test.grpc";
option java_outer_classname = "HelloWorldProto";
option objc_class_prefix = "HLW";

package helloworld;

// The greeting service definition.
service Greeter {
    //   Sends a greeting
    rpc SayHello (HelloRequest) returns (HelloReply) {
    }
}

// The request message containing the user's name.
message HelloRequest {
    string name = 1;
}

// The response message containing the greetings
message HelloReply {
    string message = 1;
}
           

5)實作服務端接口:helloworldserver.go

package main

import (
    "flag"
    "fmt"
    "log"
    "net"
    "os"
    "os/signal"
    "syscall"
    "time"

    "golang.org/x/net/context"
    "google.golang.org/grpc"

    grpclb "com.midea/jr/grpclb/naming/etcd/v3"
    "com.midea/jr/grpclb/example/pb"
)

var (
    serv = flag.String("service", "hello_service", "service name")
    port = flag.Int("port", 50001, "listening port")
    reg = flag.String("reg", "http://127.0.0.1:2379", "register etcd address")
)

func main() {
    flag.Parse()

    lis, err := net.Listen("tcp", fmt.Sprintf("0.0.0.0:%d", *port))
    if err != nil {
        panic(err)
    }

    err = grpclb.Register(*serv, "127.0.0.1", *port, *reg, time.Second*10, 15)
    if err != nil {
        panic(err)
    }

    ch := make(chan os.Signal, 1)
    signal.Notify(ch, syscall.SIGTERM, syscall.SIGINT, syscall.SIGKILL, syscall.SIGHUP, syscall.SIGQUIT)
    go func() {
        s := <-ch
        log.Printf("receive signal '%v'", s)
        grpclb.UnRegister()
        os.Exit(1)
    }()

    log.Printf("starting hello service at %d", *port)
    s := grpc.NewServer()
    pb.RegisterGreeterServer(s, &server{})
    s.Serve(lis)
}

// server is used to implement helloworld.GreeterServer.
type server struct{}

// SayHello implements helloworld.GreeterServer
func (s *server) SayHello(ctx context.Context, in *pb.HelloRequest) (*pb.HelloReply, error) {
    fmt.Printf("%v: Receive is %s\n", time.Now(), in.Name)
    return &pb.HelloReply{Message: "Hello " + in.Name}, nil
}
           

6)實作用戶端接口:helloworldclient.go

package main

import (
    "flag"
    "fmt"
    "time"

    grpclb "com.midea/jr/grpclb/naming/etcd/v3"
    "com.midea/jr/grpclb/example/pb"
    "golang.org/x/net/context"
    "google.golang.org/grpc"
    "strconv"
)

var (
    serv = flag.String("service", "hello_service", "service name")
    reg = flag.String("reg", "http://127.0.0.1:2379", "register etcd address")
)

func main() {
    flag.Parse()
    r := grpclb.NewResolver(*serv)
    b := grpc.RoundRobin(r)

    ctx, _ := context.WithTimeout(context.Background(), 10*time.Second)
    conn, err := grpc.DialContext(ctx, *reg, grpc.WithInsecure(), grpc.WithBalancer(b))
    if err != nil {
        panic(err)
    }

    ticker := time.NewTicker(1 * time.Second)
    for t := range ticker.C {
        client := pb.NewGreeterClient(conn)
        resp, err := client.SayHello(context.Background(), &pb.HelloRequest{Name: "world " + strconv.Itoa(t.Second())})
        if err == nil {
            fmt.Printf("%v: Reply is %s\n", t, resp.Message)
        }
    }
}
           

參考:

gRPC