天天看點

Spark RDD持久化

spark上下文      
package com.mao.scala

import org.apache.spark.{SparkConf, SparkContext}

object WordCountDemo {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf();
    conf.setAppName("WordCountSpark");
   // 本地背景排程器
    conf.setMaster("local");
   // conf.setMaster("local[3]");     //3線程,模拟cluster叢集
   // conf.setMaster("local[*]");     //比對cpu個數,
   // conf.setMaster("local[3,2]");   //3:3個線程,2最多重試次數

    //[相當于僞分布式]   
    //conf.setMaster("local-cluster[N, cores, memory]");  //模拟spark叢集。 

    //[完全分布式]
    //StandaloneSchedulerBackend
    //conf.setMaster("spark://s201:7077");    //連接配接到spark叢集上.

    val sc = new SparkContext(conf);
    val rdd1 = sc.textFile("d:/mr/test.txt");
    val rdd2 = rdd1.flatMap(line => line.split(" "));
    val rdd3 = rdd2.map(( _ ,1));
    val rdd4 = rdd3.reduceByKey(_ + _);
    val r = rdd4.collect();
    r.foreach(println);
  }
}
           

RDD持久化

    跨操作進行RDD的記憶體式存儲。

    持久化RDD時,節點上的每個分區都會儲存操記憶體中,以備在其他操作中進行重用。

    緩存技術是疊代式計算和互動式查詢的重要工具。

    使用persist()和cache()進行rdd的持久化。

    cache()是persist()一種.

    action第一次計算時會發生persist().

    spark的cache是容錯的,如果rdd的任何一個分區丢失了,都可以通過最初建立rdd的進行重新計算。

    persist可以使用不同的存儲級别進行持久化。

    MEMORY_ONLY            //隻在記憶體

    MEMORY_AND_DISK

    MEMORY_ONLY_SER        //記憶體存儲(串行化)

    MEMORY_AND_DISK_SER 

    DISK_ONLY            //硬碟

    MEMORY_ONLY_2        //帶有副本 

    MEMORY_AND_DISK_2    //快速容錯。

    OFF_HEAP 

删除持久化資料: rdd.unpersist();

資料傳遞:

     map(),filter()進階函數中通路的對象被串行化到各個節點。每個節點都有一份拷貝。

    變量值并不會回傳到driver程式。

共享變量

spark通過廣播變量和累加器實作共享變量。

    [廣播變量]

        //建立廣播變量

        val bc1 = sc.broadcast(Array(1,2,3))

        bc1.value

    [累加器]

        val ac1 = sc.longaccumulator("ac1")

        ac1.value

        sc.parell..(1 to 10).map(_ * 2).map(e=>{ac1.add(1) ; e}).reduce(_+_)

        ac1.value            //10

    通過spark實作pi的分布式計算

    ----------------------------

    sc.parallelize(1 to 20).map(e=>{val a = 1f / (2 * e - 1) ;val b = if (e % 2 == 0) -1 else 1 ;a * b * 4}).reduce(_+_)

繼續閱讀