天天看點

flink實戰-模拟簡易雙11實時統計大屏背景執行個體講解

背景

在大資料的實時進行中,實時的大屏展示已經成了一個很重要的展示項,比如最有名的雙十一大屏實時銷售總價展示。除了這個,還有一些其他場景的應用,比如我們在我們的背景系統實時的展示我們網站目前的pv、uv等等,其實做法都是類似的。

今天我們就做一個最簡單的模拟電商統計大屏的小例子,我們抽取一下最簡單的需求。

  • 實時計算出當天零點截止到目前時間的銷售總額
  • 計算出各個分類的銷售top3
  • 每秒鐘更新一次統計結果

執行個體講解

構造資料

首先我們通過自定義source 模拟訂單的生成,生成了一個Tuple2,第一個元素是分類,第二個元素表示這個分類下産生的訂單金額,金額我們通過随機生成.

/**
  * 模拟生成某一個分類下的訂單生成
  */
 public static class MySource implements SourceFunction<Tuple2<String,Double>>{

  private volatile boolean isRunning = true;
  private Random random = new Random();
  String category[] = {
    "女裝", "男裝",
    "圖書", "家電",
    "洗護", "美妝",
    "運動", "遊戲",
    "戶外", "家具",
    "樂器", "辦公"
  };

  @Override
  public void run(SourceContext<Tuple2<String,Double>> ctx) throws Exception{
   while (isRunning){
    Thread.sleep(10);
    //某一個分類
    String c = category[(int) (Math.random() * (category.length - 1))];
    //某一個分類下産生了price的成交訂單
    double price = random.nextDouble() * 100;
    ctx.collect(Tuple2.of(c, price));
   }
  }

  @Override
  public void cancel(){
   isRunning = false;
  }
 }

           

複制

構造統計結果類

public static class CategoryPojo{
  //  分類名稱
  private String category;
  //  改分類總銷售額
  private double totalPrice;
  //      截止到目前時間的時間
  private String dateTime;
  
     getter and setter ........
 }

           

複制

定義視窗和觸發器

DataStream<CategoryPojo> result = dataStream.keyBy(0)
                                              .window(TumblingProcessingTimeWindows.of(Time.days(
                                                1), Time.hours(-8)))
                                              .trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(
                                                1)))
                                              .aggregate(
                                                new PriceAggregate(),
                                                new WindowResult()
                                              );

           

複制

首先我們定義一個視窗期是一天的滾動視窗,然後設定一個1秒鐘的觸發器,之後進行聚合計算.

集合計算

private static class PriceAggregate
   implements AggregateFunction<Tuple2<String,Double>,Double,Double>{

  @Override
  public Double createAccumulator(){
   return 0D;
  }

  @Override
  public Double add(Tuple2<String,Double> value, Double accumulator){
   return accumulator + value.f1;
  }

  @Override
  public Double getResult(Double accumulator){
   return accumulator;
  }

  @Override
  public Double merge(Double a, Double b){
   return a + b;
  }
 }

           

複制

聚合計算也比較簡單,其實就是對price的簡單sum操作

收集視窗結果資料

private static class WindowResult
   implements WindowFunction<Double,CategoryPojo,Tuple,TimeWindow>{
  SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

  @Override
  public void apply(
    Tuple key,
    TimeWindow window,
    Iterable<Double> input,
    Collector<CategoryPojo> out) throws Exception{
   CategoryPojo categoryPojo = new CategoryPojo();
   categoryPojo.setCategory(((Tuple1<String>) key).f0);

   BigDecimal bg = new BigDecimal(input.iterator().next());
   double p = bg.setScale(2, BigDecimal.ROUND_HALF_UP).doubleValue();
   categoryPojo.setTotalPrice(p);
   categoryPojo.setDateTime(simpleDateFormat.format(new Date()));
   out.collect(categoryPojo);
  }
 }

           

複制

我們最聚合的結果進行簡單的封裝,封裝成CategoryPojo類以便後續處理

使用聚合視窗的結果

result.keyBy("dateTime")
        .window(TumblingProcessingTimeWindows.of(Time.seconds(
          1)))
        .process(new WindowResultProcess());

           

複制

接下來我們要使用上面聚合的結果,是以我們使用上面的window聚合結果流又定義了時間是1秒的滾動視窗.

如何使用視窗的結果,可以參考flink的官網[1]

結果統計

接下來我們做最後的結果統計,在這裡,我們會把各個分類的總價加起來,就是全站的總銷量金額,然後我們同時使用優先級隊列計算出分類銷售的Top3,列印出結果,在生産過程中我們可以把這個結果資料發到hbase或者redis等外部存儲,以供前端的實時頁面展示。

private static class WindowResultProcess
   extends ProcessWindowFunction<CategoryPojo,Object,Tuple,TimeWindow>{

  @Override
  public void process(

    Tuple tuple,
    Context context,
    Iterable<CategoryPojo> elements,
    Collector<Object> out) throws Exception{
   String date = ((Tuple1<String>) tuple).f0;

   Queue<CategoryPojo> queue = new PriorityQueue<>(
     3,
     (o1, o2)->o1.getTotalPrice() >= o2.getTotalPrice() ? 1 : -1);
   double price = 0D;
   Iterator<CategoryPojo> iterator = elements.iterator();
   int s = 0;
   while (iterator.hasNext()){
    CategoryPojo categoryPojo = iterator.next();
    if (queue.size() < 3){
     queue.add(categoryPojo);
    } else {
     CategoryPojo tmp = queue.peek();
     if (categoryPojo.getTotalPrice() > tmp.getTotalPrice()){
      queue.poll();
      queue.add(categoryPojo);
     }
    }
    price += categoryPojo.getTotalPrice();
   }

   List<String> list = queue.stream()
                            .sorted((o1, o2)->o1.getTotalPrice() <=
                                              o2.getTotalPrice() ? 1 : -1)
                            .map(f->"(分類:" + f.getCategory() + " 銷售額:" +
                                    f.getTotalPrice() + ")")

                            .collect(
                              Collectors.toList());
   System.out.println("時間 : " + date + "  總價 : " + price + " top3 " +
                      StringUtils.join(list, ","));
   System.out.println("-------------");
  }

 }


           

複制

示例運作結果

3> CategoryPojo{category='戶外', totalPrice=734.45, dateTime=2020-06-13 22:55:34}
2> CategoryPojo{category='遊戲', totalPrice=862.86, dateTime=2020-06-13 22:55:34}
4> CategoryPojo{category='洗護', totalPrice=926.83, dateTime=2020-06-13 22:55:34}
3> CategoryPojo{category='運動', totalPrice=744.98, dateTime=2020-06-13 22:55:34}
2> CategoryPojo{category='樂器', totalPrice=648.81, dateTime=2020-06-13 22:55:34}
4> CategoryPojo{category='圖書', totalPrice=1010.12, dateTime=2020-06-13 22:55:34}
1> CategoryPojo{category='家具', totalPrice=880.35, dateTime=2020-06-13 22:55:34}
3> CategoryPojo{category='家電', totalPrice=1225.34, dateTime=2020-06-13 22:55:34}
2> CategoryPojo{category='男裝', totalPrice=796.06, dateTime=2020-06-13 22:55:34}
1> CategoryPojo{category='女裝', totalPrice=1018.88, dateTime=2020-06-13 22:55:34}
1> CategoryPojo{category='美妝', totalPrice=768.37, dateTime=2020-06-13 22:55:34}
時間 : 2020-06-13 22:55:34  總價 : 9617.050000000001 top3 (分類:家電 銷售額:1225.34),(分類:女裝 銷售額:1018.88),(分類:圖書 銷售額:1010.12)

           

複制

完整的代碼請參考

https://github.com/zhangjun0x01/bigdata-examples/blob/master/flink/src/main/java/windows/BigScreem.java

參考資料

【1】https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#working-with-window-results