天天看点

【Spark内存计算框架】SparkStreaming容错_SparkStreaming和Kafka进行整合SparkStreaming容错SparkSreaming语义SparkStreaming和Kafka进行整合

003SparkStreaming

  • SparkStreaming容错
    • Executor失败
    • Driver失败
    • 保证数据不丢失
    • 推测执行
  • SparkSreaming语义
  • SparkStreaming和Kafka进行整合
    • 方式一:Receiver-based Approach(不推荐使用)
    • 方式二: Direct Approach (No Receivers)
    • SparkStreaming与Kafka-0-8整合
    • SparkStreaming与Kafka-0-10整合

SparkStreaming容错

  1. SparkStreaming容错(了解)
    1. Executor失败的容错
    2. Driver的容错

      如何配置自动重启的参数

    3. 保证数据不丢失的

      WAL

    4. 推测执行

Executor失败

【Spark内存计算框架】SparkStreaming容错_SparkStreaming和Kafka进行整合SparkStreaming容错SparkSreaming语义SparkStreaming和Kafka进行整合

Tasks和Receiver自动的重启,不需要做任何的配置

Driver失败

【Spark内存计算框架】SparkStreaming容错_SparkStreaming和Kafka进行整合SparkStreaming容错SparkSreaming语义SparkStreaming和Kafka进行整合

用checkpoint机制恢复失败的Driver,定期的将Driver信息写入到HDFS中。

步骤一:设置自动重启Driver程序

Standalone:

在spark-submit中增加以下两个参数:

–deploy-mode cluster

–supervise

Spark任务是可以设置自动重启的,但是client模式是不支持自动重启功能,只能是cluster模式支持。而且不同的运行场景,设置自动重启的方式不一样。

–supervise这个参数一旦挂了自动重启。

Yarn:

在spark-submit中增加以下参数:

–deploy-mode cluster

在yarn配置中设置yarn.resourcemanager.am.max-attemps

yarn.resourcemanager.am.max-attemps 3

一定要设置,因为分布式程序,有些时候任务就是因为网络发生抖动

Mesos:

Marathon 可以重启 Mesos应用

步骤二:设置HDFS的checkpoint目录

streamingContext.setCheckpoint(hdfsDirectory)

步骤三:代码实现

// Function to create and setup a new StreamingContext
def functionToCreateContext(): StreamingContext = {
  val ssc = new StreamingContext(...)   // new context
  val lines = ssc.socketTextStream(...) // create DStreams
  ...
  ssc.checkpoint(checkpointDirectory)   // set checkpoint directory
  ssc
}

// Get StreamingContext from checkpoint data or create a new one
val context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)

// Do additional setup on context that needs to be done,
// irrespective of whether it is being started or restarted
context. ...

// Start the context
context.start()
context.awaitTermination()
           

保证数据不丢失

利用WAL把数据写入到HDFS中。

步骤一:设置checkpoint目录

streamingContext.setCheckpoint(hdfsDirectory)

步骤二:开启WAL日志

sparkConf.set(“spark.streaming.receiver.writeAheadLog.enable”, “true”)

步骤三:需要reliable receiver

当数据写完了WAL后,才告诉数据源数据已经消费,对于没有告诉数据源的数据,可以从数据源中重新消费数据

步骤四:取消备份

使用StorageLevel.MEMORY_AND_DISK_SER来存储数据源,不需要后缀为2的策略了,因为HDFS已经是多副本了。

Reliable Receiver : 当数据接收到,并且已经备份存储后,再发送回执给数据源

Unreliable Receiver : 不发送回执给数据源

可靠数据源:kafka,flume

不可靠数据源:socket

WAL

WAL使用在文件系统和数据库中用于数据操作的持久性,先把数据写到一个持久化的日志中,然后对数据做操作,如果操作过程中系统挂了,恢复的时候可以重新读取日志文件再次进行操作。

对于像kafka和flume这些使用接收器来接收数据的数据源。接收器作为一个长时间的任务运行在executor中,负责从数据源接收数据,如果数据源支持的话,向数据源确认接收到数据,然后把数据存储在executor的内存中,然后driver在exector上运行任务处理这些数据。

如果wal启用了,所有接收到的数据会保存到一个日志文件中去(HDFS), 这样保存接收数据的持久性,此外,如果只有在数据写入到log中之后接收器才向数据源确认,这样driver重启后那些保存在内存中但是没有写入到log中的数据将会重新发送,这两点保证的数据的无丢失。

推测执行

当一个task很慢容错,开启推测机制:

spark.speculation=true,每隔一段时间来检查有哪些正在运行的task需要重新调度(spark.speculation.interval=100ms),假设总的task有10个,成功的task的数量 > 0.75 * 10(spark.speculation.quantile=0.75),正在运行的task的运行时间 > 1.5 * 成功运行task的平均时间(spark.speculation.multiplier=1.5),则这个正在运行的task需要重新等待调度。

如果出现数据倾斜,对推测执行就会很有很有问题,因为推测执行是解决task任务失败启用的,但任务本身耗时过长或者数据量本身过大,就会造成不断的重复重新调度,任务执行不完。这里特别需要根据执行日志查看问题属于哪里一类,进行相应的调整。

SparkSreaming语义

At most once

       最多被处理一次

       有可能会丢数据

       统计日志里面的数据,看一下趋势

At least once

       处理一次或多次

       有可能会重复处理

       绝大多数业务用的都是这种

Exactly once

       仅一次

       不会丢也不会重

       比较麻烦,代价也比较高

SparkStreaming和Kafka进行整合

SparkStreaming整合Kafka官方文档

方式一:Receiver-based Approach(不推荐使用)

此方法使用Receiver接收数据。Receiver是使用Kafka高级消费者API实现的。与所有接收器一样,从Kafka通过Receiver接收的数据存储在Spark执行器中,然后由Spark Streaming启动的作业处理数据。但是,在默认配置下,此方法可能会在失败时丢失数据(为确保零数据丢失,必须在Spark Streaming中另外启用Write Ahead Logs(在Spark 1.2中引入)。这将同步保存所有收到的Kafka将数据写入分布式文件系统(例如HDFS)上的预写日志,以便在发生故障时可以恢复所有数据,但是性能不好。

pom.xml文件添加如下:

groupId = org.apache.spark
artifactId = spark-streaming-kafka-0-8_2.11
version = 2.3.3
           

核心代码:

import org.apache.spark.streaming.kafka._

 val kafkaStream = KafkaUtils.createStream(streamingContext,
     [ZK quorum], [consumer group id], [per-topic number of Kafka partitions to consume])
           

方式二: Direct Approach (No Receivers)

这种新的不基于Receiver的直接方式,是在Spark 1.3中引入的,从而能够确保更加健壮的机制。替代掉使用Receiver来接收数据后,这种方式会周期性地查询Kafka,来获得每个topic+partition的最新的offset,从而定义每个batch的offset的范围。当处理数据的job启动时,就会使用Kafka的简单consumer api来获取Kafka指定offset范围的数据。

这种方式有如下优点:

1、简化并行读取:如果要读取多个partition,不需要创建多个输入DStream然后对它们进行union操作。Spark会创建跟Kafka partition一样多的RDD partition,并且会并行从Kafka中读取数据。所以在Kafka partition和RDD partition之间,有一个一对一的映射关系。

2、高性能:如果要保证零数据丢失,在基于receiver的方式中,需要开启WAL机制。这种方式其实效率低下,因为数据实际上被复制了两份,Kafka自己本身就有高可靠的机制,会对数据复制一份,而这里又会复制一份到WAL中。而基于direct的方式,不依赖Receiver,不需要开启WAL机制,只要Kafka中作了数据的复制,那么就可以通过Kafka的副本进行恢复。

3、一次且仅一次的事务机制:

基于receiver的方式,是使用Kafka的高阶API来在ZooKeeper中保存消费过的offset的。这是消费Kafka数据的传统方式。这种方式配合着WAL机制可以保证数据零丢失的高可靠性,但是却无法保证数据被处理一次且仅一次,可能会处理两次。因为Spark和ZooKeeper之间可能是不同步的。

4、降低资源。

Direct不需要Receivers,其申请的Executors全部参与到计算任务中;而Receiver-based则需要专门的Receivers来读取Kafka数据且不参与计算。因此相同的资源申请,Direct 能够支持更大的业务。

5、降低内存。

Receiver-based的Receiver与其他Exectuor是异步的,并持续不断接收数据,对于小业务量的场景还好,如果遇到大业务量时,需要提高Receiver的内存,但是参与计算的Executor并无需那么多的内存。而Direct 因为没有Receiver,而是在计算时读取数据,然后直接计算,所以对内存的要求很低。实际应用中我们可以把原先的10G降至现在的2-4G左右。

6、鲁棒性更好。

Receiver-based方法需要Receivers来异步持续不断的读取数据,因此遇到网络、存储负载等因素,导致实时任务出现堆积,但Receivers却还在持续读取数据,此种情况很容易导致计算崩溃。Direct 则没有这种顾虑,其Driver在触发batch 计算任务时,才会读取数据并计算。队列出现堆积并不会引起程序的失败。

SparkStreaming与Kafka-0-8整合

支持0.8版本,或者更高的版本

pom.xml文件添加内容如下:

groupId = org.apache.spark
 artifactId = spark-streaming-kafka-0-8_2.11
 version = 2.3.3
           

代码实现:

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

object KafkaDirec08 {
  def main(args: Array[String]): Unit = {
    //步骤一:初始化程序入口
    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("StreamingKafkaApp02")
    val ssc = new StreamingContext(sparkConf, Seconds(10))
    val kafkaParams =  Map[String, String](
      "metadata.broker.list"->"hadoop000:9092",
          "group.id" -> "test"
    )
    val topics = "kafka_streaming".split(",").toSet
   //步骤二:获取数据源
    val lines = KafkaUtils.createDirectStream[String,String,StringDecoder,StringDecoder](ssc,kafkaParams,topics)
    //步骤三:业务代码处理
    lines.map(_._2).flatMap(_.split(",")).map((_,1)).reduceByKey(_+_).print()
    ssc.start()
    ssc.awaitTermination()
    ssc.stop()
  }
}
           

要想保证数据不丢失,最简单的就是靠checkpoint的机制,但是checkpoint机制有个特点,每当代码升级了,checkpoint机制就失效了。所以如果想实现数据不丢失,那么就需要自己管理offset。

工作中遇到的3种情况:

1.代码需要迁移,hadoop1迁到到hadoop2后从当前最新的数据去消费数据。

2.最常见的是,代码写的有问题,改了一个地方,需要重新打包,重新提交。

3.业务逻辑发生了改变,需要修改代码。

checkpoint机制,将offset存到checkpoint目录里,保证数据不丢失(会有重复)。第一种情况能解决,解决不了第二三种。为什么呢?因为hdfs中的checkpoint路径目录,是根据jar包计算出来的唯一的,重新打包就不是原先那个路径了。

那怎么解决呢?

SparkStreaming与Kafka0.8版本整合数据不丢失方案

SparkStreaming与Kafka-0-10整合

支持0.10版本,或者更高的版本

pom.xml文件添加内容如下:

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
    <version>${spark.version}</version>
</dependency>
           

代码实现:

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010._

object KafkaDirect010 {
  def main(args: Array[String]): Unit = {
    //步骤一:获取配置信息
    val conf = new SparkConf().setAppName("sparkstreamingoffset").setMaster("local[5]")
    conf.set("spark.streaming.kafka.maxRatePerPartition", "5")
    conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
    val ssc = new StreamingContext(conf,Seconds(5))

    val brokers = "xxx:9092"
    val topics = "xx_openothers"
    val groupId = "xxx_consumer" //注意,这个也就是我们的消费者的名字
    val topicsSet = topics.split(",").toSet

    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> brokers,
      "group.id" -> groupId,
      "fetch.message.max.bytes" -> "209715200",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "enable.auto.commit" -> "false"
    )

    //步骤二:获取数据源
    val stream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](topicsSet, kafkaParams))
    stream.foreachRDD( rdd =>{
      //步骤三:业务逻辑处理
      val newRDD: RDD[String] = rdd.map(_.value())
      newRDD.foreach( line =>{
        println(line)
      })
      //步骤四:提交偏移量信息,把偏移量信息添加到kafka里
      val offsetRanges  = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
    })

    ssc.start()
    ssc.awaitTermination()
    ssc.stop()
  }
}
           

高版本的方案,天然的就保证了数据不丢失了。

继续阅读