天天看點

flume 單機問題解決與架構更改

[TOC]

引言

今天針對線上生産環境下單機 flume 拉取kafka資料并存儲資料入Hdfs 出現大批量資料延遲. 在網上官網各種搜尋資料,并結合官網資料,現進行以下總結

1. 線上單機存在問題簡述

目前flume拉取kafa資料量并不大 ,根據flume用戶端日志 ,每半分鐘hdfs檔案寫入一次資料生成檔案

發現問題:

**拉取kafka資料過慢**
           

2. 解決思路

  1. 加大kafka拉取資料量
  2. 加大flume中channel,source,sink 各通道的單條資料量
  3. 将flume拉取資料單機版本改成多資料拉取,通過flume-avore-sink-> flume-avore-source 進行資料多資料采取并合并

3 加大kafka拉取資料量

3.1 kafka-source簡述

  • flume 輸入單線程拉取資料并将資料發送内置channel并通過sink元件進行資料轉發和處理,故對于kafka叢集多副本方式拉取資料的時候,應适當考慮多個flume節點拉取kafka多副本資料,以避免flume節點在多個kafka叢集副本中輪詢。加大flume拉取kafka資料的速率。
  • flume-kafka-source 是flume内置的kafka source資料元件,是為了拉取kafka資料,配置如下:
agent.sources = r1
 agent.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
agent.sources.r1.batchSize = 50000
agent.sources.r1.batchDurationMillis = 2000
agent.sources.r1.kafka.bootstrap.servers = test-hadoop01:9092
agent.sources.r1.kafka.topics = topicTest
agent.sources.r1.kafka.consumer.group.id = groupTest           
  • flume-kafka-source 的offset是交由zk叢集去維護offset

3.2 kafka-source配置詳解

Kafka Source是一個Apache Kafka消費者,它從Kafka主題中讀取消息。 如果您正在運作多個Kafka源,則可以使用相同的使用者組配置它們,以便每個源都讀取一組唯一的主題分區。

Property Name Default Description
channels 配置的channels 可配置多個channels 後續文章會說到
type org.apache.flume.source.kafka.KafkaSource
kafka.bootstrap.servers 配置kafka叢集位址
kafka.consumer.group.id flume 唯一确定的消費者群體。 在多個源或代理中設定相同的ID表示它們是同一個使用者組的一部分
kafka.topics 你需要消費的topic
kafka.topics.regex 正規表達式,用于定義源訂閱的主題集。 此屬性的優先級高于

kafka.topics

,如果存在則覆寫

kafka.topics

batchSize 1000 一批中寫入Channel的最大消息數 (優化項)
batchDurationMillis 将批次寫入通道之前的最長時間(以毫秒為機關)隻要達到第一個大小和時間,就會寫入批次。(優化項)
backoffSleepIncrement Kafka主題顯示為空時觸發的初始和增量等待時間。 等待時間将減少對空

kafka

主題的激進ping操作。 一秒鐘是攝取用例的理想選擇,但使用攔截器的低延遲操作可能需要較低的值。
maxBackoffSleep 5000 Kafka主題顯示為空時觸發的最長等待時間。 5秒是攝取用例的理想選擇,但使用攔截器的低延遲操作可能需要較低的值。
useFlumeEventFormat false 預設情況下,事件從Kafka主題直接作為位元組直接進入事件主體。 設定為true以将事件讀取為Flume Avro二進制格式。 與KafkaSink上的相同屬性或Kafka Channel上的parseAsFlumeEvent屬性一起使用時,這将保留在生成端發送的任何Flume标頭。
setTopicHeader true 設定為true時,将檢索到的消息的主題存儲到标題中,該标題由

topicHeader

屬性定義。
topicHeader topic 如果

setTopicHeader

屬性設定為

true

,則定義用于存儲接收消息主題名稱的标題的名稱。 如果與Kafka Sink

topicHeader

屬性結合使用,應該小心,以避免在循環中将消息發送回同一主題。
migrateZookeeperOffsets 如果找不到Kafka存儲的偏移量,請在Zookeeper中查找偏移量并将它們送出給Kafka。 這應該是支援從舊版本的Flume無縫Kafka用戶端遷移。 遷移後,可以将其設定為false,但通常不需要這樣做。 如果未找到Zookeeper偏移量,則Kafka配置kafka.consumer.auto.offset.reset定義如何處理偏移量。 檢視[Kafka文檔]( http://kafka.apache.org/documentation.html#newconsumerconfigs )了解詳細資訊
kafka.consumer.security.protocol PLAINTEXT 如果使用某種級别的安全性寫入Kafka,則設定為SASL_PLAINTEXT,SASL_SSL或SSL。
Other Kafka Consumer Properties 這些屬性用于配置Kafka Consumer。 可以使用Kafka支援的任何消費者财産。 唯一的要求是在字首為“kafka.consumer”的字首中添加屬性名稱。 例如:

kafka.consumer.auto.offset.reset

注意:

Kafka Source會覆寫兩個Kafka使用者參數:source.com将auto.commit.enable設定為“false”,并送出每個批處理。 Kafka源至少保證一次消息檢索政策。 源啟動時可以存在重複項。 Kafka Source還提供了key.deserializer(org.apache.kafka.common.serialization.StringSerializer)和value.deserializer(org.apache.kafka.common.serialization.ByteArraySerializer)的預設值。 不建議修改這些參數。

官方配置示例:

tier1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
tier1.sources.source1.channels = channel1
tier1.sources.source1.batchSize = 5000
tier1.sources.source1.batchDurationMillis = 2000
tier1.sources.source1.kafka.bootstrap.servers = localhost:9092
tier1.sources.source1.kafka.topics = test1, test2
tier1.sources.source1.kafka.consumer.group.id = custom.g.id
Example for topic subscription by regex

tier1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
tier1.sources.source1.channels = channel1
tier1.sources.source1.kafka.bootstrap.servers = localhost:9092
tier1.sources.source1.kafka.topics.regex = ^topic[0-9]$
# the default kafka.consumer.group.id=flume is used           

本案例kafka-source配置

agent.sources = r1
agent.sources.r1.channels=c1
 agent.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
agent.sources.r1.batchSize = 50000
agent.sources.r1.batchDurationMillis = 2000
agent.sources.r1.kafka.bootstrap.servers = test-hadoop01:9092
agent.sources.r1.kafka.topics = topicTest
agent.sources.r1.kafka.consumer.group.id = groupTest           

官網配置檔案位址 :

kafka-source

3.3 配置優化

主要是在放入flume-channels 的批量資料加大

更改參數:

agent.sources.r1.batchSize = 50000

agent.sources.r1.batchDurationMillis = 2000

更改解釋:

**即每2秒鐘拉取 kafka 一批資料 批資料大小為50000  放入到flume-channels 中 。即flume該節點 flume-channels 輸入端資料已放大**
           

更改依據:

  • 需要配置kafka單條資料 broker.conf 中配置

    message.max.bytes

  • 目前flume channel sink 元件最大消費能力如何?

4. 加大flume中channel,source,sink 各通道的單條資料量

4.1 source 發送至channels 資料量大小已配置 見 3.3

4.2 channel 配置

The component type name, needs to be

memory

capacity 100 通道中存儲的最大事件數 (優化項)
transactionCapacity 每個事務通道從源或提供給接收器的最大事件數 (優化項)
keep-alive 3 添加或删除事件的逾時(以秒為機關)
byteCapacityBufferPercentage 20 定義byteCapacity與通道中所有事件的估計總大小之間的緩沖區百分比,以計算标頭中的資料。 見下文。
byteCapacity see description 允許的最大總位元組作為此通道中所有事件的總和。 實作隻計算Event

body

,這也是提供

byteCapacityBufferPercentage

配置參數的原因。 預設為計算值,等于JVM可用的最大記憶體的80%(即指令行傳遞的-Xmx值的80%)。 請注意,如果在單個JVM上有多個記憶體通道,并且它們碰巧保持相同的實體事件(即,如果您使用來自單個源的複制通道選擇器),那麼這些事件大小可能會因為通道byteCapacity目的而被重複計算。 将此值設定為“0”将導緻此值回退到大約200 GB的内部硬限制。

配置 capacity 和 transactionCapacity 值 。預設配置規則為:

$$

channels.capacity >= channels.transactionCapacity >= source.batchSize

官方channels配置示例

a1.channels = c1
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 10000
a1.channels.c1.byteCapacityBufferPercentage = 20
a1.channels.c1.byteCapacity = 800000           

本案例修改之後的channels 配置

agent.channels.c1.type = memory
agent.channels.c1.capacity=550000
agent.channels.c1.transactionCapacity=520000           

5. 将flume拉取資料單機版本改成多資料拉取,通過flume-avore-sink-> flume-avore-source 進行資料多資料采取并合并

5.1 存在問題

通過上續修改會發現單機版本的flume會在多副本kafka輪詢造成效率浪費

單機版本flume處理資料時會存在單機瓶頸,單機channels可能最多隻能處理最大資料無法擴充

單機flume配置多個資料源不友善,不能适合後續多需求開發

5.2 修改架構

flume 單機問題解決與架構更改

5.3采集節點配置檔案

收集節點配置(3台):

agent.sources = r1
agent.channels = c1
agent.sinks = k1

agent.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
agent.sources.r1.batchSize = 50000
agent.sources.r1.batchDurationMillis = 2000
agent.sources.r1.kafka.bootstrap.servers = qcloud-test-hadoop03:9092
agent.sources.r1.kafka.topics = topicTest
agent.sources.r1.kafka.consumer.group.id = groupTest

agent.channels.c1.type = memory
agent.channels.c1.capacity=550000
agent.channels.c1.transactionCapacity=520000

agent.sinks.k1.type = avro
agent.sinks.k1.hostname = test-hadoop03
agent.sinks.k1.port=4545

agent.sources.r1.channels = c1
agent.sinks.k1.channel = c1           

彙總節點配置(1台):

agent.sources = r1
agent.channels = memoryChannel
agent.sinks = hdfsSink


agent.sources.r1.type = avro
agent.sources.r1.bind = ip
agent.sources.r1.port = 4545
agent.sources.r1.batchSize = 100000
agent.sources.r1.batchDurationMillis = 1000


agent.channels.memoryChannel.type=memory
agent.channels.memoryChannel.keep-alive=30
agent.channels.memoryChannel.capacity=120000
agent.channels.memoryChannel.transactionCapacity=100000


agent.sinks.hdfsSink.type=hdfs
agent.sinks.hdfsSink.hdfs.path=hdfs://nameser/data/hm2/%Y-%m-%d-%H
agent.sinks.hdfsSink.hdfs.writeFormat=Text

agent.sinks.hdfsSink.hdfs.rollCount = 0

agent.sinks.hdfsSink.hdfs.rollSize = 134217728

agent.sinks.hdfsSink.hdfs.rollInterval = 60
agent.sinks.hdfsSink.hdfs.fileType=DataStream
agent.sinks.hdfsSink.hdfs.idleTimeout=65
agent.sinks.hdfsSink.hdfs.callTimeout=65000
agent.sinks.hdfsSink.hdfs.threadsPoolSize=300

agent.sinks.hdfsSink.channel = memoryChannel
agent.sources.r1.channels = memoryChannel           

5.4 架構注意點

  • 目前架構需要保證聚合節點機器的性能
  • 目前架構新的瓶頸可能會存在存儲Hdfs資料時過慢 ,導緻聚合節點Channels 占用率居高不下,導緻堵塞 。
  • 需要關注avro 自定義source sink 的發送效率

6.flume 監控工具(http)

flume 監控工具總共有三種方式 ,我們這裡為友善簡單,使用内置http接口監控方式進行操作

6.1 配置

在啟動腳本處設定 參數

-Dflume.monitoring.type=http -Dflume.monitoring.port=34545

即可

6.2 通路 位址 :

http://flumeIp:34545

6.3 傳回結果示例 和字段解釋 :

{
    "CHANNEL.memoryChannel": {
        "ChannelCapacity": "550000",
        "ChannelFillPercentage": "0.18181818181818182",
        "Type": "CHANNEL",
        "ChannelSize": "1000",
        "EventTakeSuccessCount": "33541400",
        "EventTakeAttemptCount": "33541527",
        "StartTime": "1536572886273",
        "EventPutAttemptCount": "33542500",
        "EventPutSuccessCount": "33542500",
        "StopTime": "0"
    },
    "SINK.hdfsSink": {
        "ConnectionCreatedCount": "649",
        "ConnectionClosedCount": "648",
        "Type": "SINK",
        "BatchCompleteCount": "335414",
        "BatchEmptyCount": "27",
        "EventDrainAttemptCount": "33541500",
        "StartTime": "1536572886275",
        "EventDrainSuccessCount": "33541400",
        "BatchUnderflowCount": "0",
        "StopTime": "0",
        "ConnectionFailedCount": "0"
    },
    "SOURCE.avroSource": {
        "EventReceivedCount": "33542500",
        "AppendBatchAcceptedCount": "335425",
        "Type": "SOURCE",
        "EventAcceptedCount": "33542500",
        "AppendReceivedCount": "0",
        "StartTime": "1536572886465",
        "AppendAcceptedCount": "0",
        "OpenConnectionCount": "3",
        "AppendBatchReceivedCount": "335425",
        "StopTime": "0"
    }
}           

參數定義:

字段名稱 含義 備注
SOURCE.OpenConnectionCount 打開的連接配接數
SOURCE.TYPE 元件類型
SOURCE.AppendBatchAcceptedCount 追加到channel中的批數量
SOURCE.AppendBatchReceivedCount source端剛剛追加的批數量
SOURCE.EventAcceptedCount 成功放入channel的event數量
SOURCE.AppendReceivedCount source追加目前收到的數量
SOURCE.StartTime(StopTIme) 元件開始時間、結束時間
SOURCE.EventReceivedCount source端成功收到的event數量
SOURCE.AppendAcceptedCount source追加目前放入channel的數量
CHANNEL.EventPutSuccessCount
CHANNEL.ChannelFillPercentage 通道使用比例
CHANNEL.EventPutAttemptCount 嘗試放入将event放入channel的次數
CHANNEL.ChannelSize 目前在channel中的event數量
CHANNEL.EventTakeSuccessCount 從channel中成功取走的event數量
CHANNEL.ChannelCapacity 通道容量
CHANNEL.EventTakeAttemptCount 嘗試從channel中取走event的次數
SINK.BatchCompleteCount 完成的批數量
SINK.ConnectionFailedCount 連接配接失敗數
SINK.EventDrainAttemptCount 嘗試送出的event數量
SINK.ConnectionCreatedCount 建立連接配接數
SINK.Type
SINK.BatchEmptyCount 批量取空的數量
SINK.ConnectionClosedCount 關閉連接配接數量
SINK.EventDrainSuccessCount 成功發送event的數量
SINK.BatchUnderflowCount 正處于批量處理的batch數

參考位址

flume-document

:

http://flume.apache.org/FlumeUserGuide.html