背景
在大資料的實時進行中,實時的大屏展示已經成了一個很重要的展示項,比如最有名的雙十一大屏實時銷售總價展示。除了這個,還有一些其他場景的應用,比如我們在我們的背景系統實時的展示我們網站目前的pv、uv等等,其實做法都是類似的。
今天我們就做一個最簡單的模拟電商統計大屏的小例子,我們抽取一下最簡單的需求。
- 實時計算出當天零點截止到目前時間的銷售總額
- 計算出各個分類的銷售top3
- 每秒鐘更新一次統計結果
執行個體講解
構造資料
首先我們通過自定義source 模拟訂單的生成,生成了一個Tuple2,第一個元素是分類,第二個元素表示這個分類下産生的訂單金額,金額我們通過随機生成.
/**
* 模拟生成某一個分類下的訂單生成
*/
public static class MySource implements SourceFunction<Tuple2<String,Double>>{
private volatile boolean isRunning = true;
private Random random = new Random();
String category[] = {
"女裝", "男裝",
"圖書", "家電",
"洗護", "美妝",
"運動", "遊戲",
"戶外", "家具",
"樂器", "辦公"
};
@Override
public void run(SourceContext<Tuple2<String,Double>> ctx) throws Exception{
while (isRunning){
Thread.sleep(10);
//某一個分類
String c = category[(int) (Math.random() * (category.length - 1))];
//某一個分類下産生了price的成交訂單
double price = random.nextDouble() * 100;
ctx.collect(Tuple2.of(c, price));
}
}
@Override
public void cancel(){
isRunning = false;
}
}
複制
構造統計結果類
public static class CategoryPojo{
// 分類名稱
private String category;
// 改分類總銷售額
private double totalPrice;
// 截止到目前時間的時間
private String dateTime;
getter and setter ........
}
複制
定義視窗和觸發器
DataStream<CategoryPojo> result = dataStream.keyBy(0)
.window(TumblingProcessingTimeWindows.of(Time.days(
1), Time.hours(-8)))
.trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(
1)))
.aggregate(
new PriceAggregate(),
new WindowResult()
);
複制
首先我們定義一個視窗期是一天的滾動視窗,然後設定一個1秒鐘的觸發器,之後進行聚合計算.
集合計算
private static class PriceAggregate
implements AggregateFunction<Tuple2<String,Double>,Double,Double>{
@Override
public Double createAccumulator(){
return 0D;
}
@Override
public Double add(Tuple2<String,Double> value, Double accumulator){
return accumulator + value.f1;
}
@Override
public Double getResult(Double accumulator){
return accumulator;
}
@Override
public Double merge(Double a, Double b){
return a + b;
}
}
複制
聚合計算也比較簡單,其實就是對price的簡單sum操作
收集視窗結果資料
private static class WindowResult
implements WindowFunction<Double,CategoryPojo,Tuple,TimeWindow>{
SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
@Override
public void apply(
Tuple key,
TimeWindow window,
Iterable<Double> input,
Collector<CategoryPojo> out) throws Exception{
CategoryPojo categoryPojo = new CategoryPojo();
categoryPojo.setCategory(((Tuple1<String>) key).f0);
BigDecimal bg = new BigDecimal(input.iterator().next());
double p = bg.setScale(2, BigDecimal.ROUND_HALF_UP).doubleValue();
categoryPojo.setTotalPrice(p);
categoryPojo.setDateTime(simpleDateFormat.format(new Date()));
out.collect(categoryPojo);
}
}
複制
我們最聚合的結果進行簡單的封裝,封裝成CategoryPojo類以便後續處理
使用聚合視窗的結果
result.keyBy("dateTime")
.window(TumblingProcessingTimeWindows.of(Time.seconds(
1)))
.process(new WindowResultProcess());
複制
接下來我們要使用上面聚合的結果,是以我們使用上面的window聚合結果流又定義了時間是1秒的滾動視窗.
如何使用視窗的結果,可以參考flink的官網[1]
結果統計
接下來我們做最後的結果統計,在這裡,我們會把各個分類的總價加起來,就是全站的總銷量金額,然後我們同時使用優先級隊列計算出分類銷售的Top3,列印出結果,在生産過程中我們可以把這個結果資料發到hbase或者redis等外部存儲,以供前端的實時頁面展示。
private static class WindowResultProcess
extends ProcessWindowFunction<CategoryPojo,Object,Tuple,TimeWindow>{
@Override
public void process(
Tuple tuple,
Context context,
Iterable<CategoryPojo> elements,
Collector<Object> out) throws Exception{
String date = ((Tuple1<String>) tuple).f0;
Queue<CategoryPojo> queue = new PriorityQueue<>(
3,
(o1, o2)->o1.getTotalPrice() >= o2.getTotalPrice() ? 1 : -1);
double price = 0D;
Iterator<CategoryPojo> iterator = elements.iterator();
int s = 0;
while (iterator.hasNext()){
CategoryPojo categoryPojo = iterator.next();
if (queue.size() < 3){
queue.add(categoryPojo);
} else {
CategoryPojo tmp = queue.peek();
if (categoryPojo.getTotalPrice() > tmp.getTotalPrice()){
queue.poll();
queue.add(categoryPojo);
}
}
price += categoryPojo.getTotalPrice();
}
List<String> list = queue.stream()
.sorted((o1, o2)->o1.getTotalPrice() <=
o2.getTotalPrice() ? 1 : -1)
.map(f->"(分類:" + f.getCategory() + " 銷售額:" +
f.getTotalPrice() + ")")
.collect(
Collectors.toList());
System.out.println("時間 : " + date + " 總價 : " + price + " top3 " +
StringUtils.join(list, ","));
System.out.println("-------------");
}
}
複制
示例運作結果
3> CategoryPojo{category='戶外', totalPrice=734.45, dateTime=2020-06-13 22:55:34}
2> CategoryPojo{category='遊戲', totalPrice=862.86, dateTime=2020-06-13 22:55:34}
4> CategoryPojo{category='洗護', totalPrice=926.83, dateTime=2020-06-13 22:55:34}
3> CategoryPojo{category='運動', totalPrice=744.98, dateTime=2020-06-13 22:55:34}
2> CategoryPojo{category='樂器', totalPrice=648.81, dateTime=2020-06-13 22:55:34}
4> CategoryPojo{category='圖書', totalPrice=1010.12, dateTime=2020-06-13 22:55:34}
1> CategoryPojo{category='家具', totalPrice=880.35, dateTime=2020-06-13 22:55:34}
3> CategoryPojo{category='家電', totalPrice=1225.34, dateTime=2020-06-13 22:55:34}
2> CategoryPojo{category='男裝', totalPrice=796.06, dateTime=2020-06-13 22:55:34}
1> CategoryPojo{category='女裝', totalPrice=1018.88, dateTime=2020-06-13 22:55:34}
1> CategoryPojo{category='美妝', totalPrice=768.37, dateTime=2020-06-13 22:55:34}
時間 : 2020-06-13 22:55:34 總價 : 9617.050000000001 top3 (分類:家電 銷售額:1225.34),(分類:女裝 銷售額:1018.88),(分類:圖書 銷售額:1010.12)
複制
完整的代碼請參考
https://github.com/zhangjun0x01/bigdata-examples/blob/master/flink/src/main/java/windows/BigScreem.java
參考資料
【1】https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#working-with-window-results