1. 實驗環境
CPU:4
記憶體:8G
ip:192.168.0.187
開啟iptables防火牆
關閉selinux
java >=1.5
使用yum方式安裝的java,提前配置好JAVA_HOME環境變量
vim /etc/profile.d/java.sh
#!/bin/bash
export JAVA_HOME=/usr/lib/jvm/java-1.8.0-openjdk # 路徑根據實際情況而定
export PATH=$PATH:$JAVA_HOME/bin
source /etc/profile.d/java.sh
2. MySQL資訊
mysql賬号
root
MySQL密碼
liykpntuu9?C
操作
vim /etc/my.cnf
[mysqld]
log-bin=mysql-bin # 開啟 binlog
binlog-format=ROW # 選擇 ROW 模式
server_id=1 # 配置 MySQL replaction 需要定義,不要和 canal 的 slaveId 重複
service mysqld restart
登陸資料庫後操作
CREATE USER canal IDENTIFIED BY 'canal!%123AD';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;
3. canal操作
# 下載下傳
wget https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.deployer-1.1.4.tar.gz
mkdir -p /usr/local/canal
tar -zxv -f canal.deployer-1.1.4.tar.gz -C /usr/local/canal
# 修改連接配接資料庫的配置檔案
cd /usr/local/canal
vim conf/example/instance.properties
## mysql serverId
canal.instance.mysql.slaveId = 123
#position info,需要改成自己的資料庫資訊
canal.instance.master.address = 127.0.0.1:3306
canal.instance.master.journal.name =
canal.instance.master.position =
canal.instance.master.timestamp =
#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
#username/password,需要改成自己的資料庫資訊
canal.instance.dbUsername = canal
canal.instance.dbPassword = canal!%123AD
canal.instance.defaultDatabaseName =
canal.instance.connectionCharset = UTF-8
#table regex
canal.instance.filter.regex = .\*\\\\..\*
# 啟動
bash bin/startup.sh
# 檢視 server 日志
tail -n 30 logs/canal/canal.log
2019-09-20 09:48:46.987 [main] INFO com.alibaba.otter.canal.deployer.CanalLauncher - ## set default uncaught exception handler
2019-09-20 09:48:47.019 [main] INFO com.alibaba.otter.canal.deployer.CanalLauncher - ## load canal configurations
2019-09-20 09:48:47.028 [main] INFO com.alibaba.otter.canal.deployer.CanalStarter - ## start the canal server.
2019-09-20 09:48:47.059 [main] INFO com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[192.168.0.187(192.168.0.187):11111]
2019-09-20 09:48:48.228 [main] INFO com.alibaba.otter.canal.deployer.CanalStarter - ## the canal server is running now ......
# 檢視 instance 的日志
2019-09-20 09:48:47.395 [main] INFO c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [canal.properties]
2019-09-20 09:48:47.399 [main] INFO c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [example/instance.properties]
2019-09-20 09:48:47.580 [main] WARN o.s.beans.GenericTypeAwarePropertyDescriptor - Invalid JavaBean property 'connectionCharset' being accessed! Ambiguous write methods found next to actually used [public void com.alibaba.otter.canal.parse.inbound.mysql.AbstractMysqlEventParser.setConnectionCharset(java.lang.String)]: [public void com.alibaba.otter.canal.parse.inbound.mysql.AbstractMysqlEventParser.setConnectionCharset(java.nio.charset.Charset)]
2019-09-20 09:48:47.626 [main] INFO c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [canal.properties]
2019-09-20 09:48:47.626 [main] INFO c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [example/instance.properties]
2019-09-20 09:48:48.140 [main] INFO c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start CannalInstance for 1-example
2019-09-20 09:48:48.147 [main] WARN c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - --> init table filter : ^.*\..*$
2019-09-20 09:48:48.147 [main] WARN c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - --> init table black filter :
2019-09-20 09:48:48.165 [main] INFO c.a.otter.canal.instance.core.AbstractCanalInstance - start successful....
2019-09-20 09:48:48.288 [destination = example , address = /127.0.0.1:3306 , EventParser] WARN c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - ---> begin to find start position, it will be long time for reset or first position
2019-09-20 09:48:48.288 [destination = example , address = /127.0.0.1:3306 , EventParser] WARN c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - prepare to find start position just show master status
2019-09-20 09:48:49.288 [destination = example , address = /127.0.0.1:3306 , EventParser] WARN c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - ---> find start position successfully, EntryPosition[included=false,journalName=mysql-bin.000004,position=4,serverId=1,gtid=<null>,timestamp=1568943354000] cost : 989ms , the next step is binlog dump
# 關閉
bash bin/stop.sh
# 端口使用情況
ss -tlnp
State Recv-Q Send-Q Local Address:Port Peer Address:Port
LISTEN 0 50 *:11110 *:* users:(("java",pid=2078,fd=109))
LISTEN 0 50 *:11111 *:* users:(("java",pid=2078,fd=105))
LISTEN 0 3 *:11112 *:* users:(("java",pid=2078,fd=87))
# 端口号說明
# admin端口:11110
# tcp端口:11111
# metric端口:11112
# canal-admin 使用WEB UI界面檢視管理canal
# canal-admin的限定依賴:
# MySQL,用于存儲配置和節點等相關資料
# canal版本,要求>=1.1.4 (需要依賴canal-server提供面向admin的動态運維管理接口)
wget https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.admin-1.1.4.tar.gz
tar -zxv -f canal-1.1.4/canal.admin-1.1.4.tar.gz -C /usr/local/src/canal_admin
vim conf/application.yml
server:
port: 8089 # 端口号,防火牆放行該端口号
spring:
jackson:
date-format: yyyy-MM-dd HH:mm:ss
time-zone: GMT+8
spring.datasource:
address: 127.0.0.1:3306 # 資料庫位址和端口
database: canal_manager # 資料庫名
username: canal_admin # 資料庫賬号 ,注意跟一開始建立的canal賬号區分開,需要修改一下
password: ABC123,.abc@#11 # 資料庫密碼
driver-class-name: com.mysql.jdbc.Driver
url: jdbc:mysql://${spring.datasource.address}/${spring.datasource.database}?useUnicode=true&characterEncoding=UTF-8&useSSL=false
hikari:
maximum-pool-size: 30
minimum-idle: 1
canal:
adminUser: admin # 平台賬号
adminPasswd: admin # 平台密碼
# 注意,資料庫名,賬号和密碼需要提前建立好
# 若修改預設的資料庫名,則示例sql檔案中也需要修改
# 這裡隻修改預設的資料庫賬号和密碼,其餘保持預設
# 初始化中繼資料庫
# 初始化SQL腳本裡會預設建立canal_manager的資料庫,建議使用root等有超級權限的賬号進行初始化 b. canal_manager.sql預設會在conf目錄下
mysql -hlocalhost -uroot -p
mysql> source /usr/local/canal_admin/conf/canal_manager.sql;
# 啟動
bash bin/startup.sh
# 檢視 admin 日志
tail -n 30 logs/admin.log
2019-09-20 14:50:54.595 [main] INFO org.apache.coyote.http11.Http11NioProtocol - Starting ProtocolHandler ["http-nio-8089"]
2019-09-20 14:50:54.624 [main] INFO org.apache.tomcat.util.net.NioSelectorPool - Using a shared selector for servlet write/read
2019-09-20 14:50:54.812 [main] INFO o.s.boot.web.embedded.tomcat.TomcatWebServer - Tomcat started on port(s): 8089 (http) with context path ''
2019-09-20 14:50:54.818 [main] INFO com.alibaba.otter.canal.admin.CanalAdminApplication - Started CanalAdminApplication in 11.057 seconds (JVM running for 12.731)
# 浏覽器通路,防火牆放行8089端口号
# 位址:http://192.168.0.187:8089/ 通路,預設密碼:admin/123456
# 使用
# 建立一個叢集,添加已有的canal
# 因為端口的問題,暫時隻能添加一個
# 另外canal是否可以元件叢集,還有待研究
# 停止
bash bin/stop.sh
4. zookeeper
# 設定zookeeper叢集
cd /usr/local/src
wget http://mirror.bit.edu.cn/apache/zookeeper/zookeeper-3.5.5/apache-zookeeper-3.5.5-bin.tar.gz
tar -zxvf apache-zookeeper-3.5.5-bin.tar.gz -C /usr/local
cd /usr/local/apache-zookeeper-3.5.5-bin
mkdir -p /zkdata/{zookeeper-1,zookeeper-2,zookeeper-3}
cp conf/zoo_sample.cfg conf/zoo-1.cfg
# vim conf/zoo-1.cfg
dataDir=/zkdata/zookeeper-1
clientPort=2181
server.1=127.0.0.1:2888:3888
server.2=127.0.0.1:2889:3889
server.3=127.0.0.1:2890:3890
cp conf/zoo-1.cfg conf/zoo-2.cfg
cp conf/zoo-1.cfg conf/zoo-3.cfg
vim conf/zoo-2.cfg
dataDir=/zkdata/zookeeper-2
clientPort=2182
server.1=127.0.0.1:2888:3888
server.2=127.0.0.1:2889:3889
server.3=127.0.0.1:2890:3890
vim conf/zoo-3.cfg
dataDir=/zkdata/zookeeper-3
clientPort=2183
server.1=127.0.0.1:2888:3888
server.2=127.0.0.1:2889:3889
server.3=127.0.0.1:2890:3890
echo '1' > /zkdata/zookeeper-1/myid
echo '2' > /zkdata/zookeeper-2/myid
echo '3' > /zkdata/zookeeper-3/myid
# 修改啟動檔案,避免後續出現如下錯誤
# stat is not executed because it is not in the whitelist.
# envi is not executed because it is not in the whitelist.
# nc指令需要安裝其他軟體
yum install nmap-ncat
# envi指令執行報錯提示:envi is not executed because it is not in the whitelist.
# 解決辦法 修改啟動指令 zkServer.sh ,往裡面添加 :ZOOMAIN="-Dzookeeper.4lw.commands.whitelist=* ${ZOOMAIN}"
else
echo "JMX disabled by user request" >&2
ZOOMAIN="org.apache.zookeeper.server.quorum.QuorumPeerMain" # 注意找到這個資訊
fi
# 如果不想添加在這裡,注意位置和指派的順序
ZOOMAIN="-Dzookeeper.4lw.commands.whitelist=* ${ZOOMAIN}"
# 然後重新開機zookeeper
# 叢集啟動腳本
vim start.sh
bash bin/zkServer.sh start conf/zoo-1.cfg
bash bin/zkServer.sh start conf/zoo-2.cfg
bash bin/zkServer.sh start conf/zoo-3.cfg
# 叢集關閉腳本
vim start.sh
bash bin/zkServer.sh stop conf/zoo-1.cfg
bash bin/zkServer.sh stop conf/zoo-2.cfg
bash bin/zkServer.sh stop conf/zoo-3.cfg
# 檢測叢集狀态
[root@bogon apache-zookeeper-3.5.5-bin]# bash bin/zkServer.sh status conf/zoo-1.cfg
/usr/bin/java
ZooKeeper JMX enabled by default
Using config: conf/zoo-1.cfg
Client port found: 2181. Client address: localhost.
Mode: follower
[root@bogon apache-zookeeper-3.5.5-bin]# bash bin/zkServer.sh status conf/zoo-2.cfg
/usr/bin/java
ZooKeeper JMX enabled by default
Using config: conf/zoo-2.cfg
Client port found: 2182. Client address: localhost.
Mode: leader
[root@bogon apache-zookeeper-3.5.5-bin]# bash bin/zkServer.sh status conf/zoo-3.cfg
/usr/bin/java
ZooKeeper JMX enabled by default
Using config: conf/zoo-3.cfg
Client port found: 2183. Client address: localhost.
Mode: follower
# 使用WEB UI檢視監控叢集-zk ui安裝
cd /usr/local
git clone https://github.com/DeemOpen/zkui.git
yum install -y maven
# 更換使用阿裡雲maven源
vim /etc/maven/settings.xml
<mirrors>
<mirror>
<id>nexus-aliyun</id>
<mirrorOf>central</mirrorOf>
<name>Nexus aliyun</name>
<url>http://maven.aliyun.com/nexus/content/groups/public</url>
</mirror>
</mirrors>
cd zkui/
mvn clean install
# 修改配置檔案預設值
vim config.cfg
serverPort=9090 #指定端口
zkServer=localhost:2181,localhost:2182,localhost:2183 # 不使用127.0.0.1
sessionTimeout=300
# userSet中是登陸web界面的使用者名和密碼
#管理者
#admin:manager
#使用者
#appconfig:appconfig
# 啟動程式至背景
vim start.sh
#!/bin/bash
nohup java -jar target/zkui-2.0-SNAPSHOT-jar-with-dependencies.jar &
# 浏覽器通路
# 防火牆放行9090端口,後期改用nginx代理
http://192.168.0.187:9090/
5. Kafka
# kafka叢集,僞叢集
cd /usr/local/src
wget http://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.3.0/kafka_2.12-2.3.0.tgz
tar -zxv -f kafka_2.12-2.3.0.tgz -C /usr/local/
cd /usr/local/kafka_2.12-2.3.0/config
mkdir -p /kafkadata/{kafka-1,kafka-2,kafka-3}
cp server.properties server-1.properties
vim server-1.properties
broker.id=1
delete.topic.enable=true
listeners=PLAINTEXT://:9092
advertised.listeners=PLAINTEXT://localhost:9092
log.dirs=/kafkadata/kafka-1
zookeeper.connect=localhost:2181,localhost:2182,localhost:2183
cp server-1.properties server-2.properties
vim server-2.properties
broker.id=2
delete.topic.enable=true
listeners=PLAINTEXT://:9093
log.dirs=/kafkadata/kafka-2
zookeeper.connect=localhost:2181,localhost:2182,localhost:2183
cp server-1.properties server-3.properties
vim server-3.properties
broker.id=3
delete.topic.enable=true
listeners=PLAINTEXT://:9094
log.dirs=/kafkadata/kafka-3
zookeeper.connect=localhost:2181,localhost:2182,localhost:2183
# 啟動叢集
vim start.sh
#!/bin/bash
bash bin/kafka-server-start.sh -daemon config/server-1.properties
bash bin/kafka-server-start.sh -daemon config/server-2.properties
bash bin/kafka-server-start.sh -daemon config/server-3.properties
# 停止叢集
vim stop.sh
#!/bin/bash
bash bin/kafka-server-stop.sh -daemon config/server-1.properties
bash bin/kafka-server-stop.sh -daemon config/server-2.properties
bash bin/kafka-server-stop.sh -daemon config/server-3.properties
# 監控kafka叢集
# 有一個問題,需要在kafka-server-start.sh檔案中配置端口,有如下三種辦法
# 第一種:複制并修改kafka目錄,比如kafka-1,kafka-2,kafka-3,然後再每個目錄下修改kafka-server-start.sh檔案
# 第二種:在啟動腳本start.sh中添加指定端口
# 第三種:多複制幾個kafka-server-start.sh檔案,然後進行修改,最後在start.sh中修改一下
# 以下三種方法任選其一即可
# 第一種方式辦法,相應行修改成如下形式,注意端口号不同
# 使用的是不同目錄下的不同kafka-server-start.sh檔案
# start.sh檔案也需要做相應的修改
# kafka-1/bin/kafka-server-start.sh
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
# export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
export KAFKA_HEAP_OPTS="-server -Xms2G -Xmx2G -XX:PermSize=128m -XX:+UseG1GC -XX:MaxGCPauseMillis=200 -XX:ParallelGCThreads=8 -XX:ConcGCThreads=5 -XX:InitiatingHeapOccupancyPercent=70"
export JMX_PORT="9997"
fi
# kafka-2/bin/kafka-server-start.sh
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
# export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
export KAFKA_HEAP_OPTS="-server -Xms2G -Xmx2G -XX:PermSize=128m -XX:+UseG1GC -XX:MaxGCPauseMillis=200 -XX:ParallelGCThreads=8 -XX:ConcGCThreads=5 -XX:InitiatingHeapOccupancyPercent=70"
export JMX_PORT="9998"
fi
# kafka-3/bin/kafka-server-start.sh
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
# export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
export KAFKA_HEAP_OPTS="-server -Xms2G -Xmx2G -XX:PermSize=128m -XX:+UseG1GC -XX:MaxGCPauseMillis=200 -XX:ParallelGCThreads=8 -XX:ConcGCThreads=5 -XX:InitiatingHeapOccupancyPercent=70"
export JMX_PORT="9999"
fi
# start.sh
#!/bin/bash
bash kafka-1/bin/kafka-server-start.sh -daemon config/server-1.properties
bash kafka-2/bin/kafka-server-start.sh -daemon config/server-2.properties
bash kafka-3/bin/kafka-server-start.sh -daemon config/server-3.properties
# 第二種方法
# 使用的用一個目錄下的同一個檔案,隻是在每個指令前指定端口号
vim start.sh
#!/bin/bash
JMX_PORT=9997 bash bin/kafka-server-start.sh -daemon config/server-1.properties
JMX_PORT=9998 bash bin/kafka-server-start.sh -daemon config/server-2.properties
JMX_PORT=9999 bash bin/kafka-server-start.sh -daemon config/server-3.properties
# 第三種方法
# 使用的是同一個目錄下的不同kafka-server-start檔案
# start.sh檔案也需要做相應的修改
cp kafka-server-start.sh kafka-server-start-1.sh
cp kafka-server-start.sh kafka-server-start-2.sh
cp kafka-server-start.sh kafka-server-start-3.sh
vim kafka-server-start-1.sh
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
# export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
export KAFKA_HEAP_OPTS="-server -Xms2G -Xmx2G -XX:PermSize=128m -XX:+UseG1GC -XX:MaxGCPauseMillis=200 -XX:ParallelGCThreads=8 -XX:ConcGCThreads=5 -XX:InitiatingHeapOccupancyPercent=70"
export JMX_PORT="9997"
fi
vim kafka-server-start-2.sh
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
# export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
export KAFKA_HEAP_OPTS="-server -Xms2G -Xmx2G -XX:PermSize=128m -XX:+UseG1GC -XX:MaxGCPauseMillis=200 -XX:ParallelGCThreads=8 -XX:ConcGCThreads=5 -XX:InitiatingHeapOccupancyPercent=70"
export JMX_PORT="9998"
fi
vim kafka-server-start-3.sh
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
# export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
export KAFKA_HEAP_OPTS="-server -Xms2G -Xmx2G -XX:PermSize=128m -XX:+UseG1GC -XX:MaxGCPauseMillis=200 -XX:ParallelGCThreads=8 -XX:ConcGCThreads=5 -XX:InitiatingHeapOccupancyPercent=70"
export JMX_PORT="9999"
fi
vim start.sh
#!/bin/bash
bash bin/kafka-server-start-1.sh -daemon config/server-1.properties
bash bin/kafka-server-start-2.sh -daemon config/server-2.properties
bash bin/kafka-server-start-3.sh -daemon config/server-3.properties
cd /usr/local/src
wget https://github.com/smartloli/kafka-eagle-bin/archive/v1.3.9.tar.gz
# 多次解壓縮後得到kafka-eagle-web-1.3.9目錄,然後把該目錄複制到/usr/local目錄下
cd /usr/local/kafka-eagle-web-1.3.9/conf
vim system-config.properties
kafka.eagle.zk.cluster.alias=cluster1
cluster1.zk.list=localhost:2181,localhost:2182,localhost:2183
kafka.eagle.metrics.charts=true
# 其餘保持預設,資料庫使用sqlite,注意路徑需要事先建立好或修改成目前目錄
# 資料庫也可以更換成MySQL
kafka.eagle.url=jdbc:sqlite:/usr/local/kafka-eagle-web-1.3.9/db/ke.db
# 注意
# kafka.eagle.zk.cluster.alias的值需要跟下面的這個cluster1.zk.list小數點第一個保持一緻,比如都是cluster1,否則擷取不到資料
# 添加環境變量
vim /etc/profile.d/kafka_eagle.sh
#!/bin/bash
export KE_HOME=/usr/local/kafka-eagle-web-1.3.9
export PATH=$PATH:$KE_HOME/bin
source /etc/profile.d/kafka_eagle.sh
# 指令相關
bash bin/ke.sh start|stop|status|stats|restart
# 啟動
bash bin/ke.sh start
*******************************************************************
* Kafka Eagle system monitor port successful...
*******************************************************************
[2019-09-20 12:10:32] INFO: Status Code[0]
[2019-09-20 12:10:32] INFO: [Job done!]
Welcome to
__ __ ___ ____ __ __ ___ ______ ___ ______ __ ______
/ //_/ / | / __/ / //_/ / | / ____/ / | / ____/ / / / ____/
/ ,< / /| | / /_ / ,< / /| | / __/ / /| | / / __ / / / __/
/ /| | / ___ | / __/ / /| | / ___ | / /___ / ___ |/ /_/ / / /___ / /___
/_/ |_| /_/ |_|/_/ /_/ |_| /_/ |_| /_____/ /_/ |_|\____/ /_____//_____/
Version 1.3.9
*******************************************************************
* Kafka Eagle Service has started success.
* Welcome, Now you can visit 'http://127.0.0.1:8048/ke'
* Account:admin ,Password:123456
*******************************************************************
* <Usage> ke.sh [start|status|stop|restart|stats] </Usage>
* <Usage> https://www.kafka-eagle.org/ </Usage>
*******************************************************************
# 浏覽器通路,防火牆放行該端口,後期改用Nginx代理
位址:192.168.0.187:8048/ke
賬号:admin,密碼:123456
6. 投遞資料到Kafka
# 先進行canal配置,改動配置檔案canal.properties
# serverMode改為kafka
vim conf/canal.properties
canal.serverMode = kafka
canal.mq.servers = localhost:9092,localhost:9093,localhost:9094
vim conf/example/instance.propties
# mq config
canal.mq.topic=canal_manager # 填寫資料庫庫名,表示這個資料庫的所有表的操作都在這個topic下
# dynamic topic route by schema or table regex
# canal.mq.dynamicTopic=.*\\..*
canal.mq.partition=0
# hash partition config
# canal.mq.partitionsNum=10
# canal.mq.partitionHash=.*\\..*
# 以上具體規則詳看官方文檔
# kafka開啟消息隊列的自動建立topic模式,相關配置在kafka的server.properties
echo 'auto.create.topics.enable=true' >> config/server-1.properties
echo 'auto.create.topics.enable=true' >> config/server-2.properties
echo 'auto.create.topics.enable=true' >> config/server-3.properties
# 相關改動完成後重新開機canal和kafka
# 使用canal_admin平台檢視canal的狀态
# Server管理,操作,日志
# 使用zu ui平台檢視kafka的topic情況
# 左側導航Topic-List檢視生成的topic,這裡顯示的是canal_manager,上面設定的那個資料庫庫名
# 點開Topic Name可以檢視具體的資料個數
# 使用指令行kafka-console-consumer.sh --topic canal_manager --bootstrap-server localhost:9092 --from-beginning檢視canal傳遞給kafka的資料
# 插入一條資料
{"data":[{"id":"13","username":"13","password":"6BB4837EB74329105EE4568DDA7DC67ED2CA2AD9","name":"Canal Manager","roles":"admin","introduction":null,"avatar":null,"creation_date":"2019-07-14 00:05:28"}],"database":"canal_manager","es":1568972329000,"id":10,"isDdl":false,"mysqlType":{"id":"bigint(20)","username":"varchar(31)","password":"varchar(128)","name":"varchar(31)","roles":"varchar(31)","introduction":"varchar(255)","avatar":"varchar(255)","creation_date":"timestamp"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":-5,"username":12,"password":12,"name":12,"roles":12,"introduction":12,"avatar":12,"creation_date":93},"table":"canal_user","ts":1568972329456,"type":"INSERT"}
# 删除一條資料
{"data":[{"id":"13","username":"13","password":"6BB4837EB74329105EE4568DDA7DC67ED2CA2AD9","name":"Canal Manager","roles":"admin","introduction":null,"avatar":null,"creation_date":"2019-07-14 00:05:28"}],"database":"canal_manager","es":1568972368000,"id":11,"isDdl":false,"mysqlType":{"id":"bigint(20)","username":"varchar(31)","password":"varchar(128)","name":"varchar(31)","roles":"varchar(31)","introduction":"varchar(255)","avatar":"varchar(255)","creation_date":"timestamp"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":-5,"username":12,"password":12,"name":12,"roles":12,"introduction":12,"avatar":12,"creation_date":93},"table":"canal_user","ts":1568972369005,"type":"DELETE"}