天天看點

Flink 旁路輸出(分流)一、背景二、官方常用的幾種方法三、示範生産實踐

官方文檔-關于分流

最好大家還是看官方文檔,我隻是當一個搬運工

一、背景

在一些業務場景中,一個流中可能有多種類型的資料,比如訂單:有線上訂單,有線下訂單。當需要将不同類型的資料進行分别處理,比如 寫入到不同的資料表或者join 不同的其他流時,這個時候使用分流就比較合适。

二、官方常用的幾種方法

三、示範

本文隻詳細介紹最常用 process 分流 ,道理都是相通的

直接上代碼僞碼,大家主要要了解,而不是直接複制代碼

//這是訂單source,最原始的流
val orderSource =  這是你建構source 的方法

  //建立線上訂單 tag
    val onlineOrderTag = new OutputTag[JSONObject]("onlineOrder")
   //建立線下訂單 tag
    val offlineOrderTag = new OutputTag[JSONObject]("offlineOrder")

   // 這個sideOutStream 就是分流之後的流對象
   val sideOutStream = orderSource
      .filter(new PaymentFilter)  // 這裡是一個過濾邏輯,如果你沒有可以不過濾
       // 這個process 就是分流的操作了
      .process(new ProcessFunction[String, JSONObject] {
        override def processElement(orderString: String, ctx: ProcessFunction[String, JSONObject]#Context, out: Collector[JSONObject]): Unit = {
          val outOrder = JSON.parseObject(orderString)
        
          //通過收銀員資訊判斷是否屬于線下訂單
          if (!outOrder.containsKey("cashier_id") || StringUtils.isBlank(outOrder.getString("cashier_id"))) {
            ctx.output(onlineOrderTag, outOrder)
          } else {
            ctx.output(offlineOrderTag, outOrder)
          }
        }
      }
      )


val onlineStream = sideOutStream.getSideOutput(onlineOrderTag)

val offlineStream = sideOutStream.getSideOutput(offlineOrderTag)

// 流已經分好了,後面是sink 還是 去幹其他的,就看你的業務邏輯了
onlineStream.addSink()

offlineStream.addSink()
           

生産實踐

下圖是真實生産的一個DAG圖

内部使用了分流, join ,自定義剔除器 等滿足業務需求

後面會更新 join 和 自定義剔除器 trigger 等 實戰場景,感興趣的朋友可以加個關注喲

Flink 旁路輸出(分流)一、背景二、官方常用的幾種方法三、示範生産實踐

繼續閱讀