通過Kafka協定通路DataHub
基本概念
Kafka是目前主流的消息隊列之一,Appache頂級開源項目。DataHub服務基于飛天平台的自研消息隊列産品,集團内的使用者可能很多對DataHub并不是很熟悉,但是對TimeTunnel(簡稱TT)會更了解一些,DataHub和TT現在底層是同一套服務,隻是集團内和集團外的應用場景不同,是以部署模式不同,目前DataHub主要服務于公有雲和專有雲,集團内業務較少,TT主要服務于集團内的使用者。
DataHub現在已經全面相容Kafka的讀寫,使用者使用原生的Kafka SDK即可直接通過Kafka協定通路DataHub,對于一直使用Kafka的使用者,僅需要修改用戶端配置參數即可,實作最小成本的遷移上雲。
Datahub和Kafka的差別
DataHub和Kafka的功能基本上一緻,這裡不針對功能上的差別做展開,主要介紹上底層的差別。
差別1 協定不同
Kafka基于TCP的定制了一套協定,而DataHub的協定是基于HTTP的restful風格協定。
差別2 Kafka用戶端直連Broker,而DataHub通過Frontend轉發
DataHub:
Kafka:
如上圖所示,紅色虛線上方表示用戶端,下方表示服務端。Kafka在讀寫資料時,會與Broker直接建立TCP連接配接,這也就要求用戶端必須可以直接通路服務端的機器,這也在一定程度上降低了服務端整體的安全性。對DataHub而言,使用者是無法直接通路服務端的broker位址的,所有的使用者請求,都需要經過VIP分發到各個Frontend機器上,然後再轉發到Broker上,相對于Kafka,叢集的安全性更高。
Kafka用戶端通路模式的不同就決定了,Kafka必須是使用者自建或者雲上的執行個體型産品,每個Kafka執行個體使用者獨享,而DataHub的定位就是雲上服務型産品,所有的使用者共享同一個叢集的實體資源,并在Frontend做一系列的權限控制達到資源隔離的效果。是以決定了 是以在穩定性、彈性等方面DataHub都是遠遠高于Kafka的。
差別3 資料存儲模式不同
Kafka和DataHub都是利用順序讀寫和Page cache等特點實作的高吞吐,主要差別是存儲方式不同。如上圖所示,虛線框内的部分表示一台機器,Kafka leader接收到資料之後,會直接寫入本地存儲中,然後再由follower讀取資料并分别寫入各自的存儲來保證分布式一緻性,而DataHub的broker則是會随機的寫入叢集中N台機器的存儲中,是以Kafka和DataHub在Broker層面都存在一定程度的通訊開銷,Kafka的内部網絡開銷為follower和leader的資料一緻性上,而DataHub的broker内部通訊開銷主要在broker寫其他機器上,但是DataHub可以實作非常重要的一個特點——計算存儲分離。
使用Kafka協定通路datahub實踐
示例一: 原生Kafka遷移到DataHub的Kafka
對于使用自建Kafka的使用者,隻需要修改部分參數配置即可直接将業務遷移到DataHub上。具體的配置可以參考下表:
C=Consumer, P=Producer
參數 | C/P | 可選配置 | 是否必須 | 描述 |
bootstrap.servers | * | 參考 Kafka域名清單 | 是 | |
security.protocol | SASL_SSL | 為了保證資料傳輸的安全性,Kafka寫入DataHub預設使用SSL加密傳輸 | ||
sasl.mechanism | PLAIN | AK認證方式,僅支援PLAIN | ||
compression.type | P | LZ4 | 否 | 是否開啟壓縮傳輸,目前僅支援LZ4 |
group.id | C | project.topic:subId | 必須和訂閱的topic保持一緻,否則無法讀取資料 | |
session.timeout.ms | [60000, 180000] | kafka預設為10000, 但是因為DataHub限制最小為60000,是以這裡預設會變為60000 | ||
heartbeat.interval.ms | 建議session.timeout.ms的 2/3 | Kafka預設為3000,但是因為 會被預設修改為60000,是以這裡建議顯示設定為40000,否則heartbeat請求會過于頻繁 |
Producer示例:
生成kafka_client_producer_jaas.conf檔案
建立檔案kafka_client_producer_jaas.conf,儲存到任意路徑,檔案内容如下。
KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="accessId"
password="accessKey";
};
maven依賴
Kafka-client版本至少大于等于0.10.0.0,推薦2.4.0
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.4.0</version>
</dependency>
示例代碼
public class ProducerExample {
static {
System.setProperty("java.security.auth.login.config", "src/main/resources/kafka_client_producer_jaas.conf");
}
public static void main(String[] args) {
Properties properties = new Properties();
properties.put("bootstrap.servers", "dh-cn-hangzhou.aliyuncs.com:9092");
properties.put("security.protocol", "SASL_SSL");
properties.put("sasl.mechanism", "PLAIN");
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("compression.type", "lz4");
String KafkaTopicName = "test_project.test_topic";
Producer<String, String> producer = new KafkaProducer<String, String>(properties);
try {
List<Header> headers = new ArrayList<>();
RecordHeader header1 = new RecordHeader("key1", "value1".getBytes());
RecordHeader header2 = new RecordHeader("key2", "value2".getBytes());
headers.add(header1);
headers.add(header2);
ProducerRecord<String, String> record = new ProducerRecord<>(KafkaTopicName, 0, "key", "Hello DataHub!", headers);
// sync send
producer.send(record).get();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
} finally {
producer.close();
}
}
}
運作結果
運作結果
運作成功之後,可以再DataHub抽樣一下,确認是否正常DataHub。
Consumer示例
生成kafka_client_producer_jaas.conf檔案和maven依賴參考Producer示例
新加入的consumer需要1分鐘左右配置設定shard,配置設定完成後即可消費。
public class ConsumerExample {
static {
System.setProperty("java.security.auth.login.config", "src/main/resources/kafka_client_producer_jaas.conf");
}
public static void main(String[] args) {
Properties properties = new Properties();
properties.put("bootstrap.servers", "dh-cn-hangzhou.aliyuncs.com:9092");
properties.put("security.protocol", "SASL_SSL");
properties.put("sasl.mechanism", "PLAIN");
properties.put("group.id", "test_project.test_topic:1611039998153N71KM");
properties.put("auto.offset.reset", "earliest");
properties.put("session.timeout.ms", "60000");
properties.put("heartbeat.interval.ms", "40000");
properties.put("ssl.endpoint.identification.algorithm", "");
properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(properties);
kafkaConsumer.subscribe(Collections.singletonList("test_project.test_topic"));
while (true) {
ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofSeconds(5));
for (ConsumerRecord<String, String> record : records) {
System.out.println(record.toString());
}
}
}
}
運作成功之後,便可以在終端看到讀取到的資料。
ConsumerRecord(topic = test_project.test_topic, partition = 0, leaderEpoch = 0, offset = 0, LogAppendTime = 1611040892661, serialized key size = 3, serialized value size = 14, headers = RecordHeaders(headers = [RecordHeader(key = key1, value = [118, 97, 108, 117, 101, 49]), RecordHeader(key = key2, value = [118, 97, 108, 117, 101, 50])], isReadOnly = false), key = key, value = Hello DataHub!)
注意:這裡同一個請求傳回的資料的LogAppendTime是相同的,是該請求傳回所有的資料的寫入DataHub時間的最大值
示例二: Canal采集MySQL增量資料到DataHub(Kafka)并用Flink讀取
1.配置MySQL
Canal采集MySQL要求源庫開啟binlog,并要求配置主庫位址,從庫無法采集,這裡不再贅述。
本文以表orders為例介紹後續的配置流程,表結構如下所示:
mysql> desc orders;
+-------+--------------+------+-----+---------+-------+
| Field | Type | Null | Key | Default | Extra |
| oid | int(11) | YES | | 5 | |
| pid | int(11) | YES | | NULL | |
| num | int(11) | YES | | NULL | |
2. 建立DataHub Topic
如果想要使用Kafka協定通路DataHub,在建立Topic時必須選擇為shard擴充模式,表示topic的擴容方式為添加shard。
//TODO 添加截圖
測試所用的project、topic、訂閱id分别為: test_project、test_topic、xxxxxx
3.配置Canal
Canal可以采集MySQL的binlog并寫入Kafka,但是因為開源Canal的一些限制導緻Canal無法通過Kafka協定通路DataHub,是以DataHub團隊對開源Canal做了一些改造,使得使用者可以使用Canal直接将增量資料寫入DataHub(Kafka),具體的改動内容可以參考
DataHub官方文檔。使用者可以直接下載下傳改造好的
Canal。
将canal.deployer 複制到固定目錄并解壓
mkdir -p /usr/local/canal
tar -zxvf canal.deployer-1.1.5-SNAPSHOT.tar.gz -C /usr/local/canal
修改instance配置
conf/example/instance.properties
# 按需修改成自己的資料庫資訊
#################################################
...
canal.instance.master.address=192.168.1.20:3306
# username/password,資料庫的使用者名和密碼
...
canal.instance.dbUsername = canal
canal.instance.dbPassword = canal
...
# mq config
canal.mq.topic=example
# 針對庫名或者表名發送動态topic
#canal.mq.dynamicTopic=mytest,.*,mytest.user,mytest\\..*,.*\\..*
canal.mq.partition=0
# hash partition config
#canal.mq.partitionsNum=3
#庫名.表名: 唯一主鍵,多個表之間用逗号分隔
#canal.mq.partitionHash=mytest.person:id,mytest.role:id
#################################################
對應ip 位址的MySQL 資料庫需進行相關初始化與設定, 可參考
Canal QuickStart。針對庫名的動态TopicName和根據主鍵哈希的設定,可參考
mq參數說明修改canal配置檔案
conf/canal.properties
# ...
canal.serverMode = kafka
# 這裡以杭州網際網路位址為例
kafka.bootstrap.servers = dh-cn-hangzhou.aliyuncs.com:9092
kafka.acks = all
kafka.compression.type = none
kafka.batch.size = 16384
kafka.linger.ms = 1
kafka.max.request.size = 1048576
kafka.buffer.memory = 33554432
kafka.max.in.flight.requests.per.connection = 1
kafka.retries = 0
kafka.security.protocol = SASL_SSL
kafka.sasl.mechanism = PLAIN
其中必須配置參數為
canal.serverMode、kafka.bootstrap.servers、kafka.security.protocol、kafka.sasl.mechanism
,其他參數使用者可根據實際情況自主進行調優。
修改jass配置檔案
conf/kafka_client_producer_jaas.conf
KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="accessId"
password="accessKey";
};
啟動
cd /usr/local/canal/
sh bin/startup.sh
4.配置Flink
1). Flink源表
阿裡雲雲上的Flink通路Kafka必須使用VPC Kafka域名位址,其他Flink根據實際情況選擇。
create table kafka_source(
`oid` BIGINT,
`pid` BIGINT,
`num` BIGINT
) with (
'connector' = 'kafka',
'topic' = 'test_project.test_topic1',
'properties.bootstrap.servers' = 'dh-cn-hangzhou-int-vpc.aliyuncs.com:9094',
'properties.group.id' = 'test_project.test_topic1:subId',
'format' = 'canal-json',
'scan.startup.mode' = 'earliest-offset',
'properties.sasl.mechanism' = 'PLAIN',
'properties.security.protocol' = 'SASL_SSL',
'properties.sasl.jaas.config' = 'org.apache.flink.kafka.shaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="AccessId" password="AccessKey";'
);
2). Flink結果表
為了友善展示結果,本例以print類型作為結果表。
CREATE TABLE print_table (
oid BIGINT,
pid BIGINT,
num BIGINT
) WITH (
'connector'='print',
'logger'='true'
);
3). Flink sql
INSERT INTO print_canal_table select
`oid BIGINT`,
`pid BIGINT`,
`num BIGINT`
FROM kafka_canal_source;
結果展示
在MySQL中執行一條sql語句
mysql> insert into orders values(1,2,3);
1). canal日志
檢視 logs/canal/canal.log
vi logs/canal/canal.log
2013-02-05 22:45:27.967 [main] INFO com.alibaba.otter.canal.deployer.CanalLauncher - ## start the canal server.
2013-02-05 22:45:28.113 [main] INFO com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[10.1.29.120:11111]
2013-02-05 22:45:28.210 [main] INFO com.alibaba.otter.canal.deployer.CanalLauncher - ## the canal server is running now ......
檢視instance的日志:
vi logs/example/example.log
2013-02-05 22:50:45.636 [main] INFO c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [canal.properties]
2013-02-05 22:50:45.641 [main] INFO c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [example/instance.properties]
2013-02-05 22:50:45.803 [main] INFO c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start CannalInstance for 1-example
2013-02-05 22:50:45.810 [main] INFO c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start successful....
2). DataHub寫入結果
{
"data":[
{
"oid":"1",
"pid":"2",
"num":"3"
}
],
"database":"ggtt",
"es":1591092305000,
"id":2,
"isDdl":false,
"mysqlType":{
"oid":"int(11)",
"pid":"int(11)",
"num":"int(11)"
},
"old":null,
"pkNames":null,
"sql":"",
"sqlType":{
"oid":4,
"pid":4,
"num":4
},
"table":"orders",
"ts":1591092305813,
"type":"INSERT"
}
3). Flink輸出
在Flink日志中會有如下輸出
2021-02-19 19:54:57,638 INFO org.apache.flink.api.common.functions.util.PrintSinkOutputWriter [] - +I(1,2,3)
針對原生Kafka的優劣勢
優勢
- DataHub的Kafka屬于服務型産品,大叢集共享實體資源,彈性更大,穩定性更強
- DataHub可與衆多雲上産品無縫銜接,提升易用性
- 節省大量運維成本,開箱即用,使用者隻需關注業務相關内容,DataHub官方保證可用性不低于99.9%
- 按量付費,寫入不收費,同步其他雲産品不收費,目前僅收取存儲、讀取以及shard租用費,成本相對更低
劣勢
- 因為産品的差異性是以導緻無法對Kafka接口完全相容,目前DataHub隻相容Kafka讀寫相關接口
- DataHub的Kafka接口目前不支援幂等性、事務性等功能
總結
目前DataHub已經全面相容Kafka讀寫接口,對于一直在使用Kafka的使用者,無需再投入精力學習一套其他接口,可以直接使用原生Kafka用戶端實作業務的全面上雲。
如果您在使用中有任何問題,可以參考官方文檔
相容kafka,或者加入釘釘群咨詢,群内值班同學會耐心解決您的問題。