
Flink - Working with State

All transformations in Flink may look like functions (in functional-processing terminology), but they are in fact stateful operators.

You can make every transformation (<code>map</code>, <code>filter</code>, etc.) stateful by using Flink's state interface or by checkpointing instance fields of your function.

You can register any instance field as managed state by implementing an interface.

In this case, and also when using Flink's native state interface, Flink will automatically take consistent snapshots of your state periodically and restore its value in the case of a failure.

This post discusses how to use Flink's state interfaces to manage state data. Flink automatically snapshots this state at regular intervals and restores it after a failure.

State falls into two main kinds: <code>keyed state</code> and <code>operator state</code>.

Keyed state is always relative to keys and can only be used in functions and operators on a <code>KeyedStream</code>.

You can think of keyed state as operator state that has been partitioned, or sharded, with exactly one state partition per key. Each keyed state is logically bound to a unique composite of &lt;parallel-operator-instance, key&gt;, and since each key “belongs” to exactly one parallel instance of a keyed operator, we can think of this simply as &lt;operator, key&gt;.

Keyed state is further organized into so-called key groups. Key groups are the atomic unit by which Flink can redistribute keyed state; there are exactly as many key groups as the defined maximum parallelism. During execution, each parallel instance of a keyed operator works with the keys for one or more key groups.

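Keyed state can only be used on a <code>KeyedStream</code>.

Keyed state is partitioned (sharded) by key, and each piece of keyed state is associated with one parallel operator instance.

The difficulty with keyed state is that when the parallelism increases, the keyed state has to be split up.

To make keyed state easier to migrate and manage, Flink introduces key groups, which are the smallest unit in which Flink redistributes keyed state.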
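The following plain-Java model makes the key-group arithmetic concrete. It is a sketch under stated assumptions and is not Flink's code. `KeyGroups` and its methods are hypothetical names. The key-to-group step uses a plain `hashCode()` modulo the maximum parallelism, whereas Flink additionally scrambles the hash. Key groups are handed to parallel instances in contiguous ranges.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of key groups (hypothetical helper, not Flink's API).
final class KeyGroups {
    private KeyGroups() {}

    // key -> key group. Assumption: plain hashCode(); Flink also applies a
    // murmur hash first. The result never depends on the current parallelism.
    static int keyGroupFor(Object key, int maxParallelism) {
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    // key group -> parallel instance: each instance owns a contiguous range of groups.
    static int operatorIndexFor(int keyGroup, int maxParallelism, int parallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    // All key groups owned by one parallel instance at a given parallelism.
    static List<Integer> keyGroupsOf(int operatorIndex, int maxParallelism, int parallelism) {
        List<Integer> owned = new ArrayList<>();
        for (int kg = 0; kg < maxParallelism; kg++) {
            if (operatorIndexFor(kg, maxParallelism, parallelism) == operatorIndex) {
                owned.add(kg);
            }
        }
        return owned;
    }
}
```

Because `keyGroupFor` never looks at the parallelism, rescaling only moves whole key groups between instances. The state of a single key never has to be split.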

With operator state (or non-keyed state), each operator state is bound to one parallel operator instance. The Kafka source connector is a good motivating example for the use of operator state in Flink: each parallel instance of the Kafka consumer maintains a map of topic partitions and offsets as its operator state.

The operator state interfaces support redistributing state among parallel operator instances when the parallelism is changed. There can be different schemes for doing this redistribution; the following are currently defined:

List-style redistribution: each operator returns a list of state elements. The whole state is logically a concatenation of all lists. On restore/redistribution, the list is evenly divided into as many sublists as there are parallel operators. Each operator gets a sublist, which can be empty or contain one or more elements.

Operator state is simply non-keyed state, as in the Kafka source connector. Redistributing this kind of state is comparatively simple.
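List-style redistribution fits in a few lines of plain Java. This is an illustrative sketch built around a hypothetical `ListRedistribution` helper. It cuts the concatenated list into contiguous, near-equal chunks, which matches the description above. Flink's own repartitioner may assign elements in a different order.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of list-style redistribution (hypothetical helper, not Flink's API).
final class ListRedistribution {
    private ListRedistribution() {}

    static <T> List<List<T>> redistribute(List<List<T>> oldStates, int newParallelism) {
        if (newParallelism <= 0) {
            throw new IllegalArgumentException("parallelism must be positive");
        }
        // 1. Logically, the whole state is the concatenation of all lists.
        List<T> all = new ArrayList<>();
        for (List<T> state : oldStates) {
            all.addAll(state);
        }
        // 2. Cut it into newParallelism contiguous sublists whose sizes differ by at
        //    most one; with fewer elements than instances, some sublists are empty.
        int base = all.size() / newParallelism;
        int extra = all.size() % newParallelism; // the first `extra` instances get one more
        List<List<T>> result = new ArrayList<>();
        int start = 0;
        for (int i = 0; i < newParallelism; i++) {
            int size = base + (i < extra ? 1 : 0);
            result.add(new ArrayList<>(all.subList(start, start + size)));
            start += size;
        }
        return result;
    }
}
```

Suppose one instance holds `(test1, 2)` and `(test2, 2)` and the state is redistributed to two instances. Each instance then receives one element. Spreading three elements over five instances leaves the last two instances with empty lists.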

Keyed state and operator state exist in two forms: managed and raw.

Managed state is represented in data structures controlled by the Flink runtime, such as internal hash tables or RocksDB. Examples are “ValueState”, “ListState”, etc. Flink's runtime encodes the states and writes them into the checkpoints.

Raw state is state that operators keep in their own data structures. When checkpointed, they only write a sequence of bytes into the checkpoint. Flink knows nothing about the state's data structures and sees only the raw bytes.

All DataStream functions can use managed state, but the raw state interfaces can only be used when implementing operators. Using managed state (rather than raw state) is recommended, since with managed state Flink is able to automatically redistribute state when the parallelism is changed, and also do better memory management.

Raw state is state that is not managed by Flink.

The key/value state interface provides access to different types of state that are all scoped to the key of the current input element.

This means that this type of state can only be used on a <code>KeyedStream</code>, which can be created via <code>stream.keyBy(…)</code>.

Key/value state can only be used on a <code>KeyedStream</code>.


Now we will first look at the different types of state available, and then we will see how they can be used in a program. The available state primitives are:

<code>ValueState&lt;T&gt;</code>: this keeps a value that can be updated and retrieved (scoped to the key of the input element, as mentioned above, so there will possibly be one value for each key that the operation sees). The value can be set using <code>update(T)</code> and retrieved using <code>T value()</code>.

<code>ListState&lt;T&gt;</code>: this keeps a list of elements. You can append elements and retrieve an <code>Iterable</code> over all currently stored elements. Elements are added using <code>add(T)</code>; the iterable can be retrieved using <code>Iterable&lt;T&gt; get()</code>.

<code>ReducingState&lt;T&gt;</code>: this keeps a single value that represents the aggregation of all values added to the state. The interface is the same as for <code>ListState</code>, but elements added using <code>add(T)</code> are reduced to an aggregate using a specified <code>ReduceFunction</code>.

All types of state also have a method <code>clear()</code> that clears the state for the currently active key (i.e. the key of the input element).

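There are three different types of state:

<code>ValueState</code>: single-value state, operated on through <code>update(T)</code> and <code>T value()</code>.

<code>ListState&lt;T&gt;</code>: multi-value state, modified with <code>add(T)</code> and read with <code>Iterable&lt;T&gt; get()</code>.

<code>ReducingState&lt;T&gt;</code>: also accepts multiple values, but keeps only the result of the reduction.

In addition, every type of state has <code>clear()</code> to remove its data.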
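The plain-Java toy below mimics the three primitives. The `Toy*` classes are hypothetical and have no Flink dependency. The model encodes one assumption: each state object is only a handle, and every call reads or writes the slot that belongs to the key of the element currently being processed. In Flink the runtime sets that key. Here a `ToyKeyContext` field stands in for it.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;

// Toy stand-in for the runtime's notion of "the key of the current element".
final class ToyKeyContext {
    Object currentKey;
}

// Toy ValueState: one value per key; value() returns null if unset for this key.
final class ToyValueState<T> {
    private final ToyKeyContext ctx;
    private final Map<Object, T> perKey = new HashMap<>();

    ToyValueState(ToyKeyContext ctx) { this.ctx = ctx; }

    T value() { return perKey.get(ctx.currentKey); }
    void update(T v) { perKey.put(ctx.currentKey, v); }
    void clear() { perKey.remove(ctx.currentKey); }
}

// Toy ListState: an append-only list per key.
final class ToyListState<T> {
    private final ToyKeyContext ctx;
    private final Map<Object, List<T>> perKey = new HashMap<>();

    ToyListState(ToyKeyContext ctx) { this.ctx = ctx; }

    void add(T v) { perKey.computeIfAbsent(ctx.currentKey, k -> new ArrayList<>()).add(v); }
    Iterable<T> get() { return perKey.getOrDefault(ctx.currentKey, List.of()); }
    void clear() { perKey.remove(ctx.currentKey); }
}

// Toy ReducingState: keeps only the running aggregate per key, never the elements.
final class ToyReducingState<T> {
    private final ToyKeyContext ctx;
    private final BinaryOperator<T> reduceFunction;
    private final Map<Object, T> perKey = new HashMap<>();

    ToyReducingState(ToyKeyContext ctx, BinaryOperator<T> reduceFunction) {
        this.ctx = ctx;
        this.reduceFunction = reduceFunction;
    }

    void add(T v) { perKey.merge(ctx.currentKey, v, reduceFunction); }
    T get() { return perKey.get(ctx.currentKey); }
    void clear() { perKey.remove(ctx.currentKey); }
}
```

Changing `currentKey` between calls makes the same handle return different values, and `clear()` affects only the current key.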

It is important to keep in mind that these state objects are only used for interfacing with state. The state is not necessarily stored inside them but might reside on disk or somewhere else.

The second thing to keep in mind is that the value you get from the state depends on the key of the input element.

So the value you get in one invocation of your user function can differ from the one you get in another invocation if the key of the element is different.

These state objects are only interfaces to the state, and the state value you retrieve depends on the key of the input element. Different invocations of the user function can therefore see different state values, because the elements' keys may differ.

To get a state handle, you have to create a <code>StateDescriptor</code>. This holds the name of the state (as we will see later, you can create several states, and they must have unique names so that you can reference them), the type of the values that the state holds, and possibly a user-specified function, such as a <code>ReduceFunction</code>. Depending on what type of state you want to retrieve, you create a <code>ValueStateDescriptor</code>, a <code>ListStateDescriptor</code>, or a <code>ReducingStateDescriptor</code>.

Each state needs a <code>StateDescriptor</code>, whose name is used to reference that state. If you define several states, their descriptors must have unique names.

Each type of state has its own kind of <code>StateDescriptor</code>.
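The sketch below gives a rough picture of why the descriptor's name matters. It uses hypothetical `ToyDescriptor` and `ToyRuntimeContext` classes, not Flink's. The toy returns the same state object every time it is asked with the same name. As a simplifying assumption of this toy only, it also refuses to hand out a different kind of state under a name that is already taken. Flink's exact behaviour in that case is not modelled here.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical descriptor: a name plus how to create the state the first time.
final class ToyDescriptor<S> {
    final String name;
    final Class<S> stateClass;
    final Supplier<S> factory;

    ToyDescriptor(String name, Class<S> stateClass, Supplier<S> factory) {
        this.name = name;
        this.stateClass = stateClass;
        this.factory = factory;
    }
}

// Hypothetical runtime context: states are looked up by descriptor name.
final class ToyRuntimeContext {
    private final Map<String, Object> statesByName = new HashMap<>();

    <S> S getState(ToyDescriptor<S> descriptor) {
        // The first request creates the state; later requests with the same name reuse it.
        Object state = statesByName.computeIfAbsent(descriptor.name, n -> descriptor.factory.get());
        if (!descriptor.stateClass.isInstance(state)) {
            // Toy policy: one name, one kind of state.
            throw new IllegalStateException("name '" + descriptor.name + "' is already used by another state");
        }
        return descriptor.stateClass.cast(state);
    }
}
```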

State is accessed using the <code>RuntimeContext</code>, so it is only possible in rich functions.

The <code>RuntimeContext</code> that is available in a <code>RichFunction</code> has these methods for accessing state:

<code>ValueState&lt;T&gt; getState(ValueStateDescriptor&lt;T&gt;)</code>

<code>ReducingState&lt;T&gt; getReducingState(ReducingStateDescriptor&lt;T&gt;)</code>

<code>ListState&lt;T&gt; getListState(ListStateDescriptor&lt;T&gt;)</code>

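State objects are obtained through the <code>RuntimeContext</code>, with a different method for each type of state.

The key point is that to use state you must use a rich function. An ordinary function has no way to obtain it.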

An example <code>FlatMapFunction</code> in the Flink documentation shows how all of these parts fit together.
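As a stand-in for that example, the plain-Java simulation below captures the same kind of logic under stated assumptions. A keyed flatMap keeps a (count, sum) pair per key. Once a key has received two elements, the function emits that key's average and clears its state. The `HashMap` stands in for a `ValueState`, and `CountWindowAverageSim` is a hypothetical name. In a real Flink job, the state would instead be obtained in the rich function's `open()` method via `getRuntimeContext().getState(...)` with a `ValueStateDescriptor`, and the runtime, not the caller, would supply the current key.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java simulation of a keyed, stateful flatMap (not Flink code).
final class CountWindowAverageSim {
    // Stand-in for ValueState<(count, sum)>: one slot per key.
    private final Map<Long, long[]> state = new HashMap<>();

    // Processes one (key, value) element and appends any (key, average) result to `out`.
    void flatMap(long key, long value, List<long[]> out) {
        // Missing state behaves like a default value of (0, 0).
        long[] current = state.getOrDefault(key, new long[] {0L, 0L});
        long count = current[0] + 1;
        long sum = current[1] + value;
        if (count >= 2) {
            out.add(new long[] {key, sum / count}); // integer average
            state.remove(key);                      // like clear(): only this key's state
        } else {
            state.put(key, new long[] {count, sum}); // like update(...)
        }
    }
}
```

Feeding it `(1, 3), (1, 5), (1, 7), (1, 4), (1, 2)` emits the averages `4` and `5` for key 1. The last element stays in state, waiting for a partner.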


A stateful function can implement either the more general <code>CheckpointedFunction</code> interface or the <code>ListCheckpointed&lt;T extends Serializable&gt;</code> interface.

In both cases, the non-keyed state is expected to be a <code>List</code> of serializable objects, independent from each other, and thus eligible for redistribution upon rescaling. In other words, these objects are the finest granularity at which non-keyed state can be repartitioned. As an example, if with parallelism 1 the checkpointed state of the <code>BufferingSink</code> contains elements <code>(test1, 2)</code> and <code>(test2, 2)</code>, then when the parallelism is increased to 2, <code>(test1, 2)</code> may end up in task 0, while <code>(test2, 2)</code> will go to task 1.

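Operator state, i.e. non-keyed state, is represented as a list of serializable objects that are independent of each other. When the parallelism changes, the list can therefore simply be repartitioned.

You manage operator state by implementing either the <code>ListCheckpointed&lt;T extends Serializable&gt;</code> interface or the <code>CheckpointedFunction</code> interface.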

The <code>ListCheckpointed</code> interface requires the implementation of two methods, <code>snapshotState()</code> and <code>restoreState()</code>.

In <code>snapshotState()</code> the operator should return a list of objects to checkpoint, and <code>restoreState()</code> has to handle such a list upon recovery.

If the state is not re-partitionable, you can always return a <code>Collections.singletonList(MY_STATE)</code> in <code>snapshotState()</code>.

<code>Collections.singletonList</code> returns an immutable list containing exactly one element.
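The following sketch applies the <code>ListCheckpointed</code> contract to a state that is not re-partitionable. `ToyListCheckpointed` is a hypothetical interface shaped like Flink's (`snapshotState(long checkpointId, long timestamp)` returning a `List`, plus `restoreState(List)`), so the example compiles without Flink. The restore policy, which sums every counter received, is an assumption that suits a count: after scaling in, one instance may receive several singleton lists.

```java
import java.io.Serializable;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in shaped like ListCheckpointed<T>; no Flink dependency.
interface ToyListCheckpointed<T extends Serializable> {
    List<T> snapshotState(long checkpointId, long timestamp) throws Exception;
    void restoreState(List<T> state) throws Exception;
}

// A counter whose whole state is a single number, so it cannot be split further.
final class CountingFunction implements ToyListCheckpointed<Long> {
    private long count;

    void process() { count++; }
    long count() { return count; }

    @Override
    public List<Long> snapshotState(long checkpointId, long timestamp) {
        return Collections.singletonList(count); // immutable, exactly one element
    }

    @Override
    public void restoreState(List<Long> state) {
        // Assumed policy: add up whatever counters this instance was given.
        count = 0;
        for (Long c : state) {
            count += c;
        }
    }
}
```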

The <code>CheckpointedFunction</code> interface also requires the implementation of two methods, <code>snapshotState()</code> and <code>initializeState()</code>.

Whenever a checkpoint has to be performed, <code>snapshotState()</code> is called. The counterpart, <code>initializeState()</code>, is called every time the user-defined function is initialized, whether the function is being initialized for the first time or is actually recovering from an earlier checkpoint. Given this, <code>initializeState()</code> is not only the place where the different types of state are initialized, but also where the state recovery logic goes.

The Flink documentation illustrates <code>CheckpointedFunction</code> with a stateful <code>SinkFunction</code> (the <code>BufferingSink</code> mentioned above) that uses state to buffer elements before sending them to the outside world.

In short, the example is a stateful sink that caches elements before sending them out.
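A plain-Java simulation of the buffering sink shows how the work is split between `snapshotState()` and `initializeState()`. In this sketch, `BufferingSinkSim` is hypothetical and has no Flink dependency. A plain `List` owned by the caller stands in for the operator's `ListState`, a `restored` flag stands in for the initialization context's `isRestored()`, and a `Consumer` stands in for the external system.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Plain-Java simulation of a buffering sink (not Flink's API).
final class BufferingSinkSim {
    private final int threshold;
    private final Consumer<List<String>> outsideWorld; // hypothetical external system
    private final List<String> bufferedElements = new ArrayList<>();
    private List<String> checkpointedState;            // stand-in for the operator ListState

    BufferingSinkSim(int threshold, Consumer<List<String>> outsideWorld) {
        this.threshold = threshold;
        this.outsideWorld = outsideWorld;
    }

    // Runs on first start AND on recovery: obtain the state, then restore from it if needed.
    void initializeState(List<String> operatorListState, boolean restored) {
        this.checkpointedState = operatorListState;
        if (restored) {
            bufferedElements.addAll(operatorListState);
        }
    }

    // Buffer each element; once `threshold` elements are buffered, send them as one batch.
    void invoke(String value) {
        bufferedElements.add(value);
        if (bufferedElements.size() == threshold) {
            outsideWorld.accept(new ArrayList<>(bufferedElements));
            bufferedElements.clear();
        }
    }

    // Runs for every checkpoint: replace the checkpointed state with the current buffer.
    void snapshotState() {
        checkpointedState.clear();
        checkpointedState.addAll(bufferedElements);
    }
}
```

After a simulated failure, a fresh instance initialized with `restored = true` picks up the elements that were buffered but not yet sent. It then continues filling the same batch.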


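Stateful sources require a bit more care than other operators.

In order to make the updates to the state and the output collection atomic (required for exactly-once semantics on failure/recovery), the user is required to get a lock from the source's context.

What is different about stateful sources is that updating the state and emitting output must happen under a lock to guarantee exactly-once semantics. For example, the lock keeps the checkpointing thread and the source thread from touching the offset at the same time.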
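A plain-Java simulation shows the effect of the lock. `CounterSourceSim` is hypothetical. An `Object` stands in for the lock returned by the source context's `getCheckpointLock()`, a counter stands in for `collect()`, and `snapshot()` plays the checkpointing thread. Emitting a record and advancing the offset happen under the same lock that the snapshot takes, so every snapshot sees an offset that matches the number of records emitted.

```java
// Plain-Java simulation of a stateful source (not Flink's API).
final class CounterSourceSim {
    private final Object checkpointLock = new Object(); // stand-in for ctx.getCheckpointLock()
    private long emittedCount = 0; // stand-in for records handed to ctx.collect(...)
    private long offset = 0;       // the source's checkpointed state
    private volatile boolean running = true;

    // Source thread: emit a record and advance the offset as one atomic step.
    void run(long maxRecords) {
        while (running && offset < maxRecords) {
            synchronized (checkpointLock) {
                emittedCount++; // "collect" the record for the current offset ...
                offset++;       // ... and advance the offset under the same lock
            }
        }
    }

    void cancel() { running = false; }

    // Checkpointing thread: read the state under the lock; returns {offset, emittedCount}.
    long[] snapshot() {
        synchronized (checkpointLock) {
            return new long[] {offset, emittedCount};
        }
    }
}
```

Without the `synchronized` blocks, a snapshot could land between the two increments. It would then record an offset that disagrees with what was already emitted, which breaks exactly-once on recovery.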


<a href="https://ci.apache.org/projects/flink/flink-docs-release-1.2/ops/state_backends.html">https://ci.apache.org/projects/flink/flink-docs-release-1.2/ops/state_backends.html</a>

Programs written in the DataStream API often hold state in various forms:

Windows gather elements or aggregates until they are triggered

Transformation functions may use the key/value state interface to store values

Transformation functions may implement the <code>Checkpointed</code> interface to make their local variables fault tolerant

The main kinds of state are:

Elements gathered in windows

State created in transformation functions through the key/value state interface

State kept for local variables in transformation functions via the <code>Checkpointed</code> interface

When checkpointing is activated, such state is persisted upon checkpoints to guard against data loss and to recover consistently.

How the state is represented internally, and how and where it is persisted upon checkpoints, depends on the chosen state backend.

The key point: how the state is stored, and where, depends on which state backend you use.

Out of the box, Flink bundles these state backends:

MemoryStateBackend

FsStateBackend

RocksDBStateBackend

If nothing else is configured, the system will use the MemoryStateBackend.

There are currently three state backends, and the default is the MemoryStateBackend.

The MemoryStateBackend

The MemoryStateBackend holds data internally as objects on the Java heap. Key/value state and window operators hold hash tables that store the values, triggers, etc.

Upon checkpoints, this state backend will snapshot the state and send it as part of the checkpoint acknowledgement messages to the JobManager (master), which stores it on its heap as well.

As the name suggests, the MemoryStateBackend keeps state on the Java heap. When a checkpoint is taken, the state backend puts the state snapshot into the checkpoint acknowledgement messages sent to the JobManager, and the JobManager also keeps it on its heap.

The FsStateBackend

The FsStateBackend is configured with a file system URL (type, address, path), for example “hdfs://namenode:40010/flink/checkpoints” or “file:///data/flink/checkpoints”.

The FsStateBackend holds in-flight data in the TaskManager's memory. Upon checkpointing, it writes state snapshots into files in the configured file system and directory.

Minimal metadata is stored in the JobManager's memory (or, in high-availability mode, in the metadata checkpoint).

The state snapshot data lives in the file system, while the JobManager's memory holds only minimal metadata.

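The RocksDBStateBackend

Broadly, it replaces the TaskManager's in-memory storage of in-flight state with a RocksDB database. As with the FsStateBackend, checkpoints are still written to a configured file system.

Note: to use the RocksDBStateBackend, you also have to add the corresponding Maven dependency (the <code>flink-statebackend-rocksdb</code> module) to your project.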

State backends can be configured per job. In addition, you can define a default state backend to be used when the job does not explicitly define one.

Setting the Per-job State Backend

The per-job state backend is set on the job's <code>StreamExecutionEnvironment</code> by calling <code>setStateBackend(...)</code>, for example with an <code>FsStateBackend</code> constructed from a checkpoint directory URL.

A default state backend can be configured in the <code>flink-conf.yaml</code>, using the configuration key <code>state.backend</code>.

In the case where the default state backend is set to filesystem, the entry <code>state.backend.fs.checkpointdir</code> defines the directory where the checkpoint data will be stored.

A sample section in the configuration file sets <code>state.backend</code> and, for the filesystem backend, <code>state.backend.fs.checkpointdir</code>.
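For instance, the section might look like the sketch below. It assumes the filesystem backend should be the default and reuses the example HDFS URL from the FsStateBackend description. Adjust the path for your own cluster.

```yaml
# Default state backend for jobs that do not set one themselves
state.backend: filesystem

# Where the filesystem backend stores checkpoint data
state.backend.fs.checkpointdir: hdfs://namenode:40010/flink/checkpoints
```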