天天看點

Flume學習02 — Source

Flume内置了大量的Sourece,其中Avro Source、Thrift Source、Spooling Directory Source、Kafka Source具有較好的性能和較廣泛的使用場景,下面主要介紹這幾種Source。

類型 說明
Avro Source 支援Avro協定(實際上是Avro RPC),内置支援。
Thrift Source 支援Thrift協定,内置支援。
Exec Source 基于Unix的command在标準輸出上生産資料。
JMS Source 從JMS系統(消息、主題)中讀取資料,該Source目前隻在ActiveMQ中測試。
Spooling Directory Source 監控指定目錄内資料變更。
Twitter 1% firehose Source 通過API持續下載下傳Twitter資料,試驗性質。
Netcat Source 監控某個端口,将流經端口的每一個文本行資料作為Event輸入。
Sequence Generator Source 序列生成器資料源,生産序列資料。
Syslog Sources 讀取syslog資料,産生Event,支援UDP和TCP兩種協定。
HTTP Source 基于HTTP POST或GET方式的資料源,支援JSON、BLOB表示形式。
Kafka Source 從Kafka Topic中讀取資料。
Stress Source 壓力測試用。
Legacy Sources 相容老的Flume OG中Source(0.9.x版本)
Custom Source 自定義Source
Scribe Source

Avro Source

Avro Source監聽Avro端口,接收外部Avro用戶端發送過來的Avro Event資料。在多級流中,Avro Source可以和前一個Flume Agent的Avro Sink配對,建立分層收集拓撲。Avro Source的配置如下表所示,表中隻列出必填項,更多配置項請參考官方使用者手冊。

配置項 預設值 說明
channels 與Source連接配接的通道名稱。
type Source的類型名稱,值為avro。
port Source監聽的端口
bind 伺服器主機名或者IP位址

Avro Source配置參考如下,a1為Agent的執行個體名稱。

a1.sources = r1                #a1中的Source
a1.channels = c1               #a1中的Channel
a1.sources.r1.type = avro      #Source的類型
a1.sources.r1.channels = c1    #指定Source r1資料發送到的Channel
a1.sources.r1.bind =    #Source綁定的位址
a1.sources.r1.port =       #Source監聽的端口
           

Thrift Source

Thrift Source監聽Thrift端口,接收外部Thrift用戶端發送過來的Thrift Event資料。在多級流中,Thrift Source可以和前一個Flume Agent的Thrift Sink配對,建立分層收集拓撲。Thrift Source支援基于Kerberos身份驗證的安全模式。Thrift Source的配置如下表所示,表中隻列出必填項,更多配置項請參考官方使用者手冊。Thrift Source支援基于Kerberos身份驗證的安全模式。

配置項 預設值 說明
channels 與Source連接配接的通道名稱。
type Source的類型名稱,type值為thrift。
port Source監聽的端口
bind 伺服器主機名或者IP位址

Thrift Source配置參考如下,a1為Agent的執行個體名稱。

a1.sources = r1                #a1中的Source
a1.channels = c1               #a1中的Channel
a1.sources.r1.type = thrift    #Source的類型
a1.sources.r1.channels = c1    #指定Source r1資料發送到的Channel
a1.sources.r1.bind =    #Source綁定的位址
a1.sources.r1.port =       #Source監聽的端口
           

Exec Source

Exec Source在啟動時調用的Unix指令,該指令程序會持續地把标準日志資料輸出到Exec Source,如果指令程序關閉,Exec Source也會關閉。Exec Source支援cat [named pipe]或者tail -F [file]指令。Exec Source最大的問題就是資料有可能丢失,因為當Channel接收Exec Source資料出錯時或者抛出異常時,Exec Client并不能捕獲到該錯誤。建議使用Spooling Directory Source代替。

配置項 預設值 說明
channels 與Source連接配接的通道名稱。
type Source的類型名稱,type值為exec。
command 運作的指令

Thrift Source配置參考如下,a1為Agent執行個體名稱。

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /var/log/secure
a1.sources.r1.channels = c1
           

JMS Source

JMS Source從隊列或者Topic中讀取資料,目前隻在ActiveMQ中測試。在使用JMS Source時,必須在Flume ClassPath中添加JMS JAR包。

配置項 預設值 說明
channels 與Source連接配接的通道名稱。
type Source的類型名稱,type值為jms。
initialContextFactory org.apache.activemq.jndi.ActiveMQInitialContextFactory
connectionFactory JNDI名稱
providerURL JMS Provider URL
destinationName
destinationType 取值queue或者topic

JMS Source配置參考如下,a1為Agent執行個體名稱。

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = jms
a1.sources.r1.channels = c1
a1.sources.r1.initialContextFactory = 
org.apache.activemq.jndi.ActiveMQInitialContextFactory
a1.sources.r1.connectionFactory = GenericConnectionFactory
a1.sources.r1.providerURL = tcp://mqserver:
a1.sources.r1.destinationName = BUSINESS_DATA
a1.sources.r1.destinationType = QUEUE
           

Spooling Directory Source

Spooling Directory Source監聽系統上的指定目錄,當目錄中有新的檔案建立時,Spooling Directory Source會把新檔案的内容讀取并推送到Channel中,并且把已讀取的檔案重命名成指定格式或者把檔案删除。由于資料是以檔案的形式存放的系統中,Spooling Directory Source的可靠性非常高,即使是Flume Agent崩潰或者重新開機,資料也可以恢複。

配置項 預設值 說明
channels 與Source連接配接的通道名稱。
type Source的類型名稱,type值為spooldir。
spoolDir Source監聽的目錄

Spooling Directory Source配置參考如下,a1為Agent執行個體名稱。

a1.sources = s1                 #a1中的Source
a1.channels = c1                #a1中的Channel
a1.sources.s1.type = spooldir   #Source的類型
a1.sources. s1.channels = c1    #指定Source r1資料發送到的Channel
a1.sources. s1.spoolDir = /var/log/apache/flumeSpool
           

Kafka Source

Kafka Source從消息隊列Kafka Topic中讀取日志消息,Kafka Source相當于消息隊列的Consumer。可以把多個Kafka Source配置到同一個分組裡面,這樣每個Source都可以讀取同一個Topic中的資料,進而提高性能。

配置項 預設值 說明
channels 與Source連接配接的通道名稱。
type org.apache.flume.source.kafka,KafkaSource
zooKeeperConnect Kafka叢集的ZooKeeper路徑
groupId flume Kafka Source分組
topic Kafka Topic

Kafka Source配置參考如下,a1為Agent執行個體名稱。

a1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.source1.channels = channel1
a1.sources.source1.zookeeperConnect = localhost:
a1.sources.source1.topic = test1
a1.sources.source1.groupId = flume
a1.sources.source1.kafka.consumer.timeout.ms = 
           

NetCat Source

NetCat Source監聽指定的端口,把接收到的資料按行劃分,每行文本都封裝成一個Event資料發送給Channel。

配置項 預設值 說明
channels 與Source連接配接的通道名稱。
type netcat
bind 位址
port 端口

Netcat Source配置參考如下, a1為Agent執行個體名稱。

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = netcat
a1.sources.r1.bind = 
a1.sources.r1.bind = 
a1.sources.r1.channels = c1
           

HTTP Source

HTTP Source接收POST和GET發送的Event資料,其中GET主要使用者測試,不建議生産環境使用。HTTP資料通過handler(實作HTTPSourceHandler接口)轉換成Event,該handler接收HttpServletRequest并傳回Event數組。如果handler出現異常,HTTP Source傳回400錯誤。如果Channel滿了或者Channel無法接收Event,HTTP Source傳回503錯誤。

配置項 預設值 說明
channels 與Source連接配接的通道名稱。
type http
port HTTP Source監聽的端口
handler org.apache.flume.source.http.JSONHandler 可選值有org.apache.flume.source.http.JSONHandler和org.apache.flume.sink.solr.morphline.BlobHandler

HTTP Source配置參考如下, a1為Agent的執行個體名稱。

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = http
a1.sources.r1.port = 
a1.sources.r1.channels = c1
a1.sources.r1.handler = org.example.rest.RestHandler
a1.sources.r1.handler.nickname = random props