在spark中,有時候我們覺得task并行度太小,就想着提高其并行度。
首先,先說一下有多少種增加分區提高并行度的方法:
1,textFile(path, numPartion=partitionNum)
2,增加hdfs上的block數
3,reduceByKey groupByKey shuffle算子可以指定傳回的RDD的分區數,如reduceByKey(+, 10)
4,重分區 coalesce repartition
5,自定義分區器 partionBy
總體上來說,就以上五類方法。下面我們就來做進一步的講解。
需要強調的是,所謂并行度,我們并不是說增加分區就是增加并行度,而是通過增加分區,運作更多的task,這樣來增加并行度。
這裡有一個重要的概念就是每一個stage task的個數與這個stage中的最後一個RDD的分區相關。
如圖中,rdd1、rdd2和rdd3三個rdd屬于同一個stage,由三個task執行。
可能這裡就有讀者會問道,為什麼是最後一個rdd決定,而不是前面的rdd決定的呢? 下面将展示一個特殊案例:
圖中可以看到,rdd0和rdd1通過union得到rdd3,一種産生4個task,而rdd1和rdd0都隻有2個分區。那麼這就很好的解釋了為什麼是最後一個rdd的分區決定task的數量了吧。
另外的,鑒于前三種增加rdd分區的方式相對好了解。這裡重點解釋一下repartition和coalesce兩個增加分區的算子。
對于repartion:
用法是rdd.repartition(partitonNum)
從圖中可以直接了解到,repartition有一個shuffle的過程。即将每一個partition中的資料,循環周遊,分到對應的分區中。同理,對應到減小分區也是這樣操作。
對于coalesce:
用法 coalese(partitionNum, True/False)
其中,partitionNum表示新分區個數,True和False表示是否shuffle。
圖中的結構,表示将3分區重新分區為2分區。可以用rdd1.coalesce(2, False)來分區。但是這裡,如果我們想要将rdd2變為4分區,則語句為rdd1.coalesce(4, True),需要注意的是,分區增加就必須将是否shuffle設定為True了。
總結:
如果想要增加rdd的分區,必須使用帶有shuffle的重分區方式,repartition/coalesce(num, true)
如果想要減少rdd的分區,可以不使用帶有shuffle的重分區方式,coalesce(num, false)