Pull-based Flume integration: debugging in a server environment
<The approach used in production environments>
Package the code as a jar (if the Flume integration dependency is not bundled into the jar, add it with --packages)
spark-submit --class <main class> --master local[2] --packages <maven coordinates> <jar path> <hostname> <port> (full example under "Submitting the Spark job" below)
Official documentation
http://spark.apache.org/docs/latest/streaming-flume-integration.html
Flume configuration file (the spark-streaming-flume-sink_2.11, scala-library, and commons-lang3 jars must be added to Flume's lib directory)
# Agent components
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# netcat source: accepts raw text lines on port 44444
a1.sources.r1.type = netcat
a1.sources.r1.bind = hadoop
a1.sources.r1.port = 44444
# SparkSink: buffers events until the Spark application polls them from port 55555
a1.sinks.k1.type = org.apache.spark.streaming.flume.sink.SparkSink
a1.sinks.k1.hostname = hadoop
a1.sinks.k1.port = 55555
# In-memory channel wiring the source to the sink
a1.channels.c1.type = memory
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
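With the configuration above saved to a file (the name flume_pull.conf and the $FLUME_HOME paths below are assumptions), the agent can be started like this:

flume-ng agent \
--name a1 \
--conf $FLUME_HOME/conf \
--conf-file $FLUME_HOME/conf/flume_pull.conf \
-Dflume.root.logger=INFO,console

Start the agent before the Spark application, so the SparkSink is already listening on port 55555 when Spark begins polling.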
POM file (add the dependencies)
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-flume-sink_2.11</artifactId>
    <version>${spark.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.commons</groupId>
    <artifactId>commons-lang3</artifactId>
    <version>3.5</version>
</dependency>
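Note that spark-streaming-flume-sink_2.11 (plus scala-library and commons-lang3) is what goes into Flume's lib directory; the Spark application itself compiles against spark-streaming-flume_2.11, which provides the FlumeUtils class used below, so the POM also needs:

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-flume_2.11</artifactId>
    <version>${spark.version}</version>
</dependency>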
IDEA Application
import org.apache.spark.SparkConf
import org.apache.spark.streaming.flume.FlumeUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

object SparkStreamingFlumePull extends App {
  if (args.length != 2) {
    System.err.println("Usage: SparkStreamingFlumePull <hostname> <port>")
    System.exit(1)
  }
  val Array(hostname, port) = args

  // In production the job is packaged as a jar, so do not hard-code
  // .setMaster("local[2]").setAppName("WordCount"); pass them via spark-submit instead
  val sparkConf = new SparkConf() //.setMaster("local[2]").setAppName("WordCount")
  val ssc = new StreamingContext(sparkConf, Seconds(5))

  // Pull events from the Flume SparkSink and run a word count over each 5-second batch
  val flume = FlumeUtils.createPollingStream(ssc, hostname, port.toInt)
  flume.map(x => new String(x.event.getBody.array()).trim)
    .flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).print()

  ssc.start()
  ssc.awaitTermination()
}
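To test end to end: start the Flume agent first, then launch the Spark application with the arguments hadoop 55555, and finally feed some text into the netcat source, for example:

telnet hadoop 44444
hello spark hello flume

The per-batch word counts are printed to the Spark application's console every 5 seconds.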
Submitting the Spark job
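A minimal sketch of the full submit command; the jar path and name, and the Spark version in the --packages coordinate (2.2.0 here), are placeholders to adapt to your environment:

spark-submit \
--class SparkStreamingFlumePull \
--master local[2] \
--packages org.apache.spark:spark-streaming-flume_2.11:2.2.0 \
/home/hadoop/lib/sparktrain-1.0.jar \
hadoop 55555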