the following program is a complete, working example of a streaming window word count application that counts the words coming from a web socket in 5-second windows.
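a minimal sketch of such a program (the class name WindowWordCount and the Splitter helper are illustrative names, not necessarily those of the original example):

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class WindowWordCount {

    public static void main(String[] args) throws Exception {
        // 1. obtain a StreamExecutionEnvironment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 2. connect to the source: read a text stream from a socket
        DataStream<String> text = env.socketTextStream("localhost", 9999);

        // 3. transformations: split into words, key by word, 5-second window, sum the counts
        DataStream<Tuple2<String, Integer>> counts = text
                .flatMap(new Splitter())          // 1 -> n: one line becomes many (word, 1) tuples
                .keyBy(0)                         // key the stream by the word
                .timeWindow(Time.seconds(5))      // 5-second tumbling window
                .sum(1);                          // sum the counts within each window

        // 4. specify the output
        counts.print();

        // 5. execute the program
        env.execute("window wordcount");
    }

    public static class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String line, Collector<Tuple2<String, Integer>> out) {
            for (String word : line.split(" ")) {
                out.collect(new Tuple2<>(word, 1));
            }
        }
    }
}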


the code structure of a flink application is as follows:
flink datastream programs look like regular java programs with a <code>main()</code> method. each program consists of the same basic parts:
obtaining a <code>streamexecutionenvironment</code>,
connecting to data stream sources,
specifying transformations on the data streams,
specifying output for the processed data,
executing the program.
using this example as an illustration:
first, a sockettextstream is created to read the text stream from the socket.
next comes a flatmap; the difference from map is that map is 1->1 while flatmap is 1->n, and the splitter here splits the text on spaces and emits each word as a tuple.
then, keyby produces a keyed tuple stream, keyed by the word here.
a 5-second timewindow is applied, and the counts within each window are summed.
finally, the output is print.
the most common transformations are not listed here.
==============================================================================================
reduce
keyedstream → datastream
a "rolling" reduce on a keyed data stream. combines the current element with the last reduced value and emits the new value.


fold
a "rolling" fold on a keyed data stream with an initial value. combines the current element with the last folded value and emits the new value.
a fold function that, when applied on the sequence (1,2,3,4,5), emits the sequence "start-1", "start-1-2", "start-1-2-3", ...


the difference between fold and reduce: fold takes an initial value, and the foldfunction can fold one type into another type,
whereas the reduce function works with a single type only.
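a sketch making that concrete (keyedIntegers is an assumed KeyedStream of Integer; the fold output is a String, which reduce could not produce):

// fold with the initial value "start": Integer elements are folded into a String,
// so the output type differs from the input type (not possible with reduce)
DataStream<String> folded = keyedIntegers
        .fold("start", new FoldFunction<Integer, String>() {
            @Override
            public String fold(String accumulator, Integer value) {
                // applied to (1,2,3,4,5) this emits "start-1", "start-1-2", ...
                return accumulator + "-" + value;
            }
        });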
aggregations
rolling aggregations on a keyed data stream.
the difference between min and minby is that min returns the minimum value, whereas minby returns the element that has the minimum value in this field (same for max and maxby).


these can be seen as special cases of reduce:
the variants without "by" only return the value,
while the "by" variants return the whole element.
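fragments illustrating the difference (keyedStream is an assumed KeyedStream of Tuple2<String, Integer>):

// min returns the minimum value of field 1 (the other fields are not guaranteed to
// belong to the same element); minBy returns the element holding the minimum value
DataStream<Tuple2<String, Integer>> minValue   = keyedStream.min(1);
DataStream<Tuple2<String, Integer>> minElement = keyedStream.minBy(1);
// sum, max and maxBy work analogously
DataStream<Tuple2<String, Integer>> maxElement = keyedStream.maxBy(1);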
=============================================================================================
union
datastream* → datastream
union of two or more data streams creating a new stream containing all the elements from all the streams. note: if you union a data stream with itself you will get each element twice in the resulting stream.
connect
datastream,datastream → connectedstreams
"connects" two data streams retaining their types. connect allowing for shared state between the two streams.
connect就是兩個不同type的流可以共享一個流,tuple可以同時拿到來自兩個流的資料
comap, coflatmap
connectedstreams → datastream
similar to map and flatmap on a connected data stream
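a sketch of connect plus comap (intStream and stringStream are assumed existing streams of different types; the ConnectedStreams class name is the one used by recent API versions):

// connect two streams of different types; a CoMapFunction then sees data from both
ConnectedStreams<Integer, String> connected = intStream.connect(stringStream);

DataStream<String> merged = connected.map(new CoMapFunction<Integer, String, String>() {
    @Override
    public String map1(Integer value) {       // called for elements of the first stream
        return "int: " + value;
    }

    @Override
    public String map2(String value) {        // called for elements of the second stream
        return "str: " + value;
    }
});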


split
datastream → splitstream
split the stream into two or more streams according to some criterion.


select
splitstream → datastream
select one or more streams from a split stream.
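a sketch of split and select together (intStream is an assumed DataStream of Integer):

// route each element to one (or more) named sub-streams
SplitStream<Integer> split = intStream.split(new OutputSelector<Integer>() {
    @Override
    public Iterable<String> select(Integer value) {
        List<String> names = new ArrayList<>();
        names.add(value % 2 == 0 ? "even" : "odd");
        return names;
    }
});

DataStream<Integer> even = split.select("even");
DataStream<Integer> all  = split.select("even", "odd");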
====================================================================================
project
datastream → datastream
selects a subset of fields from the tuples
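a fragment, assuming an existing tuple stream named "in":

// assuming a DataStream<Tuple3<Integer, Double, String>> named "in":
// keep only fields 2 and 0 (in that order) of every tuple
DataStream<Tuple2<String, Integer>> out = in.project(2, 0);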
===========================================================================================
window
keyedstream → windowedstream
a window over a keyedstream.
windowall
datastream → allwindowedstream
warning: this is in many cases a non-parallel transformation. all records will be gathered in one task for the windowall operator.
note that, since there is no key, a transformation over all elements cannot be parallelized; it has to run in a single task.
window apply
windowedstream → datastream
allwindowedstream → datastream
applies a general function to the window as a whole. below is a function that manually sums the elements of a window.
note: if you are using a windowall transformation, you need to use an allwindowfunction instead.
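a sketch of the manual window sum described above (windowedStream is an assumed WindowedStream of Tuple2<String, Integer>, keyed by a key selector so the key type is String; the exact generic parameter order of windowfunction varies slightly across flink versions):

// manually sum the Integer field of all elements in each window
DataStream<Integer> windowSums = windowedStream
        .apply(new WindowFunction<Tuple2<String, Integer>, Integer, String, TimeWindow>() {
            @Override
            public void apply(String key,
                              TimeWindow window,
                              Iterable<Tuple2<String, Integer>> input,
                              Collector<Integer> out) {
                int sum = 0;
                for (Tuple2<String, Integer> element : input) {
                    sum += element.f1;
                }
                out.collect(sum);
            }
        });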
window reduce
windowedstream → datastream
applies a functional reduce function to the window and returns the reduced value.
aggregations on windows
aggregates the contents of a window. the difference between min and minby is that min returns the minimum value, whereas minby returns the element that has the minimum value in this field (same for max and maxby).
window join
datastream,datastream → datastream
join two data streams on a given key and a common window.
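a sketch of a window join (firstStream and secondStream are assumed streams of Tuple2<String, Integer> and Tuple2<String, Double>; the window assigner class name, here TumblingEventTimeWindows, depends on the flink version):

// join two streams on the String key (field 0) within a common 5-second window
DataStream<String> joined = firstStream.join(secondStream)
        .where(new KeySelector<Tuple2<String, Integer>, String>() {
            @Override
            public String getKey(Tuple2<String, Integer> t) { return t.f0; }
        })
        .equalTo(new KeySelector<Tuple2<String, Double>, String>() {
            @Override
            public String getKey(Tuple2<String, Double> t) { return t.f0; }
        })
        .window(TumblingEventTimeWindows.of(Time.seconds(5)))
        .apply(new JoinFunction<Tuple2<String, Integer>, Tuple2<String, Double>, String>() {
            @Override
            public String join(Tuple2<String, Integer> a, Tuple2<String, Double> b) {
                return a.f0 + ": " + a.f1 + " / " + b.f1;
            }
        });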
physical partitioning, similar to storm's stream groupings, can be configured by the user.
hash partitioning, equivalent to grouping by a field
identical to keyby but returns a datastream instead of a keyedstream.
custom partitioning
uses a user-defined partitioner to select the target task for each element.
random partitioning, i.e. shuffle
partitions elements randomly according to a uniform distribution.
rebalancing (round-robin partitioning)
partitions elements round-robin, creating equal load per partition. useful for performance optimization in the presence of data skew.
this ensures the data is not skewed; round-robin means elements are handed out one at a time, taking turns across partitions.
broadcasting, i.e. sending every element to all partitions
broadcasts elements to every partition.
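fragments illustrating some of these partitioning choices (stream is an assumed DataStream of Tuple2<String, Integer>):

// custom partitioning: a user-defined partitioner picks the target task per element,
// here based on the String key in tuple field 0
DataStream<Tuple2<String, Integer>> custom = stream.partitionCustom(new Partitioner<String>() {
    @Override
    public int partition(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }
}, 0);

DataStream<Tuple2<String, Integer>> random     = stream.shuffle();    // uniform random
DataStream<Tuple2<String, Integer>> balanced   = stream.rebalance();  // round-robin
DataStream<Tuple2<String, Integer>> everywhere = stream.broadcast();  // to every partition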
chaining two subsequent transformations means co-locating them within the same thread for better performance.
flink by default chains operators if this is possible (e.g., two subsequent map transformations).
the api gives fine-grained control over chaining if desired:
start new chain
begin a new chain, starting with this operator. the two mappers will be chained, and filter will not be chained to the first mapper.
note that startnewchain applies to the operator on its left, so in the example above the new chain starts from the first map (a sketch follows after this list).
disable chaining
do not chain the map operator
start a new resource group
start a new resource group containing the map and the subsequent operators.
isolate resources
isolate the operator in its own slot.
i.e. use a dedicated slot for the operator.
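a sketch of the chaining handles (MyFilter, FirstMapper and SecondMapper are hypothetical user functions; someStream is an existing DataStream):

someStream
        .filter(new MyFilter())
        .map(new FirstMapper()).startNewChain()    // applies to the operator on its left:
                                                   // the chain starts at this map
        .map(new SecondMapper())                   // chained to the first mapper
        .print();

// prevent a single operator from being chained at all
someStream.map(new FirstMapper()).disableChaining();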
only the following two parameters differ from the batch configuration:
parameters in the <code>executionconfig</code> that pertain specifically to the datastream api are:
<code>enabletimestamps()</code> / <code>disabletimestamps()</code>: attach a timestamp to each event emitted from a source. <code>aretimestampsenabled()</code> returns the current value.
<code>setautowatermarkinterval(long milliseconds)</code>: set the interval for automatic watermark emission. you can get the current value with <code>long getautowatermarkinterval()</code>.
a localenvironment is created and used as follows:
collection data sources can be used as follows:
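a sketch of both (the concrete values are illustrative):

// a local environment runs the job in the same JVM, which is handy for development
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();

// collection-based data sources
DataStream<Integer> numbers = env.fromElements(1, 2, 3, 4, 5);

List<String> data = Arrays.asList("a", "b", "c");
DataStream<String> letters = env.fromCollection(data);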


flink also provides a sink to collect datastream results for testing and debugging purposes. it can be used as follows:
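a sketch, assuming the DataStreamUtils helper shipped with the streaming contrib utilities is on the classpath:

// pull the results of a stream back into the client program for testing/debugging
Iterator<Integer> results = DataStreamUtils.collect(numbers);
while (results.hasNext()) {
    System.out.println(results.next());
}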
working with time
there are 3 notions of time:
processing time, the actual wall-clock time at which the data is processed
event time, the time at which the event actually occurred
ingestion time, the time at which the data enters flink, assigned at the data source
by default, processing time is used.
if you want to use event time, you need to follow these steps:
set <code>env.setstreamtimecharacteristic(timecharacteristic.eventtime)</code>
use <code>datastream.assigntimestamps(...)</code> in order to tell flink how timestamps relate to events (e.g., which record field is the timestamp)
set <code>enabletimestamps()</code>, as well as the interval for watermark emission (<code>setautowatermarkinterval(long milliseconds)</code>) in <code>executionconfig</code>.
for example, assume that we have a data stream of tuples, in which the first field is the timestamp (assigned by the system that generates these data streams), and we know that the lag between the current processing time and the timestamp of an event is never more than 1 second:
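a sketch of these steps, assuming a stream of Tuple2<Long, String> whose first field is the timestamp, and assuming the TimestampExtractor-based assignTimestamps interface of this flink version (later releases use a different interface):

// 1. switch the environment to event time
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

// 2. enable timestamps and set the watermark interval on the ExecutionConfig
env.getConfig().enableTimestamps();
env.getConfig().setAutoWatermarkInterval(1000);

// 3. tell flink where the timestamp lives; since the lag is never more than 1 second,
//    a watermark of (timestamp - 1000 ms) is safe
stream.assignTimestamps(new TimestampExtractor<Tuple2<Long, String>>() {
    @Override
    public long extractTimestamp(Tuple2<Long, String> element, long currentTimestamp) {
        return element.f0;            // the first field carries the event timestamp
    }

    @Override
    public long extractWatermark(Tuple2<Long, String> element, long currentTimestamp) {
        return element.f0 - 1000;     // events arrive at most 1 second late
    }

    @Override
    public long getCurrentWatermark() {
        return Long.MIN_VALUE;        // no watermark based on the wall clock
    }
});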


basic window constructs
tumbling time window (non-sliding)
defines a window of 5 seconds, that "tumbles".
sliding time window
defines a window of 5 seconds, that "slides" by 1 second.
tumbling count window
sliding count window
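fragments showing the four constructs on a keyedStream (Time.seconds(...) is the helper in recent versions; older releases use Time.of(5, TimeUnit.SECONDS)):

// tumbling time window of 5 seconds
keyedStream.timeWindow(Time.seconds(5));

// sliding time window of 5 seconds, sliding by 1 second
keyedStream.timeWindow(Time.seconds(5), Time.seconds(1));

// tumbling count window of 1000 elements
keyedStream.countWindow(1000);

// sliding count window of 1000 elements, sliding every 100 elements
keyedStream.countWindow(1000, 100);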
advanced window constructs
the general recipe for building a custom window is to specify (1) a <code>windowassigner</code>, (2) a <code>trigger</code> (optionally), and (3) an<code>evictor</code> (optionally).
constructs like timewindow above are pre-packaged; with the advanced construction approach, three steps are needed:
1. first, the <code>windowassigner</code>, which mainly comes in sliding and non-sliding (tumbling) variants and answers the "where" question, i.e. which window(s) an element is assigned to.
global window
all incoming elements of a given key are assigned to the same window. the window does not contain a default trigger, hence it will never be triggered if a trigger is not explicitly specified.
used for count windows.
tumbling time windows
the default trigger works as follows.
first, understand what a watermark means: when a watermark is received, no data with an event time smaller than that watermark can arrive any more.
so once a watermark larger than the window's end time is received, all the data for that window has arrived and the trigger can fire.
sliding time windows
the default trigger is the same as above.
2. second, define the <code>trigger</code>, i.e. when the window fires, which answers the "when" question.
the <code>trigger</code> specifies when the function that comes after the window clause (e.g., <code>sum</code>, <code>count</code>) is evaluated (“fires”) for each window.
if a trigger is not specified, a default trigger for each window type is used (that is part of the definition of the <code>windowassigner</code>).
processing time trigger
a window is fired when the current processing time exceeds its end-value. the elements on the triggered window are henceforth discarded.
watermark trigger
a window is fired when a watermark with value that exceeds the window's end-value has been received. the elements on the triggered window are henceforth discarded.
continuous processing time trigger
a window is periodically considered for being fired (every 5 seconds in the example). the window is actually fired only when the current processing time exceeds its end-value. the elements on the triggered window are retained.
continuous watermark time trigger
a window is periodically considered for being fired (every 5 seconds in the example). a window is actually fired when a watermark with value that exceeds the window's end-value has been received. the elements on the triggered window are retained.
the difference from the triggers above is that the window is not discarded after firing; it is retained and fired repeatedly at the given interval.
count trigger
a window is fired when it has more than a certain number of elements (1000 below). the elements of the triggered window are retained.
fires based on the count; the window is retained.
purging trigger
takes any trigger as an argument and forces the triggered window elements to be "purged" (discarded) after triggering.
some of the triggers above retain the data; if you want it discarded instead, use a purgingtrigger.
delta trigger
a window is periodically considered for being fired (every 5000 milliseconds in the example). a window is actually fired when the value of the last added element exceeds the value of the first element inserted in the window, according to a <code>deltafunction</code>.
the delta trigger compares, via getdelta, the delta between the newly arrived value and the previous value, and fires when the delta exceeds the defined threshold.
3. finally, specify the <code>evictor</code>.
after the trigger fires, and before the function (e.g., <code>sum</code>, <code>count</code>) is applied to the window contents, an optional <code>evictor</code> removes some elements from the beginning of the window before the remaining elements are passed on to the function.
put plainly, when a window fires we can choose to process only part of its data;
the evictor removes part of the data and keeps what you want.
time evictor
evict all elements from the beginning of the window, so that elements from end-value - 1 second until end-value are retained (the resulting window size is 1 second).
count evictor
retain 1000 elements from the end of the window backwards, evicting all others.
the logic is about what is kept rather than what is evicted; for example, countevictor.of(1000) keeps the last 1000 elements, which can be a little counter-intuitive.
delta evictor
starting from the beginning of the window, evict elements until an element with value lower than the value of the last element is found (by a threshold and a deltafunction).
recipes for building windows
some window definition examples are given below for reference; the examples are quite simple.
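for instance, a count window expressed with the three building blocks might look like this (a sketch, assuming the GlobalWindows / CountTrigger / CountEvictor classes of the windowing API and a keyedStream of tuples):

// a count window built from the three pieces:
keyedStream
        .window(GlobalWindows.create())      // (1) assigner: every element of a key goes to one global window
        .trigger(CountTrigger.of(1000))      // (2) trigger: fire once 1000 elements have arrived
        .evictor(CountEvictor.of(1000))      // (3) evictor: keep only the last 1000 elements
        .sum(1);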
windows on unkeyed data streams
windows can also be used on unkeyed data streams;
the difference is that "all" is appended to the window operator's name.
tumbling time window all
datastream → windowedstream
defines a window of 5 seconds, that "tumbles". this means that elements are grouped according to their timestamp in groups of 5 second duration, and every element belongs to exactly one window. the notion of time used is controlled by the streamexecutionenvironment.
sliding time window all
defines a window of 5 seconds, that "slides" by 1 second. this means that elements are grouped according to their timestamp in groups of 5 second duration, and elements can belong to more than one window (since windows overlap by at least 4 seconds). the notion of time used is controlled by the streamexecutionenvironment.
tumbling count window all
defines a window of 1000 elements, that "tumbles". this means that elements are grouped according to their arrival time (equivalent to processing time) in groups of 1000 elements, and every element belongs to exactly one window.
sliding count window all
defines a window of 1000 elements, that "slides" every 100 elements. this means that elements are grouped according to their arrival time (equivalent to processing time) in groups of 1000 elements, and every element can belong to more than one window (as windows overlap by at least 900 elements).
all transformations in flink may look like functions (in the functional processing terminology), but are in fact stateful operators.
you can make every transformation (<code>map</code>, <code>filter</code>, etc) stateful by declaring local variables or using flink's state interface.
you can register any local variable as managed state by implementing an interface.
in this case, and also in the case of using flink’s native state interface, flink will automatically take consistent snapshots of your state periodically, and restore its value in the case of a failure.
the end effect is that updates to any form of state are the same under failure-free execution and execution under failures.
first, we look at how to make local variables consistent under failures, and then we look at flink’s state interface.
by default state checkpoints will be stored in-memory at the jobmanager. for proper persistence of large state, flink supports storing the checkpoints on file systems (hdfs, s3, or any mounted posix file system), which can be configured in the <code>flink-conf.yaml</code> or via <code>streamexecutionenvironment.setstatebackend(…)</code>.
this is the core value of flink's stream processing: local state can be checkpointed conveniently, in several ways that are described in detail below;
by default these checkpoints are stored in the jobmanager's memory, but checkpointing to a file system can also be configured.
checkpointing local variables
this one is fairly easy to understand.
local variables can be checkpointed by using the <code>checkpointed</code> interface.
when the user-defined function implements the <code>checkpointed</code> interface, the <code>snapshotstate(…)</code> and <code>restorestate(…)</code> methods will be executed to draw and restore function state.
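a sketch of such a function (the class name CountingMap and the counter field are illustrative):

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.streaming.api.checkpoint.Checkpointed;

public class CountingMap extends RichMapFunction<String, Long> implements Checkpointed<Long> {

    private long counter = 0;             // the local state to be checkpointed

    @Override
    public Long map(String value) {
        counter++;
        return counter;
    }

    @Override
    public Long snapshotState(long checkpointId, long checkpointTimestamp) {
        return counter;                   // drawn into the checkpoint
    }

    @Override
    public void restoreState(Long state) {
        counter = state;                  // restored after a failure
    }
}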


as above, simply by implementing snapshotstate and restorestate, the local variable counter is checkpointed, which is easy to understand.
in addition to that, user functions can also implement the <code>checkpointnotifier</code> interface to receive notifications on completed checkpoints via the <code>notifycheckpointcomplete(long checkpointid)</code> method. note that there is no guarantee for the user function to receive a notification if a failure happens between checkpoint completion and notification. the notifications should hence be treated in a way that notifications from later checkpoints can subsume missing notifications.
in other words, <code>checkpointnotifier</code> can additionally be implemented so that <code>notifycheckpointcomplete</code> is called when a checkpoint completes, although this notification is not guaranteed to be delivered.
using the key/value state interface
this uses the state interface explicitly.
the state interface gives access to key/value states, which are a collection of key/value pairs.
because the state is partitioned by the keys (distributed across workers), it can only be used on the <code>keyedstream</code>, created via <code>stream.keyby(…)</code> (which means also that it is usable in all types of functions on keyed windows).
the handle to the state can be obtained from the function’s <code>runtimecontext</code>.
the state handle will then give access to the value mapped under the key of the current record or window - each key consequently has its own value.
the following code sample shows how to use the key/value state inside a reduce function.
when creating the state handle, one needs to supply a name for that state (a function can have multiple states of different types), the type of the state (used to create efficient serializers), and the default value (returned as a value for keys that do not yet have a value associated).
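a sketch using the descriptor-based variant of the key/value state api (the class name CountingReducer and the state name "count" are illustrative; older releases expose the same name / type / default-value triple through a slightly different call on the runtimecontext):

import org.apache.flink.api.common.functions.RichReduceFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;

public class CountingReducer extends RichReduceFunction<Tuple2<String, Long>> {

    private ValueState<Long> count;       // one value per key, managed by flink

    @Override
    public void open(Configuration config) {
        // name, type and default value of the state handle
        count = getRuntimeContext().getState(
                new ValueStateDescriptor<Long>("count", Long.class, 0L));
    }

    @Override
    public Tuple2<String, Long> reduce(Tuple2<String, Long> a, Tuple2<String, Long> b) throws Exception {
        // read and update the state of the current record's key
        count.update(count.value() + 1);
        return new Tuple2<>(a.f0, a.f1 + b.f1);
    }
}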


state updated by this is usually kept locally inside the flink process (unless one configures explicitly an external state backend). this means that lookups and updates are process local and thus very fast.
the important implication of having the keys set implicitly is that it forces programs to group the stream by key (via the <code>keyby()</code> function), making the key partitioning transparent to flink. that allows the system to efficiently restore and redistribute keys and state.
the scala api has shortcuts for stateful <code>map()</code> or <code>flatmap()</code> functions on <code>keyedstream</code>, which give the state of the current key as an option directly into the function, and return the result with a state update:


state checkpoints in iterative jobs
flink currently only provides processing guarantees for jobs without iterations. enabling checkpointing on an iterative job causes an exception. in order to force checkpointing on an iterative program the user needs to set a special flag when enabling checkpointing: <code>env.enablecheckpointing(interval, force = true)</code>.
please note that records in flight in the loop edges (and the state changes associated with them) will be lost during failure.
for iterative jobs, i.e. jobs with loops, checkpointing is more involved, and after recovery the in-flight work is lost; for example, with n iterations, if a failure happens at iteration n-1, the computation still restarts from the beginning.
for example, here is a program that continuously subtracts 1 from a series of integers until they reach zero:
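a sketch reconstructing that program (it produces the someIntegers and lessThanZero streams referred to below; env is an existing StreamExecutionEnvironment):

// generate the integers 0..1000 and iteratively subtract 1 until each reaches zero
DataStream<Long> someIntegers = env.generateSequence(0, 1000);

IterativeStream<Long> iteration = someIntegers.iterate();

DataStream<Long> minusOne = iteration.map(new MapFunction<Long, Long>() {
    @Override
    public Long map(Long value) {
        return value - 1;                     // the iterative step
    }
});

DataStream<Long> stillGreaterThanZero = minusOne.filter(new FilterFunction<Long>() {
    @Override
    public boolean filter(Long value) {
        return value > 0;                     // true: feed back into the iteration
    }
});
iteration.closeWith(stillGreaterThanZero);

DataStream<Long> lessThanZero = minusOne.filter(new FilterFunction<Long>() {
    @Override
    public boolean filter(Long value) {
        return value <= 0;                    // leaves the loop: the final output
    }
});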


looking at the example directly:
first, someintegers is a datastream of the integers from 0 to 1000.
each tuple is run through a map function iteratively; here it keeps subtracting one.
when does the iteration end?
according to iteration.closewith: closewith takes a filtered stream; if the filter returns true, the tuple keeps iterating, and if it returns false, the iteration is closed for that tuple.
finally, lessthanzero is the output datastream that someintegers produces after the iteration.
connectors provide code for interfacing with various third-party systems.
currently these systems are supported:
we only look at kafka here.
then, import the connector in your maven project:


how is fault tolerance achieved?
with flink's checkpointing enabled, the flink kafka consumer will consume records from a topic and periodically checkpoint all its kafka offsets, together with the state of other operations, in a consistent manner. in case of a job failure, flink will restore the streaming program to the state of the latest checkpoint and re-consume the records from kafka, starting from the offsets that were stored in the checkpoint.
the principle is that the offsets of all kafka partitions are checkpointed together with the other state, so that on recovery reading can resume from those offsets.
to use fault tolerant kafka consumers, checkpointing of the topology needs to be enabled at the execution environment:
if checkpointing is not enabled, the kafka consumer will periodically commit the offsets to zookeeper.
since a simple consumer is used, the offsets still need to be recorded even when checkpointing is disabled; the usual approach is taken here, recording the kafka offsets in zookeeper.
data can also be written to kafka, using <code>flinkkafkaproducer</code>.
the <code>flinkkafkaproducer</code> writes data to a kafka topic. the producer can specify a custom partitioner that assigns records to partitions.
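a sketch of reading from and writing to kafka with checkpointing enabled (the consumer and producer class names depend on the kafka connector version, e.g. FlinkKafkaConsumer082 for kafka 0.8; topic names, addresses and the group id are illustrative):

// enable checkpointing so the kafka offsets become part of the checkpoints
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(5000);            // checkpoint every 5 seconds

Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("zookeeper.connect", "localhost:2181");
props.setProperty("group.id", "test-group");

// consume a topic (consumer class name depends on the connector / kafka version)
DataStream<String> fromKafka = env.addSource(
        new FlinkKafkaConsumer082<String>("input-topic", new SimpleStringSchema(), props));

// write a stream back to another kafka topic
fromKafka.addSink(
        new FlinkKafkaProducer<String>("localhost:9092", "output-topic", new SimpleStringSchema()));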