天天看點

最最簡單的~WordCount¬

sc.textFile("hdfs://....").flatMap(line =>line.split(" ")).map(w =>(w,1)).reduceByKey(_+_).foreach(println)
      

不使用reduceByKey

sc.textFile("hdfs://....").flatMap(l=>l.split(" ")).map(w=>(w,1)).groupByKey().map((p:(String,Iterable[Int]))=>(p._1,p._2.sum)).collect
      

步驟1:textFile先生成HadoopRDD,然後再通過map操作生成MappedRDD.

結果:res0:org.apache.spark.rdd.RDD[String] = MappedRDD[1] at textFile at :13

步驟2:val split = line =>line.split(" ")).flatMap(line => line.split(" ")) flatMap将原來的MappedRDD轉換為FlatMappedRDD

步驟3:val wordCount = split.map(w =>(w,1)) 利用w生成相應的鍵值對,上一步的FlatMappedRDD被轉換為MappedRDD

步驟4:val reduce = wordCount.reduceByKey(_+_)

步驟5:reduce.foreach(println) 觸發執行  

 在執行foreach時,調用了runJob函數,實作了重載。 Final RDD和作用于RDD上的Function。 然後讀取Finall RDD的分區數,通過allowLocal來表示是否在Standalone模式下執行。

從spark-shell到sparkContext的建立的調用路徑:

spark-shell -> spark-submit ->spark-class->sparkSubmit.main ->SparkILoop -> createSparkContext

SpackContext初始化過程中 傳入的入參是SparkConf

一、根據初始化生成SparkConf,再根據SparkConf來建立SparkEnv.

二、建立TaskScheduler,根據Spark的運作模式選擇相應的SchedulerBackend,同時啟動TaskScheduler

private[spark] var taskScheduler = SparkContext.createTaskScheduler(this,master,appName)
taskScheduler.start()
      

 createTaskScheduler最為關鍵,根據master環境變量來判斷Spark目前的部署方式,進而生成相應的SchedulerBackend的不同子類。taskScheduler.start的目的是啟動相應的SchedulerBackend.

三、從上一步建立的taskScheduler執行個體為入參建立DAGScheduler并啟動運作。

private[spark] var dagScheduler = new DAGScheduler(taskScheduler)
dagScheduler.start()
      

四、啟動WebUI.

ui.start()
      

  

繼續閱讀