Flume configuration

[root@dtpweb data]#tar -zxvf apache-flume-1.7.0-bin.tar.gz

[root@dtpweb conf]# cp flume-env.sh.template flume-env.sh

Set JAVA_HOME:

[root@dtpweb conf]# vi flume-env.sh

export JAVA_HOME=/data/jdk

export JAVA_OPTS="-Xms100m -Xmx2000m -Dcom.sun.management.jmxremote"
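To confirm the environment file is picked up (a quick sanity check, assuming the layout above):

[root@dtpweb bin]# ./flume-ng version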

a4.conf:

# Define the agent name and the names of its source, channel, and sink

a4.sources = r1

a4.channels = c1

a4.sinks = k1

# Define the source

a4.sources.r1.type = spooldir

a4.sources.r1.spoolDir = /data/logs

# Define the channel

a4.channels.c1.type = memory

a4.channels.c1.capacity = 10000

a4.channels.c1.transactionCapacity = 100

# Define an interceptor that adds a timestamp header to each event, so logs land in the directory for the current day

a4.sources.r1.interceptors = i1

a4.sources.r1.interceptors.i1.type = org.apache.flume.interceptor.TimestampInterceptor$Builder

# Define the sink

a4.sinks.k1.type = hdfs

a4.sinks.k1.hdfs.path = hdfs://ns1/flume/%Y%m%d

a4.sinks.k1.hdfs.filePrefix = events

a4.sinks.k1.hdfs.fileType = DataStream

# Do not roll files by event count

a4.sinks.k1.hdfs.rollCount = 0

# Roll a new file once the current HDFS file reaches 128 MB

a4.sinks.k1.hdfs.rollSize = 134217728

# Roll a new file every 60 seconds

a4.sinks.k1.hdfs.rollInterval = 60

# Wire the source, channel, and sink together

a4.sources.r1.channels = c1

a4.sinks.k1.channel = c1
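The %Y%m%d escape in hdfs.path is resolved from the timestamp header that the interceptor above puts on every event. A sketch of an alternative, if you would rather skip the interceptor, is to let the sink stamp events with the local time (a standard HDFS sink option):

a4.sinks.k1.hdfs.useLocalTimeStamp = true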

[root@dtpweb lib]# scp namenode:/data/hadoop/etc/hadoop/{core-site.xml,hdfs-site.xml} /data/apache-flume-1.7.0-bin/conf

[root@dtpweb bin]# ./flume-ng agent -n a4 -c ../conf -f ../conf/a4.conf -Dflume.root.logger=INFO,console

Error 1:

java.lang.NoClassDefFoundError: org/apache/hadoop/io/SequenceFile$CompressionType

[root@dtpweb lib]# scp 192.168.20.184:/data/hadoop/share/hadoop/common/hadoop-common-2.7.3.jar ./

Error 2:

java.lang.NoClassDefFoundError: org/apache/commons/configuration/Configuration

[root@dtpweb lib]# scp 192.168.20.184:/data/hadoop/share/hadoop/common/lib/commons-configuration-1.6.jar ./

[root@dtpweb lib]# scp 192.168.20.184:/data/hadoop/share/hadoop/common/lib/hadoop-auth-2.7.3.jar ./

[root@dtpweb lib]# scp 192.168.20.184:/data/hadoop/share/hadoop/common/lib/htrace-core-3.1.0-incubating.jar ./

Error 3:

Caused by: java.lang.NoClassDefFoundError: org/apache/commons/io/Charsets

[root@dtpweb lib]# scp 192.168.20.184:/data/hadoop/share/hadoop/common/lib/commons-io-2.4.jar ./
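Copying jars one at a time works, but every missing class surfaces as another NoClassDefFoundError. A sketch of an alternative, assuming a Hadoop client is installed at /data/hadoop on the Flume host, is to expose the whole Hadoop classpath to Flume in flume-env.sh:

export FLUME_CLASSPATH=$(/data/hadoop/bin/hadoop classpath)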

Change the owner of the HDFS directory (it defaults to hadoop):

[hadoop@namenode bin]$ ./hdfs dfs -mkdir /flume

[hadoop@namenode bin]$ ./hdfs dfs -chown -R root /flume

[root@dtpweb bin]# mkdir /data/logs
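After restarting the agent, a quick end-to-end check (hypothetical file name, assuming the setup above) is to drop a file into the spool directory and look for it in HDFS:

[root@dtpweb ~]# echo "hello flume" > /data/logs/test.log

[hadoop@namenode bin]$ ./hdfs dfs -ls /flume/$(date +%Y%m%d)

The spooling directory source renames consumed files to test.log.COMPLETED once they have been shipped.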

##############################################################################################################

!!!! Collect as the root user !!!!

log4j -----> kafka -----> flume (collects all topics) -----> hdfs
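A minimal sketch of the log4j side, assuming the kafka-log4j-appender jar is on the application's classpath and reusing the topic name from the test below:

log4j.rootLogger=INFO, kafka
log4j.appender.kafka=org.apache.kafka.log4jappender.KafkaLog4jAppender
log4j.appender.kafka.brokerList=kafka1:9092,kafka2:9092,kafka3:9092
log4j.appender.kafka.topic=testtopic1
log4j.appender.kafka.layout=org.apache.log4j.PatternLayout
log4j.appender.kafka.layout.ConversionPattern=%d %p %c - %m%n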

Flume configuration

/etc/hosts entries on the Flume host:

192.168.20.206 kafka1

192.168.20.207 kafka2

192.168.20.208 kafka3

192.168.20.181 journalnode1

192.168.20.182 journalnode2

192.168.20.183 journalnode3

192.168.20.184 namenode

192.168.20.185 standbynamenode

192.168.20.186 datanode1

192.168.20.187 datanode2

192.168.20.188 datanode3

192.168.20.37 zookeeper1

192.168.20.38 zookeeper2

192.168.20.39 zookeeper3

192.168.20.189 rm1

192.168.20.193 rm2

192.168.20.190 hmaster1

192.168.20.191 hmaster2

192.168.20.194 hregionserver1

192.168.20.195 hregionserver2
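To confirm the entries resolve (assuming they were added to /etc/hosts):

[root@dtpweb ~]# getent hosts kafka1 namenode zookeeper1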

Create a Kafka topic:

[root@kafka1 bin]# ./kafka-topics.sh --create --topic testtopic1 --replication-factor 1 --partitions 1 --zookeeper zookeeper1:2181

[root@kafka1 bin]# ./kafka-console-producer.sh --broker-list kafka1:9092 --topic testtopic1

Send a few test messages at the producer prompt.

Consume from the topic to verify delivery:

[root@kafka1 bin]# ./kafka-console-consumer.sh --zookeeper zookeeper1:2181 --topic testtopic1 --from-beginning

cat /data/flume/conf/a5.conf

a5.sources = r1

a5.channels = c1

a5.sinks = k1

#auto.commit.enable = true

a5.sources.r1.type = org.apache.flume.source.kafka.KafkaSource

#a5.sources.r1.zookeeperConnect = zookeeper1:2181,zookeeper2:2181,zookeeper3:2181/testkafka

a5.sources.r1.kafka.bootstrap.servers = kafka1:9092,kafka2:9092,kafka3:9092

# Kafka topics to consume

a5.sources.r1.kafka.topics = testtopic1,testtopic2

#a5.sources.r1.topics.regex = testtopic[0-9]$

# Consumer group id

#a5.sources.r1.kafka.consumer.group.id = flume

#a5.sources.r1.topic = itil_topic_4097

#a5.sources.r1.batchSize = 10000

# Define the channel

a5.channels.c1.type = memory

a5.channels.c1.capacity = 100000

a5.channels.c1.transactionCapacity = 10000

# Define the sink

a5.sinks.k1.type = hdfs

a5.sinks.k1.hdfs.filePrefix = testtopic

a5.sinks.k1.hdfs.path = hdfs://ns1/flume/_%Y%m%d

a5.sinks.k1.hdfs.rollCount = 0

a5.sinks.k1.hdfs.rollSize = 134217728

a5.sinks.k1.hdfs.rollInterval = 60

a5.sinks.k1.hdfs.threadsPoolSize = 300

a5.sinks.k1.hdfs.callTimeout = 50000

#a5.sinks.k1.hdfs.writeFormat=Text

#a5.sinks.k1.hdfs.codeC = gzip

#a5.sinks.k1.hdfs.fileType = CompressedStream

# Wire the source, channel, and sink together

a5.sources.r1.channels = c1

a5.sinks.k1.channel = c1
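Start this agent the same way as the first one (the flags mirror the a4 launch above):

[root@dtpweb bin]# ./flume-ng agent -n a5 -c ../conf -f ../conf/a5.conf -Dflume.root.logger=INFO,console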
