Flume内置了大量的Sourece,其中Avro Source、Thrift Source、Spooling Directory Source、Kafka Source具有較好的性能和較廣泛的使用場景,下面主要介紹這幾種Source。
類型 | 說明 |
---|---|
Avro Source | 支援Avro協定(實際上是Avro RPC),内置支援。 |
Thrift Source | 支援Thrift協定,内置支援。 |
Exec Source | 基于Unix的command在标準輸出上生産資料。 |
JMS Source | 從JMS系統(消息、主題)中讀取資料,該Source目前隻在ActiveMQ中測試。 |
Spooling Directory Source | 監控指定目錄内資料變更。 |
Twitter 1% firehose Source | 通過API持續下載下傳Twitter資料,試驗性質。 |
Netcat Source | 監控某個端口,将流經端口的每一個文本行資料作為Event輸入。 |
Sequence Generator Source | 序列生成器資料源,生産序列資料。 |
Syslog Sources | 讀取syslog資料,産生Event,支援UDP和TCP兩種協定。 |
HTTP Source | 基于HTTP POST或GET方式的資料源,支援JSON、BLOB表示形式。 |
Kafka Source | 從Kafka Topic中讀取資料。 |
Stress Source | 壓力測試用。 |
Legacy Sources | 相容老的Flume OG中Source(0.9.x版本) |
Custom Source | 自定義Source |
Scribe Source |
Avro Source
Avro Source監聽Avro端口,接收外部Avro用戶端發送過來的Avro Event資料。在多級流中,Avro Source可以和前一個Flume Agent的Avro Sink配對,建立分層收集拓撲。Avro Source的配置如下表所示,表中隻列出必填項,更多配置項請參考官方使用者手冊。
配置項 | 預設值 | 說明 |
---|---|---|
channels | 與Source連接配接的通道名稱。 | |
type | Source的類型名稱,值為avro。 | |
port | Source監聽的端口 | |
bind | 伺服器主機名或者IP位址 |
Avro Source配置參考如下,a1為Agent的執行個體名稱。
a1.sources = r1 #a1中的Source
a1.channels = c1 #a1中的Channel
a1.sources.r1.type = avro #Source的類型
a1.sources.r1.channels = c1 #指定Source r1資料發送到的Channel
a1.sources.r1.bind = #Source綁定的位址
a1.sources.r1.port = #Source監聽的端口
Thrift Source
Thrift Source監聽Thrift端口,接收外部Thrift用戶端發送過來的Thrift Event資料。在多級流中,Thrift Source可以和前一個Flume Agent的Thrift Sink配對,建立分層收集拓撲。Thrift Source支援基于Kerberos身份驗證的安全模式。Thrift Source的配置如下表所示,表中隻列出必填項,更多配置項請參考官方使用者手冊。Thrift Source支援基于Kerberos身份驗證的安全模式。
配置項 | 預設值 | 說明 |
---|---|---|
channels | 與Source連接配接的通道名稱。 | |
type | Source的類型名稱,type值為thrift。 | |
port | Source監聽的端口 | |
bind | 伺服器主機名或者IP位址 |
Thrift Source配置參考如下,a1為Agent的執行個體名稱。
a1.sources = r1 #a1中的Source
a1.channels = c1 #a1中的Channel
a1.sources.r1.type = thrift #Source的類型
a1.sources.r1.channels = c1 #指定Source r1資料發送到的Channel
a1.sources.r1.bind = #Source綁定的位址
a1.sources.r1.port = #Source監聽的端口
Exec Source
Exec Source在啟動時調用的Unix指令,該指令程序會持續地把标準日志資料輸出到Exec Source,如果指令程序關閉,Exec Source也會關閉。Exec Source支援cat [named pipe]或者tail -F [file]指令。Exec Source最大的問題就是資料有可能丢失,因為當Channel接收Exec Source資料出錯時或者抛出異常時,Exec Client并不能捕獲到該錯誤。建議使用Spooling Directory Source代替。
配置項 | 預設值 | 說明 |
---|---|---|
channels | 與Source連接配接的通道名稱。 | |
type | Source的類型名稱,type值為exec。 | |
command | 運作的指令 |
Thrift Source配置參考如下,a1為Agent執行個體名稱。
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /var/log/secure
a1.sources.r1.channels = c1
JMS Source
JMS Source從隊列或者Topic中讀取資料,目前隻在ActiveMQ中測試。在使用JMS Source時,必須在Flume ClassPath中添加JMS JAR包。
配置項 | 預設值 | 說明 |
---|---|---|
channels | 與Source連接配接的通道名稱。 | |
type | Source的類型名稱,type值為jms。 | |
initialContextFactory | org.apache.activemq.jndi.ActiveMQInitialContextFactory | |
connectionFactory | JNDI名稱 | |
providerURL | JMS Provider URL | |
destinationName | ||
destinationType | 取值queue或者topic |
JMS Source配置參考如下,a1為Agent執行個體名稱。
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = jms
a1.sources.r1.channels = c1
a1.sources.r1.initialContextFactory =
org.apache.activemq.jndi.ActiveMQInitialContextFactory
a1.sources.r1.connectionFactory = GenericConnectionFactory
a1.sources.r1.providerURL = tcp://mqserver:
a1.sources.r1.destinationName = BUSINESS_DATA
a1.sources.r1.destinationType = QUEUE
Spooling Directory Source
Spooling Directory Source監聽系統上的指定目錄,當目錄中有新的檔案建立時,Spooling Directory Source會把新檔案的内容讀取并推送到Channel中,并且把已讀取的檔案重命名成指定格式或者把檔案删除。由于資料是以檔案的形式存放的系統中,Spooling Directory Source的可靠性非常高,即使是Flume Agent崩潰或者重新開機,資料也可以恢複。
配置項 | 預設值 | 說明 |
---|---|---|
channels | 與Source連接配接的通道名稱。 | |
type | Source的類型名稱,type值為spooldir。 | |
spoolDir | Source監聽的目錄 |
Spooling Directory Source配置參考如下,a1為Agent執行個體名稱。
a1.sources = s1 #a1中的Source
a1.channels = c1 #a1中的Channel
a1.sources.s1.type = spooldir #Source的類型
a1.sources. s1.channels = c1 #指定Source r1資料發送到的Channel
a1.sources. s1.spoolDir = /var/log/apache/flumeSpool
Kafka Source
Kafka Source從消息隊列Kafka Topic中讀取日志消息,Kafka Source相當于消息隊列的Consumer。可以把多個Kafka Source配置到同一個分組裡面,這樣每個Source都可以讀取同一個Topic中的資料,進而提高性能。
配置項 | 預設值 | 說明 |
---|---|---|
channels | 與Source連接配接的通道名稱。 | |
type | org.apache.flume.source.kafka,KafkaSource | |
zooKeeperConnect | Kafka叢集的ZooKeeper路徑 | |
groupId | flume | Kafka Source分組 |
topic | Kafka Topic |
Kafka Source配置參考如下,a1為Agent執行個體名稱。
a1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.source1.channels = channel1
a1.sources.source1.zookeeperConnect = localhost:
a1.sources.source1.topic = test1
a1.sources.source1.groupId = flume
a1.sources.source1.kafka.consumer.timeout.ms =
NetCat Source
NetCat Source監聽指定的端口,把接收到的資料按行劃分,每行文本都封裝成一個Event資料發送給Channel。
配置項 | 預設值 | 說明 |
---|---|---|
channels | 與Source連接配接的通道名稱。 | |
type | netcat | |
bind | 位址 | |
port | 端口 |
Netcat Source配置參考如下, a1為Agent執行個體名稱。
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = netcat
a1.sources.r1.bind =
a1.sources.r1.bind =
a1.sources.r1.channels = c1
HTTP Source
HTTP Source接收POST和GET發送的Event資料,其中GET主要使用者測試,不建議生産環境使用。HTTP資料通過handler(實作HTTPSourceHandler接口)轉換成Event,該handler接收HttpServletRequest并傳回Event數組。如果handler出現異常,HTTP Source傳回400錯誤。如果Channel滿了或者Channel無法接收Event,HTTP Source傳回503錯誤。
配置項 | 預設值 | 說明 |
---|---|---|
channels | 與Source連接配接的通道名稱。 | |
type | http | |
port | HTTP Source監聽的端口 | |
handler | org.apache.flume.source.http.JSONHandler | 可選值有org.apache.flume.source.http.JSONHandler和org.apache.flume.sink.solr.morphline.BlobHandler |
HTTP Source配置參考如下, a1為Agent的執行個體名稱。
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = http
a1.sources.r1.port =
a1.sources.r1.channels = c1
a1.sources.r1.handler = org.example.rest.RestHandler
a1.sources.r1.handler.nickname = random props