
Flink in Practice (101): Two-Stream Join (Part 3): Implementing Inner Join, Left Join, and Right Join on Two Streams

Note: this blog series is based on SGG's video course and is well suited to beginners.


Introduction

The previous article introduced JoinedStream and CoGroupedStream, with code such as the following:

dataStream.join(otherStream)
    .where(_._1).equalTo(_._2)
    .window(TumblingEventTimeWindows.of(Time.seconds(3)))
    .apply { ... }

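A JoinedStream or CoGroupedStream is always evaluated within a specific window and matches elements on a key. A two-stream join in Flink is therefore always built on the premise of an inner join on that key: only elements with the same key in the same window are brought together.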

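Accordingly, Inner, Left, and Right Join in a two-stream join really mean joins within a particular window. The window is the scope of the join: if a window contains no data at all, no join happens.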
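The sketch below makes the idea of the window as the unit of the join concrete. It is a small Flink-free Scala program that maps an event timestamp to a 3-second tumbling window.

It assumes a window offset of 0, as with `TumblingEventTimeWindows.of(Time.seconds(3))`. For readability it uses milliseconds after 13:00:00 instead of full epoch timestamps. That gives the same grouping as long as 13:00:00 falls on a 3-second boundary. `TumblingWindowSketch` and its methods are hypothetical names used only for illustration.

```scala
// Hypothetical helper, not a Flink API: reproduces tumbling-window assignment
// for a zero offset, i.e. windows [k * size, (k + 1) * size).
object TumblingWindowSketch {
  // Start of the window containing `timestampMs`; floorMod also handles negative timestamps.
  def windowStart(timestampMs: Long, sizeMs: Long): Long =
    timestampMs - Math.floorMod(timestampMs, sizeMs)

  // Half-open window [start, end) that contains `timestampMs`.
  def window(timestampMs: Long, sizeMs: Long): (Long, Long) = {
    val start = windowStart(timestampMs, sizeMs)
    (start, start + sizeMs)
  }

  def main(args: Array[String]): Unit = {
    val sizeMs = 3000L
    // Milliseconds after 13:00:00 for a few of the test events used later in this article
    val events = Seq(1820L, 2980L, 4330L, 7000L, 14000L)
    events.foreach(ts => println(s"$ts -> ${window(ts, sizeMs)}"))
  }
}
```

Under these assumptions, 1820 and 2980 fall into [0, 3000), 4330 into [3000, 6000), and 14000 into [12000, 15000). Elements of the two streams can only be joined if they share a key and land in the same window.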

Implementation

Here I use two sockets to receive data and simulate the two streams. The program takes three arguments:

if (args.length != 3) {
    System.err.println("USAGE:\nSocketTextStreamJoinType <hostname> <port1> <port2>")
    return
}

val hostName = args(0)
val port1 = args(1).toInt
val port2 = args(2).toInt      

Next, we create two case classes to model the input data from the two sockets:

case class StockTransaction(tx_time:String, tx_code:String,tx_value:Double)
case class StockSnapshot(md_time:String, md_code:String,md_value:Double)      
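The conversion from a raw socket line to these case classes can be sketched in plain Scala, independent of Flink. This is a variant of the `map` + `assignAscendingTimestamps` logic in the complete example below, with three assumptions of its own:

- It uses `java.time.DateTimeFormatter` instead of `SimpleDateFormat`.
- It interprets timestamps as UTC, whereas the original code uses the JVM's default time zone.
- It returns `None` for malformed lines rather than throwing.

`ParseSketch`, `parseTransaction`, `parseSnapshot`, and `eventTimeMs` are hypothetical names.

```scala
import java.time.{LocalDateTime, ZoneOffset}
import java.time.format.DateTimeFormatter
import scala.util.Try

object ParseSketch {
  case class StockTransaction(tx_time: String, tx_code: String, tx_value: Double)
  case class StockSnapshot(md_time: String, md_code: String, md_value: Double)

  // Same pattern as the SimpleDateFormat used in the complete example
  private val fmt = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS")

  // Split "time,code,value" into exactly three trimmed fields, or None
  private def fields(line: String): Option[(String, String, String)] =
    line.split(",").map(_.trim) match {
      case Array(t, c, v) => Some((t, c, v))
      case _              => None
    }

  // None if the line does not have three fields or the value is not a number
  def parseTransaction(line: String): Option[StockTransaction] =
    fields(line).flatMap { case (t, c, v) => Try(StockTransaction(t, c, v.toDouble)).toOption }

  def parseSnapshot(line: String): Option[StockSnapshot] =
    fields(line).flatMap { case (t, c, v) => Try(StockSnapshot(t, c, v.toDouble)).toOption }

  // Event time in epoch milliseconds; assumption: the timestamps are UTC
  def eventTimeMs(time: String): Long =
    LocalDateTime.parse(time, fmt).toInstant(ZoneOffset.UTC).toEpochMilli
}
```

Skipping malformed lines is a design choice. In the original `map`, an empty line would make `tokens1(2)` throw and fail the job. In a Flink job, one option is to call such a parser from a `flatMap` and emit only the `Some` results.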

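The last point to address is how to implement Inner Join, Left Join, and Right Join. Here we use coGroup and decide which join type applies by checking whether each of the two Iterables passed to the CoGroupFunction is empty. For example, the Inner Join emits pairs only when both sides are non-empty: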

if (scalaT1.nonEmpty && scalaT2.nonEmpty) {
    for (transaction <- scalaT1) {
        for (snapshot <- scalaT2) {
            out.collect((transaction.tx_code, transaction.tx_time, snapshot.md_time,
                transaction.tx_value, snapshot.md_value, "Inner Join Test"))
        }
    }
}
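The emptiness checks above can be tried outside Flink. In the sketch below, the two `Seq`s stand in for the two Iterables that `coGroup` receives for one key in one window. The object and method names are hypothetical.

The sketch also highlights a design point. The article's Left and Right functions emit only the unmatched rows, so a complete left (or right) outer join corresponds to the Inner Join output plus those unmatched rows. `leftOuter` and `rightOuter` produce that combination in one pass.

```scala
// Hypothetical, Flink-free model of one coGroup call: `left` and `right`
// hold the elements of each stream for a single key within a single window.
object CoGroupJoinSketch {
  // Inner join: every pair; empty if either side is empty
  def innerJoin[L, R](left: Seq[L], right: Seq[R]): Seq[(L, R)] =
    for { l <- left; r <- right } yield (l, r)

  // What the article's LeftJoinFunction emits: left rows only when the right side is empty
  def leftOnly[L, R](left: Seq[L], right: Seq[R]): Seq[(L, Option[R])] =
    if (right.isEmpty) left.map(l => (l, None)) else Seq.empty

  // Full left outer join for one group: matched pairs, or unmatched left rows with None
  def leftOuter[L, R](left: Seq[L], right: Seq[R]): Seq[(L, Option[R])] =
    if (right.isEmpty) left.map(l => (l, None))
    else for { l <- left; r <- right } yield (l, Some(r))

  // Full right outer join for one group, symmetric to leftOuter
  def rightOuter[L, R](left: Seq[L], right: Seq[R]): Seq[(Option[L], R)] =
    if (left.isEmpty) right.map(r => (None, r))
    else for { l <- left; r <- right } yield (Some(l), r)
}
```

For example, a group with transactions but no snapshot yields `(transaction, None)` from `leftOuter`. This corresponds to the "Left Join Test" rows that the Flink version fills with `""` and `0.0`.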

Complete code example

package wikiedits

import java.text.SimpleDateFormat

import org.apache.flink.api.common.functions.CoGroupFunction
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.util.Collector

object InnerLeftRightJoinTest {

  // *************************************************************************
  // PROGRAM
  // *************************************************************************

  def main(args : Array[String]) : Unit ={
    if (args.length != 3) {
      System.err.println("USAGE:\nSocketTextStreamJoinType <hostname> <port1> <port2>")
      return
    }

    val hostName = args(0)
    val port1 = args(1).toInt
    val port2 = args(2).toInt

    /**
      * Get the execution environment and set the TimeCharacteristic
      */
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val dataStream1 = env.socketTextStream(hostName, port1)
    val dataStream2 = env.socketTextStream(hostName, port2)

    val format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS")

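    /**
      * Operators
      * Input format:
      *           TX: 2016-07-28 13:00:01.000,000002,10.2
      *           MD: 2016-07-28 13:00:00.000,000002,10.1
      *  Since this is a test, ascending-timestamp watermarks are used
      *  (i.e., the events are assumed to arrive in ascending event-time order)
      */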
    val dataStreamMap1 = dataStream1.map(f => {
      val tokens1 = f.split(",")
      StockTransaction(tokens1(0), tokens1(1), tokens1(2).toDouble)
    })
      .assignAscendingTimestamps(f => format.parse(f.tx_time).getTime)

    val dataStreamMap2 = dataStream2.map(f => {
      val tokens2 = f.split(",")
      StockSnapshot(tokens2(0), tokens2(1), tokens2(2).toDouble)
    })
      .assignAscendingTimestamps(f => format.parse(f.md_time).getTime)

    /**
      * Join operation
      * Scoped to a 3-second tumbling event-time window
      */
    val joinedStream = dataStreamMap1
      .coGroup(dataStreamMap2)
      .where(_.tx_code)
      .equalTo(_.md_code)
      .window(TumblingEventTimeWindows.of(Time.seconds(3)))

    val innerJoinedStream = joinedStream.apply(new InnerJoinFunction)
    val leftJoinedStream = joinedStream.apply(new LeftJoinFunction)
    val rightJoinedStream = joinedStream.apply(new RightJoinFunction)

    innerJoinedStream.name("InnerJoinedStream").print()
    leftJoinedStream.name("LeftJoinedStream").print()
    rightJoinedStream.name("RightJoinedStream").print()

    env.execute("3 Type of Double Stream Join")
  }


  // *************************************************************************
  // USER FUNCTIONS
  // *************************************************************************

  case class StockTransaction(tx_time:String, tx_code:String,tx_value:Double)
  case class StockSnapshot(md_time:String, md_code:String,md_value:Double)

  class InnerJoinFunction extends CoGroupFunction[StockTransaction,StockSnapshot,(String,String,String,Double,Double,String)]{
    override def coGroup(T1: java.lang.Iterable[StockTransaction], T2: java.lang.Iterable[StockSnapshot], out: Collector[(String, String, String, Double, Double,String)]): Unit = {

      /**
        * Convert the Java Iterables to Scala collections
        * for more concise collection operations
        */
      import scala.collection.JavaConverters._
      val scalaT1 = T1.asScala.toList
      val scalaT2 = T2.asScala.toList

      /**
        * Inner Join: emit every transaction/snapshot pair that shares
        * the same key within the same time window
        */
      if(scalaT1.nonEmpty && scalaT2.nonEmpty){
        for(transaction <- scalaT1){
          for(snapshot <- scalaT2){
            out.collect((transaction.tx_code, transaction.tx_time, snapshot.md_time, transaction.tx_value, snapshot.md_value, "Inner Join Test"))
          }
        }
      }
    }
  }

  class LeftJoinFunction extends CoGroupFunction[StockTransaction,StockSnapshot,(String,String,String,Double,Double,String)] {
    override def coGroup(T1: java.lang.Iterable[StockTransaction], T2: java.lang.Iterable[StockSnapshot], out: Collector[(String, String, String, Double,Double,String)]): Unit = {
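      /**
        * Convert the Java Iterables to Scala collections
        * for more concise collection operations
        */
      import scala.collection.JavaConverters._
      val scalaT1 = T1.asScala.toList
      val scalaT2 = T2.asScala.toList

      /**
        * Left Join: emit transactions that have no snapshot with the same key
        * in the same time window; the missing right-side fields are filled
        * with "" and 0.0. Matched pairs are emitted by InnerJoinFunction.
        */
      if(scalaT1.nonEmpty && scalaT2.isEmpty){
        for(transaction <- scalaT1){
          out.collect((transaction.tx_code, transaction.tx_time, "", transaction.tx_value, 0.0, "Left Join Test"))
        }
      }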
    }
  }

  class RightJoinFunction extends CoGroupFunction[StockTransaction,StockSnapshot,(String,String,String,Double,Double,String)] {
    override def coGroup(T1: java.lang.Iterable[StockTransaction], T2: java.lang.Iterable[StockSnapshot], out: Collector[(String, String, String, Double,Double,String)]): Unit = {
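      /**
        * Convert the Java Iterables to Scala collections
        * for more concise collection operations
        */
      import scala.collection.JavaConverters._
      val scalaT1 = T1.asScala.toList
      val scalaT2 = T2.asScala.toList

      /**
        * Right Join: emit snapshots that have no transaction with the same key
        * in the same time window; the missing left-side fields are filled
        * with "" and 0.0. Matched pairs are emitted by InnerJoinFunction.
        */
      if(scalaT1.isEmpty && scalaT2.nonEmpty){
        for(snapshot <- scalaT2){
          out.collect((snapshot.md_code, "", snapshot.md_time, 0.0, snapshot.md_value, "Right Join Test"))
        }
      }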
    }
  }
}


/**
  * Test data
  */

/**
  * Transaction:
  * 2016-07-28 13:00:01.820,000001,10.2
  * 2016-07-28 13:00:01.260,000001,10.2
  * 2016-07-28 13:00:02.980,000001,10.1
  * 2016-07-28 13:00:03.120,000001,10.1
  * 2016-07-28 13:00:04.330,000001,10.0
  * 2016-07-28 13:00:05.570,000001,10.0
  * 2016-07-28 13:00:05.990,000001,10.0
  * 2016-07-28 13:00:14.000,000001,10.1
  * 2016-07-28 13:00:20.000,000001,10.2
  */

/**
  * Snapshot:
  * 2016-07-28 13:00:01.000,000001,10.2
  * 2016-07-28 13:00:04.000,000001,10.1
  * 2016-07-28 13:00:07.000,000001,10.0
  * 2016-07-28 13:00:16.000,000001,10.1
  */      
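The comment in the code assumes ascending event times, but the test data is not strictly ordered: 13:00:01.260 arrives after 13:00:01.820. The Flink-free sketch below approximates how the watermark evolves over this input in two cases:

- An ascending-timestamp watermark: the maximum timestamp seen so far minus 1 ms.
- A bounded-out-of-orderness variant: the same value minus an allowed delay as well, modelled on the convention of Flink's `WatermarkStrategy.forBoundedOutOfOrderness`.

It also reports which elements arrive behind the watermark. This is a simplification: Flink emits watermarks periodically rather than after every element. The names are hypothetical, and timestamps are milliseconds after 13:00:00.

```scala
// Hypothetical, simplified watermark model: one watermark per element,
// whereas Flink emits watermarks periodically.
object WatermarkSketch {
  // Watermark after each element: running max timestamp - delayMs - 1.
  // delayMs = 0 models ascending timestamps.
  def watermarks(timestamps: Seq[Long], delayMs: Long): Seq[Long] =
    timestamps.scanLeft(Long.MinValue)((a, b) => math.max(a, b)).tail.map(_ - delayMs - 1)

  // Elements whose timestamp is not greater than the watermark left by the previous elements
  def behindWatermark(timestamps: Seq[Long], delayMs: Long): Seq[Long] = {
    val before = Long.MinValue +: watermarks(timestamps, delayMs).dropRight(1)
    timestamps.zip(before).collect { case (ts, wm) if ts <= wm => ts }
  }

  def main(args: Array[String]): Unit = {
    // First three transactions, in milliseconds after 13:00:00
    val tx = Seq(1820L, 1260L, 2980L)
    println(behindWatermark(tx, 0L))
    println(behindWatermark(tx, 1000L))
  }
}
```

An element behind the watermark is only dropped if its window has already fired. Here 1260 belongs to [0, 3000), which fires only once the watermark reaches 2999, so it is still joined. This is consistent with the output at the end of this article, which contains the 13:00:01.260 transaction.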

Testing

First, open two socket listeners on ports 9998 and 9999:

root@master:~# nc -lk 9998      

root@master:~# nc -lk 9999

Next, package the program and submit it to the cluster:

mvn clean package      

Then enter the simulated data into the two sockets:

root@master:~# nc -lk 9998
2016-07-28 13:00:01.820,000001,10.2
2016-07-28 13:00:01.260,000001,10.2
2016-07-28 13:00:02.980,000001,10.1
2016-07-28 13:00:04.330,000001,10.0
2016-07-28 13:00:05.570,000001,10.0
2016-07-28 13:00:05.990,000001,10.0
2016-07-28 13:00:14.000,000001,10.1
2016-07-28 13:00:20.000,000001,10.2      
root@master:~# nc -lk 9999
2016-07-28 13:00:01.000,000001,10.2
2016-07-28 13:00:04.000,000001,10.1
2016-07-28 13:00:07.000,000001,10.0
2016-07-28 13:00:16.000,000001,10.1      

Finally, check the output:

(000001,2016-07-28 13:00:01.820,2016-07-28 13:00:01.000,10.2,10.2,Inner Join Test)
(000001,2016-07-28 13:00:01.260,2016-07-28 13:00:01.000,10.2,10.2,Inner Join Test)
(000001,2016-07-28 13:00:02.980,2016-07-28 13:00:01.000,10.1,10.2,Inner Join Test)
(000001,2016-07-28 13:00:04.330,2016-07-28 13:00:04.000,10.0,10.1,Inner Join Test)
(000001,2016-07-28 13:00:05.570,2016-07-28 13:00:04.000,10.0,10.1,Inner Join Test)
(000001,2016-07-28 13:00:05.990,2016-07-28 13:00:04.000,10.0,10.1,Inner Join Test)
(000001,2016-07-28 13:00:14.000,,10.1,0.0,Left Join Test)
(000001,,2016-07-28 13:00:07.000,0.0,10.0,Right Join Test)      
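The output contains nothing for the snapshot at 13:00:16.000 or the transaction at 13:00:20.000. One plausible explanation assumes parallelism 1. The coGroup operator's event time is the minimum of its two inputs' watermarks, so it only advances to 13:00:15.999 (the latest snapshot minus 1 ms). As a result, only windows ending at or before 13:00:16.000 fire.

The plain-Scala sketch below replays that reasoning. Names are hypothetical, and timestamps are milliseconds after 13:00:00.

```scala
// Hypothetical simulation of which 3-second windows fire for the test input.
// Assumptions: parallelism 1, ascending-timestamp watermarks, zero window offset.
object WindowFiringSketch {
  val sizeMs = 3000L
  // Event times of the socket input, in milliseconds after 13:00:00
  val tx: Seq[Long] = Seq(1820L, 1260L, 2980L, 4330L, 5570L, 5990L, 14000L, 20000L)
  val md: Seq[Long] = Seq(1000L, 4000L, 7000L, 16000L)

  def windowStart(ts: Long): Long = ts - Math.floorMod(ts, sizeMs)

  // Each input's watermark is its max timestamp - 1; the two-input operator takes the minimum
  val watermark: Long = math.min(tx.max - 1, md.max - 1)

  // An event-time window fires once the watermark reaches its last millisecond (end - 1)
  def fires(start: Long): Boolean = watermark >= start + sizeMs - 1

  // For every window that fired: which sides had data (all events share key "000001")
  def firedWindows: Seq[(Long, String)] =
    (tx ++ md).map(windowStart).distinct.sorted.filter(fires).map { start =>
      val hasTx = tx.exists(t => windowStart(t) == start)
      val hasMd = md.exists(t => windowStart(t) == start)
      val kind =
        if (hasTx && hasMd) "inner"
        else if (hasTx) "left only"
        else "right only"
      (start, kind)
    }

  def main(args: Array[String]): Unit = firedWindows.foreach(println)
}
```

This agrees with the printed result:

- Six Inner Join rows come from the windows starting at 13:00:00 and 13:00:03.
- One Right Join row comes from 13:00:07.000.
- One Left Join row comes from 13:00:14.000.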

Summary
