天天看點

死磕spark中的job、stage、task

寫在前面

台風夜的電話面試裡被問到了spark運作任務的過程中stage的劃分依據。一下子就給整懵了,支支吾吾答非所問。從事大資料的開發也有一年半光景,spark任務的運作原理依舊知之甚少。是以就參閱各種優秀的文章,再配上一個自己工作中的實際項目,特意整理出這篇筆記,以此警示自己的自大與無知。

測試環境

本地開發環境

  • idea 2019.1.2
  • maven 3.6
  • spark 2.4.3
  • scala 2.1.8
  • jdk1.8

測試叢集環境

  • spark 2.4.3
  • scala 2.1.8
  • jdk1.8
  • hadoop2.7.4
死磕spark中的job、stage、task

測試項目例子

計算某一天店鋪銷售額\時段銷售額top10

樣例資料字段格式

filed1|filed2|filed3|store_no|filed5|filed6|filed7|filed8|amount|filed10|filed11|sale_time
           

這裡不提供具體的測試資料,實驗過程中需要自己模拟所用的資料。

樣例demo

package com.dr.leo

import com.dr.leo.utils.StrUtils
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.{FileSplit, InputSplit, TextInputFormat}
import org.apache.spark.SparkContext
import org.apache.spark.rdd.{HadoopRDD, RDD}
import org.apache.spark.sql.SparkSession

/**
  * @author leo.jie ([email protected])
  * @organization DataReal
  * @version 1.0
  * @website https://www.jlpyyf.com
  * @date 2019-07-28 20:29
  * @since 1.0
  */
object WordCount {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession.builder()
      .appName("WordCount")
      //.master("local[*]")
      .enableHiveSupport()
      .getOrCreate()
    val sc: SparkContext = spark.sparkContext
    val fileRddOri = loadFileToRdd(sc, "hdfs://leo/test/pos.DAT")
    val fileRdd = fileRddOri.map(x => for (data <- x._2.split("\\|")) yield if (data == null) "" else data.trim)
      .filter(x => x.length == 12)
      .map(x => (retailer_shop_code(x(3)), x(10).substring(10, 13), x(8).toFloat))
      .map(x => ((x._1, x._2), x._3))
      .reduceByKey(_ + _)

    fileRdd.top(10)(Ordering.by(e => e._2)).foreach(println(_))
    println("##########################################################")
    fileRdd.map(x => (x._1._1, x._2)).reduceByKey(_ + _).top(10)(Ordering.by(e => e._2)).foreach(println(_))
    println("##########################################################")
    println(fileRdd.count())
    println("##########################################################")
    println(fileRdd.first())
    println("##########################################################")
    fileRdd.take(10).foreach(println(_))
    while (true) {
      ;
    }
    spark.stop()
  }

  /**
    * 讀取gbk編碼的file
    * @param sc
    * @param path
    * @param encoding
    * @return
    */
  def loadFileToRdd(sc: SparkContext, path: String, encoding: String = "GBK"): RDD[(String, String, Int)] = {
    sc.hadoopFile[LongWritable, Text, TextInputFormat](path)
      .asInstanceOf[HadoopRDD[LongWritable, Text]]
      .mapPartitionsWithInputSplit((inputSplit: InputSplit, iterator: Iterator[(LongWritable, Text)]) => {
        val file = inputSplit.asInstanceOf[FileSplit]
        iterator.filter(x => x._2 != null).map(x => {
          (file.getPath.getName, new String(x._2.getBytes, 0, x._2.getLength, encoding), 1)
        })
      })
  }

  /**
    * 隻是一個店鋪号轉換的函數
    * @param retailer_shop_code
    * @return
    */
  def retailer_shop_code(retailer_shop_code: String): String = {
    if (StrUtils.isBlank(retailer_shop_code)) ""
    else if (retailer_shop_code.length == 5) retailer_shop_code.substring(0, retailer_shop_code.length - 1).toUpperCase()
    else if (retailer_shop_code.length == 6) retailer_shop_code.substring(0, retailer_shop_code.length - 2).toUpperCase()
    else if (retailer_shop_code.length == 8) retailer_shop_code.substring(0, retailer_shop_code.length - 2).toUpperCase()
    else retailer_shop_code
  }
}

           

運作測試

程式打包後發往叢集送出任務。所用指令

[[email protected] leo_demo]$ spark-submit --master yarn --deploy-mode cluster --driver-memory 2G --driver-cores 2 --executor-memory 2g --num-executors 5 --executor-cores 2 --conf spark.yarn.executor.memoryOverhead=2048 --conf spark.network.timeout=30000 --class com.dr.leo.WordCount leo-study-spark.jar
           

spark-ui上的資訊告訴了我們什麼?

檢視任務的運作資訊

spark任務運作的過程中,我們可以點選

ApplicationMaster

跳轉任務運作的界面。

死磕spark中的job、stage、task

運作流程之:job

死磕spark中的job、stage、task

此時我們送出的任務的所有job都已經運作成功,隻因為程式中任務執行完畢後是一段無限循環,是以這個界面會一直存在,直到我們手動在yarn上kill掉這個application。

我們寫的代碼被送出運作的過程中,會先被劃分為一個又一個job,這些job按照被劃分的先後順序會依次執行。

圖示中我們已經知道,我們送出的任務,最終被劃分成了5個job。

所謂一個job,就是由一個rdd action觸發的動作。簡單了解為,當你需要執行一個rdd的action操作的時候,就會生成一個job。

這裡不會贅述什麼是rdd的action操作。

結合代碼與圖示我們可以知道:

job-0 的産生是由于觸發了top操作

top at WordCount.scala:33

job-1 的産生是由于觸發了top操作

top at WordCount.scala:35

job-2 的産生是由于觸發了count操作

count at WordCount.scala:37

job-3 的産生是由于觸發了first操作

first at WordCount.scala:39

job-4 的産生是由于觸發了take操作

take at WordCount.scala:41

運作流程之:stage

選擇任意一個job,點選連結去檢視該job的detail,這裡我們選擇job-0。

死磕spark中的job、stage、task

由圖示我們可知,job-0由兩個stage組成,并且每個stage都有8個task,說明每個stage的資料都在8個partition上。

下面我們将詳細說明stage以及stage的劃分依據。

stage 概念、劃分

貌似沒有十分明确的概念來十厘清楚地說明spark stage究竟是什麼?這裡隻記錄從衆多優秀的部落格裡提取的直言片語,以及本人的一點見解。

stage的劃分是以shuffle操作作為邊界的。也就是說某個action導緻了shuffle操作,就會劃分出兩個stage。

stage的劃分在RDD的論文中也有詳細介紹,簡單的說是以shuffle和result這兩種類型來劃分。在spark中有兩類task,一類是shuffle map task,一類是result task,第一類task的輸出是shuffle所需資料,第二類task的輸出是result,stage的劃分也依次為依據,shuffle之前的所有變換是一個stage,shuffle之後的操作是另一個stage。比如 rdd.parallize(1 to 10).foreach(println) 這個操作沒有shuffle,直接就輸出了,那麼隻有它的task是resultTask,stage也隻有一個;如果是rdd.map(x => (x, 1)).reduceByKey(_ + _).foreach(println), 這個job因為有reduce,是以有一個shuffle過程,那麼reduceByKey之前的是一個stage,執行shuffleMapTask,輸出shuffle所需的資料,reduceByKey到最後是一個stage,直接就輸出結果了。如果job中有多次shuffle,那麼每個shuffle之前都是一個stage。

在DAGScheduler中,會将每個job劃分成多個stage,每個stage會建立一批task并且計算task的最佳位置,一個task對應一個partition。DAGScheduler的stage劃分算法如下:它會從觸發action操作的那個RDD開始往前推,首先會為最後一個RDD建立一個stage,然後往前倒推的時候,如果發現對某個RDD是寬依賴,那麼就會将寬依賴的那個RDD建立一個新的stage,那個RDD就是新的stage的最後一個RDD,然後依次類推,繼續往前倒推,根據窄依賴或者寬依賴進行stage的劃分,直到所有的RDD全部周遊完成為止。

spark任務會根據RDD之間的依賴關系,形成一個DAG有向無環圖,DAG會送出給DAGScheduler,DAGScheduler會把DAG劃分成互相依賴的多個stage,劃分stage的依據就是RDD之間的寬窄依賴。

遇到寬依賴就劃分stage

,每個stage包含一個或多個task任務。然後将這些task以task set的形式交給TaskScheduler運作。

stage是由一組并行的task組成

窄依賴

父RDD和子RDD partition之間的關系是一對一的。不會有shuffle的産生。父RDD的一個分區去到子RDD的一個分區中。

寬依賴

父RDD與子RDD partition之間的關系是一對多的。會有shuffle的産生。父RDD的一個分區去到子RDD的不同分區裡面。

其實區分寬窄依賴,主要就是看父RDD的一個partition的流向,要是流向一個的話就是窄依賴,流向多個的話就是寬依賴。以WordCount為例,看圖了解:

死磕spark中的job、stage、task

運作流程之:task

task是stage下的一個任務執行單元,一般來說,一個rdd有多少個partition,就會有多少個task,因為每一個task隻是處理一個partition上的資料。

繼續閱讀