天天看點

資料湖實操講解【OSS 通路加速】第八講:Flume 高效寫入 OSS

本期導讀 :【OSS 通路加速】第八講

主題:Flume 高效寫入 OSS

講師:焱冰,阿裡巴巴計算平台事業部 EMR 技術專家

内容架構:

  • Flume 簡介
  • Flume 常用元件
  • Flume 使用 JindoFS SDK
  • Flume 實戰 JindoFS SDK

直播回放連結:(7/8講)

https://developer.aliyun.com/live/246851

一、Flume 簡介

Apache Flume 簡介

  • Apache Flume 是 Apache 基金會的一個頂級項目,以下簡稱 Flume。
  • Flume 是一個分布式、可靠、高可用的系統,支援從不同資料源高效地收集、聚合、遷移大量日志資料,聚合到中心化的資料存儲服務。
  • Flume 使用最多的場景是日志收集,也可以通過定制 Source 來傳輸其他不同類型的資料。
  • E-MapReduce 從 3.16.0 版本開始支援 Apache Flume。
資料湖實操講解【OSS 通路加速】第八講:Flume 高效寫入 OSS

Flume 中的概念及術語

資料湖實操講解【OSS 通路加速】第八講:Flume 高效寫入 OSS

一個 Flume Agent 由 Source、Channel、Sink 組成。

Event

  • 資料流通過 Flume Agent 的基本機關。
  • Event 由一個裝載位元組數組負載(Payload)和一個可選的字元串屬性集合組成。
資料湖實操講解【OSS 通路加速】第八講:Flume 高效寫入 OSS

Source

  • 資料源收集器,從外部資料源收集資料,并發送到 Channel。

Channel

  • Source 和 Sink 之間的緩沖隊列。

Sink

  • 從 Channel 中擷取 Event ,并将以事務的形式 commit 到外部存儲中。一旦事務 commit 成功,該 Event 會從 Channel 中移除。

二、Flume 常用元件

常用元件介紹

常見 Source

  • Avro Source:通過監聽 Avro 端口擷取 Avro Client 發送的事件。Avro 是 Hadoop 提供的一種協定,用于序列化反序列化資料。
  • Exec Source:通過監聽指令行輸出擷取資料,如 tail -f /var/log/messages。
  • NetCat TCP Source: 監聽指定 tcp 端口擷取資料。類似的還有 Netcat UDP Source。
  • Taildir Source: 監控目錄下的多個檔案,會記錄偏移量,不會丢失資料,最為常用。

常見 Channel

  • Memory Channel: 緩存到記憶體中,性能高,最為常用。
  • File Channel: 緩存到檔案中,會記錄 checkpoint 和 data 檔案,可靠性高,但性能較差。
  • JDBC Channel: 緩存到關系型資料庫中。
  • Kakfa Channel:通過 Kafka 來緩存資料。

常見 Sink

  • Logger Sink: 用于測試
  • Avro Sink: 轉換成 Avro Event,主要用于連接配接多個 Flume Agent。
  • HDFS Sink: 寫入 HDFS,最為常用。
  • Hive sink: 寫入 Hive 表或分區,使用 Hive 事務寫 events。
  • Kafka sink: 寫入 Kafka。

文檔

  • 官方文檔:

    https://flume.apache.org/documentation.html

  • 中文文檔:

    https://flume.liyifeng.org/

三、Flume 使用 JindoFS SDK

Flume 使用 JindoFS SDK 寫入 OSS

環境要求

在叢集上已經部署 Flume,已部署 JindoSDK 3.4 以上版本。

為什麼需要使用 JindoFS SDK 寫入 OSS

Flume 通過 flush() 調用保證事務性寫入,OSS 本身不支援 Flush 功能,通過 JindoFS SDK 寫入 OSS,雖然不能讓 flush 後的資料立刻可見,但是可以保證 flush() 後的資料不丢失,Flume 作業失敗後,可以使用 JindoFS 指令恢複 flush 過的資料。

配置示例

xxx.sinks.oss_sink.hdfs.path = oss://${your_bucket}/flume_dir/%Y-%m-%d/%H 
xxx.sinks.oss_sink.hdfs.batchSize = 100000 
xxx.sinks.oss_sink.hdfs.round = true
xxx.sinks.oss_sink.hdfs.roundValue = 15
xxx.sinks.oss_sink.hdfs.Unit = minute
xxx.sinks.oss_sink.hdfs.filePrefix = your_topic
xxx.sinks.oss_sink.rollSize = 3600
xxx.sinks.oss_sink.threadsPoolSize = 30      

- 文檔連結⭐

https://github.com/aliyun/alibabacloud-jindofs/blob/master/docs/flume/jindofs_sdk_on_flume_for_oss.md

在 EMR 叢集内對 Flush 檔案恢複

jindo jfs -recover [-R]
                   [-flushStagingPath {flushStagingPath}]
                   [-accessKeyId ${accessKeyId}]
                   [-accessKeySecret ${accessKeySecret}]
                   <path>      

注:如需遞歸恢複(-R),建議先停止 Flume 任務,避免 Flume 任務運作異常。

在 EMR 叢集外對 Flush 檔案恢複

JindoOssFileSystem jindoFileSystem = (JindoOssFileSystem) fs; 
boolean isFolder = true; 
jindoFileSystem.recover(path, isFolder);      

四、Flume 實戰 JindoFS SDK

自建Flume 使用 JindoFS SDK 壓縮寫入 OSS

環境準備

Hadoop-2.8.5

下載下傳

F​lume-1.9.0:wget

https://downloads.apache.org/flume/1.9.0/apache-flume-1.9.0-bin.tar.gz

添加依賴

cd $HADOOP_HOME/share/hadoop/httpfs/tomcat/webapps/webhdfs/WEB-INF/lib
cp commons-configuration-1.6.jar $FLUME_HOME/lib
cp hadoop-auth-2.8.5.jar $FLUME_HOME/lib
cp hadoop-common-2.8.5.jar $FLUME_HOME/lib
cp hadoop-hdfs-2.8.5.jar $FLUME_HOME/lib
cp commons-io-2.4.jar $FLUME_HOME/lib
cp htrace-core4-4.0.1-incubating.jar $FLUME_HOME/lib      
wget https://smartdata-binary.oss-cn-shanghai.aliyuncs.com/jindofs-sdk-3.5.0.jar -O 
$FLUME_HOME/lib/jindofs-sdk-3.5.0.jar      

配置 JindoFS SDK

https://github.com/aliyun/alibabacloud-jindofs/blob/master/docs/jindofs_sdk_how_to_hadoop.md

配置

a1.sources = r1

a1.sinks = k1

a1.channels = c1

a1.sources.r1.type = exec

a1.sources.r1.command = tail -F /tmp/test.log

a1.channels.c1.type = memory

a1.channels.c1.capacity = 10000

a1.channels.c1.transactionCapacity = 20

a1.sinks.k1.type = hdfs

a1.sinks.k1.hdfs.path = oss://yanbin-hd2-test/%Y-%m-%d/%H

a1.sinks.k1.hdfs.filePrefix = test

a1.sinks.k1.hdfs.batchSize = 20

a1.sinks.k1.hdfs.codeC = gzip

a1.sinks.k1.hdfs.fileType = CompressedStream

a1.sinks.k1.rollCount = 20

a2.sinks.k1.hdfs.minBlockReplicas = 1

a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1

日志仿真

while true; do echo `date` >> /tmp/test.log; sleep 1; done      

Flume 啟動

bin/flume-ng agent --name a1 -c conf -f conf/flume-exec-oss.conf  -Dflume.root.logger=INFO,console      

結果

資料湖實操講解【OSS 通路加速】第八講:Flume 高效寫入 OSS

直接觀看第四課(7/8講)視訊回放,擷取執行個體講解~

⭐Github連結:

https://github.com/aliyun/alibabacloud-jindofs

不錯過每次直播資訊、探讨更多資料湖 JindoFS+OSS 相關技術問題,歡迎掃碼加入釘釘交流群!

資料湖實操講解【OSS 通路加速】第八講:Flume 高效寫入 OSS

繼續閱讀