天天看點

Apache Flink:Keyed Window與Non-Keyed Window

5萬人關注的大資料成神之路,不來了解一下嗎? 5萬人關注的大資料成神之路,真的不來了解一下嗎? 5萬人關注的大資料成神之路,确定真的不來了解一下嗎?

Apache Flink中,Window操作在流式資料進行中是非常核心的一種抽象,它把一個無限流資料集分割成一個個有界的Window(或稱為Bucket),然後就可以非常友善地定義作用于Window之上的各種計算操作。本文我們主要基于Apache Flink 1.4.0版本,說明Keyed Window與Non-Keyed Window的基本概念,然後分别對與其相關的WindowFunction與WindowAllFunction的類設計進行分析,最後通過程式設計實踐來應用。

基本概念

Flink将Window分為兩類,一類叫做Keyed Window,另一類叫做Non-Keyed Window。為了說明這兩類Window的不同,我們看下Flink官網給出的,基于這兩種類型的Window編寫代碼的結構說明。

基于Keyed Window進行程式設計,使用者代碼基本結構如下所示:

Apache Flink:Keyed Window與Non-Keyed Window

基于Non-Keyed Window進行程式設計,使用者代碼基本結構如下所示:

Apache Flink:Keyed Window與Non-Keyed Window

上面兩種程式設計結構的差別在于:

從程式設計API上看,Keyed Window程式設計結構,可以直接對輸入的stream按照Key進行操作,輸入的stream中識别Key,即輸入stream中的每個資料元素哪一部分是作為Key來關聯這個資料元素的,這樣就可以對stream中的資料元素基于Key進行相關計算操作,如keyBy,可以根據Key進行分組(相同的Key必然可以分到同一組中去)。如果輸入的stream中沒有Key,比如就是一條日志記錄資訊,那麼無法對其進行keyBy操作。而對于Non-Keyed Window程式設計結構來說,無論輸入的stream具有何種結構(比如是否具有Key),它都認為是無結構的,不能對其進行keyBy操作,而且如果使用Non-Keyed Window函數操作,就會對該stream進行分組(具體如何分組依賴于我們選擇的WindowAssigner,它負責将stream中的每個資料元素指派到一個或多個Window中),指派到一個或多個Window中,然後後續應用到該stream上的計算都是對Window中的這些資料元素進行操作。

從計算上看,Keyed Window程式設計結構會将輸入的stream轉換成Keyed stream,邏輯上會對應多個Keyed stream,每個Keyed stream會獨立進行計算,這就使得多個Task可以對Windowing操作進行并行處理,具有相同Key的資料元素會被發到同一個Task中進行處理。而對于Non-Keyed Window程式設計結構,Non-Keyed stream邏輯上将不能split成多個stream,所有的Windowing操作邏輯隻能在一個Task中進行處理,也就是說計算并行度為1。

在實際程式設計過程中,我們可以看到DataStream的API也有對應的方法timeWindow()和timeWindowAll(),他們也分别對應着Keyed Window和Non-Keyed Window。

WindowFunction與AllWindowFunction

Flink中對輸入stream進行Windowing操作後,将到達的資料元素指派到指定的Window中,或者基于EventTime/ProcessingTime,或者基于Count,或者混合EventTime/ProcessingTime/Count,來對資料元素進行分組。那麼,在對配置設定的Window進行操作時,就需要使用Flink提供的函數(Function),而對于Window的操作,分别基于Keyed Window、Non-Keyed Window提供了WindowFunction、AllWindowFunction,通過實作特定的Window函數,能夠通路Window相關的中繼資料,來滿足實際應用需要。下面,我們從類設計的角度,來看下對應的繼承層次結構:

  • Keyed Window對應的WindowFunction

Keyed Window對應的WindowFunction類圖,如下所示:

Apache Flink:Keyed Window與Non-Keyed Window

通常,如果我們想要自定義處理Window中資料元素的處理邏輯,或者通路Window對應的中繼資料,可以繼承自ProcessWindowFunction類來實作。我們看一下ProcessWindowFunction對應的類聲明:

Apache Flink:Keyed Window與Non-Keyed Window

對Keyed stream的Window進行操作,上面泛型對應4個類型參數:

IN表示進入到該ProcessWindowFunction的資料元素的類型,例如stream中上一個操作的輸出是包含兩個String類型的元組,則IN類型對應為(String, String);

OUT表示該ProcessWindowFunction處理後的輸出資料元素的類型,例如輸出一個String和一個Long的元組,則OUT類型對應為(String, Long);

KEY有一點不同,需要注意,它并不是面向應用程式設計使用者使用的,而且該值不會提供有意義的業務應用含義,在Keyed Window中它是用來跟蹤該Window的,一般應用開發中隻需要将其作為輸出的Key即可,後面我們會有對應的程式設計實踐;

W類型表示該ProcessWindowFunction作用的Window的類型,例如TimeWindow、GlobalWindow。

下面,我們看一下繼承自ProcessWindowFunction需要實作的方法,方法簽名如下所示:

Apache Flink:Keyed Window與Non-Keyed Window

進入到該Window,對應着其中一個Keyed stream。屬于某個Window的資料元素都在elements這個集合中,我們可以對這些資料元素進行處理。通過context可以通路Window對應的中繼資料資訊,比如TimeWindow的開始時間(start)和結束時間(end)。out是一個Collector,負責收集處理後的資料元素并發送到stream下遊進行處理。

  • Non-Keyed Window對應的AllWindowFunction

Non-Keyed Window對應的WindowFunction類圖,如下所示:

Apache Flink:Keyed Window與Non-Keyed Window

類似地,如果我們想要自定義處理Window中資料元素的處理邏輯,或者通路Window對應的中繼資料,可以繼承自ProcessAllWindowFunction類來實作。我們看一下ProcessAllWindowFunction對應的類聲明:

Apache Flink:Keyed Window與Non-Keyed Window

可以同ProcessWindowFunction對比一下,發現ProcessAllWindowFunction的泛型參數中沒有了用來跟蹤Window的KEY,因為Non-Keyed Window隻在一個Task中進行處理,其它的OUT和W與前面ProcessWindowFunction類相同,不再累述。

繼承自ProcessAllWindowFunction,需要實作的方法,如下所示:

Apache Flink:Keyed Window與Non-Keyed Window

該ProcessAllWindowFunction作用于原始輸入的stream,所有的資料元素經過Windowing後,都會經過該方法進行處理,在該方法具體處理邏輯與ProcessWindowFunction.process()類似。

程式設計實踐

現在,我們模拟這樣一個場景:某個App開發商需要從多個管道(Channel)推廣App,需要通過日志來分析對應的使用者行為(安裝、打開、浏覽、點選、購買、關閉、解除安裝),我們假設要實時(近實時)統計分析每個時間段内(如每隔5秒)來自不同管道的使用者的行為。

首先,建立一個模拟生成資料的SourceFunction,實作代碼如下所示:

Apache Flink:Keyed Window與Non-Keyed Window

有了該資料源,我們就可以基于該SimulatedEventSource來建構Flink Streaming應用程式了。下面,也分别面向Keyed Window和Non-Keyed Window來程式設計實踐,并比較它們不同之處。

Keyed Window程式設計

我們基于Sliding Window(WindowAssigner)來在stream上生成Window,Window大小size=5s,silde=1s,即每個Window計算5s之内的資料元素,每個1s啟動一個Window(檢視送出該Flink程式的指令行中指定的各個參數值)。同時,基于上面自定義實作的SimulatedEventSource作為輸入資料源,建立Flink stream,然後後續就可以對stream進行各種操作了。

處理stream資料,我們希望能夠擷取到每個Window對應的起始時間和結束時間,然後輸出基于Window(起始時間+結束時間)、管道(Channel)、行為類型進行分組統計的結果,最後将結果資料實時寫入到指定Kafka topic中。

我們實作的Flink程式類為SlidingWindowAnalytics,代碼如下所示:

Apache Flink:Keyed Window與Non-Keyed Window

首先,對輸入stream進行一個map操作,處理輸出 ((管道, 行為類型), 計數)。

其次,基于該結果進行一個keyBy操作,指定Key為(管道, 行為類型),得到了多個Keyed stream。

接着,對每個Keyed stream應用Sliding Window操作,設定Sliding Window的size和slide值。

然後,因為我們想要擷取到Window對應的起始時間和結束時間,是以需要對Windowing後的stream進行一個ProcessWindowFunction操作,這個是我們自定義實作的,在其中擷取到Window起始時間和結束時間,并對Windowing的資料進行分組統計(groupBy),然後輸出帶有Window起始時間和結束時間,以及管道、行為類型、統計計數這些資訊,對應的實作類為MyReduceWindowFunction,代碼如下所示:

Apache Flink:Keyed Window與Non-Keyed Window

上面對應于ProcessWindowFunction的泛型參數的值,分别為:IN=((String, String), Long)、OUT=((String, String, String, String), Long)、KEY=Tuple、W=TimeWindow,這樣可以對照方法process()中的各個參數的類型來了解。上述代碼中,elements中可能存在多個相同的Key的值,但是具有同一個Key的資料元素一定會在同一個Window中(即elements),我們需要對elements進行一個groupBy的記憶體計算操作,再對每個group中的資料進行彙總計數,輸出為((Window開始時間, Window結束時間, 管道, 行為類型), 累加計數值)。這樣,即可有調用stream上的process方法,将該MyReduceWindowFunction實作的示例作為參數值傳進去即可。

最後,通過map操作将結果格式化,輸出儲存到Kafka中。

運作上面我們實作的Flink程式,執行如下指令:

Apache Flink:Keyed Window與Non-Keyed Window

送出運作後,可以通過Flink Web Dashboard檢視Job運作狀态。可以在Kafka中檢視最終結果資料,對應的輸出資料示例如下所示:

Apache Flink:Keyed Window與Non-Keyed Window

通過結果可以看到,采用Sliding Window來指派Window,随着時間流逝各個Window之間存在重疊的現象,這正是我們最初想要的結果。

  • Non-Keyed Window程式設計

這裡,我們基于Tumbling Window(WindowAssigner)來在stream上生成Non-Keyed Window。Tumbling Window也被稱為固定時間視窗(Fixed Time Window),各個Window的時間長度相同,Window之間沒有重疊。

我們想要達到的目标和前面類似,也希望擷取到每個Window對應的起始時間和結束時間,是以需要實作一個ProcessWindowAllFunction,但因為是Non-Keyed Window,隻有一個Task來負責對所有輸入stream中的資料元素指派Window,這在程式設計實作中并沒有感覺到有太大的差異。實作的Flink程式為TumblingWindowAllAnalytics,代碼如下所示:

object TumblingWindowAllAnalytics {
  var MAX_LAGGED_TIME = 5000L
 
  def checkParams(params: ParameterTool) = {
    if (params.getNumberOfParameters < 5) {
      println("Missing parameters!\n"
        + "Usage: Windowing "
        + "--window-result-topic <windowed_result_topic> "
        + "--bootstrap.servers <kafka_brokers> "
        + "--zookeeper.connect <zk_quorum> "
        + "--window-all-lagged-millis <window_all_lagged_millis> "
        + "--window-all-size-millis <window_all_size_millis>")
      System.exit(-1)
    }
  }
 
  def main(args: Array[String]): Unit = {
    val params = ParameterTool.fromArgs(args)
    checkParams(params)
    MAX_LAGGED_TIME = params.getLong("window-all-lagged-millis", MAX_LAGGED_TIME)
    val windowAllSizeMillis = params.getRequired("window-all-size-millis").toLong
 
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
    val stream: DataStream[(String, String)] = env.addSource(new SimulatedEventSource)
 
    // create a Kafka producer for Kafka 0.9.x
    val kafkaProducer = new FlinkKafkaProducer09(
      params.getRequired("window-result-topic"),
      new SimpleStringSchema, params.getProperties
    )
 
    stream
      .map(t => {
        val channel = t._1
        val eventFields = t._2.split("\t")
        val ts = eventFields(0).toLong
        val behaviorType = eventFields(3)
        (ts, channel, behaviorType)
      })
      .assignTimestampsAndWatermarks(new TimestampExtractor(MAX_LAGGED_TIME))
      .map(t => (t._2, t._3))
      .timeWindowAll(Time.milliseconds(windowAllSizeMillis))
      .process(new MyReduceWindowAllFunction())
      .map(t => {
        val key = t._1
        val count = t._2
        val windowStartTime = key._1
        val windowEndTime = key._2
        val channel = key._3
        val behaviorType = key._4
        Seq(windowStartTime, windowEndTime,
          channel, behaviorType, count).mkString("\t")
      })
      .addSink(kafkaProducer)
 
    env.execute(getClass.getSimpleName)
  }
 
  class TimestampExtractor(val maxLaggedTime: Long)
    extends AssignerWithPeriodicWatermarks[(Long, String, String)] with Serializable {
 
    var currentWatermarkTs = 0L
 
    override def getCurrentWatermark: Watermark = {
      if(currentWatermarkTs <= 0) {
        new Watermark(Long.MinValue)
      } else {
        new Watermark(currentWatermarkTs - maxLaggedTime)
      }
    }
 
    override def extractTimestamp(element: (Long, String, String),
                                  previousElementTimestamp: Long): Long = {
      val ts = element._1
      Math.max(ts, currentWatermarkTs)
    }
  }
}           

上面代碼中,我們在輸入stream開始處理時,調用DataStream的assignTimestampsAndWatermarks方法為stream中的每個資料元素指派時間戳,周期性地生成WaterMark來控制stream的處理進度(Progress),用來提取時間戳和生成WaterMark的實作參考實作類TimestampExtractor。有關WaterMark相關的内容,可以參考後面的參考連結中給出的介紹。

另外,我們實作了Flink的ProcessWindowAllFunction抽象類,對應實作類為MyReduceWindowAllFunction,用來處理每個Window中的資料,擷取對應的Window的起始時間和結束時間,實作代碼如下所示:

class MyReduceWindowAllFunction
  extends ProcessAllWindowFunction[(String, String), ((String, String, String, String), Long), TimeWindow] {
 
  override def process(context: Context,
                       elements: Iterable[(String, String)],
                       collector: Collector[((String, String, String, String), Long)]): Unit = {
    val startTs = context.window.getStart
    val endTs = context.window.getEnd
    val elems = elements.map(t => {
      ((t._1, t._2), 1L)
    })
    for(group <- elems.groupBy(_._1)) {
      val myKey = group._1
      val myValue = group._2
      var count = 0L
      for(elem <- myValue) {
        count += elem._2
      }
      val channel = myKey._1
      val behaviorType = myKey._2
      val outputKey = (formatTs(startTs), formatTs(endTs), channel, behaviorType)
      collector.collect((outputKey, count))
    }
  }
 
  private def formatTs(ts: Long) = {
    val df = new SimpleDateFormat("yyyyMMddHHmmss")
    df.format(new Date(ts))
  }
}           

與Keyed Window實作中的ProcessWindowFunction相比,這裡沒有了對應的泛型參數KEY,因為這種情況下隻有一個Task處理stream輸入的所有資料元素,ProcessAllWindowFunction的實作類對所有未進行groupBy(也無法進行,因為資料元素的Key未知)操作得到的Window中的資料元素進行處理,處理邏輯和前面基本相同。

送出Flink程式TumblingWindowAllAnalytics,執行如下指令行:

參考連結

《大資料成神之路》 https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/operators/windows.html https://www.oreilly.com/ideas/the-world-beyond-batch-streaming-101 https://www.oreilly.com/ideas/the-world-beyond-batch-streaming-102