天天看点

SparkStreaming03

1.上节课回顾

昨天讲了basic source(基础数据源),例如:file system,socket connection等。还有一个比较重要的是Connetcion poll这个记者去找代码,试一试!!

2.Flume

(1)Flume的作用:Apache Flume is a distributed, reliable, and available service for efficiently collecting, aggregating, and moving large amounts of log data. Here we explain how to configure Flume and Spark Streaming to receive data from Flume. There are two approaches to this.(Apache Flume是一种分布式的、可靠的、可用的服务,可以有效地收集、聚合和移动大量的日志数据。在这里,我们将解释如何配置Flume和Spark流来接收来自Flume的数据。有两种方法可以解决这个问题。)

(2)Note: Flume support is deprecated as of Spark 2.3.0.(在spark2.3.0之后是过时了的)

3.Spark Streaming + Flume   

3.1Approach 1: Flume-style Push-based Approach

(1)原理,Flume 从source端采集数据来,直接sink push到Streaming的端口上来处理。spark自己要设置一个receiver来接收数据。

flume配置文件:

avro-sink-agent.sources = netcat-source
avro-sink-agent.sinks = avro-sink
avro-sink-agent.channels = netcat-memory-channel

avro-sink-agent.sources.netcat-source.type = netcat
avro-sink-agent.sources.netcat-source.bind = localhost
avro-sink-agent.sources.netcat-source.port = 44444


avro-sink-agent.channels.netcat-memory-channel.type = memory

avro-sink-agent.sinks.avro-sink.type = avro
avro-sink-agent.sinks.avro-sink.hostname = 192.168.1.105
avro-sink-agent.sinks.avro-sink.port = 41414

avro-sink-agent.sources.netcat-source.channels = netcat-memory-channel
avro-sink-agent.sinks.avro-sink.channel = netcat-memory-channel
//name.sources=source的类型
//name.sinks=sink的类型
//name
           
package Streamning03
// SparkStreaming + Flume push 方式!!有弊端 容易丢数据,而且如果数据来的大了怎么办?
import org.apache.spark.SparkConf
import org.apache.spark.streaming.flume.FlumeUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

object StreamingFlumeApp01 {
  def main(args: Array[String]): Unit = {
    val sparkconf=new SparkConf().setAppName("StreamingFlumeApp").setMaster("local[2]")
    val ssc=new StreamingContext(sparkconf,Seconds(10))
val lines=FlumeUtils.createStream(ssc,"192.168.1.105",41414)
    //push方式这里的ip是要运行代码的ip

  //将SparkEvent变成字符串
lines.map(x=>new String(x.event.getBody.array()).trim).flatMap(_.split(","))
    .map((_,1)).reduceByKey(_+_).print()

    ssc.start()
    ssc.awaitTermination()
  }

}

           

记得加外部依赖

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-flume_2.11</artifactId>
    <version>2.3.1</version>
</dependency>
           

3.2Approach 2: Pull-based Approach using a Custom Sink

(1)原理:自定义一个Flume,首先Flume  push数据到sink中去,并且保持缓存,Sparkstreaming使用一个可靠的Flume接收器和事务从接收器提取数据。只有在通过Spark流接收和复制数据之后,事务才会成功。(这里涉及到了一个自定义Receiver官网上有案例用到可以来看看的http://spark.apache.org/docs/2.3.1/streaming-custom-receivers.html)

val lines=FlumeUtils.createPollingStream(ssc,"192.168.137.1",41413) //俩个方法就是一个调用方法不同
      

(2)缺点:当Driver挂掉了,executor也会挂掉了,当Driver重启时,execetor中接受的数据就丢失了!!这就有了一个WAL机制(数据进来我先写到日志里面去,即使挂了也可以再去读出来,纯在hdfs上定期清理,很多关系型数据库都是这样保证数据不丢失的,但是多了一个WAL,吞吐量就小许多了,速度就会慢下来)

4.Spark+Kafka(0.8)  Approach1:Receiver

1.启动步骤:

(1)有receiver在UI上有个JOB id=0的特点

<!--添加SparkStreaming整合kafka的依赖-->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
    <version>${spark.version}</version>
</dependency>
           

(2)启动Kafka:  ./kafka-server-start.sh -daemon /home/hadoop/app/kafka/config/server.properties   

(3)启动ZK :  ./zkServer.sh start

(4)启动生产者:./kafka-console-producer.sh --broker-list localhost:9092 --topic ruoze_kafka_streaming

(5)启动消费者:./kafka-console-consumer.sh --bootstrap-server 192.168.137.251:9092 --topic ruoze_kafka_streaming

(6)

package Streamning03

import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

object StreamingKafka01 {
  def main(args: Array[String]): Unit = {
    val sparkconf=new SparkConf().setAppName("StreamingKafka01").setMaster("local[2]")
    val ssc=new StreamingContext(sparkconf,Seconds(10))
val topics="ruoze_kafka_streaming".split(",").map((_,1)).toMap 
//上面这个1仅仅增加了线程的数量,而不会提高并行度
val lines=KafkaUtils.createStream(ssc,"192.168.137.251:2181/kafka","ruoze_group",topics)
//这里有点像Flumeutil,传入sc,加上Zk的保存地址,和主题
lines.map(_._2).flatMap(_.split(",")).map((_,1)).reduceByKey(_+_).print()
    ssc.start()
    ssc.awaitTermination()
  }

}
           

2.注意点:

(1)Kafka topic 中的partition和RDD 中的partition没关系的,所以你增加topic partition的数量

(2)如果想快点,可以创建多个组,多个receiver

(3)果启用了使用HDFS之类的复制文件系统WAL,则接收到的数据将以副本的方式存起来,例如存到hdfs上副本就3份,因此你的StorageLevel.MEMORY_AND_DISK_SER_2(默认存储级别是2份),你可以改成1份就好MEMORY_AND_DISK_SER,不同框架的StorageLevel的区别?

5.Spark+Kafka(0.8)  Approach2:Dirct  NO Receiver(1.3之后出来的)

相比于有Receiver的优点:

(1)简化了并行度(Simplified Parallelism),kafka的分区和RDD的分区是一一对应的,一个partiton就对应一个task,在资源够的情况下 每个task都是并行跑的,要提高并行度添加Kafka分区就行了。

(2)效率更高(Efficiency):第一种有receiver的为了保证零数据丢失,需要将数据WAL(Write Ahead Log)到存储系统上去,数据被存了俩次,降低了吞吐量。第二种不需要WAL机制,只要保留了足够的Kafka retention这个参数默认是168小时(太长了数据增长较快根本扛不住的),就可以从Kafka恢复消息。

(3)只执行一次(Exactly-once semantics):还有俩种(at most once:数据要么被处理,要么就丢咯 和at least once:数据不会丢失,但是会重复处理),第一种方式offset不用管的,自己会存储,这种方式配合WAL可以不丢数据,但是可能重复处理(也就是at least once),在第二种方法中,我们使用不使用Zookeeper的简单Kafka API。偏移量通过其检查点内的Spark流进行跟踪。这消除了Spark流和Zookeeper/Kafka之间的不一致性,因此尽管出现了故障,Spark流仍然可以有效地接收每条记录一次。为了实现结果输出的一次性语义,将数据保存到外部数据存储区的输出操作必须是幂等的,或者是保存结果和偏移量的原子事务(请参阅主编程中的输出操作语义http://spark.apache.org/docs/2.3.1/streaming-programming-guide.html#semantics-of-output-operations)。

缺点:

这种方法的一个缺点是它不更新Zookeeper中的偏移量,因此基于Zookeeper的Kafka监控工具不会显示进度。但是,您可以在每个批处理中访问这种方法处理的偏移量,并自己更新Zookeeper(参见下面)。

package Streamning03

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

object StreamingKafka02 {
  def main(args: Array[String]): Unit = {
    val sparkconf=new SparkConf().setAppName("StreamingKafka01").setMaster("local[2]")
    val ssc=new StreamingContext(sparkconf,Seconds(10))
    val kafkapa=Map[String,String](
      "metadata.broker.list"->"192.168.137.251:9092"
    )
    val topics="ruoze_kafka_streaming".split(",").toSet
   val lines=KafkaUtils.createDirectStream[String,String,StringDecoder,StringDecoder](ssc,kafkapa,topics)



    lines.map(_._2).flatMap(_.split(",")).map((_,1)).reduceByKey(_+_).print()
    ssc.start()
    ssc.awaitTermination()
  }

}
           

5.Spark Kafka

6.SparkStreaming重点

(1)Data Skew(数据倾斜)

(2)Offset管理

继续阅读