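Preface
By analyzing Canal, this article simulates how a Slave establishes a connection with the Master and then synchronizes the binlog.
After reading it you should understand how a MySQL Slave synchronizes with the Master and be able to write your own MockSlave. It should also help you use canal more effectively and locate problems faster when canal misbehaves.
The code in this article is extracted from the canal project; its main purpose is to make the Slave/Master synchronization process understandable.
My knowledge is limited, so corrections are welcome.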
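Notes
All data exchanged between the Slave and the Master is little-endian. Keep this in mind both when parsing and when sending.
Java's standard I/O classes (DataInputStream, and ByteBuffer by default) use big-endian byte order.
It is therefore best to implement a utility class for reading and writing the data, which keeps the rest of the code simple. canal implements a ByteHelper class for exactly this purpose.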
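To make the byte-order point concrete, here is a minimal sketch of such a utility class. The class and method names (LittleEndianSketch, readUInt16, readUInt32, writeUInt32) are hypothetical and chosen only for illustration; they are not the API of canal's ByteHelper.

```java
import java.io.ByteArrayOutputStream;

/** Hypothetical helper, written in the spirit of a little-endian utility class. */
final class LittleEndianSketch {
    private LittleEndianSketch() {
    }

    /** Reads an unsigned 16-bit little-endian integer starting at offset. */
    static int readUInt16(byte[] data, int offset) {
        return (data[offset] & 0xFF) | ((data[offset + 1] & 0xFF) << 8);
    }

    /** Reads an unsigned 32-bit little-endian integer; a long keeps it non-negative. */
    static long readUInt32(byte[] data, int offset) {
        return (data[offset] & 0xFFL)
                | ((data[offset + 1] & 0xFFL) << 8)
                | ((data[offset + 2] & 0xFFL) << 16)
                | ((data[offset + 3] & 0xFFL) << 24);
    }

    /** Appends the low 32 bits of value, least significant byte first. */
    static void writeUInt32(long value, ByteArrayOutputStream out) {
        for (int i = 0; i < 4; i++) {
            out.write((int) ((value >>> (8 * i)) & 0xFF));
        }
    }
}
```

An alternative is java.nio.ByteBuffer with order(ByteOrder.LITTLE_ENDIAN), which avoids the manual shifting at the cost of wrapping every array in a buffer.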
Preparation
The master database needs the following configuration:
[mysqld]
log-bin=mysql-bin # adding this line is enough to enable the binlog
binlog-format=ROW # use ROW mode
server_id=1 # required for MySQL replication; must not be the same as canal's slaveId
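Overall flowchart
Flow description
1) Connect to MySQL
Given the IP and port, connect to MySQL directly with a socket.
2) Parse the handshake message
As soon as the connection is established, the Master sends a message to the Slave: an error message on failure, or the handshake information on success. The data in the handshake message is used in later communication, so it has to be saved.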
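Every message in this exchange, the handshake included, is preceded by a 4-byte header: a 3-byte little-endian body length followed by a 1-byte sequence number. The following is a minimal sketch of encoding and decoding that header. PacketHeaderSketch is a hypothetical name used only for illustration; it is not canal's HeaderPacket.

```java
/** Hypothetical model of the 4-byte header that precedes every packet body. */
final class PacketHeaderSketch {
    final int bodyLength;     // 3 bytes, little-endian
    final int sequenceNumber; // 1 byte

    PacketHeaderSketch(int bodyLength, int sequenceNumber) {
        this.bodyLength = bodyLength;
        this.sequenceNumber = sequenceNumber;
    }

    /** Decodes exactly four header bytes. */
    static PacketHeaderSketch fromBytes(byte[] header) {
        if (header.length != 4) {
            throw new IllegalArgumentException("a packet header is exactly 4 bytes");
        }
        int length = (header[0] & 0xFF)
                | ((header[1] & 0xFF) << 8)
                | ((header[2] & 0xFF) << 16);
        return new PacketHeaderSketch(length, header[3] & 0xFF);
    }

    /** Encodes the header, least significant length byte first. */
    byte[] toBytes() {
        return new byte[] {
            (byte) (bodyLength & 0xFF),
            (byte) ((bodyLength >>> 8) & 0xFF),
            (byte) ((bodyLength >>> 16) & 0xFF),
            (byte) sequenceNumber
        };
    }
}
```

A reply in the same exchange uses the received sequence number plus one, which is what the listing further down does when it builds the authentication packet.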
3) Send the authentication message
Receiving the handshake message means the connection succeeded. To establish an authenticated connection with the Master, the Slave now has to send the user name and password. The format is as follows:
bytes | Description |
---|---|
4 | client_flags |
4 | max_packet_size |
1 | charset_number |
23 | (filler) always 0x00… |
n | (Null-Terminated String) user |
n | (Length Coded Binary) scramble_buff (1 + x bytes), the password scrambled with scramble411 |
n | (Null-Terminated String) databasename (optional) |
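The scramble411 step in the table can be sketched as follows. This assumes the scheme is the usual 4.1-style native password hash, SHA1(password) XOR SHA1(seed + SHA1(SHA1(password))), where seed is the joined 20-byte scramble from the handshake. Scramble411Sketch is a hypothetical class name, not canal's MySQLPasswordEncrypter.

```java
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

/** Hypothetical sketch of the 4.1-style password scramble. */
final class Scramble411Sketch {
    private Scramble411Sketch() {
    }

    static byte[] scramble411(byte[] password, byte[] seed) {
        MessageDigest sha1;
        try {
            sha1 = MessageDigest.getInstance("SHA-1");
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("SHA-1 is not available", e);
        }
        byte[] stage1 = sha1.digest(password); // SHA1(password)
        byte[] stage2 = sha1.digest(stage1);   // SHA1(SHA1(password))
        sha1.update(seed);
        sha1.update(stage2);
        byte[] mask = sha1.digest();           // SHA1(seed + stage2)
        byte[] token = new byte[stage1.length];
        for (int i = 0; i < token.length; i++) {
            token[i] = (byte) (stage1[i] ^ mask[i]);
        }
        return token;
    }
}
```

The result is always 20 bytes, so on the wire the scramble_buff field is a length byte of 20 followed by the token. Because the seed changes with every connection, the same password never produces the same bytes twice.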
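4) Parse the authentication response
Parse the returned message. If it contains an error, parse it to obtain the error description; otherwise ignore it.
5) Send the 323 authentication request
In the code, this step is only taken when the Master's reply to the authentication message starts with 0xFE instead of OK. Use the seed from the handshake message and the password as input to the scramble323 function, and send the result to the Master. The content sent is the scrambled byte array.
6) Parse the 323 response
Again, parsing here mainly means handling errors.
7) Send the connection parameters
Send the following statements to the Master to configure the session (the values are the ones used in the code):
set wait_timeout=9999999
set net_write_timeout=1800
set net_read_timeout=1800
set names 'binary'
8) Parse the result of the parameter settings
A successful setting returns an OK message; a failure returns an error message. The message format is as follows:
VERSION 4.1
bytes | Description |
---|---|
1 | (Length Coded Binary) field_count, always = 0 |
1-9 | (Length Coded Binary) affected_rows, size depends on the value |
1-9 | (Length Coded Binary) insert_id, size depends on the value |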
2 | server_status |
2 | warning_count |
n | (until end of packet) message |
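The OK packet layout above can be decoded roughly as follows. This is a sketch, assuming the common Length Coded Binary rules (a first byte below 251 is the value itself; 252, 253 and 254 announce a 2-, 3- and 8-byte little-endian integer). OkPacketSketch is a hypothetical class, not canal's OKPacket, and decoding the message as UTF-8 is an assumption.

```java
import java.nio.charset.StandardCharsets;

/** Hypothetical parser for the OK packet layout shown in the table. */
final class OkPacketSketch {
    long affectedRows;
    long insertId;
    int serverStatus;
    int warningCount;
    String message;

    private final byte[] data;
    private int pos;

    private OkPacketSketch(byte[] data) {
        this.data = data;
    }

    static OkPacketSketch parse(byte[] body) {
        OkPacketSketch ok = new OkPacketSketch(body);
        if (ok.readFixed(1) != 0) {
            throw new IllegalArgumentException("not an OK packet: field_count != 0");
        }
        ok.affectedRows = ok.readLengthCodedBinary();
        ok.insertId = ok.readLengthCodedBinary();
        ok.serverStatus = (int) ok.readFixed(2);
        ok.warningCount = (int) ok.readFixed(2);
        // Everything up to the end of the packet is the message (UTF-8 assumed here).
        ok.message = new String(body, ok.pos, body.length - ok.pos, StandardCharsets.UTF_8);
        return ok;
    }

    /** Reads a Length Coded Binary: 1 byte, or a marker plus 2, 3 or 8 bytes. */
    private long readLengthCodedBinary() {
        int first = (int) readFixed(1);
        if (first < 251) {
            return first;
        }
        switch (first) {
            case 252: return readFixed(2);
            case 253: return readFixed(3);
            case 254: return readFixed(8);
            default: throw new IllegalArgumentException("unexpected length marker " + first);
        }
    }

    /** Reads a fixed-width little-endian integer and advances the cursor. */
    private long readFixed(int width) {
        long value = 0;
        for (int i = 0; i < width; i++) {
            value |= (data[pos++] & 0xFFL) << (8 * i);
        }
        return value;
    }
}
```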
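9) Send the message that fetches the checksum
Send a query to the Master with the content "select @master_binlog_checksum".
10) Parse the message and record the checksum
This query returns a result set, so the result set has to be parsed.
a: Check whether the message is an error.
b: Parse the valid message
Read the column count information, which has the following format:
bytes | Description |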
---|---|
1-9 | columnCount |
1-9 | extra |
c: Read the column definitions in a loop
bytes | Description |
---|---|
n | (Length Coded String) catalog |
n | (Length Coded String) db |
n | (Length Coded String) table |
n | (Length Coded String) org_table |
n | (Length Coded String) name |
n | (Length Coded String) org_name |
1 | (filler) |
2 | charsetnr |
4 | length |
1 | type |
2 | flags |
1 | decimals |
2 | (filler), always 0x00 |
n | (Length Coded Binary) default |
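d: Read the EOF packet
Read one message as usual; no further processing is needed.
e: Read the row data in a loop
All values are strings and are read according to the specified format. (The format is omitted here.)
f: Finally, set the value of binlogChecksum
A value of 1 means events carry a CRC32 checksum; 0 means no checksum handling is needed.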
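For step e, the row format is not spelled out above. The sketch below assumes the usual text-protocol layout, in which each column of a row packet is a Length Coded String and a single 0xFB byte stands for NULL. RowDataSketch is a hypothetical class rather than canal's RowDataPacket, and decoding the bytes as UTF-8 is an assumption.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical parser for one row packet made of Length Coded Strings. */
final class RowDataSketch {
    private RowDataSketch() {
    }

    static List<String> parseRow(byte[] body) {
        List<String> columns = new ArrayList<>();
        int pos = 0;
        while (pos < body.length) {
            int first = body[pos++] & 0xFF;
            if (first == 251) { // 0xFB marks a NULL column
                columns.add(null);
                continue;
            }
            int width; // number of extra bytes that hold the length
            if (first < 251) {
                width = 0;
            } else if (first == 252) {
                width = 2;
            } else if (first == 253) {
                width = 3;
            } else if (first == 254) {
                width = 8;
            } else {
                throw new IllegalArgumentException("0xFF is not a valid length marker");
            }
            long length = first;
            if (width > 0) {
                length = 0;
                for (int i = 0; i < width; i++) {
                    length |= (body[pos++] & 0xFFL) << (8 * i);
                }
            }
            columns.add(new String(body, pos, (int) length, StandardCharsets.UTF_8));
            pos += (int) length;
        }
        return columns;
    }
}
```

For the checksum query, the row holds a single column; the code compares its upper-cased value with "CRC32" to decide the value of binlogChecksum.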
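11) Send the message that fetches the binlog position
Send a query to the Master with the content "show master status".
12) Parse the message and obtain the position
Because a result set has to be read, the parsing is similar to step 10.
The result is the binlog file name and the position. I query them here only for testing; in practice these two values must be stored locally on the Slave, otherwise the Slave and the Master fall out of sync.
13) Send the binlog synchronization request
Send the binlog dump request to the Master. The message format is as follows:
bytes | Description |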
---|---|
1 | command |
n | arg |
4 | binlog position to start at (little endian) |
2 | binlog flags (currently not used; always 0) |
4 | server_id of the slave (little endian) |
n | binlog file name (optional) |
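The request body in the table can be assembled as in the sketch below. It assumes the command byte is 0x12 (COM_BINLOG_DUMP) and that the file name is sent without a terminator. BinlogDumpSketch is a hypothetical class, not canal's BinlogDumpCommandPacket.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

/** Hypothetical builder for the binlog dump request body. */
final class BinlogDumpSketch {
    static final int COM_BINLOG_DUMP = 0x12; // assumed command code

    private BinlogDumpSketch() {
    }

    static byte[] body(String fileName, long position, long slaveServerId) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        out.write(COM_BINLOG_DUMP);          // 1 byte: command
        writeLittleEndian(out, position, 4); // 4 bytes: binlog position to start at
        writeLittleEndian(out, 0, 2);        // 2 bytes: flags, always 0
        writeLittleEndian(out, slaveServerId, 4); // 4 bytes: server_id of the slave
        byte[] name = fileName.getBytes(StandardCharsets.UTF_8);
        out.write(name, 0, name.length);     // n bytes: binlog file name
        return out.toByteArray();
    }

    /** Writes the lowest width bytes of value, least significant byte first. */
    private static void writeLittleEndian(ByteArrayOutputStream out, long value, int width) {
        for (int i = 0; i < width; i++) {
            out.write((int) ((value >>> (8 * i)) & 0xFF));
        }
    }
}
```

The body still needs the usual 4-byte packet header in front of it; since the dump request starts a new command, the listing below sends it with sequence number 0.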
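14) Start the reader thread and read in a loop
Start a reader thread that receives the log events sent by the master.
There are many kinds of log events. canal documents them in detail, so they are not described here; refer to the canal code to understand them.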
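Although the individual event types are left to canal, all of them share a fixed header, which is what makes the read loop possible. The sketch below assumes the 19-byte header of binlog format version 4 (timestamp, type, server_id, event length, next position, flags, all little-endian). EventHeaderSketch is a hypothetical class and not part of canal.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

/** Hypothetical view of the common 19-byte binlog event header (format v4 assumed). */
final class EventHeaderSketch {
    static final int HEADER_LENGTH = 19;

    final long timestamp;    // seconds since the epoch
    final int type;          // event type code, e.g. 4 for a rotate event
    final long serverId;     // server_id of the server that wrote the event
    final long eventLength;  // header + data length in bytes
    final long nextPosition; // binlog position of the following event
    final int flags;

    private EventHeaderSketch(long timestamp, int type, long serverId,
            long eventLength, long nextPosition, int flags) {
        this.timestamp = timestamp;
        this.type = type;
        this.serverId = serverId;
        this.eventLength = eventLength;
        this.nextPosition = nextPosition;
        this.flags = flags;
    }

    /** Parses the header from the start of an event's bytes. */
    static EventHeaderSketch parse(byte[] event) {
        if (event.length < HEADER_LENGTH) {
            throw new IllegalArgumentException("event shorter than " + HEADER_LENGTH + " bytes");
        }
        ByteBuffer buf = ByteBuffer.wrap(event).order(ByteOrder.LITTLE_ENDIAN);
        long timestamp = buf.getInt() & 0xFFFFFFFFL;
        int type = buf.get() & 0xFF;
        long serverId = buf.getInt() & 0xFFFFFFFFL;
        long eventLength = buf.getInt() & 0xFFFFFFFFL;
        long nextPosition = buf.getInt() & 0xFFFFFFFFL;
        int flags = buf.getShort() & 0xFFFF;
        return new EventHeaderSketch(timestamp, type, serverId, eventLength, nextPosition, flags);
    }
}
```

The start position of an event is nextPosition minus eventLength, which is the same arithmetic parseRowsEvent uses in the listing below when it prints getLogPos() - getEventLen().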
Code
The code is somewhat messy; concentrate on the main method.
public class SlaveMain
{
// Join both parts of the scramble_buff into one array
private static byte[] joinAndCreateScrambleBuff(HandshakeInitializationPacket handshakePacket) throws IOException
{
byte[] dest = new byte[handshakePacket.seed.length + handshakePacket.restOfScrambleBuff.length];
System.arraycopy(handshakePacket.seed, 0, dest, 0, handshakePacket.seed.length);
System.arraycopy(handshakePacket.restOfScrambleBuff, 0, dest, handshakePacket.seed.length,
handshakePacket.restOfScrambleBuff.length);
return dest;
}
private static void auth323(SocketChannel channel, byte packetSequenceNumber, byte[] seed, String password)
throws IOException
{
// auth 323
Reply323Packet r323 = new Reply323Packet();
// 1. Scramble the password
if (password != null && password.length() > 0)
{
r323.seed = MySQLPasswordEncrypter.scramble323(password, new String(seed)).getBytes();
}
byte[] b323Body = r323.toBytes();
HeaderPacket h323 = new HeaderPacket();
h323.setPacketBodyLength(b323Body.length);
h323.setPacketSequenceNumber((byte) (packetSequenceNumber + 1));
PacketManager.write(channel, new ByteBuffer[] { ByteBuffer.wrap(h323.toBytes()), ByteBuffer.wrap(b323Body) });
System.out.println("client 323 authentication packet is sent out.");
// check auth result
HeaderPacket header = PacketManager.readHeader(channel, 4);
byte[] body = PacketManager.readBytes(channel, header.getPacketBodyLength());
assert body != null;
switch (body[0])
{
case 0: // OK packet
break;
case -1: // 0xFF: error packet
ErrorPacket err = new ErrorPacket();
err.fromBytes(body);
throw new IOException("Error When doing Client Authentication:" + err.toString());
default:
throw new IOException("unexpected packet with field_count=" + body[0]);
}
}
// Update operation
public static OKPacket update(SocketChannel channel, String updateString) throws IOException
{
QueryCommandPacket cmd = new QueryCommandPacket();
cmd.setQueryString(updateString);
byte[] bodyBytes = cmd.toBytes();
PacketManager.write(channel, bodyBytes);
System.out.println("read update result...");
byte[] body = PacketManager.readBytes(channel, PacketManager.readHeader(channel, 4).getPacketBodyLength());
if (body[0] < 0)
{
ErrorPacket packet = new ErrorPacket();
packet.fromBytes(body);
throw new IOException(packet + "\n with command: " + updateString);
}
OKPacket packet = new OKPacket();
packet.fromBytes(body);
return packet;
}
// Query operation
public static ResultSetPacket query(SocketChannel channel, String queryString) throws IOException
{
QueryCommandPacket cmd = new QueryCommandPacket();
cmd.setQueryString(queryString);
byte[] bodyBytes = cmd.toBytes();
PacketManager.write(channel, bodyBytes);
byte[] body = readNextPacket(channel);
if (body[0] < 0)
{
ErrorPacket packet = new ErrorPacket();
packet.fromBytes(body);
throw new IOException(packet + "\n with command: " + queryString);
}
ResultSetHeaderPacket rsHeader = new ResultSetHeaderPacket();
rsHeader.fromBytes(body);
List<FieldPacket> fields = new ArrayList<FieldPacket>();
for (int i = 0; i < rsHeader.getColumnCount(); i++)
{
FieldPacket fp = new FieldPacket();
fp.fromBytes(readNextPacket(channel));
fields.add(fp);
}
readEofPacket(channel);
List<RowDataPacket> rowData = new ArrayList<RowDataPacket>();
while (true)
{
body = readNextPacket(channel);
if (body[0] == -2) // 0xFE: EOF packet ends the row data
{
break;
}
RowDataPacket rowDataPacket = new RowDataPacket();
rowDataPacket.fromBytes(body);
rowData.add(rowDataPacket);
}
ResultSetPacket resultSet = new ResultSetPacket();
resultSet.getFieldDescriptors().addAll(fields);
for (RowDataPacket r : rowData)
{
resultSet.getFieldValues().addAll(r.getColumns());
}
resultSet.setSourceAddress(channel.socket().getRemoteSocketAddress());
return resultSet;
}
private static void readEofPacket(SocketChannel channel) throws IOException
{
byte[] eofBody = readNextPacket(channel);
if (eofBody[0] != -2) // 0xFE marks an EOF packet
{
throw new IOException("EOF Packet is expected, but packet with field_count=" + eofBody[0] + " is found.");
}
}
protected static byte[] readNextPacket(SocketChannel channel) throws IOException
{
HeaderPacket h = PacketManager.readHeader(channel, 4);
return PacketManager.readBytes(channel, h.getPacketBodyLength());
}
public static void main(String[] args) throws Exception
{
// MysqlConnector.connect()
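// 1. Connect to the MySQL server and set a few socket parameters
// (the numeric literals were lost in the source; canal's defaults are assumed here)
int soTimeout = 30 * 1000;
int receiveBufferSize = 16 * 1024;
int sendBufferSize = 16 * 1024;
String ip = "127.0.0.1";
int port = 3306; // MySQL default port (assumed)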
String username = "root";
String password = "root";
SocketChannel channel = SocketChannel.open();
InetSocketAddress masterAddress = new InetSocketAddress(ip, port);
channel.socket().setKeepAlive(true);
channel.socket().setReuseAddress(true);
channel.socket().setSoTimeout(soTimeout);
channel.socket().setTcpNoDelay(true);
channel.socket().setReceiveBufferSize(receiveBufferSize);
channel.socket().setSendBufferSize(sendBufferSize);
channel.connect(masterAddress);
// MysqlConnector.negotiate()
// 2. Read the packet header
// Header layout: the first 3 bytes are the body length, 1 byte is the sequence number
HeaderPacket header = PacketManager.readHeader(channel, 4);
System.out.println("header length=" + header.getPacketBodyLength());
// 3. Read a body of the announced length
byte[] body = PacketManager.readBytes(channel, header.getPacketBodyLength());
// 4. Check for errors
if (body[0] < 0)
{// check field_count
if (body[0] == -1) // 0xFF: error packet
{
ErrorPacket error = new ErrorPacket();
error.fromBytes(body);
throw new IOException("handshake exception:\n" + error.toString());
}
else if (body[0] == -2) // 0xFE: EOF packet
{
throw new IOException("Unexpected EOF packet at handshake phase.");
}
else
{
throw new IOException("unexpected packet with field_count=" + body[0]);
}
}
// 5. Initialize from the handshake message
HandshakeInitializationPacket handshakePacket = new HandshakeInitializationPacket();
handshakePacket.fromBytes(body);// parse the handshake message
long connectionId = -1;
byte charsetNumber = 33; // literal lost in the source; 33 (utf8) is the value canal uses
String defaultSchema = "testdb";
connectionId = handshakePacket.threadId; // remember the connection id
System.out
.println("handshake initialization packet received, prepare the client authentication packet to send");
// 6. Build the authentication message
ClientAuthenticationPacket clientAuth = new ClientAuthenticationPacket();
clientAuth.setCharsetNumber(charsetNumber);
clientAuth.setUsername(username);// user name
clientAuth.setPassword(password);// password
clientAuth.setServerCapabilities(handshakePacket.serverCapabilities);// server capabilities, taken from the handshake message
clientAuth.setDatabaseName(defaultSchema);// default database
clientAuth.setScrumbleBuff(joinAndCreateScrambleBuff(handshakePacket));// merge the two scramble_buff parts, which sit at different positions, into one array
byte[] clientAuthPkgBody = clientAuth.toBytes();
HeaderPacket h = new HeaderPacket();
h.setPacketBodyLength(clientAuthPkgBody.length);// set the packet body length
h.setPacketSequenceNumber((byte) (header.getPacketSequenceNumber() + 1));// sequence number + 1
// 7. Send the authentication message
PacketManager.write(channel,
new ByteBuffer[] { ByteBuffer.wrap(h.toBytes()), ByteBuffer.wrap(clientAuthPkgBody) });
System.out.println("client authentication packet is sent out.");
// MySQL replies immediately; parse its second message
// check auth result
// 8. Read the packet header
header = null;
header = PacketManager.readHeader(channel, 4);
body = null;
// 9. Read the packet body
body = PacketManager.readBytes(channel, header.getPacketBodyLength());
assert body != null;
if (body[0] < 0)
{
if (body[0] == -1) // 0xFF: error packet
{
ErrorPacket err = new ErrorPacket();
err.fromBytes(body);
throw new IOException("Error When doing Client Authentication:" + err.toString());
}
else if (body[0] == -2) // 0xFE: the master asks for 323 authentication
{
auth323(channel, header.getPacketSequenceNumber(), handshakePacket.seed, password);
}
else
{
throw new IOException("unexpected packet with field_count=" + body[0]);
}
}
// The connection is now established; next we can request the current binlog file name, position and so on.
// Update the session settings
try
{
update(channel, "set wait_timeout=9999999");
}
catch (Exception e)
{
e.printStackTrace();
}
try
{
update(channel, "set net_write_timeout=1800");
}
catch (Exception e)
{
e.printStackTrace();
}
try
{
update(channel, "set net_read_timeout=1800");
}
catch (Exception e)
{
e.printStackTrace();
}
try
{
// Tell the server not to convert result encodings: data is sent in the database's binary encoding and the client converts it as needed
update(channel, "set names 'binary'");
}
catch (Exception e)
{
e.printStackTrace();
}
try
{
// MySQL 5.6 checksum support requires a session variable to be set.
// Without it the following error occurs: Slave can not handle replication events with the
// checksum that master is configured to log
// It must not be set arbitrarily either: it has to match the MySQL server's checksum
// configuration, otherwise RotateLogEvent is decoded as garbage
// '@@global.binlog_checksum' must be written without single quotes; with them, the master exits on MySQL 5.6.29
update(channel, "set @master_binlog_checksum= @@global.binlog_checksum");
}
catch (Exception e)
{
e.printStackTrace();
}
// Query binlog_checksum
ResultSetPacket rs = null;
try
{
rs = query(channel, "select @master_binlog_checksum");
}
catch (IOException e)
{
e.printStackTrace();
}
int binlogChecksum;
// rs stays null if the query above failed, so guard against it
List<String> columnValues = rs == null ? null : rs.getFieldValues();
if (columnValues != null && columnValues.size() >= 1 && columnValues.get(0).toUpperCase().equals("CRC32"))
{
binlogChecksum = LogEvent.BINLOG_CHECKSUM_ALG_CRC32;
}
else
{
binlogChecksum = LogEvent.BINLOG_CHECKSUM_ALG_OFF;
}
// 10. Query the current binlog position
ResultSetPacket packet = query(channel, "show master status");
List<String> fields = packet.getFieldValues();
if (fields == null || fields.size() == 0)
{
System.out.println("Unable to find the current binlog position");
return;
}
EntryPosition endPosition = new EntryPosition(fields.get(0), Long.valueOf(fields.get(1)));
System.out.println(
String.format("fileName= %s pos=%d", endPosition.getJournalName(), endPosition.getPosition()));
// In practice this file name and position must be stored locally and used to resume synchronization
// 10. Send the binlog dump command
BinlogDumpCommandPacket binlogDumpCmd = new BinlogDumpCommandPacket();
// These values come from the query above
binlogDumpCmd.binlogFileName = endPosition.getJournalName();
binlogDumpCmd.binlogPosition = endPosition.getPosition();
binlogDumpCmd.slaveServerId = 1234; // original value lost in the source; 1234 is a placeholder, use any ID that differs from the master's server_id
byte[] cmdBody = binlogDumpCmd.toBytes();
HeaderPacket binlogDumpHeader = new HeaderPacket();
binlogDumpHeader.setPacketBodyLength(cmdBody.length);
binlogDumpHeader.setPacketSequenceNumber((byte) 0); // a new command restarts the sequence at 0
PacketManager.write(channel,
new ByteBuffer[] { ByteBuffer.wrap(binlogDumpHeader.toBytes()), ByteBuffer.wrap(cmdBody) });
DirectLogFetcher fetcher = new DirectLogFetcher();
// 11. Start the thread that reads from the channel
fetcher.start(channel);
LogDecoder decoder = new LogDecoder(LogEvent.UNKNOWN_EVENT, LogEvent.ENUM_END_EVENT);
LogContext context = new LogContext();
context.setLogPosition(new LogPosition(binlogDumpCmd.binlogFileName));
context.setFormatDescription(new FormatDescriptionLogEvent(4, binlogChecksum)); // 4 = binlog format version
// 12. Read events in a loop
while (fetcher.fetch())
{
LogEvent event = null;
event = decoder.decode(fetcher, context);
if (event == null)
{
throw new RuntimeException("parse failed");
}
int eventType = event.getHeader().getType();
System.out.println("eventType=" + eventType);
switch (eventType)
{
case LogEvent.ROTATE_EVENT:
// binlogFileName = ((RotateLogEvent)
// event).getFilename();
break;
case LogEvent.WRITE_ROWS_EVENT_V1:
case LogEvent.WRITE_ROWS_EVENT:
parseRowsEvent(endPosition.getJournalName(), (WriteRowsLogEvent) event);
break;
case LogEvent.UPDATE_ROWS_EVENT_V1:
case LogEvent.UPDATE_ROWS_EVENT:
// parseRowsEvent((UpdateRowsLogEvent) event);
break;
case LogEvent.DELETE_ROWS_EVENT_V1:
case LogEvent.DELETE_ROWS_EVENT:
// parseRowsEvent((DeleteRowsLogEvent) event);
break;
case LogEvent.QUERY_EVENT:
// parseQueryEvent((QueryLogEvent) event);
break;
case LogEvent.ROWS_QUERY_LOG_EVENT:
// parseRowsQueryEvent((RowsQueryLogEvent) event);
break;
case LogEvent.ANNOTATE_ROWS_EVENT:
break;
case LogEvent.XID_EVENT:
break;
default:
break;
}
}
}
protected static Charset charset = Charset.forName("utf-8");
protected static void parseRowsEvent(String binlogFileName, RowsLogEvent event)
{
try
{
System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s]", binlogFileName,
event.getHeader().getLogPos() - event.getHeader().getEventLen(), event.getTable().getDbName(),
event.getTable().getTableName()));
RowsLogBuffer buffer = event.getRowsBuf(charset.name());
BitSet columns = event.getColumns();
BitSet changeColumns = event.getChangeColumns();
while (buffer.nextOneRow(columns))
{
// Process one row record
int type = event.getHeader().getType();
if (LogEvent.WRITE_ROWS_EVENT_V1 == type || LogEvent.WRITE_ROWS_EVENT == type)
{
// insert: the row is parsed as the after image (isAfter = true)
parseOneRow(event, buffer, columns, true);
}
else if (LogEvent.DELETE_ROWS_EVENT_V1 == type || LogEvent.DELETE_ROWS_EVENT == type)
{
// delete: the row is parsed as the before image
parseOneRow(event, buffer, columns, false);
}
else
{
// update: both the before and the after image have to be processed
System.out.println("-------> before");
parseOneRow(event, buffer, columns, false);
if (!buffer.nextOneRow(changeColumns))
{
break;
}
System.out.println("-------> after");
parseOneRow(event, buffer, changeColumns, true);
}
}
}
catch (Exception e)
{
throw new RuntimeException("parse row data failed.", e);
}
}
protected static void parseOneRow(RowsLogEvent event, RowsLogBuffer buffer, BitSet cols, boolean isAfter)
throws UnsupportedEncodingException
{
TableMapLogEvent map = event.getTable();
if (map == null)
{
throw new RuntimeException("not found TableMap with tid=" + event.getTableId());
}
final int columnCnt = map.getColumnCnt();
final ColumnInfo[] columnInfo = map.getColumnInfo();
for (int i = 0; i < columnCnt; i++)
{
if (!cols.get(i))
{
continue;
}
ColumnInfo info = columnInfo[i];
buffer.nextValue(info.type, info.meta);
if (buffer.isNull())
{
//
}
else
{
final Serializable value = buffer.getValue();
if (value instanceof byte[])
{
System.out.println(new String((byte[]) value));
}
else
{
System.out.println(value);
}
}
}
}
}
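Errors thrown by MySQL
Could not find first log file name in binary log index file
Many solutions for this problem can be found online.
The cause is described as follows:
The requested binlog file name or position is wrong.
The binlog file and position recorded on the master are often wrong. (I have not yet taken the time to find out why; if you know, please leave a comment.)
The information returned by show master status may be incorrect.
In that case, convert the binlog file to text with the mysqlbinlog command, open the output, and find the last "# at 123" line. That 123 is the correct position, and the dump request has to be sent with it.
mysqlbinlog <binlog file> > info.txt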
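Looking for the last offset by hand gets tedious for large files. The following sketch scans the text output for lines of the form "# at N" and keeps the last N. LastBinlogOffsetSketch is a hypothetical helper, and the exact line format is an assumption based on typical mysqlbinlog output.

```java
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper that finds the last "# at N" offset in mysqlbinlog text output. */
final class LastBinlogOffsetSketch {
    // Assumed line format: "# at" followed by the byte offset of the event
    private static final Pattern AT_LINE = Pattern.compile("^# at (\\d+)\\s*$");

    private LastBinlogOffsetSketch() {
    }

    /** Returns the last offset found, or -1 if no such line exists. */
    static long lastOffset(List<String> lines) {
        long last = -1;
        for (String line : lines) {
            Matcher m = AT_LINE.matcher(line);
            if (m.matches()) {
                last = Long.parseLong(m.group(1));
            }
        }
        return last;
    }
}
```

With the file produced above, it could be called as LastBinlogOffsetSketch.lastOffset(Files.readAllLines(Paths.get("info.txt"))).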
Another crude but very effective approach is to stop the master and then restart it.
Selected references
http://blog.csdn.net/hackerwin7/article/details/37923607
https://dev.mysql.com/doc/refman/5.5/en/binary-log.html
https://github.com/alibaba/canal