天天看點

zookeeper源碼分析(5)-序列化協定

在網絡傳輸時,傳輸的是二進制資料,是以發送端需要将

序列化對象轉變為二進制資料

,也就是

序列化過程

。接收端需要将

二進制資料轉化為序列化對象

,也就是

反序列化過程

。在序列化和反序列化過程中,需要定義一種對

資料互相轉變的一緻性協定

,也就是

序列化協定

。zookeeper使用Jute作為序列化元件。首先看下Jute的使用:

public class RequestHeader implements Record {
  private int xid;
  private int type;
  public RequestHeader() {
  }
  public RequestHeader(
        int xid,
        int type) {
    this.xid=xid;
    this.type=type;
  }
  public void serialize(OutputArchive a_, String tag) throws java.io.IOException {
    a_.startRecord(this,tag);
//先寫入xid
    a_.writeInt(xid,"xid");
    a_.writeInt(type,"type");
    a_.endRecord(this,tag);
  }
  public void deserialize(InputArchive a_, String tag) throws java.io.IOException {
    a_.startRecord(tag);
//先讀出xid
    xid=a_.readInt("xid");
    type=a_.readInt("type");
    a_.endRecord(tag);
}

//測試方法
public void testJute() throws IOException {
        //實作Record接口,自定義序列化
        RequestHeader requestHeader = new RequestHeader(1, ZooDefs.OpCode.create);
        System.out.print("requestHeader:  " +requestHeader );
        //序列化
        ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
        BinaryOutputArchive binaryOutputArchive = BinaryOutputArchive.getArchive(outputStream);
        requestHeader.serialize(binaryOutputArchive,"header");
        //通常是TCP網絡通信對象
        ByteBuffer bb = ByteBuffer.wrap(outputStream.toByteArray());
        //反序列化
        RequestHeader requestHeader1 = new RequestHeader();
        ByteBufferInputStream inputStream = new ByteBufferInputStream(bb);
        BinaryInputArchive binaryInputArchive =  BinaryInputArchive.getArchive(inputStream);
        requestHeader1.deserialize(binaryInputArchive,"header");
        System.out.print("requestHeader1:  " + requestHeader1);
        outputStream.close();
        inputStream.close();
      
    }           

複制

1.定義序列化對象RequestHeader,需要實作Record接口的

serialize

deserialize

接口

2.建構序列化器BinaryOutputArchive,調用

serialize

方法将對象序列化流中

3.建構反序列化器BinaryInputArchive,調用

deserialize

方法将流反序列化為對象

從上面的使用我們可以看出,對RequestHeader對象的序列化 就是對其成員變量xid,type的按順序的寫入序列化器BinaryOutputArchive,反序列化就是從反序列化器BinaryInputArchive按順序的讀出xid,type。

是以序列化元件Jute的實作關鍵就是對

序列化對象

序列化器

反序列化器

的設計。

序列化對象

所有的序列化對象都要實作

Record

接口,它定義了

serialize

deserialize

方法用于子類自己實作自己的序列化和反序列方式。

public interface Record {
    public void serialize(OutputArchive archive, String tag)
        throws IOException;
    public void deserialize(InputArchive archive, String tag)
        throws IOException;
}           

複制

zookeeper的org.apache.zookeeper.proto包下定義了很多用于網絡通信和資料存儲所需要的序列化對象。

序列化器

在zookeeper中序列化就是将Record對象變為二進制資料的過程,序列化器接口為

OutputArchive

public interface OutputArchive {
    public void writeByte(byte b, String tag) throws IOException;
    public void writeBool(boolean b, String tag) throws IOException;
    public void writeInt(int i, String tag) throws IOException;
    public void writeLong(long l, String tag) throws IOException;
    public void writeFloat(float f, String tag) throws IOException;
    public void writeDouble(double d, String tag) throws IOException;
    public void writeString(String s, String tag) throws IOException;
    public void writeBuffer(byte buf[], String tag)
        throws IOException;
    public void writeRecord(Record r, String tag) throws IOException;
    public void startRecord(Record r, String tag) throws IOException;
    public void endRecord(Record r, String tag) throws IOException;
    public void startVector(List<?> v, String tag) throws IOException;
    public void endVector(List<?> v, String tag) throws IOException;
    public void startMap(TreeMap<?,?> v, String tag) throws IOException;
    public void endMap(TreeMap<?,?> v, String tag) throws IOException;

}           

複制

有三種實作:BinaryOutputArchive,CsvOutputArchive和XmlOutputArchive,分别對應無特殊格式,有csv格式和有xml格式的資料序列化。

BinaryOutputArchive

public class BinaryOutputArchive implements OutputArchive {
    private ByteBuffer bb = ByteBuffer.allocate(1024);

    private DataOutput out;
    
    public static BinaryOutputArchive getArchive(OutputStream strm) {
        return new BinaryOutputArchive(new DataOutputStream(strm));
    }
    
    /** Creates a new instance of BinaryOutputArchive */
    public BinaryOutputArchive(DataOutput out) {
        this.out = out;
    }
    
    public void writeByte(byte b, String tag) throws IOException {
        out.writeByte(b);
    }
    
    public void writeBool(boolean b, String tag) throws IOException {
        out.writeBoolean(b);
    }
··········省略代碼·····
}           

複制

可以看到BinaryOutputArchive其實是對

DataOutput out

的包裝,進而實作了對各種資料類型的寫入,tag并不會寫入進二進制資料中,而對于CsvOutputArchive和XmlOutputArchive,如果想在二進制資料中儲存對應格式,就需要tag控制,如XmlOutputArchive中

public void writeBool(boolean b, String tag) throws IOException {
        printBeginEnvelope(tag);
        stream.print("<boolean>");
        stream.print(b ? "1" : "0");
        stream.print("</boolean>");
        printEndEnvelope(tag);
    }
private void printBeginEnvelope(String tag) {
        if (!compoundStack.empty()) {
            String s = compoundStack.peek();
            if ("struct".equals(s)) {
                putIndent();
                stream.print("<member>\n");
                addIndent();
                putIndent();
                stream.print("<name>"+tag+"</name>\n");
                putIndent();
                stream.print("<value>");
            } else if ("vector".equals(s)) {
                stream.print("<value>");
            } else if ("map".equals(s)) {
                stream.print("<value>");
            }
        } else {
            stream.print("<value>");
        }
    }           

複制

反序列化器

在zookeeper中反序列化就是将二進制資料變為Record對象的過程,反序列化器接口為

InputArchive

public interface InputArchive {
    public byte readByte(String tag) throws IOException;
    public boolean readBool(String tag) throws IOException;
    public int readInt(String tag) throws IOException;
    public long readLong(String tag) throws IOException;
    public float readFloat(String tag) throws IOException;
    public double readDouble(String tag) throws IOException;
    public String readString(String tag) throws IOException;
    public byte[] readBuffer(String tag) throws IOException;
    public void readRecord(Record r, String tag) throws IOException;
    public void startRecord(String tag) throws IOException;
    public void endRecord(String tag) throws IOException;
    public Index startVector(String tag) throws IOException;
    public void endVector(String tag) throws IOException;
    public Index startMap(String tag) throws IOException;
    public void endMap(String tag) throws IOException;
}           

複制

同樣有三種實作:BinaryInputArchive,CsvInputArchive和XmlInputArchive,分别對應無特殊格式,有csv格式和有xml格式的資料序列化。

public class BinaryInputArchive implements InputArchive {
    private DataInput in;
    
    static public BinaryInputArchive getArchive(InputStream strm) {
        return new BinaryInputArchive(new DataInputStream(strm));
    }
   
    /** Creates a new instance of BinaryInputArchive */
    public BinaryInputArchive(DataInput in) {
        this.in = in;
    }
    
    public byte readByte(String tag) throws IOException {
        return in.readByte();
    }
    
    public boolean readBool(String tag) throws IOException {
        return in.readBoolean();
    }
    }
··········省略代碼·····
}           

複制

可以看到BinaryInputArchive其實是對

DataInput in

的包裝,進而實作了對各種資料類型的讀取,tag并不會寫入進二進制資料中。

實際zookeeper的用戶端在向服務端發送請求時,通信協定體如下:

zookeeper源碼分析(5)-序列化協定

len

為請求資料的總長度,占4位。

請求頭

就是事例中的

RequestHeader

的xid和type。

xid

用于記錄用戶端請求發起的先後順序,占4位。

type

代表請求的操作類型,占4位。這樣子在服務端反序列化時,就可以根據type的值來選擇對應的Record來讀取請求體内容。