本人相關文章
docke 安裝canal-admin ui管理canal-server(系列一)
canal-admin投遞binglog到kafka,多程序消費kafka同步資料變更(系列二)
相關資料
- rdkafka官方詳細參數配置文檔
- rdkafka擴充所有類和方法詳細結構和參數文檔
- canal投遞消息到mq中順序性問題
canal資訊新增上直接投遞到mq的配置
- 原來的資料庫配置不變 可以登陸管理位址直接檢視配置資訊
# 表過濾正則
canal.instance.filter.regex = test\\.test.*
# 投遞到單個topic
canal.mq.topic = test_binlog_icr
#這裡是多個動态topic配置,暫時不用這麼多topic
#canal.mq.dynamicTopic = topic:test\\.qa_.*
canal.mq.partition=0
# hash6個分區
canal.mq.partitionsNum = 6
# 根據主鍵散裂分區 可保證資料熱點問題不會出現,并且保證同一個主鍵下變更記錄在同一個分區,而且我們隻是做觸發記錄不是直接根據值做處理根據表散列就會出現有的分區很多資料 有的表沒啥通路
canal.mq.partitionHash = .*\\..*:$pk$
- 關于分區mq配置 可參考官方文檔:https://github.com/alibaba/canal/wiki/Canal-Kafka-RocketMQ-QuickStart#mq%E9%A1%BA%E5%BA%8F%E6%80%A7%E9%97%AE%E9%A2%98
topic 和 group.id 使用
- topic 使用 test_binlog_icr 相關的mysql binlog 全部投遞到這個主題 (本主題可以被多個不同的消費組去消費)
- 消費組 search_incr 檢索同步消費組 目前訂閱了 test_binlog_icr 這一個主題,後面增加其他檢索主題也可以繼續訂閱
kafka同步
- test_binlog_icr 設定6分區 可開啟6消費程序同步消費 看消費延遲上是否會縮短,還是會增長,理論上會縮短 (因為canal新版本性能有150%的提升,用kafka 6個程序去消費)
-
已經預設關閉自動送出位移,請使用例子的 commit 或者 commitAsync 在代碼邏輯執行完成手動送出位置
原因:自動送出 比如設定1000ms送出一次 就是1秒,這個時候消費了3個,但是未送出,程式代碼崩潰了,就會出現3個重複消費,位移不準确的問題,如果是在業務上也可以通過自己業務做幂等邏輯判斷,日志同步的時候不好做判斷
- 包裝的用戶端\Lib\Mq\RdKafkaMq::getClient() 簡單的例子,有需要的可以留言問部落客索要
/**
* 擷取kafka消費者
* 擷取後 通過consume(毫秒ms)擷取消費消息 commit() 或者 commitAsync() 送出目前位移
* 也可以設定多少毫秒自動送出位移enable.auto.commit=true auto.commit.interval.ms = 1000ms
* 這裡建議邏輯執行成功後手動送出位移
* 更多配置參數和詳解,建議參考如下官方文檔
* @link https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
* @param string $groupId 消費組id
* @param array $topics 訂閱的主題清單
* @param array $config kafka配置 如果傳入則設定該配置
* @return \RdKafka\KafkaConsumer
*/
public function getConsumer(string $groupId,array $topics,array $config = [])
{
$confs = array_merge([
//kafka broker 清單
'metadata.broker.list' => $this->brokerList,
//當沒有初始偏移量時,從哪裡開始讀取 earliest從頭,latest從最後,設定預設從最後消費
'auto.offset.reset' => 'latest',
//配置groud.id 具有相同 group.id 的consumer将會處理不同分區的消息,消費者程序的數量多于分區的數量是沒有意義的。
'group.id' => $groupId,
//自動送出位移 預設設定為不送出
'enable.auto.commit' => 'false',
],$config);
$kafkaConf = new Conf();
foreach ($confs as $confKey => $conf) {
$kafkaConf->set($confKey,$conf);
}
// 當有新的消費程序加入或者退出消費組時,kafka 會自動重新配置設定分區給消費者程序,這裡注冊了一個回調函數,當分區被重新配置設定時觸發
$kafkaConf->setRebalanceCb($this->getRebalanceCallBack($groupId));
$kafkaConsumer = new KafkaConsumer($kafkaConf);
$kafkaConsumer->subscribe($topics);
return $kafkaConsumer;
}
- 簡單的消費代碼
$kafkaMq = \Lib\Mq\RdKafkaMq::getClient();
$kafkaConsumer = $kafkaMq->getConsumer('search_incr',['test_binlog_icr'],['auto.offset.reset' => 'earliest']);
//需要等待分區配置設定
while (true) {
$message = $kafkaConsumer->consume(120*1000);
switch ($message->err) {
case RD_KAFKA_RESP_ERR_NO_ERROR:
var_dump($message->partition.":".$message->offset);
//執行邏輯并且确定沒問題則送出位移
$kafkaConsumer->commit();
break;
case RD_KAFKA_RESP_ERR__PARTITION_EOF:
echo "No more messages; will wait for more\n";
break;
case RD_KAFKA_RESP_ERR__TIMED_OUT:
echo "Timed out\n";
break;
default:
throw new \Exception($message->errstr(), $message->err);
break;
}
}
檢視所有topic
- –bootstrap-server 192.168.3.101:9092 參數為server位址
./bin/kafka-topics.sh --bootstrap-server 192.168.3.101:9092 --list
建立topic
- 建立主題時用到三個參數:
- –topic :主題名字
- –partitions :分區數量
- –replication-factor :副本因子 幾個副本(不能超過叢集broker量 比如3叢集 最多3)
./bin/kafka-topics.sh --bootstrap-server 192.168.3.101:9092 --create --topic test_binlog_icr --partitions 6 --replication-factor 1
删除topic
./bin/kafka-topics.sh --bootstrap-server 192.168.3.101:9092 --delete --topic test_binlog_icr
檢視所有消費組
./bin/kafka-consumer-groups.sh --bootstrap-server 192.168.3.101:9092 --list
建立消費組
- 新版本連接配接時會自動建立 不需要提前建立
删除消費組
- 新版本所有成員離開會自動處理不用删除