
ES sync: using the adapter (launcher, data source, adapter)

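The adapter consists of two parts: a launcher and the adapters. The launcher is a Spring Boot project. Each adapter packages the dependencies it needs into a single fat jar, which the launcher loads dynamically via SPI.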

Launcher

Configuration

server:
  port: 8081
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8
    default-property-inclusion: non_null

canal.conf:
  mode: kafka # tcp # kafka rocketMQ
  # canalServerHost: 127.0.0.1:11111
  zookeeperHosts: 120.78.200.102:2181
  mqServers: 120.78.200.102:9092 #or rocketmq
  #  flatMessage: true
  batchSize: 500
  syncBatchSize: 1000
  retries: 0
  timeout:
  accessKey:
  secretKey:
  srcDataSources:
    zyloopsDS:
      url: jdbc:mysql://120.78.200.102:3306/x1?useUnicode=true
      username: root
      password: 123456
  canalAdapters:
    - instance: zyloops # canal instance Name or mq topic name
    # - topic: zyloops
      groups:
        - groupId: g1
          outerAdapters:
            - name: logger
            - name: es
              hosts: 192.168.71.246:9300
              properties:
                cluster.name: ebuy-cloud-cluster
                searchguard.ssl.transport:
                  pemkey_filepath: D:\projects\canal\client-adapter\launcher\target\canal-adapter\conf\ssl\spock.key
                  pemcert_filepath: D:\projects\canal\client-adapter\launcher\target\canal-adapter\conf\ssl\spock.pem
                  pemtrustedcas_filepath: D:\projects\canal\client-adapter\launcher\target\canal-adapter\conf\ssl\root-ca.pem
                  pemkey_password: 3QgfFoYd8Ken
           

Connecting to Canal

If Canal runs as a single (standalone) instance, configure:

canalServerHost: 127.0.0.1:11111
           

HA mode

zookeeperHosts: 120.78.200.102:2181,120.78.200.102:2182,120.78.200.102:2183
           

TCP

canal.conf.mode: tcp
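
The standalone and TCP settings above can be combined into one fragment. The following is only a minimal sketch, assuming Canal is reachable at the local address used in the standalone example; the remaining keys are copied from the launcher configuration and may need different values in another environment.

```yaml
canal.conf:
  mode: tcp                          # connect to Canal directly, without an MQ
  canalServerHost: 127.0.0.1:11111   # standalone Canal server (assumed local address)
  batchSize: 500
  syncBatchSize: 1000
  retries: 0
  timeout:
```

For an HA deployment, the `canalServerHost` line would presumably be replaced by the `zookeeperHosts` entry shown in the HA section.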
           

Kafka and RocketMQ

mqServers: 120.78.200.102:9092 # a single-node address must be configured here
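
For the MQ case, a minimal sketch of a Kafka setup could look like the fragment below. It reuses the broker address and names from the launcher configuration. The only interpretation added here is the comment on `instance`, which follows the "canal instance Name or mq topic name" note in that configuration.

```yaml
canal.conf:
  mode: kafka
  mqServers: 120.78.200.102:9092   # single broker address, as noted above
  batchSize: 500
  syncBatchSize: 1000
  canalAdapters:
    - instance: zyloops            # in MQ mode this value is read as the topic name
      groups:
        - groupId: g1
          outerAdapters:
            - name: logger         # log only, useful for a first connectivity check
```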
           

Data source

srcDataSources:
  zyloopsDS:
    url: jdbc:mysql://120.78.200.102:3306/x1?useUnicode=true
    username: root
    password: 123456
           

Adapters

canalAdapters:
  - instance: zyloops # canal instance Name or mq topic name
  # - topic: zyloops
    groups:
      - groupId: g1
        outerAdapters:
          - name: logger
          - name: es
            hosts: 192.168.71.246:9300
            properties:
              cluster.name: ebuy-cloud-cluster
              searchguard.ssl.transport:
                pemkey_filepath: D:\projects\canal\client-adapter\launcher\target\canal-adapter\conf\ssl\spock.key
                pemcert_filepath: D:\projects\canal\client-adapter\launcher\target\canal-adapter\conf\ssl\spock.pem
                pemtrustedcas_filepath: D:\projects\canal\client-adapter\launcher\target\canal-adapter\conf\ssl\root-ca.pem
                pemkey_password: 3QgfFoYd8Ken
           

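instance corresponds to the Canal instance name.

groupId is the group ID. Adapters in different groups run in parallel, while adapters within one group run serially, so avoid placing several adapters in the same group.

name specifies the adapter type; this configuration loads the logger and es adapters.

hosts is the ES address and port.

properties holds adapter-specific settings; here they are the settings for connecting to ES.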
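
Because groups run in parallel and the adapters inside a group run serially, one way to keep the logger from sitting in front of the ES adapter is to give each adapter its own group. The fragment below is a sketch under that assumption; the second group ID, `g2`, is a hypothetical name that does not appear in the original configuration.

```yaml
canalAdapters:
  - instance: zyloops
    groups:
      - groupId: g1                # first group: logging only
        outerAdapters:
          - name: logger
      - groupId: g2                # hypothetical second group, runs in parallel with g1
        outerAdapters:
          - name: es
            hosts: 192.168.71.246:9300
            properties:
              cluster.name: ebuy-cloud-cluster
```

If the ES mapping file is bound to a group (as with `groupId: g1` in the ES adapter configuration), its `groupId` would need to match the group that actually contains the es adapter.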

ES adapter

dataSourceKey: zyloopsDS
destination: zyloops
groupId: g1
esMapping:
  _index: zyloops_user
  _type: _doc
  _id: _id
  upsert: true
  sql: "select a.id as _id, a.name as _name, a.age as _age, b.name as _clzName, c.score as _score from t_user a left join t_clz b on a.id = b.user_id left join t_score c on a.id = c.user_id"
  commitBatch: 3000 
           

For the specific restrictions on the mapping SQL, see: https://github.com/alibaba/canal/tree/master/client-adapter#521%E5%8D%95%E8%A1%A8%E6%98%A0%E5%B0%84%E7%B4%A2%E5%BC%95%E7%A4%BA%E4%BE%8Bsql
