一、前言
在分析NameServer的請求和響應流程之前我們需要先看一下他的序列化協定是怎樣的,RocketMQ支援的序列化協定有以下2種:
- JSON;
- RocketMQ自定義的協定;
json進行序列化其實是省力做法,效率是比較差的,序列化以後的資料格式是比較占用空間,一般成熟的中間件項目一般都會采用自定義的方式進行序列化和反序列化;
二、RemotingCommand源碼分析
RemotingCommand為RocketMQ中自定義協定元件,其中包含了序列化和反序列化代碼邏輯;
但是不向服務直接提供調用,而是通過前文講解的NettyRemotingServer類中的NettyEncoder(編碼器)和NettyDecoder(解碼器)進行具體的調用;
序列化:就是将一段位元組數組以固定的順序的形式存放資料,第一個位元組存放什麼,後面4個位元組存放什麼,再後面幾個位元組存放什麼;
反序列化:就是以固定的順序取資料,你第一個位元組存放的是消息的标志位,那你取出來就是消息的标志位,再後面4個為消息體的長度,那取出來就是消息體的長度,你再可以根據消息體的長度去擷取對應長度位元組的資料;
1、資料模型
public class RemotingCommand {
public static final String SERIALIZE_TYPE_PROPERTY = "rocketmq.serialize.type";
public static final String SERIALIZE_TYPE_ENV = "ROCKETMQ_SERIALIZE_TYPE";
public static final String REMOTING_VERSION_KEY = "rocketmq.remoting.version";
private static final InternalLogger log = InternalLoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING);
private static final int RPC_TYPE = 0; // 0, REQUEST_COMMAND
private static final int RPC_ONEWAY = 1; // 0, RPC
private static final Map<Class<? extends CommandCustomHeader>, Field[]> CLASS_HASH_MAP =
new HashMap<Class<? extends CommandCustomHeader>, Field[]>();
private static final Map<Class, String> CANONICAL_NAME_CACHE = new HashMap<Class, String>();
// 1, Oneway
// 1, RESPONSE_COMMAND
private static final Map<Field, Boolean> NULLABLE_FIELD_CACHE = new HashMap<Field, Boolean>();
private static final String STRING_CANONICAL_NAME = String.class.getCanonicalName();
private static final String DOUBLE_CANONICAL_NAME_1 = Double.class.getCanonicalName();
private static final String DOUBLE_CANONICAL_NAME_2 = double.class.getCanonicalName();
private static final String INTEGER_CANONICAL_NAME_1 = Integer.class.getCanonicalName();
private static final String INTEGER_CANONICAL_NAME_2 = int.class.getCanonicalName();
private static final String LONG_CANONICAL_NAME_1 = Long.class.getCanonicalName();
private static final String LONG_CANONICAL_NAME_2 = long.class.getCanonicalName();
private static final String BOOLEAN_CANONICAL_NAME_1 = Boolean.class.getCanonicalName();
private static final String BOOLEAN_CANONICAL_NAME_2 = boolean.class.getCanonicalName();
private static volatile int configVersion = -1;
private static AtomicInteger requestId = new AtomicInteger(0);
private static SerializeType serializeTypeConfigInThisServer = SerializeType.JSON;
static {
final String protocol = System.getProperty(SERIALIZE_TYPE_PROPERTY, System.getenv(SERIALIZE_TYPE_ENV));
if (!isBlank(protocol)) {
try {
serializeTypeConfigInThisServer = SerializeType.valueOf(protocol);
} catch (IllegalArgumentException e) {
throw new RuntimeException("parser specified protocol error. protocol=" + protocol, e);
}
}
}
// code編号,請求編号
private int code;
private LanguageCode language = LanguageCode.JAVA; // 程式設計語言,java
private int version = 0; // 版本号
private int opaque = requestId.getAndIncrement(); // 請求id
private int flag = 0; // 辨別
private String remark; // 備注
private HashMap<String, String> extFields; // 擴充字段
private transient CommandCustomHeader customHeader; // 自定義header頭
// 這一次rpc調用的序列化類型,預設就是json格式
private SerializeType serializeTypeCurrentRPC = serializeTypeConfigInThisServer;
// 消息體,會把真正的消息體序列化成位元組數組
private transient byte[] body;
}
2、序列化
org.apache.rocketmq.remoting.netty.NettyEncoder#encode
public void encode(ChannelHandlerContext ctx, RemotingCommand remotingCommand, ByteBuf out)
throws Exception {
try {
ByteBuffer header = remotingCommand.encodeHeader();
out.writeBytes(header);
byte[] body = remotingCommand.getBody();
if (body != null) {
out.writeBytes(body);
}
} catch (Exception e) {
log.error("encode exception, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()), e);
if (remotingCommand != null) {
log.error(remotingCommand.toString());
}
RemotingUtil.closeChannel(ctx.channel());
}
}
public ByteBuffer encodeHeader() {
return encodeHeader(this.body != null ? this.body.length : 0);
}
public ByteBuffer encodeHeader(final int bodyLength) {
// 1> header length size
int length = 4;
// 2> header data length
byte[] headerData;
headerData = this.headerEncode();
length += headerData.length;
// 3> body data length
length += bodyLength;
ByteBuffer result = ByteBuffer.allocate(4 + length - bodyLength);
// length
result.putInt(length);
// header length
result.put(markProtocolType(headerData.length, serializeTypeCurrentRPC));
// header data
result.put(headerData);
result.flip();
return result;
}
這裡會去判斷序列化協定的類型,json類型其實沒什麼好看的,JSON.toJSONString(obj, prettyFormat).getBytes(CHARSET_UTF8); 就沒了,我們主要是看RocketMQ的自定義協定;
private byte[] headerEncode() {
// 把自定義headers放到一個ext fields map裡去
this.makeCustomHeaderToNet();
if (SerializeType.ROCKETMQ == serializeTypeCurrentRPC) {
return RocketMQSerializable.rocketMQProtocolEncode(this);
} else {
return RemotingSerializable.encode(this);
}
}
public void makeCustomHeaderToNet() {
if (this.customHeader != null) {
// 通過反射擷取到自定義header類裡面的fields
Field[] fields = getClazzFields(customHeader.getClass());
if (null == this.extFields) {
this.extFields = new HashMap<String, String>();
}
// 對自定義header類的fields進行周遊
for (Field field : fields) {
if (!Modifier.isStatic(field.getModifiers())) {
String name = field.getName();
if (!name.startsWith("this")) {
Object value = null;
try {
field.setAccessible(true);
value = field.get(this.customHeader);
} catch (Exception e) {
log.error("Failed to access field [{}]", name, e);
}
// 自定義header這些fields都是放到ext fields裡面去
if (value != null) {
this.extFields.put(name, value.toString());
}
}
}
}
}
}
private Field[] getClazzFields(Class<? extends CommandCustomHeader> classHeader) {
// 如果說你要是自定義了一套header以後,你搞一個類,實作接口
// 然後在這個自定義頭的類裡,可以定義一堆的field,這些field就是你的自定義的頭
Field[] field = CLASS_HASH_MAP.get(classHeader);
if (field == null) {
// 通過反射直接擷取到你自定義類裡的頭fields拿出來
field = classHeader.getDeclaredFields();
synchronized (CLASS_HASH_MAP) {
CLASS_HASH_MAP.put(classHeader, field);
}
}
return field;
}
public static byte[] markProtocolType(int source, SerializeType type) {
byte[] result = new byte[4];
result[0] = type.getCode(); // header length裡一共是4個位元組,第一個位元組是序列化類型code
result[1] = (byte) ((source >> 16) & 0xFF); // 第二個位元組開始到第四個位元組,一共是3個位元組都是跟header length是有關系的
result[2] = (byte) ((source >> 8) & 0xFF);
result[3] = (byte) (source & 0xFF);
return result;
}
其實自定義序列化就是搞一個byte數組,采用固定的顯示進行建構。
如:第一個位元組放請求類型,後面四個位元組放消息體總長度,在後面發具體的消息體。消息體前面幾位為header長度,後面為header消息體等等,通過固定排列的順序進行建構,這樣解析的時候我們就可以根據位元組順序來讀取消息了。
public static byte[] rocketMQProtocolEncode(RemotingCommand cmd) {
// 用json進行序列化其實是省力做法,效率是比較差的,序列化以後的資料格式是比較占用空間一些
// 正常做法是自己對RemotingCommand協定資料對象進行序列化
// 編碼,對象 -> 位元組數組
// String remark
byte[] remarkBytes = null;
int remarkLen = 0;
if (cmd.getRemark() != null && cmd.getRemark().length() > 0) {
remarkBytes = cmd.getRemark().getBytes(CHARSET_UTF8);
remarkLen = remarkBytes.length;
}
// HashMap<String, String> extFields
// ext fields,是我們可能的自定義headers就在這裡,把擴充頭序列化為位元組數組
byte[] extFieldsBytes = null;
int extLen = 0;
if (cmd.getExtFields() != null && !cmd.getExtFields().isEmpty()) {
extFieldsBytes = mapSerialize(cmd.getExtFields());
extLen = extFieldsBytes.length;
}
// 計算出來消息頭總長度
int totalLen = calTotalLen(remarkLen, extLen);
ByteBuffer headerBuffer = ByteBuffer.allocate(totalLen);
// int code(~32767)
headerBuffer.putShort((short) cmd.getCode());
// LanguageCode language
headerBuffer.put(cmd.getLanguage().getCode());
// int version(~32767)
headerBuffer.putShort((short) cmd.getVersion());
// int opaque
headerBuffer.putInt(cmd.getOpaque());
// int flag
headerBuffer.putInt(cmd.getFlag());
// String remark
if (remarkBytes != null) {
headerBuffer.putInt(remarkBytes.length);
headerBuffer.put(remarkBytes);
} else {
headerBuffer.putInt(0);
}
// HashMap<String, String> extFields;
if (extFieldsBytes != null) {
headerBuffer.putInt(extFieldsBytes.length);
headerBuffer.put(extFieldsBytes);
} else {
headerBuffer.putInt(0);
}
return headerBuffer.array();
}
3、反序列化
org.apache.rocketmq.remoting.netty.NettyDecoder#decode
public Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
ByteBuf frame = null;
try {
frame = (ByteBuf) super.decode(ctx, in);
if (null == frame) {
return null;
}
ByteBuffer byteBuffer = frame.nioBuffer();
return RemotingCommand.decode(byteBuffer);
} catch (Exception e) {
log.error("decode exception, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()), e);
RemotingUtil.closeChannel(ctx.channel());
} finally {
if (null != frame) {
frame.release();
}
}
return null;
}
public static RemotingCommand decode(final ByteBuffer byteBuffer) throws RemotingCommandException {
// 解碼的過程就是編碼過程的逆向過程
int length = byteBuffer.limit(); // 總長度
int oriHeaderLen = byteBuffer.getInt(); // 頭長度
int headerLength = getHeaderLength(oriHeaderLen);
// 搞一個頭長度的位元組數組,一次性把headers都讀出來放到位元組數組裡去
byte[] headerData = new byte[headerLength];
byteBuffer.get(headerData);
// 對header要做一個解碼
RemotingCommand cmd = headerDecode(headerData, getProtocolType(oriHeaderLen));
int bodyLength = length - 4 - headerLength;
byte[] bodyData = null;
if (bodyLength > 0) {
bodyData = new byte[bodyLength];
byteBuffer.get(bodyData);
}
cmd.body = bodyData;
return cmd;
}
這裡判斷header是用什麼協定進行序列化的,就會使用什麼協定進行反序列化;
private static RemotingCommand headerDecode(byte[] headerData, SerializeType type) throws RemotingCommandException {
switch (type) {
case JSON:
RemotingCommand resultJson = RemotingSerializable.decode(headerData, RemotingCommand.class);
resultJson.setSerializeTypeCurrentRPC(type);
return resultJson;
case ROCKETMQ:
RemotingCommand resultRMQ = RocketMQSerializable.rocketMQProtocolDecode(headerData);
resultRMQ.setSerializeTypeCurrentRPC(type);
return resultRMQ;
default:
break;
}
return null;
}
我們之間看rocketMQ自定義的協定吧,其實就是一個逆向的過程,你之前放的什麼,他就根據位元組拿出來;
public static RemotingCommand rocketMQProtocolDecode(final byte[] headerArray) throws RemotingCommandException {
RemotingCommand cmd = new RemotingCommand();
ByteBuffer headerBuffer = ByteBuffer.wrap(headerArray);
// int code(~32767)
cmd.setCode(headerBuffer.getShort());
// LanguageCode language
cmd.setLanguage(LanguageCode.valueOf(headerBuffer.get()));
// int version(~32767)
cmd.setVersion(headerBuffer.getShort());
// int opaque
cmd.setOpaque(headerBuffer.getInt());
// int flag
cmd.setFlag(headerBuffer.getInt());
// String remark
int remarkLength = headerBuffer.getInt();
if (remarkLength > 0) {
if (remarkLength > headerArray.length) {
throw new RemotingCommandException("RocketMQ protocol decoding failed, remark length: " + remarkLength + ", but header length: " + headerArray.length);
}
byte[] remarkContent = new byte[remarkLength];
headerBuffer.get(remarkContent);
cmd.setRemark(new String(remarkContent, CHARSET_UTF8));
}
// HashMap<String, String> extFields
int extFieldsLength = headerBuffer.getInt();
if (extFieldsLength > 0) {
if (extFieldsLength > headerArray.length) {
throw new RemotingCommandException("RocketMQ protocol decoding failed, extFields length: " + extFieldsLength + ", but header length: " + headerArray.length);
}
byte[] extFieldsBytes = new byte[extFieldsLength];
headerBuffer.get(extFieldsBytes);
cmd.setExtFields(mapDeserialize(extFieldsBytes));
}
return cmd;
}
三、總結
- RemotingCommand為rocketMQ的序列化和反序列化的元件,所有消息都需要使用他進行處理;
- 序列化和反序列化是根據約定的協定存放資料,再根據約定的協定取資料;