天天看点

Flink 旁路输出(分流)一、背景二、官方常用的几种方法三、示范生产实践

官方文档-关于分流

最好大家还是看官方文档,我只是当一个搬运工

一、背景

在一些业务场景中,一个流中可能有多种类型的数据,比如订单:有线上订单,有线下订单。当需要将不同类型的数据进行分别处理,比如 写入到不同的数据表或者join 不同的其他流时,这个时候使用分流就比较合适。

二、官方常用的几种方法

三、示范

本文只详细介绍最常用 process 分流 ,道理都是相通的

直接上代码伪码,大家主要要理解,而不是直接复制代码

//这是订单source,最原始的流
val orderSource =  这是你构建source 的方法

  //创建线上订单 tag
    val onlineOrderTag = new OutputTag[JSONObject]("onlineOrder")
   //创建线下订单 tag
    val offlineOrderTag = new OutputTag[JSONObject]("offlineOrder")

   // 这个sideOutStream 就是分流之后的流对象
   val sideOutStream = orderSource
      .filter(new PaymentFilter)  // 这里是一个过滤逻辑,如果你没有可以不过滤
       // 这个process 就是分流的操作了
      .process(new ProcessFunction[String, JSONObject] {
        override def processElement(orderString: String, ctx: ProcessFunction[String, JSONObject]#Context, out: Collector[JSONObject]): Unit = {
          val outOrder = JSON.parseObject(orderString)
        
          //通过收银员信息判断是否属于线下订单
          if (!outOrder.containsKey("cashier_id") || StringUtils.isBlank(outOrder.getString("cashier_id"))) {
            ctx.output(onlineOrderTag, outOrder)
          } else {
            ctx.output(offlineOrderTag, outOrder)
          }
        }
      }
      )


val onlineStream = sideOutStream.getSideOutput(onlineOrderTag)

val offlineStream = sideOutStream.getSideOutput(offlineOrderTag)

// 流已经分好了,后面是sink 还是 去干其他的,就看你的业务逻辑了
onlineStream.addSink()

offlineStream.addSink()
           

生产实践

下图是真实生产的一个DAG图

内部使用了分流, join ,自定义剔除器 等满足业务需求

后面会更新 join 和 自定义剔除器 trigger 等 实战场景,感兴趣的朋友可以加个关注哟

Flink 旁路输出(分流)一、背景二、官方常用的几种方法三、示范生产实践

继续阅读