UpdateStateByKey and Transform Operators, Part 2 | Study Notes

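Study notes for the Developer Academy course [Spark Quick Start for Big Data Real-Time Computing: UpdateStateByKey and Transform Operators, Part 2]. The notes follow the course closely so that readers can pick up the material quickly.

Course URL: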

https://developer.aliyun.com/learning/course/100/detail/1725

UpdateStateByKey and Transform Operators, Part 2

Contents:

1. UpdateStateByKey code

2. Introduction to the Transform operation

3. Transform operation code

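package com.shsxt.study.streaming;

import java.util.Arrays;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

// Spark 1.x Java API (flatMap returns an Iterable), where Optional is Guava's class.
import com.google.common.base.Optional;

import scala.Tuple2;

public class UpdateStateByKeyWordcount {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("UpdateStateByKeyWordcount").setMaster("local[2]");
        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(5));
        // updateStateByKey keeps state across batches, so a checkpoint directory is required.
        jssc.checkpoint(".");

        JavaReceiverInputDStream<String> lines = jssc.socketTextStream("node24", 8888);
        JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {

            private static final long serialVersionUID = 1L;

            @Override
            public Iterable<String> call(String line) throws Exception {
                // Split each line into words on single spaces.
                return Arrays.asList(line.split(" "));
            }
        });

        JavaPairDStream<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() {

            private static final long serialVersionUID = 1L;

            @Override
            public Tuple2<String, Integer> call(String word) throws Exception {
                return new Tuple2<String, Integer>(word, 1);
            }
        });

        JavaPairDStream<String, Integer> wordcounts = pairs.updateStateByKey(
                new Function2<List<Integer>, Optional<Integer>, Optional<Integer>>() {

            private static final long serialVersionUID = 1L;

            // This function is called for every word on every batch computation.
            // The first argument, values, holds the new group of values for this key in the
            // current batch. There may be several: two 1s from (xuruyun,1)(xuruyun,1)
            // make values equal to (1,1).
            // The second argument is the key's previous state. Its type is Integer here
            // because the state type is a generic parameter that you choose yourself.
            ...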
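The comments above describe the two arguments of the update function. The following is a minimal sketch of that logic in plain Java with no Spark dependency, so it can be run on its own. It assumes `java.util.Optional` as a stand-in for the Optional type used by Spark, and the names `StateUpdateSketch` and `updateCount` are hypothetical, not taken from the course code.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

final class StateUpdateSketch {

    // Hypothetical helper with the same shape as an updateStateByKey function:
    // `values` holds this batch's new values for one key, and `state` holds the
    // running count from earlier batches (empty the first time the key appears).
    public static Optional<Integer> updateCount(List<Integer> values, Optional<Integer> state) {
        int count = state.orElse(0);   // start from the previous state, or 0
        for (int v : values) {
            count += v;                // add every new value seen in this batch
        }
        return Optional.of(count);     // becomes the state passed to the next batch
    }

    public static void main(String[] args) {
        // Batch 1: the key arrives twice, (xuruyun,1)(xuruyun,1), with no earlier state.
        Optional<Integer> afterFirst = updateCount(Arrays.asList(1, 1), Optional.empty());
        // Batch 2: the key arrives once more, on top of the state from batch 1.
        Optional<Integer> afterSecond = updateCount(Arrays.asList(1), afterFirst);
        System.out.println(afterFirst.get() + " then " + afterSecond.get());
    }
}
```

In a real job, Spark calls the function once per key and per batch and keeps the returned state in the checkpoint directory. The sketch only imitates that by passing the result of one call into the next.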

2. Transform Operation

The transform operation (along with its variations like transformWith) allows arbitrary RDD-to-RDD functions to be applied on a DStream. It can be used to apply any RDD operation that is not exposed in the DStream API. For example, the functionality of joining every batch in a data stream with another dataset is not directly exposed in the DStream API. However, you can easily use transform to do this. This enables very powerful possibilities. For example, one can do real-time data cleaning by joining the input data stream with precomputed spam information (maybe generated with Spark as well) and then filtering based on it.
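The data-cleaning idea described above can be sketched in plain Java without Spark, treating one batch as a list of log lines and the precomputed blacklist as a map. This is only an illustration under assumptions: the class `BlacklistFilterSketch`, the method `filterBatch`, the sample log lines, and the space-separated "time adId name" layout are all hypothetical, and a `Map` lookup stands in for the left outer join that an RDD would perform.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

final class BlacklistFilterSketch {

    // Hypothetical stand-in for the per-batch function passed to transform:
    // each log line is matched against the blacklist by user name (like a left
    // outer join), and a line is dropped only when its user is present in the
    // blacklist with the flag set to true.
    public static List<String> filterBatch(List<String> batch, Map<String, Boolean> blacklist) {
        List<String> kept = new ArrayList<>();
        for (String line : batch) {
            String user = line.split(" ")[2];   // assumed "time adId name" layout
            // An empty Optional plays the role of "no match" in a left outer join.
            Optional<Boolean> flag = Optional.ofNullable(blacklist.get(user));
            boolean blocked = flag.isPresent() && flag.get();
            if (!blocked) {
                kept.add(line);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        Map<String, Boolean> blacklist = new HashMap<>();
        blacklist.put("yasaka", true);     // enabled entry: clicks are filtered out
        blacklist.put("xuruyun", false);   // disabled entry: clicks are kept

        List<String> batch = Arrays.asList(
                "20160101 001 yasaka",
                "20160101 002 xuruyun",
                "20160101 003 alice");
        System.out.println(filterBatch(batch, blacklist));
    }
}
```

With this sample input, only the line for yasaka is dropped. The line for xuruyun survives because its blacklist flag is false, and the line for alice survives because she has no blacklist entry at all.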

package com.shsxt.study.streaming;

import java.util.ArrayList;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

// Spark 1.x Java API, where leftOuterJoin yields Guava's Optional.
import com.google.common.base.Optional;

import scala.Tuple2;

public class TransformOperation {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("TransformOperation").setMaster("local[2]");
        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(20));

        // Users can click on online ads, and the clicks can be processed in real time.
        // Some users, however, click ads fraudulently, so we need a blacklist mechanism:
        // any ad click made by a blacklisted user is filtered out.

        // First, mock up a blacklist RDD. true means the entry is enabled, false means it is not.
        List<Tuple2<String, Boolean>> blacklist = new ArrayList<Tuple2<String, Boolean>>();
        blacklist.add(new Tuple2<String, Boolean>("yasaka", true));
        blacklist.add(new Tuple2<String, Boolean>("xuruyun", false));

        final JavaPairRDD<String, Boolean> blacklistRDD = jssc.sc().parallelizePairs(blacklist);

        // Input line format: time adId name
        JavaReceiverInputDStream<String> adsClickLogDStream = jssc.socketTextStream("node24", 8888);

        // Key each log line by user name (the third space-separated field).
        JavaPairDStream<String, String> adsClickLogPairDStream = adsClickLogDStream.mapToPair(
                new PairFunction<String, String, String>() {

            private static final long serialVersionUID = 1L;

            @Override
            public Tuple2<String, String> call(String line) throws Exception {
                return new Tuple2<String, String>(line.split(" ")[2], line);
            }
        });

        JavaDStream<String> normalLogs = adsClickLogPairDStream.transform(
                new Function<JavaPairRDD<String, String>, JavaRDD<String>>() {

            private static final long serialVersionUID = 1L;

            @Override
            public JavaRDD<String> call(JavaPairRDD<String, String> userLogBatchRDD)
                    throws Exception {

                // Left outer join, so clicks from users who are not in the blacklist are kept.
                JavaPairRDD<String, Tuple2<String, Optional<Boolean>>> joinedRDD =
                        userLogBatchRDD.leftOuterJoin(blacklistRDD);

                JavaPairRDD<String, Tuple2<String, Optional<Boolean>>> filteredRDD =
                        joinedRDD.filter(
                                new Function<Tuple2<String, Tuple2<String, Optional<Boolean>>>, Boolean>() {

                    private static final long serialVersionUID = 1L;

                    @Override
                    public Boolean call(Tuple2<String, Tuple2<String, Optional<Boolean>>> tuple)
                            throws Exception {
                        // Drop the click only if the user has an enabled blacklist entry.
                        if (tuple._2._2.isPresent() && tuple._2._2.get()) {
                            return false;
                        }
                        return true;
                    }
                    ...
