本期導讀 :【OSS 通路加速】第八講
主題:Flume 高效寫入 OSS
講師:焱冰,阿裡巴巴計算平台事業部 EMR 技術專家
内容架構:
- Flume 簡介
- Flume 常用元件
- Flume 使用 JindoFS SDK
- Flume 實戰 JindoFS SDK
直播回放連結:(7/8講)
https://developer.aliyun.com/live/246851一、Flume 簡介
Apache Flume 簡介
- Apache Flume 是 Apache 基金會的一個頂級項目,以下簡稱 Flume。
- Flume 是一個分布式、可靠、高可用的系統,支援從不同資料源高效地收集、聚合、遷移大量日志資料,聚合到中心化的資料存儲服務。
- Flume 使用最多的場景是日志收集,也可以通過定制 Source 來傳輸其他不同類型的資料。
- E-MapReduce 從 3.16.0 版本開始支援 Apache Flume。

Flume 中的概念及術語
一個 Flume Agent 由 Source、Channel、Sink 組成。
Event
- 資料流通過 Flume Agent 的基本機關。
- Event 由一個裝載位元組數組負載(Payload)和一個可選的字元串屬性集合組成。
Source
- 資料源收集器,從外部資料源收集資料,并發送到 Channel。
Channel
- Source 和 Sink 之間的緩沖隊列。
Sink
- 從 Channel 中擷取 Event ,并将以事務的形式 commit 到外部存儲中。一旦事務 commit 成功,該 Event 會從 Channel 中移除。
二、Flume 常用元件
常用元件介紹
常見 Source
- Avro Source:通過監聽 Avro 端口擷取 Avro Client 發送的事件。Avro 是 Hadoop 提供的一種協定,用于序列化反序列化資料。
- Exec Source:通過監聽指令行輸出擷取資料,如 tail -f /var/log/messages。
- NetCat TCP Source: 監聽指定 tcp 端口擷取資料。類似的還有 Netcat UDP Source。
- Taildir Source: 監控目錄下的多個檔案,會記錄偏移量,不會丢失資料,最為常用。
常見 Channel
- Memory Channel: 緩存到記憶體中,性能高,最為常用。
- File Channel: 緩存到檔案中,會記錄 checkpoint 和 data 檔案,可靠性高,但性能較差。
- JDBC Channel: 緩存到關系型資料庫中。
- Kakfa Channel:通過 Kafka 來緩存資料。
常見 Sink
- Logger Sink: 用于測試
- Avro Sink: 轉換成 Avro Event,主要用于連接配接多個 Flume Agent。
- HDFS Sink: 寫入 HDFS,最為常用。
- Hive sink: 寫入 Hive 表或分區,使用 Hive 事務寫 events。
- Kafka sink: 寫入 Kafka。
文檔
-
官方文檔:
https://flume.apache.org/documentation.html
-
中文文檔:
https://flume.liyifeng.org/
三、Flume 使用 JindoFS SDK
Flume 使用 JindoFS SDK 寫入 OSS
環境要求
在叢集上已經部署 Flume,已部署 JindoSDK 3.4 以上版本。
為什麼需要使用 JindoFS SDK 寫入 OSS
Flume 通過 flush() 調用保證事務性寫入,OSS 本身不支援 Flush 功能,通過 JindoFS SDK 寫入 OSS,雖然不能讓 flush 後的資料立刻可見,但是可以保證 flush() 後的資料不丢失,Flume 作業失敗後,可以使用 JindoFS 指令恢複 flush 過的資料。
配置示例
xxx.sinks.oss_sink.hdfs.path = oss://${your_bucket}/flume_dir/%Y-%m-%d/%H
xxx.sinks.oss_sink.hdfs.batchSize = 100000
xxx.sinks.oss_sink.hdfs.round = true
xxx.sinks.oss_sink.hdfs.roundValue = 15
xxx.sinks.oss_sink.hdfs.Unit = minute
xxx.sinks.oss_sink.hdfs.filePrefix = your_topic
xxx.sinks.oss_sink.rollSize = 3600
xxx.sinks.oss_sink.threadsPoolSize = 30
- 文檔連結⭐
https://github.com/aliyun/alibabacloud-jindofs/blob/master/docs/flume/jindofs_sdk_on_flume_for_oss.md在 EMR 叢集内對 Flush 檔案恢複
jindo jfs -recover [-R]
[-flushStagingPath {flushStagingPath}]
[-accessKeyId ${accessKeyId}]
[-accessKeySecret ${accessKeySecret}]
<path>
注:如需遞歸恢複(-R),建議先停止 Flume 任務,避免 Flume 任務運作異常。
在 EMR 叢集外對 Flush 檔案恢複
JindoOssFileSystem jindoFileSystem = (JindoOssFileSystem) fs;
boolean isFolder = true;
jindoFileSystem.recover(path, isFolder);
四、Flume 實戰 JindoFS SDK
自建Flume 使用 JindoFS SDK 壓縮寫入 OSS
環境準備
Hadoop-2.8.5
下載下傳
Flume-1.9.0:wget
https://downloads.apache.org/flume/1.9.0/apache-flume-1.9.0-bin.tar.gz添加依賴
cd $HADOOP_HOME/share/hadoop/httpfs/tomcat/webapps/webhdfs/WEB-INF/lib
cp commons-configuration-1.6.jar $FLUME_HOME/lib
cp hadoop-auth-2.8.5.jar $FLUME_HOME/lib
cp hadoop-common-2.8.5.jar $FLUME_HOME/lib
cp hadoop-hdfs-2.8.5.jar $FLUME_HOME/lib
cp commons-io-2.4.jar $FLUME_HOME/lib
cp htrace-core4-4.0.1-incubating.jar $FLUME_HOME/lib
wget https://smartdata-binary.oss-cn-shanghai.aliyuncs.com/jindofs-sdk-3.5.0.jar -O
$FLUME_HOME/lib/jindofs-sdk-3.5.0.jar
配置 JindoFS SDK
https://github.com/aliyun/alibabacloud-jindofs/blob/master/docs/jindofs_sdk_how_to_hadoop.md配置
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /tmp/test.log
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 20
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = oss://yanbin-hd2-test/%Y-%m-%d/%H
a1.sinks.k1.hdfs.filePrefix = test
a1.sinks.k1.hdfs.batchSize = 20
a1.sinks.k1.hdfs.codeC = gzip
a1.sinks.k1.hdfs.fileType = CompressedStream
a1.sinks.k1.rollCount = 20
a2.sinks.k1.hdfs.minBlockReplicas = 1
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
日志仿真
while true; do echo `date` >> /tmp/test.log; sleep 1; done
Flume 啟動
bin/flume-ng agent --name a1 -c conf -f conf/flume-exec-oss.conf -Dflume.root.logger=INFO,console
結果
直接觀看第四課(7/8講)視訊回放,擷取執行個體講解~
⭐Github連結:
https://github.com/aliyun/alibabacloud-jindofs不錯過每次直播資訊、探讨更多資料湖 JindoFS+OSS 相關技術問題,歡迎掃碼加入釘釘交流群!