文章目錄
- 1.算子狀态概述
- 1.1 算子狀态分類
- 1.2 狀态分析
- 1.3 CheckpointedFunction 接口
- 2.算子狀态 程式設計案例
- 2.1 清單狀态案例
- 2.2 廣播機制 案例
- 3.狀态持久化和狀态後端
- 3.1 檢查點(Checkpoint)
- 3.2 狀态後端 state Backends
- 4.狀态程式設計總結
1.算子狀态概述
1.1 算子狀态分類
-
算子狀态:
清單狀态, 聯合清單狀态, 廣播狀态
ListState, UnionListState, BroadcastState
1.2 狀态分析
- 清單狀态: 與 Keyed State 中的 ListState 一樣,将狀态表示為一組資料的清單。與 Keyed State 中的清單狀态的差別是:在算子狀态的上下文中,不會按鍵(key)分别處理狀态,是以每一個并行子任務上隻會保留一個“清單”(list),也就是目前并行子任務上所有狀态項的集合。清單中的狀态項就是可以重新配置設定的最細粒度,彼此之間完全獨立。
- 聯合清單狀态: 與 ListState 類似,聯合清單狀态也會将狀态表示為一個清單。它與正常清單狀态的差別在于,算子并行度進行縮放調整時對于狀态的配置設定方式不同。UnionListState 的重點就在于“聯合”(union)。在并行度調整時,正常清單狀态是輪詢配置設定狀态項,而聯合清單狀态的算子則會直接廣播狀态的完整清單。這樣,并行度縮放之後的并行子任務就擷取到了聯合後完整的“大清單”,可以自行選擇要使用的狀态項和要丢棄的狀态項。這種配置設定也叫作“聯合重組”(union redistribution)。如果清單中狀态項數量太多,為資源和效率考慮一般不建議使用聯合重組的方式。
- 廣播狀态: 狀态就像被“廣播”到所有分區一樣,這種特殊的算子狀态,就叫作廣播狀态(BroadcastState)。廣播狀态是以類似映射結構(map)的鍵值對(key-value)來儲存的
1.3 CheckpointedFunction 接口
在 Flink 中,對狀态進行持久化儲存的快照機制叫作“檢查點”(Checkpoint)。于是使用算子狀态時,就需要對檢查點的相關操作進行定義,實作一個 CheckpointedFunction 接口。
public interface CheckpointedFunction {
// 儲存狀态快照到檢查點時,調用這個方法
void snapshotState(FunctionSnapshotContext context) throws Exception
// 初始化狀态時調用這個方法,也會在恢複狀态時調用
void initializeState(FunctionInitializationContext context) throws Exception;
}
每次應用儲存檢查點做快照時,都會調用.snapshotState()方法,将狀态進行外部持久化。
而在算子任務進行初始化時,會調用. initializeState()方法。
2.算子狀态 程式設計案例
2.1 清單狀态案例
自定義的 SinkFunction 會在CheckpointedFunction 中進行資料緩存,然後統一發送到下遊。這個例子示範了清單狀态的平均分割重組(event-split redistribution)。
緩存機制
public class BufferingSinkExample {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
environment.setParallelism(1);
SingleOutputStreamOperator<Event> stream = environment.addSource(new ClickSource())
.assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ZERO)
.withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
@Override
public long extractTimestamp(Event event, long l) {
return event.timestamp;
}
})
);
stream.addSink(new BufferingSink(10));
stream.print("input");
environment.execute();
}
public static class BufferingSink implements SinkFunction<Event>, CheckpointedFunction{
private final int threshold;
public BufferingSink(int threshold) {
this.threshold = threshold;
this.bufferedElement = new ArrayList<>();
}
private List<Event> bufferedElement;
private ListState<Event> listState;
@Override
public void invoke(Event value, Context context) throws Exception {
bufferedElement.add(value);
if (bufferedElement.size() == threshold){
for (Event event : bufferedElement) {
System.out.println(event);
}
bufferedElement.clear();
}
}
@Override
public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
// 對狀态進行持久化, 複制緩存的清單到 清單狀态
listState.clear();
for (Event event : bufferedElement) {
listState.add(event);
}
}
@Override
public void initializeState(FunctionInitializationContext functionInitializationContext) throws Exception {
// 定義算子狀态
ListStateDescriptor<Event> listStateDescriptor = new ListStateDescriptor<>("buffered-elements", Event.class);
listState = functionInitializationContext.getOperatorStateStore().getListState(listStateDescriptor);
// 如果有故障, 需要将清單狀态中元素 複制到 清單
if (functionInitializationContext.isRestored()){
for (Event event : listState.get()) {
bufferedElement.add(event);
}
}
}
}
}
2.2 廣播機制 案例
考慮在電商應用中,往往需要判斷使用者先後發生的行為的“組合模式”,比如“登入-下單”或者“登入-支付”,檢測出這些連續的行為進行統
計,就可以了解平台的運用狀況以及使用者的行為習慣。
public class BehaviorPatternDetectExample {
// 使用者行為模式
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
environment.setParallelism(1);
// 1.行為的資料流
DataStream<Action> actionStream = environment.fromElements(
new Action("Alice", "login"),
new Action("Alice", "pay"),
new Action("Bob", "login"),
new Action("Bob", "order")
);
DataStream<Pattern> patternStream = environment.fromElements(
new Pattern("login", "pay"),
new Pattern("login", "order")
);
// 定義廣播狀态描述器
MapStateDescriptor<Void, Pattern> stateDescriptor = new MapStateDescriptor<>("pattern", Types.VOID, Types.POJO(Pattern.class));
BroadcastStream<Pattern> broadcast = patternStream.broadcast(stateDescriptor);
// 連接配接兩條流 處理
actionStream.keyBy(data -> data.userId)
.connect(broadcast)
.process(new PatternDetector())
.print();
environment.execute();
// 2.行為的模式流, 廣播流
}
private static class Action{
public String userId;
public String action;
public Action() {
}
public Action(String userId, String action) {
this.userId = userId;
this.action = action;
}
@Override
public String toString() {
return "Action{" +
"userId='" + userId + '\'' +
", action='" + action + '\'' +
'}';
}
}
public static class Pattern{
public String active1;
public String active2;
public Pattern() {
}
public Pattern(String active1, String active2) {
this.active1 = active1;
this.active2 = active2;
}
@Override
public String toString() {
return "Pattern{" +
"active1='" + active1 + '\'' +
", active2='" + active2 + '\'' +
'}';
}
}
public static class PatternDetector extends KeyedBroadcastProcessFunction<String, Action, Pattern, Tuple2<String, Pattern>>{
// 定義一個KeyedState, 儲存上一次使用者的行為
ValueState<String> preActionState;
@Override
public void open(Configuration parameters) throws Exception {
preActionState = getRuntimeContext().getState(new ValueStateDescriptor<String>("last-action", String.class));
}
@Override
public void processElement(Action action, ReadOnlyContext readOnlyContext, Collector<Tuple2<String, Pattern>> collector) throws Exception {
// 判斷規則
// 從廣播狀态拿取
ReadOnlyBroadcastState<Void, Pattern> patternState = readOnlyContext.getBroadcastState(new MapStateDescriptor<>("pattern", Types.VOID, Types.POJO(Pattern.class)));
// 規則
Pattern pattern = patternState.get(null);
// 上一次行為
String preAction = preActionState.value();
// 判斷是否比對
if (pattern != null && preAction != null){
if (pattern.active1.equals(preAction) && pattern.active2.equals(action.action)){
collector.collect(new Tuple2<>(readOnlyContext.getCurrentKey(), pattern));
}
}
preActionState.update(action.action);
}
@Override
public void processBroadcastElement(Pattern pattern, Context context, Collector<Tuple2<String, Pattern>> collector) throws Exception {
BroadcastState<Void, Pattern> patternState = context.getBroadcastState(new MapStateDescriptor<>("pattern", Types.VOID, Types.POJO(Pattern.class)));
patternState.put(null, pattern);
}
}
}
3.狀态持久化和狀态後端
在 Flink 的狀态管理機制中,很重要的一個功能就是對狀态進行持久化(persistence)儲存,這樣就可以在發生故障後進行重新開機恢複。Flink 對狀态進行持久化的方式,就是将目前所有分布式狀态進行“快照”儲存,寫入一個“檢查點”(checkpoint)或者儲存點(savepoint)
儲存到外部存儲系統中。具體的存儲媒體,一般是分布式檔案系統(distributed file system)。
3.1 檢查點(Checkpoint)
有狀态流應用中的檢查點(checkpoint),其實就是所有任務的狀态在某個時間點的一個快照(一份拷貝)。簡單來講,就是一次“存盤”,讓我們之前處理資料的進度不要丢掉。在一個流應用程式運作時,Flink 會定期儲存檢查點,在檢查點中會記錄每個算子的 id 和狀态;如果發生故障,Flink 就會用最近一次成功儲存的檢查點來恢複應用的狀态,重新啟動處理流程,就如同“讀檔”一樣。
預設情況下,檢查點是被禁用的.
environment.enableCheckpointing(10000L);
3.2 狀态後端 state Backends
檢查點的儲存離不開 JobManager 和 TaskManager,以及外部存儲系統的協調。在應用進行檢查點儲存時,首先會由 JobManager 向所有 TaskManager 發出觸發檢查點的指令;TaskManger 收到之後,将目前任務的所有狀态進行快照儲存,持久化到遠端的存儲媒體中;
完成之後向 JobManager 傳回确認資訊。這個過程是分布式的,當 JobManger 收到所有TaskManager 的傳回資訊後,就會确認目前檢查點成功儲存
上圖為檢查點的儲存的圖
Flink 中,狀态的存儲、通路以及維護,都是由一個可插拔的元件決定的,這個元件就叫作狀态後端(state backend)。狀态後端主要負責兩件事:一是本地的狀态管理,二是将檢查點(checkpoint)寫入遠端的持久化存儲。
哈希表狀态後端(HashMapStateBackend)
把狀态存放在記憶體裡
environment.setStateBackend(new HashMapStateBackend());
内嵌 RocksDB 狀态後端(EmbeddedRocksDBStateBackend)
RocksDB 是一種内嵌的 key-value 存儲媒體,可以把資料持久化到本地硬碟。
<dependency>
<groupId>org.apache.flinkgroupId>
<artifactId>flink-statebackend-rocksdb_${scala.binary.version}artifactId>
<version>1.13.0version>
dependency>
environment.setStateBackend(new EmbeddedRocksDBStateBackend());
如何選擇正确的狀态後端
HashMap 和 RocksDB 兩種狀态後端最大的差別,就在于本地狀态存放在哪裡:前者是記憶體,後者是 RocksDB。在實際應用中,選擇那種狀态後端,主要是需要根據業務需求在處理性能和應用的擴充性上做一個選擇。
HashMapStateBackend 是記憶體計算,讀寫速度非常快;但是,狀态的大小會受到叢集可用記憶體的限制,如果應用的狀态随着時間不停地增長,就會耗盡記憶體資源。
而 RocksDB 是硬碟存儲,是以可以根據可用的磁盤空間進行擴充,而且是唯一支援增量檢查點的狀态後端,是以它非常适合于超級海量狀态的存儲。不過由于每個狀态的讀寫都需要做序列化/反序列化,而且可能需要直接從磁盤讀取資料,這就會導緻性能的降低,平均讀寫
性能要比 HashMapStateBackend 慢一個數量級。
4.狀态程式設計總結
有狀态的流處理是 Flink 的本質,是以狀态可以說是 Flink 中最為重要的概念。之前聚合算子、視窗算子中已經提到了狀态的概念。