天天看點

高效的序列化/反序列化資料方式 Protobuf高效的序列化/反序列化資料方式 Protobuf

高效的序列化/反序列化資料方式 Protobuf

github位址

目錄

  • protocolBuffers 序列化
    • Int32
    • String
    • Map
    • slice
    • 序列化小結
  • protocolBuffers 反序列化
    • Int32
    • String
    • Map
    • slice
    • 序列化小結
  • 序列化/反序列化性能
  • 最後
高效的序列化/反序列化資料方式 Protobuf高效的序列化/反序列化資料方式 Protobuf

protocolBuffers序列化

上篇文章中其實已經講過了

encode

的過程,這篇文章以

golang

為例,從代碼實作的層面講講序列化和反序列化的過程。

舉個

go

使用

protobuf

進行資料序列化和反序列化的例子,本篇文章從這個例子開始。

先建立一個

example

message

syntax = "proto2";
package example;

enum FOO { X = 17; };

message Test {
  required string label = 1;
  optional int32 type = 2 [default=77];
  repeated int64 reps = 3;
  optional group OptionalGroup = 4 {
    required string RequiredField = 5;
  }
}
           

利用

protoc-gen-go

生成對應的

get/set

方法。代碼中就可以用生成的代碼進行序列化和反序列化了。

package main

import (
  "log"

  "github.com/golang/protobuf/proto"
  "path/to/example"
)

func main() {
  test := &example.Test {
    Label: proto.String("hello"),
    Type:  proto.Int32(17),
    Reps:  []int64{1, 2, 3},
    Optionalgroup: &example.Test_OptionalGroup {
      RequiredField: proto.String("good bye"),
    },
  }
  data, err := proto.Marshal(test)
  if err != nil {
    log.Fatal("marshaling error: ", err)
  }
  newTest := &example.Test{}
  err = proto.Unmarshal(data, newTest)
  if err != nil {
    log.Fatal("unmarshaling error: ", err)
  }
  // Now test and newTest contain the same data.
  if test.GetLabel() != newTest.GetLabel() {
    log.Fatalf("data mismatch %q != %q", test.GetLabel(), newTest.GetLabel())
  }
  // etc.
}
           

上面代碼中

proto.Marshal()

是序列化過程。

proto.Unmarshal()

是反序列化過程。這一章節先看看序列化過程的實作,下一章節再分析反序列化過程的實作。

// Marshal takes the protocol buffer
// and encodes it into the wire format, returning the data.
func Marshal(pb Message) ([]byte, error) {
	// Can the object marshal itself?
	if m, ok := pb.(Marshaler); ok {
		return m.Marshal()
	}
	p := NewBuffer(nil)
	err := p.Marshal(pb)
	if p.buf == nil && err == nil {
		// Return a non-nil slice on success.
		return []byte{}, nil
	}
	return p.buf, err
}
           

序列化函數一進來,會先調用

message

對象自身的實作的序列化方法。

// Marshaler is the interface representing objects that can marshal themselves.
type Marshaler interface {
	Marshal() ([]byte, error)
}
           

Marshaler

是一個

interface

,這個接口是專門留給對象自定義序列化的。如果有實作,就

return

自己實作的方法。如果沒有,接下來就進行預設序列化方式。

p := NewBuffer(nil)
err := p.Marshal(pb)
if p.buf == nil && err == nil {
  // Return a non-nil slice on success.
  return []byte{}, nil
}
           

建立一個

Buffer

,調用

Buffer

Marshal()

方法。

message

經過序列化以後,資料流會放到

Buffer

buf

位元組流中。序列化最終傳回

buf

位元組流即可。

type Buffer struct {
	buf   []byte // encode/decode byte stream
	index int    // read point

	// pools of basic types to amortize allocation.
	bools   []bool
	uint32s []uint32
	uint64s []uint64

	// extra pools, only used with pointer_reflect.go
	int32s   []int32
	int64s   []int64
	float32s []float32
	float64s []float64
}
           

Buffer

的資料結構如上,

Buffer

是用于序列化和反序列化

protocol buffers

的緩沖區管理器。它可以在調用的時候重用以減少記憶體使用量。内部維護了 7 個

pool

,3 個基礎資料類型的

pool

,4 個隻能被

pointer_reflect

使用的

pool

func (p *Buffer) Marshal(pb Message) error {
	// Can the object marshal itself?
	if m, ok := pb.(Marshaler); ok {
		data, err := m.Marshal()
		p.buf = append(p.buf, data...)
		return err
	}

	t, base, err := getbase(pb)
	// 異常處理
	if structPointer_IsNil(base) {
		return ErrNil
	}
	if err == nil {
		err = p.enc_struct(GetProperties(t.Elem()), base)
	}

	// 用來統計 Encode 次數的
	if collectStats {
		(stats).Encode++ // Parens are to work around a goimports bug.
	}
	// maxMarshalSize = 1<<31 - 1,這個值是 protobuf 可以 encoded 的最大值。
	if len(p.buf) > maxMarshalSize {
		return ErrTooLarge
	}
	return err
}
           

Buffer

Marshal()

方法依舊先調用一下對象是否實作了

Marshal()

接口,如果實作了,還是讓它自己序列化,序列化之後的二進制資料流加入到

buf

資料流中。

func getbase(pb Message) (t reflect.Type, b structPointer, err error) {
	if pb == nil {
		err = ErrNil
		return
	}
	// get the reflect type of the pointer to the struct.
	t = reflect.TypeOf(pb)
	// get the address of the struct.
	value := reflect.ValueOf(pb)
	b = toStructPointer(value)
	return
}
           

getbase

方法通過

reflect

方法拿到了

message

的類型和對應

value

的結構體指針。拿到結構體指針先做異常處理。

是以序列化最核心的代碼其實就一句,

p.enc_struct(GetProperties(t.Elem()), base)

// Encode a struct.
func (o *Buffer) enc_struct(prop *StructProperties, base structPointer) error {
	var state errorState
	// Encode fields in tag order so that decoders may use optimizations
	// that depend on the ordering.
	// https://developers.google.com/protocol-buffers/docs/encoding#order
	for _, i := range prop.order {
		p := prop.Prop[i]
		if p.enc != nil {
			err := p.enc(o, p, base)
			if err != nil {
				if err == ErrNil {
					if p.Required && state.err == nil {
						state.err = &RequiredNotSetError{p.Name}
					}
				} else if err == errRepeatedHasNil {
					// Give more context to nil values in repeated fields.
					return errors.New("repeated field " + p.OrigName + " has nil element")
				} else if !state.shouldContinue(err, p) {
					return err
				}
			}
			if len(o.buf) > maxMarshalSize {
				return ErrTooLarge
			}
		}
	}

	// Do oneof fields.
	if prop.oneofMarshaler != nil {
		m := structPointer_Interface(base, prop.stype).(Message)
		if err := prop.oneofMarshaler(m, o); err == ErrNil {
			return errOneofHasNil
		} else if err != nil {
			return err
		}
	}

	// Add unrecognized fields at the end.
	if prop.unrecField.IsValid() {
		v := *structPointer_Bytes(base, prop.unrecField)
		if len(o.buf)+len(v) > maxMarshalSize {
			return ErrTooLarge
		}
		if len(v) > 0 {
			o.buf = append(o.buf, v...)
		}
	}

	return state.err
}
           

上面代碼中可以看到,除去

oneof fields

unrecognized fields

是單獨最後處理的,其他類型都是調用的

p.enc(o, p, base)

進行序列化的。

Properties

的資料結構定義如下:

type Properties struct {
	Name     string // name of the field, for error messages
	OrigName string // original name before protocol compiler (always set)
	JSONName string // name to use for JSON; determined by protoc
	Wire     string
	WireType int
	Tag      int
	Required bool
	Optional bool
	Repeated bool
	Packed   bool   // relevant for repeated primitives only
	Enum     string // set for enum types only
	proto3   bool   // whether this is known to be a proto3 field; set for []byte only
	oneof    bool   // whether this is a oneof field

	Default     string // default value
	HasDefault  bool   // whether an explicit default was provided
	CustomType  string
	StdTime     bool
	StdDuration bool

	enc           encoder
	valEnc        valueEncoder // set for bool and numeric types only
	field         field
	tagcode       []byte // encoding of EncodeVarint((Tag<<3)|WireType)
	tagbuf        [8]byte
	stype         reflect.Type      // set for struct types only
	sstype        reflect.Type      // set for slices of structs types only
	ctype         reflect.Type      // set for custom types only
	sprop         * StructProperties // set for struct types only
	isMarshaler   bool
	isUnmarshaler bool

	mtype    reflect.Type // set for map types only
	mkeyprop * Properties  // set for map types only
	mvalprop * Properties  // set for map types only

	size    sizer
	valSize valueSizer // set for bool and numeric types only

	dec    decoder
	valDec valueDecoder // set for bool and numeric types only

	// If this is a packable field, this will be the decoder for the packed version of the field.
	packedDec decoder
}
           

Properties

這個結構體中,定義了名為

enc

encoder

和名為

dec

decoder

encoder

decoder

函數定義是完全一樣的。

type encoder func(p *Buffer, prop *Properties, base structPointer) error
type decoder func(p *Buffer, prop *Properties, base structPointer) error
           

encoder

decoder

函數初始化是在

Properties

中:

// Initialize the fields for encoding and decoding.
func (p *Properties) setEncAndDec(typ reflect.Type, f *reflect.StructField, lockGetProp bool) {
	// 下面代碼有删減,類似的部分省略了
	// proto3 scalar types

	case reflect.Int32:
		if p.proto3 {
			p.enc = (*Buffer).enc_proto3_int32
			p.dec = (*Buffer).dec_proto3_int32
			p.size = size_proto3_int32
		} else {
			p.enc = (*Buffer).enc_ref_int32
			p.dec = (*Buffer).dec_proto3_int32
			p.size = size_ref_int32
		}
	case reflect.Uint32:
		if p.proto3 {
			p.enc = (*Buffer).enc_proto3_uint32
			p.dec = (*Buffer).dec_proto3_int32 // can reuse
			p.size = size_proto3_uint32
		} else {
			p.enc = (*Buffer).enc_ref_uint32
			p.dec = (*Buffer).dec_proto3_int32 // can reuse
			p.size = size_ref_uint32
		}
	case reflect.Float32:
		if p.proto3 {
			p.enc = (*Buffer).enc_proto3_uint32 // can just treat them as bits
			p.dec = (*Buffer).dec_proto3_int32
			p.size = size_proto3_uint32
		} else {
			p.enc = (*Buffer).enc_ref_uint32 // can just treat them as bits
			p.dec = (*Buffer).dec_proto3_int32
			p.size = size_ref_uint32
		}
	case reflect.String:
		if p.proto3 {
			p.enc = (*Buffer).enc_proto3_string
			p.dec = (*Buffer).dec_proto3_string
			p.size = size_proto3_string
		} else {
			p.enc = (*Buffer).enc_ref_string
			p.dec = (*Buffer).dec_proto3_string
			p.size = size_ref_string
		}

	case reflect.Slice:
		switch t2 := t1.Elem(); t2.Kind() {
		default:
			logNoSliceEnc(t1, t2)
			break

		case reflect.Int32:
			if p.Packed {
				p.enc = (*Buffer).enc_slice_packed_int32
				p.size = size_slice_packed_int32
			} else {
				p.enc = (*Buffer).enc_slice_int32
				p.size = size_slice_int32
			}
			p.dec = (*Buffer).dec_slice_int32
			p.packedDec = (*Buffer).dec_slice_packed_int32

			default:
				logNoSliceEnc(t1, t2)
				break
			}
		}

	case reflect.Map:
		p.enc = (*Buffer).enc_new_map
		p.dec = (*Buffer).dec_new_map
		p.size = size_new_map

		p.mtype = t1
		p.mkeyprop = &Properties{}
		p.mkeyprop.init(reflect.PtrTo(p.mtype.Key()), "Key", f.Tag.Get("protobuf_key"), nil, lockGetProp)
		p.mvalprop = &Properties{}
		vtype := p.mtype.Elem()
		if vtype.Kind() != reflect.Ptr && vtype.Kind() != reflect.Slice {
			// The value type is not a message (*T) or bytes ([]byte),
			// so we need encoders for the pointer to this type.
			vtype = reflect.PtrTo(vtype)
		}

		p.mvalprop.CustomType = p.CustomType
		p.mvalprop.StdDuration = p.StdDuration
		p.mvalprop.StdTime = p.StdTime
		p.mvalprop.init(vtype, "Value", f.Tag.Get("protobuf_val"), nil, lockGetProp)
	}
	p.setTag(lockGetProp)
}
           

上面代碼中,分别把各個類型都進行

switch - case

枚舉,每種情況都設定對應的

encode

編碼器,

decode

解碼器,

size

大小。

proto2

proto3

有差別的地方也分成2種不同的情況進行處理。

有以下幾種類型,

reflect.Bool

reflect.Int32

reflect.Uint32

reflect.Int64

reflect.Uint64

reflect.Float32

reflect.Float64

reflect.String

reflect.Struct

reflect.Ptr、reflect.Slice

reflect.Map

共 12 種大的分類。

下面主要挑 3 類,

Int32

String

Map

代碼實作進行分析。

Int32

func (o *Buffer) enc_proto3_int32(p *Properties, base structPointer) error {
	v := structPointer_Word32Val(base, p.field)
	x := int32(word32Val_Get(v)) // permit sign extension to use full 64-bit range
	if x == 0 {
		return ErrNil
	}
	o.buf = append(o.buf, p.tagcode...)
	p.valEnc(o, uint64(x))
	return nil
}
           

處理

Int32

代碼比較簡單,先把

tagcode

放進

buf

二進制資料流緩沖區,接着序列化

Int32

,序列化以後緊接着

tagcode

後面放進緩沖區。

// EncodeVarint writes a varint-encoded integer to the Buffer.
// This is the format for the
// int32, int64, uint32, uint64, bool, and enum
// protocol buffer types.
func (p *Buffer) EncodeVarint(x uint64) error {
	for x >= 1<<7 {
		p.buf = append(p.buf, uint8(x&0x7f|0x80))
		x >>= 7
	}
	p.buf = append(p.buf, uint8(x))
	return nil
}
           

Int32

的編碼處理方法在上篇裡面講過,用的

Varint

處理方法。上面這個函數同樣适用于處理

int32

,

int64

,

uint32

,

uint64

,

bool

,

enum

順道也可以看看

sint32

Fixed32

的具體代碼實作。

// EncodeZigzag32 writes a zigzag-encoded 32-bit integer
// to the Buffer.
// This is the format used for the sint32 protocol buffer type.
func (p *Buffer) EncodeZigzag32(x uint64) error {
	// use signed number to get arithmetic right shift.
	return p.EncodeVarint(uint64((uint32(x) << 1) ^ uint32((int32(x) >> 31))))
}
           

針對有符号的

sint32

,采取的是先

Zigzag

,然後在

Varint

的處理方式。

// EncodeFixed32 writes a 32-bit integer to the Buffer.
// This is the format for the
// fixed32, sfixed32, and float protocol buffer types.
func (p *Buffer) EncodeFixed32(x uint64) error {
	p.buf = append(p.buf,
		uint8(x),
		uint8(x>>8),
		uint8(x>>16),
		uint8(x>>24))
	return nil
}
           

對于

Fixed32

的處理,僅僅隻是位移操作,并沒有做什麼壓縮操作。

String

func (o *Buffer) enc_proto3_string(p *Properties, base structPointer) error {
	v := *structPointer_StringVal(base, p.field)
	if v == "" {
		return ErrNil
	}
	o.buf = append(o.buf, p.tagcode...)
	o.EncodeStringBytes(v)
	return nil
}
           

序列化字元串也分2步,先把

tagcode

放進去,然後再序列化資料。

// EncodeStringBytes writes an encoded string to the Buffer.
// This is the format used for the proto2 string type.
func (p *Buffer) EncodeStringBytes(s string) error {
	p.EncodeVarint(uint64(len(s)))
	p.buf = append(p.buf, s...)
	return nil
}
           

序列化字元串的時候,會先把字元串的長度通過編碼

Varint

的方式,寫到

buf

中。長度後面再緊跟着

string

。這也就是

tag - length - value

的實作。

Map

// Encode a map field.
func (o *Buffer) enc_new_map(p *Properties, base structPointer) error {
	var state errorState // XXX: or do we need to plumb this through?

	v := structPointer_NewAt(base, p.field, p.mtype).Elem() // map[K]V
	if v.Len() == 0 {
		return nil
	}

	keycopy, valcopy, keybase, valbase := mapEncodeScratch(p.mtype)

	enc := func() error {
		if err := p.mkeyprop.enc(o, p.mkeyprop, keybase); err != nil {
			return err
		}
		if err := p.mvalprop.enc(o, p.mvalprop, valbase); err != nil && err != ErrNil {
			return err
		}
		return nil
	}

	// Don't sort map keys. It is not required by the spec, and C++ doesn't do it.
	for _, key := range v.MapKeys() {
		val := v.MapIndex(key)

		keycopy.Set(key)
		valcopy.Set(val)

		o.buf = append(o.buf, p.tagcode...)
		if err := o.enc_len_thing(enc, &state); err != nil {
			return err
		}
	}
	return nil
}
           

上述代碼也可以序列化字典數組,例如:

轉換成對應的

repeated message

形式再進行序列化。

message MapFieldEntry {
		key_type key = 1;
		value_type value = 2;
}
repeated MapFieldEntry map_field = N;
           

map

序列化是針對每個

k-v

,都先放入

tagcode

,然後再序列化

k-v

。這裡需要化未知長度的結構體的時候需要調用

enc_len_thing()

方法。

// Encode something, preceded by its encoded length (as a varint).
func (o *Buffer) enc_len_thing(enc func() error, state *errorState) error {
	iLen := len(o.buf)
	o.buf = append(o.buf, 0, 0, 0, 0) // reserve four bytes for length
	iMsg := len(o.buf)
	err := enc()
	if err != nil && !state.shouldContinue(err, nil) {
		return err
	}
	lMsg := len(o.buf) - iMsg
	lLen := sizeVarint(uint64(lMsg))
	switch x := lLen - (iMsg - iLen); {
	case x > 0: // actual length is x bytes larger than the space we reserved
		// Move msg x bytes right.
		o.buf = append(o.buf, zeroes[:x]...)
		copy(o.buf[iMsg+x:], o.buf[iMsg:iMsg+lMsg])
	case x < 0: // actual length is x bytes smaller than the space we reserved
		// Move msg x bytes left.
		copy(o.buf[iMsg+x:], o.buf[iMsg:iMsg+lMsg])
		o.buf = o.buf[:len(o.buf)+x] // x is negative
	}
	// Encode the length in the reserved space.
	o.buf = o.buf[:iLen]
	o.EncodeVarint(uint64(lMsg))
	o.buf = o.buf[:len(o.buf)+lMsg]
	return state.err
}
           

enc_len_thing()

方法會先預存 4 個位元組的長度空位。序列化以後算出長度。如果長度比 4 個位元組還要長,則右移序列化的二進制資料,把長度填到

tagcode

和資料之間。如果長度小于 4 個位元組,相應的要左移。

slice

最後再舉一個數組的例子。以

[]int32

為例。

// Encode a slice of int32s ([]int32) in packed format.
func (o *Buffer) enc_slice_packed_int32(p *Properties, base structPointer) error {
	s := structPointer_Word32Slice(base, p.field)
	l := s.Len()
	if l == 0 {
		return ErrNil
	}
	// TODO: Reuse a Buffer.
	buf := NewBuffer(nil)
	for i := 0; i < l; i++ {
		x := int32(s.Index(i)) // permit sign extension to use full 64-bit range
		p.valEnc(buf, uint64(x))
	}

	o.buf = append(o.buf, p.tagcode...)
	o.EncodeVarint(uint64(len(buf.buf)))
	o.buf = append(o.buf, buf.buf...)
	return nil
}
           

序列化這個數組,分3步,先把

tagcode

放進去,然後再序列化整個數組的長度,最後把數組的每個資料都序列化放在後面。最後形成

tag - length - value - value - value

的形式。

上述就是

Protocol Buffer

序列化的過程。

序列化小結

Protocol Buffer

序列化采用

Varint、Zigzag

方法,壓縮

int

型整數和帶符号的整數。對浮點型數字不做壓縮(這裡可以進一步的壓縮,

Protocol Buffer

還有提升空間)。編碼

.proto

檔案,會對

option

repeated

字段進行檢查,若

optional

repeated

字段沒有被設定字段值,那麼該字段在序列化時的資料中是完全不存在的,即不進行序列化(少編碼一個字段)。

上面這兩點做到了壓縮資料,序列化工作量減少。

序列化的過程都是二進制的位移,速度非常快。資料都以

tag - length - value

(或者

tag - value

)的形式存在二進制資料流中。采用了

TLV

結構存儲資料以後,也擺脫了

JSON

中的

{、}、; 、

這些分隔符,沒有這些分隔符也算是再一次減少了一部分資料。

這一點做到了序列化速度非常快。

回到頂部

protocolBuffers反序列化

反序列化的實作完全是序列化實作的逆過程。

func Unmarshal(buf []byte, pb Message) error {
	pb.Reset()
	return UnmarshalMerge(buf, pb)
}
           

在反序列化開始之前,先重置一下緩沖區。

func (p *Buffer) Reset() {
	p.buf = p.buf[0:0] // for reading/writing
	p.index = 0        // for reading
}
           

清空

buf

中的所有資料,并且重置

index

func UnmarshalMerge(buf []byte, pb Message) error {
	// If the object can unmarshal itself, let it.
	if u, ok := pb.(Unmarshaler); ok {
		return u.Unmarshal(buf)
	}
	return NewBuffer(buf).Unmarshal(pb)
}
           

反序列化資料的開始從上面這個函數開始,如果傳進來的

message

的結果和

buf

結果不比對,最終得到的結果是不可預知的。反序列化之前,同樣會先調用一下對應自己身自定義的

Unmarshal()

方法。

type Unmarshaler interface {
	Unmarshal([]byte) error
}
           

Unmarshal()

是一個可以自己實作的接口。

UnmarshalMerge

中會調用

Unmarshal(pb Message)

方法。

func (p *Buffer) Unmarshal(pb Message) error {
	// If the object can unmarshal itself, let it.
	if u, ok := pb.(Unmarshaler); ok {
		err := u.Unmarshal(p.buf[p.index:])
		p.index = len(p.buf)
		return err
	}

	typ, base, err := getbase(pb)
	if err != nil {
		return err
	}

	err = p.unmarshalType(typ.Elem(), GetProperties(typ.Elem()), false, base)

	if collectStats {
		stats.Decode++
	}

	return err
}
           

Unmarshal(pb Message)

這個函數隻有一個入參,和

proto.Unmarshal()

方法函數簽名不同(前面的函數隻有 1 個入參,後面的有 2 個入參)。兩者的差別在于,1 個入參的函數實作裡面并不會重置

buf

緩沖區,二個入參的會先重置

buf

緩沖區。

這兩個函數最終都會調用

unmarshalType()

方法,這個函數是最終支援反序列化的函數。

func (o *Buffer) unmarshalType(st reflect.Type, prop *StructProperties, is_group bool, base structPointer) error {
	var state errorState
	required, reqFields := prop.reqCount, uint64(0)

	var err error
	for err == nil && o.index < len(o.buf) {
		oi := o.index
		var u uint64
		u, err = o.DecodeVarint()
		if err != nil {
			break
		}
		wire := int(u & 0x7)

		// 下面代碼有省略

		dec := p.dec

		// 中間代碼有省略

		decErr := dec(o, p, base)
		if decErr != nil && !state.shouldContinue(decErr, p) {
			err = decErr
		}
		if err == nil && p.Required {
			// Successfully decoded a required field.
			if tag <= 64 {
				// use bitmap for fields 1-64 to catch field reuse.
				var mask uint64 = 1 << uint64(tag-1)
				if reqFields&mask == 0 {
					// new required field
					reqFields |= mask
					required--
				}
			} else {
				// This is imprecise. It can be fooled by a required field
				// with a tag > 64 that is encoded twice; that's very rare.
				// A fully correct implementation would require allocating
				// a data structure, which we would like to avoid.
				required--
			}
		}
	}
	if err == nil {
		if is_group {
			return io.ErrUnexpectedEOF
		}
		if state.err != nil {
			return state.err
		}
		if required > 0 {
			// Not enough information to determine the exact field. If we use extra
			// CPU, we could determine the field only if the missing required field
			// has a tag <= 64 and we check reqFields.
			return &RequiredNotSetError{"{Unknown}"}
		}
	}
	return err
}
           

unmarshalType()

函數比較長,裡面處理的情況比較多,有

oneof,WireEndGroup

。真正處理反序列化的函數在

decErr := dec(o, p, base)

這一行。

dec

函數在

Properties

setEncAndDec()

函數中進行了初始化。上面序列化的時候談到過那個函數了,這裡就不再贅述了。

dec()

函數針對每個不同類型都有對應的反序列化函數。

同樣的,接下來也舉 4 個例子,看看反序列化的實際代碼實作。

Int32

func (o *Buffer) dec_proto3_int32(p *Properties, base structPointer) error {
	u, err := p.valDec(o)
	if err != nil {
		return err
	}
	word32Val_Set(structPointer_Word32Val(base, p.field), uint32(u))
	return nil
}
           

反序列化

Int32

代碼比較簡單,原理是按照

encode

的逆過程,還原原來的資料。

func (p *Buffer) DecodeVarint() (x uint64, err error) {
	i := p.index
	buf := p.buf

	if i >= len(buf) {
		return 0, io.ErrUnexpectedEOF
	} else if buf[i] < 0x80 {
		p.index++
		return uint64(buf[i]), nil
	} else if len(buf)-i < 10 {
		return p.decodeVarintSlow()
	}

	var b uint64
	// we already checked the first byte
	x = uint64(buf[i]) - 0x80
	i++

	b = uint64(buf[i])
	i++
	x += b << 7
	if b&0x80 == 0 {
		goto done
	}
	x -= 0x80 << 7

	b = uint64(buf[i])
	i++
	x += b << 14
	if b&0x80 == 0 {
		goto done
	}
	x -= 0x80 << 14

	b = uint64(buf[i])
	i++
	x += b << 21
	if b&0x80 == 0 {
		goto done
	}
	x -= 0x80 << 21

	b = uint64(buf[i])
	i++
	x += b << 28
	if b&0x80 == 0 {
		goto done
	}
	x -= 0x80 << 28

	b = uint64(buf[i])
	i++
	x += b << 35
	if b&0x80 == 0 {
		goto done
	}
	x -= 0x80 << 35

	b = uint64(buf[i])
	i++
	x += b << 42
	if b&0x80 == 0 {
		goto done
	}
	x -= 0x80 << 42

	b = uint64(buf[i])
	i++
	x += b << 49
	if b&0x80 == 0 {
		goto done
	}
	x -= 0x80 << 49

	b = uint64(buf[i])
	i++
	x += b << 56
	if b&0x80 == 0 {
		goto done
	}
	x -= 0x80 << 56

	b = uint64(buf[i])
	i++
	x += b << 63
	if b&0x80 == 0 {
		goto done
	}
	// x -= 0x80 << 63 // Always zero.

	return 0, errOverflow

done:
	p.index = i
	return x, nil
}
           

Int32

序列化之後,第一個位元組一定是

0x80

,那麼除去這個位元組以後,後面的每個二進制位元組都是資料,剩下的步驟就是通過位移操作把每個數字都加起來。上面這個反序列化的函數同樣适用于

int32

,

int64

,

uint32

,

uint64

,

bool

, 和

enum

順道也可以看看

sint32

Fixed32

的反序列化具體代碼實作。

func (p *Buffer) DecodeZigzag32() (x uint64, err error) {
	x, err = p.DecodeVarint()
	if err != nil {
		return
	}
	x = uint64((uint32(x) >> 1) ^ uint32((int32(x&1)<<31)>>31))
	return
}
           

針對有符号的

sint32

,反序列化的過程就是先反序列

Varint

,再反序列化

Zigzag

func (p *Buffer) DecodeFixed32() (x uint64, err error) {
	// x, err already 0
	i := p.index + 4
	if i < 0 || i > len(p.buf) {
		err = io.ErrUnexpectedEOF
		return
	}
	p.index = i

	x = uint64(p.buf[i-4])
	x |= uint64(p.buf[i-3]) << 8
	x |= uint64(p.buf[i-2]) << 16
	x |= uint64(p.buf[i-1]) << 24
	return
}
           

Fixed32

反序列化的過程也是通過位移,每個位元組的内容都累加,就可以還原出原先的資料。注意這裡也要先跳過

tag

的位置。

String

func (p *Buffer) DecodeRawBytes(alloc bool) (buf []byte, err error) {
	n, err := p.DecodeVarint()
	if err != nil {
		return nil, err
	}

	nb := int(n)
	if nb < 0 {
		return nil, fmt.Errorf("proto: bad byte length %d", nb)
	}
	end := p.index + nb
	if end < p.index || end > len(p.buf) {
		return nil, io.ErrUnexpectedEOF
	}

	if !alloc {
		// todo: check if can get more uses of alloc=false
		buf = p.buf[p.index:end]
		p.index += nb
		return
	}

	buf = make([]byte, nb)
	copy(buf, p.buf[p.index:])
	p.index += nb
	return
}
           

反序列化

string

先把

length

序列化出來,通過

DecodeVarint

的方式。拿到

length

以後,剩下的就是直接拷貝的過程。在上篇

encode

中,我們知道字元串是不做處理,直接放到二進制流裡面的,是以反序列化直接取出即可。

Map

func (o *Buffer) dec_new_map(p *Properties, base structPointer) error {
	raw, err := o.DecodeRawBytes(false)
	if err != nil {
		return err
	}
	oi := o.index       // index at the end of this map entry
	o.index -= len(raw) // move buffer back to start of map entry

	mptr := structPointer_NewAt(base, p.field, p.mtype) // *map[K]V
	if mptr.Elem().IsNil() {
		mptr.Elem().Set(reflect.MakeMap(mptr.Type().Elem()))
	}
	v := mptr.Elem() // map[K]V

	// 這裡省略一些代碼,主要是為了 key - value 準備的一些可以雙重間接尋址的占位符,具體原因可以見序列化代碼裡面的 enc_new_map 函數

	// Decode.
	// This parses a restricted wire format, namely the encoding of a message
	// with two fields. See enc_new_map for the format.
	for o.index < oi {
		// tagcode for key and value properties are always a single byte
		// because they have tags 1 and 2.
		tagcode := o.buf[o.index]
		o.index++
		switch tagcode {
		case p.mkeyprop.tagcode[0]:
			if err := p.mkeyprop.dec(o, p.mkeyprop, keybase); err != nil {
				return err
			}
		case p.mvalprop.tagcode[0]:
			if err := p.mvalprop.dec(o, p.mvalprop, valbase); err != nil {
				return err
			}
		default:
			// TODO: Should we silently skip this instead?
			return fmt.Errorf("proto: bad map data tag %d", raw[0])
		}
	}
	keyelem, valelem := keyptr.Elem(), valptr.Elem()
	if !keyelem.IsValid() {
		keyelem = reflect.Zero(p.mtype.Key())
	}
	if !valelem.IsValid() {
		valelem = reflect.Zero(p.mtype.Elem())
	}

	v.SetMapIndex(keyelem, valelem)
	return nil
}
           

反序列化

map

需要把每個

tag

取出來,然後緊接着反序列化每個

key - value

。最後會判斷

keyelem

valelem

是否為零值,如果是零值要分别調用

reflect.Zero

處理零值的情況。

slice

最後還是舉一個數組的例子。以

[]int32

為例。

func (o *Buffer) dec_slice_packed_int32(p *Properties, base structPointer) error {
	v := structPointer_Word32Slice(base, p.field)

	nn, err := o.DecodeVarint()
	if err != nil {
		return err
	}
	nb := int(nn) // number of bytes of encoded int32s

	fin := o.index + nb
	if fin < o.index {
		return errOverflow
	}
	for o.index < fin {
		u, err := p.valDec(o)
		if err != nil {
			return err
		}
		v.Append(uint32(u))
	}
	return nil
}
           

反序列化這個數組,分2步,跳過

tagcode

拿到

length

,反序列化

length

。在

length

這個長度中依次反序列化各個

value

上述就是

Protocol Buffer

反序列化的過程。

序列化小結

Protocol Buffer

反序列化直接讀取二進制位元組資料流,反序列化就是

encode

的反過程,同樣是一些二進制操作。反序列化的時候,通常隻需要用到

length

tag

值隻是用來辨別類型的,

Properties

setEncAndDec()

方法裡面已經把每個類型對應的

decode

解碼器初始化好了,是以反序列化的時候,

tag

值可以直接跳過,從

length

開始處理。

XML

的解析過程就複雜一些。

XML

需要從檔案中讀取出字元串,再轉換為

XML

文檔對象結構模型。之後,再從

XML

文檔對象結構模型中讀取指定節點的字元串,最後再将這個字元串轉換成指定類型的變量。這個過程非常複雜,其中将

XML

檔案轉換為文檔對象結構模型的過程通常需要完成詞法文法分析等大量消耗

CPU

的複雜計算。

回到頂部

序列化/反序列化性能

Protocol Buffer

一直被人們認為是高性能的存在。也有很多人做過實作,驗證了這一說法。例如這個連結裡面的實驗 jvm-serializers。

在看資料之前,我們可以先理性的分析一下

Protocol Buffer

JSON

XML

這些比有哪些優勢:

Protobuf

采用了

Varint

Zigzag

大幅的壓縮了整數類型,也沒有

JSON

裡面的

{、}、;、

這些資料分隔符,有

option

字段辨別的,沒有資料的時候不會進行反序列化。這幾個措施導緻

pb

的資料量整體的就比

JSON

少很多。

Protobuf

采取的是

TLV

的形式,

JSON

這些都是字元串的形式。字元串比對應該比基于數字的字段

tag

更耗時。

Protobuf

在正文前有一個大小或者長度的标記,而

JSON

必須全文掃描無法跳過不需要的字段。

下面這張圖來自參考連結裡面的 《Protobuf有沒有比JSON快5倍?用代碼來擊破pb性能神話》:

高效的序列化/反序列化資料方式 Protobuf高效的序列化/反序列化資料方式 Protobuf

從這個實驗來看,确實

Protobuf

在序列化數字這方面性能是非常強悍的。

序列化 / 反序列化數字确實是

Protobuf

針對

JSON

XML

的優勢,但是它也存在一些沒有優勢的地方。比如字元串。字元串在

Protobuf

中基本沒有處理,除了前面加了

tag - length

。在序列化 / 反序列化字元串的過程中,字元串拷貝的速度反而決定的真正的速度。

高效的序列化/反序列化資料方式 Protobuf高效的序列化/反序列化資料方式 Protobuf

從上圖可以看到

encode

字元串的時候,速度基本和

JSON

相差無幾。

回到頂部

最後

至此,關于

protocol buffers

的所有,讀者應該了然于胸了。

protocol buffers

誕生之初也并不是為了傳輸資料存在的,隻是為了解決伺服器多版本協定相容的問題。實質其實是發明了一個新的跨語言無歧義的

IDL (Interface description language)

。隻不過人們後來發現用它來傳輸資料也不錯,才開始用

protocol buffers

想用

protocol buffers

替換

JSON

,可能是考慮到:

  • protocol buffers

    相同資料,傳輸的資料量比

    JSON

    小,

    gzip

    或者

    7zip

    壓縮以後,網絡傳輸消耗較少。
  • protocol buffers

    不是自我描述的,在缺少

    .proto

    檔案以後,有一定的加密性,資料傳輸過程中都是二進制流,并不是明文。
  • protocol buffers

    提供了一套工具,自動化生成代碼也非常友善。
  • protocol buffers

    具有向後相容性,改變了資料結構以後,對老的版本沒有影響。
  • protocol buffers

    原生完美相容

    RPC

    調用。

如果很少用到整型數字,浮點型數字,全部都是字元串資料,那麼

JSON 和 protocol buffers

性能不會差太多。純前端之間互動的話,選擇

JSON

或者

protocol buffers

差别不是很大。

與後端互動過程中,用到

protocol buffers

比較多,筆者認為選擇

protocol buffers

除了性能強以外,完美相容

RPC

調用也是一個重要因素。

回到頂部

繼續閱讀