天天看點

使用canal增量同步mysql資料庫資訊到ElasticSearch

本文介紹如何使用canal增量同步mysql資料庫資訊到ElasticSearch。(注意:是增量!!!)

1.簡介

1.1 canal介紹

Canal是一個基于MySQL二進制日志的高性能資料同步系統。Canal廣泛用于阿裡巴巴集團(包括

https://www.taobao.com

),以提供可靠的低延遲增量資料管道,github位址:

https://github.com/alibaba/canal

Canal Server能夠解析MySQL binlog并訂閱資料更改,而Canal Client可以實作将更改廣播到任何地方,例如資料庫和Apache Kafka。

它具有以下功能:

  1. 支援所有平台。
  2. 支援由Prometheus提供支援的細粒度系統監控。
  3. 支援通過不同方式解析和訂閱MySQL binlog,例如通過GTID。
  4. 支援高性能,實時資料同步。(詳見Performance)
  5. Canal Server和Canal Client都支援HA / Scalability,由Apache ZooKeeper提供支援
  6. Docker支援。

缺點:

不支援全量更新,隻支援增量更新。

完整wiki位址:

https://github.com/alibaba/canal/wiki

1.2 運作原理

原理很簡單:

  1. Canal模拟MySQL的slave的互動協定,僞裝成mysql slave,并将轉發協定發送到MySQL Master伺服器。
  2. MySQL Master接收到轉儲請求并開始将二進制日志推送到slave(即canal)。
  3. Canal将二進制日志對象解析為自己的資料類型(原始位元組流)

如圖所示:

使用canal增量同步mysql資料庫資訊到ElasticSearch

1.3 同步es

在同步資料到es的時候需要使用擴充卡:canal adapter。目前最新版本1.1.3,下載下傳位址:

https://github.com/alibaba/canal/releases

目前es貌似支援6.x版本,不支援7.x版本!!!

2.準備工作

2.1 es和jdk

安裝es可以參考:

https://www.dalaoyang.cn/article/78

安裝jdk可以參考:

https://www.dalaoyang.cn/article/16

2.2 安裝canal server

下載下傳canal.deployer-1.1.3.tar.gz

wget https://github.com/alibaba/canal/releases/download/canal-1.1.3/canal.deployer-1.1.3.tar.gz           

解壓檔案

tar -zxvf canal.deployer-1.1.3.tar.gz           

進入解壓後的檔案夾

cd canal.deployer-1.1.3           

修改conf/example/instance.properties檔案,主要注意以下幾處:

  • canal.instance.master.address:資料庫位址,例如127.0.0.1:3306
  • canal.instance.dbUsername:資料庫使用者
  • canal.instance.dbPassword:資料庫密碼

完整内容如下:

#################################################
## mysql serverId , v1.0.26+ will autoGen
# canal.instance.mysql.slaveId=0

# enable gtid use true/false
canal.instance.gtidon=false

# position info
canal.instance.master.address=127.0.0.1:3306
canal.instance.master.journal.name=
canal.instance.master.position=
canal.instance.master.timestamp=
canal.instance.master.gtid=

# rds oss binlog
canal.instance.rds.accesskey=
canal.instance.rds.secretkey=
canal.instance.rds.instanceId=

# table meta tsdb info
canal.instance.tsdb.enable=true
#canal.instance.tsdb.url=
#canal.instance.tsdb.dbUsername=
#canal.instance.tsdb.dbPassword=

#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
#canal.instance.standby.gtid=

# username/password
canal.instance.dbUsername=root
canal.instance.dbPassword=12345678
canal.instance.connectionCharset = UTF-8
# enable druid Decrypt database password
canal.instance.enableDruid=false
#canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ==

# table regex
canal.instance.filter.regex=.*\\..*
# table black regex
canal.instance.filter.black.regex=

# mq config
#canal.mq.topic=example
# dynamic topic route by schema or table regex
#canal.mq.dynamicTopic=mytest1.user,mytest2\\..*,.*\\..*
#canal.mq.partition=0
# hash partition config
#canal.mq.partitionsNum=3
#canal.mq.partitionHash=test.table:id^name,.*\\..*
#################################################
           

回到canal.deployer-1.1.3目錄下,啟動canal:

sh bin/startup.sh           

檢視日志:

vi logs/canal/canal.log           

檢視具體instance日志:

vi logs/example/example.log           

關閉指令

sh bin/stop.sh           

2.3 安裝canal-adapter

下載下傳canal.adapter-1.1.3.tar.gz

wget https://github.com/alibaba/canal/releases/download/canal-1.1.3/canal.adapter-1.1.3.tar.gz           

解壓

tar -zxvf canal.adapter-1.1.3.tar.gz           
cd canal.adapter-1.1.3           

修改conf/application.yml檔案,主要注意如下内容,由于是yml檔案,注意我這裡說明的屬性名稱:

  • server.port:canal-adapter端口号
  • canal.conf.canalServerHost:canal-server位址和ip
  • canal.conf.srcDataSources.defaultDS.url:資料庫位址
  • canal.conf.srcDataSources.defaultDS.username:資料庫使用者名
  • canal.conf.srcDataSources.defaultDS.password:資料庫密碼
  • canal.conf.canalAdapters.groups.outerAdapters.hosts:es主機位址,tcp端口
server:
  port: 8081
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8
    default-property-inclusion: non_null


canal.conf:
  mode: tcp
  canalServerHost: 127.0.0.1:11111
  batchSize: 500
  syncBatchSize: 1000
  retries: 0
  timeout:
  accessKey:
  secretKey:
  srcDataSources:
    defaultDS:
      url: jdbc:mysql://127.0.0.1:3306/test?useUnicode=true
      username: root
      password: 12345678
  canalAdapters:
  - instance: example
    groups:
    - groupId: g1
      outerAdapters:
      - name: es
        hosts: 127.0.0.1:9300
        properties:
         cluster.name: elasticsearch           

另外需要配置conf/es/*.yml檔案,adapter将會自動加載conf / es下的所有.yml結尾的配置檔案。在介紹配置前,需要先介紹一下本案例使用的表結構,如下:

CREATE TABLE `test` (
  `id` int(11) NOT NULL,
  `name` varchar(200) NOT NULL,
  `address` varchar(1000) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;           

需要手動去es中建立索引,比如這裡使用es-head建立,如下圖:

使用canal增量同步mysql資料庫資訊到ElasticSearch

test索引結構如下:

{
    "mappings":{
        "_doc":{
            "properties":{
                "name":{
                    "type":"text"
                },
                "address":{
                    "type":"text"
                }
            }
        }
    }
}           

接下來建立test.yml(檔案名随意),内容很好了解_index為索引名稱,sql為對應語句,内容如下:

dataSourceKey: defaultDS
destination: example
groupId:
esMapping:
  _index: test
  _type: _doc
  _id: _id
  upsert: true
  sql: "select a.id as _id,a.name,a.address from test a"
  commitBatch: 3000           

配置完成後,回到canal-adapter根目錄,執行指令啟動

bin/startup.sh           

檢視日志

vi logs/adapter/adapter.log           

關閉canal-adapter指令

bin/stop.sh           

3.測試

都啟動成功後,先檢視一下es-head,如圖,現在是沒有任何資料的。

使用canal增量同步mysql資料庫資訊到ElasticSearch

接下來,我們在資料庫中插入一條資料進行測試,語句如下:

INSERT INTO `test`.`test`(`id`, `name`, `address`) VALUES (7, '北京', '北京市朝陽區');           

然後在看一下es-head,如下

使用canal增量同步mysql資料庫資訊到ElasticSearch

接下來看一下日志,如下:

2019-06-22 17:54:15.385 [pool-2-thread-1] DEBUG c.a.otter.canal.client.adapter.es.service.ESSyncService - DML: {"data":[{"id":7,"name":"北京","address":"北京市朝陽區"}],"database":"test","destination":"example","es":1561197255000,"groupId":null,"isDdl":false,"old":null,"pkNames":["id"],"sql":"","table":"test","ts":1561197255384,"type":"INSERT"} 
Affected indexes: test            

小知識點:上面介紹的檢視日志的方法可能不是很好用,推薦使用如下文法,比如檢視日志最後200行:

tail -200f logs/adapter/adapter.log           

4.總結

1.全量更新不能實作,但是增删改都是可以的。

2.一定要提前建立好索引。

3.es配置的是tcp端口,比如預設的9300