天天看点

10.14-10.17为flink项目添加checkpoint对flink checkpoint的理解与实现

目录

  • 对flink checkpoint的理解与实现
    • 背景
    • 什么是flink checkpoint
      • 链接
      • 我的一些理解
        • checkpoint实现流程
        • checkpoint存储
      • checkpoint实现
      • checkpoint和savepoint的区别
    • AB Test

对flink checkpoint的理解与实现

背景

由于我们公司的实时架构主要是kafka -> spark/storm -> kafka -> druid,用flink跑的任务寥寥,老大让我调研flink的优势(说不定哪天公司就转flink了呢(狗头)),而flink最大的优势就是计算高效性和checkpoint保证数据唯一性。

什么是flink checkpoint

链接

https://mp.weixin.qq.com/s?__biz=MzA3MDY0NTMxOQ==&mid=2247485150&idx=1&sn=ba0570a9f631501a189d8184807d518b&chksm=9f38e5f6a84f6ce0dd805eae87d6c62e2c96670f14c2bac1607961614ba49f310a4630e31a3b&scene=21#wechat_redirect kafka-flink-kafka的checkpoint过程

https://www.jianshu.com/p/4d31d6cddc99 excatly-once

我的一些理解

checkpoint的存在是为了实现exactly-once,即:在任务出现cancel或中断的情况下,数据流的每份数据都能被处理到且只能被处理一次。spark的checkpoint无法实现该需求,这也是flink优于spark的一点。

checkpoint实现流程

10.14-10.17为flink项目添加checkpoint对flink checkpoint的理解与实现

每隔一段时间,jobmanager会在数据流中插入barrier。barriar在每个operator中流过,当流过一个operator时,operator会打一个snapshot,即快照,并完成以下两步:1、将快照内容,即此时的state信息发送给backend。2、将barrier传递给下一个operator。

需要注意的是:

1、只有backend接收到一个checkpoint周期里的所有state信息,才能真正存储成为一个checkpoint。若有一个丢失或没接收到,则丢弃该checkpoint,视为没完成。

2、为了保证所有operator都正常commit,flink采用“两阶段提交”机制,每次operator的提交都为pre-commit,一旦所有operator完成各自的pre-commit,它们会发起一个commit操作。倘若有一个pre-commit失败,所有其他的pre-commit必须被终止,并且Flink会回滚到最近成功完成decheckpoint。

checkpoint存储

若在定义checkpoint时不显式声明fsbackend…等,则默认将checkpoint存储在taskmanager上,且只保留最近一次成功的checkpoint。

当然,如果你想保存在hdfs上,则需要在env中定义一个fsstatebackend,定义其存储路径

val checkPointPath = "hdfs:///user/flink/checkpoints"
    val fsStateBackend:StateBackend = new FsStateBackend(checkPointPath)
    env.setStateBackend(fsStateBackend)
    外部存储的话,就会保留每一个checkpoint,可以选择在任意节点进行恢复
           

checkpoint实现

不多bb,直接上代码

//设置checkpoint
    //checkpoint默认关闭,开启checkpoint,并设置周期为1000ms
    env.enableCheckpointing(60000)
    //设置处理数据形式为exactly-once
    env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
    //检查点间隔至少为500ms
    env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500)
    //检查点必须在1分钟内完成,否则被丢弃
    env.getCheckpointConfig.setCheckpointTimeout(60000)
    //同一时间只允许进行一个检查点
    env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
    //表示flink程序被cancel时,会自动保留checkpoint数据以便恢复(外部持久化)
    env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
    //表示当checkpoint发生任何异常时,直接fail该task
    env.getCheckpointConfig.setFailOnCheckpointingErrors(true)
设置周期的时候,要根据你的数据量来决定。当你数据量很大,周期要设置小一点,延缓任务恢复时候的计算压力。
           

checkpoint和savepoint的区别

https://blog.csdn.net/qq_40651753/article/details/88427115

简单来说:一个自动,一个人为。

AB Test

将老大的项目clone下来

git clone [email protected]:bpd/bpd_realtime.git

在分支里修改flink项目代码,增加checkpoint部分

cd bpd_realtime/ -> git pull -> git config -l -> git branch (-r) -> git status -> git commit -a -m '测试修改' -> git push

在服务器中编写flink项目运行的sh文件

#!/bin/sh
source ~/.bashrc

ls_date=`date +%Y%m%d%k%M`
if [ x$1 != x ]; then
    ls_date=$1
fi

app_name=flink_test_A


appid=`yarn application -list -appStates RUNNING | grep mbbi |grep root.sohumb |grep "$app_name" |awk '{print $1}'`
if [ ! $appid ]; then
    echo "appid is null"

else
    yarn application -kill $appid
    echo $appid
fi

sleep 3

pid=`ps -ef |grep $app_name |grep '/bin/sh' |awk '{print $2}'`
if [ $pid ]; then
    echo "kill pid " $pid
    kill -9 $pid

fi

pid_java=`ps -ef |grep $app_name |grep '/bin/java' |awk '{print $2}'`
if [ $pid_java ]; then
    echo "kill pid_java " $pid_java
    kill -9 $pid_java

fi


/opt/flink-1.7.2/bin/flink run \
-c com.speed.flink.mbbiFlinkTestA \
-ynm $app_name \
-yqu root.sohumb \
-m yarn-cluster \
-p 60 \
/home/mbbi/lyh/bpd_realtime-jar-with-dependencies.jar $app_name$ls_date mbbi_bpd_tracking_charge
           

执行sh文件

nohup ./flink_Test_A.sh &

在Yarn集群观察执行情况及报错

编写json文件,将kafka的那个topic的数据流入druid,便于后续查询数据

{
    "type": "kafka",
    "dataSchema": {
      "dataSource": "mbbi_flink_trackingcharge_test_m",
      "parser": {
        "type": "string",
        "parseSpec": {
          "format": "json",
          "timestampSpec": {
            "column": "timestamp",
            "format": "millis"
          },
          "dimensionsSpec": {
            "dimensions": [
              "appid",
              "os_rename",
              "flag",
              "dt",
              "m",
              "h",
              "tstamp"
            ]
          }
        }
      },
      "metricsSpec": [
          {"name": "all_count_num","fieldName": "count_num","type": "longSum"},
          {"name": "all_sum_charge","fieldName": "sum_charge","type": "longSum"},
	  	    {"name": "all_sum_unify_charge","fieldName": "unification_charge","type": "longSum"}
      ],
      "granularitySpec": {
        "type": "uniform",
        "segmentGranularity": "HOUR",
        "queryGranularity": "NONE",
        "rollup": true
      }
    },
    "ioConfig": {
      "topic": "mbbi_d_dwd_xps2_trackingcharge_flink_m",
      "consumerProperties": {
        "bootstrap.servers": "kfs159088.heracles.sohuno.com:6667"
      }
    }
}
           
curl -XPOST -H 'Content-Type: application/json' -d @/opt/druid/bpd_druid_conf/mbbi_flink_trackingcharge_test_m.json http://10.16.39.13:8090/druid/indexer/v1/supervisor
           

对任务B,cancel几次;对任务A,不做操作

在实时查询中,通过以下sql来验证checkpoint的作用:

按照小时和flag标示,查询flink计算准确度
select 
sum(cast (all_count_num as bigint)) as all_count_num,
h,
flag 
from mbbi_flink_trackingcharge_test_m 
group by h,flag
           

若出现以下情形,则证明了checkpoint有效。任务结束

10.14-10.17为flink项目添加checkpoint对flink checkpoint的理解与实现

继续阅读