天天看點

Flink-CEP之模式流與運算符模式流運算符

之前我們分析了CEP的API,接下來我們将分析CEP API的内部實作包括模式流與運算符。

模式流(PatternStream)是CEP模式比對的流抽象,一個PatternStream對象表示模式檢測到的序列所對應的流。該序列以映射來表示,以模式名關聯一組事件對象。

為了使用PatternStream,我們首先要建構它,為此Flink提供了一個名為CEP的幫助類,它定義了一個pattern靜态方法:

該方法接收初始事件流DataStream對象以及用于比對的Pattern對象,在pattern方法内部通過将這兩個參數傳遞給PatternStream的構造方法來建構該對象的。

從之前的案例代碼中我們看到,通常會在PatternStream上調用select或flatSelect來擷取某個模式下比對到的事件來實作我們的業務邏輯。而在select/flatSelect方法内部,其實仍然是借助于正常DataStream實作的,我們以其中select方法(存在多個重載)一個作為示例:

方法的第一行,借助于CEPOperatorUtils這一幫助類建構DataStream

接着,會判斷初始輸入流是否是基于鍵分組的(KeyedStream),這是為了采用不同的運算符對初始輸入流進行轉換,如果是KeyedStream,則将其初始輸入流進行強制轉換為KeyedStream并采用KeyedCEPPatternOperator:

如果是普通未分組的資料流,則采用CEPPatternOperator:

從上面我們看到無論是哪種運算符都要求傳遞NFA工廠,說明NFA是在運算符内部工作的。另外需要注意的是,如果是普通資料流,其并行度被設定為1,也就是整個資料流沒辦法分區以并行執行,而是作為一個全局資料流參與模式比對。這一點其實不難想象,因為我們在分析模式時,其有事件選擇政策(嚴格緊鄰還是非嚴格緊鄰),也就是說事件前後順序是模式的一部分,那麼這時候如果普通事件流再分區執行,将會打破這種順序,進而導緻比對失效。

通過對PatternStream的解析可知,它其實不同于DataStream

API裡的各種資料流對象,它并不是�DataStream的特例,也不是由轉換函數得來,它隻是對DataStream的二次封裝。

上面我們提及了兩種運算符,但其實并不止這麼多,具體它們的實作以及差别,我們接下來會進行詳細分析。

CEP的運算符實作有兩個考慮因素:是否針對基于鍵分區的資料流以及是否支援對逾時的比對序列進行處理。是以針對這兩個因素的組合将會産生四種運算符的實作,所有運算符相關的類圖如下所示:

AbstractCEPBasePatternOperator為所有的運算符提供基礎模闆,它自身繼承自流運算符(AbstractStreamOperator)并擴充了單輸入流運算符接口(OneInputStreamOperator)。AbstractCEPBasePatternOperator定義了兩對抽象方法,分别是:

(get/update)NFA:(獲得/更新)NFA的執行個體;

(get/update)PriorityQueue:(獲得/更新)優先級隊列;

其實,這兩對方法主要是為了實作基于鍵分組的運算符提供的,因為它們會利用使用者狀态API來擷取并更新NFA執行個體以及優先級隊列(優先級隊列用于緩存事件時間語義時的事件以等待水位線),這一點我們會在下文剖析。借助于這兩對抽象方法,提供了對processElement(定義在OneInputStreamOperator接口中)的實作:

從代碼實作來看,真正執行模式比對的是processEvent方法。AbstractCEPBasePatternOperator有兩個抽象派生類,分别是:

AbstractCEPPatternOperator:普通的CEP模式運算符;

AbstractKeyedCEPPatternOperator:基于鍵分區的CEP模式運算符

由于AbstractCEPPatternOperator相對較為簡單,是以我們先分析它的實作。AbstractCEPPatternOperator在運作時是單執行個體的,因為它的并行度為一,是以它不需要用到使用者狀态API,同時也就不需要實作抽象方法updateNFA以及updatePriorityQueue。它實作了processWatermark方法:

由于AbstractCEPPatternOperator最終繼承自AbstractStreamOperator,是以它還需要實作運算符狀态的快照/恢複方法對。我們可以直接利用運算符狀态快照來儲存相關狀态,這裡主要的狀态就是NFA對象以及優先級隊列。

接下來,我們來分析AbstractKeyedCEPPatternOperator的實作,不同于AbstractCEPPatternOperator所處理的全局事件流。AbstractKeyedCEPPatternOperator所面對的是基于鍵分區的事件流,是以除了NFA對象以及優先級隊列,還有所有使用者的鍵集合需要存儲。且因為是多分區并行執行,那麼NFA對象和優先級隊列也将會在多個分區内并行存在。這時,将不得不使用使用者狀态API,以在内部将這些狀态是跟鍵關聯(内部是KVState):

是以get/updateNFA方法對是為了配合ValueState的value/update方法對。但鍵集合仍然可以使用運算符狀态來儲存。

AbstractKeyedCEPPatternOperator跟AbstractCEPPatternOperator還有一個差別比較大的地方在于對processWatermark方法的實作,在processWatermark内部它會疊代所有的鍵,并使得它們内部符合計算條件(參照水位線)的元素都被計算。

參照我們上面給出的運算符繼承關系圖,到目前為止,我們已經解析了上面兩層運算符。其中,第一層為processElement提供模闆實作,第二層為processWatermark(跟事件時間有關)提供模闆實作以及對運算符邏輯相關的狀态進行維護。而最後一層則才是真正處理事件的模式比對的processEvent方法的實作,該方法由AbstractCEPBasePatternOperator定義。

運算符對processEvent方法的實作,其邏輯基本上都是類似的:調用NFA對象的process方法,逐個處理事件,該方法我們在分析NFA時做過重點剖析。下面我們選擇四個運算符裡最為複雜的基于鍵分區且支援逾時的運算符(TimeoutKeyedCEPPatternOperator)進行分析:

其他運算符對processEvent的實作大同小異,由于篇幅有限,不再贅述。

原文釋出時間為:2017-03-16

本文作者:vinoYang