天天看點

基于阿裡雲官方Flink滾動視窗測試示例完善篇

一 官方文檔背景

首先列出官方文檔對于Flink滾動視窗的介紹以及示例示範:

https://help.aliyun.com/document_detail/62511.html?spm=a2c4g.11174283.6.650.73161e494aJMpz

對于具體滾動視窗的含義和參數及SQL的使用不再詳盡介紹,官方文檔介紹的已經相當完善(niubi);

Flink SQL支援的視窗聚合主要是兩種:Window聚合和Over聚合。本文檔主要介紹Window聚合。Window聚合支援兩種時間屬性定義視窗:Event Time和Processing Time。

本文主要驗證和完善的是Flink SQL對于滾動視窗函數的event time的示例的完善;

如下是關于event time的解釋

Event Time

Event Time也稱為Row Time。EventTime時間屬性必須在源表DDL中聲明,可以将源表中的某一字段聲明成Event Time。目前隻支援将TIMESTAMP類型(将來會支援LONG類型)聲明成Row Time字段。如果源表中需要聲明為Event Time的列不是TIMESTAMP類型,需要借助計算列,基于現有列構造出一個TIMESTAMP類型的列。

由于資料本身的亂序、網絡的抖動(網絡堵塞導緻的資料傳輸延遲的變化)或者其它原因,導緻了資料到達的順序和被處理的順序,可能是不一緻的(亂序)。是以定義一個Row Time字段,需要明文定義一個Watermark計算方法。

至于為什麼完善,就用文章的較長的描述過程來解釋吧。

二 原始文檔測試過程及問題

2.1 測試流程及相關準備

這裡測試的整個流程如下;

基于阿裡雲官方Flink滾動視窗測試示例完善篇

通過datahub的logstash插件将文檔中示例的資料放到csv檔案當中,通過logstash同步到datahub當中;

詳細logstash同步可以參考:

https://help.aliyun.com/document_detail/47451.html?spm=a2c4g.11174283.6.553.771463efHE1Mot

這裡将時間換算成Unix時間戳格式類型,并且要符合datahub微妙級别的格式即16位,如下換算以後末尾增加6個0;

基于阿裡雲官方Flink滾動視窗測試示例完善篇

并且在datahub中使用bigint方式來存儲;

還有一種方式是将标準時間格式的資料以string字元串類型方式存儲到datahub中然後再連接配接到flink當中進行計算;

兩種方式都需要在flink中進行進一步的處理;

如下是使用bigint方式存儲的字段進行flink處理的方式;

基于阿裡雲官方Flink滾動視窗測試示例完善篇

如下是使用string方式存儲的字段進行flink處理的方式;

基于阿裡雲官方Flink滾動視窗測試示例完善篇

2.2 原始測試流程

在client源端将資料寫入csv檔案

基于阿裡雲官方Flink滾動視窗測試示例完善篇

然後通過logstash插件将資料寫入datahub中;

基于阿裡雲官方Flink滾動視窗測試示例完善篇

如下是datahub捕獲到的資料情況

基于阿裡雲官方Flink滾動視窗測試示例完善篇

通過Flink接datahub存儲進行作業開發,開發代碼如下,和文檔中示例代碼一緻,做了時間類型的轉換,否則會報錯;

基于阿裡雲官方Flink滾動視窗測試示例完善篇

啟動任務監控,寫入資料以後監控發現有6條記錄寫入,但是輸出是1條;

基于阿裡雲官方Flink滾動視窗測試示例完善篇

查詢輸出print結果隻輸出了1條完全不符合文檔中描述的輸出3條記錄的結果;

基于阿裡雲官方Flink滾動視窗測試示例完善篇

2.3 資料丢失統計

通過如上的測試,我們通過string存儲标準時間格式的方式再次進行了驗證,結果依然一樣,會發生資料丢失的情況;

如下是詳細的解釋;

需要注意這裡的withOffset設定的是2000即2s;

WATERMARK wk FOR ts as withOffset(ts, 2000) --為Rowtime定義Watermark。
基于阿裡雲官方Flink滾動視窗測試示例完善篇

最終查到目标端輸出的結果隻有如下幾條資料,其他資料均丢失;

基于阿裡雲官方Flink滾動視窗測試示例完善篇

三 原始文檔測試流程疑問處理

基于官方文檔滾動視窗函數連接配接内容源端Flink引用的資料來源是從datahub同步過來的;但是存在一個問題是标準時間格式的資料同步到datahub以後就變成了16位(微妙級别的Unix時間戳)的timestamp字段來存儲時間;

基于阿裡雲官方Flink滾動視窗測試示例完善篇

而讀取到flink以後滾動視窗識别的timestamp時間類型字段的Unix時間戳預設支援識别毫秒級别,即13位的時間戳,直接影響到的資料的計算;

但是在文檔中并沒有對這部分資料進行實際的處理影響使用者測試參考,其實官方文檔額外提供了另外的一篇文章--“計算列”:

https://help.aliyun.com/document_detail/110847.html?spm=a2c4g.11186623.2.11.46be1216uEKQSM#concept-nnx-bwy-bhb

詳細解釋了如何進行針對datahub類似的時間戳格式字段的轉換以及flink SQL對應的支援處理方式;這樣就解決了時間戳無法識别的問題;

四 基于“計算列”模拟測試

4.1 “計算列”模拟測試

從第三章的講述基于“計算列”的時間處理通過logstash插件模拟再次進行測試滾動視窗的實作;

開發作業代碼如下:

--SQL
--********************************************************************--
--Author: asp_dmp
--CreateTime: 2019-11-07 10:34:36
--Comment: tumble_window
--********************************************************************--

CREATE TABLE tumble_window (
username VARCHAR,
click_url VARCHAR,
`time` bigint,
ts as to_timestamp(`time`/1000),
WATERMARK FOR ts as withOffset(ts, 2000)
) WITH (
type = 'datahub',
endPoint = 'http://dh-cn-beijing-int-vpc.aliyuncs.com',
roleArn='acs:ram::xxxxxxxx:role/aliyunstreamdefaultrole',
project = 'huiyan_zrh',
topic = 'tumble_window_int'
);

CREATE TABLE tumble_output(
window_start TIMESTAMP,
window_end TIMESTAMP,
username VARCHAR,
clicks BIGINT
) with (
type='print'
);

INSERT INTO tumble_output
SELECT
TUMBLE_START(ts, INTERVAL '1' MINUTE),
TUMBLE_END(ts, INTERVAL '1' MINUTE),
username,
COUNT(click_url)
FROM tumble_window
GROUP BY TUMBLE(ts, INTERVAL '1' MINUTE), username;           

經過測試源端通過logstash寫入12條資料記錄;

Jark,http://taobao.com/xxx,1507600800000000       2017-10-10 10:00:00.0  
Jark,http://taobao.com/xxx,1507600810000000       2017-10-10 10:00:10.0
Jark,http://taobao.com/xxx,1507600849000000       2017-10-10 10:00:49.0
Jark,http://taobao.com/xxx,1507600865000000       2017-10-10 10:01:05.0
Jark,http://taobao.com/xxx,1507600918000000       2017-10-10 10:01:58.0
Timo,http://taobao.com/xxx,1507600930000000       2017-10-10 10:02:10.0
Timor,http://taobao.com/xxx,1507600982000000      2017-10-10 10:03:02.0
Timor,http://taobao.com/xxx,1507601015000000      2017-10-10 10:03:35.0
Timor,http://taobao.com/xxx,1507601045000000      2017-10-10 10:04:05.0
Timor,http://taobao.com/xxx,1507601064000000      2017-10-10 10:04:24.0
Tim,http://taobao.com/xxxx,1507601068000000       2017-10-10 10:04:28.0
Tim,http://taobao.com/xxxx,1507601069000000       2017-10-10 10:04:29.0           

注:對于如上表格中後邊的時間格式僅為了解釋使用的時間情況,測試不需要

檢視logstash插入日志顯示插入12條,從插入情況看并非是順序一條條進去的;這裡留個疑問,後邊内容來解釋

基于阿裡雲官方Flink滾動視窗測試示例完善篇

在flink側檢視發現接收到了12條資料記錄,但是輸出隻有一條,

基于阿裡雲官方Flink滾動視窗測試示例完善篇

從如上的開發作業的實作效果來看,我們想實作的是按照每分鐘使用者點選網頁的次數來分組統計,也就是說希望得到的是6條資料,然而隻輸出一條;

還是有部分資料丢失了;

然後懷疑主要原因是消息處理延遲嗎?Watermark如果設定大一點,降低敏感度,是不是資料不會被丢棄?或者還有其他更好的方案嗎?

4.2 Watermark參數的功能

基于4.1章節,在這裡就要引申出來開發的flink SQL中引用的Watermark參數了;

使用方式如下:

WATERMARK [watermarkName] FOR AS withOffset(, offset)

對于Watermark用如下示例解釋:

WATERMARK FOR rowtime AS withOffset(rowtime, 4000)

Watermark時間為 1501750584000 - 4000 = 1501750580000(2017-08-03 08:56:20.000)。這條資料的Watermark時間含義:時間戳小于1501750580000(2017-08-03 08:56:20.000)的資料已經全部到達。

對于Watermark的使用總結如下:

a.Watermark含義是所有時間戳t'< t 的事件已經全部發生。若t(Watermark)已經生效,則後續Event Time小于t的記錄将全部丢棄(後續支援使用者配置,使Event Time小于t的資料也能繼續更新)。

b.針對亂序的的流,Watermark至關重要。即使部分事件延遲到達,也不會過大影響視窗計算的正确性。

c.并行資料流中,當算子(Operator)有多個輸入流時,算子的Event Time以最小流Event Time為準。

詳細參考:

https://help.aliyun.com/document_detail/110837.html?spm=a2c4g.11186623.6.624.7ab63a98bPsoRU

五 資料“沒有被抛棄”

基于Watermark的解釋,其實資料不應該丢棄,而offset參數設定也是為了能夠進一步保證資料不會被丢棄;

既然上述的所有測試都出現的資料丢棄的情況發生;那麼我們就要回到最初的源頭來考慮滾動視窗到底是要實作什麼樣的功能呢?

其實從業務角度來考慮的話并不難以了解,就拿官方提供的示例來講,使用者點選頁面,即使并發再高,速度再快,時間總是有先後順序的,入庫也是肯定是有先後的,是以就可以模拟這樣一個順序寫入的一個場景然後觀察資料是否會被丢棄;

這時候有人可能會疑問還有時區的問題存在,那麼就可以考慮參考官方提供時區内容部分;

https://help.aliyun.com/document_detail/96910.html?spm=a2c4g.11186623.6.622.2ffc56cfVYMyD7

當然至于像淘寶,天貓這樣遍布全球的大電商,對于這樣場景的統計,就不得而知了,我想可能内部會基于時區做轉換吧,或者先考慮本地計算彙總然後再做總的彙總?

瞎猜的,佩服阿裡雲的這些大佬們,這裡省略10000個欽佩感慨的字;

那麼回過頭來,我們就以中原標準時間為準,進行模拟順序寫入的方式來實作滾動視窗,看是否會出現資料丢失的情況發生;

同樣,我們通過logstash一條條的順序寫入到datahub當中;

Jark,http://taobao.com/xxx,1507600800000000   2017-10-10 10:00:00.0  
Jark,http://taobao.com/xxx,1507600810000000   2017-10-10 10:00:10.0
Jark,http://taobao.com/xxx,1507600849000000   2017-10-10 10:00:49.0
Jark,http://taobao.com/xxx,1507600865000000   2017-10-10 10:01:05.0
Jark,http://taobao.com/xxx,1507600918000000   2017-10-10 10:01:58.0
Timo,http://taobao.com/xxx,1507600930000000   2017-10-10 10:02:10.0
Timor,http://taobao.com/xxx,1507600982000000  2017-10-10 10:03:02.0
Timor,http://taobao.com/xxx,1507601015000000  2017-10-10 10:03:35.0
Timor,http://taobao.com/xxx,1507601045000000  2017-10-10 10:04:05.0
Timor,http://taobao.com/xxx,1507601064000000  2017-10-10 10:04:24.0
Tim,http://taobao.com/xxxx,1507601068000000   2017-10-10 10:04:28.0
Tim,http://taobao.com/xxxx,1507601069000000   2017-10-10 10:04:29.0
Tim,http://taobao.com/xxxx,1507601070000000   2017-10-10 10:04:30.0
Tim,http://taobao.com/xxxx,1507601071000000   2017-10-10 10:04:31.0
Tim,http://taobao.com/xxxx,1507601072000000   2017-10-10 10:04:32.0
Tim,http://taobao.com/xxxx,1507601078000000   2017-10-10 10:04:38.0
Tim,http://taobao.com/xxxx,1507601082000000   2017-10-10 10:04:42.0
Tim,http://taobao.com/xxxx,1507601083000000   2017-10-10 10:04:43.0
Tim,http://taobao.com/xxxx,1507601101000000   2017-10-10 10:05:01.0
Tim,http://taobao.com/xxxx,1507601110000000   2017-10-10 10:05:10.0           

輸入的csv檔案内容如下:

基于阿裡雲官方Flink滾動視窗測試示例完善篇

資料通過logstash插入datahub日志列印如下:

這裡顯示的就是一條條的進行了插入

基于阿裡雲官方Flink滾動視窗測試示例完善篇

然後我們再次從flink當中檢視資料統計的輸出情況,資料沒有丢失,并且最新的資料的輸出,需要下遊資料再進來以後進行計算統計;

基于阿裡雲官方Flink滾動視窗測試示例完善篇

至此,我們這裡也就應該解釋清楚了吧;

資料之是以丢,是因為“若t(Watermark)已經生效,則後續Event Time小于t的記錄将全部丢棄。”這句話,原因從測試的整個現象來推測,是并行寫入導緻的(解釋上述4.1章節疑問,并行寫入加上我們的資料是一下子輸入到datahub中的),可能我們認為應該是順序寫入的,是以出現了資料的丢失;而如果按照滾動視窗來說,就拿文檔中使用者點選頁面來舉例,點選的情況是随着時間一下一下點選出來的,是以基本是順序寫入,然後根據相應的時間間隔粒度來統計,資料就不丢失了。

對于offset則是要控制在并行寫入(無序)的情況下,對于資料延遲問題的解決,結論就是在offset設定過小時,出現資料丢失的機率較大;offset設定過大,則又會出現資料處理不及時的情況,有興趣的同學可以通過基于連續時間以及offset設定來驗證,篇幅有限,各位有興趣可搞一波事情;