天天看點

springboot整合canal(超詳細)

一、canal介紹

binlog是mysql的二進制日志,對于操作資料庫的語句,都以此形式儲存。Canal是阿裡MySQL資料庫Binlog的增量訂閱&消費元件 。基于資料庫Binlog可以監控資料庫資料的變化進而用于資料同步等業務。

二、服務端部署

服務端連結: https://github.com/alibaba/canal/releases

解壓zip,目錄如下:

springboot整合canal(超詳細)

conf -> example -> instance.properties

日志檔案名稱和記錄位置,如下圖所示,修改資料庫連接配接位址、日志檔案、連接配接的使用者名和密碼

show master status

(資料庫查詢日志檔案指令)

(詳細配置可以從官網檢視,僅記錄使用步驟)

springboot整合canal(超詳細)

三、用戶端使用

1、POM檔案

<!--canal-->
        <dependency>
            <groupId>com.alibaba.otter</groupId>
            <artifactId>canal.client</artifactId>
            <version>1.1.3</version>
        </dependency>
           

2、連接配接配置

canal-monitor-mysql:
  hostname: localhost
  port: 11111
  tableName: fas4.0
           

具體的資料庫資料變化 業務實作方面需要 自己手動去實作,僅展示自己使用的部分。

需要注意: 如果是多個用戶端同時使用,要注意:多個用戶端會出現某個用戶端 把消息全部消費,而别的用戶端沒有消息消費的情況,這裡需要特别注意

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.haiot.service.CorpsUploadService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.net.InetSocketAddress;
import java.util.List;

import com.alibaba.otter.canal.protocol.Message;

/**
 * @program: fas-haiot-interface
 * @description: 市級平台資料上傳接口相關實作
 * @author: liuAnmin
 * @create: 2021-03-22 15:52
 **/

@Component
@Slf4j
public class CanalUtil {

    @Resource
    CorpsUploadService corpsUploadService;

    @Value("${canal-monitor-mysql.hostname}")
    String canalMonitorHost;

    @Value("${canal-monitor-mysql.port}")
    Integer canalMonitorPort;

    @Value("${canal-monitor-mysql.tableName}")
    String canalMonitorTableName;

    private final static int BATCH_SIZE = 10000;

    /**
     * 啟動服務
     */
    @Async("TaskPool")
    public void startMonitorSQL() {
        while (true) {
            CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(canalMonitorHost, canalMonitorPort), "example", "", "");
            try {
                //打開連接配接
                connector.connect();
                log.info("資料庫檢測連接配接成功!" + canalMonitorTableName);
                //訂閱資料庫表,全部表q
                connector.subscribe(canalMonitorTableName + "\\..*");
                //復原到未進行ack的地方,下次fetch的時候,可以從最後一個沒有ack的地方開始拿
                connector.rollback();
                while (true) {
                    // 擷取指定數量的資料
                    Message message = connector.getWithoutAck(BATCH_SIZE);
                    long batchId = message.getId();
                    int size = message.getEntries().size();
                    if (batchId == -1 || size == 0) {
                    } else {
                        handleDATAChange(message.getEntries());
                    }
                    // 送出确認
                    connector.ack(batchId);
                }
            } catch (Exception e) {
                e.printStackTrace();
                log.error("成功斷開監測連接配接!嘗試重連");
            } finally {
                connector.disconnect();
                //防止頻繁通路資料庫連結: 線程睡眠 10秒
                try {
                    Thread.sleep(10 * 1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }


    /**
     * 列印canal server解析binlog獲得的實體類資訊
     */
    private void handleDATAChange(List<CanalEntry.Entry> entrys) {
        for (CanalEntry.Entry entry : entrys) {
            if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
                continue;
            }
            //RowChange對象,包含了一行資料變化的所有特征
            CanalEntry.RowChange rowChage;
            try {
                rowChage = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception e) {
                throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),e);
            }
            CanalEntry.EventType eventType = rowChage.getEventType();
            log.info("Canal監測到更新:【{}】", entry.getHeader().getTableName());
            switch (eventType) {
                /**
                 * 删除操作
                 */
                case DELETE:
                    corpsUploadService.DeleteOperateToCityInterface(rowChage, entry);
                    break;
                /**
                 * 添加操作
                 */
                case INSERT:
                    corpsUploadService.InsertOperateToCityInterface(rowChage, entry);
                    break;
                /**
                 * 更新操作
                 */
                case UPDATE:
                    corpsUploadService.UpdateOperateToCityInterface(rowChage, entry);
                    break;
                default:
                    break;
            }

        }
    }
}