A Look at canal's Position

This article takes a look at the Position classes in canal.

Position

canal-1.1.4/protocol/src/main/java/com/alibaba/otter/canal/protocol/position/Position.java

public abstract class Position implements Serializable {

    private static final long serialVersionUID = 2332798099928474975L;

    public String toString() {
        return ToStringBuilder.reflectionToString(this, CanalToStringStyle.DEFAULT_STYLE);
    }

}

  • Position defines a toString method, which calls ToStringBuilder.reflectionToString with CanalToStringStyle.DEFAULT_STYLE to render the object's fields as a string
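To make the reflection-based toString more concrete, here is a minimal sketch that uses only java.lang.reflect. It is not how ToStringBuilder.reflectionToString is implemented, and the output format is invented for illustration (the real format is determined by CanalToStringStyle.DEFAULT_STYLE). The class names ReflectivePosition and ReflectiveTimePosition are hypothetical stand-ins.

```java
import java.io.Serializable;
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.StringJoiner;

// Hypothetical stand-in for canal's Position; not the real class.
abstract class ReflectivePosition implements Serializable {

    private static final long serialVersionUID = 1L;

    // Walks up the class hierarchy and renders each instance field as name=value.
    @Override
    public String toString() {
        StringJoiner joiner = new StringJoiner(",", getClass().getSimpleName() + "[", "]");
        for (Class<?> type = getClass(); type != Object.class; type = type.getSuperclass()) {
            for (Field field : type.getDeclaredFields()) {
                if (Modifier.isStatic(field.getModifiers()) || field.isSynthetic()) {
                    continue; // skip constants such as serialVersionUID
                }
                field.setAccessible(true);
                try {
                    joiner.add(field.getName() + "=" + field.get(this));
                } catch (IllegalAccessException e) {
                    joiner.add(field.getName() + "=<inaccessible>");
                }
            }
        }
        return joiner.toString();
    }
}

// Hypothetical subclass with a single field, used only to exercise toString.
class ReflectiveTimePosition extends ReflectivePosition {

    private static final long serialVersionUID = 2L;
    private final Long timestamp;

    ReflectiveTimePosition(Long timestamp) {
        this.timestamp = timestamp;
    }
}

class ReflectiveToStringDemo {
    public static void main(String[] args) {
        // prints ReflectiveTimePosition[timestamp=1000]
        System.out.println(new ReflectiveTimePosition(1000L));
    }
}
```

Putting toString in the base class means every subclass gets a readable representation of its own fields without overriding anything, which is presumably why canal places it in Position.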

TimePosition

canal-1.1.4/protocol/src/main/java/com/alibaba/otter/canal/protocol/position/TimePosition.java

public class TimePosition extends Position {

    private static final long serialVersionUID = 6185261261064226380L;
    protected Long            timestamp;

    public TimePosition(Long timestamp){
        this.timestamp = timestamp;
    }

    public Long getTimestamp() {
        return timestamp;
    }

    public void setTimestamp(Long timestamp) {
        this.timestamp = timestamp;
    }

    //......
}

  • TimePosition extends Position and defines a timestamp property

EntryPosition

canal-1.1.4/protocol/src/main/java/com/alibaba/otter/canal/protocol/position/EntryPosition.java

public class EntryPosition extends TimePosition {

    private static final long serialVersionUID      = 81432665066427482L;
    public static final int   EVENTIDENTITY_SEGMENT = 3;
    public static final char  EVENTIDENTITY_SPLIT   = (char) 5;

    private boolean           included              = false;
    private String            journalName;
    private Long              position;
    // add by agapple at 2016-06-28
    private Long              serverId              = null;              // records the serverId that this position corresponds to
    private String            gtid                  = null;

    //......
}

  • EntryPosition extends TimePosition and defines the included, journalName, position, serverId, and gtid properties

SlaveEntryPosition

canal-1.1.4/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/SlaveEntryPosition.java

public class SlaveEntryPosition extends EntryPosition {

    private static final long serialVersionUID = 5271424551446372093L;
    private final String      masterHost;
    private final String      masterPort;

    public SlaveEntryPosition(String fileName, long position, String masterHost, String masterPort){
        super(fileName, position);

        this.masterHost = masterHost;
        this.masterPort = masterPort;
    }

    public String getMasterHost() {
        return masterHost;
    }

    public String getMasterPort() {
        return masterPort;
    }
}

  • SlaveEntryPosition extends EntryPosition and defines the masterHost and masterPort properties
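The inheritance chain TimePosition → EntryPosition → SlaveEntryPosition can be sketched with trimmed-down stand-ins. All class names below (BasePos, TimePos, EntryPos, SlaveEntryPos, HierarchyDemo) are hypothetical, and the two-argument EntryPos constructor is an assumption inferred from the super(fileName, position) call in SlaveEntryPosition; the real classes carry more fields and constructors.

```java
// Hypothetical, trimmed-down stand-ins for the canal classes quoted above.
abstract class BasePos {
}

class TimePos extends BasePos {
    protected Long timestamp;

    TimePos(Long timestamp) {
        this.timestamp = timestamp;
    }

    Long getTimestamp() {
        return timestamp;
    }
}

class EntryPos extends TimePos {
    private final String journalName;
    private final Long   position;

    // Assumed constructor shape; leaving the timestamp null here is also an assumption.
    EntryPos(String journalName, Long position) {
        super(null);
        this.journalName = journalName;
        this.position = position;
    }

    String getJournalName() {
        return journalName;
    }

    Long getPosition() {
        return position;
    }
}

class SlaveEntryPos extends EntryPos {
    private final String masterHost;
    private final String masterPort;

    SlaveEntryPos(String fileName, long position, String masterHost, String masterPort) {
        super(fileName, position); // the long argument is boxed to Long
        this.masterHost = masterHost;
        this.masterPort = masterPort;
    }

    String getMasterHost() {
        return masterHost;
    }

    String getMasterPort() {
        return masterPort;
    }
}

class HierarchyDemo {
    // The most specific type is checked first, because a SlaveEntryPos is also an EntryPos and a TimePos.
    static String describe(BasePos p) {
        if (p instanceof SlaveEntryPos) {
            SlaveEntryPos s = (SlaveEntryPos) p;
            return "slave of " + s.getMasterHost() + ":" + s.getMasterPort()
                   + " at " + s.getJournalName() + ":" + s.getPosition();
        }
        if (p instanceof EntryPos) {
            EntryPos e = (EntryPos) p;
            return "binlog " + e.getJournalName() + ":" + e.getPosition();
        }
        if (p instanceof TimePos) {
            return "timestamp " + ((TimePos) p).getTimestamp();
        }
        return "unknown";
    }

    public static void main(String[] args) {
        System.out.println(describe(new TimePos(1000L)));
        System.out.println(describe(new EntryPos("mysql-bin.000001", 4L)));
        System.out.println(describe(new SlaveEntryPos("mysql-bin.000001", 4L, "10.0.0.1", "3306")));
    }
}
```

Each level only adds fields to the one above it, so code written against the base type can accept any of the three and narrow to the more specific type when it needs the extra information.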

LogPosition

canal-1.1.4/protocol/src/main/java/com/alibaba/otter/canal/protocol/position/LogPosition.java

public class LogPosition extends Position {

    private static final long serialVersionUID = 3875012010277005819L;
    private LogIdentity       identity;
    private EntryPosition     postion;

    public LogIdentity getIdentity() {
        return identity;
    }

    public void setIdentity(LogIdentity identity) {
        this.identity = identity;
    }

    public EntryPosition getPostion() {
        return postion;
    }

    public void setPostion(EntryPosition postion) {
        this.postion = postion;
    }

    //......
}

  • LogPosition extends Position and defines two properties, identity and postion (the field name is spelled this way in the source)
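Unlike the other subclasses, LogPosition holds an EntryPosition rather than extending it. A rough sketch of that composition follows. The Sketch* classes are hypothetical, and the single sourceAddress string in SketchLogIdentity is an assumption made for illustration; the excerpt does not show what LogIdentity actually contains.

```java
// Hypothetical stand-in; the fields of the real LogIdentity are not shown in the excerpt.
class SketchLogIdentity {
    final String sourceAddress; // assumption: some identifier of the upstream log source

    SketchLogIdentity(String sourceAddress) {
        this.sourceAddress = sourceAddress;
    }
}

// Hypothetical stand-in with only the two fields needed here.
class SketchEntryPosition {
    final String journalName;
    final long   position;

    SketchEntryPosition(String journalName, long position) {
        this.journalName = journalName;
        this.position = position;
    }
}

// Holds "which source" and "where in that source" side by side instead of inheriting either.
class SketchLogPosition {
    private SketchLogIdentity   identity;
    private SketchEntryPosition postion; // spelling follows the field name in the excerpt

    SketchLogIdentity getIdentity() {
        return identity;
    }

    void setIdentity(SketchLogIdentity identity) {
        this.identity = identity;
    }

    SketchEntryPosition getPostion() {
        return postion;
    }

    void setPostion(SketchEntryPosition postion) {
        this.postion = postion;
    }
}

class CompositionDemo {
    public static void main(String[] args) {
        SketchLogPosition log = new SketchLogPosition();
        log.setIdentity(new SketchLogIdentity("127.0.0.1:3306"));
        log.setPostion(new SketchEntryPosition("mysql-bin.000001", 4L));
        System.out.println(log.getIdentity().sourceAddress + " @ "
                           + log.getPostion().journalName + ":" + log.getPostion().position);
    }
}
```

Keeping the two parts separate lets the entry position be replaced as consumption advances while the identity stays the same.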

cursor

canal-1.1.4/meta/src/main/java/com/alibaba/otter/canal/meta/CanalMetaManager.java

public interface CanalMetaManager extends CanalLifeCycle {

    //......

    /**
     * Get the cursor
     */
    Position getCursor(ClientIdentity clientIdentity) throws CanalMetaManagerException;

    /**
     * Update the cursor
     */
    void updateCursor(ClientIdentity clientIdentity, Position position) throws CanalMetaManagerException;

    //......

}

  • CanalMetaManager declares the getCursor and updateCursor methods: getCursor returns the Position for a given clientIdentity, and updateCursor updates the position stored for the specified clientIdentity

MemoryMetaManager

canal-1.1.4/meta/src/main/java/com/alibaba/otter/canal/meta/MemoryMetaManager.java

public class MemoryMetaManager extends AbstractCanalLifeCycle implements CanalMetaManager {

    protected Map<String, List<ClientIdentity>>              destinations;
    protected Map<ClientIdentity, MemoryClientIdentityBatch> batches;
    protected Map<ClientIdentity, Position>                  cursors;

    //......

    public Position getCursor(ClientIdentity clientIdentity) throws CanalMetaManagerException {
        return cursors.get(clientIdentity);
    }

    public void updateCursor(ClientIdentity clientIdentity, Position position) throws CanalMetaManagerException {
        cursors.put(clientIdentity, position);
    }

    //......
}

  • MemoryMetaManager implements the CanalMetaManager interface. It keeps a Map named cursors whose key is a ClientIdentity and whose value is a Position; getCursor looks up the Position for clientIdentity in cursors, and updateCursor sets the value stored under clientIdentity to position
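The map-backed cursor described above can be sketched as follows. ClientKey and CursorStore are hypothetical names, the position is reduced to a plain String, and the use of ConcurrentHashMap is an assumption: the excerpt elides how MemoryMetaManager actually initializes cursors.

```java
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical client key. It must implement equals and hashCode so that two
// logically identical clients resolve to the same map entry.
final class ClientKey {
    final String destination;
    final short  clientId;

    ClientKey(String destination, short clientId) {
        this.destination = destination;
        this.clientId = clientId;
    }

    @Override
    public boolean equals(Object other) {
        if (this == other) {
            return true;
        }
        if (!(other instanceof ClientKey)) {
            return false;
        }
        ClientKey that = (ClientKey) other;
        return clientId == that.clientId && destination.equals(that.destination);
    }

    @Override
    public int hashCode() {
        return Objects.hash(destination, clientId);
    }
}

// Minimal in-memory cursor store: one position per client, last write wins.
final class CursorStore {
    // Assumption: a concurrent map, so that concurrent reads and updates are safe.
    // Note that ConcurrentHashMap rejects null keys and null values.
    private final Map<ClientKey, String> cursors = new ConcurrentHashMap<>();

    // Returns null when no cursor has been recorded for this client yet.
    String getCursor(ClientKey client) {
        return cursors.get(client);
    }

    void updateCursor(ClientKey client, String position) {
        cursors.put(client, position);
    }
}

class CursorStoreDemo {
    public static void main(String[] args) {
        CursorStore store = new CursorStore();
        ClientKey client = new ClientKey("example", (short) 1001);

        System.out.println(store.getCursor(client)); // prints null
        store.updateCursor(client, "mysql-bin.000001:4");
        store.updateCursor(new ClientKey("example", (short) 1001), "mysql-bin.000001:120");
        System.out.println(store.getCursor(client)); // prints mysql-bin.000001:120
    }
}
```

Because everything lives in a map in memory, the cursors do not survive a restart in this sketch.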

Summary

Position defines a toString method, which calls ToStringBuilder.reflectionToString with CanalToStringStyle.DEFAULT_STYLE. It has two direct subclasses, TimePosition and LogPosition. EntryPosition extends TimePosition, and SlaveEntryPosition extends EntryPosition; LogPosition, by contrast, holds an EntryPosition by composition.
