天天看點

datagrid如何擷取一行資料中的某個字段值_MySql Binlog事件資料篇Binlog事件資料Java讀取簡單執行個體

datagrid如何擷取一行資料中的某個字段值_MySql Binlog事件資料篇Binlog事件資料Java讀取簡單執行個體

前言

前兩篇文章MySql Binlog初識和MySql Binlog事件介紹篇分别從Binlog入門和Binlog事件如何産生的兩個角度來介紹Binlog,本文将從Binlog事件的資料來更深入的了解Binlog。

Binlog事件資料

1.QUERY_EVENT

執行更新語句時會生成此事件,包括:create,insert,update,delete;

Fixed data part,總長度13位元組:

4位元組:執行sql的線程id;

4位元組:執行sql的時間;

1位元組:資料庫名稱的長度;

2位元組:執行sql産生的錯誤碼;

2位元組:狀态變量的長度,具體内容在Variable part;

Variable part:

可變位元組:狀态變量,每個狀态變量key為一個位元組,後面跟着value,不同的key對應不同長度的value,但是總長度在Fixed data part中已經定義;

可變位元組:資料庫名稱

可變位元組:sql語句,通過事件的總長度-header長度-Fixed data-狀态變量,剩餘的位元組數組通過utf-8編碼即可擷取;

2.STOP_EVENT

當mysqld停止時生成此事件;

Fixed data part:空的

Variable part:空的

3.ROTATE_EVENT

當mysqld切換到新的binlog檔案生成此事件;

Fixed data part,總長度8位元組:

8位元組:下一個binlog檔案的第一個事件的position,這個值一直是4,因為魔數占用了4位元組;

Variable data part:

可變位元組:下一個binlog的名稱,它的長度=事件總長度-header長度-Fixed data

4.INTVAR_EVENT

當sql語句中使用了AUTO_INCREMENT的字段或者LAST_INSERT_ID()函數;

Fixed data part:空的

Variable part:

1位元組:一個變量類型的值:LAST_INSERT_ID_EVENT = 1 或者 INSERT_ID_EVENT = 2;

8位元組:LAST_INSERT_ID()函數調用,或者AUTO_INCREMENT字段生成的一個無符号的整型;

5.RAND_EVENT

| bin-log.000003 | 438 | RAND | 1 | 473 | rand_seed1=223769196,rand_seed2=1013907192
           

執行包含RAND()函數的語句産生此事件,此事件沒有被用在binlog_format為ROW模式的情況下;

Fixed data part:空的

Variable part:

8位元組:第一個種子值(ex:rand_seed1=223769196)

8位元組:第二個種子值(ex:rand_seed2=1013907192)

6.USER_VAR_EVENT

| bin-log.000003 | 711 | User var | 1 | 756 | @`age`=50
           

執行包含了使用者變量的語句産生此事件,此事件沒有被用在binlog_format為ROW模式的情況下;

Fixed data part:空的

Variable part:

4位元組:使用者變量名的大小;

可變位元組:使用者變量名,具體長度上一個4位元組的資料指定了;

1位元組:如果是變量值是NULL,那麼此值是非0的;如果是此值是0,那麼才有接下來的其他資料;應該是對有空值情況的一種優化;

1位元組:使用者變量類型,包括:(STRING_RESULT=0, REAL_RESULT=1, INT_RESULT=2, ROW_RESULT=3, DECIMAL_RESULT=4);

4位元組:使用者變量字元的數量;

4位元組:使用者變量值的長度;

可變位元組:變量的值,通過變量類型和變量值的長度,可以解析出具體的變量值;

7.FORMAT_DESCRIPTION_EVENT

| bin-log.000003 | 4 | Format_desc | 1 | 107 | Server ver: 5.5.29-log, Binlog ver: 4
           

描述事件,被寫在每個binlog檔案的開始位置,用在MySQL5.0以後的版本中,代替了START_EVENT_V3;

Fixed data part:

2位元組:binlog版本,Mysql5.0以及以上的版本值為:4

50位元組:Mysql Server版本;

4位元組:事件建立的時間戳;

1位元組:header的長度,binlog版本為4的情況下header長度是19;

可變位元組:從START_EVENT_V3開始到第27個Event,每個Event的fixed part lengths,每個事件一個位元組,總共27個位元組;

Variable part:空的

8.XID_EVENT

| bin-log.000003 | 315 | Xid | 1 | 342 | COMMIT /* xid=32 */
           

事務送出産生的XID_EVENT事件;

Fixed data part:空的

Variable part:

8位元組:事務編号;

9.BEGIN_LOAD_QUERY_EVENT

| bin-log.000003 | 964 | Begin_load_query | 1 | 1008 | ;file_id=3;block_len=21
           

執行LOAD DATA INFILE 語句時産生此事件

Fixed data part:

4位元組:加載Data File的ID,防止加載的Data File内容是相同的;

Variable part:

加載資料的第一個塊,如果檔案大小超過某個閥值,後面會有多個APPEND_BLOCK_EVENT事件,每一個包含一個資料塊;可變位元組長度 = 事件的總長度 – header長度 – Fixed data;因為測試資料量比較少(999, 101, ‘zhaohui’)總共就21個位元組,是以一個塊足夠了;

10.EXECUTE_LOAD_QUERY_EVENT

| bin-log.000003 | 1008 | Execute_load_query | 1 | 1237 | use `test`; LOAD DATA INFILE 'D:/btest.sql' INTO TABLE `btest` FIELDS TERMINATED BY ',' ENCLOSED BY '' ESCAPED BY '' LINES TERMINATED BY '' (`id`, `age`, `name`) ;file_id=3 |
           

執行LOAD DATA INFILE産生的事件,類似QUERY_EVENT事件,Fixed data的前13個位元組和QUERY_EVENT類似;

Fixed data part:

4位元組:執行sql的線程id;

4位元組:執行sql的時間;

1位元組:資料庫名稱的長度;

2位元組:執行sql産生的錯誤碼;

2位元組:狀态變量的長度,具體内容在Variable part;

4位元組:加載Data File的ID;

4位元組:檔案名替換語句中的起始位置;

4位元組:檔案名替換語句中的結束位置;

1位元組:如何處理重複資料,三個選項:LOAD_DUP_ERROR = 0, LOAD_DUP_IGNORE = 1, LOAD_DUP_REPLACE = 2

Variable part:

1.狀态變量,每個狀态變量key為一個位元組,後面跟着value,不同的key對應不同長度的value,但是總長度在Fixed data part中已經定義;

2.sql語句,通過事件的總長度-header長度-Fixed data-狀态變量,剩餘的位元組數組通過utf-8編碼即可擷取;

11.TABLE_MAP_EVENT

| bin-log.000004 | 844 | Table_map | 1 | 892 | table_id: 33 (test.btest)
           

将表的定義映射到一個數字,在行操作事件之前記錄(包括:WRITE_ROWS_EVENT,UPDATE_ROWS_EVENT,DELETE_ROWS_EVENT);

Fixed data part:

6位元組:表Id;

2位元組:保留字段為将來使用;

Variable part:

1位元組:資料庫名字的長度;

可變位元組:資料庫名字,根據前一個位元組記錄的名字長度,擷取的位元組數組通過utf-8編碼即可擷取;

1位元組:表名的長度;

可變位元組:表名,根據前一個位元組記錄的名字長度,擷取的位元組數組通過utf-8編碼即可擷取;

Packed integer:用來記錄表中字段的數量;

注:Packed integer是一個可變位元組的類型,根據資料大小位元組大小不一樣,

更多詳細:https://dev.mysql.com/doc/internals/en/event-content-writing-conventions.html

可變位元組:表字段類型數組,每個字段一個位元組;

Packed integer:用來記錄表中繼資料的長度;

可變位元組:中繼資料塊,根據前一個位元組記錄的名字長度,擷取的位元組數組通過utf-8編碼即可擷取;

可變位元組:用位域表示每一個字段是否為null,一個位元組有8位,是以N個字段需要(N+7)/8個位元組;

12.WRITE_ROWS_EVENT,UPDATE_ROWS_EVENT和DELETE_ROWS_EVENT

binlog_format為ROW模式下,執行insert,update和delete操作産生的事件;

Fixed data part:

6位元組:表Id;

2位元組:保留字段為将來使用;

Variable part:

Packed integer:記錄表中字段的數量;

可變位元組:用位域表示每個字段是否被使用(比如隻有更新、插入的字段才是被使用的),N個字段需要(N+7)/8個位元組;

可變位元組:僅用在UPDATE_ROWS_EVENT事件中,用位域表示每個字段更新之後是否被使用(值隻有真正被更新了才是被使用的),N個字段需要(N+7)/8個位元組;

接下來是記錄的每一行的資料:

可變位元組:目前行中的字段值是否為NULL,隻有這個字段被辨別為被使用,才會出現在這;

可變位元組:目前行所有字段的值,隻有這個字段被辨別為被使用,并且值不為NULL才會有值;

13.INCIDENT_EVENT

主伺服器發生了不正常的事件,通知從伺服器并告知可能會導緻資料處于不一緻的狀态;

Fixed data part:

1位元組:不正常事件的編号;

1位元組:消息的長度;

Variable part:

消息的内容,根據Fixed data part中指定的消息長度讀取消息的内容;

14.HEARTBEAT_LOG_EVENT

主伺服器告訴從伺服器,主伺服器還活着,不寫入到日志檔案中;

Fixed data part:空的

Variable part:空的

更多參考:https://dev.mysql.com/doc/internals/en/event-data-for-specific-event-types.html

Java讀取簡單執行個體

1.建立表,并插入資料,産生binlog日志檔案;

2.檢視binlog中的事件;

mysql> show binlog events in 'bin-log.000001';+----------------+-----+-------------+-----------+-------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+| Log_name | Pos | Event_type | Server_id | End_log_pos | Info |+----------------+-----+-------------+-----------+-------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+| bin-log.000001 | 4 | Format_desc | 1 | 107 | Server ver: 5.5.29-log, Binlog ver: 4 || bin-log.000001 | 107 | Query | 1 | 364 | use `test`; CREATE TABLE `btest` ( `id` bigint(20) NOT NULL AUTO_INCREMENT, `age` int(11) DEFAULT NULL, `name` varchar(255) DEFAULT NULL, PRIMARY KEY (`id`)) ENGINE=InnoDB DEFAULT CHARSET=utf8 || bin-log.000001 | 364 | Query | 1 | 432 | BEGIN || bin-log.000001 | 432 | Query | 1 | 536 | use `test`; insert into btest values(1,100,'zhaohui') || bin-log.000001 | 536 | Xid | 1 | 563 | COMMIT /* xid=30 */ || bin-log.000001 | 563 | Stop | 1 | 582 | |+----------------+-----+-------------+-----------+-------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
           

3.通過java代碼來讀取binlog日志,具體代碼如下:

public class BinlogRead {  private static RandomAccessFile file; /** 魔數的位元組長度 **/ private static final int MAGIN_LEN = 4; /** 事件header長度 **/ private static final int EVENT_HEADER_LEN = 19; /** Query_Event fix data長度 **/ private static final int QUERY_EVENT_FIX_LEN = 13;  public static void main(String[] args) throws Exception { file = new RandomAccessFile(new File("D://bin-log.000001"), "rw"); FileChannel channel = file.getChannel();  /** 1.魔數4位元組 **/ ByteBuffer magic = ByteBuffer.allocate(MAGIN_LEN); channel.read(magic);  /** 2.Format_desc_Event事件 **/ EventHeader header = getEventHeader(channel); channel.position(header.getEventLen() + MAGIN_LEN);  /** 3.Query_Event事件 **/ header = getEventHeader(channel); System.out.println(getQueryEventSql(header.getEventLen(), channel));  /** 4.Query_Event事件 **/ header = getEventHeader(channel); System.out.println(getQueryEventSql(header.getEventLen(), channel));  /** 5.Query_Event事件 **/ header = getEventHeader(channel); System.out.println(getQueryEventSql(header.getEventLen(), channel));  /** 6.Xid_Event事件 **/ header = getEventHeader(channel); ByteBuffer xidNumber = ByteBuffer.allocate(8).order(ByteOrder.LITTLE_ENDIAN); channel.read(xidNumber); xidNumber.flip(); System.out.println("xidNumber = " + xidNumber.getLong());  /** 7.Stop_Event事件 **/ header = getEventHeader(channel);  }  /** * 擷取事件Header資訊 *  * @param channel * @return * @throws IOException */ private static EventHeader getEventHeader(FileChannel channel) throws IOException { ByteBuffer formatDescEventHeader = ByteBuffer.allocate(EVENT_HEADER_LEN).order(ByteOrder.LITTLE_ENDIAN); channel.read(formatDescEventHeader); formatDescEventHeader.flip(); EventHeader header = new EventHeader(); header.setTimestamp(formatDescEventHeader.getInt()); header.setTypeCode(formatDescEventHeader.get()); header.setServerId(formatDescEventHeader.getInt()); header.setEventLen(formatDescEventHeader.getInt()); header.setNextPosition(formatDescEventHeader.getInt()); header.setFlags(formatDescEventHeader.getShort()); System.out.println(header.toString()); return header; }  /** * 擷取Query Event sql語句 *  * @param queryEventLen * @param channel * @return * @throws IOException */ private static String getQueryEventSql(int queryEventLen, FileChannel channel) throws IOException { /** Query_Event fix data **/ ByteBuffer queryEventFix = ByteBuffer.allocate(QUERY_EVENT_FIX_LEN).order(ByteOrder.LITTLE_ENDIAN); channel.read(queryEventFix); queryEventFix.flip(); queryEventFix.position(11);  /** 狀态變量的長度 **/ int statusLen = queryEventFix.getShort();  int queryEventValLen = queryEventLen - EVENT_HEADER_LEN - QUERY_EVENT_FIX_LEN; ByteBuffer queryEventVal = ByteBuffer.allocate(queryEventValLen).order(ByteOrder.LITTLE_ENDIAN); channel.read(queryEventVal); queryEventVal.flip(); queryEventVal.position(statusLen);  /** 資料庫名稱 **/ queryEventVal.mark(); int length = 0; while ('0' != queryEventVal.get()) { length++; } queryEventVal.reset(); byte dbName[] = new byte[length]; queryEventVal.get(dbName); System.out.println("db name : " + new String(dbName, "utf-8"));  /** sql語句 **/ byte sql[] = new byte[queryEventValLen - statusLen - length - 1]; queryEventVal.get(sql); return new String(sql, "utf-8"); }}public class EventHeader {  private int timestamp; private byte typeCode; private int serverId; private int eventLen; private int nextPosition; private int flags;  @Override public String toString() { return "EventHeader [timestamp=" + timestamp + 
           

繼續閱讀