
Summary of Flume and Kafka Integration

Case 1: syslog-memory-kafka

Land the data collected by Flume into Kafka, i.e. the sink is Kafka (Flume acts as a producer).

vim syslog-mem-kafka.conf
# Name the agent components
a1.sources = r1
a1.sinks = k1
a1.channels = c1
 
# Source properties
a1.sources.r1.type = syslogtcp
a1.sources.r1.host=mypc01
a1.sources.r1.port=10086

 
# Channel properties
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Sink properties
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.bootstrap.servers = mypc01:9092,mypc02:9092,mypc03:9092
# The topic must exist in advance (see the creation sketch below this config)
a1.sinks.k1.kafka.topic = pet
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1
 

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
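
The config references a topic that must already exist; a minimal sketch of creating it first (partition and replication counts are just example values for this three-broker cluster, and older Kafka releases use --zookeeper instead of --bootstrap-server):

kafka-topics.sh --create \
--bootstrap-server mypc01:9092,mypc02:9092,mypc03:9092 \
--topic pet \
--partitions 3 \
--replication-factor 3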

           

Start Flume

#!/bin/bash
/usr/local/flume/bin/flume-ng agent -c /usr/local/flume/conf \
-f /usr/local/flume/flumeconf/syslog-mem-kafka.conf \
-n a1 -Dflume.root.logger=INFO,console -Dflume.monitoring.type=http -Dflume.monitoring.port=31002
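
The -Dflume.monitoring.type=http -Dflume.monitoring.port=31002 options turn on Flume's built-in HTTP metrics reporting, so once the agent is up you can inspect channel and sink counters (assuming the agent runs on mypc01):

curl http://mypc01:31002/metrics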
           

Start the consumer first so it is ready to receive messages

kafka-console-consumer.sh \
--bootstrap-server mypc01:9092,mypc02:9092,mypc03:9092 \
--topic pet
           

Test

echo "aaaaa" | nc mypc01 10086
           

Case 2: kafka-memory-hdfs

The Kafka source type reads data from the Kafka cluster (Flume acts as a consumer), wraps the data into events, and lands them on HDFS.

vim kafka-mem-hdfs.conf
# Name the agent components
a1.sources = r1
a1.sinks = k1
a1.channels = c1
 
# Source properties
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.kafka.bootstrap.servers = mypc01:9092,mypc02:9092,mypc03:9092
a1.sources.r1.kafka.consumer.group.id=g1
a1.sources.r1.kafka.topics=pet

 
# Channel properties
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Sink properties
a1.sinks.k1.type=hdfs
a1.sinks.k1.hdfs.path=hdfs://mypc01:8020/kafka/pet/%Y%m%d
a1.sinks.k1.hdfs.filePrefix=FlumeData
a1.sinks.k1.hdfs.fileSuffix = .kafka
a1.sinks.k1.hdfs.rollSize=102400
a1.sinks.k1.hdfs.rollCount = 0
# Unit: seconds
a1.sinks.k1.hdfs.rollInterval = 60
a1.sinks.k1.hdfs.useLocalTimeStamp = true
 

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
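
Because the source consumes the pet topic as group g1 (kafka.consumer.group.id), you can check its progress and lag with the standard Kafka tool while the agent is running, for example:

kafka-consumer-groups.sh \
--bootstrap-server mypc01:9092,mypc02:9092,mypc03:9092 \
--describe --group g1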

           

Start Flume

#!/bin/bash
/usr/local/flume/bin/flume-ng agent -c /usr/local/flume/conf \
-f /usr/local/flume/flumeconf/kafka-mem-hdfs.conf \
-n a1 -Dflume.root.logger=INFO,console -Dflume.monitoring.type=http -Dflume.monitoring.port=31002
           

Start the producer and use it to send messages

kafka-console-producer.sh \
--broker-list mypc01:9092,mypc02:9092,mypc03:9092 \
--topic pet
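
The console producer reads from stdin, so besides typing messages interactively you can pipe in a batch of lines to generate enough data for the sink to roll a file (rollSize=102400 bytes or rollInterval=60 s), for example:

seq 1 1000 | kafka-console-producer.sh \
--broker-list mypc01:9092,mypc02:9092,mypc03:9092 \
--topic pet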
           

After that, you can see the generated files on HDFS.
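
A quick way to check from the command line (the date directory follows the %Y%m%d pattern in hdfs.path):

hdfs dfs -ls -R /kafka/pet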


Summary

  • Kafka can serve as a source as well as a sink.