高效的序列化/反序列化資料方式 Protobuf
github位址
目錄
- protocolBuffers 序列化
- Int32
- String
- Map
- slice
- 序列化小結
- protocolBuffers 反序列化
- Int32
- String
- Map
- slice
- 序列化小結
- 序列化/反序列化性能
- 最後

protocolBuffers序列化
上篇文章中其實已經講過了
encode
的過程,這篇文章以
golang
為例,從代碼實作的層面講講序列化和反序列化的過程。
舉個
go
使用
protobuf
進行資料序列化和反序列化的例子,本篇文章從這個例子開始。
先建立一個
example
的
message
:
syntax = "proto2";
package example;
enum FOO { X = 17; };
message Test {
required string label = 1;
optional int32 type = 2 [default=77];
repeated int64 reps = 3;
optional group OptionalGroup = 4 {
required string RequiredField = 5;
}
}
利用
protoc-gen-go
生成對應的
get/set
方法。代碼中就可以用生成的代碼進行序列化和反序列化了。
package main
import (
"log"
"github.com/golang/protobuf/proto"
"path/to/example"
)
func main() {
test := &example.Test {
Label: proto.String("hello"),
Type: proto.Int32(17),
Reps: []int64{1, 2, 3},
Optionalgroup: &example.Test_OptionalGroup {
RequiredField: proto.String("good bye"),
},
}
data, err := proto.Marshal(test)
if err != nil {
log.Fatal("marshaling error: ", err)
}
newTest := &example.Test{}
err = proto.Unmarshal(data, newTest)
if err != nil {
log.Fatal("unmarshaling error: ", err)
}
// Now test and newTest contain the same data.
if test.GetLabel() != newTest.GetLabel() {
log.Fatalf("data mismatch %q != %q", test.GetLabel(), newTest.GetLabel())
}
// etc.
}
上面代碼中
proto.Marshal()
是序列化過程。
proto.Unmarshal()
是反序列化過程。這一章節先看看序列化過程的實作,下一章節再分析反序列化過程的實作。
// Marshal takes the protocol buffer
// and encodes it into the wire format, returning the data.
func Marshal(pb Message) ([]byte, error) {
// Can the object marshal itself?
if m, ok := pb.(Marshaler); ok {
return m.Marshal()
}
p := NewBuffer(nil)
err := p.Marshal(pb)
if p.buf == nil && err == nil {
// Return a non-nil slice on success.
return []byte{}, nil
}
return p.buf, err
}
序列化函數一進來,會先調用
message
對象自身的實作的序列化方法。
// Marshaler is the interface representing objects that can marshal themselves.
type Marshaler interface {
Marshal() ([]byte, error)
}
Marshaler
是一個
interface
,這個接口是專門留給對象自定義序列化的。如果有實作,就
return
自己實作的方法。如果沒有,接下來就進行預設序列化方式。
p := NewBuffer(nil)
err := p.Marshal(pb)
if p.buf == nil && err == nil {
// Return a non-nil slice on success.
return []byte{}, nil
}
建立一個
Buffer
,調用
Buffer
的
Marshal()
方法。
message
經過序列化以後,資料流會放到
Buffer
的
buf
位元組流中。序列化最終傳回
buf
位元組流即可。
type Buffer struct {
buf []byte // encode/decode byte stream
index int // read point
// pools of basic types to amortize allocation.
bools []bool
uint32s []uint32
uint64s []uint64
// extra pools, only used with pointer_reflect.go
int32s []int32
int64s []int64
float32s []float32
float64s []float64
}
Buffer
的資料結構如上,
Buffer
是用于序列化和反序列化
protocol buffers
的緩沖區管理器。它可以在調用的時候重用以減少記憶體使用量。内部維護了 7 個
pool
,3 個基礎資料類型的
pool
,4 個隻能被
pointer_reflect
使用的
pool
。
func (p *Buffer) Marshal(pb Message) error {
// Can the object marshal itself?
if m, ok := pb.(Marshaler); ok {
data, err := m.Marshal()
p.buf = append(p.buf, data...)
return err
}
t, base, err := getbase(pb)
// 異常處理
if structPointer_IsNil(base) {
return ErrNil
}
if err == nil {
err = p.enc_struct(GetProperties(t.Elem()), base)
}
// 用來統計 Encode 次數的
if collectStats {
(stats).Encode++ // Parens are to work around a goimports bug.
}
// maxMarshalSize = 1<<31 - 1,這個值是 protobuf 可以 encoded 的最大值。
if len(p.buf) > maxMarshalSize {
return ErrTooLarge
}
return err
}
Buffer
的
Marshal()
方法依舊先調用一下對象是否實作了
Marshal()
接口,如果實作了,還是讓它自己序列化,序列化之後的二進制資料流加入到
buf
資料流中。
func getbase(pb Message) (t reflect.Type, b structPointer, err error) {
if pb == nil {
err = ErrNil
return
}
// get the reflect type of the pointer to the struct.
t = reflect.TypeOf(pb)
// get the address of the struct.
value := reflect.ValueOf(pb)
b = toStructPointer(value)
return
}
getbase
方法通過
reflect
方法拿到了
message
的類型和對應
value
的結構體指針。拿到結構體指針先做異常處理。
是以序列化最核心的代碼其實就一句,
p.enc_struct(GetProperties(t.Elem()), base)
// Encode a struct.
func (o *Buffer) enc_struct(prop *StructProperties, base structPointer) error {
var state errorState
// Encode fields in tag order so that decoders may use optimizations
// that depend on the ordering.
// https://developers.google.com/protocol-buffers/docs/encoding#order
for _, i := range prop.order {
p := prop.Prop[i]
if p.enc != nil {
err := p.enc(o, p, base)
if err != nil {
if err == ErrNil {
if p.Required && state.err == nil {
state.err = &RequiredNotSetError{p.Name}
}
} else if err == errRepeatedHasNil {
// Give more context to nil values in repeated fields.
return errors.New("repeated field " + p.OrigName + " has nil element")
} else if !state.shouldContinue(err, p) {
return err
}
}
if len(o.buf) > maxMarshalSize {
return ErrTooLarge
}
}
}
// Do oneof fields.
if prop.oneofMarshaler != nil {
m := structPointer_Interface(base, prop.stype).(Message)
if err := prop.oneofMarshaler(m, o); err == ErrNil {
return errOneofHasNil
} else if err != nil {
return err
}
}
// Add unrecognized fields at the end.
if prop.unrecField.IsValid() {
v := *structPointer_Bytes(base, prop.unrecField)
if len(o.buf)+len(v) > maxMarshalSize {
return ErrTooLarge
}
if len(v) > 0 {
o.buf = append(o.buf, v...)
}
}
return state.err
}
上面代碼中可以看到,除去
oneof fields
和
unrecognized fields
是單獨最後處理的,其他類型都是調用的
p.enc(o, p, base)
進行序列化的。
Properties
的資料結構定義如下:
type Properties struct {
Name string // name of the field, for error messages
OrigName string // original name before protocol compiler (always set)
JSONName string // name to use for JSON; determined by protoc
Wire string
WireType int
Tag int
Required bool
Optional bool
Repeated bool
Packed bool // relevant for repeated primitives only
Enum string // set for enum types only
proto3 bool // whether this is known to be a proto3 field; set for []byte only
oneof bool // whether this is a oneof field
Default string // default value
HasDefault bool // whether an explicit default was provided
CustomType string
StdTime bool
StdDuration bool
enc encoder
valEnc valueEncoder // set for bool and numeric types only
field field
tagcode []byte // encoding of EncodeVarint((Tag<<3)|WireType)
tagbuf [8]byte
stype reflect.Type // set for struct types only
sstype reflect.Type // set for slices of structs types only
ctype reflect.Type // set for custom types only
sprop * StructProperties // set for struct types only
isMarshaler bool
isUnmarshaler bool
mtype reflect.Type // set for map types only
mkeyprop * Properties // set for map types only
mvalprop * Properties // set for map types only
size sizer
valSize valueSizer // set for bool and numeric types only
dec decoder
valDec valueDecoder // set for bool and numeric types only
// If this is a packable field, this will be the decoder for the packed version of the field.
packedDec decoder
}
在
Properties
這個結構體中,定義了名為
enc
的
encoder
和名為
dec
的
decoder
。
encoder
和
decoder
函數定義是完全一樣的。
type encoder func(p *Buffer, prop *Properties, base structPointer) error
type decoder func(p *Buffer, prop *Properties, base structPointer) error
encoder
和
decoder
函數初始化是在
Properties
中:
// Initialize the fields for encoding and decoding.
func (p *Properties) setEncAndDec(typ reflect.Type, f *reflect.StructField, lockGetProp bool) {
// 下面代碼有删減,類似的部分省略了
// proto3 scalar types
case reflect.Int32:
if p.proto3 {
p.enc = (*Buffer).enc_proto3_int32
p.dec = (*Buffer).dec_proto3_int32
p.size = size_proto3_int32
} else {
p.enc = (*Buffer).enc_ref_int32
p.dec = (*Buffer).dec_proto3_int32
p.size = size_ref_int32
}
case reflect.Uint32:
if p.proto3 {
p.enc = (*Buffer).enc_proto3_uint32
p.dec = (*Buffer).dec_proto3_int32 // can reuse
p.size = size_proto3_uint32
} else {
p.enc = (*Buffer).enc_ref_uint32
p.dec = (*Buffer).dec_proto3_int32 // can reuse
p.size = size_ref_uint32
}
case reflect.Float32:
if p.proto3 {
p.enc = (*Buffer).enc_proto3_uint32 // can just treat them as bits
p.dec = (*Buffer).dec_proto3_int32
p.size = size_proto3_uint32
} else {
p.enc = (*Buffer).enc_ref_uint32 // can just treat them as bits
p.dec = (*Buffer).dec_proto3_int32
p.size = size_ref_uint32
}
case reflect.String:
if p.proto3 {
p.enc = (*Buffer).enc_proto3_string
p.dec = (*Buffer).dec_proto3_string
p.size = size_proto3_string
} else {
p.enc = (*Buffer).enc_ref_string
p.dec = (*Buffer).dec_proto3_string
p.size = size_ref_string
}
case reflect.Slice:
switch t2 := t1.Elem(); t2.Kind() {
default:
logNoSliceEnc(t1, t2)
break
case reflect.Int32:
if p.Packed {
p.enc = (*Buffer).enc_slice_packed_int32
p.size = size_slice_packed_int32
} else {
p.enc = (*Buffer).enc_slice_int32
p.size = size_slice_int32
}
p.dec = (*Buffer).dec_slice_int32
p.packedDec = (*Buffer).dec_slice_packed_int32
default:
logNoSliceEnc(t1, t2)
break
}
}
case reflect.Map:
p.enc = (*Buffer).enc_new_map
p.dec = (*Buffer).dec_new_map
p.size = size_new_map
p.mtype = t1
p.mkeyprop = &Properties{}
p.mkeyprop.init(reflect.PtrTo(p.mtype.Key()), "Key", f.Tag.Get("protobuf_key"), nil, lockGetProp)
p.mvalprop = &Properties{}
vtype := p.mtype.Elem()
if vtype.Kind() != reflect.Ptr && vtype.Kind() != reflect.Slice {
// The value type is not a message (*T) or bytes ([]byte),
// so we need encoders for the pointer to this type.
vtype = reflect.PtrTo(vtype)
}
p.mvalprop.CustomType = p.CustomType
p.mvalprop.StdDuration = p.StdDuration
p.mvalprop.StdTime = p.StdTime
p.mvalprop.init(vtype, "Value", f.Tag.Get("protobuf_val"), nil, lockGetProp)
}
p.setTag(lockGetProp)
}
上面代碼中,分别把各個類型都進行
switch - case
枚舉,每種情況都設定對應的
encode
編碼器,
decode
解碼器,
size
大小。
proto2
和
proto3
有差別的地方也分成2種不同的情況進行處理。
有以下幾種類型,
reflect.Bool
、
reflect.Int32
、
reflect.Uint32
、
reflect.Int64
、
reflect.Uint64
、
reflect.Float32
、
reflect.Float64
、
reflect.String
、
reflect.Struct
、
reflect.Ptr、reflect.Slice
、
reflect.Map
共 12 種大的分類。
下面主要挑 3 類,
Int32
、
String
、
Map
代碼實作進行分析。
Int32
func (o *Buffer) enc_proto3_int32(p *Properties, base structPointer) error {
v := structPointer_Word32Val(base, p.field)
x := int32(word32Val_Get(v)) // permit sign extension to use full 64-bit range
if x == 0 {
return ErrNil
}
o.buf = append(o.buf, p.tagcode...)
p.valEnc(o, uint64(x))
return nil
}
處理
Int32
代碼比較簡單,先把
tagcode
放進
buf
二進制資料流緩沖區,接着序列化
Int32
,序列化以後緊接着
tagcode
後面放進緩沖區。
// EncodeVarint writes a varint-encoded integer to the Buffer.
// This is the format for the
// int32, int64, uint32, uint64, bool, and enum
// protocol buffer types.
func (p *Buffer) EncodeVarint(x uint64) error {
for x >= 1<<7 {
p.buf = append(p.buf, uint8(x&0x7f|0x80))
x >>= 7
}
p.buf = append(p.buf, uint8(x))
return nil
}
Int32
的編碼處理方法在上篇裡面講過,用的
Varint
處理方法。上面這個函數同樣适用于處理
int32
,
int64
,
uint32
,
uint64
,
bool
,
enum
。
順道也可以看看
sint32
、
Fixed32
的具體代碼實作。
// EncodeZigzag32 writes a zigzag-encoded 32-bit integer
// to the Buffer.
// This is the format used for the sint32 protocol buffer type.
func (p *Buffer) EncodeZigzag32(x uint64) error {
// use signed number to get arithmetic right shift.
return p.EncodeVarint(uint64((uint32(x) << 1) ^ uint32((int32(x) >> 31))))
}
針對有符号的
sint32
,采取的是先
Zigzag
,然後在
Varint
的處理方式。
// EncodeFixed32 writes a 32-bit integer to the Buffer.
// This is the format for the
// fixed32, sfixed32, and float protocol buffer types.
func (p *Buffer) EncodeFixed32(x uint64) error {
p.buf = append(p.buf,
uint8(x),
uint8(x>>8),
uint8(x>>16),
uint8(x>>24))
return nil
}
對于
Fixed32
的處理,僅僅隻是位移操作,并沒有做什麼壓縮操作。
String
func (o *Buffer) enc_proto3_string(p *Properties, base structPointer) error {
v := *structPointer_StringVal(base, p.field)
if v == "" {
return ErrNil
}
o.buf = append(o.buf, p.tagcode...)
o.EncodeStringBytes(v)
return nil
}
序列化字元串也分2步,先把
tagcode
放進去,然後再序列化資料。
// EncodeStringBytes writes an encoded string to the Buffer.
// This is the format used for the proto2 string type.
func (p *Buffer) EncodeStringBytes(s string) error {
p.EncodeVarint(uint64(len(s)))
p.buf = append(p.buf, s...)
return nil
}
序列化字元串的時候,會先把字元串的長度通過編碼
Varint
的方式,寫到
buf
中。長度後面再緊跟着
string
。這也就是
tag - length - value
的實作。
Map
// Encode a map field.
func (o *Buffer) enc_new_map(p *Properties, base structPointer) error {
var state errorState // XXX: or do we need to plumb this through?
v := structPointer_NewAt(base, p.field, p.mtype).Elem() // map[K]V
if v.Len() == 0 {
return nil
}
keycopy, valcopy, keybase, valbase := mapEncodeScratch(p.mtype)
enc := func() error {
if err := p.mkeyprop.enc(o, p.mkeyprop, keybase); err != nil {
return err
}
if err := p.mvalprop.enc(o, p.mvalprop, valbase); err != nil && err != ErrNil {
return err
}
return nil
}
// Don't sort map keys. It is not required by the spec, and C++ doesn't do it.
for _, key := range v.MapKeys() {
val := v.MapIndex(key)
keycopy.Set(key)
valcopy.Set(val)
o.buf = append(o.buf, p.tagcode...)
if err := o.enc_len_thing(enc, &state); err != nil {
return err
}
}
return nil
}
上述代碼也可以序列化字典數組,例如:
轉換成對應的
repeated message
形式再進行序列化。
message MapFieldEntry {
key_type key = 1;
value_type value = 2;
}
repeated MapFieldEntry map_field = N;
map
序列化是針對每個
k-v
,都先放入
tagcode
,然後再序列化
k-v
。這裡需要化未知長度的結構體的時候需要調用
enc_len_thing()
方法。
// Encode something, preceded by its encoded length (as a varint).
func (o *Buffer) enc_len_thing(enc func() error, state *errorState) error {
iLen := len(o.buf)
o.buf = append(o.buf, 0, 0, 0, 0) // reserve four bytes for length
iMsg := len(o.buf)
err := enc()
if err != nil && !state.shouldContinue(err, nil) {
return err
}
lMsg := len(o.buf) - iMsg
lLen := sizeVarint(uint64(lMsg))
switch x := lLen - (iMsg - iLen); {
case x > 0: // actual length is x bytes larger than the space we reserved
// Move msg x bytes right.
o.buf = append(o.buf, zeroes[:x]...)
copy(o.buf[iMsg+x:], o.buf[iMsg:iMsg+lMsg])
case x < 0: // actual length is x bytes smaller than the space we reserved
// Move msg x bytes left.
copy(o.buf[iMsg+x:], o.buf[iMsg:iMsg+lMsg])
o.buf = o.buf[:len(o.buf)+x] // x is negative
}
// Encode the length in the reserved space.
o.buf = o.buf[:iLen]
o.EncodeVarint(uint64(lMsg))
o.buf = o.buf[:len(o.buf)+lMsg]
return state.err
}
enc_len_thing()
方法會先預存 4 個位元組的長度空位。序列化以後算出長度。如果長度比 4 個位元組還要長,則右移序列化的二進制資料,把長度填到
tagcode
和資料之間。如果長度小于 4 個位元組,相應的要左移。
slice
最後再舉一個數組的例子。以
[]int32
為例。
// Encode a slice of int32s ([]int32) in packed format.
func (o *Buffer) enc_slice_packed_int32(p *Properties, base structPointer) error {
s := structPointer_Word32Slice(base, p.field)
l := s.Len()
if l == 0 {
return ErrNil
}
// TODO: Reuse a Buffer.
buf := NewBuffer(nil)
for i := 0; i < l; i++ {
x := int32(s.Index(i)) // permit sign extension to use full 64-bit range
p.valEnc(buf, uint64(x))
}
o.buf = append(o.buf, p.tagcode...)
o.EncodeVarint(uint64(len(buf.buf)))
o.buf = append(o.buf, buf.buf...)
return nil
}
序列化這個數組,分3步,先把
tagcode
放進去,然後再序列化整個數組的長度,最後把數組的每個資料都序列化放在後面。最後形成
tag - length - value - value - value
的形式。
上述就是
Protocol Buffer
序列化的過程。
序列化小結
Protocol Buffer
序列化采用
Varint、Zigzag
方法,壓縮
int
型整數和帶符号的整數。對浮點型數字不做壓縮(這裡可以進一步的壓縮,
Protocol Buffer
還有提升空間)。編碼
.proto
檔案,會對
option
和
repeated
字段進行檢查,若
optional
或
repeated
字段沒有被設定字段值,那麼該字段在序列化時的資料中是完全不存在的,即不進行序列化(少編碼一個字段)。
上面這兩點做到了壓縮資料,序列化工作量減少。
序列化的過程都是二進制的位移,速度非常快。資料都以
tag - length - value
(或者
tag - value
)的形式存在二進制資料流中。采用了
TLV
結構存儲資料以後,也擺脫了
JSON
中的
{、}、; 、
這些分隔符,沒有這些分隔符也算是再一次減少了一部分資料。
這一點做到了序列化速度非常快。
回到頂部
protocolBuffers反序列化
反序列化的實作完全是序列化實作的逆過程。
func Unmarshal(buf []byte, pb Message) error {
pb.Reset()
return UnmarshalMerge(buf, pb)
}
在反序列化開始之前,先重置一下緩沖區。
func (p *Buffer) Reset() {
p.buf = p.buf[0:0] // for reading/writing
p.index = 0 // for reading
}
清空
buf
中的所有資料,并且重置
index
。
func UnmarshalMerge(buf []byte, pb Message) error {
// If the object can unmarshal itself, let it.
if u, ok := pb.(Unmarshaler); ok {
return u.Unmarshal(buf)
}
return NewBuffer(buf).Unmarshal(pb)
}
反序列化資料的開始從上面這個函數開始,如果傳進來的
message
的結果和
buf
結果不比對,最終得到的結果是不可預知的。反序列化之前,同樣會先調用一下對應自己身自定義的
Unmarshal()
方法。
type Unmarshaler interface {
Unmarshal([]byte) error
}
Unmarshal()
是一個可以自己實作的接口。
UnmarshalMerge
中會調用
Unmarshal(pb Message)
方法。
func (p *Buffer) Unmarshal(pb Message) error {
// If the object can unmarshal itself, let it.
if u, ok := pb.(Unmarshaler); ok {
err := u.Unmarshal(p.buf[p.index:])
p.index = len(p.buf)
return err
}
typ, base, err := getbase(pb)
if err != nil {
return err
}
err = p.unmarshalType(typ.Elem(), GetProperties(typ.Elem()), false, base)
if collectStats {
stats.Decode++
}
return err
}
Unmarshal(pb Message)
這個函數隻有一個入參,和
proto.Unmarshal()
方法函數簽名不同(前面的函數隻有 1 個入參,後面的有 2 個入參)。兩者的差別在于,1 個入參的函數實作裡面并不會重置
buf
緩沖區,二個入參的會先重置
buf
緩沖區。
這兩個函數最終都會調用
unmarshalType()
方法,這個函數是最終支援反序列化的函數。
func (o *Buffer) unmarshalType(st reflect.Type, prop *StructProperties, is_group bool, base structPointer) error {
var state errorState
required, reqFields := prop.reqCount, uint64(0)
var err error
for err == nil && o.index < len(o.buf) {
oi := o.index
var u uint64
u, err = o.DecodeVarint()
if err != nil {
break
}
wire := int(u & 0x7)
// 下面代碼有省略
dec := p.dec
// 中間代碼有省略
decErr := dec(o, p, base)
if decErr != nil && !state.shouldContinue(decErr, p) {
err = decErr
}
if err == nil && p.Required {
// Successfully decoded a required field.
if tag <= 64 {
// use bitmap for fields 1-64 to catch field reuse.
var mask uint64 = 1 << uint64(tag-1)
if reqFields&mask == 0 {
// new required field
reqFields |= mask
required--
}
} else {
// This is imprecise. It can be fooled by a required field
// with a tag > 64 that is encoded twice; that's very rare.
// A fully correct implementation would require allocating
// a data structure, which we would like to avoid.
required--
}
}
}
if err == nil {
if is_group {
return io.ErrUnexpectedEOF
}
if state.err != nil {
return state.err
}
if required > 0 {
// Not enough information to determine the exact field. If we use extra
// CPU, we could determine the field only if the missing required field
// has a tag <= 64 and we check reqFields.
return &RequiredNotSetError{"{Unknown}"}
}
}
return err
}
unmarshalType()
函數比較長,裡面處理的情況比較多,有
oneof,WireEndGroup
。真正處理反序列化的函數在
decErr := dec(o, p, base)
這一行。
dec
函數在
Properties
的
setEncAndDec()
函數中進行了初始化。上面序列化的時候談到過那個函數了,這裡就不再贅述了。
dec()
函數針對每個不同類型都有對應的反序列化函數。
同樣的,接下來也舉 4 個例子,看看反序列化的實際代碼實作。
Int32
func (o *Buffer) dec_proto3_int32(p *Properties, base structPointer) error {
u, err := p.valDec(o)
if err != nil {
return err
}
word32Val_Set(structPointer_Word32Val(base, p.field), uint32(u))
return nil
}
反序列化
Int32
代碼比較簡單,原理是按照
encode
的逆過程,還原原來的資料。
func (p *Buffer) DecodeVarint() (x uint64, err error) {
i := p.index
buf := p.buf
if i >= len(buf) {
return 0, io.ErrUnexpectedEOF
} else if buf[i] < 0x80 {
p.index++
return uint64(buf[i]), nil
} else if len(buf)-i < 10 {
return p.decodeVarintSlow()
}
var b uint64
// we already checked the first byte
x = uint64(buf[i]) - 0x80
i++
b = uint64(buf[i])
i++
x += b << 7
if b&0x80 == 0 {
goto done
}
x -= 0x80 << 7
b = uint64(buf[i])
i++
x += b << 14
if b&0x80 == 0 {
goto done
}
x -= 0x80 << 14
b = uint64(buf[i])
i++
x += b << 21
if b&0x80 == 0 {
goto done
}
x -= 0x80 << 21
b = uint64(buf[i])
i++
x += b << 28
if b&0x80 == 0 {
goto done
}
x -= 0x80 << 28
b = uint64(buf[i])
i++
x += b << 35
if b&0x80 == 0 {
goto done
}
x -= 0x80 << 35
b = uint64(buf[i])
i++
x += b << 42
if b&0x80 == 0 {
goto done
}
x -= 0x80 << 42
b = uint64(buf[i])
i++
x += b << 49
if b&0x80 == 0 {
goto done
}
x -= 0x80 << 49
b = uint64(buf[i])
i++
x += b << 56
if b&0x80 == 0 {
goto done
}
x -= 0x80 << 56
b = uint64(buf[i])
i++
x += b << 63
if b&0x80 == 0 {
goto done
}
// x -= 0x80 << 63 // Always zero.
return 0, errOverflow
done:
p.index = i
return x, nil
}
Int32
序列化之後,第一個位元組一定是
0x80
,那麼除去這個位元組以後,後面的每個二進制位元組都是資料,剩下的步驟就是通過位移操作把每個數字都加起來。上面這個反序列化的函數同樣适用于
int32
,
int64
,
uint32
,
uint64
,
bool
, 和
enum
。
順道也可以看看
sint32
、
Fixed32
的反序列化具體代碼實作。
func (p *Buffer) DecodeZigzag32() (x uint64, err error) {
x, err = p.DecodeVarint()
if err != nil {
return
}
x = uint64((uint32(x) >> 1) ^ uint32((int32(x&1)<<31)>>31))
return
}
針對有符号的
sint32
,反序列化的過程就是先反序列
Varint
,再反序列化
Zigzag
。
func (p *Buffer) DecodeFixed32() (x uint64, err error) {
// x, err already 0
i := p.index + 4
if i < 0 || i > len(p.buf) {
err = io.ErrUnexpectedEOF
return
}
p.index = i
x = uint64(p.buf[i-4])
x |= uint64(p.buf[i-3]) << 8
x |= uint64(p.buf[i-2]) << 16
x |= uint64(p.buf[i-1]) << 24
return
}
Fixed32
反序列化的過程也是通過位移,每個位元組的内容都累加,就可以還原出原先的資料。注意這裡也要先跳過
tag
的位置。
String
func (p *Buffer) DecodeRawBytes(alloc bool) (buf []byte, err error) {
n, err := p.DecodeVarint()
if err != nil {
return nil, err
}
nb := int(n)
if nb < 0 {
return nil, fmt.Errorf("proto: bad byte length %d", nb)
}
end := p.index + nb
if end < p.index || end > len(p.buf) {
return nil, io.ErrUnexpectedEOF
}
if !alloc {
// todo: check if can get more uses of alloc=false
buf = p.buf[p.index:end]
p.index += nb
return
}
buf = make([]byte, nb)
copy(buf, p.buf[p.index:])
p.index += nb
return
}
反序列化
string
先把
length
序列化出來,通過
DecodeVarint
的方式。拿到
length
以後,剩下的就是直接拷貝的過程。在上篇
encode
中,我們知道字元串是不做處理,直接放到二進制流裡面的,是以反序列化直接取出即可。
Map
func (o *Buffer) dec_new_map(p *Properties, base structPointer) error {
raw, err := o.DecodeRawBytes(false)
if err != nil {
return err
}
oi := o.index // index at the end of this map entry
o.index -= len(raw) // move buffer back to start of map entry
mptr := structPointer_NewAt(base, p.field, p.mtype) // *map[K]V
if mptr.Elem().IsNil() {
mptr.Elem().Set(reflect.MakeMap(mptr.Type().Elem()))
}
v := mptr.Elem() // map[K]V
// 這裡省略一些代碼,主要是為了 key - value 準備的一些可以雙重間接尋址的占位符,具體原因可以見序列化代碼裡面的 enc_new_map 函數
// Decode.
// This parses a restricted wire format, namely the encoding of a message
// with two fields. See enc_new_map for the format.
for o.index < oi {
// tagcode for key and value properties are always a single byte
// because they have tags 1 and 2.
tagcode := o.buf[o.index]
o.index++
switch tagcode {
case p.mkeyprop.tagcode[0]:
if err := p.mkeyprop.dec(o, p.mkeyprop, keybase); err != nil {
return err
}
case p.mvalprop.tagcode[0]:
if err := p.mvalprop.dec(o, p.mvalprop, valbase); err != nil {
return err
}
default:
// TODO: Should we silently skip this instead?
return fmt.Errorf("proto: bad map data tag %d", raw[0])
}
}
keyelem, valelem := keyptr.Elem(), valptr.Elem()
if !keyelem.IsValid() {
keyelem = reflect.Zero(p.mtype.Key())
}
if !valelem.IsValid() {
valelem = reflect.Zero(p.mtype.Elem())
}
v.SetMapIndex(keyelem, valelem)
return nil
}
反序列化
map
需要把每個
tag
取出來,然後緊接着反序列化每個
key - value
。最後會判斷
keyelem
和
valelem
是否為零值,如果是零值要分别調用
reflect.Zero
處理零值的情況。
slice
最後還是舉一個數組的例子。以
[]int32
為例。
func (o *Buffer) dec_slice_packed_int32(p *Properties, base structPointer) error {
v := structPointer_Word32Slice(base, p.field)
nn, err := o.DecodeVarint()
if err != nil {
return err
}
nb := int(nn) // number of bytes of encoded int32s
fin := o.index + nb
if fin < o.index {
return errOverflow
}
for o.index < fin {
u, err := p.valDec(o)
if err != nil {
return err
}
v.Append(uint32(u))
}
return nil
}
反序列化這個數組,分2步,跳過
tagcode
拿到
length
,反序列化
length
。在
length
這個長度中依次反序列化各個
value
。
上述就是
Protocol Buffer
反序列化的過程。
序列化小結
Protocol Buffer
反序列化直接讀取二進制位元組資料流,反序列化就是
encode
的反過程,同樣是一些二進制操作。反序列化的時候,通常隻需要用到
length
。
tag
值隻是用來辨別類型的,
Properties
的
setEncAndDec()
方法裡面已經把每個類型對應的
decode
解碼器初始化好了,是以反序列化的時候,
tag
值可以直接跳過,從
length
開始處理。
XML
的解析過程就複雜一些。
XML
需要從檔案中讀取出字元串,再轉換為
XML
文檔對象結構模型。之後,再從
XML
文檔對象結構模型中讀取指定節點的字元串,最後再将這個字元串轉換成指定類型的變量。這個過程非常複雜,其中将
XML
檔案轉換為文檔對象結構模型的過程通常需要完成詞法文法分析等大量消耗
CPU
的複雜計算。
回到頂部
序列化/反序列化性能
Protocol Buffer
一直被人們認為是高性能的存在。也有很多人做過實作,驗證了這一說法。例如這個連結裡面的實驗 jvm-serializers。
在看資料之前,我們可以先理性的分析一下
Protocol Buffer
和
JSON
、
XML
這些比有哪些優勢:
Protobuf
采用了
Varint
、
Zigzag
大幅的壓縮了整數類型,也沒有
JSON
裡面的
{、}、;、
這些資料分隔符,有
option
字段辨別的,沒有資料的時候不會進行反序列化。這幾個措施導緻
pb
的資料量整體的就比
JSON
少很多。
Protobuf
采取的是
TLV
的形式,
JSON
這些都是字元串的形式。字元串比對應該比基于數字的字段
tag
更耗時。
Protobuf
在正文前有一個大小或者長度的标記,而
JSON
必須全文掃描無法跳過不需要的字段。
下面這張圖來自參考連結裡面的 《Protobuf有沒有比JSON快5倍?用代碼來擊破pb性能神話》:
從這個實驗來看,确實
Protobuf
在序列化數字這方面性能是非常強悍的。
序列化 / 反序列化數字确實是
Protobuf
針對
JSON
和
XML
的優勢,但是它也存在一些沒有優勢的地方。比如字元串。字元串在
Protobuf
中基本沒有處理,除了前面加了
tag - length
。在序列化 / 反序列化字元串的過程中,字元串拷貝的速度反而決定的真正的速度。
從上圖可以看到
encode
字元串的時候,速度基本和
JSON
相差無幾。
回到頂部
最後
至此,關于
protocol buffers
的所有,讀者應該了然于胸了。
protocol buffers
誕生之初也并不是為了傳輸資料存在的,隻是為了解決伺服器多版本協定相容的問題。實質其實是發明了一個新的跨語言無歧義的
IDL (Interface description language)
。隻不過人們後來發現用它來傳輸資料也不錯,才開始用
protocol buffers
。
想用
protocol buffers
替換
JSON
,可能是考慮到:
-
相同資料,傳輸的資料量比protocol buffers
小,JSON
或者gzip
壓縮以後,網絡傳輸消耗較少。7zip
-
不是自我描述的,在缺少protocol buffers
檔案以後,有一定的加密性,資料傳輸過程中都是二進制流,并不是明文。.proto
-
提供了一套工具,自動化生成代碼也非常友善。protocol buffers
-
具有向後相容性,改變了資料結構以後,對老的版本沒有影響。protocol buffers
-
原生完美相容protocol buffers
調用。RPC
如果很少用到整型數字,浮點型數字,全部都是字元串資料,那麼
JSON 和 protocol buffers
性能不會差太多。純前端之間互動的話,選擇
JSON
或者
protocol buffers
差别不是很大。
與後端互動過程中,用到
protocol buffers
比較多,筆者認為選擇
protocol buffers
除了性能強以外,完美相容
RPC
調用也是一個重要因素。
回到頂部