Apache Flink中,Window操作在流式資料進行中是非常核心的一種抽象,它把一個無限流資料集分割成一個個有界的Window(或稱為Bucket),然後就可以非常友善地定義作用于Window之上的各種計算操作。本文我們主要基于Apache Flink 1.4.0版本,說明Keyed Window與Non-Keyed Window的基本概念,然後分别對與其相關的WindowFunction與WindowAllFunction的類設計進行分析,最後通過程式設計實踐來應用。
基本概念
Flink将Window分為兩類,一類叫做Keyed Window,另一類叫做Non-Keyed Window。為了說明這兩類Window的不同,我們看下Flink官網給出的,基于這兩種類型的Window編寫代碼的結構說明。
基于Keyed Window進行程式設計,使用者代碼基本結構如下所示:
![](https://img.laitimes.com/img/9ZDMuAjOiMmIsIjOiQnIsIyZuBnL1YzN1MzYkNmZycDN2IzN0kTO0MWOwIWMiZTO5M2MkZGN0UTMkZmM08CXt92Yu4GZjlGbh5SZslmZxl3Lc9CX6MHc0RHaiojIsJye.png)
基于Non-Keyed Window進行程式設計,使用者代碼基本結構如下所示:
上面兩種程式設計結構的差別在于:
從程式設計API上看,Keyed Window程式設計結構,可以直接對輸入的stream按照Key進行操作,輸入的stream中識别Key,即輸入stream中的每個資料元素哪一部分是作為Key來關聯這個資料元素的,這樣就可以對stream中的資料元素基于Key進行相關計算操作,如keyBy,可以根據Key進行分組(相同的Key必然可以分到同一組中去)。如果輸入的stream中沒有Key,比如就是一條日志記錄資訊,那麼無法對其進行keyBy操作。而對于Non-Keyed Window程式設計結構來說,無論輸入的stream具有何種結構(比如是否具有Key),它都認為是無結構的,不能對其進行keyBy操作,而且如果使用Non-Keyed Window函數操作,就會對該stream進行分組(具體如何分組依賴于我們選擇的WindowAssigner,它負責将stream中的每個資料元素指派到一個或多個Window中),指派到一個或多個Window中,然後後續應用到該stream上的計算都是對Window中的這些資料元素進行操作。
從計算上看,Keyed Window程式設計結構會将輸入的stream轉換成Keyed stream,邏輯上會對應多個Keyed stream,每個Keyed stream會獨立進行計算,這就使得多個Task可以對Windowing操作進行并行處理,具有相同Key的資料元素會被發到同一個Task中進行處理。而對于Non-Keyed Window程式設計結構,Non-Keyed stream邏輯上将不能split成多個stream,所有的Windowing操作邏輯隻能在一個Task中進行處理,也就是說計算并行度為1。
在實際程式設計過程中,我們可以看到DataStream的API也有對應的方法timeWindow()和timeWindowAll(),他們也分别對應着Keyed Window和Non-Keyed Window。
WindowFunction與AllWindowFunction
Flink中對輸入stream進行Windowing操作後,将到達的資料元素指派到指定的Window中,或者基于EventTime/ProcessingTime,或者基于Count,或者混合EventTime/ProcessingTime/Count,來對資料元素進行分組。那麼,在對配置設定的Window進行操作時,就需要使用Flink提供的函數(Function),而對于Window的操作,分别基于Keyed Window、Non-Keyed Window提供了WindowFunction、AllWindowFunction,通過實作特定的Window函數,能夠通路Window相關的中繼資料,來滿足實際應用需要。下面,我們從類設計的角度,來看下對應的繼承層次結構:
- Keyed Window對應的WindowFunction
Keyed Window對應的WindowFunction類圖,如下所示:
通常,如果我們想要自定義處理Window中資料元素的處理邏輯,或者通路Window對應的中繼資料,可以繼承自ProcessWindowFunction類來實作。我們看一下ProcessWindowFunction對應的類聲明:
對Keyed stream的Window進行操作,上面泛型對應4個類型參數:
IN表示進入到該ProcessWindowFunction的資料元素的類型,例如stream中上一個操作的輸出是包含兩個String類型的元組,則IN類型對應為(String, String);
OUT表示該ProcessWindowFunction處理後的輸出資料元素的類型,例如輸出一個String和一個Long的元組,則OUT類型對應為(String, Long);
KEY有一點不同,需要注意,它并不是面向應用程式設計使用者使用的,而且該值不會提供有意義的業務應用含義,在Keyed Window中它是用來跟蹤該Window的,一般應用開發中隻需要将其作為輸出的Key即可,後面我們會有對應的程式設計實踐;
W類型表示該ProcessWindowFunction作用的Window的類型,例如TimeWindow、GlobalWindow。
下面,我們看一下繼承自ProcessWindowFunction需要實作的方法,方法簽名如下所示:
進入到該Window,對應着其中一個Keyed stream。屬于某個Window的資料元素都在elements這個集合中,我們可以對這些資料元素進行處理。通過context可以通路Window對應的中繼資料資訊,比如TimeWindow的開始時間(start)和結束時間(end)。out是一個Collector,負責收集處理後的資料元素并發送到stream下遊進行處理。
- Non-Keyed Window對應的AllWindowFunction
Non-Keyed Window對應的WindowFunction類圖,如下所示:
類似地,如果我們想要自定義處理Window中資料元素的處理邏輯,或者通路Window對應的中繼資料,可以繼承自ProcessAllWindowFunction類來實作。我們看一下ProcessAllWindowFunction對應的類聲明:
可以同ProcessWindowFunction對比一下,發現ProcessAllWindowFunction的泛型參數中沒有了用來跟蹤Window的KEY,因為Non-Keyed Window隻在一個Task中進行處理,其它的OUT和W與前面ProcessWindowFunction類相同,不再累述。
繼承自ProcessAllWindowFunction,需要實作的方法,如下所示:
該ProcessAllWindowFunction作用于原始輸入的stream,所有的資料元素經過Windowing後,都會經過該方法進行處理,在該方法具體處理邏輯與ProcessWindowFunction.process()類似。
程式設計實踐
現在,我們模拟這樣一個場景:某個App開發商需要從多個管道(Channel)推廣App,需要通過日志來分析對應的使用者行為(安裝、打開、浏覽、點選、購買、關閉、解除安裝),我們假設要實時(近實時)統計分析每個時間段内(如每隔5秒)來自不同管道的使用者的行為。
首先,建立一個模拟生成資料的SourceFunction,實作代碼如下所示:
有了該資料源,我們就可以基于該SimulatedEventSource來建構Flink Streaming應用程式了。下面,也分别面向Keyed Window和Non-Keyed Window來程式設計實踐,并比較它們不同之處。
Keyed Window程式設計
我們基于Sliding Window(WindowAssigner)來在stream上生成Window,Window大小size=5s,silde=1s,即每個Window計算5s之内的資料元素,每個1s啟動一個Window(檢視送出該Flink程式的指令行中指定的各個參數值)。同時,基于上面自定義實作的SimulatedEventSource作為輸入資料源,建立Flink stream,然後後續就可以對stream進行各種操作了。
處理stream資料,我們希望能夠擷取到每個Window對應的起始時間和結束時間,然後輸出基于Window(起始時間+結束時間)、管道(Channel)、行為類型進行分組統計的結果,最後将結果資料實時寫入到指定Kafka topic中。
我們實作的Flink程式類為SlidingWindowAnalytics,代碼如下所示:
首先,對輸入stream進行一個map操作,處理輸出 ((管道, 行為類型), 計數)。
其次,基于該結果進行一個keyBy操作,指定Key為(管道, 行為類型),得到了多個Keyed stream。
接着,對每個Keyed stream應用Sliding Window操作,設定Sliding Window的size和slide值。
然後,因為我們想要擷取到Window對應的起始時間和結束時間,是以需要對Windowing後的stream進行一個ProcessWindowFunction操作,這個是我們自定義實作的,在其中擷取到Window起始時間和結束時間,并對Windowing的資料進行分組統計(groupBy),然後輸出帶有Window起始時間和結束時間,以及管道、行為類型、統計計數這些資訊,對應的實作類為MyReduceWindowFunction,代碼如下所示:
上面對應于ProcessWindowFunction的泛型參數的值,分别為:IN=((String, String), Long)、OUT=((String, String, String, String), Long)、KEY=Tuple、W=TimeWindow,這樣可以對照方法process()中的各個參數的類型來了解。上述代碼中,elements中可能存在多個相同的Key的值,但是具有同一個Key的資料元素一定會在同一個Window中(即elements),我們需要對elements進行一個groupBy的記憶體計算操作,再對每個group中的資料進行彙總計數,輸出為((Window開始時間, Window結束時間, 管道, 行為類型), 累加計數值)。這樣,即可有調用stream上的process方法,将該MyReduceWindowFunction實作的示例作為參數值傳進去即可。
最後,通過map操作将結果格式化,輸出儲存到Kafka中。
運作上面我們實作的Flink程式,執行如下指令:
送出運作後,可以通過Flink Web Dashboard檢視Job運作狀态。可以在Kafka中檢視最終結果資料,對應的輸出資料示例如下所示:
通過結果可以看到,采用Sliding Window來指派Window,随着時間流逝各個Window之間存在重疊的現象,這正是我們最初想要的結果。
- Non-Keyed Window程式設計
這裡,我們基于Tumbling Window(WindowAssigner)來在stream上生成Non-Keyed Window。Tumbling Window也被稱為固定時間視窗(Fixed Time Window),各個Window的時間長度相同,Window之間沒有重疊。
我們想要達到的目标和前面類似,也希望擷取到每個Window對應的起始時間和結束時間,是以需要實作一個ProcessWindowAllFunction,但因為是Non-Keyed Window,隻有一個Task來負責對所有輸入stream中的資料元素指派Window,這在程式設計實作中并沒有感覺到有太大的差異。實作的Flink程式為TumblingWindowAllAnalytics,代碼如下所示:
object TumblingWindowAllAnalytics {
var MAX_LAGGED_TIME = 5000L
def checkParams(params: ParameterTool) = {
if (params.getNumberOfParameters < 5) {
println("Missing parameters!\n"
+ "Usage: Windowing "
+ "--window-result-topic <windowed_result_topic> "
+ "--bootstrap.servers <kafka_brokers> "
+ "--zookeeper.connect <zk_quorum> "
+ "--window-all-lagged-millis <window_all_lagged_millis> "
+ "--window-all-size-millis <window_all_size_millis>")
System.exit(-1)
}
}
def main(args: Array[String]): Unit = {
val params = ParameterTool.fromArgs(args)
checkParams(params)
MAX_LAGGED_TIME = params.getLong("window-all-lagged-millis", MAX_LAGGED_TIME)
val windowAllSizeMillis = params.getRequired("window-all-size-millis").toLong
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
val stream: DataStream[(String, String)] = env.addSource(new SimulatedEventSource)
// create a Kafka producer for Kafka 0.9.x
val kafkaProducer = new FlinkKafkaProducer09(
params.getRequired("window-result-topic"),
new SimpleStringSchema, params.getProperties
)
stream
.map(t => {
val channel = t._1
val eventFields = t._2.split("\t")
val ts = eventFields(0).toLong
val behaviorType = eventFields(3)
(ts, channel, behaviorType)
})
.assignTimestampsAndWatermarks(new TimestampExtractor(MAX_LAGGED_TIME))
.map(t => (t._2, t._3))
.timeWindowAll(Time.milliseconds(windowAllSizeMillis))
.process(new MyReduceWindowAllFunction())
.map(t => {
val key = t._1
val count = t._2
val windowStartTime = key._1
val windowEndTime = key._2
val channel = key._3
val behaviorType = key._4
Seq(windowStartTime, windowEndTime,
channel, behaviorType, count).mkString("\t")
})
.addSink(kafkaProducer)
env.execute(getClass.getSimpleName)
}
class TimestampExtractor(val maxLaggedTime: Long)
extends AssignerWithPeriodicWatermarks[(Long, String, String)] with Serializable {
var currentWatermarkTs = 0L
override def getCurrentWatermark: Watermark = {
if(currentWatermarkTs <= 0) {
new Watermark(Long.MinValue)
} else {
new Watermark(currentWatermarkTs - maxLaggedTime)
}
}
override def extractTimestamp(element: (Long, String, String),
previousElementTimestamp: Long): Long = {
val ts = element._1
Math.max(ts, currentWatermarkTs)
}
}
}
上面代碼中,我們在輸入stream開始處理時,調用DataStream的assignTimestampsAndWatermarks方法為stream中的每個資料元素指派時間戳,周期性地生成WaterMark來控制stream的處理進度(Progress),用來提取時間戳和生成WaterMark的實作參考實作類TimestampExtractor。有關WaterMark相關的内容,可以參考後面的參考連結中給出的介紹。
另外,我們實作了Flink的ProcessWindowAllFunction抽象類,對應實作類為MyReduceWindowAllFunction,用來處理每個Window中的資料,擷取對應的Window的起始時間和結束時間,實作代碼如下所示:
class MyReduceWindowAllFunction
extends ProcessAllWindowFunction[(String, String), ((String, String, String, String), Long), TimeWindow] {
override def process(context: Context,
elements: Iterable[(String, String)],
collector: Collector[((String, String, String, String), Long)]): Unit = {
val startTs = context.window.getStart
val endTs = context.window.getEnd
val elems = elements.map(t => {
((t._1, t._2), 1L)
})
for(group <- elems.groupBy(_._1)) {
val myKey = group._1
val myValue = group._2
var count = 0L
for(elem <- myValue) {
count += elem._2
}
val channel = myKey._1
val behaviorType = myKey._2
val outputKey = (formatTs(startTs), formatTs(endTs), channel, behaviorType)
collector.collect((outputKey, count))
}
}
private def formatTs(ts: Long) = {
val df = new SimpleDateFormat("yyyyMMddHHmmss")
df.format(new Date(ts))
}
}
與Keyed Window實作中的ProcessWindowFunction相比,這裡沒有了對應的泛型參數KEY,因為這種情況下隻有一個Task處理stream輸入的所有資料元素,ProcessAllWindowFunction的實作類對所有未進行groupBy(也無法進行,因為資料元素的Key未知)操作得到的Window中的資料元素進行處理,處理邏輯和前面基本相同。
送出Flink程式TumblingWindowAllAnalytics,執行如下指令行: