
前言
前兩篇文章MySql Binlog初識和MySql Binlog事件介紹篇分别從Binlog入門和Binlog事件如何産生的兩個角度來介紹Binlog,本文将從Binlog事件的資料來更深入的了解Binlog。
Binlog事件資料
1.QUERY_EVENT
執行更新語句時會生成此事件,包括:create,insert,update,delete;
Fixed data part,總長度13位元組:
4位元組:執行sql的線程id;
4位元組:執行sql的時間;
1位元組:資料庫名稱的長度;
2位元組:執行sql産生的錯誤碼;
2位元組:狀态變量的長度,具體内容在Variable part;
Variable part:
可變位元組:狀态變量,每個狀态變量key為一個位元組,後面跟着value,不同的key對應不同長度的value,但是總長度在Fixed data part中已經定義;
可變位元組:資料庫名稱
可變位元組:sql語句,通過事件的總長度-header長度-Fixed data-狀态變量,剩餘的位元組數組通過utf-8編碼即可擷取;
2.STOP_EVENT
當mysqld停止時生成此事件;
Fixed data part:空的
Variable part:空的
3.ROTATE_EVENT
當mysqld切換到新的binlog檔案生成此事件;
Fixed data part,總長度8位元組:
8位元組:下一個binlog檔案的第一個事件的position,這個值一直是4,因為魔數占用了4位元組;
Variable data part:
可變位元組:下一個binlog的名稱,它的長度=事件總長度-header長度-Fixed data
4.INTVAR_EVENT
當sql語句中使用了AUTO_INCREMENT的字段或者LAST_INSERT_ID()函數;
Fixed data part:空的
Variable part:
1位元組:一個變量類型的值:LAST_INSERT_ID_EVENT = 1 或者 INSERT_ID_EVENT = 2;
8位元組:LAST_INSERT_ID()函數調用,或者AUTO_INCREMENT字段生成的一個無符号的整型;
5.RAND_EVENT
| bin-log.000003 | 438 | RAND | 1 | 473 | rand_seed1=223769196,rand_seed2=1013907192
執行包含RAND()函數的語句産生此事件,此事件沒有被用在binlog_format為ROW模式的情況下;
Fixed data part:空的
Variable part:
8位元組:第一個種子值(ex:rand_seed1=223769196)
8位元組:第二個種子值(ex:rand_seed2=1013907192)
6.USER_VAR_EVENT
| bin-log.000003 | 711 | User var | 1 | 756 | @`age`=50
執行包含了使用者變量的語句産生此事件,此事件沒有被用在binlog_format為ROW模式的情況下;
Fixed data part:空的
Variable part:
4位元組:使用者變量名的大小;
可變位元組:使用者變量名,具體長度上一個4位元組的資料指定了;
1位元組:如果是變量值是NULL,那麼此值是非0的;如果是此值是0,那麼才有接下來的其他資料;應該是對有空值情況的一種優化;
1位元組:使用者變量類型,包括:(STRING_RESULT=0, REAL_RESULT=1, INT_RESULT=2, ROW_RESULT=3, DECIMAL_RESULT=4);
4位元組:使用者變量字元的數量;
4位元組:使用者變量值的長度;
可變位元組:變量的值,通過變量類型和變量值的長度,可以解析出具體的變量值;
7.FORMAT_DESCRIPTION_EVENT
| bin-log.000003 | 4 | Format_desc | 1 | 107 | Server ver: 5.5.29-log, Binlog ver: 4
描述事件,被寫在每個binlog檔案的開始位置,用在MySQL5.0以後的版本中,代替了START_EVENT_V3;
Fixed data part:
2位元組:binlog版本,Mysql5.0以及以上的版本值為:4
50位元組:Mysql Server版本;
4位元組:事件建立的時間戳;
1位元組:header的長度,binlog版本為4的情況下header長度是19;
可變位元組:從START_EVENT_V3開始到第27個Event,每個Event的fixed part lengths,每個事件一個位元組,總共27個位元組;
Variable part:空的
8.XID_EVENT
| bin-log.000003 | 315 | Xid | 1 | 342 | COMMIT /* xid=32 */
事務送出産生的XID_EVENT事件;
Fixed data part:空的
Variable part:
8位元組:事務編号;
9.BEGIN_LOAD_QUERY_EVENT
| bin-log.000003 | 964 | Begin_load_query | 1 | 1008 | ;file_id=3;block_len=21
執行LOAD DATA INFILE 語句時産生此事件
Fixed data part:
4位元組:加載Data File的ID,防止加載的Data File内容是相同的;
Variable part:
加載資料的第一個塊,如果檔案大小超過某個閥值,後面會有多個APPEND_BLOCK_EVENT事件,每一個包含一個資料塊;可變位元組長度 = 事件的總長度 – header長度 – Fixed data;因為測試資料量比較少(999, 101, ‘zhaohui’)總共就21個位元組,是以一個塊足夠了;
10.EXECUTE_LOAD_QUERY_EVENT
| bin-log.000003 | 1008 | Execute_load_query | 1 | 1237 | use `test`; LOAD DATA INFILE 'D:/btest.sql' INTO TABLE `btest` FIELDS TERMINATED BY ',' ENCLOSED BY '' ESCAPED BY '' LINES TERMINATED BY '' (`id`, `age`, `name`) ;file_id=3 |
執行LOAD DATA INFILE産生的事件,類似QUERY_EVENT事件,Fixed data的前13個位元組和QUERY_EVENT類似;
Fixed data part:
4位元組:執行sql的線程id;
4位元組:執行sql的時間;
1位元組:資料庫名稱的長度;
2位元組:執行sql産生的錯誤碼;
2位元組:狀态變量的長度,具體内容在Variable part;
4位元組:加載Data File的ID;
4位元組:檔案名替換語句中的起始位置;
4位元組:檔案名替換語句中的結束位置;
1位元組:如何處理重複資料,三個選項:LOAD_DUP_ERROR = 0, LOAD_DUP_IGNORE = 1, LOAD_DUP_REPLACE = 2
Variable part:
1.狀态變量,每個狀态變量key為一個位元組,後面跟着value,不同的key對應不同長度的value,但是總長度在Fixed data part中已經定義;
2.sql語句,通過事件的總長度-header長度-Fixed data-狀态變量,剩餘的位元組數組通過utf-8編碼即可擷取;
11.TABLE_MAP_EVENT
| bin-log.000004 | 844 | Table_map | 1 | 892 | table_id: 33 (test.btest)
将表的定義映射到一個數字,在行操作事件之前記錄(包括:WRITE_ROWS_EVENT,UPDATE_ROWS_EVENT,DELETE_ROWS_EVENT);
Fixed data part:
6位元組:表Id;
2位元組:保留字段為将來使用;
Variable part:
1位元組:資料庫名字的長度;
可變位元組:資料庫名字,根據前一個位元組記錄的名字長度,擷取的位元組數組通過utf-8編碼即可擷取;
1位元組:表名的長度;
可變位元組:表名,根據前一個位元組記錄的名字長度,擷取的位元組數組通過utf-8編碼即可擷取;
Packed integer:用來記錄表中字段的數量;
注:Packed integer是一個可變位元組的類型,根據資料大小位元組大小不一樣,
更多詳細:https://dev.mysql.com/doc/internals/en/event-content-writing-conventions.html
可變位元組:表字段類型數組,每個字段一個位元組;
Packed integer:用來記錄表中繼資料的長度;
可變位元組:中繼資料塊,根據前一個位元組記錄的名字長度,擷取的位元組數組通過utf-8編碼即可擷取;
可變位元組:用位域表示每一個字段是否為null,一個位元組有8位,是以N個字段需要(N+7)/8個位元組;
12.WRITE_ROWS_EVENT,UPDATE_ROWS_EVENT和DELETE_ROWS_EVENT
binlog_format為ROW模式下,執行insert,update和delete操作産生的事件;
Fixed data part:
6位元組:表Id;
2位元組:保留字段為将來使用;
Variable part:
Packed integer:記錄表中字段的數量;
可變位元組:用位域表示每個字段是否被使用(比如隻有更新、插入的字段才是被使用的),N個字段需要(N+7)/8個位元組;
可變位元組:僅用在UPDATE_ROWS_EVENT事件中,用位域表示每個字段更新之後是否被使用(值隻有真正被更新了才是被使用的),N個字段需要(N+7)/8個位元組;
接下來是記錄的每一行的資料:
可變位元組:目前行中的字段值是否為NULL,隻有這個字段被辨別為被使用,才會出現在這;
可變位元組:目前行所有字段的值,隻有這個字段被辨別為被使用,并且值不為NULL才會有值;
13.INCIDENT_EVENT
主伺服器發生了不正常的事件,通知從伺服器并告知可能會導緻資料處于不一緻的狀态;
Fixed data part:
1位元組:不正常事件的編号;
1位元組:消息的長度;
Variable part:
消息的内容,根據Fixed data part中指定的消息長度讀取消息的内容;
14.HEARTBEAT_LOG_EVENT
主伺服器告訴從伺服器,主伺服器還活着,不寫入到日志檔案中;
Fixed data part:空的
Variable part:空的
更多參考:https://dev.mysql.com/doc/internals/en/event-data-for-specific-event-types.html
Java讀取簡單執行個體
1.建立表,并插入資料,産生binlog日志檔案;
2.檢視binlog中的事件;
mysql> show binlog events in 'bin-log.000001';+----------------+-----+-------------+-----------+-------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+| Log_name | Pos | Event_type | Server_id | End_log_pos | Info |+----------------+-----+-------------+-----------+-------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+| bin-log.000001 | 4 | Format_desc | 1 | 107 | Server ver: 5.5.29-log, Binlog ver: 4 || bin-log.000001 | 107 | Query | 1 | 364 | use `test`; CREATE TABLE `btest` ( `id` bigint(20) NOT NULL AUTO_INCREMENT, `age` int(11) DEFAULT NULL, `name` varchar(255) DEFAULT NULL, PRIMARY KEY (`id`)) ENGINE=InnoDB DEFAULT CHARSET=utf8 || bin-log.000001 | 364 | Query | 1 | 432 | BEGIN || bin-log.000001 | 432 | Query | 1 | 536 | use `test`; insert into btest values(1,100,'zhaohui') || bin-log.000001 | 536 | Xid | 1 | 563 | COMMIT /* xid=30 */ || bin-log.000001 | 563 | Stop | 1 | 582 | |+----------------+-----+-------------+-----------+-------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
3.通過java代碼來讀取binlog日志,具體代碼如下:
public class BinlogRead { private static RandomAccessFile file; /** 魔數的位元組長度 **/ private static final int MAGIN_LEN = 4; /** 事件header長度 **/ private static final int EVENT_HEADER_LEN = 19; /** Query_Event fix data長度 **/ private static final int QUERY_EVENT_FIX_LEN = 13; public static void main(String[] args) throws Exception { file = new RandomAccessFile(new File("D://bin-log.000001"), "rw"); FileChannel channel = file.getChannel(); /** 1.魔數4位元組 **/ ByteBuffer magic = ByteBuffer.allocate(MAGIN_LEN); channel.read(magic); /** 2.Format_desc_Event事件 **/ EventHeader header = getEventHeader(channel); channel.position(header.getEventLen() + MAGIN_LEN); /** 3.Query_Event事件 **/ header = getEventHeader(channel); System.out.println(getQueryEventSql(header.getEventLen(), channel)); /** 4.Query_Event事件 **/ header = getEventHeader(channel); System.out.println(getQueryEventSql(header.getEventLen(), channel)); /** 5.Query_Event事件 **/ header = getEventHeader(channel); System.out.println(getQueryEventSql(header.getEventLen(), channel)); /** 6.Xid_Event事件 **/ header = getEventHeader(channel); ByteBuffer xidNumber = ByteBuffer.allocate(8).order(ByteOrder.LITTLE_ENDIAN); channel.read(xidNumber); xidNumber.flip(); System.out.println("xidNumber = " + xidNumber.getLong()); /** 7.Stop_Event事件 **/ header = getEventHeader(channel); } /** * 擷取事件Header資訊 * * @param channel * @return * @throws IOException */ private static EventHeader getEventHeader(FileChannel channel) throws IOException { ByteBuffer formatDescEventHeader = ByteBuffer.allocate(EVENT_HEADER_LEN).order(ByteOrder.LITTLE_ENDIAN); channel.read(formatDescEventHeader); formatDescEventHeader.flip(); EventHeader header = new EventHeader(); header.setTimestamp(formatDescEventHeader.getInt()); header.setTypeCode(formatDescEventHeader.get()); header.setServerId(formatDescEventHeader.getInt()); header.setEventLen(formatDescEventHeader.getInt()); header.setNextPosition(formatDescEventHeader.getInt()); header.setFlags(formatDescEventHeader.getShort()); System.out.println(header.toString()); return header; } /** * 擷取Query Event sql語句 * * @param queryEventLen * @param channel * @return * @throws IOException */ private static String getQueryEventSql(int queryEventLen, FileChannel channel) throws IOException { /** Query_Event fix data **/ ByteBuffer queryEventFix = ByteBuffer.allocate(QUERY_EVENT_FIX_LEN).order(ByteOrder.LITTLE_ENDIAN); channel.read(queryEventFix); queryEventFix.flip(); queryEventFix.position(11); /** 狀态變量的長度 **/ int statusLen = queryEventFix.getShort(); int queryEventValLen = queryEventLen - EVENT_HEADER_LEN - QUERY_EVENT_FIX_LEN; ByteBuffer queryEventVal = ByteBuffer.allocate(queryEventValLen).order(ByteOrder.LITTLE_ENDIAN); channel.read(queryEventVal); queryEventVal.flip(); queryEventVal.position(statusLen); /** 資料庫名稱 **/ queryEventVal.mark(); int length = 0; while ('0' != queryEventVal.get()) { length++; } queryEventVal.reset(); byte dbName[] = new byte[length]; queryEventVal.get(dbName); System.out.println("db name : " + new String(dbName, "utf-8")); /** sql語句 **/ byte sql[] = new byte[queryEventValLen - statusLen - length - 1]; queryEventVal.get(sql); return new String(sql, "utf-8"); }}public class EventHeader { private int timestamp; private byte typeCode; private int serverId; private int eventLen; private int nextPosition; private int flags; @Override public String toString() { return "EventHeader [timestamp=" + timestamp +