天天看點

Flink 狀态程式設計 chaplinthink

概念

在Flink架構體系中,有狀态計算可以說是Flink非常重要的特性之一

Flink優勢:

  • 支援高吞吐、低延遲、高性能
  • 支援事件時間Event_time概念
  • 支援有狀态計算

有狀态計算是指:

在程式計算過程中,在Flink程式内部存儲計算産生的中間結果,并提供給後續Function或算子計算結果使用。(如下圖所示)

Flink 狀态程式設計 chaplinthink

無狀态計算實作的複雜度相對較低,實作起來較容易,但是無法完成提到的比較複雜的業務場景:

  • CEP(複雜事件處理):擷取符合某一特定事件規則的事件,狀态計算就可以将接入的事件進行存儲,然後等待符合規則的事件觸發
  • 最大值、均值等聚合名額(如pv,uv):
  • 需要利用狀态來維護目前計算過程中産生的結果,例如事件的總數、總和以及最大,最小值等
  • 機器學習場景,維護目前版本模型使用的參數
  • 其他需要使用曆史資料的計算

Flink狀态程式設計

支援的狀态類型

Flink根據資料集是否根據Key進行分區,将狀态分為Keyed State和 Operator State(Non-keyed State) 兩種類型。

其中Keyed State是Operator State的特例,可以通過Key Groups進行管理,主要用于當算子并行度發生變化時,自動重新分布Keyed Sate資料

同時在Flink中Keyed State和Operator State均具有兩種形式:

一種為托管狀态(ManagedState)形式,由Flink Runtime中控制和管理狀态資料,并将狀态資料轉換成為記憶體Hashtables或RocksDB的對象存儲,然後将這些狀态資料通過内部的接口持久化到Checkpoints中,任務異常時可以通過這些狀态資料恢複任務。

另外一種是原生狀态(Raw State)形式,由算子自己管理資料結構,當觸發Checkpoint過程中,Flink并不知道狀态資料内部的資料結構,隻是将資料轉換成bytes資料存儲在Checkpoints中,當從Checkpoints恢複任務時,算子自己再反序列化出狀态的資料結構。

在Flink中推薦使用者使用Managed State管理狀态資料,主要原因是Managed State能夠更好地支援狀态資料的重平衡以及更加完善的記憶體管理。

Managed Keyed State

六種類型

Managed Keyed State 又分為如下六種類型:

Flink 狀态程式設計 chaplinthink

基本API

在Flink中需要通過建立StateDescriptor來擷取相應State的操作類。如下方代碼,建構一個ValueState:

lazy val isPayedState: ValueState[Boolean] = getRuntimeContext.getState(new ValueStateDescriptor[Boolean]("is-payed-state", classOf[Boolean]))
           

其中對ValueState可以增删改查:

  1. 擷取狀态值
val isPayed = isPayedState.value()
           
  1. 更新狀态值
isPayedState.update(true)
           
  1. 釋放狀态值
isPayedState.clear()
           

狀态的生命周期

對于任何類型Keyed State都可以設定狀态的生命周期(TTL),以確定能夠在規定時間内及時地清理狀态資料。

實作方法:

1、生成StateTtlConfig配置

2、将StateTtlConfig配置傳入StateDescriptor中的enableTimeToLive方法中即可

import org.apache.flink.api.common.state.StateTtlConfig
import org.apache.flink.api.common.state.ValueStateDescriptor
import org.apache.flink.api.common.time.Time

val ttlConfig = StateTtlConfig
    .newBuilder(Time.seconds(1))
    .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
    .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
    .build
    
val stateDescriptor = new ValueStateDescriptor[String]("text state", classOf[String])
stateDescriptor.enableTimeToLive(ttlConfig)

           

Managed Operator State

Operator State是一種non-keyed state,與并行的操作算子執行個體相關聯,例如在KafkaConnector中,每個Kafka消費端算子執行個體都對應到Kafka的一個分區中,維護Topic分區和Offsets偏移量作為算子的Operator State。在Flink中可以實作Checkpointed-Function或者ListCheckpointed兩個接口來定義操作Managed Operator State的函數。

Case : 訂單延遲告警統計

需求描述

在電商平台中,最終創造收入和利潤的是使用者下單購買的環節;更具體一點,是使用者真正完成支付動作的時候。使用者下單的行為可以表明使用者對商品的需求,但在現實中,并不是每次下單都會被使用者立刻支付。當拖延一段時間後,使用者支付的意願會降低。

是以為了讓使用者更有緊迫感進而提高支付轉化率,同時也為了防範訂單支付環節的安全風險,電商網站往往會對訂單狀态進行監控,設定一個失效時間(比如 15 分鐘),如果下單後一段時間仍未支付,訂單就會被取消。

此時需要給使用者發送一個資訊提醒使用者,提高支付轉換率

需求分析

本需求可以使用CEP來實作, 這裡推薦使用process function原生的狀态程式設計。

問題可以簡化成: 在pay事件逾時未發生的情況下,輸出逾時報警資訊。

一個簡單的思路是:

  1. 在訂單的 create 事件到來後注冊定時器,15分鐘後觸發;
  2. 用一個布爾類型的 Value 狀态來作為辨別位,表明 pay 事件是否發生過。
  3. 如果 pay 事件已經發生,狀态被置為true,那麼就不再需要做什麼操作;
  4. 而如果 pay 事件一直沒來,狀态一直為false,到定時器觸發時,就應該輸出逾時報警資訊。

資料及模型

Demo data:

34729,create,,1558430842
34730,create,,1558430843
34729,pay,sd76f87d6,1558430844
34730,modify,3hu3k2432,1558430845
34731,create,,1558430846
34731,pay,35jue34we,1558430849
34732,create,,1558430852
34733,create,,1558430855
34734,create,,1558430859
34734,create,,1558431000
34733,pay,,1558431000             
34732,pay,,1558449999   
           

Flink的輸入與輸出類:

//定義輸入訂單事件的樣例類
caseclassOrderEvent(orderId: Long, eventType: String, txId: String, eventTime: Long)
//定義輸出結果樣例類
caseclassOrderResult(orderId: Long, resultMsg: String)
           

代碼實作

case class OrderEvent(orderId: Long, eventType: String, txId: String, eventTime: Long)

case class OrderResult(orderId: Long, resultMsg: String)

object OrderTimeOut {
  val orderTimeoutOutputTag = new OutputTag[OrderResult]("orderTimeout")

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.setParallelism(1)

    val orderEventStream = env.socketTextStream("127.0.0.1", 9999)
      .map(data => {
        val dataArray = data.split(",")
        OrderEvent(dataArray(0).trim.toLong, dataArray(1).trim, dataArray(2).trim, dataArray(3).trim.toLong)
      })
      .assignAscendingTimestamps(_.eventTime * 1000L)
      .keyBy(_.orderId)

    val orderResultStream = orderEventStream.process(new OrderPayMatch)
    orderResultStream.print("payed")
    orderResultStream.getSideOutput(orderTimeoutOutputTag).print("time out order")
    env.execute("order timeout without cep job")
  }

  class OrderPayMatch() extends KeyedProcessFunction[Long, OrderEvent, OrderResult]() {
    lazy val isPayedState: ValueState[Boolean] = getRuntimeContext.getState(new ValueStateDescriptor[Boolean]("is-payed-state", classOf[Boolean]))
    lazy val timerState: ValueState[Long] = getRuntimeContext.getState(new ValueStateDescriptor[Long]("timer-state", classOf[Long]))

    override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[Long, OrderEvent, OrderResult]#OnTimerContext, out: Collector[OrderResult]): Unit = {
      val isPayed = isPayedState.value()
      if (isPayed) {
        ctx.output(orderTimeoutOutputTag, OrderResult(ctx.getCurrentKey, "payed but no create"))
      } else {
        //Only create, but no pay
        ctx.output(orderTimeoutOutputTag, OrderResult(ctx.getCurrentKey, "order timeout"))
      }
      isPayedState.clear()
      timerState.clear()
    }

    override def processElement(value: OrderEvent, ctx: KeyedProcessFunction[Long, OrderEvent, OrderResult]#Context, out: Collector[OrderResult]): Unit = {
      val isPayed = isPayedState.value()
      val timerTs = timerState.value()
      if (value.eventType == "create") {
        if (isPayed) {
          out.collect(OrderResult(value.orderId, "payed successfully"))
          ctx.timerService().deleteEventTimeTimer(timerTs)
          isPayedState.clear()
          timerState.clear()
        } else {
          val ts = value.eventTime * 1000L + 15 * 60 * 1000L
          ctx.timerService().registerEventTimeTimer(ts)
          timerState.update(ts)
        }
      } else if (value.eventType == "pay") {
          if (timerTs > 0) {
            if (timerTs > value.eventTime * 1000L) {
              out.collect(OrderResult(value.orderId, "payed successfully"))
            } else {
              ctx.output(orderTimeoutOutputTag, OrderResult(value.orderId, "this order is timeout"))
            }

            ctx.timerService().deleteEventTimeTimer(timerTs)
            isPayedState.clear()
            timerState.clear()
          } else {
            //pay first
            isPayedState.update(true)
            ctx.timerService().registerEventTimeTimer(value.eventTime * 1000L)
            timerState.update(value.eventTime * 1000L)
          }
      }
    }
  }
}
           

總結

有狀态計算是Flink的一個很好特性,在一些場景下如累加計算pv,uv等,不用在項目中引用外部存儲如redis等,架構上更簡單,更易于維護。

參考:

  1. https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/stream/state/state.html#state-time-to-live-ttl
  2. 《大資料技術之電商使用者行為分析》

作者:chaplinthink

出處:https://www.cnblogs.com/bigdata1024/p/16276691.html

本文以學習、研究和分享為主,如需轉載,請聯系本人,标明作者和出處,非商業用途!

繼續閱讀