天天看點

Siddhi CEP Window機制

<a href="https://docs.wso2.com/display/cep400/siddhiql+guide+3.0#siddhiqlguide3.0-window">https://docs.wso2.com/display/cep400/siddhiql+guide+3.0#siddhiqlguide3.0-window</a>

<a href="https://docs.wso2.com/display/cep400/inbuilt+windows#inbuiltwindows">https://docs.wso2.com/display/cep400/inbuilt+windows#inbuiltwindows</a>

<a href="http://wso2.com/library/articles/2013/06/understanding-siddhi-powers-wso2-cep-2x/">http://wso2.com/library/articles/2013/06/understanding-siddhi-powers-wso2-cep-2x/</a>

<a href="https://docs.wso2.com/display/cep400/samples+on+processing+events">https://docs.wso2.com/display/cep400/samples+on+processing+events</a>

windows機制有點晦澀,而且例子給的也不充分,這裡詳細看看。

基本文法:

<code>from</code> <code>&lt;input stream </code><code>name</code><code>&gt;[&lt;filter condition&gt;]#window.&lt;window </code><code>name</code><code>&gt;(&lt;parameter&gt;, &lt;parameter&gt;, ... )</code>

<code>select</code> <code>&lt;attribute </code><code>name</code><code>&gt;, &lt;attribute </code><code>name</code><code>&gt;, ...</code>

<code>insert [current events | expired events | all events] </code><code>into</code> <code>&lt;</code><code>output</code> <code>stream </code><code>name</code><code>&gt;</code>

<code></code>

window.length

直接看個例子,這裡用expired event,但使用的時候往往不用expired

Siddhi CEP Window機制
Siddhi CEP Window機制

簡單解釋下,

define,定義stream,stream中每個event的結構

@info,可選,定義query的名字

query的含義,對于cseeventstream,當price&lt;700時,生成length為4的視窗 

那麼當windows的length超過4的時候,就會産生expired event,此時就會觸發insert操作

insert的内容取決于select

下面我輸入如下的流資料,

Siddhi CEP Window機制
Siddhi CEP Window機制

得到的結果部分如下,

Siddhi CEP Window機制
Siddhi CEP Window機制

解釋下,可以說明幾個問題,

1. window length = 6, 是以當發出第7個event的時,會觸發expired

2. 此時,outputstream就會收到這條expired的event

這個地方很難以了解,得到的event隻是expired的,無法得到window中的所有event,但用aggre func卻可以對window你們的events做統計

這裡我們做了3個統計,平均值,sum, count,這樣你可以看出avg是怎麼算出來的? 

比如,對于event{timestamp=1447906176329, data=[wso2, 0.0, 15.0, 30.0, 2], isexpired=false} 

由于我們加了groupby,是以隻會針對symbol=wso2的做統計, 

當我們發送"wso2", 30.0 時,會觸發"wso2", 0.0的過期,你會發現這時候去統計,這兩條event都會被排除在外,參加統計的如下

"ibm", 0.0 "wso2", 10.0 "ibm", 10.0 "wso2", 20.0 "ibm", 20.0

是以,count為2, sum為30,而avg=15

如果不加groupby的結果如下,

Siddhi CEP Window機制
Siddhi CEP Window機制

這樣就不會管symbol是什麼,會把window裡面的全相加

這裡expired event是可選的,還有current event和all event, 

expired event是當event expired時觸發,那麼current event就是當event達到時觸發,all event就是兩種情況都觸發,

下面我們看看如果換成all event,會是什麼結果,我測的結果是和current event一樣的,隻會在event到達的時候觸發,bug?

Siddhi CEP Window機制
Siddhi CEP Window機制

window.time

這個和length是一樣的,隻是觸發條件是time

Siddhi CEP Window機制
Siddhi CEP Window機制

得到結果如下,

Siddhi CEP Window機制
Siddhi CEP Window機制

可以看到,這裡expire是根據時間的,是以expire不一定是在event來的時候判斷,而是根據scheduled timer,如下圖,

Siddhi CEP Window機制

是以在算統計的時候,取決于當時間timer被觸發時,window裡面有幾個event,是以上面的結果有可能是1,也有可能是2

window.lengthbatch;timebatch

這種window就是非sliding的,直接看例子,

仍然是上面的輸入,得到結果,

Siddhi CEP Window機制
Siddhi CEP Window機制

可以看到,lengthbatch設為4,當window的length達到8的時候,才觸發expired

每次以一個batch進行expire,是以每次收到4條events,并且不重複的,是以window是沒有sliding的

再看過timebatch的例子,這次用 all event

結果如下,我們每發一組會sleep 1s,是以發6組後觸發第一次expired,expire 6條events 

并且可以看到,這次除了expire,在event reach的時候也會觸發output,因為這次我們用的是all event

Siddhi CEP Window機制
Siddhi CEP Window機制

但對于這樣的場景,我們一般的需求是,對于batch做些統計, 例子,

Siddhi CEP Window機制
Siddhi CEP Window機制

得到的結果,

Siddhi CEP Window機制
Siddhi CEP Window機制

可以看到,對于batch中的資料可以groupby,并進行avg統計, 

注意這裡,不要用expired events,否則aggre結果一直為0,因為對于batch,每次expire完後,window裡面是空的。

window.externaltime

<a href="https://docs.wso2.com/display/cep400/sample+0114+-+using+external+time+windows">https://docs.wso2.com/display/cep400/sample+0114+-+using+external+time+windows</a>

這個挺有用,可以以外部的時間進行slide window,因為大部分時間可能是根據采集時間,而非到達時間做聚合

但局限在于,externaltime必須遞增的,有時候在實際場景中,無法保證嚴格的時序。

看例子,

Siddhi CEP Window機制
Siddhi CEP Window機制

發送的代碼如下,

Siddhi CEP Window機制
Siddhi CEP Window機制

目的,就是按外部時間time,進行sliding window,結果如下,

Siddhi CEP Window機制
Siddhi CEP Window機制

可以看到根據傳入的time,當收到"wso2", 30.0, 1447921190000 時觸發3秒的過期 

其他的和普通的sliding window沒有差別

window.cron

<a href="https://docs.wso2.com/display/cep400/sample+0115+-+quartz+scheduler+based+alerts">https://docs.wso2.com/display/cep400/sample+0115+-+quartz+scheduler+based+alerts</a>

定時任務,其實用timebatch也可以實作,隻是cron更友善些

例子,

Siddhi CEP Window機制
Siddhi CEP Window機制

siddhi的文法多了秒,是以第一個是秒,*/4,即每4秒觸發一次

得到結果如下,可以看到确實是每4秒觸發一次

Siddhi CEP Window機制
Siddhi CEP Window機制

window.unique, window.firstunique

功能如其意,直接看例子,

得到結果,從結果看起來,就和普通的流流過一樣, 

因為每次這個symbol有更新都會觸發一次event,

Siddhi CEP Window機制
Siddhi CEP Window機制

再看看first unique,

得到的結果,可以看到隻有symbol第一次出現時,會觸發

Siddhi CEP Window機制
Siddhi CEP Window機制

這個往往和join會同時使用,如

output rate limiting

隻是以在這裡介紹這個,是因為覺得和unique一起用,很合适

基本文法,<code>output</code> <code>({&lt;</code><code>output</code><code>-type&gt;} every (&lt;</code><code>time</code> <code>interval&gt;|&lt;event interval&gt; events) | snapshot every &lt;</code><code>time</code> <code>interval&gt;)</code>

其中"&lt;output-type&gt;","first", "last" and "all",預設是all

比如普通的window,如果每條都觸發,太頻繁了,我隻想固定條數或時間觸發一次就可以 

這個對于unique尤為合适,因為使用unique,一般是隻想知道最新的情況,是以每一條都觸發是沒有意義的,定期觸發就可以

還是用前面的例子,

Siddhi CEP Window機制
Siddhi CEP Window機制

得到的結果,雖然加上group by symbol,是以每次都會分别輸出wso2,ibm兩條 

但是對于event數的判斷還是合一塊的,并不是5條wso2或5條ibm觸發

Siddhi CEP Window機制
Siddhi CEP Window機制

用時間也是一樣的,

Siddhi CEP Window機制
Siddhi CEP Window機制

結果如下,

Siddhi CEP Window機制
Siddhi CEP Window機制

snapshot功能,emit all current events arrived so far,這個一般不會直接這麼用,想不出啥場景

Siddhi CEP Window機制
Siddhi CEP Window機制
Siddhi CEP Window機制
Siddhi CEP Window機制

window.sort

在window中排序,

&lt;event&gt; sort(&lt;int&gt; windowlength, &lt;string&gt; attribute, &lt;string&gt; order, .. , &lt;string&gt; attributen, &lt;string&gt; ordern)

order,"asc" or "desc",預設為asc

Siddhi CEP Window機制
Siddhi CEP Window機制

length為3,對price升序;這裡的意思是,當window length &gt;3時,即4,會輸出按price升序排序,最大的那個event

Siddhi CEP Window機制
Siddhi CEP Window機制

可以看到,大于3的時候,current event和expired event收到的都是一樣的,因為是asc排序,是以大于前3個的都會被過期

window.frequent;window.lossyfrequent

說實在的,如果對這個算法不了解,相當的晦澀,

frequent的意思,就是你接收current events,如果目前stream的event,是屬于top frequent的,就會輸出,否則就會丢掉 

說白了,從current events,你可以一直重複的收到屬于top frequent的event,其他的則會丢掉

輸入如下,

Siddhi CEP Window機制
Siddhi CEP Window機制

得到結果,來分析一下,

Siddhi CEP Window機制
Siddhi CEP Window機制

前面一直都沒有問題,一直輸入attributes,to, 

直到輸入events.,因為attributes,to已經占滿2個位置,是以要觸發過期,window裡面的所有event的frequency減1,過期frequency=0的event 

可是這裡attributes,to的frequent都是大于0的,是以window裡面沒有可以expire的event, 

那麼隻能把目前的events.給丢掉了,是以在current events中并沒有收到這個event,‘events.’ 

因為我們隻能收到top frequent的events

到收到if,再次觸發expire,window裡面的所有event的frequency再次減1, 

此時,attributes的frequency已經為0,是以attribute被過期,而event,‘if’,被放入window中, 

是以此時,我們會在current events中看到‘if’,而在expired events中看到‘attributes’

沒測,應該是判斷過期的算法不一樣,其他差不多

本文章摘自部落格園,原文釋出日期:2015-11-24