天天看點

flume 整合kafka

1,安裝并成功能運作flume

2,安裝并成功能運作kafka

3,安裝并成功能運作zookeeper

4,開始整合flume收集的資料,寫入kafka

a,修改flume的配置文加:

vim  flume_kafka.conf

agent1.sources = r1

agent1.sinks = k1

agent1.channels = c1

# Describe/configure the source

agent1.sources.r1.type = exec

agent1.sources.r1.command=tail -f /opt/logs/usercenter.log

# Use a channel which buffers events in memory

agent1.channels.c1.type = memory

agent1.channels.c1.capacity = 1000

agent1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel

agent1.sources.r1.channels = c1

agent1.sinks.k1.channel = c1

# # Describe the sink  這部分就是輸入到kafka的寫法

##############################################

agent1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink

agent1.sinks.k1.topic = test

agent1.sinks.k1.brokerList = hadoop1:9092,hadoop2:9092,hadoop3:9092

agent1.sinks.k1.requiredAcks = 1

agent1.sinks.k1.batchSize = 20

b,下載下傳第三方插件

把lib目錄下的

flume 整合kafka

和package下的

flume 整合kafka

都放到flume的lib目錄

如果,報錯,請看這個文檔

http://wenda.chinahadoop.cn/question/4079?notification_id=290954&rf=false&item_id=10382#!answer_10382

修改原有的flume-conf檔案

在插件包裡有一個flume-conf.properties,把這個檔案放到flume的conf檔案夾裡

然後修改以下内容

<code>producer.sources.s.</code><code>type</code> <code>= </code><code>exec</code>

<code> </code><code>producer.sources.s.</code><code>command</code> <code>= </code><code>tail</code> <code>-f -n+1  </code><code>/opt/logs/test</code><code>.log</code>

<code> </code><code>producer.sources.s.channels = c</code>

<code>……</code>

<code>producer.sinks.r.custom.topic.name=</code><code>test</code>

<code>consumer.sources.s.custom.topic.name=</code><code>test</code>

c:啟動服務

啟動zookeeper叢集

zkServer.sh start

還需要建立一個新的位址

zookeeper/bin/zkCli.sh

create /kafka  test

啟動kafka broker 叢集

bin/kafka-server-start.sh config/server.properties

建立kafka topic

bin/kafka-topics.sh --create --zookeeper localhost:2181/kafka --replication-factor 1 --partitions 1 --topic test

啟動kafka consumer

bin/kafka-console-consumer.sh --zookeeper localhost:2181/kafka --topic test --from-beginning

啟動flume

bin/flume-ng agent --conf conf --conf-file conf/flume_kafka.properties --name producer -Dflume.root.logger=INFO,console

測試

<code>echo</code> <code>"this is a test"</code> <code>&gt;&gt; </code><code>/opt/logs/test</code><code>.log</code>

此時隻要能在consumer裡現“this is a test”就表示成功

錯誤總結:

http://472053211.blog.51cto.com/3692116/1655844

      本文轉自crazy_charles 51CTO部落格,原文連結:http://blog.51cto.com/douya/1860896,如需轉載請自行聯系原作者