spark上下文
package com.mao.scala
import org.apache.spark.{SparkConf, SparkContext}
object WordCountDemo {
def main(args: Array[String]): Unit = {
val conf = new SparkConf();
conf.setAppName("WordCountSpark");
// 本地背景排程器
conf.setMaster("local");
// conf.setMaster("local[3]"); //3線程,模拟cluster叢集
// conf.setMaster("local[*]"); //比對cpu個數,
// conf.setMaster("local[3,2]"); //3:3個線程,2最多重試次數
//[相當于僞分布式]
//conf.setMaster("local-cluster[N, cores, memory]"); //模拟spark叢集。
//[完全分布式]
//StandaloneSchedulerBackend
//conf.setMaster("spark://s201:7077"); //連接配接到spark叢集上.
val sc = new SparkContext(conf);
val rdd1 = sc.textFile("d:/mr/test.txt");
val rdd2 = rdd1.flatMap(line => line.split(" "));
val rdd3 = rdd2.map(( _ ,1));
val rdd4 = rdd3.reduceByKey(_ + _);
val r = rdd4.collect();
r.foreach(println);
}
}
RDD持久化
跨操作進行RDD的記憶體式存儲。
持久化RDD時,節點上的每個分區都會儲存操記憶體中,以備在其他操作中進行重用。
緩存技術是疊代式計算和互動式查詢的重要工具。
使用persist()和cache()進行rdd的持久化。
cache()是persist()一種.
action第一次計算時會發生persist().
spark的cache是容錯的,如果rdd的任何一個分區丢失了,都可以通過最初建立rdd的進行重新計算。
persist可以使用不同的存儲級别進行持久化。
MEMORY_ONLY //隻在記憶體
MEMORY_AND_DISK
MEMORY_ONLY_SER //記憶體存儲(串行化)
MEMORY_AND_DISK_SER
DISK_ONLY //硬碟
MEMORY_ONLY_2 //帶有副本
MEMORY_AND_DISK_2 //快速容錯。
OFF_HEAP
删除持久化資料: rdd.unpersist();
資料傳遞:
map(),filter()進階函數中通路的對象被串行化到各個節點。每個節點都有一份拷貝。
變量值并不會回傳到driver程式。
共享變量
spark通過廣播變量和累加器實作共享變量。
[廣播變量]
//建立廣播變量
val bc1 = sc.broadcast(Array(1,2,3))
bc1.value
[累加器]
val ac1 = sc.longaccumulator("ac1")
ac1.value
sc.parell..(1 to 10).map(_ * 2).map(e=>{ac1.add(1) ; e}).reduce(_+_)
ac1.value //10
通過spark實作pi的分布式計算
----------------------------
sc.parallelize(1 to 20).map(e=>{val a = 1f / (2 * e - 1) ;val b = if (e % 2 == 0) -1 else 1 ;a * b * 4}).reduce(_+_)