案例1:syslog-memory-kafka
将flume采集到的資料落地到kafka上,即sink是kafka(生産者身份)
vim syslog-mem-kafka.conf
# 命名個元件
a1.sources = r1
a1.sinks = k1
a1.channels = c1
#source屬性
a1.sources.r1.type = syslogtcp
a1.sources.r1.host=mypc01
a1.sources.r1.port=10086
# 描述channel屬性
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# 描述sink屬性
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.bootstrap.servers = mypc01:9092,mypc:9092,mypc03:9092
# 主題必須提前存在
a1.sinks.k1.kafka.topic = pet
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1
# 關聯source和sink到channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
啟動flume
#bin/bash
/usr/local/flume/bin/flume-ng agent -c /usr/local/flume/conf \
-f /usr/local/flume/flumeconf/syslog-mem-kafka.conf \
-n a1 -Dflume.root.logger=INFO,console -Dflume.monitoring.type=http -Dflume.monitoring.port=31002
先啟動消費者準備接受消息
kafka-console-consumer.sh \
--bootstrap-server mypc01:9092,mypc02:9092,mypc03:9092 \
--topic pet
測試
echo "aaaaa" | nc mypc01 10086
案例2 kafka-memory-hdfs
kafka的source類型從kafka叢集讀取資料,就是消費者身份,将資料封裝成event落地到hdfs
vim kafka-mem-kafka.conf
# 命名個元件
a1.sources = r1
a1.sinks = k1
a1.channels = c1
#source屬性
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.kafka.bootstrap.servers = mypc01:9092,mypc02:9092,mypc03:9092
a1.sources.r1.kafka.consumer.group.id=g1
a1.sources.r1.kafka.topics=pet
# 描述channel屬性
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# 描述sink屬性
a1.sinks.k1.type=hdfs
a1.sinks.k1.hdfs.path=hdfs://mypc01:8020/kafka/pet/%Y%m%d
a1.sinks.k1.hdfs.filePrefix=FlumeData
a1.sinks.k1.hdfs.fileSuffix = .kafka
a1.sinks.k1.hdfs.rollSize=102400
a1.sinks.k1.hdfs.rollCount = 0
#機關為s
b1001.sinks.k1.hdfs.rollInterval=60
b1001.sinks.k1.hdfs.useLocalTimeStamp = true
# 關聯source和sink到channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
啟動flume
#bin/bash
/usr/local/flume/bin/flume-ng agent -c /usr/local/flume/conf \
-f /usr/local/flume/flumeconf/kafka-mem-hdfs.conf \
-n a1 -Dflume.root.logger=INFO,console -Dflume.monitoring.type=http -Dflume.monitoring.port=31002
啟動生産者,使用生産者發送消息
kafka-console-producer.sh \
--broker-list mypc01:9092,mypc02:9092,mypc03:9092 \
--topic pet
之後就可以在hdfs上看到生成的檔案了.
總結
- kafka可以作為source,也可以作為sink