天天看點

spark 第一個簡單執行個體wordcount

背景:

因為是在已經搭好的Maven項目上進行開發,環境是叢集環境,不需要再配置,隻是講一下寫時遇到的坑。

1. 因為是采用idea開發,直接在maven項目上建立一個檔案夾SparkWordCount和檔案SparkWordCount.scala,利用maven打包後,spark-submit送出任務後,一直報錯:

19/02/20 19:34:23 ERROR yarn.ApplicationMaster: Uncaught exception: 
java.lang.ClassNotFoundException: com.yixia.bigdata.etl.SparkWordCount.SparkWordCount
	at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:357)      

後來發現,原來是代碼裡面沒加包名:package com.yixia.bigdata.etl.SparkWordCount,是以找不到主類

2. 包名問題解決後,又報錯

19/02/20 19:42:38 INFO yarn.ApplicationMaster: Waiting for spark context initialization...
Exception in thread "Driver" java.lang.NullPointerException
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:552)
19/02/20 19:42:38 ERROR yarn.ApplicationMaster: Uncaught exception: 
java.lang.IllegalStateException: SparkContext is null but app is still running!
	at org.apache.spark.deploy.yarn.ApplicationMaster.runDriver(ApplicationMaster.scala:355)
	at org.apache.spark.deploy.yarn.ApplicationMaster.run(ApplicationMaster.scala:197)      

後來發現,主類是用class建立的,而不是object建立的。因為object是scala的靜态類,而main方法作為整個程式的入口,必須是靜态的,隻能用object來修飾,而不能用class

3. 統計詞頻的邏輯,首先把檔案讀進去,然後過濾掉空字元,再利用map對每個詞頻進行統計,最後reduceByKey即可,利用saveAsTestFile進行儲存。這裡沒有用split()函數,主要是因為我的測試資料一行就一個字元,是以不需要split(); 為了保證執行個體的資源回收,必須寫sc.stop()

詳細的執行個體程式設計代碼如下:

package com.yixia.bigdata.etl.SparkWordCount

import org.apache.spark.{SparkConf, SparkContext}


object SparkWordCount  {
  def main(args:Array[String]) : Unit ={



    val sc = new SparkContext()
   
    val input = args(0) //hdfs://nameservice1/user/matrix/zy
    val output = args(1)

    val data = sc.textFile(input)
    val wordcount=data.filter(it => it!="").map(word => (word,1)).reduceByKey(_+_)
    wordcount.saveAsTextFile(output)

    sc.stop()
  }



}
           

送出到叢集上的代碼如下:

spark-submit \
    --class com.yixia.bigdata.etl.SparkWordCount.SparkWordCount \
    --name com.yixia.bigdata.etl.SparkWordCount.SparkWordCount \
    --keytab ~/.matrix.keytab  \
    --principal matrix  \
    --master yarn \
    --deploy-mode cluster \
    --num-executors 50 \
    --queue matrix \
    ./etl-1.0-SNAPSHOT-jar-with-dependencies.jar hdfs://nameservice1/user/matrix/zy hdfs://nameservice1/user/matrix/zy1234

           

送出到yarn叢集上,既可以跑出來結果了,結果存在hdfs上面

繼續閱讀