天天看點

使用canal同步Mysql操作到Oracle(windows)配置canal.deployer.1.1.3\conf\example\instance.properties

版本資訊

java version “1.8.0_141”

Mysql version mysql-8.0.19-winx64

Canal version canal.deployer.1.1.3

rt = 11112

配置mysql的my.ini配置檔案

[mysqld]

mysql_native_password default_authentication_plugin=mysql_native_password

server-id=1

bind-address=0.0.0.0

#開啟binlog日志

log-bin=mysql-bin

binlog_format = ROW

[mysql]

#設定mysql用戶端預設字元集

default-character-set=utf8mb4

[client]

#設定mysql用戶端連接配接服務端時預設使用的端口

port=3306

default-character-set=utf8mb4

配置canal.deployer.1.1.3\conf\example\instance.properties

#position info

canal.instance.master.address=127.0.0.1:3306

canal.instance.master.journal.name=

canal.instance.master.position=

canal.instance.master.timestamp=

canal.instance.master.gtid=

#rds oss binlog

canal.instance.rds.accesskey=

canal.instance.rds.secretkey=

canal.instance.rds.instanceId=

#table meta tsdb info

canal.instance.tsdb.enable=true

#canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb

#canal.instance.tsdb.dbUsername=canal

#canal.instance.tsdb.dbPassword=canal

配置canal.deployer.1.1.3\conf\canal.properties

#本地IP192.168.31.1:3306

canal.manager.jdbc.url=jdbc:mysql://192.168.31.1:3306/canal_manager?useUnicode=true&characterEncoding=UTF-8

#canal.manager.jdbc.username=root

#canal.manager.jdbc.password=121212

canal.destinations=example

#與my.ini内的server id= 不同即可

canal.id = 111111

canal.ip =

canal.port = 11111

canal.metrics.pull.port = 11112

#table meta tsdb info

canal.instance.tsdb.enable = true

canal.instance.tsdb.dir = c a n a l . f i l e . d a t a . d i r : . . / c o n f / {canal.file.data.dir:../conf}/ canal.file.data.dir:../conf/{canal.instance.destination:}

canal.instance.tsdb.url = jdbc:h2:${canal.instance.tsdb.dir}/h2;CACHE_SIZE=1000;MODE=MYSQL;

canal.instance.tsdb.dbUsername = canal

canal.instance.tsdb.dbPassword = canal

#dump snapshot interval, default 24 hour

canal.instance.tsdb.snapshot.interval = 24

#purge snapshot expire , default 360 hour(15 days)

canal.instance.tsdb.snapshot.expire = 360

#aliyun ak/sk , support rds/mq

canal.aliyun.accessKey =

canal.aliyun.secretKey =

#################################################

######### destinations #############

#################################################

canal.destinations = example

#conf root dir

canal.conf.dir = …/conf

#auto scan instance dir add/remove and start/stop instance

canal.auto.scan = true

canal.auto.scan.interval = 5

重新開機mysql,登入mysql内

建立canal使用者和授權;

// 新增使用者

CREATE USER ‘canal’@’%’ IDENTIFIED BY ‘canal’;

// 授權

GRANT SHOW VIEW, SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON . TO ‘canal’@’%’;

// 重新整理

FLUSH PRIVILEGES;

開啟canal.deployer.1.1.3\bin\start.bat

cmd界面:

使用canal同步Mysql操作到Oracle(windows)配置canal.deployer.1.1.3\conf\example\instance.properties

檢視:canal.deployer.1.1.3\logs\canal\canal.log

使用canal同步Mysql操作到Oracle(windows)配置canal.deployer.1.1.3\conf\example\instance.properties

canal配置的其他參數以及解釋:

參考:寫的很詳細,在此感謝大神的辛苦創作

https://blog.csdn.net/u012758088/article/details/78789616?depth_1-utm_source=distribute.pc_relevant.none-task&utm_source=distribute.pc_relevant.none-task

javacanal連接配接。代碼

注意事項一:下面的代碼需要保證mysql與oracle導入導出的資料庫名稱和表名稱完全一緻,而且在mysql端建立删除表,oracle也會有對應的操作,建立表之後就可以插入資料,

因為Oracle與mysql的sql語句不通,而且拿到sql語句不支援oracle直接操作,是以該代碼不支援delete,insert,update,create,drop之外的操作,需要另外開發(沒必要)

注意事項二:因為拿到的資料均為string,是以在對資料進行操作時,除了基本資料類型和String類型,其他類型均要轉換成oracle支援的資料類型,date的已經做了轉換,基本資料類型和string不需要裝換,如果有其他資料類型時,請先校驗轉換,參考date類型的處理方式,除date外,sql語句拼接的都是string(oracle可以用string資料導入基本資料類型的字段,例如 “1”等同于int 1)

注意事項3:如果有其他資料類型而且不能做轉換,請聯系QQ:1078442730,有另外的處理方式,但是要犧牲靈活性,所有的表都要事先建立對象,如果表有100個字段,就要建立100個屬性,重複代碼太多,靈活性太低,但是可以保證所有資料類型在mysql端和oracle端是一緻的。

canla連接配接:通過配置檔案連接配接
package com.zzw.Conn;

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;

import java.io.FileInputStream;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Properties;

public class CanalConn {
    static String hostname = null;
    static int    port = 0;
    static String destination = null;
    static String username = null;
    static String password = null;

    static {
        Properties properties = new Properties();
        try {
         properties.load(newFileInputStream("D:\\idea_code\\CanalMysqlToOrcal\\lib\\canal"));
            hostname = properties.getProperty("hostname");
            port = Integer.parseInt(properties.getProperty("port"));
            destination = properties.getProperty("destination");
            username = properties.getProperty("username");
            password = properties.getProperty("Cpassword");

        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    public static CanalConnector getconn(){
        InetSocketAddress isa= new InetSocketAddress(hostname,port);
        CanalConnector connector = CanalConnectors.newSingleConnector(isa,destination,username,password);
        System.out.println("connection Successfully");
        return connector;
    }
}
           

Canal的java配置檔案:

hostname=192.168.31.1 canal運作的用戶端IP

port=11111 canal的conf下的配置檔案中配置的port

destination=example canal的conf下的配置檔案中配置的destination

username=canal 第四步建立授權的canal使用者與密碼,與canal的conf下的

配置檔案中配置的保持一緻

Password=canla

CanalMysqlToOrcal下的代碼部分詳解

package com.zzw.RunSoft;

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.protocol.Message;
import com.beimingsoft.Conn.CanalConn;
import com.beimingsoft.Conn.OracleConn;
import com.beimingsoft.actiontooracle.ActionToOracle;

import java.sql.Connection;
import java.sql.PreparedStatement;

/**
 * long batchId = message.getId();此參數類似于KAFKA的偏移量,當操作成功時,偏移量增加,否則復原
 * connector.ack(batchId):送出偏移量
 * connector.rollback(batchId):復原偏移量
 * tips: batchid=message.getId(),指的是同一個庫同一個表的偏移量,例如如果操作的表未在目标庫建立,則在建立後消費對應的message.getId()
 * 也就是:每個表都有對應的message.getId(),并且互相之間互不影響
 */
public class MysqlToOracle {
    public static void main(String[] args)  {
        //擷取oracle連接配接
        Connection conn = OracleConn.getConnection();
        //建立PreparedStatement
        PreparedStatement ps = null;
        //建立GetTabFileds 對象
        //擷取canal的連接配接
        CanalConnector connector = CanalConn.getconn();
        connector.connect();
        /*bin-log的分隔符,與cmd界面的:
        [New I/O server worker #1-2] WARN  c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert
        - --> init table filter : ^.*\..*$保持一緻,預設就是.*\..*  不建議修改canal的該項配置
        */
        connector.subscribe(".*\\..*");
        //持續拉取資料,有bin-log産生就會消費
        while (true) {
            //i,用于判斷是否執行成功,i==1,成功,送出message.get.Id,否則復原message.get.Id
            int i = 0;
            // 擷取指定數量的資料
            Message message = connector.getWithoutAck(100);
            //拿到偏移量
            long batchId = message.getId();
            //如果沒有資料可以拉取,則休眠1s
            if (batchId == -1 || message.getEntries().isEmpty()) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                continue;
            }
            //得到i值,并執行操作
            i = ActionToOracle.action(conn, ps, message.getEntries());
            if (i == 1) {
   	 // 送出确認,消費成功,通知server删除資料
    	      connector.ack(batchId);
    	      System.out.println("偏移量更新成功");
//*****************************************************************
//因為在代碼測試過程中 mysql與oracle的sql語句不通,是以會不斷報錯,是以設定了無論成功還是失敗都送出偏移量
// 不然會不停的復原不停地拉取不停的報錯,是以按照需要這部分代碼需要按照實際更改
	} else if (i==0) {
    	      Connector.ack(batchId);
    	      System.out.println("再見蠢貨");
   ***//為防止不停報錯,以下語句實際不會出現,按照需求修改***
	}else {
    // 處理失敗, 復原資料,後續重新擷取資料
    	      connector.rollback(batchId);
    	      System.out.println("偏移量復原成功");
	}
        }
    }
}
           

對oracle操作部分的代碼

package com.zzw.actiontooracle;

import com.alibaba.otter.canal.protocol.CanalEntry;
import com.google.protobuf.InvalidProtocolBufferException;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.util.List;

public class ActionToOracle {
//每個方法的傳回值 1,代表成功,0代表失敗 ,預設0
    private static int status = 0;
//從sql語句中抽取中繼資料表的庫名和表名
    public static String from_tab = null;
    public static String from_db = null;
//ddl操作的sql語句
private static String ddl_sql = null;

    public static int action(Connection conn, PreparedStatement ps, List<CanalEntry.Entry> entries) {
        for (CanalEntry.Entry entry : entries) {
            if (entry.getEntryType() != CanalEntry.EntryType.ROWDATA) {
                continue;
            }

            CanalEntry.RowChange rowChange = null;
            try {
//拿到binlog
                rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
            } catch (InvalidProtocolBufferException e) {
                e.printStackTrace();
            }
            CanalEntry.EventType eventType = rowChange.getEventType();
//從sql語句中抽取中繼資料表的庫名和表名
            from_tab = entry.getHeader().getTableName();
            from_db = entry.getHeader().getSchemaName();
//拿到sql,該SQL智能拿到DDLSQL語句,DMLSQL語句拿不到,dml操作的話,rowChange.getSql()不執行(測試結果如此,沒有查到資料講為什麼這樣)
            ddl_sql = rowChange.getSql();
            System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
                    entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
                    entry.getHeader().getSchemaName(), entry.getHeader().getTableName(), eventType));
/**
如果是ddl語句,比對insert,delete,update,執行相關操作
*/
            if (!rowChange.getIsDdl()) {
                for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
                    switch (eventType) {

                        case INSERT:
                            System.out.println();
                            System.out.println("INSERT ");
                            status = ActionToOracleFuction.insertToOracle(conn, ps, rowData.getAfterColumnsList());
                            break;
                        case UPDATE:
                            System.out.println();
                            System.out.println("UPDATE ");
                            status = ActionToOracleFuction.updateToOracle(conn, ps, rowData.getAfterColumnsList(),rowData.getBeforeColumnsList());
                            break;
                        case DELETE:
                            System.out.println();
                            System.out.println("DELETE ");
                            status = ActionToOracleFuction.deleteFromOracle(conn, ps, rowData.getBeforeColumnsList());
                            break;
                        default:
                            System.out.println(eventType);
                            break;

                    }
                }
            } else {
//如果是DDL操作 執行下面的語句
                System.out.println();
                System.out.println("ddl操作");
                status = ActionToOracleFuction.ddlAction(conn, ps, ddl_sql, eventType);

            }
        }
        return status;

    }
}
           

執行oracle實際操作的函數類

package com.zzw.actiontooracle;

import com.alibaba.otter.canal.protocol.CanalEntry;
import com.beimingsoft.Conn.OracleConn;

import java.sql.Connection;
import java.sql.Date;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;

import static com.beimingsoft.actiontooracle.ActionToOracle.from_db;
import static com.beimingsoft.actiontooracle.ActionToOracle.from_tab;

;


public class ActionToOracleFuction {
    private static int deletestatus = 0;
    private static int updatestatus = 0;
    private static int insertstatus = 0;
    private static int ddlactionstatus = 0;
    private static String sql = null;
    private static Date date = null;
    private static List<Date> dates = new ArrayList<>();
//beforecolumns:執行DML操作之前的資料集 aftercolimns :之心DML操作之後的資料集
//具體思路檢視insertFromOracle函數,總之一句話,拿到字段名,對應的值,重新組裝一個适合oracle的SQL語句
//但是要注意date類型的資料,從beforecolumns或者aftercolumns中拿到的都是String,對于oracle插入的整數,//小數,字元串沒有影響,對date類型不可操作,具體解決辦法檢視insertFromOracle,是否還有其他資料類型需要做校對//還不清楚
    public static int deleteFromOracle(Connection conn, PreparedStatement ps, List<CanalEntry.Column> beforecolumns) {
        int i = 1;
        sql = "delete from " + from_db + "." + from_tab + " where  ";
        for (CanalEntry.Column column : beforecolumns) {
            try {
                date = Date.valueOf(column.getValue());
            } catch (IllegalArgumentException e) {
                sql = sql + column.getName() + "='" + column.getValue() + "' and ";
            }
            if (date != null) {
                sql = sql + column.getName() + "=? and ";
                dates.add(date);
                date = null;
            }
        }
        sql = sql.substring(0, sql.lastIndexOf("and"));
        System.err.println(sql);
        conn = OracleConn.getConnection();

        try {
            ps = conn.prepareStatement(sql);
            for (Date date : dates) {
                ps.setDate(i, date);
                i++;
            }
            ps.execute();
            System.out.println("sql送出成功");
            conn.commit();
            System.out.println("commit成功,執行狀态:success");
            deletestatus = 1;
        } catch (SQLException e) {
            e.printStackTrace();
            System.err.println("表或視圖不存在,請檢查設定");
        } finally {
            OracleConn.close(ps, conn);
            dates.clear();
        }

        return deletestatus;

    }

    public static int updateToOracle(Connection conn, PreparedStatement ps, List<CanalEntry.Column> aftercolumns, List<CanalEntry.Column> beforecolumns) {
        String sql = "update " + from_db + "." + from_tab + " set  ";
        int i = 1;
        for (CanalEntry.Column column : aftercolumns) {
            try {
                date = Date.valueOf(column.getValue());
            } catch (IllegalArgumentException e) {
                sql = sql + column.getName() + "='" + column.getValue() + "',";
            }
            if (date != null) {
                sql = sql + column.getName() + "=?,";
                dates.add(date);
                date = null;
            }
        }
        sql = sql.substring(0, sql.lastIndexOf(",")) + " where ";
        for (CanalEntry.Column column : beforecolumns) {
            try {
                date = Date.valueOf(column.getValue());
            } catch (IllegalArgumentException e) {
                sql = sql + column.getName() + "='" + column.getValue() + "' and ";
            }
            if (date != null) {
                sql = sql + column.getName() + "=? and ";
                dates.add(date);
                date = null;
            }
        }
        sql = sql.substring(0, sql.lastIndexOf("and"));
        System.err.println(sql);
        conn = OracleConn.getConnection();
        try {
            conn = OracleConn.getConnection();
            ps = conn.prepareStatement(sql);
            for (Date date : dates) {
                //把dates中的date取出,i=1,是以如果有date類型的資料就傳到sql,同時i自增,i的值與參數的順序就有序了
                ps.setDate(i, date);
                i++;
            }
            ps.execute();
            System.out.println("sql送出成功");
            conn.commit();
            System.out.println("commit成功,執行狀态:success");
            updatestatus = 1;
        } catch (SQLException e) {
            e.printStackTrace();
            System.err.println("表或視圖不存在,請檢查設定");
        } finally {
            OracleConn.close(ps, conn);
            //等把所有的dates中的date取出後清空dates,不然影響dates的資料會一直增加,傳參到SQL有誤
            dates.clear();
        }
        return updatestatus;
    }

    public static int insertToOracle(Connection conn, PreparedStatement ps, List<CanalEntry.Column> aftercolumns) {
        //ps的第一個參數,SQL語句的第一個?的索引值
        int i = 1;
        //拼接sql : 例如:insert into scott.userinfo (
        String sql = "insert into " + from_db + "." + from_tab + " (";
        //拼接sql:例如:insert into scott.userinfo (u_id,u_name,u_age,
        for (CanalEntry.Column column : aftercolumns) {
            sql = sql + column.getName() + ",";
        }
        //截取sql并拼接:例如 insert into scott.userinfo (u_id,u_name,u_age) values (
        sql = sql.substring(0, sql.lastIndexOf(",")) + ") values(";
        //拼接sql:例如insert into scott.userinfo (u_id,u_name,u_age) values (1,,張三',1,
        for (CanalEntry.Column column : aftercolumns) {
            try {
                //把String轉成sqlDate,如果不Date類型則下面語句不執行,執行catch内容
                date = Date.valueOf(column.getValue());
            } catch (IllegalArgumentException e) {
                sql = sql + "'" + column.getValue() + "'" + ",";
            }
            //如果沒有catch到異常,則拼接sql,把?拼接到sql中,同時把date放到list中
            if (date != null) {
                sql = sql + "?,";
                dates.add(date);
                date = null;
            }
        }
        //截取拼接sql:例如insert into scott.userinfo (u_id,u_name,u_age) values (1,,張三',1)
        sql = sql.substring(0, sql.lastIndexOf(",")) + ")";
        System.err.println(sql);

        try {
            conn = OracleConn.getConnection();
            ps = conn.prepareStatement(sql);
            for (Date date : dates) {
                //把dates中的date取出,i=1,是以如果有date類型的資料就傳到sql,同時i自增,i的值與參數的順序就有序了
                ps.setDate(i, date);
                i++;
            }
            ps.execute();
            System.out.println("sql送出成功");
            conn.commit();
            System.out.println("commit成功,執行狀态:success");
            insertstatus = 1;
        } catch (SQLException e) {
            e.printStackTrace();
            System.err.println("表或視圖不存在,請檢查設定");
        } finally {
            OracleConn.close(ps, conn);
            //等把所有的dates中的date取出後清空dates,不然影響dates的資料會一直增加,傳參到SQL有誤
            dates.clear();
        }

        return insertstatus;
    }

    public static int ddlAction(Connection conn, PreparedStatement ps, String ddl_sql, CanalEntry.EventType eventType) {
        switch (eventType) {
            case CREATE:
                sql = ddl_sql;
                break;
            case ERASE:
                //解析出來的binlog日志内的DROPsql:DROP TABLE `u` /* generated by server */
                //oracl無法直接使用該sql,是以通過元資訊中的資料庫名,和表名重組sql
                //是以需要保證mysql和oracle中同步的資料庫表名和資料名一緻。不一緻也可以,但是需要添加配置,在配置中設定
                //而且會有局限性。
                sql = "drop table " + from_db + "." + from_tab;

        }

        System.err.println(sql);
        try {
            conn = OracleConn.getConnection();
            ps = conn.prepareStatement(sql);
            ps.execute();
            System.out.println("sql送出成功");
            conn.commit();
            System.out.println("commit成功,執行狀态:success");
            ddlactionstatus = 1;
        } catch (SQLException e) {
            System.err.println("操作失敗");

        } finally {

            OracleConn.close(ps, conn);
        }
        return ddlactionstatus;
    }

}