天天看點

DataHub完全相容kafka通過Kafka協定通路DataHub

通過Kafka協定通路DataHub

基本概念

Kafka是目前主流的消息隊列之一,Appache頂級開源項目。DataHub服務基于飛天平台的自研消息隊列産品,集團内的使用者可能很多對DataHub并不是很熟悉,但是對TimeTunnel(簡稱TT)會更了解一些,DataHub和TT現在底層是同一套服務,隻是集團内和集團外的應用場景不同,是以部署模式不同,目前DataHub主要服務于公有雲和專有雲,集團内業務較少,TT主要服務于集團内的使用者。

DataHub現在已經全面相容Kafka的讀寫,使用者使用原生的Kafka SDK即可直接通過Kafka協定通路DataHub,對于一直使用Kafka的使用者,僅需要修改用戶端配置參數即可,實作最小成本的遷移上雲。

Datahub和Kafka的差別

DataHub和Kafka的功能基本上一緻,這裡不針對功能上的差別做展開,主要介紹上底層的差別。

差別1 協定不同

Kafka基于TCP的定制了一套協定,而DataHub的協定是基于HTTP的restful風格協定。

差別2 Kafka用戶端直連Broker,而DataHub通過Frontend轉發

DataHub:

DataHub完全相容kafka通過Kafka協定通路DataHub

Kafka:

DataHub完全相容kafka通過Kafka協定通路DataHub

如上圖所示,紅色虛線上方表示用戶端,下方表示服務端。Kafka在讀寫資料時,會與Broker直接建立TCP連接配接,這也就要求用戶端必須可以直接通路服務端的機器,這也在一定程度上降低了服務端整體的安全性。對DataHub而言,使用者是無法直接通路服務端的broker位址的,所有的使用者請求,都需要經過VIP分發到各個Frontend機器上,然後再轉發到Broker上,相對于Kafka,叢集的安全性更高。

Kafka用戶端通路模式的不同就決定了,Kafka必須是使用者自建或者雲上的執行個體型産品,每個Kafka執行個體使用者獨享,而DataHub的定位就是雲上服務型産品,所有的使用者共享同一個叢集的實體資源,并在Frontend做一系列的權限控制達到資源隔離的效果。是以決定了 是以在穩定性、彈性等方面DataHub都是遠遠高于Kafka的。

差別3 資料存儲模式不同

Kafka和DataHub都是利用順序讀寫和Page cache等特點實作的高吞吐,主要差別是存儲方式不同。如上圖所示,虛線框内的部分表示一台機器,Kafka leader接收到資料之後,會直接寫入本地存儲中,然後再由follower讀取資料并分别寫入各自的存儲來保證分布式一緻性,而DataHub的broker則是會随機的寫入叢集中N台機器的存儲中,是以Kafka和DataHub在Broker層面都存在一定程度的通訊開銷,Kafka的内部網絡開銷為follower和leader的資料一緻性上,而DataHub的broker内部通訊開銷主要在broker寫其他機器上,但是DataHub可以實作非常重要的一個特點——計算存儲分離。

使用Kafka協定通路datahub實踐

示例一: 原生Kafka遷移到DataHub的Kafka

對于使用自建Kafka的使用者,隻需要修改部分參數配置即可直接将業務遷移到DataHub上。具體的配置可以參考下表:

C=Consumer, P=Producer

參數 C/P 可選配置 是否必須 描述
bootstrap.servers * 參考 Kafka域名清單
security.protocol SASL_SSL 為了保證資料傳輸的安全性,Kafka寫入DataHub預設使用SSL加密傳輸
sasl.mechanism PLAIN AK認證方式,僅支援PLAIN
compression.type P LZ4 是否開啟壓縮傳輸,目前僅支援LZ4
group.id C project.topic:subId 必須和訂閱的topic保持一緻,否則無法讀取資料
session.timeout.ms [60000, 180000] kafka預設為10000, 但是因為DataHub限制最小為60000,是以這裡預設會變為60000
heartbeat.interval.ms 建議session.timeout.ms的 2/3 Kafka預設為3000,但是因為

session.timeout.ms

會被預設修改為60000,是以這裡建議顯示設定為40000,否則heartbeat請求會過于頻繁

Producer示例:

生成kafka_client_producer_jaas.conf檔案

建立檔案kafka_client_producer_jaas.conf,儲存到任意路徑,檔案内容如下。

KafkaClient {
  org.apache.kafka.common.security.plain.PlainLoginModule required
  username="accessId"
  password="accessKey";
};      
maven依賴

Kafka-client版本至少大于等于0.10.0.0,推薦2.4.0

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.4.0</version>
</dependency>      
示例代碼
public class ProducerExample {
    static {
        System.setProperty("java.security.auth.login.config", "src/main/resources/kafka_client_producer_jaas.conf");
    }
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "dh-cn-hangzhou.aliyuncs.com:9092");
        properties.put("security.protocol", "SASL_SSL");
        properties.put("sasl.mechanism", "PLAIN");
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("compression.type", "lz4");
        String KafkaTopicName = "test_project.test_topic";
        Producer<String, String> producer = new KafkaProducer<String, String>(properties);
        try {
            List<Header> headers = new ArrayList<>();
            RecordHeader header1 = new RecordHeader("key1", "value1".getBytes());
            RecordHeader header2 = new RecordHeader("key2", "value2".getBytes());
            headers.add(header1);
            headers.add(header2);
            ProducerRecord<String, String> record = new ProducerRecord<>(KafkaTopicName, 0, "key", "Hello DataHub!", headers);
            // sync send
            producer.send(record).get();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        } finally {
            producer.close();
        }
    }
}
運作結果      
運作結果

運作成功之後,可以再DataHub抽樣一下,确認是否正常DataHub。

DataHub完全相容kafka通過Kafka協定通路DataHub

Consumer示例

生成kafka_client_producer_jaas.conf檔案和maven依賴參考Producer示例

新加入的consumer需要1分鐘左右配置設定shard,配置設定完成後即可消費。

public class ConsumerExample {
static {
        System.setProperty("java.security.auth.login.config", "src/main/resources/kafka_client_producer_jaas.conf");
    }
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "dh-cn-hangzhou.aliyuncs.com:9092");
        properties.put("security.protocol", "SASL_SSL");
        properties.put("sasl.mechanism", "PLAIN");
        properties.put("group.id", "test_project.test_topic:1611039998153N71KM");
        properties.put("auto.offset.reset", "earliest");
        properties.put("session.timeout.ms", "60000");
        properties.put("heartbeat.interval.ms", "40000");
        properties.put("ssl.endpoint.identification.algorithm", "");
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(properties);
        kafkaConsumer.subscribe(Collections.singletonList("test_project.test_topic"));
        while (true) {
            ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofSeconds(5));
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(record.toString());
            }
        }
    }
}      

運作成功之後,便可以在終端看到讀取到的資料。

ConsumerRecord(topic = test_project.test_topic, partition = 0, leaderEpoch = 0, offset = 0, LogAppendTime = 1611040892661, serialized key size = 3, serialized value size = 14, headers = RecordHeaders(headers = [RecordHeader(key = key1, value = [118, 97, 108, 117, 101, 49]), RecordHeader(key = key2, value = [118, 97, 108, 117, 101, 50])], isReadOnly = false), key = key, value = Hello DataHub!)      

注意:這裡同一個請求傳回的資料的LogAppendTime是相同的,是該請求傳回所有的資料的寫入DataHub時間的最大值

示例二: Canal采集MySQL增量資料到DataHub(Kafka)并用Flink讀取

1.配置MySQL

Canal采集MySQL要求源庫開啟binlog,并要求配置主庫位址,從庫無法采集,這裡不再贅述。

本文以表orders為例介紹後續的配置流程,表結構如下所示:

mysql> desc orders;

+-------+--------------+------+-----+---------+-------+

| Field | Type         | Null | Key | Default | Extra |

| oid   | int(11)      | YES  |     | 5       |       |

| pid   | int(11)      | YES  |     | NULL    |       |

| num   | int(11)      | YES  |     | NULL    |       |

2. 建立DataHub Topic

如果想要使用Kafka協定通路DataHub,在建立Topic時必須選擇為shard擴充模式,表示topic的擴容方式為添加shard。

//TODO 添加截圖

測試所用的project、topic、訂閱id分别為: test_project、test_topic、xxxxxx

3.配置Canal

Canal可以采集MySQL的binlog并寫入Kafka,但是因為開源Canal的一些限制導緻Canal無法通過Kafka協定通路DataHub,是以DataHub團隊對開源Canal做了一些改造,使得使用者可以使用Canal直接将增量資料寫入DataHub(Kafka),具體的改動内容可以參考

DataHub官方文檔

。使用者可以直接下載下傳改造好的

Canal

将canal.deployer 複制到固定目錄并解壓

mkdir -p /usr/local/canal

tar -zxvf canal.deployer-1.1.5-SNAPSHOT.tar.gz -C /usr/local/canal

修改instance配置

conf/example/instance.properties

#  按需修改成自己的資料庫資訊
#################################################
...
canal.instance.master.address=192.168.1.20:3306
# username/password,資料庫的使用者名和密碼
...
canal.instance.dbUsername = canal
canal.instance.dbPassword = canal
...
# mq config
canal.mq.topic=example
# 針對庫名或者表名發送動态topic
#canal.mq.dynamicTopic=mytest,.*,mytest.user,mytest\\..*,.*\\..*
canal.mq.partition=0
# hash partition config
#canal.mq.partitionsNum=3
#庫名.表名: 唯一主鍵,多個表之間用逗号分隔
#canal.mq.partitionHash=mytest.person:id,mytest.role:id
#################################################      

對應ip 位址的MySQL 資料庫需進行相關初始化與設定, 可參考

Canal QuickStart

。針對庫名的動态TopicName和根據主鍵哈希的設定,可參考

mq參數說明

修改canal配置檔案

conf/canal.properties

# ...
canal.serverMode = kafka
# 這裡以杭州網際網路位址為例
kafka.bootstrap.servers = dh-cn-hangzhou.aliyuncs.com:9092
kafka.acks = all
kafka.compression.type = none
kafka.batch.size = 16384
kafka.linger.ms = 1
kafka.max.request.size = 1048576
kafka.buffer.memory = 33554432
kafka.max.in.flight.requests.per.connection = 1
kafka.retries = 0
kafka.security.protocol = SASL_SSL
kafka.sasl.mechanism = PLAIN      

其中必須配置參數為

canal.serverMode、kafka.bootstrap.servers、kafka.security.protocol、kafka.sasl.mechanism

,其他參數使用者可根據實際情況自主進行調優。

修改jass配置檔案

conf/kafka_client_producer_jaas.conf

KafkaClient {

  org.apache.kafka.common.security.plain.PlainLoginModule required

  username="accessId"

  password="accessKey";

};

啟動

cd /usr/local/canal/

sh bin/startup.sh

4.配置Flink

1). Flink源表

阿裡雲雲上的Flink通路Kafka必須使用VPC Kafka域名位址,其他Flink根據實際情況選擇。

create table kafka_source(  
  `oid` BIGINT,
  `pid` BIGINT,
  `num` BIGINT
) with (
  'connector' = 'kafka',
  'topic' = 'test_project.test_topic1',
  'properties.bootstrap.servers' = 'dh-cn-hangzhou-int-vpc.aliyuncs.com:9094',
  'properties.group.id' = 'test_project.test_topic1:subId',
  'format' = 'canal-json',
  'scan.startup.mode' = 'earliest-offset',
  'properties.sasl.mechanism' = 'PLAIN',
  'properties.security.protocol' = 'SASL_SSL',
  'properties.sasl.jaas.config' = 'org.apache.flink.kafka.shaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="AccessId" password="AccessKey";'
);      

2). Flink結果表

為了友善展示結果,本例以print類型作為結果表。

CREATE TABLE print_table (
  oid BIGINT,
  pid BIGINT,
  num BIGINT
) WITH (
  'connector'='print',
  'logger'='true'
);      

3). Flink sql

INSERT INTO print_canal_table select 
    `oid BIGINT`,
    `pid BIGINT`,
    `num BIGINT`
    FROM kafka_canal_source;      

結果展示

在MySQL中執行一條sql語句

mysql> insert into orders values(1,2,3);

1). canal日志

檢視 logs/canal/canal.log

vi logs/canal/canal.log

2013-02-05 22:45:27.967 [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## start the canal server.
2013-02-05 22:45:28.113 [main] INFO  com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[10.1.29.120:11111]
2013-02-05 22:45:28.210 [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## the canal server is running now ......      

檢視instance的日志:

vi logs/example/example.log

2013-02-05 22:50:45.636 [main] INFO  c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [canal.properties]
2013-02-05 22:50:45.641 [main] INFO  c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [example/instance.properties]
2013-02-05 22:50:45.803 [main] INFO  c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start CannalInstance for 1-example 
2013-02-05 22:50:45.810 [main] INFO  c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start successful....      
2). DataHub寫入結果
{
    "data":[
        {
            "oid":"1",
            "pid":"2",
            "num":"3"
        }
    ],
    "database":"ggtt",
    "es":1591092305000,
    "id":2,
    "isDdl":false,
    "mysqlType":{
        "oid":"int(11)",
        "pid":"int(11)",
        "num":"int(11)"
    },
    "old":null,
    "pkNames":null,
    "sql":"",
    "sqlType":{
        "oid":4,
        "pid":4,
        "num":4
    },
    "table":"orders",
    "ts":1591092305813,
    "type":"INSERT"
}      
3). Flink輸出

在Flink日志中會有如下輸出

2021-02-19 19:54:57,638 INFO  org.apache.flink.api.common.functions.util.PrintSinkOutputWriter [] - +I(1,2,3)

針對原生Kafka的優劣勢

優勢

  • DataHub的Kafka屬于服務型産品,大叢集共享實體資源,彈性更大,穩定性更強
  • DataHub可與衆多雲上産品無縫銜接,提升易用性
  • 節省大量運維成本,開箱即用,使用者隻需關注業務相關内容,DataHub官方保證可用性不低于99.9%
  • 按量付費,寫入不收費,同步其他雲産品不收費,目前僅收取存儲、讀取以及shard租用費,成本相對更低

劣勢

  • 因為産品的差異性是以導緻無法對Kafka接口完全相容,目前DataHub隻相容Kafka讀寫相關接口
  • DataHub的Kafka接口目前不支援幂等性、事務性等功能

總結

目前DataHub已經全面相容Kafka讀寫接口,對于一直在使用Kafka的使用者,無需再投入精力學習一套其他接口,可以直接使用原生Kafka用戶端實作業務的全面上雲。

如果您在使用中有任何問題,可以參考官方文檔

相容kafka

,或者加入釘釘群咨詢,群内值班同學會耐心解決您的問題。

DataHub完全相容kafka通過Kafka協定通路DataHub