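Study notes for the Developer Academy course 【Big Data Real-Time Computing Architecture, Spark Quick Start: UpdateStateByKey and Tranform Operators_2】. The notes follow the course closely so readers can learn the material quickly.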
Course URL:
https://developer.aliyun.com/learning/course/100/detail/1725
Contents:
1. UpdateStateByKey code
2. Introduction to the Transform operation
3. TransformOperation code
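1. UpdateStateByKey code

```java
package com.shsxt.study.streaming;

import java.util.Arrays;
// Imports below are reconstructed (the notes omit the original import block). They assume the
// Spark 1.x Java API, where FlatMapFunction.call returns an Iterable and Optional is Guava's.
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

import com.google.common.base.Optional;

import scala.Tuple2;

public class UpdateStateByKeyWordcount {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("UpdateStateByKeyWordcount").setMaster("local[2]");
        // 5-second batch interval
        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(5));
        // updateStateByKey keeps state across batches, so a checkpoint directory is required
        jssc.checkpoint(".");

        JavaReceiverInputDStream<String> lines = jssc.socketTextStream("node24", 8888);

        // Split each line into words
        JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {

            private static final long serialVersionUID = 1L;

            @Override
            public Iterable<String> call(String line) throws Exception {
                return Arrays.asList(line.split(" "));
            }
        });

        // Map each word to (word, 1)
        JavaPairDStream<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() {

            private static final long serialVersionUID = 1L;

            @Override
            public Tuple2<String, Integer> call(String word) throws Exception {
                return new Tuple2<String, Integer>(word, 1);
            }
        });

        JavaPairDStream<String, Integer> wordcounts = pairs.updateStateByKey(
                new Function2<List<Integer>, Optional<Integer>, Optional<Integer>>() {

            private static final long serialVersionUID = 1L;

            // For every word, this function is called each time a batch is computed. The first parameter,
            // values, holds the new values for this key in the current batch. There may be several:
            // with two 1s, (xuruyun,1)(xuruyun,1), values is (1,1).
            // The second parameter is the key's previous state; its type, Integer, is the generic
            // type we specify ourselves.
            // ... (the rest of the listing is not included in these notes)
```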
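The listing stops inside the update function. The following plain-Java sketch (no Spark dependency) models the semantics the comments describe; it is not the course's code. Each batch's new values for a key are summed onto that key's previous state. `java.util.Optional` stands in for the Optional type Spark passes in, and the names `RunningWordCount`, `update` and `applyBatch` are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

// Plain-Java model of updateStateByKey for a running word count (hypothetical, not Spark code).
final class RunningWordCount {

    // Mirrors the update function's shape: values = this batch's new 1s for the key,
    // state = the key's previous total (absent if the key has never been seen).
    static Optional<Integer> update(List<Integer> values, Optional<Integer> state) {
        int sum = state.orElse(0);
        for (int v : values) {
            sum += v;
        }
        return Optional.of(sum);
    }

    // Applies one batch of words to the running state.
    static void applyBatch(Map<String, Integer> state, List<String> batchWords) {
        // Group this batch's (word, 1) pairs by key
        Map<String, List<Integer>> newValues = new HashMap<>();
        for (String word : batchWords) {
            newValues.computeIfAbsent(word, k -> new ArrayList<>()).add(1);
        }
        // Keys that already have state are updated too, with an empty values list
        Set<String> keys = new HashSet<>(state.keySet());
        keys.addAll(newValues.keySet());
        for (String key : keys) {
            List<Integer> values = newValues.getOrDefault(key, Collections.emptyList());
            Optional<Integer> next = update(values, Optional.ofNullable(state.get(key)));
            if (next.isPresent()) {
                state.put(key, next.get());
            } else {
                // An absent result drops the key from the state
                state.remove(key);
            }
        }
    }

    public static void main(String[] args) {
        Map<String, Integer> state = new HashMap<>();
        applyBatch(state, Arrays.asList("xuruyun", "xuruyun", "yasaka"));
        applyBatch(state, Arrays.asList("xuruyun"));
        System.out.println(state.get("xuruyun") + " " + state.get("yasaka")); // prints "3 1"
    }
}
```

Running `main` prints `3 1`. `xuruyun` accumulates 2 + 1 across the two batches. `yasaka` keeps its count of 1 even though it is missing from the second batch, because Spark applies the update function to every key that already has state, not only to keys with new data. In Spark, returning an absent Optional removes the key from the state. `applyBatch` mirrors that, although this particular `update` always returns a value.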
2. Transform Operation
The transform operation (along with its variations like transformWith) allows arbitrary RDD-to-RDD functions to be applied on a DStream. It can be used to apply any RDD operation that is not exposed in the DStream API. For example, the functionality of joining every batch in a data stream with another dataset is not directly exposed in the DStream API. However, you can easily use transform to do this. This enables very powerful possibilities. For example, one can do real-time data cleaning by joining the input data stream with precomputed spam information (maybe generated with Spark as well) and then filtering based on it.
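To make the idea concrete, here is a plain-Java model of `transform`. It assumes a DStream can be pictured as a sequence of batches, each represented as a `List` rather than an RDD. The function passed in sees an entire batch at once, so it can perform operations that need the whole batch, such as sorting, instead of handling one record at a time. `TransformSketch` and `sortedCopy` are hypothetical names. The code illustrates the semantics only and is not the Spark API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.function.Function;

// Plain-Java model of DStream.transform (hypothetical, not Spark code):
// one whole-batch function is applied to each batch in turn.
final class TransformSketch {

    static <T, R> List<List<R>> transform(List<List<T>> batches, Function<List<T>, List<R>> perBatch) {
        List<List<R>> result = new ArrayList<>();
        for (List<T> batch : batches) {
            result.add(perBatch.apply(batch));
        }
        return result;
    }

    // A batch-level operation: sorting needs every element of the batch at once
    static List<String> sortedCopy(List<String> batch) {
        List<String> copy = new ArrayList<>(batch);
        Collections.sort(copy);
        return copy;
    }

    public static void main(String[] args) {
        List<List<String>> batches = Arrays.asList(
                Arrays.asList("c", "a", "b"),
                Arrays.asList("z", "y"));
        System.out.println(transform(batches, TransformSketch::sortedCopy)); // [[a, b, c], [y, z]]
    }
}
```

Each batch is sorted independently because the function never sees more than one batch. In Spark, the same shape would call an RDD method such as `sortByKey` or `leftOuterJoin` on the batch's RDD.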
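3. TransformOperation code

```java
package com.shsxt.study.streaming;

import java.util.ArrayList;
// Imports below are reconstructed (the notes omit the original import block). They assume the
// Spark 1.x Java API, in which leftOuterJoin returns Guava's Optional.
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

import com.google.common.base.Optional;

import scala.Tuple2;

public class TransformOperation {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("TransformOperation").setMaster("local[2]");
        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(20));

        // Users can click on online ads and the clicks are counted in real time, but some users spam clicks.
        // So we need a blacklist mechanism: any ad click by a blacklisted user is filtered out.

        // First, simulate a blacklist RDD: true means the entry is enabled, false means it is not.
        List<Tuple2<String, Boolean>> blacklist = new ArrayList<Tuple2<String, Boolean>>();
        blacklist.add(new Tuple2<String, Boolean>("yasaka", true));
        blacklist.add(new Tuple2<String, Boolean>("xuruyun", false));

        final JavaPairRDD<String, Boolean> blacklistRDD = jssc.sparkContext().parallelizePairs(blacklist);

        // Log line format: time adId name
        JavaReceiverInputDStream<String> adsClickLogDStream = jssc.socketTextStream("node24", 8888);

        // Key each log line by user name (the third field)
        JavaPairDStream<String, String> adsClickLogPairDStream = adsClickLogDStream.mapToPair(
                new PairFunction<String, String, String>() {

            private static final long serialVersionUID = 1L;

            @Override
            public Tuple2<String, String> call(String line) throws Exception {
                return new Tuple2<String, String>(line.split(" ")[2], line);
            }
        });

        // transform exposes each batch as a JavaPairRDD, so RDD operations such as leftOuterJoin are available
        JavaDStream<String> normalLogs = adsClickLogPairDStream.transform(
                new Function<JavaPairRDD<String, String>, JavaRDD<String>>() {

            private static final long serialVersionUID = 1L;

            @Override
            public JavaRDD<String> call(JavaPairRDD<String, String> userLogBatchRDD) throws Exception {

                // Left outer join: users without a blacklist entry are kept, with an absent Optional
                JavaPairRDD<String, Tuple2<String, Optional<Boolean>>> joinedRDD =
                        userLogBatchRDD.leftOuterJoin(blacklistRDD);

                JavaPairRDD<String, Tuple2<String, Optional<Boolean>>> filteredRDD = joinedRDD.filter(
                        new Function<Tuple2<String, Tuple2<String, Optional<Boolean>>>, Boolean>() {

                    private static final long serialVersionUID = 1L;

                    @Override
                    public Boolean call(Tuple2<String, Tuple2<String, Optional<Boolean>>> tuple)
                            throws Exception {
                        // Drop the click only if the user is in the blacklist and the entry is enabled
                        if (tuple._2()._2().isPresent() && tuple._2()._2().get()) {
                            return false;
                        }
                        return true;
                    }
                // ... (the rest of the listing is not included in these notes)
```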
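The per-batch logic inside `transform` can be checked without a cluster. The sketch below models one batch as a list of log lines and the blacklist as a `Map`, and reproduces the listing's steps:

1. Key each line by its third space-separated field (the user name).
2. Left-outer-join against the blacklist, so every log line survives and its blacklist flag may be absent.
3. Keep the line unless the flag is present and `true`.

`normalLogs` is a `JavaDStream<String>`, so the part of the listing not shown must map the surviving tuples back to strings. Returning the original log line is an assumption about that part. The class name and the sample log lines are hypothetical, and `java.util.Optional` stands in for Spark's Optional.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Plain-Java model of the blacklist filter applied to one batch (hypothetical, not Spark code).
final class BlacklistFilterSketch {

    // Log format taken from the listing's comment: "time adId name", separated by single spaces
    static String userOf(String line) {
        return line.split(" ")[2];
    }

    static List<String> filterBatch(List<String> batch, Map<String, Boolean> blacklist) {
        List<String> kept = new ArrayList<>();
        for (String line : batch) {
            // Left outer join: every log line survives; the blacklist side may be absent
            Optional<Boolean> flag = Optional.ofNullable(blacklist.get(userOf(line)));
            // Same test as the listing's filter: drop only if the flag is present and true
            boolean blocked = flag.isPresent() && flag.get();
            if (!blocked) {
                kept.add(line);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        Map<String, Boolean> blacklist = new HashMap<>();
        blacklist.put("yasaka", true);
        blacklist.put("xuruyun", false);
        // Hypothetical sample batch; "leo" is not in the blacklist
        List<String> batch = Arrays.asList(
                "1001 ad1 yasaka",
                "1002 ad2 xuruyun",
                "1003 ad1 leo");
        System.out.println(filterBatch(batch, blacklist)); // [1002 ad2 xuruyun, 1003 ad1 leo]
    }
}
```

The sketch treats each user as follows:

* `yasaka` (flag `true`) is dropped.
* `xuruyun` is kept because its entry is `false` (disabled).
* `leo` has no blacklist entry, so the joined flag is absent and the line is kept.

This explains why the listing uses `leftOuterJoin` rather than `join`. An inner join would discard every user who has no blacklist entry.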