天天看點

Spark踩坑記——從RDD看叢集排程

目錄

  • 前言
  • RDD詳談
    • RDD存儲結構
    • RDD的操作
      • Transformation
      • Action
    • RDD依賴方式
      • 窄依賴(Narrow Dependency)
      • Shuffle依賴(寬依賴 Shffle/Wide Dependency)
  • 叢集部署
    • 元件
    • 部署方式
    • 叢集部署舉例
  • 從RDD看叢集任務排程
    • Spark監控界面
  • 踩坑小記
    • Driver程式崩潰
    • kafka編碼錯誤
  • 總結

在Spark的使用中,性能的調優配置過程中,查閱了很多資料,本文的思路是從spark最細節的本質,即核心的資料結構RDD出發,到整個Spark叢集宏觀的排程過程做一個整理歸納,從微觀到宏觀兩方面總結,友善自己在調優過程中找尋問題,理清思路,也加深自己對于分布式程式開發的了解。(有任何問題和纰漏還請各位大牛指出啦,我會第一時間改正)

在Spark開山之作"Resilient Distributed Datasets: A Fault-Tolerant Abstraction for In-Memory Cluster Computing"的這篇paper中(以下簡稱RDD Paper),Matei等提出了RDD這種資料結構,文中開頭對RDD定義是:

A distributed memory abstraction that lets programmers perform in-memory computations on large clusters in a fault-tolerant manner.

也就是說RDD設計的核心點為:

  • 記憶體計算
  • 适合于計算機叢集
  • 有容錯方式

文中提到了對于RDD設計的最大挑戰便是在提供有效的容錯機制(fault tolerance efficiently),之前存在的基于記憶體存儲的叢集抽象,例如分布式共享記憶體、鍵值存儲、資料庫等,更多是細粒度的(fine-grained)更新一個可變狀态表,而其容錯方式通常為在機器間進行資料複制或者日志更新,而這些方式很明顯會造成機器負載加大以及大量的網絡傳輸開銷。

而RDD則使用了粗粒度的(coarse-grained)轉換,即對于很多相同的資料項使用同一種操作(如map/filter/join),這種方式能夠通過記錄RDD之間的轉換進而刻畫RDD的繼承關系(lineage),而不是真實的資料,最終構成一個DAG(有向無環圖),而如果發生RDD丢失,RDD會有充足的資訊來得知怎麼從其他RDDs重新計算得到。

這也是RDD設計的核心理念,接下來圍繞這一理念我們來剖析,看RDD是怎麼實作這種高效的容錯機制的。

RDD實作的資料結構核心是一個五元組,如下表:

屬性 說明
分區清單-partitions 每個分區為RDD的一部分資料
依賴清單-dependencies table存儲其父RDD即依賴RDD
計算函數-compute 利用父分區計算RDD各分區的值
分區器-partitioner 指明RDD的分區方式(hash/range)
分區位置清單-preferredLocations 指明分區優先存放的結點位置

其中每個屬性的代碼如下:

// RDD中的依賴關系由一個Seq資料集來記錄,這裡使用Seq的原因是經常取第一個元素或者周遊
private var dependencies_: Seq[Dependency[_]] = null

// 分區清單定義在一個數組中,這裡使用Array的原因是随時使用下标來通路分區内容
// @transient分區清單不需要被序列化
@transient private var partitions_: Array[Partition] = null

// 接口定義,具體由子類實作,對輸入的RDD分區進行計算
def compute(split: Partition, context: TaskContext): Iterator[T]

// 分區器
// 可選,子類可以重寫以指定新的分區方式,Spark支援Hash和Range兩種分區方式
@transient val partitioner: Option[Partitioner] = None

// 可選,子類可以指定分區的位置,如HadoopRDD可以重寫此方法,讓分區盡可能與資料在相同的節點上
protected def getPreferredLocations(split: Partition): Seq[String] = Nil
           

在RDD Paper中,作者提到在抽象RDD時,一個很重要的點便是如何使得RDD能夠記錄RDD之間的繼承依賴關系(lineage),這種繼承關系來自豐富的轉移(Transformation)操作。是以作者提出了一種基于圖的表示方式來實作這個目标,這也正是上面RDD五種屬性的核心作用。

這五種屬性從spark誕生到新的版本疊代,一直在使用,沒有增加也沒有減少,是以可以說Spark的核心就是RDD,而RDD的核心就是這五種屬性。

在Spark踩坑記——初試中對RDD的操作也進行了簡單說明,在Spark中,對RDD的操作可以分為Transformation和Action兩種,我們分别進行整理說明:

對于Transformation操作是指由一個RDD生成新RDD的過程,其代表了是計算的中間過程,其并不會觸發真實的計算。

  • map(f:T=>U) : RDD[T]=>RDD[U]

    傳回一個新的分布式資料集,由每個原元素經過func函數轉換後組成

  • filter(f:T=>Bool) : RDD[T]=>RDD[T]

    傳回一個新的資料集,由經過func函數後傳回值為true的原元素組成

  • flatMap(f:T=>Seq[U]) : RDD[T]=>RDD[U])

    類似于map,但是每一個輸入元素,會被映射為0到多個輸出元素(是以,func函數的傳回值是一個Seq,而不是單一進制素)

  • sample(withReplacement: Boolean, fraction: Double, seed: Long) : RDD[T]=>RDD[T]

    sample将RDD這個集合内的元素進行采樣,擷取所有元素的子集。使用者可以設定是否有放回的抽樣、百分比、随機種子,進而決定采樣方式。

    withReplacement=true, 表示有放回的抽樣;

    withReplacement=false, 表示無放回的抽樣。

    如下圖:

    Spark踩坑記——從RDD看叢集排程
    每個方框是一個RDD分區。通過sample函數,采樣50%的資料。V1、V2、U1、U2、U3、U4采樣出資料V1和U1、U4,形成新的RDD。
  • groupByKey([numTasks]) : RDD[(K,V)]=>RDD[(K,Seq[V])]

    在一個由(K,V)對組成的資料集上調用,傳回一個(K,Seq[V])對的資料集。注意:

    1. 預設情況下,使用與父RDD的partition數量對應的并行任務進行分組,也可以傳入numTask可選參數,根據資料量設定不同數目的Task。
    2. 另外如果相同key的value求和或者求平均,那麼使用reduceByKey性能更好
  • reduceByKey(f:(V,V)=>V, [numTasks]) : RDD[(K, V)]=>RDD[(K, V)]

    在一個(K,V)對的資料集上使用,傳回一個(K,V)對的資料集,key相同的值,都被使用指定的reduce函數聚合到一起。和groupbykey類似,任務的個數是可以通過第二個可選參數來配置的。

  • union(otherDataset) : (RDD[T],RDD[T])=>RDD[T]

    傳回一個新的資料集,由原資料集和參數聯合而成

  • join(otherDataset, [numTasks]) : (RDD[(K,V)],RDD[(K,W)])=>RDD[(K,(V,W))]

    傳回key值相同的所有比對對,如下圖:

    Spark踩坑記——從RDD看叢集排程
    join操作會将兩個RDD中相同key值的合并成key,pair(value1, value2)的形式。
  • cogroup() : (RDD[(K,V)],RDD[(K,W)])=>RDD[(K,(Seq[V],Seq[W]))]

    cogroup函數将兩個RDD進行協同劃分。對在兩個RDD中的Key-Value類型的元素,每個RDD相同Key的元素分别聚合為一個集合,并且傳回兩個RDD中對應Key的元素集合的疊代器(K, (Iterable[V], Iterable[w]))。其中,Key和Value,Value是兩個RDD下相同Key的兩個資料集合的疊代器所構成的元組。

  • cartesian(otherDataset) : (RDD[T],RDD[U])=>RDD[(T,U)]

    笛卡爾積。但在資料集T和U上調用時,傳回一個(T,U)對的資料集,所有元素互動進行笛卡爾積。

  • sortByKey([ascending], [numTasks]) : RDD[(K,V)]=>RDD[(K,V)]

    根據key值進行排序,如果ascending設定為true則按照升序排序

  • repartition(numPartitions) :

    對RDD中的所有資料進行shuffle操作,建立更多或者更少的分區使得更加平衡。往往需要通過網絡進行資料傳輸

不同于Transformation操作,Action代表一次計算的結束,不再産生新的RDD,将結果傳回到Driver程式。是以Transformation隻是建立計算關系,而Action才是實際的執行者。每個Action都會調用SparkContext的runJob方法向叢集正式送出請求,是以每個Action對應一個Job。

  • count() : RDD[T]=>Long

    傳回資料集的元素個數

  • countByKey() : RDD[T]=>Map[T, Long]

    對(K,V)類型的RDD有效,傳回一個(K,Int)對的Map,表示每一個key對應的元素個數

  • collect() : RDD[T]=>Seq[T]

    在Driver中,以數組的形式,傳回資料集的所有元素。這通常會在使用filter或者其它操作并傳回一個足夠小的資料子集後再使用會比較有用。

  • reduce(f:(T,T)=>T) : RDD[T]=>T

    通過函數func(接受兩個參數,傳回一個參數)聚集資料集中的所有元素。這個功能必須可交換且可關聯的,進而可以正确的被并行執行。

  • saveAsTextFile(path:String)

    将資料集的元素,以textfile的形式,儲存到本地檔案系統,HDFS或者任何其它hadoop支援的檔案系統。對于每個元素,Spark将會調用toString方法,将它轉換為檔案中的文本行

  • saveAsSequenceFile(path:String)

    将資料集的元素,以Hadoop sequencefile的格式,儲存到指定的目錄下,本地系統,HDFS或者任何其它hadoop支援的檔案系統。這個隻限于由key-value對組成,并實作了Hadoop的Writable接口,或者隐式的可以轉換為Writable的RDD。(Spark包括了基本類型的轉換,例如Int,Double,String,等等)

  • saveAsObjectFile(path:String)

    利用Java的Serialization接口進行持久化操作,之後可以使用SparkContext.objectFile()重新load回記憶體

  • take(n)

    傳回一個由資料集的前n個元素組成的數組。注意,這個操作目前并非并行執行,而是由驅動程式計算所有的元素

  • takeSample(withReplacement, num, [seed])

    傳回一個數組,在資料集中随機采樣num個元素組成,可以選擇是否用随機數替換不足的部分,Seed用于指定的随機數生成器種子

  • takeOrdered(n, [ordering])

    傳回前n個元素,可以使用元素的自然順序,也可以使用使用者自定義comparator

  • first()

    傳回資料集的第一個元素(類似于take(1))

  • foreach(func)

    在資料集的每一個元素上,運作函數func進行更新。這通常用于邊緣效果,例如更新一個累加器,或者和外部存儲系統進行互動,例如HBase。關于foreach我在Spark踩坑記——資料庫(Hbase+Mysql)中對sparkstreaming的foreach操作有詳細整理

RDD 的容錯機制是通過記錄更新來實作的,且記錄的是粗粒度的轉換操作。在外部,我們将記錄的資訊稱為血統(Lineage)關系,而到了源碼級别,Apache Spark 記錄的則是 RDD 之間的依賴(Dependency)關系。在一次轉換操作中,建立得到的新 RDD 稱為子 RDD,提供資料的 RDD 稱為父 RDD,父 RDD 可能會存在多個,我們把子 RDD 與父 RDD 之間的關系稱為依賴關系,或者可以說是子 RDD 依賴于父 RDD。

依賴隻儲存父 RDD 資訊,轉換操作的其他資訊,如資料處理函數,會在建立 RDD 時候,儲存在新的 RDD 内。依賴在 Apache Spark 源碼中的對應實作是 Dependency 抽象類。

Apache Spark 将依賴進一步分為兩類,分别是窄依賴(Narrow Dependency)和 Shuffle 依賴(Shuffle Dependency,在部分文獻中也被稱為 Wide Dependency,即寬依賴)。

窄依賴中,父 RDD 中的一個分區最多隻會被子 RDD 中的一個分區使用,換句話說,父 RDD 中,一個分區内的資料是不能被分割的,必須整個傳遞給子 RDD 中的一個分區。下圖展示了幾類常見的窄依賴及其對應的轉換操作。

Spark踩坑記——從RDD看叢集排程

Shuffle 依賴中,父 RDD 中的分區可能會被多個子 RDD 分區使用。因為父 RDD 中一個分區内的資料會被分割,發送給子 RDD 的所有分區,是以 Shuffle 依賴也意味着父 RDD 與子 RDD 之間存在着 Shuffle 過程。下圖展示了幾類常見的Shuffle依賴及其對應的轉換操作。

Spark踩坑記——從RDD看叢集排程

需要說明的是,依賴關系時RDD到RDD之間的一種映射關系,是兩個RDD之間的依賴,那麼如果在一次操作中涉及到多個父RDD,也有可能同時包含窄依賴和Shuffle依賴,如join操作:

Spark踩坑記——從RDD看叢集排程

說到Spark叢集的部署,我們先來讨論一下Spark中一些關鍵的元件,在我的博文Spark踩坑記——初試中,我對Master/Worker/Driver/Executor幾個關鍵概念做了闡述。首先,先上官方文檔中的一張圖:

Spark踩坑記——從RDD看叢集排程

官方文檔對其中的術語進行了總結,如下表:

Spark踩坑記——從RDD看叢集排程

從官方文檔摘抄了這麼多東東,對Spark中基本的叢集結構,以及一個程式送出到Spark後的排程情況我們有了了解。

對于叢集的部署方式,Spark提供了多種叢集部署方式,如下:

  • Local模式:本地調試的一種模式,可以在一台機器上完成程式的運作與調試
  • Standalone模式:即獨立模式,自帶完整的服務,可單獨部署到一個叢集中,無需依賴任何其他資源管理系統。
  • Spark On YARN模式:将Spark搭建在Hadoop之上,由hadoop中的yarn負責資源調配,Spark負責計算任務;
  • Spark On Mesos模式:這是很多公司采用的模式,官方推薦這種模式(當然,原因之一是血緣關系)。正是由于Spark開發之初就考慮到支援Mesos,是以,目前而言,Spark運作在Mesos上會比運作在YARN上更加靈活,更加自然。目前在Spark On Mesos環境中,使用者可選擇兩種排程模式之一運作自己的應用程式。

由于在我平時的使用中,是直接采用的Standalone的部署方式,我這裡将部署的架構做一個簡單的介紹,其他部署方式其實可以做一些參考來進行搭配部署:

Spark踩坑記——從RDD看叢集排程

假設我們的網段為10.214.55.x,其中1、2、3機器我們用作叢集節點,4和5位master節點,這裡我們用到了zookeeper,關于zookeeper的介紹大家可以在網上搜搜,我們這裡加入zk的目的就是master節點如果崩潰後進行一個主備切換,保證叢集能夠繼續正常運作。如果我們在1送出我們的應用,那麼2和3就将作為我們的worker節點參與運算。而關于配置檔案中需要的具體配置項可以參考官方文檔:Spark Standalone Mode

上文我們從微觀和宏觀兩個角度對Spark進行了總結,RDD以及RDD的依賴,Spark叢集以及部署,那麼當我們在送出了一個任務或者說Application到Spark叢集時,它是怎麼運作的呢?

  • 首先我們通過maven或者sbt等,将我們的應用以及其依賴的jar包完整的打包,利用spark-submit指令将jar送出到spark;
  • 送出程式的這個Spark節點會作為Driver節點,并從Cluster Manager中擷取資源;
  • 程式會在worker節點中獲得executor用來執行我們的任務;
  • 在spark程式中每次RDD的action變換會産生一個新的job,每個job包含多個task;
  • 而RDD在進行Transformation時,會産生新的stage;
  • task會被送往各個executor運作;
  • 而最終的計算結果會回到driver節點進行彙總并輸出(如reduceByKey)。

針對這個過程,我們可以從微觀和宏觀兩個角度把控,将RDD的操作依賴關系,以及task在叢集間的配置設定情況綜合起來看,如下圖:

Spark踩坑記——從RDD看叢集排程

在送出Spark任務時,我們可以在送出指令中加入一項參數--conf spark.ui.port=xxxx,其中"xxxx"為你需要的端口号,這樣在浏覽器中我們就可以利用Spark提供的UI界面對Application的運作情況進行監控如下圖:

Spark踩坑記——從RDD看叢集排程
Spark踩坑記——從RDD看叢集排程

在spark平時的使用過程當中,由于程式在整個叢集當中奔跑,經常會遇到很多莫名其妙的錯誤,有時候通過日志給定的錯誤很難真的定位到真正的原因,那叫一個憂傷阿T^T

出現這類錯誤,往往日志中會提到JVM。在Spark中大多數操作會分擔到各個結點的worker進行計算,但是對于shuffle類操作,如我們經常會用的reduceByKey或者collect等,都會使得spark将所有結點的資料彙總到driver進行計算,這樣就會導緻driver需要遠大于正常worker的記憶體,是以遇到這類問題,最先可以考慮的便是增加driver結點的記憶體,增加方式如下:

--driver-memory 15g
           

在利用spark streaming的python版本,消費kafka資料的時候,遇到類似下面的問題:

UnicodeDecodeError: 'utf8' codec can't decode byte 0x85 in position 87: invalid start byte
           

我們知道python2中的字元串形式有兩種即unicode形式和普通str形式,通過反複分析日志和檢視kafka.py的源碼找到了問題所在。首先在pyspark的kafka API中,找到createStream函數的如下說明:

Spark踩坑記——從RDD看叢集排程

圖中紅框内清楚的說明了,在解析kafka傳來的資料的時候,預設使用了utf8_decoder函數,那這個東東是個什麼玩意呢,找到kafka.py的源碼,其定義如下:

# 預設解碼器
def utf8_decoder(s):
    """ Decode the unicode as UTF-8 """
    if s is None:
        return None
    return s.decode('utf-8')

class KafkaUtils(object):

    @staticmethod
    def createStream(ssc, zkQuorum, groupId, topics, kafkaParams=None,
                     storageLevel=StorageLevel.MEMORY_AND_DISK_2,
                     keyDecoder=utf8_decoder, valueDecoder=utf8_decoder):
        """
        Create an input stream that pulls messages from a Kafka Broker.
        :param ssc:  StreamingContext object
        :param zkQuorum:  Zookeeper quorum (hostname:port,hostname:port,..).
        :param groupId:  The group id for this consumer.
        :param topics:  Dict of (topic_name -> numPartitions) to consume.
                        Each partition is consumed in its own thread.
        :param kafkaParams: Additional params for Kafka
        :param storageLevel:  RDD storage level.
        :param keyDecoder:  A function used to decode key (default is utf8_decoder)
        :param valueDecoder:  A function used to decode value (default is utf8_decoder)
        :return: A DStream object
        """
        if kafkaParams is None:
            kafkaParams = dict()
        kafkaParams.update({
            "zookeeper.connect": zkQuorum,
            "group.id": groupId,
            "zookeeper.connection.timeout.ms": "10000",
        })
        if not isinstance(topics, dict):
            raise TypeError("topics should be dict")
        jlevel = ssc._sc._getJavaStorageLevel(storageLevel)
        helper = KafkaUtils._get_helper(ssc._sc)
        jstream = helper.createStream(ssc._jssc, kafkaParams, topics, jlevel)
        ser = PairDeserializer(NoOpSerializer(), NoOpSerializer())
        stream = DStream(jstream, ssc, ser)
        return stream.map(lambda k_v: (keyDecoder(k_v[0]), valueDecoder(k_v[1])))
...
           

我們看到預設的解碼器直接調用了s.decode,那麼當kafka傳來的資料中有非utf8編碼的字元時,整個stage就會挂掉,是以修改如下:

def my_uft8_decoder(s):
    if s is None:
        return None
    try:
        return s.decode('utf-8', 'replace')
    except Exception, e:
        print e;
        return None

# 建立stream時傳入
kafkaStream = KafkaUtils.createStream(ssc, \
     conf.kafka_quorum, conf.kafka_consumer_group, {conf.kafka_topic:conf.spark_streaming_topic_parallelism}, {
        "auto.commit.interval.ms":"50000",
        "auto.offset.reset":"smallest",
        },
        StorageLevel.MEMORY_AND_DISK_SER,
        valueDecoder=my_uft8_decoder
)

           

如果采用createDirectStream來建立context與此類似,不再贅述。是以在pyspark的kafka消費中遇到解碼問題可以關注一下這裡。

挺長的一篇整理,前後拖了很久。本篇博文我的構思主要就是,當我們送出了一個應用到Spark時,我們需要大緻了解Spark做了什麼,這裡我并沒有分析源碼(因為我木有看哈哈)。從最微觀的RDD的操作,到宏觀的整個叢集的排程運算,這樣從RDD看叢集排程就有了一個整體的認識,當遇到問題的時候就更容易排查,遇到性能拼瓶頸也容易查找。OK,這就是這篇博文的全部整理哈,其中末尾部分闡述了在實際項目中遇到的一些問題和坑,如果有相似的問題的朋友可以參考下。

做個小廣告,項目是WeTest輿情,企鵝風訊,感興趣的歡迎大家來踩踩:

http://wetest.qq.com/bee/

參考文獻:

  1. 《Spark最佳實踐》陳歡 林世飛(鵝廠大神的作品v)
  2. Zaharia M, Chowdhury M, Das T, et al. Resilient distributed datasets: A fault-tolerant abstraction for in-memory cluster computing[C]//Proceedings of the 9th USENIX conference on Networked Systems Design and Implementation. USENIX Association, 2012: 2-2.
  3. spark源碼閱讀
  4. 【Spark】RDD操作詳解2——值型Transformation算子
  5. Spark Programming Guide
  6. Spark 開發指南
  7. pyspark.streaming module
  8. RDD 依賴
  9. Cluster Mode Overview
  10. Apache Spark探秘:三種分布式部署方式比較

繼續閱讀