天天看点

Spark Streaming 性能调优指南

要在集群上的Spark Streaming应用程序中获得最佳性能,需要进行一些调整。本文介绍了如何调整参数和配置以提高应用程序性能。这些主要是基于以下两个层次进行考量的。

  • 过有效使用群集资源减少每批数据的处理时间。
  • 设置正确的批处理大小,以便可以按接收到的批处理速度处理一批数据(也就是说,数据处理跟上数据拉取的速度)。

1、减少批处理时间

Spark可以进行很多优化,以最大程度地减少每批的处理时间。这些已在《Spark调优指南》中详细讨论,本文就最重要的一些调优方法进行一次简单介绍。

1.1 数据接收中的并行度

通过网络(例如Kafka,socket等)接收数据需要将数据反序列化并存储在Spark中。如果数据接收成为系统的瓶颈,请考虑并行化数据接收。请注意,每个输入DStream都会创建一个接收器Receiver(在Executor上运行),该接收器接收单个数据流。因此,可以通过创建多个输入DStream并将其配置为从源接收数据流的不同分区来实现接收多个数据流。例如,可以将接收两个Topic的单个Kafka输入DStream拆分为两个Kafka输入流,每个输入流仅接收一个主题。这将运行两个接收器,从而允许并行接收数据,从而提高了总体吞吐量。多个DStream可以结合在一起以创建单个DStream。然后,可以将应用于单个输入DStream的转换应用于统一流。代码做如下。

val numStreams = 5
val kafkaStreams = (1 to numStreams).map { i => KafkaUtils.createStream(...) }
val unifiedStream = streamingContext.union(kafkaStreams)
unifiedStream.print()
           

应该考虑的另一个参数是接收器的块间隔,该间隔由配置参数确定 spark.streaming.blockInterval。对于大多数接收器,接收到的数据在存储在Spark内存中之前会合并为数据块。每批中的块数确定了将在类似map和transform中用于处理接收到的数据的任务数。每批接收器中每个接收器的任务数大约为(批处理间隔/块间隔)。例如,200 ms的块间隔将每2秒批处理创建10个任务。如果任务数太少(即少于每台计算机的核心数),那么它将效率低下,因为将不使用所有可用的核心来处理数据。要增加给定批处理间隔的任务数,请减小阻止间隔。但是,建议的块间隔最小值约为50毫秒,如果小于这个值,可能会造成额外的任务启动开销。

使用多个输入流/接收器接收数据的一种替代方法是显式地对输入数据流进行分区(使用

inputStream.repartition(number of partitions)

)。在进行下一步处理之前,这会将接收到的数据批分布在群集中指定数量的计算机上。

1.2 数据序列化

可以通过调整序列化格式来减少数据序列化的开销。在流传输的情况下,有两种类型的数据推荐序列化。

  • 输入数据:默认情况下,通过Receivers接收的输入数据通过

    StorageLevel.MEMORY_AND_DISK_SER_2

    存储在执行程序的内存中。也就是说,数据被序列化为字节以减少GC开销。同样,数据首先保存在内存中,并且仅在内存不足以容纳流计算所需的所有输入数据时才溢写到磁盘。当然,这种序列化会产生开销–接收器必须对接收到的数据进行反序列化,然后使用Spark的序列化格式对其进行重新序列化。
  • 流操作生成的持久RDD:流计算生成的RDD可以保留在内存中。例如,窗口操作会将数据保留在内存中,因为它们将被多次处理。但是,与Spark Core默认的

    StorageLevel.MEMORY_ONLY

    不同,默认情况下,流式计算生成的持久性RDD与

    StorageLevel.MEMORY_ONLY_SER

    (即序列化)保持一致,以最大程度地减少GC开销。

在这两种情况下,使用Kryo序列化都可以减少CPU和内存的开销。有关更多详细信息,请参见《Spark调优指南》。对于Kryo,请考虑注册自定义类,并禁用对象引用跟踪(请参阅《spark调优指南》中与Kryo相关的配置)。

在流应用程序需要保留的数据量不大的特定情况下,将数据(两种类型)持久化为反序列化对象是可行的,而不会产生过多的GC开销。例如,如果您使用的是几秒钟的批处理间隔并且没有窗口操作,则可以尝试通过显式设置存储级别来禁用持久化数据中的序列化。这将减少由于序列化导致的CPU开销,从而可能在没有太多GC开销的情况下提高性能。

1.3 任务启动开销

如果每秒启动的任务数量很高(例如,每秒50个或更多),则向从服务器发送任务的开销可能会很大,并且将难以实现亚秒级的延迟。可以通过以下更改来减少开销:

  • 执行模式:在独立模式或粗粒度Mesos模式下运行Spark可以比细粒度Mesos模式缩短任务启动时间。有关更多详细信息,请参阅“在Mesos上 运行”指南。

这些更改可以将批处理时间减少100毫秒,从而使亚秒级的批处理大小可行。

2、设置正确的批次间隔

为了使在群集上运行的Spark Streaming应用程序稳定,系统应能够尽快处理接收到的数据。换句话说,应尽快处理一批数据。可以通过监视流式Web UI中的处理时间来找到合适的批次间隔时间 ,其中批处理时间应小于批处理间隔。

根据流计算的性质,所使用的批处理间隔可能会对数据速率产生重大影响,该速率可以由应用程序在一组固定的群集资源上维持。例如,让我们考虑前面的WordCountNetwork示例。对于特定的数据速率,系统可能能够跟上每2秒(即2秒的批处理间隔)但不是每500毫秒报告一次字计数的情况。因此,需要设置批次间隔,以便可以维持生产中的预期数据速率。

找出适合您的应用程序的正确批处理大小的一种好方法是使用保守的批处理间隔(例如5-10秒)和低数据速率进行测试。要验证系统是否能够跟上数据速率,可以检查每个已处理批处理经历的端到端延迟的值(可以在Spark驱动程序log4j日志中查找“ Total delay”,也可以使用 StreamingListener 接口)。如果延迟保持与批次大小相当,则系统是稳定的。否则,如果延迟持续增加,则意味着系统无法跟上,因此不稳定。一旦有了稳定配置的想法,就可以尝试提高数据速率和/或减小批处理大小。注意,由于暂时的数据速率增加而引起的延迟的瞬时增加可能是好的,只要延迟减小回到较低的值(即小于批大小)即可。

3、内存调优

在《spark调优指南》中已详细讨论了调整 Spark应用程序的内存使用情况和GC行为。强烈建议您阅读。在本节中,我们将专门在Spark Streaming应用程序的上下文中讨论一些调整参数。

Spark Streaming应用程序所需的群集内存量在很大程度上取决于所使用的转换类型。例如,如果要对最后10分钟的数据使用窗口操作,则群集应具有足够的内存以将10分钟的数据保存在内存中。或者,如果您想使用updateStateByKey大量的按键,则所需的存储空间会很大。相反,如果您想执行一个简单的map-filter-store操作,则所需的内存将很少。

通常,由于通过接收器接收的数据存储在

StorageLevel.MEMORY_AND_DISK_SER_2

中,因此无法容纳在内存中的数据将溢写到磁盘上。这可能会降低流应用程序的性能,因此建议根据流应用程序的要求提供足够的内存。最好尝试以小规模查看内存使用情况并据此进行估计。

内存调整的另一个方面是垃圾回收。对于需要低延迟的流应用程序,不希望有由JVM垃圾回收引起的大停顿。

有一些参数可以帮助您调整内存使用和GC开销:

  • DStream的持久性级别:如前面的“ 数据序列化”部分所述,默认情况下,输入数据和RDD被持久化为序列化字节。与反序列化的持久性相比,这减少了内存使用和GC开销。启用Kryo序列化可进一步减少序列化的大小和内存使用量。通过压缩可以进一步减少内存使用(请参见Spark配置spark.rdd.compress),而这会占用CPU时间。
  • 清除旧数据:默认情况下,将自动清除DStream转换生成的所有输入数据和持久的RDD。Spark Streaming根据使用的转换来决定何时清除数据。例如,如果您使用10分钟的窗口操作,那么Spark Streaming将保留最后10分钟的数据,并主动丢弃较旧的数据。通过以下设置可以将数据保留更长的时间(例如,以交互方式查询较早的数据):streamingContext.remember。
  • CMS垃圾收集器:强烈建议使用并发标记扫掠GC,以使与GC相关的暂停时间始终保持较低。尽管已知并发GC会降低系统的整体处理吞吐量,但仍建议使用并发GC以实现更一致的批处理时间。确保在驱动程序(使用–driver-java-options中spark-submit)和执行程序(使用Spark配置 spark.executor.extraJavaOptions)上都设置了CMS GC 。

其他提示:为了进一步减少GC开销,请尝试以下更多提示。

  • 使用OFF_HEAP存储级别持久化RDD 。
  • 使用更多具有较小堆大小的excutor。这将减少每个JVM堆中的GC压力。

继续阅读