Scala版本的wordCount
object ScalaWordCount {
def main(args: Array[String]): Unit = {
if(args.length != ){
println("Usage:cn.edu360.spark31.day01.ScalaWordCount <input><output>")
sys.exit()
}
// 接收參數
val Array(input,output)= args
// 建立一個配置對象
val conf:SparkConf = new SparkConf()
// 建立一個SparkContext的對象
val sc:SparkContext = new SparkContext(conf)
// 讀取檔案
val files: RDD[String] = sc.textFile(input)
// 切分并壓平
val splitedData: RDD[String] = files.flatMap(_.split(" "))
// 組裝
val wordsAndOne: RDD[(String, Int)] = splitedData.map((_,))
// 進行分組聚合
val result: RDD[(String, Int)] = wordsAndOne.reduceByKey(_+_)
// 如果需要排序
val sorted = result.sortBy(-_._2)
// 存儲結果資料
sorted.saveAsTextFile(output)
// 釋放資源
sc.stop()
/// 差別 spark中,rdd上的方法 名稱和 scala本地方法的名稱,可能是一緻的
// spark中調用的是rdd上的flatMap 方法, scala 中的flatMap 是本地集合的方法
}
}
java版本的wordCount
public class JavaWordCount {
public static void main(String[] args) {
//提示輸入參數個數,若不是兩個就會提示
if (args.length != ) {
System.out.println("Usage:cn.edu360.ScalaWordCount <input><output>");
System.exit();
}
//擷取配置對象
SparkConf conf = new SparkConf();
//要是不在叢集中測試,在本地測試 則設定一下參數
// conf.setMaster("local");
// conf.setAppName(JavaWordCount.class.getSimpleName());
// 利用java程式來程式設計 spark程式的時候new JavaSparkContext
JavaSparkContext jsc = new JavaSparkContext(conf);
// 讀檔案
JavaRDD<String> lines = jsc.textFile(args[]);
// 切分并壓平 flatMap方法中,參數的類型是一個接口 兩個參數 一個輸入參數 一個輸出參數
JavaRDD<String> flatData = lines.flatMap(new FlatMapFunction<String, String>() {
@Override
public Iterator<String> call(String line) throws Exception {
// 方法的傳回值類型是疊代器 需要把 數組類型的傳回值 轉換成 疊代器類型
return Arrays.asList(line.split(" ")).iterator();
}
});
// 和1 組裝成元組 第一個參數 是輸入資料類型 第二個是傳回值類型(元組)的key,第三個是元組的value
JavaPairRDD<String, Integer> wordAndOne = flatData.mapToPair(new PairFunction<String, String, Integer>() {
@Override
public Tuple2<String, Integer> call(String s) throws Exception {
return new Tuple2<String, Integer>(s, );
}
});
// 分組聚合 三個參數 reduceByKey((a,b)=>a+b) 最後一個Iinteger 表達 傳回值的類型
JavaPairRDD<String, Integer> result = wordAndOne.reduceByKey(new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer v1, Integer v2) throws Exception {
return v1 + v2;
}
});
// 先在本地測試一下
/* result.foreach(new VoidFunction<Tuple2<String, Integer>>() {
@Override
public void call(Tuple2<String, Integer> line) throws Exception {
System.out.println(line);
}
});
*/
// 可選,排序 <String,Integer> ---> <Integer ,String>
JavaPairRDD<Integer, String> beforeSort = result.mapToPair(new PairFunction<Tuple2<String, Integer>, Integer, String>() {
@Override
public Tuple2<Integer, String> call(Tuple2<String, Integer> tp) throws Exception {
// 把元組的key 和 value 交換 位置
return tp.swap();
}
});
// 預設是升序,如果需要降序
JavaPairRDD<Integer, String> sortedData = beforeSort.sortByKey(false);
// 把key 和 value的順序 交換回來
JavaPairRDD<String, Integer> finalRes = sortedData.mapToPair(new PairFunction<Tuple2<Integer, String>, String, Integer>() {
@Override
public Tuple2<String, Integer> call(Tuple2<Integer, String> tp) throws Exception {
return tp.swap();
}
});
// 把排序後的結果寫入檔案中
finalRes.saveAsTextFile(args[]);
// 釋放資源
jsc.stop(); // close()
}
.Javalamda版本的wordCount
public class JavaLambdaWordCount {
public static void main(String[] args) {
if (args.length != ) {
System.out.println("Usage:cn.edu360.spark31.day02.JavaLambdaWordCount <input><output>");
System.exit();
}
//同上
SparkConf conf = new SparkConf();
// conf.setMaster("local");
// conf.setAppName(JavaWordCount.class.getSimpleName());
// 同上
JavaSparkContext jsc = new JavaSparkContext(conf);
// 讀檔案
JavaRDD<String> lines = jsc.textFile(args[]);
// 切分并壓平
JavaRDD<String> flatData = lines.flatMap(t -> Arrays.asList(t.split(" ")).iterator());
// 和1 組裝
JavaPairRDD<String, Integer> wordAndOne = flatData.mapToPair(line -> new Tuple2<>(line, ));
// 分組聚合
JavaPairRDD<String, Integer> result = wordAndOne.reduceByKey((a, b) -> a + b);
JavaPairRDD<Integer, String> beforeSort = result.mapToPair(tp -> tp.swap());
JavaPairRDD<Integer, String> sortedData = beforeSort.sortByKey(false);
// 把元素交換回來
JavaPairRDD<String, Integer> finalResu = sortedData.mapToPair(tp -> tp.swap());
// 儲存結果
finalResu.saveAsTextFile(args[]);
// 釋放資源
jsc.stop(); // close()
}
}
SparkFrame版本的wordCount
object DataFrameWC {
def main(args: Array[String]): Unit = {
//new配置執行個體
val conf = new SparkConf()
.setAppName(this.getClass.getSimpleName)
.setMaster("local[*]")
//入口
val sc = new SparkContext(conf)
//sparkSQL的入口
val sqlContext: SQLContext = new SQLContext(sc)
//導入SQLContext執行個體上的隐式轉換
import sqlContext.implicits._
//讀檔案
val words: RDD[String] = sc.textFile("wc.txt")
//切分
val wordsRdd: RDD[String] = words.flatMap(_.split(" "))
//将RDD轉化成DATAFrame toDF()!!!!!!當然下面的set也可以啦
val worddf: DataFrame = wordsRdd.map(Word(_)).toDF()
//不想轉DF也可以 自己搞個類型嘛。。。下文有個樣例類(不用new 友善的一批)
// wordsRdd.map(t=>Word(t))
//注冊一個臨時表 ----要用sql沒有表怎麼行,搞個臨時的嘛
worddf.registerTempTable("t_word")
//執行sql語句,
val result: DataFrame = sqlContext.sql("select name ,count(*) as cnts from t_word group by name order by cnts desc ")
//展示結果
result.show()
//關閉資源
sc.stop()
}
}
// 定義一個字段 name //好像是被我注釋掉了
case class Word(name:String)
DataSet SQL文法的wordCount
object DataSetWC {
def main(args: Array[String]): Unit = {
// 統一的API入口
// session 或者使用 spark 如果當期上下文環境中有,直接拿來使用,否則再建立
val session: SparkSession = SparkSession.builder()
.master("local[*]")
.appName(this.getClass.getSimpleName)
.getOrCreate()
// 如何從sparkSession中擷取到sparkContext和sqlContext
// val sc: SparkContext = session.sparkContext
// val sqlContext = session.sqlContext
// 導入sparksession執行個體上的隐式轉換
import session.implicits._
// sessionApi 擷取到的資料類型就是Dataset
val file: Dataset[String] = session.read.textFile("wc.txt")
val wDs: Dataset[String] = file.flatMap(_.split(" "))
// Dataset 預設有schema資訊
wDs.printSchema()
// |-- value: string (nullable = true) // 預設shema的名稱是value
wDs.schema
// SQL DSL
// 注冊臨時表
wDs.createTempView("v_word")
val result: DataFrame = session.sql("select value ,count(*) cnts from v_word group by value order by cnts desc ")
result.show()
session.stop()
// session.close()
}
}
Dataset DSL 文法版本的wordCount
object DatasetDSLDemo {
def main(args: Array[String]): Unit = {
val session: SparkSession = SparkSession.builder()
.master("local[*]")
.appName(this.getClass.getSimpleName)
.getOrCreate()
// 導入sparksession執行個體上的隐式轉換
import session.implicits._
// sessionApi 擷取到的資料類型就是Dataset
val file: Dataset[String] = session.read.textFile("person.txt")
val pdf: DataFrame = file.map(_.split(" ")).map(t=>(t(),t().toInt,t().toInt)).toDF("name","age","fv")
// DSL 文法統計
// select where group by order by count sum max min
// 選擇
pdf.select("name","age")
// .show()
// pdf.select(pdf.col("name"),pdf.col("age")).show()
// pdf.select(pdf("name"),pdf("age")).show()
// where 條件 過濾 where 調用的是filterAPI
// pdf.where("fv > 93 ")
pdf.filter("fv > 93") // .show()
// 排序 order by Sort orderBy 調用的就是sort
pdf.orderBy("age").show()
// 多個條件的排序
pdf.sort($"age" ,$"fv" desc) //.show()
// 分組之後還需要進行聚合統計 sum max min avg
pdf.groupBy($"age").sum("fv") //.show()
val result1: DataFrame = pdf.groupBy($"age").count()
// result1.show()
// 聚合條件 需要導入函數
import org.apache.spark.sql.functions._
// 聚合時,指定别名
pdf.groupBy($"age").agg(count("*") as "cnts")// .show()
pdf.groupBy($"age").agg(max("fv"))//.show()
// 重命名列名
pdf.withColumnRenamed("age","myage").show()
// pdf
session.stop()
}
}
Streaming版本的wordcount
object StreamingWordCount {
// 設定日志級别為 Warn
val logger = Logger.getLogger("org").setLevel(Level.WARN)
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
// 本地運作的 配置
.setMaster("local[5]") // 至少是2個cores
.setAppName(this.getClass.getSimpleName)
// conf: SparkConf, batchDuration: Duration
// 時間間隔 需要根據我們的業務, 不是随便定義的
val ssc: StreamingContext = new StreamingContext(conf, Seconds())
// 從 socket中擷取資料建立一個将要連接配接到 hostname:port 的 DStream,
// 這裡讀取socket資料的Receiver,需要一個core
val textStream: ReceiverInputDStream[String] = ssc.socketTextStream("hdp-03",)
val textStream2: ReceiverInputDStream[String] = ssc.socketTextStream("hdp-03",)
textStream2.print()
// 1 個 receiver
// 擷取到資料之後的業務邏輯
val result: DStream[(String, Int)] = textStream.flatMap(_.split(" ")).map((_,)).reduceByKey(_+_)
// 列印結果 預設列印 10行
result.print()
// prefix-TIME_IN_MS.suffix
result.saveAsTextFiles("stream","log")
// 啟動主程式 需要去連接配接socket
ssc.start()
// 阻塞 等待程式被關閉
ssc.awaitTermination()
}
}
Streaming SQL 版本的wordcount
object StreamingSQL {
// 設定日志級别為 Warn
val logger = Logger.getLogger("org").setLevel(Level.WARN)
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
// 本地運作的 配置
.setMaster("local[2]") // 至少是2個cores
.setAppName(this.getClass.getSimpleName)
// conf: SparkConf, batchDuration: Duration
// 時間間隔 需要根據我們的業務, 不是随便定義的
val ssc: StreamingContext = new StreamingContext(conf, Seconds())
// 從 socket中擷取資料建立一個将要連接配接到 hostname:port 的 DStream,
// 這裡讀取socket資料的Receiver,需要一個core
val textStream: ReceiverInputDStream[String] = ssc.socketTextStream("hdp-03", )
// DStream = rdd + time
// 如何使用SQL 文法來操作資料
textStream.foreachRDD(rdd => {
val session = SparkSession.builder()
.config(conf) // 使用已經存在的conf對象
.getOrCreate()
import session.implicits._
// 按照空格切分 所有的空格
val wordRDD: RDD[String] = rdd.flatMap(_.split(" +"))
val wdf: DataFrame = wordRDD.toDF("word")
// 注冊臨時視圖 createOrReplaceTempView
wdf.createOrReplaceTempView("v_word")
val dfRes = session.sql("select word,count(1) cnts from v_word group by word order by cnts desc")
dfRes.show()
})
// 啟動主程式 需要去連接配接socket
ssc.start()
// 阻塞 等待程式被關閉
ssc.awaitTermination()
}
}
Kafka版本的wordCount
object StreamingKafkaWC {
Logger.getLogger("org").setLevel(Level.WARN)
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
.setMaster("local[*]")
.setAppName(this.getClass.getSimpleName)
val ssc= new StreamingContext(conf,Seconds())
// kafka的參數配置
val kafkaParams = Map[String, Object](
"bootstrap.servers" -> "hdp-02:9092,hdp-03:9092,hdp-04:9092",
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> "hello_topic_group",
// 從哪個位置開始讀取資料
"auto.offset.reset" -> "earliest",
// 是否可以自動送出偏移量 自定義
"enable.auto.commit" -> (false: java.lang.Boolean)
)
// streaming 支援 讀取kafka的多個主題
val topics = Array("helloTopic6", "topicB")
// 指定泛型約定
val stream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](
ssc,
// 我們的broker和executor在同一台機器上
// LocationStrategies.PreferBrokers
// 資源會平均配置設定
LocationStrategies.PreferConsistent,
// PreferConsistent,
// 消費者 訂閱哪些主題
Subscribe[String, String](topics, kafkaParams)
)
stream.foreachRDD(rdd=>{
// rdd中的資料類型是 ConsumerRecord[String, String] 拿到這裡的value
val result = rdd.map(_.value()).map((_,)).reduceByKey(_+_)
result.foreach(println)
})
ssc.start()
ssc.awaitTermination()
}
}