天天看點

Spark 性能調優 RDD架構以及RDD持久化

RDD重複情況

第一種情況

第一種情況就是就是一個rdd重複計算 比如hdfs—rdd1—rdd2這個流程

Spark 性能調優 RDD架構以及RDD持久化

第二種情況

rdd1 到 rdd2

rdd1 到 rdd3 這個算子邏輯重複

Spark 性能調優 RDD架構以及RDD持久化

RDD的重構以及持久化

1. 第一,RDD架構重構與優化

盡量複用rdd,差不多的rdd,可以抽取為一個共同的rdd,供後面RDD計算是,反複使用。

2. 第二,公共RDD一定要持久化

對于多次計算和使用的公共RDD,一定要進行持久化

持久化就是說,将RDD的資料緩存到記憶體中/磁盤中(通過BlockManager緩存),緩存以後,以後無論對這個RDD做多少次計算,那麼都是直接取這個RDD的持久化的資料,比如通過記憶體或者磁盤中,直接取一份資料。

3. 第三,持久化 進行序列化

思考1 如果全部持久化在記憶體中,那麼可能會導緻記憶體占用過大,這樣的話,也許,會導緻OOM記憶體溢出。

記憶體序列化

那麼 當純記憶體無法滿足公共RDD資料完全存放時,就優先考慮,使用序列化的方式在純記憶體中存儲。

将RDD的每個partition的資料,序列化成一個大的位元組資料,就一個對象,序列化後,大大減少記憶體的空間占用

序列化的方式,缺點就是,擷取資料時需要反序列化

如果序列化純記憶體方式,還是導緻OOM,記憶體溢出,就要考慮磁盤的方式

記憶體+磁盤的普通方式(無序列化)

記憶體+磁盤序列化

4. 第四,記憶體充足的情況下,為了資料高可靠行,可以考慮使用雙副本機制,進行持久化

持久化的雙副本機制持久化的一個副本,因為機器當機了,副本丢了,還是要重新計算一次;持久化的每個資料單元,存儲一份副本,放在其他節點上面;進而記性容錯,一個副本丢了,不用重新計算,還可以使用另外一份副本。

針對 記憶體資源極度充足時。

/**
		 * actionRDD,就是一個公共RDD
		 * 第一,要用ationRDD,擷取到一個公共的sessionid為key的PairRDD
		 * 第二,actionRDD,用在了session聚合環節裡面
		 *
		 * sessionid為key的PairRDD,是确定了,在後面要多次使用的
		 * 1、與通過篩選的sessionid進行join,擷取通過篩選的session的明細資料
		 * 2、将這個RDD,直接傳入aggregateBySession方法,進行session聚合統計
		 *
		 * 重構完以後,actionRDD,就隻在最開始,使用一次,用來生成以sessionid為key的RDD
		 *
		 */
		JavaRDD<Row> actionRDD = SparkUtils.getActionRDDByDateRange(sqlContext, taskParam);
		JavaPairRDD<String, Row> sessionid2actionRDD = getSessionid2ActionRDD(actionRDD);
           

三個持久化

sessionid2actionRDD

sessionid2detailRDD

filteredSessionid2AggrInfoRDD

/**
		 * 持久化,很簡單,就是對RDD調用persist()方法,并傳入一個持久化級别
		 *
		 * 如果是persist(StorageLevel.MEMORY_ONLY()),純記憶體,無序列化,那麼就可以用cache()方法來替代
		 * StorageLevel.MEMORY_ONLY_SER(),第二選擇
		 * StorageLevel.MEMORY_AND_DISK(),第三選擇
		 * StorageLevel.MEMORY_AND_DISK_SER(),第四選擇
		 * StorageLevel.DISK_ONLY(),第五選擇
		 *
		 * 如果記憶體充足,要使用雙副本高可靠機制
		 * 選擇字尾帶_2的政策
		 * StorageLevel.MEMORY_ONLY_2()
		 *
		 */
		sessionid2actionRDD = sessionid2actionRDD.persist(StorageLevel.MEMORY_ONLY());

           
filteredSessionid2AggrInfoRDD = filteredSessionid2AggrInfoRDD.persist(StorageLevel.MEMORY_ONLY());

sessionid2detailRDD = sessionid2detailRDD.persist(StorageLevel.MEMORY_ONLY());
           

繼續閱讀