天天看點

HDFS資料源、DStream的持久化存儲_ 2|學習筆記

開發者學堂課程【大資料實時計算架構 Spark 快速入門:HDFS 資料源、DStream 的持久化存儲_ 2】學習筆記,與課程緊密聯系,讓使用者快速學習知識。

課程位址:

https://developer.aliyun.com/learning/course/100/detail/1723

HDFS 資料源、DStream 的持久化存儲_ 2

package com. shsxt. study,streaming;

import java. util. Arrays;[

public class  UpdateStateByKeyWordcount {

public static void main(String[]args){

SparkConfconfen—new Sparkconf(). setApplame (“ IpdateStateyWordcount ”).setMas  Javastreaming  Context jsscsc=new JavaStreamingContext (conf, Durations-seconds(5) js jssc.  checkpoint (".");

 JavaReceivprInputDStream <String>lines=jssc. socketTextStream ("node24", 8888)]

JavaDStream <String>words=lines.flatMap(new  FlatMapFunction <String, String>(){

private static final long serial  VersionUID =1L;

@Override

public Iterable<String>call(String line) throws Exception{

return Arrays.asList(line, split(""));

JavaPairDStreamsString ,Integer>pairds.mapIoPair(new Pair function<String, String, Integer/O

private static final long serial  VersionUID =1L;

@Override

publicTuple2 <String, Integer>call(String word) throws Exception{return newTuple2<String, Integer>(word,1);

}

});

JavaPainDStreawKtrin ,Integer)  wordcounts - pudatestated $Key(new Function? list(integer), Optional.

​​

private static final long se  rialVersionUID =1L;

//實際上,對于每個單詞,每次 batch 計算的時候,都會調用這個函數,第一個參數 values 相當于這個 batch 中

//這個 key 對應的新的一組值,可能有多個,可能 2 個 1,(xuruyun,1)(xuru yyun,1),那麼這個 values 就是(1,1)

//那麼第二個參數表示的是這個 key 之前的狀态,我們看類型 Integer 就知道了,這裡是泛型自己指定的。

繼續閱讀