天天看點

Flume日志收集系統詳解----硬核解析Flume日志收集系統詳解一、Flume簡介二、Flume原理三、flume建立執行個體

Flume日志收集系統詳解

  • 一、Flume簡介
    • 1.1 Flume特點
      • 1.1.1 可靠性
      • 1.1.2 可恢複性
    • 1.2 Flume架構
  • 二、Flume原理
    • 2.1 主要元件
    • 2.2 工作流程
  • 三、flume建立執行個體
    • 3.1 Exec Source 類型
    • 3.2 spooling directory source類型
    • 3.3 Taildir Source類型
    • 3.4 Netcat Source (TCP)類型
    • 3.4 将讀取檔案上傳至hdfs上
    • 3.5 Java自定義攔截器,将讀取資料上傳至hdfs
    • 3.7 flume讀取資料至kafka

一、Flume簡介

Apache Flume是一個 分布式的、可靠的、可用的 資料收集系統 ,它可以有效地收集、聚合和移動大量的 日志資料 ,這些資料可以從許多不同的來源轉移到一個集中的資料存儲中。

Apache Flume不僅僅限于日志資料聚合。由于資料源是可定制的,是以Flume可用于傳輸大量事件資料,包括但不限于網絡流量資料、社交媒體生成的資料、電子郵件消息以及幾乎所有可能的資料源。

1.1 Flume特點

1.1.1 可靠性

Flume 的核心是把 資料從資料源收集過來,再送到目的地 。為了保證輸送一定成功,在送到目的地之前,會 先緩存資料 待資料真正到達目的地後,删除自己緩存的資料 。Flume 使用 事務性的方式保證傳送 Event整個過程的可靠性 。 Sink 必須在Event 被存入 Channel 後,或者,已經被傳達到下一站 Agent裡,又或者,已經被存入外部資料目的地之後,才能把 Event 從 Channel 中 remove 掉。這樣資料流裡的 Event 無論是在一個 agent 裡還是多個 agent 之間流轉,都能保證可靠,因為以上的事務保證了 Event 會被成功存儲起來。比如 Flume支援在本地儲存一份檔案 Channel 作為備份,而 Memory Channel 将 Event存在記憶體隊列裡,速度快,但丢失的話無法恢複。

1.1.2 可恢複性

Events在 通道中執行,由該通道管理從失敗中恢複 。 Flume支援由本地檔案系統支援的持久檔案通道。還有一個記憶體通道,它隻是簡單地将事件存儲在記憶體隊列中,速度更快,但是當代理程序死亡時,仍然留在記憶體通道中的任何事件都無法恢複。

1.2 Flume架構

Flume架構包括三部分: Client、 Agent、 Event

①Event: 事件,是資料傳輸的基本單元 。 它具有位元組有效載荷和一組可選的字元串屬性 ,通常對應一行資料。 實際包含一個 Map結構的 headers和一個 byte[]類型的 body屬性。 Event是資料流的資料對象,而 Flume資料流( Data Flow) 描述了資料從産生、傳輸、處理并最終寫入目标的一條路徑。

②Agent: 代理,是一個獨立的JVM程序,包含三個元件(Source、Channel、Sink),事件通過元件從外部源流向下一個目标。

③Client: 用戶端,資料産生的地方,如Web伺服器。

注: Flume以一個或多個Agent部署運作,Flume資料流模型(架構圖)如下圖所示:

Flume日志收集系統詳解----硬核解析Flume日志收集系統詳解一、Flume簡介二、Flume原理三、flume建立執行個體

二、Flume原理

2.1 主要元件

Agent元件包括:包括: Source、 SourceRunner、 Interceptor、 Channel、ChannelSelector、ChannelProcessor、 Sink SinkRunner、 SinkProcessor、 SinkSelector,其中 Source、Channel 、 Sink為核心元件,各元件作用如下:

  • Source : 是負責接收資料到 Flume Agent的 元件,用來擷取 Event 并寫入 Channel。
  • SourceRunner : 負責啟動 Source,一個 SourceRunner包含一個 Source對象。
  • Interceptor: 攔截器,是簡單的插件式元件,設定在 Source和 Channel之間。Source接收到的事件 Event,在寫入Channel之前,攔截器都可以進行轉換或者删除這些事件。每個攔截器隻處理同一個 Source接收到的事件,可以自定義攔截器。
  • Channel: 位于 Source和 Sink之間的緩沖區, 中轉 Event 的一個臨時存儲,儲存有 Source 元件傳遞過來的 Event,可以認為是一個隊列。 Channel允許 Source和 Sink運作在不同的速率上。 Channel是線程安全的,可以同時處理幾個 Source的寫入操作和幾個 Sink的讀取操作。
  • ChannelSelector: 選擇器,作用是為 Source選擇下遊的 Channel。有兩種選擇方式, 複制和多路複用 。複制 是把 Source中傳遞過來的 Event複制給所有對應的下遊的Channel。多路複用 是把 Source傳遞過來的 Event按照不同的屬性傳遞到不同的下遊 Channel中去。
  • ChannelProcessor: 通過 ChannelSelector擷取到 Channels後,如何發送 Event到

    Channel。 一個 Source對象包含一個 ChannelProcessor對象,一個 ChannelProcessor對象包含多個 Interceptor對象和一個 ChannelSelector對象。

  • Sink: 從 Channel 中讀取并移除 Event,将 Event 傳遞到 Flow Pipeline 中的下一個 Agent 或者其他存儲系統。 Sink不斷地輪詢 Channel中的事件且批量地移除它們 ,并将這些事件批量寫入到存儲或索引系統、或者被發送到另一個 Flume Agent。Sink是完全事務 性 的。 在從 Channel批量删除資料之前,每個 Sink用Channel啟動一個事務。批量 事件 一旦成功寫出到存儲系統或下一個Flume Agent Sink就利用 Channel送出事務。事務 一旦 被送出,該 Channel從 自己 的内部緩沖區删除事件。
  • SinkRunner: 負責啟動 Sink。在 Agent啟動時,會同時啟動 Channel SourceRunner

    SinkRunner

  • SinkProcessor: Flume提供 FailoverSinkProcessor和 LoadBalancingSinkProcesso,一個是失效備援,一個是負載均衡,那麼 SinkProcessor不同子類的存在就是為了實作不同的配置設定操作和政策,而 sink的 start()通常是啟動線程去執行消費操作。
  • SinkSelector: LoadBalancingSinkProcessor包含 SinkSelector,會根據 SinkSelector在 SinkGroup(邏輯上的一組 Sink)中選擇 Sink并啟動。

2.2 工作流程

Flume日志收集系統詳解----硬核解析Flume日志收集系統詳解一、Flume簡介二、Flume原理三、flume建立執行個體

三、flume建立執行個體

在flume安裝的根目錄/conf目錄下建立檔案job:,在該檔案夾下存放自定義的flume配置檔案

mkdir /opt/install/flume160/conf/job
           

3.1 Exec Source 類型

執行Linux指令,并按照指令傳回結果,如 “tail-f”

示例: 在job目錄下建立 exec.conf 檔案,編輯内容如下:

a1.sources = s1
a1.sinks = sk1
a1.channels = c1

#設定source類型為exec
a1.sources.s1.type = exec
a1.sources.s1.command = tail -f /opt/dataFile/flume-0817/exectest.txt
#source和channel連接配接
a1.sources.s1.channels = c1
a1.channels.c1.type = memory

#指定sink
a1.sinks.sk1.type = logger

#sink和channel進行連接配接
a1.sinks.sk1.channel = c1

a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
           

在flume根目錄執行:

./bin/flume-ng agent --name a1 --conf conf/ --conf-file conf/job/exec.conf -Dflume.root.logger=INFO,console
           

在 /opt/dataFile/flume-0817/exectest.txt 檔案中寫入資料,flume 即可開始運作,執行相應讀寫操作

注: 在exectest.txt尾部添加内容:

echo hahaha >> /opt/dataFile/flume-0817/exectest.txt
           

3.2 spooling directory source類型

從磁盤檔案夾中擷取檔案資料,可避免重新開機或者發送失敗後資料丢失,還可用于監控檔案夾新檔案

示例: 在job目錄下建立 events-flume-logger.conf 檔案,編輯内容如下:

events.sources = eventsSource
events.channels = eventsChannel
events.sinks = eventsSink

events.sinks.eventsSink.type = logger

events.sources.eventsSource.type = spooldir

#需先建立目錄: /opt/dataFile/flumeFile/events   用于存放需要讀取的.csv檔案
events.sources.eventsSource.spoolDir = /opt/dataFile/flumeFile/events

events.sources.eventsSource.deserializer = LINE
events.sources.eventsSource.deserializer.maxLineLength = 32000

#正則比對需要讀的檔案名
events.sources.eventsSource.includePattern = events_[0-9]{4}-[0-9]{2}-[0-9]{2}.csv

events.channels.eventsChannel.type = file

#需先建立目錄:/optdataFile/flumeFile/checkpoint/events   設定檢查點
events.channels.eventsChannel.checkpointDir = /optdataFile/flumeFile/checkpoint/events

#需先建立目錄: /opt/dataFile/flumeFile/data/events  存放Channel資料
events.channels.eventsChannel.dataDirs = /opt/dataFile/flumeFile/data/events

events.sources.eventsSource.channels = eventsChannel
events.sinks.eventsSink.channel = eventsChannel
           

在flume根目錄執行:

./bin/flume-ng agent --name events --conf conf/ --conf-file conf/job/events-flume-logger.conf -Dflume.root.logger=INFO,console
           

将要讀取的資料 events.csv 拷貝至 /opt/dataFile/flumeFile/events 目錄下,flume 即可開始運作,執行相應讀寫操作

拷貝資料,注意修改檔案格式:

install events.csv /opt/dataFile/flumeFile/events/events_2020-08-17.csv
           

3.3 Taildir Source類型

Taildir Source監控指定的一些檔案,并在檢測到新的一行資料産生的時候實時地讀取它們,如果新的一行資料還沒寫完, Taildir Source會等到這行寫完後再讀取。 Taildir Source可以從任意指定的位置開始讀取檔案,可以實作斷點續讀,如果發生當機,會從當機前記錄的最後讀取的位置開始讀檔案,而不是從首行重新開始讀取。 預設情況下,它将從每個檔案的第一行開始讀取。

檔案按照修改時間的順序來讀取。修改時間最早的檔案将最先被讀取(簡單記成:先來先走)。 Taildir Source不重命名、删除或修改它監控的檔案。目前不支援讀 取二進制檔案。隻能逐行讀取文本檔案。注意:Taildir Source目前不能運作在 windows系統上。

示例: 在job目錄下建立 tailDir.conf 檔案,編輯内容如下:

a1.sources = s1
a1.sinks = sk1
a1.channels = c1

#設定source類型為TAILDIR
a1.sources.s1.type = TAILDIR
a1.sources.s1.filegroups = f1 f2

#配置filegroups的兩個資料源 f1   f2
a1.sources.s1.filegroups.f1 = /opt/dataFile/flume-0817/tail_1/example.log

a1.sources.s1.filegroups.f2 = /opt/dataFile//flume-0817/tail_2/.*log.*

#指定position的位置, 讀取檔案會記錄位置
a1.sources.s1.positionFile = /opt/dataFile//flume-0817/tail_position/taildir_position.json

#指定headers
a1.sources.s1.headers.f1.headerKey1 = value1
a1.sources.s1.headers.f2.headerKey1 = value2
a1.sources.s1.headers.f2.headerKey1 = value3

a1.sources.s1.fileHeader = true

#source和channel連接配接
a1.sources.s1.channels = c1
a1.channels.c1.type = memory

#指定sink
a1.sinks.sk1.type = logger

#sink和channel進行連接配接
a1.sinks.sk1.channel = c1

a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
           

在flume根目錄執行:

./bin/flume-ng agent --name a1 --conf conf/ --conf-file conf/job/tailDir.conf -Dflume.root.logger=INFO,console
           

在f1、f2目錄中建立一些資料,,flume 即可開始運作,執行相應讀寫操作

3.4 Netcat Source (TCP)類型

一個類似netcat的源,它監聽給定的端口并将每行文本轉換成一個事件。就像 nc -k -l [主機 ][端口 ]。換句話說,它打開指定的端口并偵聽資料。期望提供的資料是換行分隔的文本 。每行文本被轉換成一個 Flume事件并通過連接配接的通道發送。

示例: 在job目錄下建立 netcat-flume-logger.conf 檔案,編輯内容如下:

a1.sources = r1
a1.sinks = k1
a1.channels = c1

a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

a1.sinks.k1.type = logger

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
           

在flume根目錄執行:

./bin/flume-ng agent --name a1 --conf conf/ --conf-file conf/job/netcat-flume-logger.conf -Dflume.root.logger=INFO,console
           

測試: 需先下載下傳一個網絡工具netcat,Linux預設情況下是沒有安裝的,安裝過程如下:

yum install -y nc

#列出telnet相關的安裝包
yum list telnet*

#安裝telnet服務
yum install telnet-server	

#安裝telnet用戶端
yum install telnet.*
           

輸入端口号44444,測試連接配接:

telnet localhost 44444
           

3.4 将讀取檔案上傳至hdfs上

示例: 讀取檔案user_friends.csv,上傳至hdfs /data/userFriends目錄下,在job目錄下建立 file-flume-hdfs.conf 檔案,編輯内容如下:

user_friends.sources = userFriendsSource
user_friends.channels = userFriendsChannel
user_friends.sinks = userFriendsSink

user_friends.sources.userFriendsSource.type = spooldir

#需先建立目錄: /opt/dataFile/flumeFile/userFriends 用于存放需要讀取的.csv檔案
user_friends.sources.userFriendsSource.spoolDir = /opt/dataFile/flumeFile/user_friends

user_friends.sources.userFriendsSource.deserializer = LINE
user_friends.sources.userFriendsSource.deserializer.maxLineLength = 600000

#正則比對需要讀的檔案名
user_friends.sources.userFriendsSource.includePattern = userfriends_[0-9]{4}-[0-9]{2}-[0-9]{2}.csv

user_friends.channels.userFriendsChannel.type = file

#需先建立目錄:/optdataFile/flumeFile/checkpoint/userFriends  設定檢查點
user_friends.channels.userFriendsChannel.checkpointDir = /opt/dataFile/flumeFile/checkpoint/userFriends

#需先建立目錄: /opt/dataFile/flumeFile/data/userFriends 存放Channel資料
user_friends.channels.userFriendsChannel.dataDirs = /opt/dataFile/flumeFile/data/userFriends

user_friends.sinks.userFriendsSink.type = hdfs
user_friends.sinks.userFriendsSink.hdfs.fileType = DataStream
user_friends.sinks.userFriendsSink.hdfs.filePrefilx = userFriends
user_friends.sinks.userFriendsSink.hdfs.filePrefilx = .csv
user_friends.sinks.userFriendsSink.hdfs.path = hdfs://192.168.206.129:9000/data/userFriends/%Y-%m-%d
user_friends.sinks.userFriendsSink.hdfs.useLocalTimeStamp = true
user_friends.sinks.userFriendsSink.hdfs.batchSize = 640
user_friends.sinks.userFriendsSink.hdfs.rollCount = 0
user_friends.sinks.userFriendsSink.hdfs.rollSize = 6400000
user_friends.sinks.userFriendsSink.hdfs.rollInterval = 30

user_friends.sources.userFriendsSource.channels = userFriendsChannel
user_friends.sinks.userFriendsSink.channel = userFriendsChannel
           

在flume根目錄執行:

./bin/flume-ng agent --name user_friends --conf conf/ --conf-file conf/job/file-flume-hdfs.conf -Dflume.root.logger=INFO,console
           

将要讀取的資料 user_friends.csv 拷貝至 /opt/dataFile/flumeFile/userFriends 目錄下,flume 即可開始運作,執行相應讀寫操作

拷貝資料,注意修改檔案格式:

install events.csv /opt/dataFile/flumeFile/userFriends/userfriends_2020-08-17.csv
           

3.5 Java自定義攔截器,将讀取資料上傳至hdfs

自定義攔截器實作功能: 讀取每行資料,資料以 “spark” 開頭時,将檔案上傳至 hdfs: /data/sparkDemo目錄下,否則上傳至 hdfs: /data/tmpDemo 目錄下

①: IDEA中需先導入Maven依賴:

<dependency>
      <groupId>org.apache.flume</groupId>
      <artifactId>flume-ng-core</artifactId>
      <version>1.6.0</version>
</dependency>
           

IDEA中建立java檔案 InterceptorDemo ,

自定義攔截器代碼如下:

package cn.com;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class InterceptorDemo implements Interceptor {   //繼承攔截器接口
    private List<Event> addHeaderEvents;
    @Override
    public void initialize() {
        addHeaderEvents = new ArrayList<>();
    }

    @Override
    public Event intercept(Event event) {
        Map<String,String> headers = event.getHeaders();
        String body = new String(event.getBody());
        if(body.startsWith("spark")){
            headers.put("type","spark");
        }else{
            headers.put("type","tmp");
        }
        return event;
    }

    @Override
    public List<Event> intercept(List<Event> list) {
        addHeaderEvents.clear();
        for(Event event:list){
            addHeaderEvents.add(intercept(event));
        }
        return addHeaderEvents;
    }

    @Override
    public void close() {

    }

	//建立一個靜态内部類,同過靜态内部類加載  InterceptorDemo  對象
    public static class Builder implements Interceptor.Builder{
        @Override
        public Interceptor build() {
            return new InterceptorDemo();
        }

        @Override
        public void configure(Context context) {

        }
    }
}
           

②: 将以上java檔案,打jar包上傳至 /opt/install/flume160/lib/ 目錄

③: Linux中在job目錄下建立 netcat-flume-logerhdfs.conf 檔案,編輯内容如下:

a1.sources = r1
a1.channels = c1 c2
a1.sinks = k1 k2

a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

#設定攔截器,注意設定攔截器所在jar包位置: cn.com.InterceptorDemo$Builder ,用$連接配接
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = cn.com.InterceptorDemo$Builder

#設定選擇器
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = type
a1.sources.r1.selector.mapping.gree = c1
a1.sources.r1.selector.mapping.lijia = c2

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100

a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.filePrefix = spark
a1.sinks.k1.hdfs.fileSuffix = .csv
a1.sinks.k1.hdfs.path = hdfs://192.168.206.129:9000/data/sparkdemo/%Y-%m-%d
a1.sinks.k1.hdfs.useLocalTimeStamp = true
a1.sinks.k1.hdfs.batchSize = 640
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.rollSize = 100
a1.sinks.k1.hdfs.rollInterval = 3

a1.sinks.k2.type = hdfs
a1.sinks.k2.hdfs.fileType = DataStream
a1.sinks.k2.hdfs.filePrefix = tmp
a1.sinks.k2.hdfs.fileSuffix = .csv
a1.sinks.k2.hdfs.path = hdfs://192.168.206.129:9000/data/tmpdemo/%Y-%m-%d
a1.sinks.k2.hdfs.useLocalTimeStamp = true
a1.sinks.k2.hdfs.batchSize = 640
a1.sinks.k2.hdfs.rollCount = 0
a1.sinks.k2.hdfs.rollSize = 100
a1.sinks.k2.hdfs.rollInterval = 3

a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2
           

④: 測試連接配接

telnet localhost 44444
           

分别輸入資料

sparkabc
spark abc 123
abcdefg
123456
sparktest
           

可以在hdfs端口: http://192.168.206.129:50070/ 檢視攔截器是否分類讀取上傳至 hdfs/data 目錄下

3.7 flume讀取資料至kafka

例① 将讀取的資料直接儲存至kafka中:

vi userfriends-flume-kafka.conf ,内容如下:

user_friends.sources = userFriendsSource
user_friends.channels = userFriendsChannel
user_friends.sinks = userFriendsSink

user_friends.sources.userFriendsSource.type = spooldir
user_friends.sources.userFriendsSource.spoolDir = /opt/dataFile/flumeFile/user_friends

user_friends.sources.userFriendsSource.deserializer = LINE
user_friends.sources.userFriendsSource.deserializer.maxLineLength = 60000
user_friends.sources.userFriendsSource.includePattern = userfriends_[0-9]{4}-[0-9]{2}-[0-9]{2}.csv

user_friends.channels.userFriendsChannel.type = file
user_friends.channels.userFriendsChannel.checkpointDir = /opt/dataFile/flumeFile/checkpoint/userFriends

user_friends.channels.userFriendsChannel.dataDir = /opt/dataFile/flumeFile/data/userFriends
 
user_friends.sinks.userFriendsSink.type = org.apache.flume.sink.kafka.KafkaSink
user_friends.sinks.userFriendsSink.batchSize = 640
user_friends.sinks.userFriendsSink.brokerList = 192.168.206.129:9092
user_friends.sinks.userFriendsSink.topic = user_friends_raw

user_friends.sources.userFriendsSource.channels = userFriendsChannel
user_friends.sinks.userFriendsSink.channel = userFriendsChannel
           

例② 使用flume自帶的攔截器過濾掉首行,如下圖:

想要過濾掉首行字段:

Flume日志收集系統詳解----硬核解析Flume日志收集系統詳解一、Flume簡介二、Flume原理三、flume建立執行個體

vi users-flume-kafka.conf ,内容如下:

users.sources = usersSource
users.channels = usersChannel
users.sinks = usersSink

users.sources.usersSource.type = spooldir
users.sources.usersSource.spoolDir = /opt/dataFile/flumeFile/users
users.sources.usersSource.deserializer = LINE
users.sources.usersSource.deserializer.maxLineLength = 3000
users.sources.usersSource.includePattern = users_[0-9]{4}-[0-9]{2}-[0-9]{2}.csv

#設定自帶的攔截器
users.sources.usersSource.interceptors = head_filter
users.sources.usersSource.interceptors.head_filter.type = regex_filter
users.sources.usersSource.interceptors.head_filter.regex = ^user_id*
users.sources.usersSource.interceptors.head_filter.excludeEvents = true

users.channels.usersChannel.type = file
users.channels.usersChannel.checkpointDir = /opt/dataFile/flumeFile/checkpoint/users
users.channels.usersChannel.dataDir = /opt/dataFile/flumeFile/data/users

users.sinks.usersSink.type = org.apache.flume.sink.kafka.KafkaSink
users.sinks.usersSink.batchSize = 640
users.sinks.usersSink.brokerList = 192.168.206.129:9092
users.sinks.usersSink.topic = users 

users.sources.usersSource.channels = usersChannel
users.sinks.usersSink.channel = usersChannel
           

例③ 将讀取資料同時儲存到 kafka 和 hdfs 中:

vi train-flume-hdfs_kafka.conf ,内容如下:

train.sources = trainSource
train.channels = kafkaChannel hdfsChannel
train.sinks = kafkaSink hdfsSink

train.sources.trainSource.type = spooldir
train.sources.trainSource.spoolDir = /opt/dataFile/flumeFile/train
train.sources.trainSource.deserializer = LINE
train.sources.trainSource.deserializer.maxLineLength = 3000
train.sources.trainSource.trainSource.includePattern = train_[0-9]{4}-[0-9]{2}-[0-9]{2}.csv

#使用自帶攔截器過濾首行
train.sources.trainSource.interceptors = head_filter
train.sources.trainSource.interceptors.head_filter.type = regex_filter
train.sources.trainSource.interceptors.head_filter.regex = ^user*
train.sources.trainSource.interceptors.head_filter.excludeEvents = true

train.channels.kafkaChannel.type = file
train.channels.kafkaChannel.checkpointDir =  /opt/dataFile/flumeFile/checkpoint/train
train.channels.kafkaChannel.dataDir = 
/opt/dataFile/flumeFile/data/train

train.channels.hdfsChannel.type = memory
train.channels.hdfsChannel.capacity = 64000
train.channels.hdfsChannel.transactionCapacity = 16000

train.sinks.kafkaSink.type = org.apache.flume.sink.kafka.KafkaSink
train.sinks.kafkaSink.batchSize = 640
train.sinks.kafkaSink.brokerList = 192.168.206.129:9092
train.sinks.kafkaSink.topic = train

train.sinks.hdfsSink.type = hdfs
train.sinks.hdfsSink.hdfs.fileType = DataStream
train.sinks.hdfsSink.hdfs.filePrefix = train
train.sinks.hdfsSink.hdfs.fileSuffix = .csv
train.sinks.hdfsSink.hdfs.path = hdfs://192.168.206.129:9000/data/train/%Y-%m-%d
train.sinks.hdfsSink.hdfs.useLocalTimeStamp = true
train.sinks.hdfsSink.hdfs.batchSize = 6400
train.sinks.hdfsSink.hdfs.rollCount = 0
train.sinks.hdfsSink.hdfs.rollSize = 64000000
train.sinks.hdfsSink.hdfs.rollInterval = 10

train.sources.trainSource.channels = hdfsChannel kafkaChannel
train.sinks.hdfsSink.channel = hdfsChannel
train.sinks.kafkaSink.channel = kafkaChannel