State & Fault Tolerance
这里写目录标题
- State & Fault Tolerance
-
- Managed Keyed State
-
-
- ValueState
- ListState
- MapState
- ReducingState
- AggregatingState
- FoldingState
-
- State Time-To-Live (TTL) (状态的生存时间)
-
-
-
- 如何开启使用
-
- `提醒:`
- Cleanup Of Expired State (清除过期状态)
-
- Checkpoint & Savepoint 检查点和保存点
- State Backend 状态后端
-
-
- `注意呦:`
- MemoryStateBackend(jobmanager) 内存状态后端
- FsStateBackend(filesystem)
- RocksDBStateBackend(rocksdb)
-
- Managed Operator State 管理运算符状态
-
- CheckpointedFunction
- ListCheckpointed 检查点列表
- Broadcast State Pattern 广播状态模式
-
-
-
- DataStream链接 BroadcastStream
- KeyedStream链接 BroadcastStream
-
-
- Queryable State 可查询状态
-
-
- Architecture 架构
-
- 激活 Queryable State
- Making State Queryable 使状态可查询
- Queryable State Stream 可查询状态流
- Managed Keyed State
- Querying State 查询状态
-
-
Flink是⼀个基于状态计算的流计算服务。Flink将所有的状态分为两⼤类:
keyed state
(键控状态) 与
operatorstate
(运算符状态) .所谓的keyed state指的是Flink底层会给每⼀个Key绑定若⼲个类型的状态值,特指
KeyedStream
中所涉及的状态。所谓operator state指的是
⾮keyed stream
中所涉及状态称为operatorstate,所有的operator state会将状态和具体某个操作符进⾏绑定。
⽆论是 keyed state 还是 operator state flink将这些状态管理底层分为两种存储形式:
Managed State
和
Raw State
Managed State- 所谓的Managed State,指的是由Flink控制状态存储结构,例如:状态数据结构、数据类型等,由于是Flink⾃⼰管理状态,因此Flink可以更好的针对于管理状态做内存的优化和故障恢复。
Raw State - 所谓的Raw state,指的是Flink对状态的信息和结构⼀⽆所知,Flink仅仅知道该状态是⼀些⼆进制字节数组,需要⽤户⾃⼰完成状态序列化和反序列化。因此Raw State Flink不能够针对性的做内存优化,也不⽀持故障状态的恢复。因此在Flink实战项⽬开发中,⼏乎不使⽤Raw State.
参考:https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/stream/state/state.html
Managed Keyed State
flink中的managed keyed state接⼝提供访问不同数据类型状态,这些状态都是和key进⾏绑定的。这意味着这种状态只能在KeyedStream上使⽤。flink内建了以下六种类型的state:
类型 | 使用场景 | 方法 |
---|---|---|
ValueState | 该状态主要⽤于存储单⼀状态值。 | Tvalue() update(T) clear() |
ListState | 该状态主要⽤于存储单集合状态值。 | add(T) addAll(List) update(List) Iterable get() clear() |
MapState<UK, UV> | 该状态主要⽤于存储⼀个Map集合 | put(UK, UV) putAll(Map) get(UK) entries() keys() clear() |
ReducingState | 该状态主要⽤于存储单⼀状态值。该状态会将添加的元素和历史状态⾃动做运算,调⽤⽤户提供的ReduceFunction | add(T) Tget() clear() |
AggregatingState<IN,OUT> | 该状态主要⽤于存储单⼀状态值。该状态会将添加的元素和历史状态⾃动做运算,调⽤⽤户提供的AggregateFunction,该状态和ReducingState不同点在于数据输⼊和输出类型可以不⼀致 | add(IN) OUT get() clear() |
FoldingState<T,ACC> | 该状态主要⽤于存储单⼀状态值。该状态会将添加的元素和历史状态⾃动做运算,调⽤⽤户提供的FoldFunction,该状态和ReducingState不同点在于数据输⼊和中间结果类型可以不⼀致 | add(T) Tget() clear() |
务必记住,这些状态对象仅用于与状态交互。状态不一定存储在内存,但可能驻留在磁盘或其他地方。要记住的第二件事是,您从状态获得的值取决于输入元素的键。因此,如果涉及的键不同,那么在一次用户函数调用中得到的值可能与在另一次调用中得到的值不同。
要获得状态句柄,必须创建状态描述符()。它包含状态的名称(稍后我们将看到,您可以创建多个状态,并且它们必须具有惟一的名称,以便您可以引用它们)、状态包含的值的类型,以及用户指定的函数,例如
StateDescriptor
。根据您希望检索的状态类型,您可以创建
ReduceFunction
、
ValueStateDescriptor
、
ListStateDescriptor
、
ReducingStateDescriptor
或
FoldingStateDescriptor
。
MapStateDescriptor
使用RuntimeContext访问状态,因此只有在rich 函数中才有可能。请参阅这里的信息。在RichFunction中可用的RuntimeContext有以下访问状态的方法:
- ValueState getState(ValueStateDescriptor)
- ReducingState getReducingState(ReducingStateDescriptor)
- ListState getListState(ListStateDescriptor)
- AggregatingState getAggregatingState(AggregatingStateDescriptor)
- FoldingState getFoldingState(FoldingStateDescriptor)
- MapState getMapState(MapStateDescriptor)
ValueState
该状态主要⽤于存储单⼀状态值。
object FlinkWordCountValueState {
def main(args: Array[String]): Unit = {
//1.创建流计算执⾏环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
//2.创建DataStream - 细化
val text = env.socketTextStream("CentOS", 9999)
//3.执⾏DataStream的转换算⼦
val counts = text.flatMap(line=>line.split("\\s+"))
.map(word=>(word,1))
.keyBy(0)
.map(new WordCountMapFunction)//调用创建状态的自定义方法
//4.将计算的结果在控制打印
counts.print()
//5.执⾏流计算任务
env.execute("Stream WordCount")
}
}
class WordCountMapFunction extends RichMapFunction[(String,Int),(String,Int)]{
var vs:ValueState[Int]=_
override def open(parameters: Configuration): Unit = {
//1.创建对应状态描述符
val vsd = new ValueStateDescriptor[Int]("wordcount", createTypeInformation[Int])
//2.获取RuntimeContext
var context: RuntimeContext = getRuntimeContext
//3.获取指定类型状态
vs=context.getState(vsd)
}
override def map(value: (String, Int)): (String, Int) = {
//获取历史值
val historyData = vs.value()
//更新状态
vs.update(historyData+value._2)
//返回最新值
(value._1,vs.value())
}
}
ListState
该状态主要⽤于存储单集合状态值。
def main(args: Array[String]): Unit = {
//1.创建流计算执⾏环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
//2.创建DataStream - 细化 001 zhangsan 电⼦类 xxxx 001 zhangsan ⼿机类 xxxx 001
zhangsan ⺟婴类 xxxx
val text = env.socketTextStream("CentOS", 9999)
//3.执⾏DataStream的转换算⼦
val counts = text.map(line=>line.split("\\s+"))
.map(ts=>(ts(0)+":"+ts(1),ts(2)))
.keyBy(0)
.map(new UserVisitedMapFunction)//*
//4.将计算的结果在控制打印
counts.print()
//5.执⾏流计算任务
env.execute("Stream WordCount")
}
}
class UserVisitedMapFunction extends RichMapFunction[(String,String),(String,String)]{
var userVisited:ListState[String]=_
override def open(parameters: Configuration): Unit = {
//1.创建对应状态描述符
val lsd = new ListStateDescriptor[String]("userVisited",
createTypeInformation[String])
//2.获取RuntimeContext
var context: RuntimeContext = getRuntimeContext
//3.获取指定类型状态
userVisited=context.getListState(lsd)
}
override def map(value: (String, String)): (String, String) = {
//获取历史值
var historyData = userVisited.get().asScala.toList
//更新状态
historyData = historyData.::(value._2).distinct
userVisited.update(historyData.asJava)
//返回最新值
(value._1,historyData.mkString(" | "))
}
}
MapState
该状态主要⽤于存储⼀个Map集合
object FlinkUserVisitedMapState {
def main(args: Array[String]): Unit = {
//1.创建流计算执⾏环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
//2.创建DataStream - 细化 001 zhangsan 电⼦类 xxxx 001 zhangsan ⼿机类 xxxx 001
zhangsan ⺟婴类 xxxx
val text = env.socketTextStream("CentOS", 9999)
//3.执⾏DataStream的转换算⼦
val counts = text.map(line=>line.split("\\s+"))
.map(ts=>(ts(0)+":"+ts(1),ts(2)))
.keyBy(0)
.map(new UserVisitedMapMapFunction)
//4.将计算的结果在控制打印
counts.print()
//5.执⾏流计算任务
env.execute("Stream WordCount")
}
}
class UserVisitedMapMapFunction extends RichMapFunction[(String,String),
(String,String)]{
var userVisitedMap:MapState[String,Int]=_
override def open(parameters: Configuration): Unit = {
//1.创建对应状态描述符
val msd = new MapStateDescriptor[String,Int]("UserVisitedMap",
createTypeInformation[String],createTypeInformation[Int])
//2.获取RuntimeContext
var context: RuntimeContext = getRuntimeContext
//3.获取指定类型状态
userVisitedMap=context.getMapState(msd)
}
override def map(value: (String, String)): (String, String) = {
var count=0
if(userVisitedMap.contains(value._2)){
count=userVisitedMap.get(value._2)
}
userVisitedMap.put(value._2,count+1)
var historyList= userVisitedMap.entries()
.asScala
.map(entry=> entry.getKey+":"+entry.getValue)
.toList
//返回最新值
(value._1,historyList.mkString(" | "))
}
ReducingState
该状态主要⽤于存储单⼀状态值。该状态会将添加的元素
和历史状态⾃动做运算,调⽤⽤户提供的ReduceFunction
object FlinkWordCountReduceState {
def main(args: Array[String]): Unit = {
//1.创建流计算执⾏环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
//2.创建DataStream - 细化
val text = env.socketTextStream("CentOS", 9999)
//3.执⾏DataStream的转换算⼦
val counts = text.flatMap(line=>line.split("\\s+"))
.map(word=>(word,1))
.keyBy(0)
.map(new WordCountReduceStateMapFunction)
//4.将计算的结果在控制打印
counts.print()
//5.执⾏流计算任务
env.execute("Stream WordCount")
}
}
class WordCountReduceStateMapFunction extends RichMapFunction[(String,Int),
(String,Int)]{
var rs:ReducingState[Int]=_
override def open(parameters: Configuration): Unit = {
//1.创建对应状态描述符
val rsd = new ReducingStateDescriptor[Int]("wordcountReducingStateDescriptor",
new ReduceFunction[Int](){
override def reduce(v1: Int, v2: Int): Int = v1+v2
},createTypeInformation[Int])
//2.获取RuntimeContext
var context: RuntimeContext = getRuntimeContext
//3.获取指定类型状态
rs=context.getReducingState(rsd)
}
override def map(value: (String, Int)): (String, Int) = {
rs.add(value._2)
//返回最新值
(value._1,rs.get())
}
}
AggregatingState
该状态主要⽤于存储单⼀状态值。该状态会将添加的元素
和历史状态⾃动做运算,调⽤⽤户提供的
AggregateFunction,该状态和ReducingState不同点在于数
据输⼊和输出类型可以不⼀致
object FlinkUserOrderAggregatingState {
def main(args: Array[String]): Unit = {
//1.创建流计算执⾏环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
//2.创建DataStream - 细化 001 zhangsan 1000
val text = env.socketTextStream("CentOS", 9999)
//3.执⾏DataStream的转换算⼦
val counts = text.map(line=>line.split("\\s+"))
.map(ts=>(ts(0)+":"+ts(1),ts(2).toDouble))
.keyBy(0)
.map(new UserOrderAggregatingStateMapFunction)
//4.将计算的结果在控制打印
counts.print()
//5.执⾏流计算任务
env.execute("Stream WordCount")
}
}
class UserOrderAggregatingStateMapFunction extends RichMapFunction[(String,Double),
(String,Double)]{
var as:AggregatingState[Double,Double]=_
override def open(parameters: Configuration): Unit = {
//1.创建对应状态描述符
val asd = new AggregatingStateDescriptor[Double,(Int,Double),Double]
("userOrderAggregatingStateMapFunction",
new AggregateFunction[Double,(Int,Double),Double](){
override def createAccumulator(): (Int, Double) = (0,0.0)
override def add(value: Double, accumulator: (Int, Double)): (Int, Double) = {
(accumulator._1+1,accumulator._2+value)
}
override def getResult(accumulator: (Int, Double)): Double = {
accumulator._2/accumulator._1
}
override def merge(a: (Int, Double), b: (Int, Double)): (Int, Double) = {
(a._1+b._1,a._2+b._2)
}
},createTypeInformation[(Int,Double)])
//2.获取RuntimeContext
var context: RuntimeContext = getRuntimeContext
//3.获取指定类型状态
as=context.getAggregatingState(asd)
}
override def map(value: (String, Double)): (String, Double) = {
as.add(value._2)
//返回最新值
(value._1,as.get())
}
}
FoldingState
该状态主要⽤于存储单⼀状态值。该状态会将添加的元素
和历史状态⾃动做运算,调⽤⽤户提供的FoldFunction,
该状态和ReducingState不同点在于数据输⼊和中间结果类
型可以不⼀致
object FlinkUserOrderFoldState {
def main(args: Array[String]): Unit = {
//1.创建流计算执⾏环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
//2.创建DataStream - 细化 001 zhangsan 1000
val text = env.socketTextStream("CentOS", 9999)
//3.执⾏DataStream的转换算⼦
val counts = text.map(line=>line.split("\\s+"))
.map(ts=>(ts(0)+":"+ts(1),ts(2).toDouble))
.keyBy(0)
.map(new UserOrderAvgMapFunction)
//4.将计算的结果在控制打印
counts.print()
//5.执⾏流计算任务
env.execute("Stream WordCount")
}
}
class UserOrderAvgMapFunction extends RichMapFunction[(String,Double),(String,Double)]
{
var rs:ReducingState[Int]=_
var fs:FoldingState[Double,Double]=_
override def open(parameters: Configuration): Unit = {
//1.创建对应状态描述符
val rsd = new ReducingStateDescriptor[Int]("wordcountReducingStateDescriptor",
new ReduceFunction[Int](){
override def reduce(v1: Int, v2: Int): Int = v1+v2
},createTypeInformation[Int])
val fsd=new FoldingStateDescriptor[Double,Double]("foldstate",0,new
FoldFunction[Double,Double](){
override def fold(accumulator: Double, value: Double): Double = {
accumulator+value
}
State Time-To-Live (TTL)
Flink⽀持对上有的keyed state的状态指定TTL存活时间,配置状态的时效性,该特性默认是关闭。⼀旦开
启该特性,Flink会尽最⼤努⼒删除过期状态。TTL⽀持单⼀值失效特性,同时也⽀持集合类型数据失
效,例如MapState和ListState中的元素,每个元素都有⾃⼰的时效时间。
基本使⽤
},createTypeInformation[Double])
//2.获取RuntimeContext
var context: RuntimeContext = getRuntimeContext
//3.获取指定类型状态
rs=context.getReducingState(rsd)
fs=context.getFoldingState(fsd)
}
override def map(value: (String, Double)): (String, Double) = {
rs.add(1)
fs.add(value._2)
//返回最新值
(value._1,fs.get()/rs.get())
}
}
State Time-To-Live (TTL) (状态的生存时间)
Flink⽀持对上有的keyed state的状态指定TTL存活时间,配置状态的时效性,该特性默认是关闭。⼀旦开启该特性,Flink会尽最⼤努⼒删除过期状态。TTL⽀持单⼀值失效特性,同时也⽀持集合类型数据失效,例如MapState和ListState中的元素,每个元素都有⾃⼰的时效时间。
如何开启使用
//1.创建对应状态描述符
val xsd = new XxxxStateDescriptor[Int]("wordcount", createTypeInformation[Int])
//设置TTL实效性
val stateTtlConfig = StateTtlConfig.newBuilder(Time.seconds(5)) //设置存活时间5s ①
.setUpdateType(UpdateType.OnCreateAndWrite) //创建、修改重新更新时间 ②
.setStateVisibility(StateVisibility.NeverReturnExpired) //永不返回过期数据 ③
.build()
//启⽤TTL特性
xsd.enableTimeToLive(stateTtlConfig)
注意:
①:该参数指定State存活时间,必须指定。
②:该参数指定State实效时间更新时机,默认值
OnCreateAndWrite
OnCreateAndWrite: 只有修改操作,才会更新时间
OnReadAndWrite:只有访问读取、修改state时间就会更新
③:设置state的可⻅性,默认值
NeverReturnExpired
NeverReturnExpired:永不返回过期状态
ReturnExpiredIfNotCleanedUp:如果flink没有删除过期的状态数据,系统会将过期的数据返回
提醒:
⼀旦⽤户开启了TTL特征,系统每个存储的状态数据会额外开辟8bytes(Long类型)的字节⼤⼩,⽤于存储state时间戳;系统的时效时间⽬前仅仅⽀持的是计算节点时间;如果程序⼀开始没有开启TTL,在服务重启以后,开启了TTL,此时服务在故障恢复的时候,会报错!
Cleanup Of Expired State (清除过期状态)
Flink默认仅仅当⽤户读状态的时候,才会去检查状态数据是否失效,如果失效将失效的数据⽴即删除。但就会导致系统在⻓时间运⾏的时候,会存在很多数据已经过期了,但是系统⼜没有去读取过期的状态数据,该数据⼀直驻留在内存中。
在flink-1.10版本中,系统可以根据State backend配置,定期在后台收集失效状态进⾏删除。⽤户可以通过调⽤以下API关闭⾃动清理。
val stateTtlConfig = StateTtlConfig.newBuilder(Time.seconds(5)) //设置存活时间5s
.setUpdateType(UpdateType.OnCreateAndWrite) //创建、修改重新更新时间
.setStateVisibility(StateVisibility.NeverReturnExpired) //永不返回过期数据
.disableCleanupInBackground()
.build()
提醒:
早期版本需要⽤户⼿动调⽤cleanupInBackground开启后台清理。flink-1.10版本该特性⾃动打开。
Checkpoint & Savepoint 检查点和保存点
由于Flink是⼀个有状态计算的流服务,因此状态的管理和容错是⾮常重要的。为了保证程序的健壮性,
Flink提出Checkpoint机制,该机制⽤于持久化计算节点的状态数据,继⽽实现Flink故障恢复。所谓的Checkpoint机制指的是Flink会定期的持久化的状态数据。将状态数据持久化到远程⽂件系统(取决于State backend),例如HDFS,该检查点协调或者发起是由JobManager负责实施。JobManager会定期向下游的计算节点发送Barrier(栅栏),下游计算节点收到该Barrier信号之后,会预先提交⾃⼰的状态信息,并且给JobManage以应答,同时会继续将接收到的Barrier继续传递给下游的任务节点,⼀次内推,所有的下游计算节点在收到该Barrier信号的时候都会做预提交⾃⼰的状态信息。等到所有的下游节点都完成了状态的预提交,并且JobManager收集完成所有下游节点的应答之后,JobManager才会认定此次的Checkpoint是成功的,并且会⾃动删除上⼀次检查点数据。
Savepoint是⼿动触发的Checkpoint, Savepoint为程序创建快照并将其写到
State Backend
。Savepoint依靠常规的Checkpoint机制。所谓的Checkpoint指的是程序在执⾏期间,程序会定期在⼯作节点上快照并产⽣Checkpoint。为了进⾏恢复,仅需要获取最后⼀次完成的Checkpoint即可,并且可以在新的Checkpoint完成后⽴即安全地丢弃较旧的Checkpoint。
Savepoint与这些定期Checkpoint类似,Savepoint由⽤户触发并且更新的Checkpoint完成时不会⾃动过期。⽤户可以使⽤命令⾏或通过REST API取消作业时创建Savepoint由于Flink 中的Checkpoint机制默认是不开启的,需要⽤户通过调⽤以下⽅法开启检查点机制。
env.enableCheckpointing(1000); //间隔1s执行一次checkpoint
为了控制检查点执⾏的⼀些细节,Flink⽀持⽤户
定制Checkpoiont
的⼀些⾏为。
//间隔5s执⾏⼀次checkpoint 精准⼀次//设置检查点超时 4s
env.enableCheckpointing(5000,CheckpointingMode.EXACTLY_ONCE)
//开启本次检查点 与上⼀次完成的检查点时间间隔不得⼩于 2s 优先级⾼于
env.getCheckpointConfig.setCheckpointTimeout(4000)
//如果检查点失败,任务宣告退出 setFailOnCheckpointingErrors(true)
checkpoint interval env.getCheckpointConfig.setMinPauseBetweenCheckpoints(2000)
env.getCheckpointConfig.setTolerableCheckpointFailureNumber(0)
//设置如果任务取消,系统该如何处理检查点数据
//RETAIN_ON_CANCELLATION:如果取消任务的时候,没有加–savepoint,系统会保留检查点数据
//DELETE_ON_CANCELLATION:取消任务,⾃动是删除检查点(不建议使⽤)
env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup. RETAIN_ON_CANCELLATION)
State Backend 状态后端
Flink指定多种State Backend实现,State Backend指定了状态数据(检查点数据)存储的位置信息。配置Flink的状态后端的⽅式有两种:
- 每个计算独⽴ 状态后端
val env = StreamExecutionEnvironment.getExecutionEnvironment()
env.setStateBackend(…)
- 全局默认状态后端,需要在
配置flink-conf.yaml
#==============================================================================
# Fault tolerance and checkpointing
#==============================================================================
# The backend that will be used to store operator state checkpoints if
# checkpointing is enabled.
#
# Supported backends are 'jobmanager', 'filesystem', 'rocksdb', or the
# <class-name-of-factory>.
#
state.backend: rocksdb
# Directory for checkpoints filesystem, when using any of the default bundled
# state backends.
#
state.checkpoints.dir: hdfs:///flink-checkpoints
# Default target directory for savepoints, optional.
#
state.savepoints.dir: hdfs:///flink-savepoints
# Flag to enable/disable incremental checkpoints for backends that
# support incremental checkpoints (like the RocksDB state backend).
#
state.backend.incremental: false
由于状态后端需要将数据同步到HDFS,因此Flink必须能够连接HDFS,所以需要在
注意呦:
~/.bashrc
配
置
HADOOP_CLASSPATH
JAVA_HOME=/usr/java/latest
HADOOP_HOME=/usr/hadoop-2.9.2
PATH=$PATH:$JAVA_HOME/bin:$HADOOP_HOME/bin:$HADOOP_HOME/sbin
CLASSPATH=.
export JAVA_HOME
export CLASSPATH
export PATH
export HADOOP_HOME
export HADOOP_CLASSPATH=`hadoop classpath`
MemoryStateBackend(jobmanager) 内存状态后端
MemoryStateBackend 使⽤内存存储内部状态数据,将状态数据存储在在Java的堆中。在Checkpoint时候,此状态后端将对该状态进⾏快照,并将其作为检查点确认消息的⼀部分发送给JobManager(主服务器),该JobManager也将其存储在其堆中。
val env = StreamExecutionEnvironment.getExecutionEnvironment()
env.setStateBackend(new MemoryStateBackend(MAX_MEM_STATE_SIZE, true))
限制
- 默认情况下,每个状态的大小限制为5 MB。这个值可以在
的构造函数中增加。MemoryStateBackend
-
不管配置的最大状态大小如何,状态都不能大于akka帧的大小
(参见配置)。
- 聚合状态必须适合于JobManager内存。
可用场景
1)本地部署进⾏debug调试的可以使⽤
2)不仅涉及太多的状态管理。
FsStateBackend(filesystem)
该种状态后端实现是将数据的状态存储在TaskManager(计算节点)的内存。在执⾏检查点的时候后会将TaskManager内存的数据写⼊远程的⽂件系统。⾮常少量的元数据想信息会存储在JobManager的内存中。
val env = StreamExecutionEnvironment.getExecutionEnvironment()
env.setStateBackend(new FsStateBackend(“hdfs:///flink-checkpoints”,true))
场景:
1)当⽤户有⾮常⼤的状态需要管理
2)所有⽣产环境
RocksDBStateBackend(rocksdb)
该种状态后端实现是将数据的状态存储在TaskManager(计算节点)的本地的RocksDB数据⽂件中。在执⾏检查点的时候后会将TaskManager本地的RocksDB数据库⽂件写⼊远程的⽂件系统。⾮常少量的元数据想信息会存储在JobManager的内存中。
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-statebackend-rocksdb_2.11</artifactId>
<version>1.10.0</version>
</dependency>
//1.创建流计算执⾏环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStateBackend(new RocksDBStateBackend("hdfs:///flink-rocksdb-checkpoints",true))
限制
由于RocksDB的JNI桥API基于byte[],所以每个键和每个值支持的最大大小为2 ^ 31个字节。重要提示:在RocksDB中使用合并操作的状态(例如ListState)可以悄悄地累积值大小> 2^31字节,然后在下一次检索时失败。这是RocksDB JNI当前的一个限制。
使用场景:
1)当⽤户有超⼤的状态需要管理
2)所有⽣产环境
请注意,您可以保留的状态量仅受可用磁盘空间量的限制。与将状态保存在内存中的FsStateBackend相比,这允许保存非常大的状态。然而,这也意味着使用此状态后端可以实现的最大吞吐量会更低。从/到这个后端进行的所有读/写都必须经过反序列化来检索/存储状态对象,这也比基于堆的后端处理堆上的表示要昂贵得多。
Managed Operator State 管理运算符状态
Flink提供了基于keyed stream操作符状态称为keyedstate,对于⼀些⾮keyed stream的操作中使⽤的状态统称为Operator State,如果⽤户希望使⽤Operator State需要实现通⽤的
CheckpointedFunction
接⼝或者
ListCheckpointed
CheckpointedFunction
其中CheckpointedFunction接⼝提供non-keyed state的不同状态分发策略。⽤户在实现该接⼝的时候需要实现以下两个⽅法:
public interface CheckpointedFunction {
void snapshotState(FunctionSnapshotContext context) throws Exception;
void initializeState(FunctionInitializationContext context) throws Exception;
}
- **snapshotState:**当系统进⾏Checkpoint的时候,系统回调⽤该⽅法,通常⽤户需要将持久化的状态数据存储到状态中。
- initializeState: 当第⼀次启动的时候系统⾃动调⽤initializeState,进⾏状态初始化。或者系统在故障恢复的时候进⾏状态的恢复。
无论何时必须执行检查点,都会调用snapshotState()。对应的initializeState()在每次初始化用户定义的函数时都会被调用,无论是在函数第一次初始化时,还是在函数从之前的检查点实际恢复时。考虑到这一点,initializeState()不仅仅是不同类型状态的地方初始化,但也包括状态恢复逻辑。
当前,Operator State⽀持list-style的Managed State。该状态应为彼此独⽴的可序列化对象的列表,因此在系统故障恢复的时候才有可能进⾏重新分配。⽬前Flink针对于Operator State分配⽅案有以下两种:
Even-split redistribution -
每⼀个操作符实例都会保留⼀个List的状态,因此Operator State逻辑上是将该Operator的并⾏实例的所有的List状态拼接成⼀个完成的List State。当系统在恢复、重新分发状态的时候,系统会根据当前Operator实例并⾏度,对当前的状态进⾏均分。例如,如果在并⾏度为1的情况下,Operator的检查点状态包含元素element1和element2,则在将Operator并⾏度提⾼到2时,element1可能会分配给Operator Instance 0,⽽element2将进⼊Operator Instance 1
Union redistribution: -
每⼀个操作符实例都会保留⼀个List的状态,因此Operator State逻辑上是将该Operator的并⾏实例的所有的List状态拼接成⼀个完成的List State。在还原/重新分发状态时,每个Operator实例都会获得状态元素的完整列表。
object FlinkWordCountValueStateCheckpoint {
def main(args: Array[String]): Unit = {
//1.创建流计算执⾏环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStateBackend(new RocksDBStateBackend("hdfs:///flink-rocksdbcheckpoints",true))
//间隔5s执⾏⼀次checkpoint 精准⼀次
env.enableCheckpointing(5000,CheckpointingMode.EXACTLY_ONCE)
//设置检查点超时 4s
env.getCheckpointConfig.setCheckpointTimeout(4000)
//开启本次检查点 与上⼀次完成的检查点时间间隔不得⼩于 2s 优先级⾼于 checkpoint interval
env.getCheckpointConfig.setMinPauseBetweenCheckpoints(2000)
//如果检查点失败,任务宣告退出 setFailOnCheckpointingErrors(true)
env.getCheckpointConfig.setTolerableCheckpointFailureNumber(0)
//设置如果任务取消,系统该如何处理检查点数据
//RETAIN_ON_CANCELLATION:如果取消任务的时候,没有加--savepoint,系统会保留检查点数据
//DELETE_ON_CANCELLATION:取消任务,⾃动是删除检查点(不建议使⽤)
env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.R
ETAIN_ON_CANCELLATION)
//2.创建DataStream - 细化
val text = env.socketTextStream("CentOS", 9999)
//3.执⾏DataStream的转换算⼦
val counts = text.flatMap(line=>line.split("\\s+"))
.map(word=>(word,1))
.keyBy(0)
.map(new WordCountMapFunction)
.uid("wc-map")
//4.将计算的结果在控制打印
counts.addSink(new UserDefineBufferSinkEvenSplit(3))
.uid("buffer-sink")
//5.执⾏流计算任务
env.execute("Stream WordCount")
}
}
class WordCountMapFunction extends RichMapFunction[(String,Int),(String,Int)]{
var vs:ValueState[Int]=_
override def open(parameters: Configuration): Unit = {
//1.创建对应状态描述符
val vsd = new ValueStateDescriptor[Int]("wordcount", createTypeInformation[Int])
//2.获取RuntimeContext
var context: RuntimeContext = getRuntimeContext
//3.获取指定类型状态
vs=context.getState(vsd)
}
override def map(value: (String, Int)): (String, Int) = {
//获取历史值
val historyData = vs.value()
//更新状态
vs.update(historyData+value._2)
//返回最新值
(value._1,vs.value())
}
}
class UserDefineBufferSinkEvenSplit(threshold: Int = 0) extends SinkFunction[(String,
Int)] with CheckpointedFunction{
@transient
private var checkpointedState: ListState[(String, Int)] = _
private val bufferedElements = ListBuffer[(String, Int)]()
//复写写出逻辑
override def invoke(value: (String, Int), context: SinkFunction.Context[_]): Unit =
{
bufferedElements += value
if(bufferedElements.size >= threshold){
for(e <- bufferedElements){
println("元素:"+e)
}
bufferedElements.clear()
}
}
//需要将状态数据存储起来
override def snapshotState(context: FunctionSnapshotContext): Unit = {
checkpointedState.clear()
checkpointedState.update(bufferedElements.asJava)//直接将状态数据存储起来
}
//初始化状态逻辑、状态恢复逻辑
override def initializeState(context: FunctionInitializationContext): Unit = {
//初始化状态、也有可能是故障恢复
val lsd=new ListStateDescriptor[(String, Int)]("liststate",createTypeInformation[(String,Int)])
//默认均分⽅式恢复
checkpointedState = context.getOperatorStateStore.getListState(lsd)
//context.getOperatorStateStore.getUnionListState(lsd) //默认⼴播⽅式恢复
if(context.isRestored){ //实现故障恢复逻辑
bufferedElements.appendAll(checkpointedState.get().asScala.toList)
}
}
}
ListCheckpointed 检查点列表
ListCheckpointed接⼝是CheckpointedFunction的更有限的变体写法。因为该接⼝仅仅⽀持list-style state的Even Split分发策略。
public interface ListCheckpointed<T extends Serializable> {
List<T> snapshotState(long checkpointId, long timestamp) throws Exception;
void restoreState(List<T> state) throws Exception;
}
- snapshotState:在做系统检查点的时候,⽤户只需要将需要存储的数据返回即可。
- restoreState:直接提供给⽤户需要恢复状态。
在snapshotState()中,操作符应该向检查点返回一个对象列表,而restoreState必须在恢复时处理这样的列表。如果状态不可重分区,则始终可以在snapshotState()中返回Collections.singletonList(MY_STATE)。
object FlinkCounterSource {
def main(args: Array[String]): Unit = {
//1.创建流计算执⾏环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStateBackend(new RocksDBStateBackend("hdfs:///flink-rocksdbcheckpoints",true))
//间隔5s执⾏⼀次checkpoint 精准⼀次
env.enableCheckpointing(5000,CheckpointingMode.EXACTLY_ONCE)
//设置检查点超时 4s
env.getCheckpointConfig.setCheckpointTimeout(4000)
//开启本次检查点 与上⼀次完成的检查点时间间隔不得⼩于 2s 优先级⾼于 checkpoint interval
env.getCheckpointConfig.setMinPauseBetweenCheckpoints(2000)
//如果检查点失败,任务宣告退出 setFailOnCheckpointingErrors(true)
env.getCheckpointConfig.setTolerableCheckpointFailureNumber(0)
//设置如果任务取消,系统该如何处理检查点数据
//RETAIN_ON_CANCELLATION:如果取消任务的时候,没有加--savepoint,系统会保留检查点数据
//DELETE_ON_CANCELLATION:取消任务,⾃动是删除检查点(不建议使⽤)
env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.R
ETAIN_ON_CANCELLATION)
val text = env.addSource(new UserDefineCounterSource)
.uid("UserDefineCounterSource")
text.print("offset")
//5.执⾏流计算任务
env.execute("Stream WordCount")
}
}
class UserDefineCounterSource extends RichParallelSourceFunction[Long] with
ListCheckpointed[JLong]{
@volatile
private var isRunning = true
private var offset = 0L
//存储状态值
override def snapshotState(checkpointId: Long, timestamp: Long): util.List[JLong] =
{
println("snapshotState:"+offset)
Collections.singletonList(offset)//返回⼀个不可拆分集合
}
override def restoreState(state: util.List[JLong]): Unit = {
println("restoreState:"+state.asScala)
offset=state.asScala.head //取第⼀个元素
}
override def run(ctx: SourceFunction.SourceContext[Long]): Unit = {
val lock = ctx.getCheckpointLock
while (isRunning) {
Thread.sleep(1000)
lock.synchronized({
ctx.collect(offset) //往下游输出当前offset
offset += 1
})
}
}
override def cancel(): Unit = isRunning=false
}
Broadcast State Pattern 广播状态模式
⼴播状态是Flink提供的第三种状态共享的场景。通常需要将⼀个吞吐量⽐较⼩的流中状态数据进⾏⼴播给下游的任务,另外⼀个流可以以只读的形式读取⼴播状态。
第三种受支持的操作符状态是广播状态。广播状态被引入以支持这样的用例:来自一个流的一些数据需要广播到所有下游任务,在那里它被本地存储,并用于处理另一个流上的所有传入元素。例如,广播状态可以自然地出现,可以想象一个低吞吐量流包含一组规则,我们希望根据来自另一个流的所有元素对这些规则进行评估。
考虑到上述类型的用例,广播状态与其他操作符状态的区别在于:
- it has a map format,(他有映射格式)
- 它只对特定的操作符可用,这些操作符有一个广播流和一个非广播流作为输入,
- 并且这样的操作符可以具有多个具有不同名称的广播状态。
案例剖析
DataStream链接 BroadcastStream
//仅仅输出满⾜规则的数据
object FlinkBroadcastNonKeyedStream {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
//吞吐量⾼
val inputs = env.socketTextStream("CentOS", 9999)
//定义需要⼴播流 吞吐量低
val bcsd=new MapStateDescriptor[String,String]
("bcsd",createTypeInformation[String],createTypeInformation[String])
val broadcaststream = env.socketTextStream("CentOS", 8888)
.broadcast(bcsd)
val tag = new OutputTag[String]("notmatch")
val datastream = inputs.connect(broadcaststream)
.process(new UserDefineBroadcastProcessFunction(tag, bcsd))
datastream.print("满⾜条件")
datastream.getSideOutput(tag).print("不满⾜")
env.execute("Window Stream WordCount")
}
}
//第⼀个流类型 第⼆个流类型 输出类型
class UserDefineBroadcastProcessFunction(tag:OutputTag[String],msd:MapStateDescriptor[String
,String]) extends BroadcastProcessFunction[String,String,String]{
//处理正常流 ⾼吞吐 ,通常在改法读取⼴播状态
override def processElement(value: String, ctx: BroadcastProcessFunction[String, String,
String]#ReadOnlyContext,
out: Collector[String]): Unit = {
//获取状态 只读
val readOnlyMapstate = ctx.getBroadcastState(msd)
if(readOnlyMapstate.contains("rule")){
val rule=readOnlyMapstate.get("rule")
if(value.contains(rule)){//将数据写出去
out.collect(rule+"\t"+value)
}else{
ctx.output(tag,rule+"\t"+value)
}
}else{//使⽤Side out将数据输出
ctx.output(tag,value)
}
}
//处理⼴播流,通常在这⾥修改需要⼴播的状态 低吞吐
override def processBroadcastElement(value: String,
ctx: BroadcastProcessFunction[String, String,
String]#Context,
out: Collector[String]): Unit = {
val mapstate = ctx.getBroadcastState(msd)
mapstate.put("rule",value)
}
}
KeyedStream链接 BroadcastStream
case class OrderItem(id:String,name:String,category:String,count:Int,price:Double)
case class Rule(category:String,threshold:Double)
case class User(id:String,name:String)
object FlinkBroadcastKeyedStream {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
//id name 品类 数量 单价 -- 订单项
//1 zhangsan ⽔果 2 4.5
//吞吐量⾼
val inputs = env.socketTextStream("CentOS", 9999)
.map(line=>line.split("\\s+"))
.map(ts=>OrderItem(ts(0),ts(1),ts(2),ts(3).toInt,ts(4).toDouble))
.keyBy(orderItem=> orderItem.category+":"+orderItem.id)
//品类 阈值 ⽔果 8.0 -- 奖 励
val bcsd=new MapStateDescriptor[String,Double]
("bcsd",createTypeInformation[String],createTypeInformation[Double])
val broadcaststream = env.socketTextStream("CentOS", 8888)
.map(line=>line.split("\\s+"))
.map(ts=>Rule(ts(0),ts(1).toDouble))
.broadcast(bcsd)
inputs.connect(broadcaststream)
.process(new UserDefineKeyedBroadcastProcessFunction(bcsd))
.print("奖励:")
env.execute("Window Stream WordCount")
}
}
//key类型 第⼀个流类型 第⼆个流类型 输出类型
class UserDefineKeyedBroadcastProcessFunction(msd:MapStateDescriptor[String,Double])
extends
KeyedBroadcastProcessFunction[String,OrderItem,Rule,User]{
var userTotalCost:ReducingState[Double]=_
override def open(parameters: Configuration): Unit = {
val rsd = new ReducingStateDescriptor[Double]("userTotalCost", new
ReduceFunction[Double] {
override def reduce(value1: Double, value2: Double): Double = value1 + value2
}, createTypeInformation[Double])
userTotalCost=getRuntimeContext.getReducingState(rsd)
}
override def processElement(value: OrderItem,
ctx: KeyedBroadcastProcessFunction[String, OrderItem,
Rule, User]#ReadOnlyContext,
out: Collector[User]): Unit = {
//计算出当前品类别下⽤户的总消费
userTotalCost.add(value.count*value.price)
val ruleState = ctx.getBroadcastState(msd)
var u=User(value.id,value.name)
//设定的有奖励规则
if(ruleState!=null && ruleState.contains(value.category)){
if(userTotalCost.get() >= ruleState.get(value.category)){//达到了奖励阈值
out.collect(u)
userTotalCost.clear()
}else{
println("不满⾜条件:"+u+" 当前总消费:"+userTotalCost.get()+"
threshold:"+ruleState.get(value.category))
}
}
}
override def processBroadcastElement(value: Rule, ctx:
KeyedBroadcastProcessFunction[String, OrderItem, Rule, User]#Context, out:
Collector[User]): Unit = {
val broadcastState = ctx.getBroadcastState(msd)
broadcastState.put(value.category,value.threshold)
}
}
Queryable State 可查询状态
Architecture 架构
Client连接其中的⼀个代理服务区然后发送查询请求给Proxy服务器,查询指定key所对应的状态数据,底层Flink按照KeyGroup的⽅式管理Keyed State,这些KeyGroup被分配给了所有的TaskMnager的服务。每个TaskManage服务多个KeyGroup状态的存储。为了找到查询key所在的KeyGroup所属地TaskManager服务,Proxy服务会去询问JobManager查询TaskManager的信息,然后直接访问TaskManager上的QueryableStateServer服务器获取状态数据,最后将获取的状态数据返回给Client端。
- QueryableStateClient- 运⾏在Flink集群以外,负责提交⽤户的查询给Flink集群
- QueryableStateClientProxy- 运⾏在Flink集群中的TaskManager中的⼀个代理服务,负责接收客户端的查询,代理负责相应TaskManager获取请求的state,并将其state返回给客户端
- **QueryableStateServer -**运⾏在Flink集群中的TaskManager中服务,仅仅负责读取当前TaskManage主机上存储到状态数据。
客户端连接到其中一个代理,并发送一个与特定密钥k相关联的状态请求。正如在与状态一起工作中所述,键控状态被组织在关键组中,并且每个关键组TaskManager被分配了许多这样的关键组。为了发现哪个TaskManager负责持有k的密钥组,代理将询问JobManager。根据答案,代理将查询在任务管理器上运行的QueryableStateServer,以获得与k相关的状态,并将响应转发回客户机。
激活 Queryable State
- 将Flink的 opt/ 拷⻉ flink-queryable-state-runtime_2.11-1.10.0.jar 到Flink的lib/ ⽬录.
[[email protected] flink-1.10.0]# cp opt/flink-queryable-state-runtime_2.11-1.10.0.jar lib/
- 在Flink的flink-conf.yaml配置⽂件中添加以下配置
queryable-state.enable: true
- 重启Flink服务,为了校验服务是否开启你可以查看task manager⽇志,可以看到
."Started the Queryable State Proxy Server @ ..."
[[email protected] flink-1.10.0]# ./bin/start-cluster.sh
Starting cluster.
Starting standalonesession daemon on host CentOS.
Starting taskexecutor daemon on host CentOS.
查看TaskManager启动⽇志
Making State Queryable 使状态可查询
为了使State对外界可⻅,需要使⽤以下命令显式地使其可查询:
- 创建QueryableStateStream,该QueryableStateStream充当⼀个Sink的输出,仅仅是将数据存储到state中。
- 或者stateDescriptor.setQueryable(String queryableStateName)⽅法使得我们的状态可查询。
Queryable State Stream 可查询状态流
⽤户可以调⽤keyedstream的.asQueryableState(stateName, stateDescriptor)⽅法,提供⼀个可以查询状态。
// ValueState
QueryableStateStream asQueryableState(
String queryableStateName,
ValueStateDescriptor stateDescriptor)
// Shortcut for explicit ValueStateDescriptor variant
QueryableStateStream asQueryableState(String queryableStateName)
// FoldingState
QueryableStateStream asQueryableState(
String queryableStateName,
FoldingStateDescriptor stateDescriptor)
// ReducingState
QueryableStateStream asQueryableState(
String queryableStateName,
ReducingStateDescriptor stateDescriptor)
注意:
没有可查询的ListState接收器,因为它会导致一个不断增长的列表,这个列表可能不会被清除,因此最终会消耗太多的内存。
返回的QueryableStateStream可以看作是⼀个Sink,因为⽆法对QueryableStateStream进⼀步转换。在内部,QueryableStateStream被转换为运算符,该运算符使⽤所有传⼊记录来更新可查询状态实例。更新
逻辑由asQueryableState调⽤中提供的StateDescriptor的类型隐含。在类似以下的程序中,keyedstream的所有记录将通过ValueState.update(value)⽤于更新状态实例:
stream.keyBy(0).asQueryableState(“query-name”)
object FlinkWordCountQueryableStream {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
var rsd=new ReducingStateDescriptor[(String,Int)]("reducestate",new
ReduceFunction[(String, Int)] {
override def reduce(v1: (String, Int), v2: (String, Int)): (String, Int) = {
(v1._1,(v1._2+v2._2))
}
},createTypeInformation[(String,Int)])
env.socketTextStream("CentOS", 9999)
.flatMap(line => line.split("\\s+"))
.map(word => (word, 1))
.keyBy(0)
.asQueryableState("wordcount", rsd)//状态名字,后期查询需要
//5.执⾏流计算任务
env.execute("Stream WordCount")
}
}
Managed Keyed State
object FlinkWordCountQueryable {
def main(args: Array[String]): Unit = {
//1.创建流计算执⾏环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
//2.创建DataStream - 细化
val text = env.socketTextStream("CentOS", 9999)
//3.执⾏DataStream的转换算⼦
val counts = text.flatMap(line=>line.split("\\s+"))
.map(word=>(word,1))
.keyBy(0)
.map(new WordCountMapFunction)
//4.将计算的结果在控制打印
counts.print()
//5.执⾏流计算任务
env.execute("Stream WordCount")
}
}
class WordCountMapFunction extends RichMapFunction[(String,Int),(String,Int)]{
var vs:ValueState[Int]=_
override def open(parameters: Configuration): Unit = {
//1.创建对应状态描述符
val vsd = new ValueStateDescriptor[Int]("wordcount", createTypeInformation[Int])
vsd.setQueryable("query-wc")
//2.获取RuntimeContext
var context: RuntimeContext = getRuntimeContext
//3.获取指定类型状态
vs=context.getState(vsd)
}
override def map(value: (String, Int)): (String, Int) = {
//获取历史值
val historyData = vs.value()
//更新状态
vs.update(historyData+value._2)
//返回最新值
(value._1,vs.value())
}
}
Querying State 查询状态
- 引⼊依赖
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
<version>1.10.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-queryable-state-client-java</artifactId>
<version>1.10.0</version>
</dependency>
- 查询代码如下
同步获取结果
//链接proxy服务器
val client = new QueryableStateClient("CentOS", 9069)
var jobID=JobID.fromHexString("dc60cd61dc2d591014c062397e3bd6b9")
var queryName="wordcount" //状态名字
var queryKey="this" //⽤户需要查询的 key
var rsd=new ReducingStateDescriptor[(String,Int)]("reducestate",new
ReduceFunction[(String, Int)] {
override def reduce(v1: (String, Int), v2: (String, Int)): (String, Int) = {
(v1._1,(v1._2+v2._2))
}
},createTypeInformation[(String,Int)])
val resultFuture = client.getKvState(jobID, queryName, queryKey,
createTypeInformation[String], rsd)
//同步获取结果
val state: ReducingState[(String, Int)] = resultFuture.get()
println("结果:"+state.get())
client.shutdownAndWait()
异步获取结果
resultFuture.thenAccept(new Consumer[ReducingState[(String, Int)]] {
override def accept(t: ReducingState[(String, Int)]): Unit = {
println("结果:"+t.get())
}
})
Thread.sleep(10000)
client.shutdownAndWait()