天天看點

Flume配置總結一、概述二、基本概念三、元件四、流動方式五、Source模式六、interceptor七、Processor八、事務機制

目錄

一、概述

二、基本概念

三、元件

1. source

1.1 NetCat Source

1.2 Avro Source

1.3 Exec Source

1.4 Spooling Directory Source

1.5 Squence Generator Source

1.6 HTTP Source

1.7 自定義Custom Source

2. Channel

2.1 Memory Channel

2.2 File Channel

2.3 JDBC Channel

2.4 記憶體溢出通道

3. Sink

3.1 Logger Sink

3.2 File_roll Sink

3.3 HDFS Sink

3.4 Avro Sink

四、流動方式

1. 多級流動

2. 扇出流

3. 扇入案

五、Source模式

1. 複制模式

2. 路由模式

六、interceptor

1. 時間戳Timestamp Interceptor

案列:存到HDFS上,并且按天存放

2. Search And Replace Interceptor

3. Regex Filtering Interceptor

七、Processor

1. 崩潰恢複 Failover Sink Processor

2. 負載均衡 Load Balancing Sink Processor

八、事務機制

1. source——channel:put事務流程

2. channel——sink:Take事務

一、概述

  1. Flume是Cloudera設計的,後來貢獻給了Apache
  2. Flume是一個分布式、可靠的、用于進行日志收集的系統 - 采集、彙總、傳輸
  3. Flume中傳輸的每一條資料是一條日志

二、基本概念

  1. Event:在Flume中,将它傳輸的每一條日志封裝成一個Event對象,也就意味着在Flume中傳輸的Event對象。Event對象展現形式是一個json串,這個json串分為了兩部分:headers和body
  2. Agent:在Flume中,傳輸資料用的元件就是Agent。Agent中包含了三個子元件:

Source:從資料源采集資料

Channel:臨時存儲資料

Sink:将資料發往目的地

      3. 流動方式:單級流動,多級流動,扇入流動,扇出流動     (每一個Sink隻能綁定一個Channel)

三、元件

1. source

1.1 NetCat Source

  1. NetCat Source用來監聽一個指定端口,并接收監聽到的資料,接收的資料是字元串形式
  2. 在測試時使用,實際開發少

配置:

a1.sources = s1

a1.channels = c1

a1.sinks = k1

a1.sources.s1.type = netcat

a1.sources.s1.bind = 0.0.0.0

a1.sources.s1.port = 8090

a1.channels.c1.type = memory

a1.channels.c1.capacity = 10000

a1.channels.c1.transactionCapacity=1000

a1.sinks.k1.type = logger

a1.sources.s1.channels = c1

a1.sinks.k1.channel = c1

用戶端:

[[email protected] software]# rpm -ivh nc-1.84-22.el6.x86_64.rpm     //安裝nc

[[email protected] software]# nc lj02 8090

hello

OK

1.2 Avro Source

  1. avro-source接收到的是經過avro序列化後的資料,然後反序列化資料繼續傳輸。
  2. 利用Avro source可以實作多級流動、扇出流、扇入流等效果
  3. 可以接收通過flume提供的avro用戶端發送的日志資訊

配置:

a1.sources=s1

a1.channels=c1

a1.sinks=k1

a1.sources.s1.type=avro

a1.sources.s1.bind=0.0.0.0

a1.sources.s1.port=8090

a1.channels.c1.type=memory

a1.channels.c1.capacity=1000

a1.channels.c1.transactionCapacity=100

a1.sinks.k1.type=logger

a1.sources.s1.channels=c1

a1.sinks.k1.channel=c1

啟動伺服器端:

[[email protected] bin]# flume-ng agent -n a1 -c ../conf/ -f ../data/2avrosource.conf  -Dflume.root.logger=INFO,console 

 出現Avro source s1 started.  說明啟動成功

avro用戶端啟動

[[email protected] bin]# flume-ng avro-client -H 0.0.0.0 -p 8090 -F /home/a.txt -c ../conf/

在伺服器端可以看見檔案裡面的資料

Event: { headers:{} body: 6A 6A 6A 6A 6A 6A 6A 6A 6A 6A 6A 6A 6A 6A 6A 6A jjjjjjjjjjjjjjjj }

1.3 Exec Source

  1. 可以将指令産生的輸出作為源來進行傳遞

配置

a1.sources = s1

a1.channels = c1

a1.sinks = k1

a1.sources.s1.type = exec

a1.sources.s1.command = cat /home/a.txt

a1.channels.c1.type = memory

a1.channels.c1.capacity =1000

a1.channels.c1.transactionCapacity =100

a1.sinks.k1.type=logger

a1.sources.s1.channels=c1

a1.sinks.k1.channel=c1

伺服器端啟動:[[email protected] bin]# flume-ng agent -n a1 -c ../conf/ -f ../data/3execsource  -Dflume.root.logger=INFO,console

直接顯示結果:

Event: { headers:{} body: 6A 6A 6A 6A 6A 6A 6A 6A 6A 6A 6A 6A 6A 6A 6A 6A jjjjjjjjjjjjjjjj }

2019-07-05 11:28:39,750 (pool-3-thread-1) [INFO - org.apache.flume.source.ExecSource$ExecRunnable.run(ExecSource.java:376)] Command [cat /home/a.txt] exited with 0

1.4 Spooling Directory Source

  1. flume會持續監聽指定的目錄,把放入這個目錄中的檔案當做source來處理
  2. 注意:一旦檔案被放到“自動收集”目錄中後,便不能修改,如果修改,flume會報錯。也不能有重名的檔案,負責flume也會報錯

配置:

a1.sources = s1

a1.channels = c1

a1.sinks = k1

a1.sources.s1.type = spooldir

a1.sources.s1.spoolDir = /home/flumedata

a1.channels.c1.type = memory

a1.channels.c1.capacity =1000

a1.channels.c1.transactionCapacity =100

a1.sinks.k1.type=logger

a1.sources.s1.channels=c1

a1.sinks.k1.channel=c1

伺服器端啟動:

[[email protected] bin]# flume-ng agent -n a1 -c ../conf/ -f ../data/4spolldirsource.conf  -Dflume.root.logger=INFO,console

檔案再被監聽後,名稱後面會有變化 a.txt.COMPLETED  檔案已被監聽。

1.5 Squence Generator Source

  1. 一個簡單的序列發生器,不斷的産生事件,值是從0開始每次遞增1,主要用來測試

配置:

a1.sources = s1

a1.channels = c1

a1.sinks = k1

a1.sources.s1.type = seq

a1.sources.s1.batchSize = 5

a1.channels.c1.type = memory

a1.channels.c1.capacity =1000

a1.channels.c1.transactionCapacity =100

a1.sinks.k1.type=logger

a1.sources.s1.channels=c1

a1.sinks.k1.channel=c1

伺服器端啟動:

[[email protected] data]# ../bin/flume-ng agent -n a1 -c ../conf/ -f ../data/5seqgensource.conf  -Dflume.root.logger=INFO,console

列印結果:是有順序的一大批數字

1.6 HTTP Source

  1. 此Source接受HTTP的GET和POST請求作為Flume的事件
  2. GET方式隻用于試驗,是以實際使用過程中以POST請求居多
  3. 如果想讓flume正确解析Http協定資訊,比如解析出請求頭、請求體等資訊,需要提供一個可插拔的"處理器"來将請求轉換為事件對象,這個處理器必須實作HTTPSourceHandler接口。這個處理器接受一個 HttpServletRequest對象,并傳回一個Flume Envent對象集合

配置:

a1.sources = s1

a1.channels = c1

a1.sinks = k1

a1.sources.s1.type = http

a1.sources.s1.port = 8070

a1.channels.c1.type = memory

a1.channels.c1.capacity = 500

a1.channels.c1.transactionCapacity = 100

a1.sinks.k1.type = logger

a1.sources.s1.channels = c1

a1.sinks.k1.channel = c1

執行啟動指令

[[email protected] data]# ../bin/flume-ng agent -n a1 -c ../conf -f httpsource.conf  -Dflume.root.logger=INFO,console

另一個機器執行:執行curl 指令,模拟一次http的Post請求:

curl -X POST -d '[{"headers":{"name":"lj","age":"20"},"body":"you are beautiful"}]'  http://0.0.0.0:8090

1.7 自定義Custom Source

  1. 通常情況下,Flume提供的source應該是夠用的,但是有時候因為業務場景的需求,可能會導緻Flume中提供的source不夠用,那這個時候就需要自定義Source
  2. 實作Configurable,作用是用于擷取檔案中的配置
  3. 實作EventDrivenSource或者PollableSource

3.1 EventDrivenSource:事件驅動source,被動等待 - 隻要處理完資料就會一直等待,等待有新資料過來 - Avro Source、HTTP Source、NetCat Source - 覆寫start和stop方法,啟動邏輯放在start中,中止回收是放在stop中

3.2 PollableSource:輪訓發送Source,主動産生資料發送資料 - 自帶了線程,會不斷的調用這個線程往channel發資料 - Sequence Generator Source、Spooling Directory Source、Exec Source - 覆寫process方法,source的執行邏輯是放在process方法中

2. Channel

2.1 Memory Channel

  1. 事件将被存儲在記憶體中(指定大小的隊列裡)
  2. 非常适合那些需要高吞吐量且允許資料丢失的場景下

2.2 File Channel

  1. 将資料臨時存儲到計算機的磁盤的檔案中
  2. 性能比較低,但是即使程式出錯資料不會丢失

[[email protected] data]# vim filechannel.conf 

a1.sources = s1

a1.channels = c1

a1.sinks = k1

a1.sources.s1.type = netcat

a1.sources.s1.bind = 0.0.0.0

a1.sources.s1.port = 8090

a1.channels.c1.type = file

a1.channels.c1.dataDirs = /home/flumechannels

a1.sinks.k1.type = logger

a1.sources.s1.channels = c1

a1.sinks.k1.channel = c1

啟動:

[[email protected] data]# ../bin/flume-ng agent -n a1 -c ../conf -f filechannel.conf  -Dflume.root.logger=INFO,console

另一台機器:

[[email protected] data]# cd /home/

[[email protected] home]# mkdir flumechannels

[[email protected] home]# nc 0.0.0.0 8090

gahgha

OK

注:Ctrl+倒退鍵,可以修改資料

2.3 JDBC Channel

  1. 事件會被持久化(存儲)到可靠的資料庫裡
  2. 目前隻支援嵌入式Derby資料庫。但是Derby資料庫不太好用,是以JDBC Channel目前僅用于測試,不能用于生産環境。
  3. 是檔案型,切換路徑資料就找不到,在哪個路徑操作,就會産生 derby.meta檔案
  4. 是單連接配接,隻能一個用戶端連接配接,不能調節連接配接數量(大資料講究高并發,分布式)

2.4 記憶體溢出通道

  1. 優先把Event存到記憶體中,如果存不下,在溢出到檔案中
  2. 目前處于測試階段,還未能用于生産環境

3. Sink

3.1 Logger Sink

  1. 記錄指定級别(比如INFO,DEBUG,ERROR等)的日志,通常用于調試
  2. 要求,在 --conf(-c )參數指定的目錄下有log4j的配置檔案
  3. 根據設計,logger sink将body内容限制為16位元組,進而避免螢幕充斥着過多的内容。如果想要檢視調試的完整内容,那麼你應該使用其他的sink,也許可以使用file_roll sink,它會将日志寫到本地檔案系統中

3.2 File_roll Sink

  1. 每隔指定時長生成檔案儲存這段時間内收集到的日志資訊.

配置:

[[email protected] data]# vim filerollsink.conf

a1.sources = s1

a1.channels = c1

a1.sinks = k1

a1.sources.s1.type = netcat

a1.sources.s1.bind = 0.0.0.0

a1.sources.s1.port = 8090

a1.channels.c1.type = file

a1.channels.c1.dataDirs = /home/flumechannels

a1.sinks.k1.type = file_roll

a1.sinks.k1.sink.directory = /home/flumedata   //檔案被存儲的目錄

a1.sinks.k1.sink.rollInterval = 3600  //channel中的資料每隔1小時産生一個新檔案

a1.sources.s1.channels = c1

a1.sinks.k1.channel = c1

啟動測試

[[email protected] data]# ../bin/flume-ng agent -n a1 -c ../conf -f filerollsink.conf  -Dflume.root.logger=INFO,console

換機器:

[[email protected] home]# mkdir flumedata

[[email protected] home]# nc 0.0.0.0 8090

lijing

OK

lijing

OK

換機器:我剛剛寫的兩條資料已經自動到了指定目錄下自動生成的檔案裡面了

[[email protected] data]# cd /home/flumedata/

[[email protected] flumedata]# ls

          1560142854808-1

[[email protected] flumedata]# cat 1560142854808-1 

    lijing

    lijing

3.3 HDFS Sink

  1. 目前它支援建立文本檔案和序列化檔案,并且對這兩種格式都支援壓縮
  2. 将事件寫入到Hadoop分布式檔案系統HDFS中

配置:

a1.sources = s1

a1.channels = c1

a1.sinks = k1

a1.sources.s1.type = netcat

a1.sources.s1.bind = 0.0.0.0

a1.sources.s1.port = 8090

a1.channels.c1.type = memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 100

a1.sinks.k1.type = hdfs

a1.sinks.k1.hdfs.path = hdfs://hlj01:9000/flume  //HDFS 目錄路徑

a1.sinks.k1.hdfs.fileType = DataStream  //SequenceFile-序列化檔案,DataStream-文本檔案,CompressedStream-壓縮檔案

a1.sinks.k1.hdfs.rollInterval = 3600  //檔案生成的間隔事件

a1.sources.s1.channels = c1

a1.sinks.k1.channel = c1

啟動:

[[email protected] data]# ../bin/flume-ng agent -n a1 -c ../conf -f hdfssink.conf  -Dflume.root.logger=INFO,console

[[email protected] home]# nc 0.0.0.0 8090

lijing

OK

到hdfs檢視

3.4 Avro Sink

  1. 将源資料利用avro進行序列化之後寫到指定的節點上
  2. 是實作多級流動、扇出流(1到多) 扇入流(多到1) 的基礎

四、流動方式

1. 多級流動

Flume配置總結一、概述二、基本概念三、元件四、流動方式五、Source模式六、interceptor七、Processor八、事務機制

配置:vim multi.conf 

01機的配置示例:

a1.sources = s1

a1.channels = c1

a1.sinks = k1

a1.sources.s1.type = netcat

a1.sources.s1.bind = 0.0.0.0

a1.sources.s1.port = 8090

a1.channels.c1.type = memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 100

a1.sinks.k1.type = avro

a1.sinks.k1.hostname = lj02

a1.sinks.k1.port = 8090

a1.sources.s1.channels = c1

a1.sinks.k1.channel = c1

傳到其他機器:

[[email protected] data]# scp -r /home/software/flume-1.6.0/  [email protected]:/home/software/

02機的配置示例:

a1.sources = s1

a1.channels = c1

a1.sinks = k1

a1.sources.s1.type = avro

a1.sources.s1.bind = 0.0.0.0

a1.sources.s1.port = 8090

a1.channels.c1.type = memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 100

a1.sinks.k1.type = avro

a1.sinks.k1.hostname = lj03

a1.sinks.k1.port = 8090

a1.sources.s1.channels = c1

a1.sinks.k1.channel = c1

03機的配置:

a1.sources = s1

a1.channels = c1

a1.sinks = k1

a1.sources.s1.type = avro

a1.sources.s1.bind = 0.0.0.0

a1.sources.s1.port = 8090

a1.channels.c1.type = memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 100

a1.sinks.k1.type = logger

a1.sources.s1.channels = c1

a1.sinks.k1.channel = c1

啟動測試: 啟動順序 3 2 1 ,每隔機器都執行下面的啟動指令

[[email protected] data]# ../bin/flume-ng agent -n a1 -c ../conf -f multi.conf -Dflume.root.logger=INFO,console

複制一個機器01

[[email protected] home]# nc 0.0.0.0 8090

lijing

OK

我們可以看到資料從01機器到了03機器了 

2. 扇出流

編輯配置檔案:vim shanchu.conf 

01機的配置檔案

#配置Agent a1 的元件

a1.sources=s1

a1.channels=c1 c2

a1.sinks=k1 k2

#描述/配置a1的source1

a1.sources.s1.type=netcat

a1.sources.s1.bind=0.0.0.0

a1.sources.s1.port=8090

a1.channels.c1.type=memory

a1.channels.c1.capacity=1000

a1.channels.c1.transactionCapacity=100

a1.channels.c2.type=memory

a1.channels.c2.capacity=1000

a1.channels.c2.transactionCapacity=100

#描述sink

a1.sinks.k1.type=avro

a1.sinks.k1.hostname=lj02

a1.sinks.k1.port=8090

a1.sinks.k2.type=avro

a1.sinks.k2.hostname=lj03

a1.sinks.k2.port=8090

a1.sources.s1.channels=c1 c2

a1.sinks.k1.channel=c1

a1.sinks.k2.channel=c2

02,03配置示例:

a1.sources=s1

a1.channels=c1 

a1.sinks=k1 

a1.sources.s1.type=avro

a1.sources.s1.bind=0.0.0.0

a1.sources.s1.port=8090

a1.channels.c1.type=memory

a1.channels.c1.capacity=1000

a1.channels.c1.transactionCapacity=100

a1.sinks.k1.type=logger

a1.sources.s1.channels=c1

a1.sinks.k1.channel=c1

啟動測試: 啟動順序 3 2 1 

[[email protected] data]# ../bin/flume-ng agent -n a1 -c ../conf -f shanchu.conf -Dflume.root.logger=INFO,console

[[email protected] home]# nc 0.0.0.0 8090

lijing

OK

效果:01節點發送消息,2,3 節點都收到消息

3. 扇入案

Flume配置總結一、概述二、基本概念三、元件四、流動方式五、Source模式六、interceptor七、Processor八、事務機制

01節點:

a1.sources = s1

a1.channels = c1

a1.sinks = k1

a1.sources.s1.type = netcat

a1.sources.s1.bind = 0.0.0.0

a1.sources.s1.port = 8090

a1.channels.c1.type = memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 100

a1.sinks.k1.type = avro

a1.sinks.k1.hostname = lj03

a1.sinks.k1.port = 8090

a1.sources.s1.channels = c1

a1.sinks.k1.channel = c1

02節點:

a1.sources = s1

a1.channels = c1

a1.sinks = k1

a1.sources.s1.type = spooldir

a1.sources.s1.bind = /home/flumedata

a1.channels.c1.type = memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 100

a1.sinks.k1.type = avro

a1.sinks.k1.hostname = lj03

a1.sinks.k1.port = 8090

a1.sources.s1.channels = c1

a1.sinks.k1.channel = c1

[[email protected] home]# mkdir flumedata

[[email protected] home]# cd flumedata/

[[email protected] flumedata]# vim a.txt

03節點配置:

a1.sources = s1

a1.channels = c1

a1.sinks = k1

a1.sources.s1.type = avro

a1.sources.s1.bind = 0.0.0.0

a1.sources.s1.port = 8090

a1.channels.c1.type = memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 100

a1.sinks.k1.type = logger

a1.sources.s1.channels = c1

a1.sinks.k1.channel = c1

啟動指令:

 [[email protected] data]# ../bin/flume-ng agent -n a1 -c ../conf -f  shanru.conf -Dflume.root.logger=INFO,console

[[email protected] home]# nc 0.0.0.0 8090

lijing

OK

五、Source模式

1. 複制模式

  1. 在複制模式下,當source接收到資料後,會複制多分,分發給每一個avro sink
a1.source.r1.selector.type = replicating(這個是預設的)
Flume配置總結一、概述二、基本概念三、元件四、流動方式五、Source模式六、interceptor七、Processor八、事務機制

2. 路由模式

  1. 使用者可以指定轉發的規則。selector根據規則進行資料的分發。基于扇入模式,用http,source

配置:vim selector.conf

01機器

a1.sources=s1

a1.channels=c1 c2

a1.sinks=k1 k2

#描述/配置a1的source1

a1.sources.s1.type=http

a1.sources.s1.selector.type=multiplexing  //multiplexing  表示路由模式

a1.sources.s1.port=8090

a1.sources.s1.selector.header=name  //指定要監測的頭的名稱

a1.sources.s1.selector.mapping.xiaoli=c1  比對規則

a1.sources.s1.selector.mapping.bobo=c2

a1.channels.c1.type=memory

a1.channels.c1.capacity=1000

a1.channels.c1.transactionCapacity=100

a1.channels.c2.type=memory

a1.channels.c2.capacity=1000

a1.channels.c2.transactionCapacity=100

#描述sink

a1.sinks.k1.type=avro

a1.sinks.k1.hostname=lj02

a1.sinks.k1.port=8090

a1.sinks.k2.type=avro

a1.sinks.k2.hostname=lj03

a1.sinks.k2.port=8090

a1.sources.s1.channels=c1 c2

a1.sinks.k1.channel=c1

a1.sinks.k2.channel=c2

02,03配置示例:用 shanchu.conf的内容

[[email protected] data]# ../bin/flume-ng agent -n a1 -c ../conf -f shanchu.conf -Dflume.root.logger=INFO,console

01節點:

[[email protected] ~]# curl -X POST -d '[{"headers":{"name":"xiaoli","age":"20"},"body":"hello I am fine"}]'  http://0.0.0.0:8090

[[email protected] ~]# curl -X POST -d '[{"headers":{"name":"bobo","age":"18"},"body":"hello happy"}]'  http://0.0.0.0:8090

對應的節點才會收到資訊。

六、interceptor

  1. 攔截器采用了責任鍊模式,多個攔截器可以按指定順序攔截

1. 時間戳Timestamp Interceptor

配置檔案:Vim timestamp.conf

a1.sources = s1

a1.channels = c1

a1.sinks = k1

a1.sources.s1.type=netcat

a1.sources.s1.bind=0.0.0.0

a1.sources.s1.port=8090

a1.sources.s1.interceptors=i1

a1.sources.s1.interceptors.i1.type=timestamp

a1.channels.c1.type = memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 100

a1.sinks.k1.type = logger

a1.sources.s1.channels = c1

a1.sinks.k1.channel = c1

啟動:

[[email protected] data]# ../bin/flume-ng agent -n a1 -c ../conf -f timestamp.conf -Dflume.root.logger=INFO,console

測試:

[[email protected] home]# nc 0.0.0.0 8090

lijing

OK

效果:在頭多了一個時間

 Event: { headers:{timestamp=1560151351709} body: 6C 6B 6B     

案列:存到HDFS上,并且按天存放

配置檔案:

a1.sources = s1

a1.channels = c1

a1.sinks = k1

a1.sources.s1.type=netcat

a1.sources.s1.bind=0.0.0.0

a1.sources.s1.port=8090

a1.sources.s1.interceptors=i1

a1.sources.s1.interceptors.i1.type=timestamp

a1.channels.c1.type = memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 100

a1.sinks.k1.type = hdfs

a1.sinks.k1.hdfs.path = hdfs://hlj01:9000/log/time=%Y-%m-%D

a1.sinks.k1.hdfs.fileType = DataStream

a1.sinks.k1.hdfs.rollInterval = 3600

a1.sources.s1.channels = c1

a1.sinks.k1.channel = c1

 到hdfs檢視:lj02:50070

2. Search And Replace Interceptor

  1. 基于字元串的正則搜尋和替換功能,過濾替換

 配置檔案:vim search.conf

a1.sources = s1

a1.channels = c1

a1.sinks = k1

a1.sources.s1.type=netcat

a1.sources.s1.bind=0.0.0.0

a1.sources.s1.port=8090

a1.sources.s1.interceptors=i1

a1.sources.s1.interceptors.i1.type= search_replace 

a1.sources.s1.interceptors.i1.searchPattern= [0-9]    //要搜尋和替換的正規表達式

a1.sources.s1.interceptors.i1.replaceString= #    //要替換為的字元串

a1.channels.c1.type = memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 100

a1.sinks.k1.type = loggera1.sources.s1.channels = c1

a1.sinks.k1.channel = c1

[[email protected] data]# ../bin/flume-ng agent -n a1 -c ../conf -f search.conf -Dflume.root.logger=INFO,console

[[email protected] data]# nc 0.0.0.0 8090

li123jing

OK

接收到  :li###jing

3. Regex Filtering Interceptor

  1. 用來包含或刨除事件

配置:

a1.sources = s1

a1.channels = c1

a1.sinks = k1

a1.sources.s1.type=netcat

a1.sources.s1.bind=0.0.0.0

a1.sources.s1.port=8090

a1.sources.s1.interceptors=i1

a1.sources.s1.interceptors.i1.type= regex_filter

a1.sources.s1.interceptors.i1.regex= ^.*[0-9].*$    // 所要比對的正規表達式

a1.sources.s1.interceptors.i1.excludeEvents=true    //如果是true則刨除比對的事件,false則包含比對的事件。

a1.channels.c1.type = memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 100

a1.sinks.k1.type = loggera1.sources.s1.channels = c1

a1.sinks.k1.channel = c1

七、Processor

  1. Sink Group允許使用者将多個Sink組合成一個實體
  2. Flume Sink Processor 可以通過切換組内Sink用來實作負載均衡的效果,或在一個Sink故障時切換到另一個Sink

1. 崩潰恢複 Failover Sink Processor

配置:

a1.sinkgroups = g1

a1.sinkgroups.g1.sinks = k1 k2

a1.sinkgroups.g1.processor.type = failover

#優先級

a1.sinkgroups.g1.processor.priority.k1 = 5 //設定優先級,注意,每個sink的優先級必須是唯一的

a1.sinkgroups.g1.processor.priority.k2 =7

優先發給優先級高的,高的挂了。等一會。才發給優先級低的

如果沒有指定優先級,則優先級順序取決于sink們的配置順序,先配置的預設優先級高于後配置的

2. 負載均衡 Load Balancing Sink Processor

  1. 提供了在多個sink之間實作負載均衡的能力
  2. 它支援輪詢(round_robin)或随機方式(random),哈希取模(hash)的負載均衡,預設值是輪詢方式,通過配置指定
  3. 也可以通過實作AbstractSinkSelector接口實作自定義的選擇機制

配置:

a1.sinkgroups=g1

a1.sinkgroups.g1.sinks=s1 s2

a1.sinkgroups.g1.processor.type=load_balance

a1.sinkgroups.g1.processor.selector=round_robin  // 輪叫排程算法(輪詢發送)

八、事務機制

1. source——channel:put事務流程

  1. doPut:将批資料先寫入臨時緩沖區putList(Linkedblockingdequeue)
  2. doCommit:檢查channel記憶體隊列是否足夠合并。
  3. doRollback:channel記憶體隊列空間不足,復原,等待記憶體通道的容量滿足合并
  4. putList就是一個臨時的緩沖區,資料會先put到putList,最後由commit方法會檢查channel是否有足夠的緩沖區,有則合并到channel的隊列

2. channel——sink:Take事務

  1. doTake:先将資料取到臨時緩沖區takeList(linkedBlockingDequeue)
  2. 将資料發送到下一個節點
  3. doCommit:如果資料全部發送成功,則清除臨時緩沖區takeList
  4. doRollback:資料發送過程中如果出現異常,rollback将臨時緩沖區takeList中的資料歸還給channel記憶體隊列