天天看點

Update StateByKey 算子. Tranform算子_ 1|學習筆記

開發者學堂課程【大資料實時計算架構 Spark 快速入門:Update StateByKey  算子. Tranform 算子_ 1】學習筆記,與課程緊密聯系,讓使用者快速學習知識。

課程位址:

https://developer.aliyun.com/learning/course/100/detail/1724

Update StateByKey  算子. Tranform 算子_ 1

​具體操作如下​:​

package com. shsxt. study,streaming;

import java. sql.  Connection ;

public class  PersistMySQLWordcount {

public static void main(String[]args){

Sparkconf conf=new SparkConf().  settlaster ("local[1]"). setAppName ("Persist# ySQIWordcoun   JavaStreamingContextjsc =new  JavaStreamingContext (conf, Durations,seconds(5));

JavaDStream <String>lines=jssc. textFileStream ("hdfs://node21:8020/wordcount _ dir");  JavaDStream <String>words=lines,flatMap(new  FlatMapFunction <String, Stringy()(

private static final long serial  VersionUID =1L;

@Override

public Iterable<String>call(String line) throws Exception{

return Arrays. astist(line, split("“));

}

});  

JavaPairDStream <String,Integer>pairs=words.mapToPair(new  Pairfunction <String, string.

private static final long se  rialVersionUID =1L;

@Override

publicTuple2 <String, Integer>call(String word) throws Exception{

wordpress. print();

wordcounts . foreachRDD (new VoidFunction < 3avaPairRDD <String, Integer>>(){

private static final long serial  VersionUID =1L;

@Override public void call( JavaPairRDD <String,Integer> wordcountsRDD ) throws Exception{word countsRDD. foreach Partition(new Void Function<Iterator<Tuple2<String, Integer>>>(){

private static final long serial  VersionUID =1L;

@Override public void call(Iterator<Tuple2<String, Integer> wordcounts ) throws Exception{ Connection  conn= ConnectionPool . getConnection ();

Tuple2<String, Integer>wordcount=null;

while( wordcounts .hasNext()}{

wordpress= wordcounts . next();

String sql="insert into wordcount(word, count)"

+"values('"+wordcount._1+"',"+wordcount._2+")";

Statement stmt=conn. createStatement ();

stmt  executeUpdate (sql);

Conne ctionPool. returnConne  ction(conn);

}

});

jssc. start();

jssc. awaitTermination ();

jssc. close();

}