1. Jar preparation
Reference the official documentation: http://spark.apache.org/docs/latest/streaming-flume-integration.html
The jar packages used on the Flume side in this test are:
spark-streaming-flume-sink_2.xx-<version>.jar
scala-library-<version>.jar
commons-lang3-<version>.jar
(choose versions matching your Spark and Scala builds)
After downloading, place these jars under the Flume installation directory: ./flume/lib/
The jar required on the Spark Streaming side:
spark-streaming-flume-assembly_2.xx-<version>.jar
Download it from http://search.maven.org and place it in Spark's jar dependency directory.
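Before starting the agent it can be worth confirming that the jars actually landed in Flume's lib directory. A small sketch (the lib path and the jar-name prefixes are assumptions taken from the list above, not something Flume itself checks):

```python
# Hypothetical preflight check: report which of the sink-side jars are
# missing from a Flume lib directory, matched by file-name prefix.
import glob
import os

SINK_JAR_PREFIXES = ("spark-streaming-flume-sink", "scala-library", "commons-lang3")

def missing_jars(lib_dir, prefixes=SINK_JAR_PREFIXES):
    # Collect the base names of all jars in the directory
    names = [os.path.basename(p) for p in glob.glob(os.path.join(lib_dir, "*.jar"))]
    # A prefix is "missing" if no jar name starts with it
    return [pre for pre in prefixes if not any(n.startswith(pre) for n in names)]
```

Running `missing_jars("/usr/local/flume/lib")` before `flume-ng agent` catches the common case where one of the three jars was forgotten.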
2. Flume configuration and startup
Assume the Flume data source is a local log file: /tmp/log_source/src.log
Create a new config file, e.g. flume-spark.conf:
a1.sources = r1
a1.channels = c1
a1.sinks = spark

a1.sources.r1.type = exec
a1.sources.r1.channels = c1
a1.sources.r1.command = tail -F /tmp/log_source/src.log

a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /tmp/flume-spark/tmp/checkpoint
a1.channels.c1.dataDirs = /tmp/flume-spark/tmp/data

a1.sinks.spark.type = org.apache.spark.streaming.flume.sink.SparkSink
a1.sinks.spark.hostname = your_hostname
# 9999 is an example; use any free port on this host, and point the Spark side at the same one
a1.sinks.spark.port = 9999
a1.sinks.spark.channel = c1
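The properties above wire the exec source r1 into the file channel c1, which the SparkSink drains. As a sanity check on a config edit, a tiny illustrative parser (not Flume's real config loader) can confirm that the source and sink are bound to the same channel:

```python
# Illustrative parser: extract "key = value" pairs from a Flume-style
# properties snippet, skipping blanks and # comments.
def parse_props(text):
    props = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        key, _, value = line.partition("=")
        props[key.strip()] = value.strip()
    return props

conf = """
a1.sources.r1.channels = c1
a1.sinks.spark.channel = c1
a1.channels.c1.type = file
"""
props = parse_props(conf)
# Both the source and the sink attach to channel c1
same_channel = props["a1.sources.r1.channels"] == props["a1.sinks.spark.channel"]
```

A source writing to one channel while the sink reads another is a classic silent-failure mode; this kind of check surfaces it before startup.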
Start the agent to test:
hadoop@1:/usr/local/flume$ bin/flume-ng agent -c conf -f conf/flume-spark.conf -n a1 -Dflume.root.logger=DEBUG,console
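To feed the exec source during testing, append lines to the tailed file. A minimal hypothetical driver (the path matches flume-spark.conf; the line format merely mimics the sample output later in this post):

```python
# Hypothetical test driver: append a timestamped line to the file that
# the Flume exec source is tailing.
import time

LOG_PATH = "/tmp/log_source/src.log"  # same path as in flume-spark.conf

def append_line(path=LOG_PATH):
    # e.g. "current 20171019 11:48:58"
    stamp = time.strftime("current %Y%m%d %H:%M:%S")
    with open(path, "a") as f:
        f.write(stamp + "\n")
    return stamp
```

Calling `append_line()` in a loop gives `tail -F` fresh lines to push through the channel.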
3. Spark Streaming receiving data (Python)
Word-count logic in test_streaming.py:
from __future__ import print_function

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.flume import FlumeUtils


def flume_main():
    sc = SparkContext(appName="streaming_analysis_wordcount")
    sc.setLogLevel("WARN")
    # 5-second batch interval (example value; tune for your workload)
    ssc = StreamingContext(sc, 5)
    # Must match the hostname/port of the SparkSink in flume-spark.conf
    addrs = [("your_hostname", 9999)]
    fps = FlumeUtils.createPollingStream(ssc, addrs)
    # Each event is a (headers, body) pair; keep the body text
    lines = fps.map(lambda x: x[1])
    counts = lines.flatMap(lambda line: line.split(" ")) \
                  .map(lambda word: (word, 1)) \
                  .reduceByKey(lambda a, b: a + b)
    counts.pprint()
    ssc.start()
    ssc.awaitTermination()


if __name__ == "__main__":
    flume_main()
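The transformation chain itself (flatMap → map → reduceByKey) can be sanity-checked without a cluster. A plain-Python equivalent of the same per-batch logic:

```python
# Cluster-free equivalent of the DStream pipeline:
# flatMap(split) -> map(word -> (word, 1)) -> reduceByKey(a + b)
from collections import Counter

def word_counts(lines):
    counts = Counter()
    for line in lines:
        for word in line.split(" "):
            counts[word] += 1
    return dict(counts)

# word_counts(["a b a", "b c"]) -> {'a': 2, 'b': 2, 'c': 1}
```

Useful for unit-testing the counting logic before wiring it into the streaming job.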
Submit the Spark job:
hadoop@1:/data/test$ /usr/local/spark/bin/spark-submit --master yarn --deploy-mode client test_streaming.py
Once it is running, you can see word-count output such as:
-------------------------------------------
Time: <timestamp>
-------------------------------------------

-------------------------------------------
Time: <timestamp>
-------------------------------------------
('current', <count>)
('20171019', <count>)
('11:48:58', <count>)
('11:48:59', <count>)
('5435854358', <count>)
('5435954359', <count>)
('5436354363', <count>)
('5436454364', <count>)
('5436954369', <count>)
('5435454354', <count>)
...