天天看點

Mac Docker環境,利用Canal實作MySQL同步ES

作者:馬士兵教育CTO
Mac Docker環境,利用Canal實作MySQL同步ES

Canal的使用

使用docker環境安裝mysql、canal、elasticsearch,基于binlog利用canal實作mysql的資料同步到elasticsearch中,并在springboot進行整合。

  1. 安裝mysql并配置binlog
  2. 安裝elasticsearch
  3. 安裝Canal server
  4. 在springboot實作整合

所用環境版本類型:

  • MacOS Monterey
  • mysql 5.7.36
  • es 7.17.9
  • cannal.server: 1.1.5

1、安裝mysql

安裝指令

shell複制代碼// 拉取鏡像
docker pull mysql:5.7.36
// 建立容器
docker run --name mysql5.7.36 -p 3306:3306 -e MYSQL_ROOT_PASSWORD=admin -d mysql:5.7.36
// 進入容器
docker exec -it mysql5.7.36 /bin/bash
// 添加vim編輯
apt-get update
apt-get install vim
#注意 apt-get報錯,可換yum指令使用。
// 配置mysql
cd /etc/mysql/mysql.conf.d
vim mysqld.cnf  // 修改mysql配置
           

配置:與官網一緻

bash複制代碼[mysqld]
#binlog setting
log-bin=mysql-bin  // 開啟logbin
binlog-format=ROW  // binlog日志格式
server-id=1  // mysql主從備份serverId,canal中不能與此相同
           

資料庫建立一個canal賬号,并且設定slave,dump權限

bash複制代碼CREATE USER canal IDENTIFIED BY 'canal';  
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
FLUSH PRIVILEGES;
           

因為mysql8.0.3後身份檢驗方式為caching_sha2_password,但canal使用的是mysql_native_password,是以需要設定檢驗方式(如果該版本之前的可跳過),否則會報錯IOException: caching_sha2_password Auth failed

bash複制代碼ALTER USER 'canal'@'%' IDENTIFIED WITH mysql_native_password BY 'canal';
select host,user,plugin from mysql.user ;
           

儲存退出,然後重新開機mysql容器

bash複制代碼docker restart mysql5.7.36
           

做完這一步可檢視前面是否有誤:

bash複制代碼// 進入容器
docker exec -it mysql5.7.36 /bin/bash
mysql -uroot -p
show master status  // binlog日志檔案
reset master; // 重新開機日志
show variables like 'binlog_format'; //檢視是否配置成功
           
Mac Docker環境,利用Canal實作MySQL同步ES

檢視日志檔案:

bash複制代碼cd /var/lib/mysql  // 進入日志檔案目錄
mysqlbinlog -vv mysql-bin.000001  // row格式檢視日志
           

mysql已經安裝成功了。

2、安裝es

略。

3、安裝canal-server

shell複制代碼docker pull canal/canal-server:v1.1.5
docker run --name canal1.1.5 -p 11111:11111  --link mysql5.7.36:mysql5.7.36 -id canal/canal-server:v1.1.5
           

修改對應的配置:

bash複制代碼docker exec -it canal1.1.5 /bin/bash
cd canal-server/conf/example/
vi instance.properties  // 修改配置

# 把0改成10,隻要不和mysql的id相同就行
canal.instance.mysql.slaveId=10
# 修改成mysql對應的賬号密碼,mysql5.7.36就是mysql鏡像的連結别名
canal.instance.master.address=mysql5.7.36:3306
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
           
properties複制代碼#################################################
## mysql serverId , v1.0.26+ will autoGen
canal.instance.mysql.slaveId=10

# enable gtid use true/false
canal.instance.gtidon=false

# position info
canal.instance.master.address=mysql5.7.36:3306
canal.instance.master.journal.name=
canal.instance.master.position=
canal.instance.master.timestamp=
canal.instance.master.gtid=

# rds oss binlog
canal.instance.rds.accesskey=
canal.instance.rds.secretkey=
canal.instance.rds.instanceId=

# table meta tsdb info
canal.instance.tsdb.enable=true
#canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb
#canal.instance.tsdb.dbUsername=canal
#canal.instance.tsdb.dbPassword=canal

#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
#canal.instance.standby.gtid=

# username/password
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
canal.instance.connectionCharset = UTF-8
# enable druid Decrypt database password
canal.instance.enableDruid=false
#canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ==

# table regex
canal.instance.filter.regex=.*\\..*
# table black regex
canal.instance.filter.black.regex=mysql\\.slave_.*
# table field filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.field=test1.t_product:id/subject/keywords,test2.t_company:id/name/contact/ch
# table field black filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.black.field=test1.t_product:subject/product_image,test2.t_company:id/name/contact/ch

# mq config
canal.mq.topic=example
# dynamic topic route by schema or table regex
#canal.mq.dynamicTopic=mytest1.user,mytest2\\..*,.*\\..*
canal.mq.partition=0
# hash partition config
#canal.mq.partitionsNum=3
#canal.mq.partitionHash=test.table:id^name,.*\\..*
#canal.mq.dynamicTopicPartitionNum=test.*:4,mycanal:6
#################################################
           

也可精确點對點同步(不是很複雜預設就好,無需配置下面内容)

properties複制代碼# 開始同步的binlog日志檔案,注意這裡的binlog檔案名以你自己查出來的為準
canal.instance.master.journal.name=mysql-bin.000001
# 開始同步的binlog檔案位置
canal.instance.master.position=0
# 開始同步時間點 時間戳形式
canal.instance.master.timestamp=1546272000
# 配置不同步的mysql庫
canal.instance.filter.black.regex=mysql\..*

mysql資料同步起點說明:
canal.instance.master.journal.name + canal.instance.master.position : 精确指定一個binlog位點,進行啟動
canal.instance.master.timestamp : 指定一個時間戳,canal會自動周遊mysql binlog,找到對應時間戳的binlog位點後,進行啟動不指定任何資訊:預設從目前資料庫的位點,進行啟動。(show master status)
           

instance.properties檔案的配置詳情可通路github.com/alibaba/can…

檢視配置是否成功:

bash複制代碼#首先重新開機一下canal
docker restart  canal1.1.5

docker exec -it canal1.1.5 /bin/bash
cd canal-server/logs/example/
tail -100f example.log  // 檢視日志
           
Mac Docker環境,利用Canal實作MySQL同步ES

如以上狀況,則說明mysql連接配接canal-server成功,此時mysql中的資料變化,都會在canal中有同步。

可以通過Java程式測試有沒連接配接上mysql:

xml複制代碼<dependency>
    <groupId>com.alibaba.otter</groupId>
    <artifactId>canal.client</artifactId>
    <version>1.1.4</version>
</dependency>
           
java複制代碼
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.common.utils.AddressUtils;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.CanalEntry.*;
import com.alibaba.otter.canal.protocol.Message;

import java.net.InetSocketAddress;
import java.util.List;

public class SimpleCanalClientExample {
    public static void main(String args[]) {
        // 建立連結
        CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(AddressUtils.getHostIp(),
                11111), "example", "", "");
        int batchSize = 1000;
        int emptyCount = 0;
        try {
            connector.connect();
            connector.subscribe(".*\\..*");
            connector.rollback();
            int totalEmptyCount = 120;
            while (emptyCount < totalEmptyCount) {
                Message message = connector.getWithoutAck(batchSize); // 擷取指定數量的資料
                long batchId = message.getId();
                int size = message.getEntries().size();
                if (batchId == -1 || size == 0) {
                    emptyCount++;
                    System.out.println("empty count : " + emptyCount);
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                    }
                } else {
                    emptyCount = 0;
                    // System.out.printf("message[batchId=%s,size=%s] \n", batchId, size);
                    printEntry(message.getEntries());
                }

                connector.ack(batchId); // 送出确認
                // connector.rollback(batchId); // 處理失敗, 復原資料
            }

            System.out.println("empty too many times, exit");
        } finally {
            connector.disconnect();
        }
    }

    private static void printEntry(List<Entry> entrys) {
        for (Entry entry : entrys) {
            if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
                continue;
            }

            RowChange rowChage = null;
            try {
                rowChage = RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception e) {
                throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),
                        e);
            }

            EventType eventType = rowChage.getEventType();
            System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
                    entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
                    entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
                    eventType));

            for (RowData rowData : rowChage.getRowDatasList()) {
                if (eventType == EventType.DELETE) {
                    printColumn(rowData.getBeforeColumnsList());
                } else if (eventType == EventType.INSERT) {
                    printColumn(rowData.getAfterColumnsList());
                } else {
                    System.out.println("-------> before");
                    printColumn(rowData.getBeforeColumnsList());
                    System.out.println("-------> after");
                    printColumn(rowData.getAfterColumnsList());
                }
            }
        }
    }

    private static void printColumn(List<Column> columns) {
        for (Column column : columns) {
            System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());
        }
    }
}

           

在mysql的可視化工具進行資料更新後idea控制台就會有資料的顯示。

4、整合springboot

自定義用戶端client

1、建立一個springboot項目

引入依賴spring-boot-starter-data-elasticsearch、canal-spring-boot-starter

xml複制代碼<!--  Canal用戶端服務   -->
<dependency>
  <groupId>top.javatool</groupId>
  <artifactId>canal-spring-boot-starter</artifactId>
  <version>1.2.1-RELEASE</version>
</dependency>
<!-- elasticsearch-->
<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-data-elasticsearch</artifactId>
</dependency>
           

2、修改配置檔案application.yml

yml複制代碼# 應用名稱
spring:
	# es配置
  elasticsearch:
    uris: http://localhost:9200
    username: root
    password: 123456
  # 資料庫配置
  datasource:
    driver-class-name: com.mysql.jdbc.Driver
    url: jdbc:mysql://192.168.244.17:3306/canal_test?useSSL=false&useUnicode=true&characterEncoding=utf-8
    username: root
    password: 123456
    
# canal服務端位址
canal:
  destination: example # canal的叢集名字,要與安裝canal時設定的名稱一緻
  server: 127.0.0.1:11111 # canal服務位址
  
# 設定canal消息日志列印級别
logging:
  level:
    top.javatool.canal.client: warn

           

3、建立實體類為下面資料同步做準備。

建立mysql表實體類

java複制代碼import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableLogic;
import com.baomidou.mybatisplus.annotation.TableName;
import java.io.Serializable;
import java.util.Date;
import lombok.Data;

/**
 * 文章
 *
 */
@TableName(value = "post")
@Data
public class Post implements Serializable {

    /**
     * id
     */
    @TableId(type = IdType.ASSIGN_ID)
    private Long id;

    /**
     * 标題
     */
    private String title;

    /**
     * 内容
     */
    private String content;

    /**
     * 标簽清單 json
     */
    private String tags;

    /**
     * 點贊數
     */
    private Integer thumbNum;

    /**
     * 收藏數
     */
    private Integer favourNum;

    /**
     * 建立使用者 id
     */
    private Long userId;

    /**
     * 建立時間
     */
    private Date createTime;

    /**
     * 更新時間
     */
    private Date updateTime;

    /**
     * 是否删除
     */
    @TableLogic
    private Integer isDelete;

    @TableField(exist = false)
    private static final long serialVersionUID = 1L;
}
           

建立es實體類

java複制代碼import com.google.common.reflect.TypeToken;
import com.google.gson.Gson;
import com.song.search.model.entity.Post;
import lombok.Data;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.BeanUtils;
import org.springframework.data.annotation.Id;
import org.springframework.data.elasticsearch.annotations.Document;
import org.springframework.data.elasticsearch.annotations.Field;
import org.springframework.data.elasticsearch.annotations.FieldType;
import java.io.Serializable;
import java.util.Date;
import java.util.List;

/**
 * 文章 ES 包裝類
 *
 *
 **/
@Document(indexName = "post_v1")
@Data
public class PostEsDTO implements Serializable {

    private static final String DATE_TIME_PATTERN = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'";

    /**
     * id
     */
    @Id
    private Long id;

    /**
     * 标題
     */
    @Field(type = FieldType.Text, analyzer = "ik_max_word", searchAnalyzer = "ik_smart")
    private String title;

    /**
     * 内容
     */
    @Field(type = FieldType.Text, analyzer = "ik_max_word", searchAnalyzer = "ik_smart")
    private String content;

    /**
     * 标簽清單
     */
    @Field(type = FieldType.Keyword)
    private List<String> tags;

    /**
     * 建立使用者 id
     */
    @Field(type = FieldType.Keyword)
    private Long userId;
    /**
     * 建立時間
     */
    @Field(index = false, store = true, type = FieldType.Date, format = {}, pattern = DATE_TIME_PATTERN)
    private Date createTime;

    /**
     * 更新時間
     */
    @Field(index = false, store = true, type = FieldType.Date, format = {}, pattern = DATE_TIME_PATTERN)
    private Date updateTime;

    /**
     * 是否删除
     */
    @Field(type = FieldType.Keyword)
    private Integer isDelete;

    private static final long serialVersionUID = 1L;

    private static final Gson GSON = new Gson();

    /**
     * 對象轉包裝類
     *
     * @param post
     * @return
     */
    public static PostEsDTO objToDto(Post post) {
        if (post == null) {
            return null;
        }
        PostEsDTO postEsDTO = new PostEsDTO();
        BeanUtils.copyProperties(post, postEsDTO);
        String tagsStr = post.getTags();
        if (StringUtils.isNotBlank(tagsStr)) {
            postEsDTO.setTags(GSON.fromJson(tagsStr, new TypeToken<List<String>>() {
            }.getType()));
        }
        return postEsDTO;
    }

    /**
     * 包裝類轉對象
     *
     * @param postEsDTO
     * @return
     */
    public static Post dtoToObj(PostEsDTO postEsDTO) {
        if (postEsDTO == null) {
            return null;
        }
        Post post = new Post();
        BeanUtils.copyProperties(postEsDTO, post);
        List<String> tagList = postEsDTO.getTags();
        if (CollectionUtils.isNotEmpty(tagList)) {
            post.setTags(GSON.toJson(tagList));
        }
        return post;
    }
}

           

4、接下來我們基于canal-client提供的EntryHandler類來實作對于資料表的監控,進而達到資料的增删改同步。

java複制代碼import com.song.search.esdao.PostEsDao;
import com.song.search.model.dto.post.PostEsDTO;
import com.song.search.model.entity.Post;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import top.javatool.canal.client.annotation.CanalTable;
import top.javatool.canal.client.handler.EntryHandler;
import javax.annotation.Resource;

@CanalTable("post") // mysql資料庫表明
@Component
@Slf4j
public class CanalSyncPostToEs implements EntryHandler<Post> {

    @Resource
    private PostEsDao postEsDao;

    /**
     * mysql中資料有新增時自動執行
     *
     * @param hotel 新增的資料
     */
    @Override
    public void insert(Post hotel) {

        PostEsDTO postEsDTO = PostEsDTO.objToDto(hotel);
        //把新增資料hotel,添加到ES即可
        postEsDao.save(postEsDTO);

    }

    /**
     * mysql中資料有修改時自動執行
     *
     * @param before 修改前的資料
     * @param after  修改後的資料
     */
    @Override
    public void update(Post before, Post after) {

        //把修改資料,更新到ES即可
        PostEsDTO postEsDTO = PostEsDTO.objToDto(after);
        //把新增資料hotel,添加到ES即可
        postEsDao.save(postEsDTO);

    }

    /**
     * ysql中資料有删除時自動執行
     *
     * @param hotel 要删除的資料
     */
    @Override
    public void delete(Post hotel) {

        //把要删除的資料hotel,從ES删除即可
        postEsDao.deleteById(hotel.getId());
    }
}
           

補充:es的dao操作/與mybatis操作資料庫相同

java複制代碼/**
 * 文章 ES 操作
 *
 *
 */
public interface PostEsDao extends ElasticsearchRepository<PostEsDTO, Long> {

}
           

最後測試一下資料庫的增删改。

總結

自此利用canal實作mysql的資料同步到elasticsearch就示範完成了,如果有更加複雜的同步邏輯,也可以在代碼中自定義實作,并且第三方元件canal-spring-boot-starter極大的簡化了自定義canal用戶端的難度。

遺憾的是canal-spring-boot-starter的作者目前已經停止了對其的維護,其最新版對應的canal實際是1.1.3版本的,不過實測還不影響我們對接canal1.1.5。對canal用戶端又更高性能的需求,可以研究源碼,高度二開。