搗鼓hdfs、yarn、hbase、zookeeper的代碼一年多了,是時候整理一下了。在hadoop (2.5.2)中protobuf是節點之間以及用戶端和各個節點通信的基礎序列化架構(協定),而基于avro和writable的序列化架構則是這個協定裡的payload,因而這一系列的文章打算從protobuf這個架構開始入手(版本2.5.0)。
從抽象的角度來說,protobuf架構是類執行個體序列化和遠端調用的一種實作。所謂執行個體序列化簡單來說就是将一個類執行個體轉換成位元組數組或位元組流來表達,這個位元組數組或位元組流可以通過檔案形式儲存或者通過網絡發送給一個接收方;而另一個程式可以讀取檔案中的位元組或在接收到位元組流後反序列化并構造出一個新的和原來相等的執行個體(這裡的相等主要是值);除了protobuf,目前比較流行的序列化反序列化架構有java自帶序列化、反序列化架構(本文預設使用java,因而忽略c++、python等語言的相關内容)、json格式的序列化反序列化架構、xml格式的序列化反序列化架構以及上文提到的從hadoop中分支出來的avro架構(關于這些架構的孰優孰略不是本文要讨論的範圍,相信關于它們的比較網上也很容易找到)。在protobuf中使用message/messagelite來抽象一個可序列化和反序列化的執行個體,為了提升性能,使用者一般需要定義一些.proto檔案,并使用protobuf自帶的代碼生成器(編譯器)來生成對具體類的序列化和反序列化代碼;并且它使用将每個字段編号的方式來排列字段在序列化後的排列順序以及處理不同版本的相容性問題。對于遠端調用來說,protobuf并沒有多少實作,它用rpcchannel接口來抽象通信層的協定,而使用service接口來抽象每個具體的可調用接口,rpcchannel需要我們自己實作,而具體的service可以定義在.proto檔案中,由protobuf自帶的代碼生成器生成。代碼生成器同時支援blockingservice和service(async)的實作,其中異步方式通過注冊rpccallback來擷取傳回值。
抽象來說,消息其實是一段段具有特定含義的資料的集合。這些一段段具有特定含義的資料可以是基本類型,如int、string、double等,也可以是幾個基本類型聚合而成的另一個消息,即消息中可以内嵌消息。對一個消息字段來說,我們需要定義它的類型、字段名,在protobuf中還需要定義字段規則(field rule)和字段的唯一辨別号(tag)。其中字段規則有:required表示該字段必須存在,在調用builder的build方法時,如果有required的字段還未被設定,會抛出uninitializedmessageexception;optional表示該字段是可選的,因而在build()方法中不會對它做檢查,在isinitialized()方法中也不會考慮它的設值情況;repeated表示該字段可能存在多個值,在java版本中采用list來表達(protobuf在這裡采用了比較簡單的文法,因而我們無法選擇集合類型、集合中元素個數的限制等)。字段唯一辨別号用于定位一個字段,在xml和json中直接使用字段名來維護序列化後和類執行個體字段之間的映射,而且它們的字段類型(如果存儲的話)一般也以字元串的形式儲存在序列化後的位元組流中,這樣做的一個好處是序列化後的消息和消息的定義是相對獨立的,并且消息可讀性非常好,然而它的壞處也是比較明顯的,序列化後的消息位元組數會很大,序列化和反序列化的效率也會降低很多,為了解決這個問題,protobuf采用這個字段唯一辨別号來定位一個字段,對每個字段在protobuf内部還定義了其類型值,進而在對無法識别的字段編碼時可以通過這個類型資訊判斷使用那種方式解析這個未知字段,字段唯一辨別号和類型值一同組成一個int類型的值tag,其中類型值占最低三個bit,字段唯一辨別号占剩下29bit,即protobuf最大支援的字段數是2^29-1(536870911,然而19000到19999是系統保留的,因而不可以使用)。因而在每次寫入一個字段時都需要先寫入這個tag值,而每次讀取一個字段時也先讀取這個tag值以判斷這個字段的辨別号以及類型。protobuf使用可變長的整型編碼這個tag值,即對1-15的辨別号值隻需要一個位元組(字段類型占用3個bit),16-2047需要兩個位元組,因而推薦将1-15的辨別号值指派給使用最頻繁的字段,并且推薦保留一些空間給将來添加新的使用比較頻繁的字段。
關于required字段,如果将一個字段定義為required,在使用它時必須對它進行設值,并且出于相容性的考慮,以後需要一直保持它的required屬性。出于這個原因考慮,在google内部有人提出不要使用required字段,雖然這種提議并不是所有人都贊同。
最後,protobuf還支援optional字段的預設值設定,即在定義一個optional字段時在其後加入一個[…]的修飾,指定預設值(這個預設值應該隻支援基本類型和枚舉類型,對消息類型貌似沒法定義),此時如果該字段沒有被指派,則它傳回這個預設值,然而在序列化時這個預設值不會在序列化後的位元組流中出現,因而如果.proto檔案定義時的預設值發生改變,可能會出現序列化和反序列化出來某個字段值不一樣的情況,需要特别注意。對于沒有指定預設值的字段,protobuf采用預定義預設值,即string的預設值是空字元串、bool預設值是false、數值類型預設值0、枚舉類型預設值是定義的第一個枚舉項。
protobuf使用message來抽象序列化、反序列化對象,通常protobuf的用法是在.proto檔案中定義需要的message對象,然後使用protobuf提供的protoc将定義的message對象編譯成java/c++/python對象;在實際項目中引入這些生成的類,對這些message對象指派,并使用架構提供的方法實作序列化、反序列化。如一個比較簡單的searchrequest例子,其定義如下:
package levin.protobuf;
option java_package = "org.levin.protobuf.generated.simple";
option java_outer_classname = "searchrequestprotos";
message searchrequest {
required string query_string = 1;
optional int32 page_number = 2;
optional int32 result_per_page = 3 [ default = 50 ];
}
使用一下指令編譯并在test目錄下生成org.levin.protobuf.generated.simple.searchrequestprotos類:
protoc --java_out test searchrequest.proto
protobuf編譯器為每個message對象生成一個<message>orbuilder接口,該接口定義了message中所有字段的get方法和has<field>方法(用以判斷是否某個字段已經設值);對string類型字段,它還包含了bytestring傳回類型的get方法,bytestring是protobuf中對位元組數組的一種抽象,它類似string,是一個不可變對象,它有不同的實作,如literalbytestring隻是對位元組數組的封裝,boundedbytestring則可以從一個位元組數組中取處一段作為地層内容,而ropebytestring則采用樹狀結構來連接配接一系列的bytestring,以支援大容量并且無需拷貝的字元串。在bytestring中定義了多個方法來實作bytestring和位元組數組/字元串互相轉換:copyfrom()、copyto()、tobytearray()、tostring()、tostringutf8()等。在以上的searchrequst中的接口就是:searchrequestorbuilder,它繼承自messageorbuilder接口,其定義如下:
public interface searchrequestorbuilder extends messageorbuilder {
boolean hasquerystring();
string getquerystring();
bytestring getquerystringbytes();
boolean haspagenumber();
int getpagenumber();
boolean hasresultperpage();
int getresultperpage();
在生成searchrequestorbuilder接口後,protobuf編譯器會繼續生成定義的message對象:searchrequest,這個message對象也是一個不可變對象(immutable),它包含bitfield<x>_字段,該字段的每一個bit用于表示一個字段是否被已經被設值,因而其個數取決于字段的個數;而後每個message字段都會有一個對應的字段(其中string類型的字段采用object表達因為它有可能是string類型或者bytestring類型);之後是兩個緩存字段:memoizedisinitialized和memoizedserializedsize,用于緩存是否已經初始化以及序列化後的位元組數(message對象是不可變的);最後每個message對象都包含一個unknownfields字段用于儲存無法識别的字段,以及一個靜态的default執行個體,用以儲存message對象初始化狀态下的對象執行個體。編譯生成的searchrequest繼承自generatedmessage類并實作了searchrequestorbuilder接口中的所有方法,它定義了幾個靜态方法以擷取searchrequest在初始化狀态時的執行個體,擷取一個新的builder執行個體用于構造searchrequest執行個體,以及從位元組數組、bytestring、inputstream中解析出searchrequest執行個體等。
之後protobuf編譯器還會在每個message對象内部生成一個靜态的builder類,它繼承自generatedmessage.builder類,并實作了searchrequestorbuilder接口,該builder類和searchrequest消息類有類似的字段,并實作了各個字段的set方法以及build()方法用于build消息對象:searchrequest。另外,它還提供了mergefrom()方法,可以從位元組數組、bytestring、inputstream等解析位元組資料。
最後protobuf編譯器還會為每一個message對象生成用于描述該message對象的字元串,用filedescriptor.internalbuildgeneratedfilefrom()方法将其解析成一個descriptor和fileaccessortable執行個體。
在這個searchrequest消息定義的例子中,protobuf編譯器生成的類結構如下:
在使用protobuf編譯器生成這些類以後,使用起來非常簡單,建構searchrequest隻需要使用其靜态方法建立builder執行個體,設定各個字段的值,然後調用其build()方法即可。
searchrequest.builder builder = searchrequest.newbuilder();
builder.setquerystring("param1=value1&param2=value2");
builder.setpagenumber(10);
builder.setresultperpage(100);
searchrequest request = builder.build();
system.out.println(request);
這段代碼的輸出結果為:
query_string: "param1=value1&param2=value2"
page_number: 10
result_per_page: 100
在序列化時,調用可調用messagelite中的writeto()方法,反序列化時可調用messagelite.builder中的mergefrom()方法:
bytearrayoutputstream bytes = new bytearrayoutputstream();
request.writeto(bytes);
system.out.println(arrays.tostring(bytes.tobytearray()));
//output: [10, 27, 112, 97, 114, 97, 109, 49, 61, 118, 97, 108, 117, 101, 49, 38, 112, 97, 114, 97, 109, 50, 61, 118, 97, 108, 117, 101, 50, 16, 10, 24, 100]
sys-tem.out.println(searchrequest.newbuilder().mergefrom(request.tobytearray()).build());
//output:
// query_string: "param1=value1&param2=value2"
// page_number: 10
// result_per_page: 100
在protobuf編譯生成的代碼中,作為序列化的核心實作比較簡單,它隻是将每個已經指派的字段的fieldnumber和值一起寫入到codedoutputstream中:
public void writeto(codedoutputstream output) throws ja-va.io.ioexception {
getserializedsize();
if (((bitfield0_ & 0x00000001) == 0x00000001)) {
output.writebytes(1, getquerystringbytes());
}
if (((bitfield0_ & 0x00000002) == 0x00000002)) {
output.writeint32(2, pagenumber_);
if (((bitfield0_ & 0x00000004) == 0x00000004)) {
output.writeint32(3, resultperpage_);
getunknownfields().writeto(output);
而反序列化的核心代碼也基本上是從codedinputstream讀一個tag值,根據從tag值解析出來的fieldnumber值讀取對應字段類型的資料并給字段指派:
private searchrequest(codedinputstream input, extensionregistrylite extensionregistry)
throws invalidprotocolbufferexception {
initfields();
unknownfieldset.builder unknownfields = unknownfieldset.newbuilder();
boolean done = false;
while (!done) {
int tag = input.readtag();
switch (tag) {
case 0: { done = true; break; }
case 10: {
bitfield0_ |= 0x00000001;
querystring_ = input.readbytes();
break;
}
case 16: {
bitfield0_ |= 0x00000002;
pagenumber_ = input.readint32();
case 24: {
bitfield0_ |= 0x00000004;
resultperpage_ = input.readint32();
default: {
if (!parseunknownfield(input, unknownfields, extensionregistry, tag)) {
done = true;
}
}
this.unknownfields = unknownfields.build();
makeextensionsimmutable();
在protobuf消息定義中還支援枚舉類型,使用enum作為關鍵字,并且需要給每個枚舉項定義一個唯一的值,進而在序列化時protobuf實際上是将這個唯一的int值以int32可變長編碼寫入位元組流中,進而節省空間,也正是因為這個值采用int32的編碼方式,因而不推薦給枚舉項賦負數值,因為int32的負數編碼要占用10個位元組空間。
enum corpus {
universal = 10;
web = 11;
images = 12;
protobuf編譯生成的代碼中也會生成一個corpus的枚舉類型,它實作protobufmessageenum接口,并包含index和value字段:
public enum corpus implements protocolmessageenum {
universal(0, 10),
web(1, 11),
images(2, 12);
public final int getnumber() { return value; }
private final int index;
private final int value;
private corpus(int index, int value) {
this.index = index;
this.value = value;
而protocolmessageenum的接口定義了擷取一個枚舉項的值以及其相關的enumvaluedescriptor和enumdescriptor(descriptor将在後面小結中詳細講解):
public interface protocolmessageenum extends internal.enumlite {
int getnumber();
enumvaluedescriptor getvaluedescriptor();
enumdescriptor getdescriptorfortype();
在protobuf中還可以定義一個字段為repeated字段,表示該字段時一個集合,在java版本中使用list表達。對repeated字段,在序列化時對每個值,同時寫入tag值和字段值本身:
message searchresponse {
repeated result result = 1;
repeated int32 stats = 2;
for (int i = 0; i < result_.size(); i++) {
output.writemessage(1, result_.get(i));
for (int i = 0; i < stats_.size(); i++) {
output.writeint32(2, stats_.get(i));
然而這種編碼方式對基本類型來說效率太低,因為每項都要同時包含tag值,是以對基本類型,protobuf還隻是使用packed來提升編碼效率(在反序列化時不管有沒有加packed關鍵字,它都同時支援兩種編碼方式的讀取):
repeated int32 stats = 2 [ packed = true ];
if (getstatslist().size() > 0) {
output.writerawvarint32(18);
output.writerawvarint32(statsmemoizedserializedsize);
output.writeint32notag(stats_.get(i));
在寫架構代碼時,經常由擴充性的需求,在java中,隻需要簡單的定義一個父類或接口即可解決,如果架構本身還負責建構執行個體本身,可以使用反射或暴露factory類也可以順利實作,然而對序列化來說,就很難提供這種動态plugin機制了。然而protobuf還是提出來一個相對可以接受的機制(文法有點怪異,但是至少可以用):在一個message中定義它支援的可擴充字段值的範圍,然後使用者可以使用extend關鍵字擴充該message定義:
message foo {
optional int32 field1 = 1;
extensions 100 to 199;
extend foo {
optional int32 bar = 126;
在protobuf編譯器生成的代碼中,在序列化時,在序列化未知字段之前需要先序列化已經寫入的可擴充字段:
generatedmessage.extendablemessage<foo>.extensionwriter extensionwriter = newextensionwriter();
output.writeint32(1, field1_);
extensionwriter.writeuntil(200, output);
在反序列化時由于可擴充字段在parseunknownfield()方法中解析,因而沒有多少差別,然而在該方法中會使用到extensionregistry執行個體。另外生成的消息類foo繼承自extendablemessage而不是generatedmessage,foo.builder繼承自extendablebuilder而不是generatedmessage.builder。最後還會生成generatedextension<foo, integer>類型的靜态字段bar以及在靜态方法registerallextensions()将該bar字段注冊到extensionregistry執行個體中以供反序列化時使用:
foo foo = foo.newbuilder().setfield1(10)
.setextension(extensionsprotos.bar, 20)
.build();
system.out.println(foo);
// field1: 10
// [levin.protobuf.bar]: 20
extensionregistry registry = extensionregistry.newinstance();
extensionsprotos.registerallextensions(registry);
foo foo2 = foo.newbuilder()
.mergefrom(foo.tobytearray(), registry)
.build();
system.out.println(foo2);
在protobuf中預設的optimize_for選項的值是speed,然而有些時候我們隻需要使用messagelite的功能即可,不需要descriptor和反射,這個時候可以指定該值為lite_runtime。使用該選項,生成的message類直接繼承自generatedmessagelite,并且不會生成那些descriptor資訊,此時生成message類的tostring()方法隻能列印類執行個體因為無法通過反射獲知類的中繼資料資訊。optimize_for選項的另一個可選值是code_size,該選項生成的message類中不會實作writeto()方法,它使用abstractmessage類中實作的writeto()方法,該方法周遊所有有指派的字段,使用反射擷取字段的值,并寫入codedoutputstream中。