天天看點

使用EMR-Flume同步Kafka資料到HDFS使用EMR-Flume同步Kafka資料到HDFS1. 背景2.準備工作3. Flume配置4.運作Flume agent5.load balance

使用EMR-Flume同步Kafka資料到HDFS

1. 背景

Flume是一個分布式、可靠和高效的資料彙聚系統,其source、channel和sink的結構設計,不僅實作了資料生産者與消費者的解耦,還提供了資料緩沖的功能。Flume支援多種source、channel和sink,也可以實作自定義source、channel和sink并以插件的方式加入Flume中。同時,Flume也支援資料處理、負載均衡、failover和資料高可靠等進階特性。E-MapReduce從3.19.0版本開始,提供了EMR-Flume叢集管理、gateway擴充等多種特性友善Flume的使用,詳見

Flume使用說明

一個比較通用的使用場景是使用Flume将Kafka的資料按照時間分區同步至HDFS,進行實時的流式分析,或者使用Hive等工具進行離線的統計。下面就詳細的介紹使用Flume同步Kafka的資料到HDFS。

2.準備工作

建立Kafka叢集并建立topic test,詳細步驟參考

Kafka快速入門

建立Hadoop叢集,在可選軟體中選擇Flume,詳細步驟參考

建立叢集

3. Flume配置

首先對agent和綁定關系進行如下配置

agent.sources source
agent.sinks sink
agent.channels channel
agent.sources.source1.channels
agent.sinks.k1.channel

3.1 Kafka source

因為寫入HDFS的資料是按照時間分區的,如果在HDFS sink中配置useLocalTimeStamp将寫入HDFS的時間作為分區時間,在Flume有資料滞後時,會将資料寫入錯誤的分區。預設的,Kafka Source會在接收資料時将系統時間寫入header中,可以使用該時間作為分區時間。Kafka source配置如下

agent.sources.source.type org.apache.flume.source.kafka.KafkaSource
agent.sources.source.batchSize 5000
agent.sources.source.kafka.bootstrap.servers ip:port
agent.sources.source.kafka.topics test
agent.sources.source.kafka.consumer.group.id test-group

其中,agent.sources.source.kafka.bootstrap.servers為Kafka broker的位址,根據實際配置。

在實際使用中,Kafka topic的資料量可能很大,超過一個Flume agent的負載,可以啟動多個agent,使用相同的consumer group id來共同消費同一個topic的資料;同時,如果其中一個agent失敗,其他agent也會繼續消費topic的資料,達到容災的效果。

3.2 channel

根據實際情況,需要在性能和可靠性做權衡。比如相比file channel,memory channel性能更高,但是在agent停止後channel中的資料會丢失;file channel雖然性能不如memory channel,但是持久化在磁盤的資料可以在agent停止後保證資料不丢失。此處使用file channel做說明

agent.channels.channel.transactionCapacity 51200
agent.channels.channel.checkpointDir /mnt/disk1/flume/file-channel/checkpoint
agent.channels.channel.dataDirs /mnt/disk1/flume/file-channel/data
agent.channels.channel.capacity

3.3 HDFS sink

将資料以時間為分區寫入HDFS。考慮到配合Hive進行查詢,可以在路徑中添加列名。例如添加datetime和hour列,如下所示

agent.sinks.sink.hdfs.path /tmp/flume-data/datetime=%y%m%d/hour=%H
agent.sinks.sink.hdfs.fileType DataStream
agent.sinks.sink.hdfs.rollSize
agent.sinks.sink.hdfs.rollCount
agent.sinks.sink.hdfs.rollInterval 3600
agent.sinks.sink.hdfs.batchSize
agent.sinks.sink.hdfs.round true
agent.sinks.sink.hdfs.roundValue 60
agent.sinks.sink.hdfs.roundUnit minute

其中,batchSize的設定需要在發送效率和延遲中做出選擇,設定過大會資料滞後,設定過小會影響HDFS的吞吐。

為防止生成過多小檔案,此處按照時間(1小時)來生成檔案,也可根據實際情況根據event數或者檔案大小來生成檔案。

4.運作Flume agent

參考

成功運作agent之後,可以檢視HDFS中存儲的資料。如下圖所示檢視2019年4月9日20點的資料

使用EMR-Flume同步Kafka資料到HDFS使用EMR-Flume同步Kafka資料到HDFS1. 背景2.準備工作3. Flume配置4.運作Flume agent5.load balance

5.load balance

為了保證下遊sink的可靠性,可以配置多個sink并使用相同的load balance sink組。這樣,在其中一個sink失敗時,其他sink可以從channel拉取資料并sink到HDFS中。如下設定了兩個avro sink同屬于一個sink組load-balancer-sink-group。

avro-sink-1 avro-sink-2
agent.sinkgroups load-balancer-sink-group
agent.sinkgroups.load-balancer-sink-group.sinks
agent.sinkgroups.load-balancer-sink-group.processor.type load_balance
agent.sinkgroups.load-balancer-sink-group.processor.selector random
agent.sinks.avro-sink-1.type avro
agent.sinks.avro-sink-1.hostname emr-worker-1
agent.sinks.avro-sink-1.port 19999
agent.sinks.avro-sink-2.type
agent.sinks.avro-sink-2.hostname emr-worker-2
agent.sinks.avro-sink-2.port

使用sink組替代2.3 HDFS sink 中介紹的配置後,需要在emr-worker-1和emr-worker-2兩個節點配置source為avro,sink為HDFS的Flume agent。如下所示

agent.sources.source.bind 0.0.0.0
agent.sources.source.port
agent.sinks.sink.type hdfs
agent.channels.channel.type file