[TOC]
引言
今天針對線上生産環境下單機 flume 拉取kafka資料并存儲資料入Hdfs 出現大批量資料延遲. 在網上官網各種搜尋資料,并結合官網資料,現進行以下總結
1. 線上單機存在問題簡述
目前flume拉取kafa資料量并不大 ,根據flume用戶端日志 ,每半分鐘hdfs檔案寫入一次資料生成檔案
發現問題:
**拉取kafka資料過慢**
2. 解決思路
- 加大kafka拉取資料量
- 加大flume中channel,source,sink 各通道的單條資料量
- 将flume拉取資料單機版本改成多資料拉取,通過flume-avore-sink-> flume-avore-source 進行資料多資料采取并合并
3 加大kafka拉取資料量
3.1 kafka-source簡述
- flume 輸入單線程拉取資料并将資料發送内置channel并通過sink元件進行資料轉發和處理,故對于kafka叢集多副本方式拉取資料的時候,應适當考慮多個flume節點拉取kafka多副本資料,以避免flume節點在多個kafka叢集副本中輪詢。加大flume拉取kafka資料的速率。
- flume-kafka-source 是flume内置的kafka source資料元件,是為了拉取kafka資料,配置如下:
agent.sources = r1
agent.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
agent.sources.r1.batchSize = 50000
agent.sources.r1.batchDurationMillis = 2000
agent.sources.r1.kafka.bootstrap.servers = test-hadoop01:9092
agent.sources.r1.kafka.topics = topicTest
agent.sources.r1.kafka.consumer.group.id = groupTest
- flume-kafka-source 的offset是交由zk叢集去維護offset
3.2 kafka-source配置詳解
Kafka Source是一個Apache Kafka消費者,它從Kafka主題中讀取消息。 如果您正在運作多個Kafka源,則可以使用相同的使用者組配置它們,以便每個源都讀取一組唯一的主題分區。
Property Name | Default | Description |
---|---|---|
channels | – | 配置的channels 可配置多個channels 後續文章會說到 |
type | org.apache.flume.source.kafka.KafkaSource | |
kafka.bootstrap.servers | 配置kafka叢集位址 | |
kafka.consumer.group.id | flume | 唯一确定的消費者群體。 在多個源或代理中設定相同的ID表示它們是同一個使用者組的一部分 |
kafka.topics | 你需要消費的topic | |
kafka.topics.regex | 正規表達式,用于定義源訂閱的主題集。 此屬性的優先級高于 ,如果存在則覆寫 。 | |
batchSize | 1000 | 一批中寫入Channel的最大消息數 (優化項) |
batchDurationMillis | 将批次寫入通道之前的最長時間(以毫秒為機關)隻要達到第一個大小和時間,就會寫入批次。(優化項) | |
backoffSleepIncrement | Kafka主題顯示為空時觸發的初始和增量等待時間。 等待時間将減少對空 主題的激進ping操作。 一秒鐘是攝取用例的理想選擇,但使用攔截器的低延遲操作可能需要較低的值。 | |
maxBackoffSleep | 5000 | Kafka主題顯示為空時觸發的最長等待時間。 5秒是攝取用例的理想選擇,但使用攔截器的低延遲操作可能需要較低的值。 |
useFlumeEventFormat | false | 預設情況下,事件從Kafka主題直接作為位元組直接進入事件主體。 設定為true以将事件讀取為Flume Avro二進制格式。 與KafkaSink上的相同屬性或Kafka Channel上的parseAsFlumeEvent屬性一起使用時,這将保留在生成端發送的任何Flume标頭。 |
setTopicHeader | true | 設定為true時,将檢索到的消息的主題存儲到标題中,該标題由 屬性定義。 |
topicHeader | topic | 如果 屬性設定為 ,則定義用于存儲接收消息主題名稱的标題的名稱。 如果與Kafka Sink 屬性結合使用,應該小心,以避免在循環中将消息發送回同一主題。 |
migrateZookeeperOffsets | 如果找不到Kafka存儲的偏移量,請在Zookeeper中查找偏移量并将它們送出給Kafka。 這應該是支援從舊版本的Flume無縫Kafka用戶端遷移。 遷移後,可以将其設定為false,但通常不需要這樣做。 如果未找到Zookeeper偏移量,則Kafka配置kafka.consumer.auto.offset.reset定義如何處理偏移量。 檢視[Kafka文檔]( http://kafka.apache.org/documentation.html#newconsumerconfigs )了解詳細資訊 | |
kafka.consumer.security.protocol | PLAINTEXT | 如果使用某種級别的安全性寫入Kafka,則設定為SASL_PLAINTEXT,SASL_SSL或SSL。 |
Other Kafka Consumer Properties | 這些屬性用于配置Kafka Consumer。 可以使用Kafka支援的任何消費者财産。 唯一的要求是在字首為“kafka.consumer”的字首中添加屬性名稱。 例如: |
注意:
Kafka Source會覆寫兩個Kafka使用者參數:source.com将auto.commit.enable設定為“false”,并送出每個批處理。 Kafka源至少保證一次消息檢索政策。 源啟動時可以存在重複項。 Kafka Source還提供了key.deserializer(org.apache.kafka.common.serialization.StringSerializer)和value.deserializer(org.apache.kafka.common.serialization.ByteArraySerializer)的預設值。 不建議修改這些參數。
官方配置示例:
tier1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
tier1.sources.source1.channels = channel1
tier1.sources.source1.batchSize = 5000
tier1.sources.source1.batchDurationMillis = 2000
tier1.sources.source1.kafka.bootstrap.servers = localhost:9092
tier1.sources.source1.kafka.topics = test1, test2
tier1.sources.source1.kafka.consumer.group.id = custom.g.id
Example for topic subscription by regex
tier1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
tier1.sources.source1.channels = channel1
tier1.sources.source1.kafka.bootstrap.servers = localhost:9092
tier1.sources.source1.kafka.topics.regex = ^topic[0-9]$
# the default kafka.consumer.group.id=flume is used
本案例kafka-source配置
agent.sources = r1
agent.sources.r1.channels=c1
agent.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
agent.sources.r1.batchSize = 50000
agent.sources.r1.batchDurationMillis = 2000
agent.sources.r1.kafka.bootstrap.servers = test-hadoop01:9092
agent.sources.r1.kafka.topics = topicTest
agent.sources.r1.kafka.consumer.group.id = groupTest
官網配置檔案位址 :
kafka-source3.3 配置優化
主要是在放入flume-channels 的批量資料加大
更改參數:
agent.sources.r1.batchSize = 50000
agent.sources.r1.batchDurationMillis = 2000
更改解釋:
**即每2秒鐘拉取 kafka 一批資料 批資料大小為50000 放入到flume-channels 中 。即flume該節點 flume-channels 輸入端資料已放大**
更改依據:
- 需要配置kafka單條資料 broker.conf 中配置
message.max.bytes
- 目前flume channel sink 元件最大消費能力如何?
4. 加大flume中channel,source,sink 各通道的單條資料量
4.1 source 發送至channels 資料量大小已配置 見 3.3
4.2 channel 配置
The component type name, needs to be | ||
capacity | 100 | 通道中存儲的最大事件數 (優化項) |
transactionCapacity | 每個事務通道從源或提供給接收器的最大事件數 (優化項) | |
keep-alive | 3 | 添加或删除事件的逾時(以秒為機關) |
byteCapacityBufferPercentage | 20 | 定義byteCapacity與通道中所有事件的估計總大小之間的緩沖區百分比,以計算标頭中的資料。 見下文。 |
byteCapacity | see description | 允許的最大總位元組作為此通道中所有事件的總和。 實作隻計算Event ,這也是提供 配置參數的原因。 預設為計算值,等于JVM可用的最大記憶體的80%(即指令行傳遞的-Xmx值的80%)。 請注意,如果在單個JVM上有多個記憶體通道,并且它們碰巧保持相同的實體事件(即,如果您使用來自單個源的複制通道選擇器),那麼這些事件大小可能會因為通道byteCapacity目的而被重複計算。 将此值設定為“0”将導緻此值回退到大約200 GB的内部硬限制。 |
配置 capacity 和 transactionCapacity 值 。預設配置規則為:
$$
channels.capacity >= channels.transactionCapacity >= source.batchSize
官方channels配置示例
a1.channels = c1
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 10000
a1.channels.c1.byteCapacityBufferPercentage = 20
a1.channels.c1.byteCapacity = 800000
本案例修改之後的channels 配置
agent.channels.c1.type = memory
agent.channels.c1.capacity=550000
agent.channels.c1.transactionCapacity=520000
5. 将flume拉取資料單機版本改成多資料拉取,通過flume-avore-sink-> flume-avore-source 進行資料多資料采取并合并
5.1 存在問題
通過上續修改會發現單機版本的flume會在多副本kafka輪詢造成效率浪費
單機版本flume處理資料時會存在單機瓶頸,單機channels可能最多隻能處理最大資料無法擴充
單機flume配置多個資料源不友善,不能适合後續多需求開發
5.2 修改架構
5.3采集節點配置檔案
收集節點配置(3台):
agent.sources = r1
agent.channels = c1
agent.sinks = k1
agent.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
agent.sources.r1.batchSize = 50000
agent.sources.r1.batchDurationMillis = 2000
agent.sources.r1.kafka.bootstrap.servers = qcloud-test-hadoop03:9092
agent.sources.r1.kafka.topics = topicTest
agent.sources.r1.kafka.consumer.group.id = groupTest
agent.channels.c1.type = memory
agent.channels.c1.capacity=550000
agent.channels.c1.transactionCapacity=520000
agent.sinks.k1.type = avro
agent.sinks.k1.hostname = test-hadoop03
agent.sinks.k1.port=4545
agent.sources.r1.channels = c1
agent.sinks.k1.channel = c1
彙總節點配置(1台):
agent.sources = r1
agent.channels = memoryChannel
agent.sinks = hdfsSink
agent.sources.r1.type = avro
agent.sources.r1.bind = ip
agent.sources.r1.port = 4545
agent.sources.r1.batchSize = 100000
agent.sources.r1.batchDurationMillis = 1000
agent.channels.memoryChannel.type=memory
agent.channels.memoryChannel.keep-alive=30
agent.channels.memoryChannel.capacity=120000
agent.channels.memoryChannel.transactionCapacity=100000
agent.sinks.hdfsSink.type=hdfs
agent.sinks.hdfsSink.hdfs.path=hdfs://nameser/data/hm2/%Y-%m-%d-%H
agent.sinks.hdfsSink.hdfs.writeFormat=Text
agent.sinks.hdfsSink.hdfs.rollCount = 0
agent.sinks.hdfsSink.hdfs.rollSize = 134217728
agent.sinks.hdfsSink.hdfs.rollInterval = 60
agent.sinks.hdfsSink.hdfs.fileType=DataStream
agent.sinks.hdfsSink.hdfs.idleTimeout=65
agent.sinks.hdfsSink.hdfs.callTimeout=65000
agent.sinks.hdfsSink.hdfs.threadsPoolSize=300
agent.sinks.hdfsSink.channel = memoryChannel
agent.sources.r1.channels = memoryChannel
5.4 架構注意點
- 目前架構需要保證聚合節點機器的性能
- 目前架構新的瓶頸可能會存在存儲Hdfs資料時過慢 ,導緻聚合節點Channels 占用率居高不下,導緻堵塞 。
- 需要關注avro 自定義source sink 的發送效率
6.flume 監控工具(http)
flume 監控工具總共有三種方式 ,我們這裡為友善簡單,使用内置http接口監控方式進行操作
6.1 配置
在啟動腳本處設定 參數
-Dflume.monitoring.type=http -Dflume.monitoring.port=34545
即可
6.2 通路 位址 :
http://flumeIp:345456.3 傳回結果示例 和字段解釋 :
{
"CHANNEL.memoryChannel": {
"ChannelCapacity": "550000",
"ChannelFillPercentage": "0.18181818181818182",
"Type": "CHANNEL",
"ChannelSize": "1000",
"EventTakeSuccessCount": "33541400",
"EventTakeAttemptCount": "33541527",
"StartTime": "1536572886273",
"EventPutAttemptCount": "33542500",
"EventPutSuccessCount": "33542500",
"StopTime": "0"
},
"SINK.hdfsSink": {
"ConnectionCreatedCount": "649",
"ConnectionClosedCount": "648",
"Type": "SINK",
"BatchCompleteCount": "335414",
"BatchEmptyCount": "27",
"EventDrainAttemptCount": "33541500",
"StartTime": "1536572886275",
"EventDrainSuccessCount": "33541400",
"BatchUnderflowCount": "0",
"StopTime": "0",
"ConnectionFailedCount": "0"
},
"SOURCE.avroSource": {
"EventReceivedCount": "33542500",
"AppendBatchAcceptedCount": "335425",
"Type": "SOURCE",
"EventAcceptedCount": "33542500",
"AppendReceivedCount": "0",
"StartTime": "1536572886465",
"AppendAcceptedCount": "0",
"OpenConnectionCount": "3",
"AppendBatchReceivedCount": "335425",
"StopTime": "0"
}
}
參數定義:
字段名稱 | 含義 | 備注 |
---|---|---|
SOURCE.OpenConnectionCount | 打開的連接配接數 | |
SOURCE.TYPE | 元件類型 | |
SOURCE.AppendBatchAcceptedCount | 追加到channel中的批數量 | |
SOURCE.AppendBatchReceivedCount | source端剛剛追加的批數量 | |
SOURCE.EventAcceptedCount | 成功放入channel的event數量 | |
SOURCE.AppendReceivedCount | source追加目前收到的數量 | |
SOURCE.StartTime(StopTIme) | 元件開始時間、結束時間 | |
SOURCE.EventReceivedCount | source端成功收到的event數量 | |
SOURCE.AppendAcceptedCount | source追加目前放入channel的數量 | |
CHANNEL.EventPutSuccessCount | ||
CHANNEL.ChannelFillPercentage | 通道使用比例 | |
CHANNEL.EventPutAttemptCount | 嘗試放入将event放入channel的次數 | |
CHANNEL.ChannelSize | 目前在channel中的event數量 | |
CHANNEL.EventTakeSuccessCount | 從channel中成功取走的event數量 | |
CHANNEL.ChannelCapacity | 通道容量 | |
CHANNEL.EventTakeAttemptCount | 嘗試從channel中取走event的次數 | |
SINK.BatchCompleteCount | 完成的批數量 | |
SINK.ConnectionFailedCount | 連接配接失敗數 | |
SINK.EventDrainAttemptCount | 嘗試送出的event數量 | |
SINK.ConnectionCreatedCount | 建立連接配接數 | |
SINK.Type | ||
SINK.BatchEmptyCount | 批量取空的數量 | |
SINK.ConnectionClosedCount | 關閉連接配接數量 | |
SINK.EventDrainSuccessCount | 成功發送event的數量 | |
SINK.BatchUnderflowCount | 正處于批量處理的batch數 |
參考位址
flume-document:
http://flume.apache.org/FlumeUserGuide.html