天天看點

Flink 零基礎實戰教程:如何計算實時熱門商品

在上一篇入門教程中,我們已經能夠快速建構一個基礎的 Flink 程式了。本文會一步步地帶領你實作一個更複雜的 Flink 應用程式:實時熱門商品。在開始本文前我們建議你先實踐一遍上篇文章,因為本文會沿用上文的my-flink-project項目架構。

通過本文你将學到:

  • 如何基于 EventTime 處理,如何指定 Watermark
  • 如何使用 Flink 靈活的 Window API
  • 何時需要用到 State,以及如何使用
  • 如何使用 ProcessFunction 實作 TopN 功能

實戰案例介紹

“實時熱門商品”的需求,我們可以将“實時熱門商品”翻譯成程式員更好了解的需求:每隔5分鐘輸出最近一小時内點選量最多的前 N 個商品。将這個需求進行分解我們大概要做這麼幾件事情:

  • 抽取出業務時間戳,告訴 Flink 架構基于業務時間做視窗
  • 過濾出點選行為資料
  • 按一小時的視窗大小,每5分鐘統計一次,做滑動視窗聚合(Sliding Window)
  • 按每個視窗聚合,輸出每個視窗中點選量前N名的商品

資料準備

這裡我們準備了一份淘寶使用者行為資料集(來自阿裡雲天池公開資料集,特别感謝)。本資料集包含了淘寶上某一天随機一百萬使用者的所有行為(包括點選、購買、加購、收藏)。資料集的組織形式和MovieLens-20M類似,即資料集的每一行表示一條使用者行為,由使用者ID、商品ID、商品類目ID、行為類型和時間戳組成,并以逗号分隔。關于資料集中每一列的較長的描述如下:

列名稱 說明
使用者ID 整數類型,加密後的使用者ID
商品ID 整數類型,加密後的商品ID
商品類目ID 整數類型,加密後的商品所屬類目ID
行為類型 字元串,枚舉類型,包括(‘pv’, ‘buy’, ‘cart’, ‘fav’)
時間戳 行為發生的時間戳,機關秒

你可以通過下面的指令下載下傳資料集到項目的 resources 目錄下:

$ cd my-flink-project/src/main/resources
$ curl https://raw.githubusercontent.com/wuchong/my-flink-project/master/src/main/resources/UserBehavior.csv > UserBehavior.csv           

這裡是否使用 curl 指令下載下傳資料并不重要,你也可以使用 wget 指令或者直接通路連結下載下傳資料。關鍵是,将資料檔案儲存到項目的 resources 目錄下,友善應用程式通路。

編寫程式

在 src/main/java/myflink 下建立 HotItems.java 檔案:

package myflink;

public class HotItems {

  public static void main(String[] args) throws Exception {
    
  }
}           

與上文一樣,我們會一步步往裡面填充代碼。第一步仍然是建立一個 StreamExecutionEnvironment,我們把它添加到 main 函數中。

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 為了列印到控制台的結果不亂序,我們配置全局的并發為1,這裡改變并發對結果正确性沒有影響
env.setParallelism(1);           

建立模拟資料源

在資料準備章節,我們已經将測試的資料集下載下傳到本地了。由于是一個csv檔案,我們将使用 CsvInputFormat 建立模拟資料源。

注:雖然一個流式應用應該是一個一直運作着的程式,需要消費一個無限資料源。但是在本案例教程中,為了省去建構真實資料源的繁瑣,我們使用了檔案來模拟真實資料源,這并不影響下文要介紹的知識點。這也是一種本地驗證 Flink 應用程式正确性的常用方式。

我們先建立一個 UserBehavior 的 POJO 類(所有成員變量聲明成public便是POJO類),強類型化後能友善後續的處理。

/** 使用者行為資料結構 **/
public static class UserBehavior {
  public long userId;         // 使用者ID
  public long itemId;         // 商品ID
  public int categoryId;      // 商品類目ID
  public String behavior;     // 使用者行為, 包括("pv", "buy", "cart", "fav")
  public long timestamp;      // 行為發生的時間戳,機關秒
}           

接下來我們就可以建立一個 PojoCsvInputFormat 了, 這是一個讀取 csv 檔案并将每一行轉成指定 POJO

類型(在我們案例中是 UserBehavior)的輸入器。

// UserBehavior.csv 的本地檔案路徑
URL fileUrl = HotItems2.class.getClassLoader().getResource("UserBehavior.csv");
Path filePath = Path.fromLocalFile(new File(fileUrl.toURI()));
// 抽取 UserBehavior 的 TypeInformation,是一個 PojoTypeInfo
PojoTypeInfo<UserBehavior> pojoType = (PojoTypeInfo<UserBehavior>) TypeExtractor.createTypeInfo(UserBehavior.class);
// 由于 Java 反射抽取出的字段順序是不确定的,需要顯式指定下檔案中字段的順序
String[] fieldOrder = new String[]{"userId", "itemId", "categoryId", "behavior", "timestamp"};
// 建立 PojoCsvInputFormat
PojoCsvInputFormat<UserBehavior> csvInput = new PojoCsvInputFormat<>(filePath, pojoType, fieldOrder);           

下一步我們用 PojoCsvInputFormat 建立輸入源。

DataStream<UserBehavior> dataSource = env.createInput(csvInput, pojoType);           

這就建立了一個 UserBehavior 類型的 DataStream。

EventTime 與 Watermark

當我們說“統計過去一小時内點選量”,這裡的“一小時”是指什麼呢? 在 Flink 中它可以是指 ProcessingTime ,也可以是 EventTime,由使用者決定。

  • ProcessingTime:事件被處理的時間。也就是由機器的系統時間來決定。
  • EventTime:事件發生的時間。一般就是資料本身攜帶的時間。

在本案例中,我們需要統計業務時間上的每小時的點選量,是以要基于 EventTime 來處理。那麼如果讓 Flink 按照我們想要的業務時間來處理呢?這裡主要有兩件事情要做。

第一件是告訴 Flink 我們現在按照 EventTime 模式進行處理,Flink 預設使用 ProcessingTime 處理,是以我們要顯式設定下。

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);           

第二件事情是指定如何獲得業務時間,以及生成 Watermark。Watermark 是用來追蹤業務事件的概念,可以了解成 EventTime 世界中的時鐘,用來訓示目前處理到什麼時刻的資料了。由于我們的資料源的資料已經經過整理,沒有亂序,即事件的時間戳是單調遞增的,是以可以将每條資料的業務時間就當做 Watermark。這裡我們用 AscendingTimestampExtractor 來實作時間戳的抽取和 Watermark 的生成。

注:真實業務場景一般都是存在亂序的,是以一般使用 BoundedOutOfOrdernessTimestampExtractor。
DataStream<UserBehavior> timedData = dataSource
    .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<UserBehavior>() {
      @Override
      public long extractAscendingTimestamp(UserBehavior userBehavior) {
        // 原始資料機關秒,将其轉成毫秒
        return userBehavior.timestamp * 1000;
      }
    });           

這樣我們就得到了一個帶有時間标記的資料流了,後面就能做一些視窗的操作。

過濾出點選事件

在開始視窗操作之前,先回顧下需求“每隔5分鐘輸出過去一小時内點選量最多的前 N 個商品”。由于原始資料中存在點選、加購、購買、收藏各種行為的資料,但是我們隻需要統計點選量,是以先使用 FilterFunction 将點選行為資料過濾出來。

DataStream<UserBehavior> pvData = timedData
    .filter(new FilterFunction<UserBehavior>() {
      @Override
      public boolean filter(UserBehavior userBehavior) throws Exception {
        // 過濾出隻有點選的資料
        return userBehavior.behavior.equals("pv");
      }
    });           

視窗統計點選量

由于要每隔5分鐘統計一次最近一小時每個商品的點選量,是以視窗大小是一小時,每隔5分鐘滑動一次。即分别要統計 [09:00, 10:00), [09:05, 10:05), [09:10, 10:10)… 等視窗的商品點選量。是一個常見的滑動視窗需求(Sliding Window)。

DataStream<ItemViewCount> windowedData = pvData
    .keyBy("itemId")
    .timeWindow(Time.minutes(60), Time.minutes(5))
    .aggregate(new CountAgg(), new WindowResultFunction());           

我們使用.keyBy("itemId")對商品進行分組,使用.timeWindow(Time size, Time slide)對每個商品做滑動視窗(1小時視窗,5分鐘滑動一次)。然後我們使用 .aggregate(AggregateFunction af, WindowFunction wf) 做增量的聚合操作,它能使用AggregateFunction提前聚合掉資料,減少 state 的存儲壓力。較之.apply(WindowFunction wf)會将視窗中的資料都存儲下來,最後一起計算要高效地多。aggregate()方法的第一個參數用于

這裡的CountAgg實作了AggregateFunction接口,功能是統計視窗中的條數,即遇到一條資料就加一。

/** COUNT 統計的聚合函數實作,每出現一條記錄加一 */
public static class CountAgg implements AggregateFunction<UserBehavior, Long, Long> {

  @Override
  public Long createAccumulator() {
    return 0L;
  }

  @Override
  public Long add(UserBehavior userBehavior, Long acc) {
    return acc + 1;
  }

  @Override
  public Long getResult(Long acc) {
    return acc;
  }

  @Override
  public Long merge(Long acc1, Long acc2) {
    return acc1 + acc2;
  }
}           

.aggregate(AggregateFunction af, WindowFunction wf) 的第二個參數WindowFunction将每個 key每個視窗聚合後的結果帶上其他資訊進行輸出。我們這裡實作的WindowResultFunction将主鍵商品ID,視窗,點選量封裝成了ItemViewCount進行輸出。

/** 用于輸出視窗的結果 */
public static class WindowResultFunction implements WindowFunction<Long, ItemViewCount, Tuple, TimeWindow> {

  @Override
  public void apply(
      Tuple key,  // 視窗的主鍵,即 itemId
      TimeWindow window,  // 視窗
      Iterable<Long> aggregateResult, // 聚合函數的結果,即 count 值
      Collector<ItemViewCount> collector  // 輸出類型為 ItemViewCount
  ) throws Exception {
    Long itemId = ((Tuple1<Long>) key).f0;
    Long count = aggregateResult.iterator().next();
    collector.collect(ItemViewCount.of(itemId, window.getEnd(), count));
  }
}

/** 商品點選量(視窗操作的輸出類型) */
public static class ItemViewCount {
  public long itemId;     // 商品ID
  public long windowEnd;  // 視窗結束時間戳
  public long viewCount;  // 商品的點選量

  public static ItemViewCount of(long itemId, long windowEnd, long viewCount) {
    ItemViewCount result = new ItemViewCount();
    result.itemId = itemId;
    result.windowEnd = windowEnd;
    result.viewCount = viewCount;
    return result;
  }
}           

現在我們得到了每個商品在每個視窗的點選量的資料流。

TopN 計算最熱門商品

為了統計每個視窗下最熱門的商品,我們需要再次按視窗進行分組,這裡根據ItemViewCount中的windowEnd進行keyBy()操作。然後使用 ProcessFunction 實作一個自定義的 TopN 函數 TopNHotItems 來計算點選量排名前3名的商品,并将排名結果格式化成字元串,便于後續輸出。

DataStream<String> topItems = windowedData
    .keyBy("windowEnd")
    .process(new TopNHotItems(3));  // 求點選量前3名的商品           

ProcessFunction 是 Flink 提供的一個 low-level API,用于實作更進階的功能。它主要提供了定時器 timer 的功能(支援EventTime或ProcessingTime)。本案例中我們将利用 timer 來判斷何時收齊了某個 window 下所有商品的點選量資料。由于 Watermark 的進度是全局的,

在 processElement 方法中,每當收到一條資料(ItemViewCount),我們就注冊一個 windowEnd+1 的定時器(Flink 架構會自動忽略同一時間的重複注冊)。windowEnd+1 的定時器被觸發時,意味着收到了windowEnd+1的 Watermark,即收齊了該windowEnd下的所有商品視窗統計值。我們在 onTimer() 中處理将收集的所有商品及點選量進行排序,選出 TopN,并将排名資訊格式化成字元串後進行輸出。

這裡我們還使用了 ListState 來存儲收到的每條 ItemViewCount 消息,保證在發生故障時,狀态資料的不丢失和一緻性。ListState 是 Flink 提供的類似 Java List 接口的 State API,它內建了架構的 checkpoint 機制,自動做到了 exactly-once 的語義保證。

/** 求某個視窗中前 N 名的熱門點選商品,key 為視窗時間戳,輸出為 TopN 的結果字元串 */
public static class TopNHotItems extends KeyedProcessFunction<Tuple, ItemViewCount, String> {

  private final int topSize;

  public TopNHotItems(int topSize) {
    this.topSize = topSize;
  }

  // 用于存儲商品與點選數的狀态,待收齊同一個視窗的資料後,再觸發 TopN 計算
  private ListState<ItemViewCount> itemState;

  @Override
  public void open(Configuration parameters) throws Exception {
    super.open(parameters);
    // 狀态的注冊
    ListStateDescriptor<ItemViewCount> itemsStateDesc = new ListStateDescriptor<>(
        "itemState-state",
        ItemViewCount.class);
    itemState = getRuntimeContext().getListState(itemsStateDesc);
  }

  @Override
  public void processElement(
      ItemViewCount input,
      Context context,
      Collector<String> collector) throws Exception {

    // 每條資料都儲存到狀态中
    itemState.add(input);
    // 注冊 windowEnd+1 的 EventTime Timer, 當觸發時,說明收齊了屬于windowEnd視窗的所有商品資料
    context.timerService().registerEventTimeTimer(input.windowEnd + 1);
  }

  @Override
  public void onTimer(
      long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
    // 擷取收到的所有商品點選量
    List<ItemViewCount> allItems = new ArrayList<>();
    for (ItemViewCount item : itemState.get()) {
      allItems.add(item);
    }
    // 提前清除狀态中的資料,釋放空間
    itemState.clear();
    // 按照點選量從大到小排序
    allItems.sort(new Comparator<ItemViewCount>() {
      @Override
      public int compare(ItemViewCount o1, ItemViewCount o2) {
        return (int) (o2.viewCount - o1.viewCount);
      }
    });
    // 将排名資訊格式化成 String, 便于列印
    StringBuilder result = new StringBuilder();
    result.append("====================================\n");
    result.append("時間: ").append(new Timestamp(timestamp-1)).append("\n");
    for (int i=0;i<topSize;i++) {
      ItemViewCount currentItem = allItems.get(i);
      // No1:  商品ID=12224  浏覽量=2413
      result.append("No").append(i).append(":")
            .append("  商品ID=").append(currentItem.itemId)
            .append("  浏覽量=").append(currentItem.viewCount)
            .append("\n");
    }
    result.append("====================================\n\n");

    out.collect(result.toString());
  }
}           

列印輸出

最後一步我們将結果列印輸出到控制台,并調用env.execute執行任務。

topItems.print();
env.execute("Hot Items Job");           

運作程式

直接運作 main 函數,就能看到不斷輸出的每個時間點的熱門商品ID。

總結

本文的完整代碼可以通過 GitHub 通路到。本文通過實作一個“實時熱門商品”的案例,學習和實踐了 Flink 的多個核心概念和 API 用法。包括 EventTime、Watermark 的使用,State 的使用,Window API 的使用,以及 TopN 的實作。希望本文能加深大家對 Flink 的了解,幫助大家解決實戰上遇到的問題。

整代碼請移步 GitHub 通路 : https://github.com/wuchong/my-flink-project/blob/master/src/main/java/myflink/HotItems.java

繼續閱讀