Flume日志收集系統詳解
- 一、Flume簡介
-
- 1.1 Flume特點
-
- 1.1.1 可靠性
- 1.1.2 可恢複性
- 1.2 Flume架構
- 二、Flume原理
-
- 2.1 主要元件
- 2.2 工作流程
- 三、flume建立執行個體
-
- 3.1 Exec Source 類型
- 3.2 spooling directory source類型
- 3.3 Taildir Source類型
- 3.4 Netcat Source (TCP)類型
- 3.4 将讀取檔案上傳至hdfs上
- 3.5 Java自定義攔截器,将讀取資料上傳至hdfs
- 3.7 flume讀取資料至kafka
一、Flume簡介
Apache Flume是一個 分布式的、可靠的、可用的 資料收集系統 ,它可以有效地收集、聚合和移動大量的 日志資料 ,這些資料可以從許多不同的來源轉移到一個集中的資料存儲中。
Apache Flume不僅僅限于日志資料聚合。由于資料源是可定制的,是以Flume可用于傳輸大量事件資料,包括但不限于網絡流量資料、社交媒體生成的資料、電子郵件消息以及幾乎所有可能的資料源。
1.1 Flume特點
1.1.1 可靠性
Flume 的核心是把 資料從資料源收集過來,再送到目的地 。為了保證輸送一定成功,在送到目的地之前,會 先緩存資料 待資料真正到達目的地後,删除自己緩存的資料 。Flume 使用 事務性的方式保證傳送 Event整個過程的可靠性 。 Sink 必須在Event 被存入 Channel 後,或者,已經被傳達到下一站 Agent裡,又或者,已經被存入外部資料目的地之後,才能把 Event 從 Channel 中 remove 掉。這樣資料流裡的 Event 無論是在一個 agent 裡還是多個 agent 之間流轉,都能保證可靠,因為以上的事務保證了 Event 會被成功存儲起來。比如 Flume支援在本地儲存一份檔案 Channel 作為備份,而 Memory Channel 将 Event存在記憶體隊列裡,速度快,但丢失的話無法恢複。
1.1.2 可恢複性
Events在 通道中執行,由該通道管理從失敗中恢複 。 Flume支援由本地檔案系統支援的持久檔案通道。還有一個記憶體通道,它隻是簡單地将事件存儲在記憶體隊列中,速度更快,但是當代理程序死亡時,仍然留在記憶體通道中的任何事件都無法恢複。
1.2 Flume架構
Flume架構包括三部分: Client、 Agent、 Event
①Event: 事件,是資料傳輸的基本單元 。 它具有位元組有效載荷和一組可選的字元串屬性 ,通常對應一行資料。 實際包含一個 Map結構的 headers和一個 byte[]類型的 body屬性。 Event是資料流的資料對象,而 Flume資料流( Data Flow) 描述了資料從産生、傳輸、處理并最終寫入目标的一條路徑。
②Agent: 代理,是一個獨立的JVM程序,包含三個元件(Source、Channel、Sink),事件通過元件從外部源流向下一個目标。
③Client: 用戶端,資料産生的地方,如Web伺服器。
注: Flume以一個或多個Agent部署運作,Flume資料流模型(架構圖)如下圖所示:
二、Flume原理
2.1 主要元件
Agent元件包括:包括: Source、 SourceRunner、 Interceptor、 Channel、ChannelSelector、ChannelProcessor、 Sink SinkRunner、 SinkProcessor、 SinkSelector,其中 Source、Channel 、 Sink為核心元件,各元件作用如下:
- Source : 是負責接收資料到 Flume Agent的 元件,用來擷取 Event 并寫入 Channel。
- SourceRunner : 負責啟動 Source,一個 SourceRunner包含一個 Source對象。
- Interceptor: 攔截器,是簡單的插件式元件,設定在 Source和 Channel之間。Source接收到的事件 Event,在寫入Channel之前,攔截器都可以進行轉換或者删除這些事件。每個攔截器隻處理同一個 Source接收到的事件,可以自定義攔截器。
- Channel: 位于 Source和 Sink之間的緩沖區, 中轉 Event 的一個臨時存儲,儲存有 Source 元件傳遞過來的 Event,可以認為是一個隊列。 Channel允許 Source和 Sink運作在不同的速率上。 Channel是線程安全的,可以同時處理幾個 Source的寫入操作和幾個 Sink的讀取操作。
- ChannelSelector: 選擇器,作用是為 Source選擇下遊的 Channel。有兩種選擇方式, 複制和多路複用 。複制 是把 Source中傳遞過來的 Event複制給所有對應的下遊的Channel。多路複用 是把 Source傳遞過來的 Event按照不同的屬性傳遞到不同的下遊 Channel中去。
-
ChannelProcessor: 通過 ChannelSelector擷取到 Channels後,如何發送 Event到
Channel。 一個 Source對象包含一個 ChannelProcessor對象,一個 ChannelProcessor對象包含多個 Interceptor對象和一個 ChannelSelector對象。
- Sink: 從 Channel 中讀取并移除 Event,将 Event 傳遞到 Flow Pipeline 中的下一個 Agent 或者其他存儲系統。 Sink不斷地輪詢 Channel中的事件且批量地移除它們 ,并将這些事件批量寫入到存儲或索引系統、或者被發送到另一個 Flume Agent。Sink是完全事務 性 的。 在從 Channel批量删除資料之前,每個 Sink用Channel啟動一個事務。批量 事件 一旦成功寫出到存儲系統或下一個Flume Agent Sink就利用 Channel送出事務。事務 一旦 被送出,該 Channel從 自己 的内部緩沖區删除事件。
-
SinkRunner: 負責啟動 Sink。在 Agent啟動時,會同時啟動 Channel SourceRunner
SinkRunner
- SinkProcessor: Flume提供 FailoverSinkProcessor和 LoadBalancingSinkProcesso,一個是失效備援,一個是負載均衡,那麼 SinkProcessor不同子類的存在就是為了實作不同的配置設定操作和政策,而 sink的 start()通常是啟動線程去執行消費操作。
- SinkSelector: LoadBalancingSinkProcessor包含 SinkSelector,會根據 SinkSelector在 SinkGroup(邏輯上的一組 Sink)中選擇 Sink并啟動。
2.2 工作流程
三、flume建立執行個體
在flume安裝的根目錄/conf目錄下建立檔案job:,在該檔案夾下存放自定義的flume配置檔案
mkdir /opt/install/flume160/conf/job
3.1 Exec Source 類型
執行Linux指令,并按照指令傳回結果,如 “tail-f”
示例: 在job目錄下建立 exec.conf 檔案,編輯内容如下:
a1.sources = s1
a1.sinks = sk1
a1.channels = c1
#設定source類型為exec
a1.sources.s1.type = exec
a1.sources.s1.command = tail -f /opt/dataFile/flume-0817/exectest.txt
#source和channel連接配接
a1.sources.s1.channels = c1
a1.channels.c1.type = memory
#指定sink
a1.sinks.sk1.type = logger
#sink和channel進行連接配接
a1.sinks.sk1.channel = c1
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
在flume根目錄執行:
./bin/flume-ng agent --name a1 --conf conf/ --conf-file conf/job/exec.conf -Dflume.root.logger=INFO,console
在 /opt/dataFile/flume-0817/exectest.txt 檔案中寫入資料,flume 即可開始運作,執行相應讀寫操作
注: 在exectest.txt尾部添加内容:
echo hahaha >> /opt/dataFile/flume-0817/exectest.txt
3.2 spooling directory source類型
從磁盤檔案夾中擷取檔案資料,可避免重新開機或者發送失敗後資料丢失,還可用于監控檔案夾新檔案
示例: 在job目錄下建立 events-flume-logger.conf 檔案,編輯内容如下:
events.sources = eventsSource
events.channels = eventsChannel
events.sinks = eventsSink
events.sinks.eventsSink.type = logger
events.sources.eventsSource.type = spooldir
#需先建立目錄: /opt/dataFile/flumeFile/events 用于存放需要讀取的.csv檔案
events.sources.eventsSource.spoolDir = /opt/dataFile/flumeFile/events
events.sources.eventsSource.deserializer = LINE
events.sources.eventsSource.deserializer.maxLineLength = 32000
#正則比對需要讀的檔案名
events.sources.eventsSource.includePattern = events_[0-9]{4}-[0-9]{2}-[0-9]{2}.csv
events.channels.eventsChannel.type = file
#需先建立目錄:/optdataFile/flumeFile/checkpoint/events 設定檢查點
events.channels.eventsChannel.checkpointDir = /optdataFile/flumeFile/checkpoint/events
#需先建立目錄: /opt/dataFile/flumeFile/data/events 存放Channel資料
events.channels.eventsChannel.dataDirs = /opt/dataFile/flumeFile/data/events
events.sources.eventsSource.channels = eventsChannel
events.sinks.eventsSink.channel = eventsChannel
在flume根目錄執行:
./bin/flume-ng agent --name events --conf conf/ --conf-file conf/job/events-flume-logger.conf -Dflume.root.logger=INFO,console
将要讀取的資料 events.csv 拷貝至 /opt/dataFile/flumeFile/events 目錄下,flume 即可開始運作,執行相應讀寫操作
拷貝資料,注意修改檔案格式:
install events.csv /opt/dataFile/flumeFile/events/events_2020-08-17.csv
3.3 Taildir Source類型
Taildir Source監控指定的一些檔案,并在檢測到新的一行資料産生的時候實時地讀取它們,如果新的一行資料還沒寫完, Taildir Source會等到這行寫完後再讀取。 Taildir Source可以從任意指定的位置開始讀取檔案,可以實作斷點續讀,如果發生當機,會從當機前記錄的最後讀取的位置開始讀檔案,而不是從首行重新開始讀取。 預設情況下,它将從每個檔案的第一行開始讀取。
檔案按照修改時間的順序來讀取。修改時間最早的檔案将最先被讀取(簡單記成:先來先走)。 Taildir Source不重命名、删除或修改它監控的檔案。目前不支援讀 取二進制檔案。隻能逐行讀取文本檔案。注意:Taildir Source目前不能運作在 windows系統上。
示例: 在job目錄下建立 tailDir.conf 檔案,編輯内容如下:
a1.sources = s1
a1.sinks = sk1
a1.channels = c1
#設定source類型為TAILDIR
a1.sources.s1.type = TAILDIR
a1.sources.s1.filegroups = f1 f2
#配置filegroups的兩個資料源 f1 f2
a1.sources.s1.filegroups.f1 = /opt/dataFile/flume-0817/tail_1/example.log
a1.sources.s1.filegroups.f2 = /opt/dataFile//flume-0817/tail_2/.*log.*
#指定position的位置, 讀取檔案會記錄位置
a1.sources.s1.positionFile = /opt/dataFile//flume-0817/tail_position/taildir_position.json
#指定headers
a1.sources.s1.headers.f1.headerKey1 = value1
a1.sources.s1.headers.f2.headerKey1 = value2
a1.sources.s1.headers.f2.headerKey1 = value3
a1.sources.s1.fileHeader = true
#source和channel連接配接
a1.sources.s1.channels = c1
a1.channels.c1.type = memory
#指定sink
a1.sinks.sk1.type = logger
#sink和channel進行連接配接
a1.sinks.sk1.channel = c1
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
在flume根目錄執行:
./bin/flume-ng agent --name a1 --conf conf/ --conf-file conf/job/tailDir.conf -Dflume.root.logger=INFO,console
在f1、f2目錄中建立一些資料,,flume 即可開始運作,執行相應讀寫操作
3.4 Netcat Source (TCP)類型
一個類似netcat的源,它監聽給定的端口并将每行文本轉換成一個事件。就像 nc -k -l [主機 ][端口 ]。換句話說,它打開指定的端口并偵聽資料。期望提供的資料是換行分隔的文本 。每行文本被轉換成一個 Flume事件并通過連接配接的通道發送。
示例: 在job目錄下建立 netcat-flume-logger.conf 檔案,編輯内容如下:
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
a1.sinks.k1.type = logger
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
在flume根目錄執行:
./bin/flume-ng agent --name a1 --conf conf/ --conf-file conf/job/netcat-flume-logger.conf -Dflume.root.logger=INFO,console
測試: 需先下載下傳一個網絡工具netcat,Linux預設情況下是沒有安裝的,安裝過程如下:
yum install -y nc
#列出telnet相關的安裝包
yum list telnet*
#安裝telnet服務
yum install telnet-server
#安裝telnet用戶端
yum install telnet.*
輸入端口号44444,測試連接配接:
telnet localhost 44444
3.4 将讀取檔案上傳至hdfs上
示例: 讀取檔案user_friends.csv,上傳至hdfs /data/userFriends目錄下,在job目錄下建立 file-flume-hdfs.conf 檔案,編輯内容如下:
user_friends.sources = userFriendsSource
user_friends.channels = userFriendsChannel
user_friends.sinks = userFriendsSink
user_friends.sources.userFriendsSource.type = spooldir
#需先建立目錄: /opt/dataFile/flumeFile/userFriends 用于存放需要讀取的.csv檔案
user_friends.sources.userFriendsSource.spoolDir = /opt/dataFile/flumeFile/user_friends
user_friends.sources.userFriendsSource.deserializer = LINE
user_friends.sources.userFriendsSource.deserializer.maxLineLength = 600000
#正則比對需要讀的檔案名
user_friends.sources.userFriendsSource.includePattern = userfriends_[0-9]{4}-[0-9]{2}-[0-9]{2}.csv
user_friends.channels.userFriendsChannel.type = file
#需先建立目錄:/optdataFile/flumeFile/checkpoint/userFriends 設定檢查點
user_friends.channels.userFriendsChannel.checkpointDir = /opt/dataFile/flumeFile/checkpoint/userFriends
#需先建立目錄: /opt/dataFile/flumeFile/data/userFriends 存放Channel資料
user_friends.channels.userFriendsChannel.dataDirs = /opt/dataFile/flumeFile/data/userFriends
user_friends.sinks.userFriendsSink.type = hdfs
user_friends.sinks.userFriendsSink.hdfs.fileType = DataStream
user_friends.sinks.userFriendsSink.hdfs.filePrefilx = userFriends
user_friends.sinks.userFriendsSink.hdfs.filePrefilx = .csv
user_friends.sinks.userFriendsSink.hdfs.path = hdfs://192.168.206.129:9000/data/userFriends/%Y-%m-%d
user_friends.sinks.userFriendsSink.hdfs.useLocalTimeStamp = true
user_friends.sinks.userFriendsSink.hdfs.batchSize = 640
user_friends.sinks.userFriendsSink.hdfs.rollCount = 0
user_friends.sinks.userFriendsSink.hdfs.rollSize = 6400000
user_friends.sinks.userFriendsSink.hdfs.rollInterval = 30
user_friends.sources.userFriendsSource.channels = userFriendsChannel
user_friends.sinks.userFriendsSink.channel = userFriendsChannel
在flume根目錄執行:
./bin/flume-ng agent --name user_friends --conf conf/ --conf-file conf/job/file-flume-hdfs.conf -Dflume.root.logger=INFO,console
将要讀取的資料 user_friends.csv 拷貝至 /opt/dataFile/flumeFile/userFriends 目錄下,flume 即可開始運作,執行相應讀寫操作
拷貝資料,注意修改檔案格式:
install events.csv /opt/dataFile/flumeFile/userFriends/userfriends_2020-08-17.csv
3.5 Java自定義攔截器,将讀取資料上傳至hdfs
自定義攔截器實作功能: 讀取每行資料,資料以 “spark” 開頭時,将檔案上傳至 hdfs: /data/sparkDemo目錄下,否則上傳至 hdfs: /data/tmpDemo 目錄下
①: IDEA中需先導入Maven依賴:
<dependency>
<groupId>org.apache.flume</groupId>
<artifactId>flume-ng-core</artifactId>
<version>1.6.0</version>
</dependency>
IDEA中建立java檔案 InterceptorDemo ,
自定義攔截器代碼如下:
package cn.com;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
public class InterceptorDemo implements Interceptor { //繼承攔截器接口
private List<Event> addHeaderEvents;
@Override
public void initialize() {
addHeaderEvents = new ArrayList<>();
}
@Override
public Event intercept(Event event) {
Map<String,String> headers = event.getHeaders();
String body = new String(event.getBody());
if(body.startsWith("spark")){
headers.put("type","spark");
}else{
headers.put("type","tmp");
}
return event;
}
@Override
public List<Event> intercept(List<Event> list) {
addHeaderEvents.clear();
for(Event event:list){
addHeaderEvents.add(intercept(event));
}
return addHeaderEvents;
}
@Override
public void close() {
}
//建立一個靜态内部類,同過靜态内部類加載 InterceptorDemo 對象
public static class Builder implements Interceptor.Builder{
@Override
public Interceptor build() {
return new InterceptorDemo();
}
@Override
public void configure(Context context) {
}
}
}
②: 将以上java檔案,打jar包上傳至 /opt/install/flume160/lib/ 目錄
③: Linux中在job目錄下建立 netcat-flume-logerhdfs.conf 檔案,編輯内容如下:
a1.sources = r1
a1.channels = c1 c2
a1.sinks = k1 k2
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
#設定攔截器,注意設定攔截器所在jar包位置: cn.com.InterceptorDemo$Builder ,用$連接配接
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = cn.com.InterceptorDemo$Builder
#設定選擇器
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = type
a1.sources.r1.selector.mapping.gree = c1
a1.sources.r1.selector.mapping.lijia = c2
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.filePrefix = spark
a1.sinks.k1.hdfs.fileSuffix = .csv
a1.sinks.k1.hdfs.path = hdfs://192.168.206.129:9000/data/sparkdemo/%Y-%m-%d
a1.sinks.k1.hdfs.useLocalTimeStamp = true
a1.sinks.k1.hdfs.batchSize = 640
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.rollSize = 100
a1.sinks.k1.hdfs.rollInterval = 3
a1.sinks.k2.type = hdfs
a1.sinks.k2.hdfs.fileType = DataStream
a1.sinks.k2.hdfs.filePrefix = tmp
a1.sinks.k2.hdfs.fileSuffix = .csv
a1.sinks.k2.hdfs.path = hdfs://192.168.206.129:9000/data/tmpdemo/%Y-%m-%d
a1.sinks.k2.hdfs.useLocalTimeStamp = true
a1.sinks.k2.hdfs.batchSize = 640
a1.sinks.k2.hdfs.rollCount = 0
a1.sinks.k2.hdfs.rollSize = 100
a1.sinks.k2.hdfs.rollInterval = 3
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2
④: 測試連接配接
telnet localhost 44444
分别輸入資料
sparkabc
spark abc 123
abcdefg
123456
sparktest
可以在hdfs端口: http://192.168.206.129:50070/ 檢視攔截器是否分類讀取上傳至 hdfs/data 目錄下
3.7 flume讀取資料至kafka
例① 将讀取的資料直接儲存至kafka中:
vi userfriends-flume-kafka.conf ,内容如下:
user_friends.sources = userFriendsSource
user_friends.channels = userFriendsChannel
user_friends.sinks = userFriendsSink
user_friends.sources.userFriendsSource.type = spooldir
user_friends.sources.userFriendsSource.spoolDir = /opt/dataFile/flumeFile/user_friends
user_friends.sources.userFriendsSource.deserializer = LINE
user_friends.sources.userFriendsSource.deserializer.maxLineLength = 60000
user_friends.sources.userFriendsSource.includePattern = userfriends_[0-9]{4}-[0-9]{2}-[0-9]{2}.csv
user_friends.channels.userFriendsChannel.type = file
user_friends.channels.userFriendsChannel.checkpointDir = /opt/dataFile/flumeFile/checkpoint/userFriends
user_friends.channels.userFriendsChannel.dataDir = /opt/dataFile/flumeFile/data/userFriends
user_friends.sinks.userFriendsSink.type = org.apache.flume.sink.kafka.KafkaSink
user_friends.sinks.userFriendsSink.batchSize = 640
user_friends.sinks.userFriendsSink.brokerList = 192.168.206.129:9092
user_friends.sinks.userFriendsSink.topic = user_friends_raw
user_friends.sources.userFriendsSource.channels = userFriendsChannel
user_friends.sinks.userFriendsSink.channel = userFriendsChannel
例② 使用flume自帶的攔截器過濾掉首行,如下圖:
想要過濾掉首行字段:
vi users-flume-kafka.conf ,内容如下:
users.sources = usersSource
users.channels = usersChannel
users.sinks = usersSink
users.sources.usersSource.type = spooldir
users.sources.usersSource.spoolDir = /opt/dataFile/flumeFile/users
users.sources.usersSource.deserializer = LINE
users.sources.usersSource.deserializer.maxLineLength = 3000
users.sources.usersSource.includePattern = users_[0-9]{4}-[0-9]{2}-[0-9]{2}.csv
#設定自帶的攔截器
users.sources.usersSource.interceptors = head_filter
users.sources.usersSource.interceptors.head_filter.type = regex_filter
users.sources.usersSource.interceptors.head_filter.regex = ^user_id*
users.sources.usersSource.interceptors.head_filter.excludeEvents = true
users.channels.usersChannel.type = file
users.channels.usersChannel.checkpointDir = /opt/dataFile/flumeFile/checkpoint/users
users.channels.usersChannel.dataDir = /opt/dataFile/flumeFile/data/users
users.sinks.usersSink.type = org.apache.flume.sink.kafka.KafkaSink
users.sinks.usersSink.batchSize = 640
users.sinks.usersSink.brokerList = 192.168.206.129:9092
users.sinks.usersSink.topic = users
users.sources.usersSource.channels = usersChannel
users.sinks.usersSink.channel = usersChannel
例③ 将讀取資料同時儲存到 kafka 和 hdfs 中:
vi train-flume-hdfs_kafka.conf ,内容如下:
train.sources = trainSource
train.channels = kafkaChannel hdfsChannel
train.sinks = kafkaSink hdfsSink
train.sources.trainSource.type = spooldir
train.sources.trainSource.spoolDir = /opt/dataFile/flumeFile/train
train.sources.trainSource.deserializer = LINE
train.sources.trainSource.deserializer.maxLineLength = 3000
train.sources.trainSource.trainSource.includePattern = train_[0-9]{4}-[0-9]{2}-[0-9]{2}.csv
#使用自帶攔截器過濾首行
train.sources.trainSource.interceptors = head_filter
train.sources.trainSource.interceptors.head_filter.type = regex_filter
train.sources.trainSource.interceptors.head_filter.regex = ^user*
train.sources.trainSource.interceptors.head_filter.excludeEvents = true
train.channels.kafkaChannel.type = file
train.channels.kafkaChannel.checkpointDir = /opt/dataFile/flumeFile/checkpoint/train
train.channels.kafkaChannel.dataDir =
/opt/dataFile/flumeFile/data/train
train.channels.hdfsChannel.type = memory
train.channels.hdfsChannel.capacity = 64000
train.channels.hdfsChannel.transactionCapacity = 16000
train.sinks.kafkaSink.type = org.apache.flume.sink.kafka.KafkaSink
train.sinks.kafkaSink.batchSize = 640
train.sinks.kafkaSink.brokerList = 192.168.206.129:9092
train.sinks.kafkaSink.topic = train
train.sinks.hdfsSink.type = hdfs
train.sinks.hdfsSink.hdfs.fileType = DataStream
train.sinks.hdfsSink.hdfs.filePrefix = train
train.sinks.hdfsSink.hdfs.fileSuffix = .csv
train.sinks.hdfsSink.hdfs.path = hdfs://192.168.206.129:9000/data/train/%Y-%m-%d
train.sinks.hdfsSink.hdfs.useLocalTimeStamp = true
train.sinks.hdfsSink.hdfs.batchSize = 6400
train.sinks.hdfsSink.hdfs.rollCount = 0
train.sinks.hdfsSink.hdfs.rollSize = 64000000
train.sinks.hdfsSink.hdfs.rollInterval = 10
train.sources.trainSource.channels = hdfsChannel kafkaChannel
train.sinks.hdfsSink.channel = hdfsChannel
train.sinks.kafkaSink.channel = kafkaChannel