RDD重複情況
第一種情況
第一種情況就是就是一個rdd重複計算 比如hdfs—rdd1—rdd2這個流程

第二種情況
rdd1 到 rdd2
rdd1 到 rdd3 這個算子邏輯重複
RDD的重構以及持久化
1. 第一,RDD架構重構與優化
盡量複用rdd,差不多的rdd,可以抽取為一個共同的rdd,供後面RDD計算是,反複使用。
2. 第二,公共RDD一定要持久化
對于多次計算和使用的公共RDD,一定要進行持久化
持久化就是說,将RDD的資料緩存到記憶體中/磁盤中(通過BlockManager緩存),緩存以後,以後無論對這個RDD做多少次計算,那麼都是直接取這個RDD的持久化的資料,比如通過記憶體或者磁盤中,直接取一份資料。
3. 第三,持久化 進行序列化
思考1 如果全部持久化在記憶體中,那麼可能會導緻記憶體占用過大,這樣的話,也許,會導緻OOM記憶體溢出。
記憶體序列化
那麼 當純記憶體無法滿足公共RDD資料完全存放時,就優先考慮,使用序列化的方式在純記憶體中存儲。
将RDD的每個partition的資料,序列化成一個大的位元組資料,就一個對象,序列化後,大大減少記憶體的空間占用
序列化的方式,缺點就是,擷取資料時需要反序列化
如果序列化純記憶體方式,還是導緻OOM,記憶體溢出,就要考慮磁盤的方式
記憶體+磁盤的普通方式(無序列化)
記憶體+磁盤序列化
4. 第四,記憶體充足的情況下,為了資料高可靠行,可以考慮使用雙副本機制,進行持久化
持久化的雙副本機制持久化的一個副本,因為機器當機了,副本丢了,還是要重新計算一次;持久化的每個資料單元,存儲一份副本,放在其他節點上面;進而記性容錯,一個副本丢了,不用重新計算,還可以使用另外一份副本。
針對 記憶體資源極度充足時。
/**
* actionRDD,就是一個公共RDD
* 第一,要用ationRDD,擷取到一個公共的sessionid為key的PairRDD
* 第二,actionRDD,用在了session聚合環節裡面
*
* sessionid為key的PairRDD,是确定了,在後面要多次使用的
* 1、與通過篩選的sessionid進行join,擷取通過篩選的session的明細資料
* 2、将這個RDD,直接傳入aggregateBySession方法,進行session聚合統計
*
* 重構完以後,actionRDD,就隻在最開始,使用一次,用來生成以sessionid為key的RDD
*
*/
JavaRDD<Row> actionRDD = SparkUtils.getActionRDDByDateRange(sqlContext, taskParam);
JavaPairRDD<String, Row> sessionid2actionRDD = getSessionid2ActionRDD(actionRDD);
三個持久化
sessionid2actionRDD
sessionid2detailRDD
filteredSessionid2AggrInfoRDD
/**
* 持久化,很簡單,就是對RDD調用persist()方法,并傳入一個持久化級别
*
* 如果是persist(StorageLevel.MEMORY_ONLY()),純記憶體,無序列化,那麼就可以用cache()方法來替代
* StorageLevel.MEMORY_ONLY_SER(),第二選擇
* StorageLevel.MEMORY_AND_DISK(),第三選擇
* StorageLevel.MEMORY_AND_DISK_SER(),第四選擇
* StorageLevel.DISK_ONLY(),第五選擇
*
* 如果記憶體充足,要使用雙副本高可靠機制
* 選擇字尾帶_2的政策
* StorageLevel.MEMORY_ONLY_2()
*
*/
sessionid2actionRDD = sessionid2actionRDD.persist(StorageLevel.MEMORY_ONLY());
filteredSessionid2AggrInfoRDD = filteredSessionid2AggrInfoRDD.persist(StorageLevel.MEMORY_ONLY());
sessionid2detailRDD = sessionid2detailRDD.persist(StorageLevel.MEMORY_ONLY());