開發者學堂課程【大資料實時計算架構 Spark 快速入門:UpdateStateByKey 算子、Transform 算子_1】學習筆記,與課程緊密聯繫,讓使用者快速學習知識。
課程位址:
https://developer.aliyun.com/learning/course/100/detail/1724
UpdateStateByKey 算子、Transform 算子_1
具體操作如下:
package com.shsxt.study.streaming;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.util.Arrays;
import java.util.Iterator;
public class PersistMySQLWordcount {
public static void main(String[]args){
Sparkconf conf=new SparkConf(). settlaster ("local[1]"). setAppName ("Persist# ySQIWordcoun JavaStreamingContextjsc =new JavaStreamingContext (conf, Durations,seconds(5));
JavaDStream <String>lines=jssc. textFileStream ("hdfs://node21:8020/wordcount _ dir"); JavaDStream <String>words=lines,flatMap(new FlatMapFunction <String, Stringy()(
private static final long serial VersionUID =1L;
@Override
public Iterable<String>call(String line) throws Exception{
return Arrays. astist(line, split("“));
}
});
JavaPairDStream <String,Integer>pairs=words.mapToPair(new Pairfunction <String, string.
private static final long se rialVersionUID =1L;
@Override
publicTuple2 <String, Integer>call(String word) throws Exception{
wordpress. print();
wordcounts . foreachRDD (new VoidFunction < 3avaPairRDD <String, Integer>>(){
private static final long serial VersionUID =1L;
@Override public void call( JavaPairRDD <String,Integer> wordcountsRDD ) throws Exception{word countsRDD. foreach Partition(new Void Function<Iterator<Tuple2<String, Integer>>>(){
private static final long serial VersionUID =1L;
@Override public void call(Iterator<Tuple2<String, Integer> wordcounts ) throws Exception{ Connection conn= ConnectionPool . getConnection ();
Tuple2<String, Integer>wordcount=null;
while( wordcounts .hasNext()}{
wordpress= wordcounts . next();
String sql="insert into wordcount(word, count)"
+"values('"+wordcount._1+"',"+wordcount._2+")";
Statement stmt=conn. createStatement ();
stmt executeUpdate (sql);
Conne ctionPool. returnConne ction(conn);
}
});
jssc. start();
jssc. awaitTermination ();
jssc. close();
}