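【Spark 61】Log Analysis with Spark Streaming, Flume and Kafka
Step 1: connect Flume to Kafka, so that Flume collects the logs and writes them into Kafka.
Step 2: Spark Streaming reads the data from Kafka and analyzes it in real time.
This article first uses the console scripts shipped with Kafka to receive the messages, in order to get the Flume-to-Kafka integration working end to end.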
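1. Flume configuration
1. Download the Flume-Kafka plugin from https://github.com/beyondj2ee/flumeng-kafka-plugin, then copy flumeng-kafka-plugin.jar from its package directory into the lib directory of the Flume installation.
2. Copy the following jars from the libs directory of the Kafka installation into Flume's lib directory: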
kafka_2.10-0.8.1.1.jar
scala-library-2.10.1.jar
metrics-core-2.2.0.jar
3. Add the agent configuration:
producer.sources = s
producer.channels = c
producer.sinks = r
#source section
#producer.sources.s.type = seq
producer.sources.s.type = netcat
producer.sources.s.bind = localhost
producer.sources.s.port = 44444
producer.sources.s.channels = c
# Each sink's type must be defined
producer.sinks.r.type = org.apache.flume.plugins.KafkaSink
producer.sinks.r.metadata.broker.list=127.0.0.1:9092
producer.sinks.r.partition.key=0
producer.sinks.r.partitioner.class=org.apache.flume.plugins.SinglePartition
producer.sinks.r.serializer.class=kafka.serializer.StringEncoder
producer.sinks.r.request.required.acks=0
producer.sinks.r.max.message.size=1000000
producer.sinks.r.producer.type=sync
producer.sinks.r.custom.encoding=UTF-8
producer.sinks.r.custom.topic.name=test
#Specify the channel the sink should use
producer.sinks.r.channel = c
# Each channel's type is defined.
producer.channels.c.type = memory
producer.channels.c.capacity = 1000
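3.1 The sink type is KafkaSink, so the log events are sent to the Kafka message queue; the partitioner class is SinglePartition.
3.2 The topic name is set to test (custom.topic.name).
3.3 Flume's message source is netcat, listening on localhost:44444.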
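In this configuration the three components are connected only through property names: the source lists its channels with the plural key `channels`, while the sink names exactly one channel with the singular key `channel`. The sketch below is a hypothetical checker (the class `FlumeConfigCheck` is not part of Flume) that loads such a file with java.util.Properties and reports any source or sink pointing at an undeclared channel; it only illustrates the wiring rule and does not validate anything else Flume checks.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

// Hypothetical checker (not part of Flume): verifies that every source and sink
// of an agent points at a channel declared in "<agent>.channels".
class FlumeConfigCheck {

    static List<String> findUnwiredComponents(Properties props, String agent) {
        List<String> channels = split(props.getProperty(agent + ".channels", ""));
        List<String> problems = new ArrayList<>();

        // Sources use the plural key "channels": one source may feed several channels.
        for (String source : split(props.getProperty(agent + ".sources", ""))) {
            String key = agent + ".sources." + source + ".channels";
            for (String ch : split(props.getProperty(key, ""))) {
                if (!channels.contains(ch)) {
                    problems.add("source " + source + " -> undeclared channel " + ch);
                }
            }
        }
        // Sinks use the singular key "channel": a sink drains exactly one channel.
        for (String sink : split(props.getProperty(agent + ".sinks", ""))) {
            String ch = props.getProperty(agent + ".sinks." + sink + ".channel", "").trim();
            if (!channels.contains(ch)) {
                problems.add("sink " + sink + " -> undeclared channel '" + ch + "'");
            }
        }
        return problems;
    }

    // Component names in these lists are separated by whitespace.
    private static List<String> split(String value) {
        String trimmed = value.trim();
        return trimmed.isEmpty() ? new ArrayList<String>() : Arrays.asList(trimmed.split("\\s+"));
    }

    public static void main(String[] args) throws IOException {
        // The wiring lines from the agent configuration above.
        String conf = String.join("\n",
                "producer.sources = s",
                "producer.channels = c",
                "producer.sinks = r",
                "producer.sources.s.channels = c",
                "producer.sinks.r.channel = c");
        Properties props = new Properties();
        props.load(new StringReader(conf));
        System.out.println(findUnwiredComponents(props, "producer")); // prints []
    }
}
```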
4. Start Flume
./flume-ng agent -f ../conf/kafka.conf -c . -n producer
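The -f option specifies the configuration file and -n the agent name, which must match the producer prefix used in the configuration.
Kafka configuration
5. Start Kafka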
./kafka-server-start.sh ../config/server.properties
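5.1 Start ZooKeeper, which Kafka depends on, and create a topic named test (the kafka-topics.sh command for creating a topic is shown in the partition section below).
5.2 Start Kafka's console consumer: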
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
6. Start telnet and type some data for the netcat source to receive:
telnet localhost 44444
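Instead of typing into telnet, test data can also be pushed from a small program. The sketch below is a hypothetical stand-in for telnet (`NetcatClient` is an illustrative name): it writes one line per event and reads one reply per line, relying on the netcat source's default behaviour of answering "OK" for every event it accepts (its ack-every-event option defaults to true). The sending logic takes plain Writer/Reader objects so it can be exercised without a running agent; main assumes the agent above is listening on localhost:44444.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.UncheckedIOException;
import java.io.Writer;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical client standing in for telnet: sends lines to Flume's netcat source.
class NetcatClient {

    // Writes each line terminated by '\n' (the netcat source turns each line into
    // one event) and reads one reply line per event, which is "OK" while the
    // source's ack-every-event option keeps its default value.
    static List<String> sendLines(Writer out, BufferedReader in, List<String> lines) {
        List<String> replies = new ArrayList<>();
        try {
            for (String line : lines) {
                out.write(line);
                out.write('\n');
                out.flush();
                replies.add(in.readLine());
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return replies;
    }

    public static void main(String[] args) throws IOException {
        // Assumes the Flume agent configured above is listening on localhost:44444.
        try (Socket socket = new Socket("localhost", 44444);
             Writer out = new OutputStreamWriter(socket.getOutputStream(), StandardCharsets.UTF_8);
             BufferedReader in = new BufferedReader(
                     new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8))) {
            System.out.println(sendLines(out, in, Arrays.asList("hello kafka", "second line")));
        }
    }
}
```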
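Data flow
1. Data typed into the telnet terminal is received by Flume's netcat source.
2. Flume writes the data into the Kafka message queue; the logic that sends messages to Kafka lives in the Flume-Kafka plugin.
3. The Kafka console consumer picks up the new messages in the queue and prints them on its console.
Open questions:
1. Here Kafka receives messages through SinglePartition. With a Kafka cluster, how can Flume write messages to multiple partitions of one topic?
2. Flume's source here listens on port 44444. How can it monitor log files instead? Log files keep growing and new log files are created automatically; how should this be handled with Kafka?
About Kafka partitions
1. First question: the SinglePartition implementation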
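package org.apache.flume.plugins;

import kafka.producer.Partitioner;
import kafka.utils.VerifiableProperties;

// Partitioner used by the KafkaSink above: every message goes to partition 0,
// regardless of its key and of how many partitions the topic has.
public class SinglePartition implements Partitioner<String> {

    // Kafka 0.8 instantiates the partitioner reflectively through this constructor,
    // passing the producer properties; they are not needed here.
    public SinglePartition(VerifiableProperties props) {
    }

    @Override
    public int partition(String key, int numberOfPartitions) {
        return 0;
    }
}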
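So, to spread messages over several partitions, the partition method only needs to return a hash of the key modulo numberOfPartitions. Note that in Java key.hashCode() % numberOfPartitions is negative whenever hashCode() is negative, so the sign bit should be cleared first, e.g. (key.hashCode() & 0x7fffffff) % numberOfPartitions.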
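A minimal sketch of that hash-based logic, assuming a hypothetical class name `HashPartition`. To keep it self-contained it does not depend on the Kafka jars; in the plugin it would instead implement kafka.producer.Partitioner with the same VerifiableProperties constructor as SinglePartition and be selected through producer.sinks.r.partitioner.class. Keep in mind that a hash partitioner only spreads load if keys differ: if the sink sends every message with the same key (an assumption about how the plugin uses partition.key=0), all messages still land in one partition.

```java
// Hypothetical hash partitioner logic (standalone, without the Kafka dependency).
class HashPartition {

    static int partition(String key, int numberOfPartitions) {
        // Clearing the sign bit keeps the result in [0, numberOfPartitions);
        // a plain key.hashCode() % numberOfPartitions can be negative.
        return (key.hashCode() & 0x7fffffff) % numberOfPartitions;
    }

    public static void main(String[] args) {
        // The same key always maps to the same partition; different keys spread out.
        for (String key : new String[] {"host-a", "host-b", "host-c"}) {
            System.out.println(key + " -> partition " + partition(key, 17));
        }
    }
}
```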
2. Second question: how do you set the number of partitions of a Kafka topic?
The number of partitions is specified when the topic is created:
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
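Create a topic with 17 partitions and a replication factor of 3, and see what gets recorded in ZooKeeper:
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 17 --topic test_many_partitions
2.1 Error: the replication factor cannot be larger than the number of brokers.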
$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 17 --topic test_many_partitions
Error while executing topic command replication factor: 3 larger than available brokers: 1
kafka.admin.AdminOperationException: replication factor: 3 larger than available brokers: 1
at kafka.admin.AdminUtils$.assignReplicasToBrokers(AdminUtils.scala:70)
at kafka.admin.AdminUtils$.createTopic(AdminUtils.scala:155)
at kafka.admin.TopicCommand$.createTopic(TopicCommand.scala:86)
at kafka.admin.TopicCommand$.main(TopicCommand.scala:50)
at kafka.admin.TopicCommand.main(TopicCommand.scala)
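The error follows from how replicas are placed: each replica of a partition must live on a different broker, so with one broker there is room for only one replica per partition. The sketch below illustrates this with a simplified round-robin placement; it is an assumption-laden illustration (`ReplicaAssignment` is a hypothetical name), not Kafka's exact algorithm, which additionally randomizes the starting broker and shifts where follower replicas go.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Simplified round-robin replica placement, NOT Kafka's exact assignment algorithm.
class ReplicaAssignment {

    static List<List<Integer>> assign(List<Integer> brokers, int partitions, int replicationFactor) {
        if (replicationFactor > brokers.size()) {
            // Replicas of one partition must sit on distinct brokers.
            throw new IllegalArgumentException("replication factor: " + replicationFactor
                    + " larger than available brokers: " + brokers.size());
        }
        List<List<Integer>> assignment = new ArrayList<>();
        for (int p = 0; p < partitions; p++) {
            List<Integer> replicas = new ArrayList<>();
            for (int r = 0; r < replicationFactor; r++) {
                // Consecutive brokers starting at p: distinct because replicationFactor <= brokers.size().
                replicas.add(brokers.get((p + r) % brokers.size()));
            }
            assignment.add(replicas);
        }
        return assignment;
    }

    public static void main(String[] args) {
        System.out.println(assign(Arrays.asList(0, 1, 2), 4, 2)); // [[0, 1], [1, 2], [2, 0], [0, 1]]
    }
}
```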
2.2 After the topic is created (on this single-broker setup, with a replication factor of 1), the Kafka server log shows:
[2015-02-14 02:53:53,526] INFO Completed load of log test_many_partitions-4 with log end offset 0 (kafka.log.Log)
[2015-02-14 02:53:53,526] INFO Created log for partition [test_many_partitions,4] in /tmp/kafka-logs with properties {segment.index.bytes -> 10485760, file.delete.delay.ms -> 60000, segment.bytes -> 536870912, flush.ms -> 9223372036854775807, delete.retention.ms -> 86400000, index.interval.bytes -> 4096, retention.bytes -> -1, cleanup.policy -> delete, segment.ms -> 604800000, max.message.bytes -> 1000012, flush.messages -> 9223372036854775807, min.cleanable.dirty.ratio -> 0.5, retention.ms -> 604800000}. (kafka.log.LogManager)
[2015-02-14 02:53:53,527] WARN Partition [test_many_partitions,4] on broker 0: No checkpointed highwatermark is found for partition [test_many_partitions,4] (kafka.cluster.Partition)
[2015-02-14 02:53:53,540] INFO Completed load of log test_many_partitions-13 with log end offset 0 (kafka.log.Log)
[2015-02-14 02:53:53,541] INFO Created log for partition [test_many_partitions,13] in /tmp/kafka-logs with properties {segment.index.bytes -> 10485760, file.delete.delay.ms -> 60000, segment.bytes -> 536870912, flush.ms -> 9223372036854775807, delete.retention.ms -> 86400000, index.interval.bytes -> 4096, retention.bytes -> -1, cleanup.policy -> delete, segment.ms -> 604800000, max.message.bytes -> 1000012, flush.messages -> 9223372036854775807, min.cleanable.dirty.ratio -> 0.5, retention.ms -> 604800000}. (kafka.log.LogManager)
[2015-02-14 02:53:53,541] WARN Partition [test_many_partitions,13] on broker 0: No checkpointed highwatermark is found for partition [test_many_partitions,13] (kafka.cluster.Partition)
[2015-02-14 02:53:53,554] INFO Completed load of log test_many_partitions-1 with log end offset 0 (kafka.log.Log)
[2015-02-14 02:53:53,555] INFO Created log for partition [test_many_partitions,1] in /tmp/kafka-logs with properties {segment.index.bytes -> 10485760, file.delete.delay.ms -> 60000, segment.bytes -> 536870912, flush.ms -> 9223372036854775807, delete.retention.ms -> 86400000, index.interval.bytes -> 4096, retention.bytes -> -1, cleanup.policy -> delete, segment.ms -> 604800000, max.message.bytes -> 1000012, flush.messages -> 9223372036854775807, min.cleanable.dirty.ratio -> 0.5, retention.ms -> 604800000}. (kafka.log.LogManager)
[2015-02-14 02:53:53,555] WARN Partition [test_many_partitions,1] on broker 0: No checkpointed highwatermark is found for partition [test_many_partitions,1] (kafka.cluster.Partition)
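The log shows that every partition gets its own log, named <topic>-<partition> (for example test_many_partitions-4), under the broker's log directory, /tmp/kafka-logs here. Because topic names may themselves contain '-', such a name has to be split at the last '-'. A small hypothetical helper (`LogDirName` is an illustrative name) showing that parsing:

```java
// Hypothetical helper: splits a Kafka log directory name such as
// "test_many_partitions-13" into its topic and partition number.
class LogDirName {
    final String topic;
    final int partition;

    LogDirName(String topic, int partition) {
        this.topic = topic;
        this.partition = partition;
    }

    static LogDirName parse(String dirName) {
        // Split at the LAST '-': topic names may contain '-', partition numbers never do.
        int dash = dirName.lastIndexOf('-');
        if (dash <= 0 || dash == dirName.length() - 1) {
            throw new IllegalArgumentException("not a <topic>-<partition> name: " + dirName);
        }
        return new LogDirName(dirName.substring(0, dash),
                Integer.parseInt(dirName.substring(dash + 1)));
    }

    public static void main(String[] args) {
        for (String name : new String[] {"test_many_partitions-4", "test_many_partitions-13", "test-0"}) {
            LogDirName d = parse(name);
            System.out.println(d.topic + " / partition " + d.partition);
        }
    }
}
```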
2.3 Inspecting the multi-partition topic in ZooKeeper gives the following.
17 partitions:
[zk: localhost:2181(CONNECTED) 26] ls /brokers/topics
[test_many_partitions, test]
[zk: localhost:2181(CONNECTED) 27] ls /brokers/topics/test_many_partitions
[partitions]
[zk: localhost:2181(CONNECTED) 28] ls /brokers/topics/test_many_partitions/partitions
[15, 16, 13, 14, 11, 12, 3, 2, 1, 10, 0, 7, 6, 5, 4, 9, 8]
[zk: localhost:2181(CONNECTED) 29]
1 partition:
[zk: localhost:2181(CONNECTED) 30] ls /brokers/topics/test
[partitions]
[zk: localhost:2181(CONNECTED) 31] ls /brokers/topics/test/partitions
[0]
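The partition list for test_many_partitions comes back unordered; ZooKeeper does not guarantee any order for child znodes, and the same holds for getChildren in its Java API. Since the ids are strings, a plain string sort would also put "10" before "2". A hypothetical helper (`PartitionListing` is an illustrative name) that turns a zkCli listing such as the one above into numerically sorted partition ids:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical helper: parses zkCli output like "[15, 16, 13]" into sorted partition ids.
class PartitionListing {

    static List<Integer> sortedPartitionIds(String zkListing) {
        String inner = zkListing.trim();
        if (!inner.startsWith("[") || !inner.endsWith("]")) {
            throw new IllegalArgumentException("expected [a, b, ...]: " + zkListing);
        }
        inner = inner.substring(1, inner.length() - 1).trim();
        List<Integer> ids = new ArrayList<>();
        if (!inner.isEmpty()) {
            for (String part : inner.split(",")) {
                ids.add(Integer.parseInt(part.trim()));
            }
        }
        // Numeric sort, so 2 comes before 10.
        Collections.sort(ids);
        return ids;
    }

    public static void main(String[] args) {
        String listing = "[15, 16, 13, 14, 11, 12, 3, 2, 1, 10, 0, 7, 6, 5, 4, 9, 8]";
        List<Integer> ids = sortedPartitionIds(listing);
        System.out.println(ids.size() + " partitions: " + ids);
    }
}
```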
References:
https://github.com/beyondj2ee/flumeng-kafka-plugin
http://blog.csdn.net/weijonathan/article/details/18301321
http://liyonghui160com.iteye.com/blog/2173235
Original article:
http://www.myexception.cn/open-source/1850714.html