
Spark Streaming Real-Time Stream Processing Project Notes: Pull-Based Flume Integration, Server Environment Testing

Pull-Based Integration: Server Environment Testing

<This is the approach used in production environments>

Package the code as a jar and submit it (if the integration dependency is not bundled into the jar, add it with --packages)

spark-submit --class <main class> --master local[2] --packages <maven coordinates> <application jar> <host IP> <port>
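
For reference, with a standard Maven project (a pom.xml is shown below) the jar can be built like this; the name of the jar produced under target/ depends on your pom:

mvn clean package -DskipTests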


Official documentation

URL: http://spark.apache.org/docs/latest/streaming-flume-integration.html

Flume configuration file (the required jars must first be copied into Flume's lib directory; the official guide lists spark-streaming-flume-sink_2.11, scala-library, and commons-lang3)

a1.sources = r1
a1.sinks = k1
a1.channels = c1
 
a1.sources.r1.type = netcat
a1.sources.r1.bind = hadoop
a1.sources.r1.port = 44444
 
a1.sinks.k1.type = org.apache.spark.streaming.flume.sink.SparkSink
a1.sinks.k1.hostname = hadoop
a1.sinks.k1.port = 55555
 
a1.channels.c1.type = memory
 
 
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
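
With the sink jars in place, the agent can be started roughly as follows (flume_pull_streaming.conf is a placeholder name for the file above; adjust paths to your installation):

flume-ng agent \
  --name a1 \
  --conf $FLUME_HOME/conf \
  --conf-file $FLUME_HOME/conf/flume_pull_streaming.conf \
  -Dflume.root.logger=INFO,console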

pom.xml (add the dependencies)

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming-flume-sink_2.11</artifactId>
  <version>${spark.version}</version>
</dependency>

<dependency>
  <groupId>org.apache.commons</groupId>
  <artifactId>commons-lang3</artifactId>
  <version>3.5</version>
</dependency>
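
Note that FlumeUtils.createPollingStream used in the application below lives in the spark-streaming-flume module rather than the sink module, so the project also needs this dependency if it is not already declared:

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming-flume_2.11</artifactId>
  <version>${spark.version}</version>
</dependency>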

Application code (developed in IDEA)

import org.apache.spark.SparkConf
import org.apache.spark.streaming.flume.FlumeUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

object SparkStreamingFlumePull extends App {

  if (args.length != 2) {
    System.err.println("Usage: SparkStreamingFlumePull <hostname> <port>")
    System.exit(1)
  }

  val Array(hostname, port) = args

  // In production the app is submitted as a jar, so do not hard-code
  // .setMaster("local[2]").setAppName("WordCount") here; pass them to spark-submit instead
  val sparkConf = new SparkConf() //.setMaster("local[2]").setAppName("WordCount")
  val ssc = new StreamingContext(sparkConf, Seconds(5))

  // Pull mode: Spark Streaming polls events from Flume's SparkSink
  val flume = FlumeUtils.createPollingStream(ssc, hostname, port.toInt)

  // Word count over the body of each Flume event, per 5-second batch
  flume.map(x => new String(x.event.getBody.array()).trim)
      .flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).print()

  ssc.start()
  ssc.awaitTermination()
}

Submitting the Spark job
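
A sketch of the full server-side test, under these assumptions: the application jar is target/sparktrain-1.0.jar, the object is in the default package as shown above, and Spark 2.2.0 is in use (adjust the --packages version to match your Spark build). In Pull mode, start the Flume agent first so the SparkSink is running before the application begins polling it:

spark-submit \
  --class SparkStreamingFlumePull \
  --master local[2] \
  --packages org.apache.spark:spark-streaming-flume_2.11:2.2.0 \
  target/sparktrain-1.0.jar \
  hadoop 55555

Then send a few words through the netcat source and watch the word counts appear in each 5-second batch:

telnet hadoop 44444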
