1.什麼是Spark?
2.為什麼要使用Spark?
因為Hadoop在處理資料的時候有多次的IO和網絡操作,Mapreduce都要轉成map,shuffle和reduce等核心階段,而且任務之間是串行執行的
Spark對比Hadoop MR的特點
記憶體計算比mr快100倍,磁盤計算快mr10倍
使用友善,安裝部署簡單,支援互動式
支援處理豐富
繼承hadoop,能都讀取hadoop上的資料,hdfs,hbase等
Spark運作模式
- local本地模式,多線程
- standalone叢集模式可以使用zk解決單點故障問題
- on YARN,通過yarn排程資源,spark負責任務排程和計算
- On mesos
- on cloud
常見術語解釋
![](https://img.laitimes.com/img/__Qf2AjLwojIjJCLyojI0JCLiAzNfRHLGZkRGZkRfJ3bs92YsYTMfVmepNHLw0kaNlXU61EMRpHW4Z0MMBjVtJWd0ckW65UbM5WOHJWa5kHT20ESjBjUIF2X0hXZ0xCMx81dvRWYoNHLrdEZwZ1Rh5WNXp1bwNjW1ZUba9VZwlHdssmch1mclRXY39CXldWYtlWPzNXZj9mcw1ycz9WL49zZuBnL5UTMxUTMzUTMzADOwAjMwIzLc52YucWbp5GZzNmLn9Gbi1yZtl2Lc9CX6MHc0RHaiojIsJye.png)
什麼是RDD?
RDD就是分布式彈性資料集,可以在定義app的時候指定,通常是加載外部資源資料或者是對象集合
RDD有兩種操作,轉換和動作
轉換就是将原來的Rdd通過某種規則轉換成新的RDD,轉換是函數規則,例如map和Filter
動作就是求出結果
轉換采用的是懶處理,隻有當動作的時候,才會真真的計算
如果需要對一個RDD進行重用,可以通過rdd,persist()方法将其放入記憶體中
血統,也就是RDD之間的關系
RDDD本質上是一個隻讀的分區記錄集合,每一個分區就是一個dataset(類似于mr中的檔案 切片)
map會産生窄依賴,groupby産生寬依賴
RDD的特征
- 有一個分片清單,可以将一個RDD分成多個分片,這樣才能并行運算
- 函數計算每一個分片
- 對父RDD的依賴可以分為寬依賴和窄依賴(一個是窄,多個依賴就是寬依賴)
- 對KV類型的RDD是根據hash分區的,每一個分片有預先計算位置,移動運算優于移動資料
通過RDD.toDebugString,可以看到目前RDD的血統
基本概念
RDD來自英文Resilient Distributed DataSet,字面翻譯也就是彈性可恢複的分布式的資料集,彈性是指這個集合是可大可小可以動态變化的,可恢複是針對處理過程是可恢複的,資料是存儲在不同機器上的
RDD實際上是對分區資料的一種抽象和邏輯集合,RDD可以通過讀取HDFS上的資料,也可以由内部資料進行初始化
RDD分區,也就是對資料集的劃分,分區決定了處理資料的并行度
預設是一個檔案切片一個分區(如果檔案大小差距比較大,會講大的檔案對應多個分區,具體的邏輯在FileInputFormat類中),
可以通過指定運作的cores參數,來指定分區數量,核心數決定了運作時的線程數量
一個Job對應多個Task,每個Job對應多個stage,如果計算過程沒有shuffle,就是一個stage,如果存在shuffle,每次shuffle都會講task劃分為兩個階段(stage),一個stage對應一個task
一個任務如果沒有shuffle就是一個task,一個stage,如果有一次shuffle,就是兩個stage,也就是兩個task,這兩個task會形成DAG(有向無環圖,從子RDD不斷往上遞歸)也就是執行順序,對于HDFS檔案來說,檔案中的一條資料隻對應一個分區,但是可以有多個task操作操作這個資料
Task在Dirver端生成之後,會通過序列化來發送到worker結點的excutor中,Executor啟動一個線程進行反序列化,執行使用者的邏輯,如果存在後續的task,會存在記憶體中,或者持久化到磁盤上,Task持有父RDD的疊代器,對父RDD的疊代器中進行處理
舉個例子
API使用者:司令
RDD:軍隊
Driver,也就是智囊團
Mater,就是黨中央
Worker是各個軍隊
不管是Worker,和Driver,必須要是黨的一部分才能參政議政,是以都會向Master注冊自己
當有戰鬥任務的時候,我們發号施令說明自己像幹嘛,需要多少人參加,然後智囊團進行計劃,那幾個軍隊參加比較合适和占據優勢
然後向Master去申請,Dirver将作戰計劃形成文檔(Task的序列化文檔),加密後發送給各個戰鬥部門所有的戰鬥的具體都是由智慧團指導的,各個軍隊之間将各自小的戰鬥成果彙總成大的勝利階段一步一步取得勝利,但是這個謀劃必須要由将軍指令行動才會執行
RDD算子及功能
Transformation
reduceBykey比groupbykey的效率高,因為資料量小,而且可以局部聚合,groupbyKey在shuffle的時候會将kv都寫入檔案中等待stage2聚合
涉及到shuffle的轉換算子詳解
def main(args: Array[String]): Unit = {
val goodsMoneyCount: SparkConf = new SparkConf().setAppName("GoodsMoneyCount").setMaster("local[*]")
val sc: SparkContext = new SparkContext(goodsMoneyCount)
val rdd1: RDD[String] = sc.textFile("")
// new ShuffledRDD[K, V, C](self, partitioner)
// .setSerializer(serializer)
// .setAggregator(aggregator)
// .setMapSideCombine(mapSideCombine)
// Aggregator,集合器
// 其中的rdd.reduceBykey,其實就是建立一個新的shufflerdd,并且設定了Aggregator,這裡面定義了怎麼進行聚合
val wordAndOne: RDD[(String, Int)] = rdd1.map(s=>(s,1))
val shuffleRDD: ShuffledRDD[String, Int, Int] = new ShuffledRDD[String,Int,Int](wordAndOne,new HashPartitioner(rdd1.getNumPartitions))
//mapside
val createCombiner=(a:Int)=>a
val mergeValue=(a:Int,b:Int)=>b + a
//shuffle read之後執行
val mergeCombiners=(arr1:Int,arr2:Int)=>{arr1 +arr2}
//定義三個函數
val reduceByKey: ShuffledRDD[String, Int, Int] = shuffleRDD.setAggregator(new Aggregator[String, Int, Int](createCombiner,mergeValue,mergeCombiners))
reduceByKey.saveAsTextFile("shuffle-out")
sc.stop()
//也就是說,其實reduce和group等隻要涉及到shuffle的都是調用了ShuffleRDD這個類,這個類将應用傳進來的三個函數
// 其中前兩個函數都是在shuffleread的時候調用,也就是寫入本地磁盤之前,第三個是在shuffleread之後調用
//前兩個是對分區的結果進行聚合操作,最後一個參數是對分區後的結果進行聚合
}
通過建立ShuffleRDD來自定義shuffle方法
自定義shuffledRDD實作
ReduceByKey
def main(args: Array[String]): Unit = {
val sc: SparkContext = new SparkContext(new SparkConf().setAppName("ReduceByKey").setMaster("local[*]"))
val words: RDD[String] = sc.parallelize(
List(
"spark", "hadoop", "hive", "spark",
"spark", "flink", "spark", "hbase",
"kafka", "kafka", "kafka", "kafka",
"hadoop", "flink", "hive", "flink"
), 4)
// val value: RDD[(String, Int)] = words.mapPartitions(f=>{f.map(t=>(t,1))})
// //獲得每個分區的疊代器,将每個值加上1之後,獲得rdd
// val tuples: Array[(String, Int)] = value.reduceByKey(_+_).collect()//根據key進行聚合操作
val shuffle: ShuffledRDD[String, Int, Int] = new ShuffledRDD[String,Int,Int](words.map(f=>(f,1)),new HashPartitioner(words.partitions.length))
//通過建立ShuffledRDD,來自定義建構Shuffled
//其中[String,Int,Int] ----分别是針對父rddwords.map(f=>(f,1)),的輸入的key,輸出的value,以及進行shuffle的結果集合輸出,第二個參數是分區器
//聚合器 Aggregator
//第一個聚合器是對每一行資料的val進行處理,第二個是對局部進行什麼處理,最後一個是對全局結果進行什麼處理
val rdd: ShuffledRDD[String, Int, Int] = shuffle.setAggregator(new Aggregator[String, Int, Int](x=>x, (a, b)=>a+b,_+_))
println(rdd.collect().toBuffer)
sc.stop()
}
groupByKey
def main(args: Array[String]): Unit = {
val sc: SparkContext = new SparkContext(new SparkConf().setAppName("ReduceByKey").setMaster("local[*]"))
val words: RDD[String] = sc.parallelize(
List(
"spark", "hadoop", "hive", "spark",
"spark", "flink", "spark", "hbase",
"kafka", "kafka", "kafka", "kafka",
"hadoop", "flink", "hive", "flink"
), 4)
val rdd1=words.map(f=>(f,1))
val shuffle: ShuffledRDD[String, Int, ArrayBuffer[Int]] = new ShuffledRDD[String,Int,ArrayBuffer[Int]](rdd1,new HashPartitioner(rdd1.partitions.length))
val createCombiner = (v: Int) => ArrayBuffer(v) //每個記錄怎麼處理
val mergeValue = (buf: ArrayBuffer[Int], v: Int) => buf += v //每個分區怎麼處理
val mergeCombiners = (c1: ArrayBuffer[Int], c2: ArrayBuffer[Int]) => {c1 ++= c2
} //對各個分區結果怎麼處理
shuffle.setAggregator(new Aggregator[String,Int,ArrayBuffer[Int]](createCombiner,mergeValue,mergeCombiners))
//第一個函數是指定了每個資料在map處理完成之後就開始執行,也就是mapPartition之後
//處理完成之後,就開始将相同的key分發到不同的分區,這是在shuffle之前,每個分區将自己分區的結果進行何種計算
//這時候的拿到的已經是分區内每個key及對應的疊代器
//最後一個是task2已經拿到shuffle之後的結果了,這時候的結果是多個分區的結果,對這些分區的資料進行何種處理
println(shuffle.collect().toBuffer)
Thread.sleep(1000000000)
sc.stop()
}
aggregateByKey,每個分區加上一個初始值,然後對相同的key做f1,對不同分區做f2
def main(args: Array[String]): Unit = {
val sc: SparkContext = new SparkContext(new SparkConf().setAppName("ReduceByKey").setMaster("local[*]"))
val wordAndCount = sc.parallelize(List(
("spark", 5), ("hadoop", 2), ("spark", 3), ("hive", 4),
("spark", 9), ("hadoop", 4), ("spark", 1), ("hive", 2)
), 2)
val result = wordAndCount.aggregateByKey(10)(Math.max(_, _), _+_)
//map階段的局部結果進行max(_, _)運算,然後将總體結果 _+_
//初始值10,每個分區一個初始值
result.saveAsTextFile("aggregate3-out")
sc.stop()
}
Cogroup
def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))] = self.withScope {
this.cogroup(other, partitioner).flatMapValues( pair =>
for (v <- pair._1.iterator; w <- pair._2.iterator) yield (v, w)
)
}
将兩個RDD進行組合,依賴于Key,傳回的是同一個key的多個iterator,每個都是一個分區的元素疊代器
join方法也是依賴于這個方法
ACTION
Spark-submit的參數
Spark配置檔案
Spark原理及流程
Standalone模式
這裡的standalone是通過spark-submit送出任務的,這時候,SparkSubmit和Driver都是是運作在送出任務的用戶端的,還有一種是cluster模式,這種模式的的Driver是運作在一個Worker結點上的
- start-all.sh啟動之後,會通過本地shell啟動,将本地結點的資訊向zookeeper注冊,并且注冊為Master
- 啟動slave配置檔案的子節點
- 啟動app程式後,首先找到Master結點,
- 這時候DiverApplication裡的scheduler收到任務會要求Master排程多少資源給他,Master會通知Worker建立Executer,
- Worker建立完畢之後之後會向Driver反向注冊(Master告訴了Worker).
注意:
每個Worker可以運作多個ExcuterRunner(每個都是一個單獨的任務)
一個Executer對應一個CoarseGrainedExecutorBackend,
一個Executor裡面有該worker分得的該任務的task,可以有多個task,單這些task都是同一個任務的,一個Worker兩個Executor的Task是不同任務的task,Task隻是不同的對象,Excutor就是線程池,每個Task運作由一個獨立線程反序列化的Task到線程池中