
Flink Notes: Apache Flink

Apache Flink

Overview

Flink is a stateful computation framework built on top of data streams. Because it arrived relatively late (first released in December 2014), it is generally regarded as the third generation of stream computing frameworks.

First generation

MapReduce (2006): batch, disk-based, M->R vector model | Storm (Sept 2014): streaming, low latency / low throughput

Second generation

Spark RDD (Feb 2014): batch, in-memory, DAG of stages | simulates stream processing with micro-batches (DStream), higher latency / high throughput

Third generation

Flink DataStream (Dec 2014): stream computing, in-memory, Dataflow Graph of tasks | Flink DataSet builds batch processing on top of the streaming runtime

Application domains of stream processing: risk control / intelligent transportation / disease prediction / internet finance / ...

Flink Architecture

High-level overview


Flink VS Spark

Spark's compute core is built on RDD-based batch processing and simulates stream computation with batches, whereas Flink is built on stream processing and simulates batch processing with streams.

Flink Compute Architecture


JobManagers - the so-called masters, responsible for coordinating the distributed execution: scheduling tasks, coordinating checkpoints, coordinating failure recovery, and so on.

There is always at least one JobManager. A high-availability setup will have multiple JobManagers, one of which is always the leader, and the others are standby.

TaskManagers - the so-called slaves (worker nodes). They perform the actual work: executing the subtasks of Tasks (roughly the equivalent of Spark stages) and handling data buffering and data shuffles in the stream computation. Worker nodes connect to the JobManager, report their own status, and inform the master of the computation state of the tasks assigned to them.

There must always be at least one TaskManager.

Client - before the computation starts, the client translates the program into a Dataflow Graph and submits that Dataflow Graph to the JobManagers.

Task - through operator chaining, Flink divides a job into several Tasks. Each Task has its own parallelism and spawns a corresponding number of subtasks (threads) according to the configured parallelism. Operator chaining reduces thread-to-thread communication cost and system overhead.


Task Slots - each Task Slot represents a fixed subset of a TaskManager's compute resources. Task Slots evenly divide the TaskManager's memory: a TaskManager with 3 Task Slots, for example, dedicates 1/3 of its memory to each slot. Subtasks of different jobs are isolated from one another by Task Slots, while subtasks of different Tasks of the same job may share Task Slots. Because by default all subtasks share the same resource group (default), the number of Task Slots a job needs equals the maximum parallelism among that job's Tasks.

Flink Environment Setup

  • Set CentOS process and open-file limits (takes effect after reboot) - optional
[[email protected] ~]# vi /etc/security/limits.conf

* soft nofile 204800
* hard nofile 204800
* soft nproc 204800
* hard nproc 204800
           
  • Configure the hostname (takes effect after reboot)
[[email protected] ~]# cat /etc/hostname
Spark
           
  • Set up the IP mapping
[[email protected] ~]# cat /etc/hosts
127.0.0.1   localhost localhost.localdomain localhost4 localhost4.localdomain4
::1         localhost localhost.localdomain localhost6 localhost6.localdomain6
192.168.11.100  Spark
           
  • Firewall service
[[email protected] ~]# systemctl stop firewalld
[[email protected] ~]# systemctl disable firewalld
[[email protected] ~]# firewall-cmd --state
not running
           
  • Install JDK 1.8+
[[email protected] ~]# rpm -ivh jdk-8u171-linux-x64.rpm 
[[email protected] ~]# ls -l /usr/java/
total 4
lrwxrwxrwx. 1 root root   16 Mar 26 00:56 default -> /usr/java/latest
drwxr-xr-x. 9 root root 4096 Mar 26 00:56 jdk1.8.0_171-amd64
lrwxrwxrwx. 1 root root   28 Mar 26 00:56 latest -> /usr/java/jdk1.8.0_171-amd64
[[email protected] ~]# vi .bashrc 
JAVA_HOME=/usr/java/latest
PATH=$PATH:$JAVA_HOME/bin
CLASSPATH=.
export JAVA_HOME
export PATH
export CLASSPATH
[[email protected] ~]# source ~/.bashrc
           
  • Configure passwordless SSH
[[email protected] ~]# ssh-keygen -t rsa
Generating public/private rsa key pair.
Enter file in which to save the key (/root/.ssh/id_rsa): 
Created directory '/root/.ssh'.
Enter passphrase (empty for no passphrase): 
Enter same passphrase again: 
Your identification has been saved in /root/.ssh/id_rsa.
Your public key has been saved in /root/.ssh/id_rsa.pub.
The key fingerprint is:
4b:29:93:1c:7f:06:93:67:fc:c5:ed:27:9b:83:26:c0 [email protected]
The key's randomart image is:
+--[ RSA 2048]----+
|                 |
|         o   . . |
|      . + +   o .|
|     . = * . . . |
|      = E o . . o|
|       + =   . +.|
|        . . o +  |
|           o   . |
|                 |
+-----------------+
[[email protected] ~]# ssh-copy-id CentOS
The authenticity of host 'centos (192.168.40.128)' can't be established.
RSA key fingerprint is 3f:86:41:46:f2:05:33:31:5d:b6:11:45:9c:64:12:8e.
Are you sure you want to continue connecting (yes/no)? yes
Warning: Permanently added 'centos,192.168.40.128' (RSA) to the list of known hosts.
[email protected]'s password: 
Now try logging into the machine, with "ssh 'CentOS'", and check in:

  .ssh/authorized_keys

to make sure we haven't added extra keys that you weren't expecting.
[[email protected] ~]# ssh [email protected]
Last login: Tue Mar 26 01:03:52 2019 from 192.168.40.1
[[email protected] ~]# exit
logout
Connection to CentOS closed.
           
  • Install and configure Flink
[[email protected] ~]# tar -zxf flink-1.8.1-bin-scala_2.11.tgz -C /usr/
[[email protected] ~]# cd /usr/flink-1.8.1/
[[email protected] ~]# vi conf/flink-conf.yaml
jobmanager.rpc.address: Spark
taskmanager.numberOfTaskSlots: 3

[[email protected] ~]#  vi conf/slaves
Spark
[[email protected] ~]# ./bin/start-cluster.sh

           

Quick Start

  • Add the dependencies
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-scala -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-scala_2.11</artifactId>
    <version>1.8.1</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-scala_2.11</artifactId>
    <version>1.8.1</version>
</dependency>
           
  • Quick start
//1. Create the StreamExecutionEnvironment
val fsEnv = StreamExecutionEnvironment.getExecutionEnvironment

//2. Create the DataStream (source)
val dataStream: DataStream[String] = fsEnv.socketTextStream("Spark",9999)

//3. Transform the data - operators
dataStream.flatMap(_.split("\\s+"))
.map((_,1))
.keyBy(0)
.sum(1)
.print()

// execute the computation
fsEnv.execute("FlinkWordCountsQuickStart")
           
Package the program into a jar and run it through the web UI or with ./bin/flink run:
[[email protected] flink-1.8.1]# ./bin/flink run 
							-c com.baizhi.quickstart.FlinkWordCountsQuickStart 
							-p 3 
							/root/flink-1.0-SNAPSHOT-jar-with-dependencies.jar
           
[[email protected] flink-1.8.1]# ./bin/flink list -m Spark:8081
Waiting for response...
------------------ Running/Restarting Jobs -------------------
13.11.2019 16:49:36 : 36e8f1ec3173ccc2c5e1296d1564da87 : FlinkWordCountsQuickStart (RUNNING)
--------------------------------------------------------------
No scheduled jobs.
[[email protected] flink-1.8.1]# ./bin/flink cancel -m Spark:8081 36e8f1ec3173ccc2c5e1296d1564da87
Cancelling job 36e8f1ec3173ccc2c5e1296d1564da87.
Cancelled job 36e8f1ec3173ccc2c5e1296d1564da87.
           

DataSource

A DataSource specifies the input of the stream computation. Users register one via StreamExecutionEnvironment.addSource(sourceFunction). Flink ships with a number of predefined DataSource implementations; to supply a custom one, implement the SourceFunction interface (non-parallel source), implement the ParallelSourceFunction interface (parallel source), or extend RichParallelSourceFunction.

File Based (overview)

readTextFile(path) - reads a text file line by line using TextInputFormat under the hood and returns a DataStream[String]. The file is processed only once.

//1. Create the StreamExecutionEnvironment
val fsEnv = StreamExecutionEnvironment.getExecutionEnvironment

//2. Create the DataStream (source)
val filePath="file:///D:\\data"
val dataStream: DataStream[String] = fsEnv.readTextFile(filePath)
//3. Transform the data
dataStream.flatMap(_.split("\\s+"))
.map((_,1))
.keyBy(0)
.sum(1)
.print()

fsEnv.execute("FlinkWordCountsQuickStart")
           

readFile(fileInputFormat, path) - reads a file once using the specified input format.

//1. Create the StreamExecutionEnvironment
val fsEnv = StreamExecutionEnvironment.getExecutionEnvironment

//2. Create the DataStream (source)
val filePath="file:///D:\\data"
val inputFormat = new TextInputFormat(null)
val dataStream: DataStream[String] = fsEnv.readFile(inputFormat,filePath)
//3. Transform the data
dataStream.flatMap(_.split("\\s+"))
.map((_,1))
.keyBy(0)
.sum(1)
.print()

fsEnv.execute("FlinkWordCountsQuickStart")
           

readFile(fileInputFormat, path, watchType, interval, pathFilter) - the two methods above both delegate to this method internally.

//1. Create the StreamExecutionEnvironment
    val fsEnv = StreamExecutionEnvironment.getExecutionEnvironment

    //2. Create the DataStream (source)
    val filePath="file:///D:\\data"
    val inputFormat = new TextInputFormat(null)

    inputFormat.setFilesFilter(new FilePathFilter {
      override def filterPath(path: Path): Boolean = {
        if(path.getName().startsWith("1")){ // returning true excludes this file
          return true
        }
        false
      }
    })
    val dataStream: DataStream[String] = fsEnv.readFile(inputFormat,filePath,
      FileProcessingMode.PROCESS_CONTINUOUSLY,1000)
    //3. Transform the data
    dataStream.flatMap(_.split("\\s+"))
      .map((_,1))
      .keyBy(0)
      .sum(1)
      .print()

    fsEnv.execute("FlinkWordCountsQuickStart")

           
The path is scanned periodically; if a file's content is modified, the entire file is re-read, which may lead to duplicate computation.

Collection (for testing)

//1. Create the StreamExecutionEnvironment
val fsEnv = StreamExecutionEnvironment.getExecutionEnvironment

//2. Create the DataStream (source)
val dataStream: DataStream[String] = fsEnv.fromCollection(List("this is a demo","hello world"))
//3. Transform the data
dataStream.flatMap(_.split("\\s+"))
.map((_,1))
.keyBy(0)
.sum(1)
.print()

fsEnv.execute("FlinkWordCountsQuickStart")
           

Custom SourceFunction (master)

class UserDefineParallelSourceFunction extends ParallelSourceFunction[String]{

  val lines=Array("this is a demo","hello world","hello flink")
  @volatile
  var isRunning=true
  // run
  override def run(sourceContext: SourceFunction.SourceContext[String]): Unit = {
    while (isRunning){
      Thread.sleep(1000)
      sourceContext.collect(lines(new Random().nextInt(lines.length)))
    }
  }
  // cancel
  override def cancel(): Unit = {
    isRunning=false
  }
}         
           
//1. Create the StreamExecutionEnvironment
val fsEnv = StreamExecutionEnvironment.getExecutionEnvironment

//2. Create the DataStream (source)
val dataStream: DataStream[String] = fsEnv.addSource(new UserDefineParallelSourceFunction)
dataStream.setParallelism(10)
//3. Transform the data
dataStream.flatMap(_.split("\\s+"))
.map((_,1))
.keyBy(0)
.sum(1)
.print()

fsEnv.execute("FlinkWordCountsQuickStart")
           

Flink Kafka Source (key topic)

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.11</artifactId>
    <version>1.8.1</version>
</dependency>
           
//1. Create the StreamExecutionEnvironment
val fsEnv = StreamExecutionEnvironment.getExecutionEnvironment

//2. Create the DataStream (source)
val props = new Properties()
props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
                  "Spark:9092")
props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "g1")

val flinkKafkaConsumer = new FlinkKafkaConsumer[String]("topic01",
                                                        new SimpleStringSchema(),props)

val dataStream: DataStream[String] = fsEnv.addSource(flinkKafkaConsumer)
dataStream.setParallelism(10)
//3. Transform the data
dataStream.flatMap(_.split("\\s+"))
.map((_,1))
.keyBy(0)
.sum(1)
.print()

fsEnv.execute("FlinkWordCountsQuickStart")
           
This only exposes the record value. If users need the key/offset/partition information, they must provide a custom KafkaDeserializationSchema to access the record metadata.

class UserDefineKafkaDeserializationSchema
extends KafkaDeserializationSchema[(Int,Long,String,String,String)]{
    override def isEndOfStream(t: (Int, Long, String, String, String)): Boolean = {
        return false;
    }

    override def deserialize(r: ConsumerRecord[Array[Byte], Array[Byte]]): (Int, Long, String, String, String) = {
        if(r.key()==null){
            (r.partition(),r.offset(),r.topic(),"",new String(r.value()))
        }else{
            (r.partition(),r.offset(),r.topic(),StringUtils.arrayToString(r.key()),new String(r.value()))
        }
    }
    // declare the produced type
    override def getProducedType: TypeInformation[(Int, Long, String, String, String)] = {
        createTypeInformation[(Int, Long, String, String, String)]
    }
}

           
//1. Create the StreamExecutionEnvironment
val fsEnv = StreamExecutionEnvironment.getExecutionEnvironment

//2. Create the DataStream (source)
val props = new Properties()
props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
                  "Spark:9092")
props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "g1")

val flinkKafkaConsumer = new FlinkKafkaConsumer[(Int,Long,String,String,String)]("topic01",
                                                                                 new UserDefineKafkaDeserializationSchema(),props)

val dataStream: DataStream[(Int,Long,String,String,String)] = fsEnv.addSource(flinkKafkaConsumer)

dataStream.print()

fsEnv.execute("FlinkWordCountsQuickStart")
           

Data Sink

Data sinks consume the data of a DataStream and write it out to external systems, e.g. files, sockets, NoSQL stores, RDBMSs, message queues, and so on. Flink predefines a number of commonly used sinks, and users can also implement custom data sinks for their own needs by extending SinkFunction or RichSinkFunction.

File Based (for testing)

  • writeAsText()|writeAsCsv(…)|writeUsingOutputFormat()

    at-least-once

//1. Create the StreamExecutionEnvironment
val fsEnv = StreamExecutionEnvironment.getExecutionEnvironment

val output = new CsvOutputFormat[Tuple2[String, Int]](new Path("file:///D:/fink-results"))
//2. Create the DataStream (source)
val dataStream: DataStream[String] = fsEnv.socketTextStream("Spark",9999)
//3. Transform the data
dataStream.flatMap(_.split("\\s+"))
    .map((_,1))
    .keyBy(0)
    .sum(1)
    .map(t=> new Tuple2(t._1,t._2))
    .writeUsingOutputFormat(output)

fsEnv.execute("FlinkWordCountsQuickStart")
           
  • Bucketing File Sink (exactly-once)
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-filesystem_2.11</artifactId>
    <version>1.8.1</version>
</dependency>

<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-common</artifactId>
    <version>2.9.2</version>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-hdfs</artifactId>
    <version>2.9.2</version>
</dependency>
           
//1. Create the StreamExecutionEnvironment
val fsEnv = StreamExecutionEnvironment.getExecutionEnvironment

val bucketSink = new BucketingSink[String]("hdfs://Spark:9000/bucketSink")
bucketSink.setBucketer(new DateTimeBucketer("yyyy-MM-dd-HH", ZoneId.of("Asia/Shanghai")))

//2. Create the DataStream (source)
val dataStream: DataStream[String] = fsEnv.socketTextStream("Spark",9999)
//3. Transform the data
dataStream.flatMap(_.split("\\s+"))
.map((_,1))
.keyBy(0)
.sum(1)
.map(t=>t._1+"\t"+t._2)
.addSink(bucketSink)

fsEnv.execute("FlinkWordCountsQuickStart")
           

print()/ printToErr()

//1. Create the StreamExecutionEnvironment
val fsEnv = StreamExecutionEnvironment.getExecutionEnvironment

//2. Create the DataStream (source)
val dataStream: DataStream[String] = fsEnv.socketTextStream("Spark",9999)
//3. Transform the data
dataStream.flatMap(_.split("\\s+"))
.map((_,1))
.keyBy(0)
.sum(1)
.print("error")

fsEnv.execute("FlinkWordCountsQuickStart")
           

Custom Sink (be proficient)

class UserDefineRichSinkFunction extends RichSinkFunction[(String,Int)]{
    override def open(parameters: Configuration): Unit = {
        println("open")
    }
    override def invoke(value: (String, Int)): Unit = {
        println("insert into xxx "+value)
    }

    override def close(): Unit = {
        println("close")
    }
}
           
//1. Create the StreamExecutionEnvironment
val fsEnv = StreamExecutionEnvironment.getExecutionEnvironment

//2. Create the DataStream (source)
val dataStream: DataStream[String] = fsEnv.socketTextStream("Spark",7788)
//3. Transform the data
dataStream.flatMap(_.split("\\s+"))
.map((_,1))
.keyBy(0)
.sum(1)
.addSink(new UserDefineRichSinkFunction)

fsEnv.execute("FlinkWordCountsQuickStart")
           

Redis Sink (master)

Reference: https://bahir.apache.org/docs/flink/current/flink-streaming-redis/

<dependency>
    <groupId>org.apache.bahir</groupId>
    <artifactId>flink-connector-redis_2.11</artifactId>
    <version>1.0</version>
</dependency>
           
val fsEnv = StreamExecutionEnvironment.getExecutionEnvironment


val conf = new FlinkJedisPoolConfig.Builder()
.setHost("Spark")
.setPort(6379).build()

//2. Create the DataStream (source)
val dataStream: DataStream[String] = fsEnv.socketTextStream("Spark",7788)
//3. Transform the data
dataStream.flatMap(_.split("\\s+"))
.map((_,1))
.keyBy(0)
.sum(1)
.addSink(new RedisSink(conf,new UserDefineRedisMapper))

fsEnv.execute("FlinkWordCountsQuickStart")
           
class UserDefineRedisMapper extends RedisMapper[(String,Int)]{
  override def getCommandDescription: RedisCommandDescription = {
    new RedisCommandDescription(RedisCommand.HSET,"word-count")
  }

  override def getKeyFromData(t: (String, Int)): String = {
    t._1
  }

  override def getValueFromData(t: (String, Int)): String = {
    t._2.toString
  }
}
           
If Redis cannot be reached after installation, disable Redis protected mode in redis.conf:

protected-mode no

Kafka Sink (master)

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.11</artifactId>
    <version>1.8.1</version>
</dependency>
           
//1. Create the StreamExecutionEnvironment
val fsEnv = StreamExecutionEnvironment.getExecutionEnvironment

val props = new Properties()
props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"Spark:9092")
// overriding these serializers is not recommended
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,classOf[ByteArraySerializer])
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,classOf[ByteArraySerializer])


props.put(ProducerConfig.RETRIES_CONFIG,"3")
props.put(ProducerConfig.ACKS_CONFIG,"-1")
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG,"true")
props.put(ProducerConfig.BATCH_SIZE_CONFIG,"100")
props.put(ProducerConfig.LINGER_MS_CONFIG,"500")


//2. Create the DataStream (source)
val dataStream: DataStream[String] = fsEnv.socketTextStream("Spark",7788)
//3. Transform the data
dataStream.flatMap(_.split("\\s+"))
.map((_,1))
.keyBy(0)
.sum(1)
.addSink(new FlinkKafkaProducer[(String, Int)]("topicxx",
                                               new UserDefineKeyedSerializationSchema,
                                               props))

fsEnv.execute("FlinkWordCountsQuickStart")
           
class UserDefineKeyedSerializationSchema extends KeyedSerializationSchema[(String,Int)]{
  override def serializeKey(t: (String, Int)): Array[Byte] = {
    t._1.getBytes()
  }

  override def serializeValue(t: (String, Int)): Array[Byte] = {
    t._2.toString.getBytes()
  }

  override def getTargetTopic(t: (String, Int)): String = "topic01"
}

           

Operators (know how to use)

DataStream Transformations

DataStream → DataStream

Map

Takes one element and produces one element. A map function that doubles the values of the input stream:

dataStream.map { x => x * 2 }
           
FlatMap

Takes one element and produces zero, one, or more elements. A flatmap function that splits sentences to words:

dataStream.flatMap { str => str.split(" ") }
           
Filter

Evaluates a boolean function for each element and retains those for which the function returns true. A filter that filters out zero values:

dataStream.filter { _ != 0 }
           
Union

Union of two or more data streams creating a new stream containing all the elements from all the streams. Note: If you union a data stream with itself you will get each element twice in the resulting stream.

dataStream.union(otherStream1, otherStream2,  ...)
           

DataStream,DataStream → ConnectedStreams

Connect

“Connects” two data streams retaining their types, allowing for shared state between the two streams.

someStream : DataStream[Int] = ...
otherStream : DataStream[String] = ...

val connectedStreams = someStream.connect(otherStream)
           
CoMap, CoFlatMap

Similar to map and flatMap on a connected data stream

connectedStreams.map(
    (_ : Int) => true,
    (_ : String) => false
)
connectedStreams.flatMap(
    (_ : Int) => true,
    (_ : String) => false
)
           

Example

val fsEnv = StreamExecutionEnvironment.getExecutionEnvironment

val s1 = fsEnv.socketTextStream("Spark",9999)
val s2 = fsEnv.socketTextStream("Spark",8888)
s1.connect(s2).flatMap(
    (line:String)=>line.split("\\s+"),// transformation logic for stream s1
    (line:String)=>line.split("\\s+")// transformation logic for stream s2
)
.map((_,1))
.keyBy(0)
.sum(1)
.print()

fsEnv.execute("ConnectedStream")
           

DataStream → SplitStream

Split

Split the stream into two or more streams according to some criterion.

val split = someDataStream.split(
  (num: Int) =>
    (num % 2) match {
      case 0 => List("even")
      case 1 => List("odd")
    }
)
           
Select

Select one or more streams from a split stream.

val even = split select "even"
val odd = split select "odd"
val all = split.select("even","odd")
           

Example (deprecated API)

val fsEnv = StreamExecutionEnvironment.getExecutionEnvironment

val logStream = fsEnv.socketTextStream("Spark",9999)

val splitStream: SplitStream[String] = logStream.split(new OutputSelector[String] {
    override def select(out: String): lang.Iterable[String] = {
        if (out.startsWith("INFO")) {
            val array = new util.ArrayList[String]()
            array.add("info")
            return array
        } else  {
            val array = new util.ArrayList[String]()
            array.add("error")
            return array
        }
    }
})

splitStream.select("info").print("info")
splitStream.select("error").printToErr("error")

fsEnv.execute("ConnectedStream")
           

Approach 2 (preferred): side output

val fsEnv = StreamExecutionEnvironment.getExecutionEnvironment

val logStream = fsEnv.socketTextStream("Spark",9999)


val errorTag = new OutputTag[String]("error")

val dataStream = logStream.process(new ProcessFunction[String, String] {
    override def processElement(line: String,
                                context: ProcessFunction[String, String]#Context,
                                collector: Collector[String]): Unit = {
        if (line.startsWith("INFO")) {
            collector.collect(line)
        }else{
            context.output(errorTag,line)// emit to the error side output
        }
    }
})

dataStream.print("normal")
dataStream.getSideOutput(errorTag).print("error")

fsEnv.execute("ConnectedStream")
           

DataStream → KeyedStream

KeyBy

Logically partitions a stream into disjoint partitions, each partition containing elements of the same key. Internally, this is implemented with hash partitioning.

dataStream.keyBy("someKey") // Key by field "someKey"
dataStream.keyBy(0) // Key by the first element of a Tuple
           
Reduce

A “rolling” reduce on a keyed data stream. Combines the current element with the last reduced value and emits the new value.

keyedStream.reduce { _ + _ }
           
Fold

A “rolling” fold on a keyed data stream with an initial value. Combines the current element with the last folded value and emits the new value.

val result: DataStream[String] =
keyedStream.fold("start")((str, i) => { str + "-" + i })
           
Aggregations

Rolling aggregations on a keyed data stream. The difference between min and minBy is that min returns the minimum value, whereas minBy returns the element that has the minimum value in this field (same for max and maxBy).

keyedStream.sum(0)
keyedStream.sum("key")
keyedStream.min(0)
keyedStream.min("key")
keyedStream.max(0)
keyedStream.max("key")
keyedStream.minBy(0)
keyedStream.minBy("key")
keyedStream.maxBy(0)
keyedStream.maxBy("key")
           

Physical partitioning

Flink provides several partitioning schemes for users to choose from; the purpose of partitioning is to balance data across tasks. A short sketch of custom partitioning follows the table.

Partitioning scheme | Description

Custom partitioning

requires a user-implemented partitioning strategy

dataStream.partitionCustom(partitioner, "someKey")

Random partitioning

randomly distributes the incoming data among the downstream tasks

dataStream.shuffle()

Rebalancing (Round-robin partitioning)

distributes the upstream data evenly to the downstream tasks in a round-robin fashion

dataStream.rebalance()

Rescaling

rescales the partitioned data; for example, with 2 upstream and 4 downstream parallel instances, the first upstream partition sends its data to the first two downstream partitions and the second upstream partition sends to the last two

dataStream.rescale()

Broadcasting

every upstream partition broadcasts all of its data to every downstream task partition

dataStream.broadcast()
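
As an illustration of custom partitioning, here is a minimal sketch (imports omitted as in the rest of these notes; the hash-based Partitioner and keying by the first tuple field are illustrative assumptions, not a prescribed implementation):

val fsEnv = StreamExecutionEnvironment.getExecutionEnvironment

// route every record with the same word to the same downstream subtask
val partitioner = new Partitioner[String] {
  override def partition(key: String, numPartitions: Int): Int = {
    math.abs(key.hashCode) % numPartitions
  }
}

fsEnv.socketTextStream("Spark",9999)
  .flatMap(_.split("\\s+"))
  .map((_,1))
  .partitionCustom(partitioner, _._1) // the key selector picks the word as the partition key
  .print()

fsEnv.execute("CustomPartitioning")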

Task chaining and resource groups (overview)

Chaining two operator transformations means trying to place both transformations in the same thread, which reduces thread overhead and avoids unnecessary inter-thread communication. Users can disable chaining globally via StreamExecutionEnvironment.disableOperatorChaining().

val dataStream: DataStream[String] = fsEnv.socketTextStream("Spark",9999)

    //3. Transform the data
    dataStream.filter(line => line.startsWith("INFO"))
    .flatMap(_.split("\\s+"))
    .map((_,1))
    .map(t=>WordPair(t._1,t._2))
    .print()
           

For convenience, Flink provides the following operators to modify the chaining behavior:

Operator | Usage | Description
Start new chain | someStream.filter(…).map(…).startNewChain().map(…) | starts a new chain, detaching the current operator from the preceding filter
Disable chaining | someStream.map(…).disableChaining() | the current operator is chained neither to its predecessor nor to its successor
Set slot sharing group | someStream.filter(…).slotSharingGroup("name") | sets the resource group of the operation, which affects how tasks occupy Task Slots
val fsEnv = StreamExecutionEnvironment.getExecutionEnvironment
//2. Create the DataStream (source)
val dataStream: DataStream[String] = fsEnv.socketTextStream("Spark",9999)

//3. Transform the data
dataStream.filter(line => line.startsWith("INFO"))
.flatMap(_.split("\\s+"))
.startNewChain()
.slotSharingGroup("g1")
.map((_,1))
.map(t=>WordPair(t._1,t._2))
.print()

fsEnv.execute("FlinkWordCountsQuickStart")
           

State & Fault Tolerance (key topic)

Flink将流計算的狀态分為兩類:

Keyed Sate

\

Opertator State

.其中Keyed Sate狀态是操作符中key綁定的,而 Operator State隻可以和操作符綁定。無論是Keyed state還是Operator State Flink對狀态的管理分為兩種形式 Managed State 和 Raw Sate。

Managed State - because the state is managed by Flink, its structure and metadata are predefined by Flink, so Flink can optimize how the state is stored.

Raw State - raw state is the data in its native form. It is only used when implementing custom operators; Flink stores raw state as plain byte arrays and therefore knows nothing about its structure, so it is rarely used in practice.

All datastream functions can use managed state, but the raw state interfaces can only be used when implementing operators. Using managed state (rather than raw state) is recommended, since with managed state Flink is able to automatically redistribute state when the parallelism is changed, and also do better memory management.

Managed Keyed State

For Keyed State, Flink provides a rich set of state primitives so that users can store state conveniently. The following state types are currently available:

Type | Description | Methods

ValueState - holds a single value that can be updated

update(T), T value(), clear()

ListState - holds a list of elements

add(T), addAll(List), Iterable get(), update(List), clear()

ReducingState - holds a single value representing the aggregation of all values added to the state; requires a user-supplied ReduceFunction

add(T), T get(), clear()

AggregatingState<IN, OUT> - holds a single value representing the aggregation of all values added to the state; requires a user-supplied AggregateFunction

add(IN), OUT get(), clear()

FoldingState<T, ACC> - holds a single value representing the aggregation of all values added to the state; requires a user-supplied FoldFunction

add(T), ACC get(), clear()

MapState<UK, UV> - holds a map of elements

put(UK, UV), putAll(Map<UK, UV>), entries(), keys(), values(), clear()

To obtain a handle to a state, you must create the corresponding StateDescriptor. Flink provides the following descriptors: ValueStateDescriptor, ListStateDescriptor, ReducingStateDescriptor, FoldingStateDescriptor, MapStateDescriptor, and AggregatingStateDescriptor.

After creating the StateDescriptor, obtain the RuntimeContext inside a RichFunction and call the corresponding getter to retrieve the State object (a MapState sketch follows the list below):

  • ValueState<T> getState(ValueStateDescriptor<T>)

  • ReducingState<T> getReducingState(ReducingStateDescriptor<T>)

  • ListState<T> getListState(ListStateDescriptor<T>)

  • AggregatingState<IN, OUT> getAggregatingState(AggregatingStateDescriptor<IN, ACC, OUT>)

  • FoldingState<T, ACC> getFoldingState(FoldingStateDescriptor<T, ACC>)

  • MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV>)
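
MapState is listed in the table above but is not exercised in the examples below; a minimal sketch, assuming the same word-count input as the other state examples, might look like this:

val fsEnv = StreamExecutionEnvironment.getExecutionEnvironment
val dataStream: DataStream[String] = fsEnv.socketTextStream("Spark",9999)

dataStream.flatMap(_.split("\\s+"))
.map((_,1))
.keyBy(0)
.map(new RichMapFunction[(String,Int),(String,Int)] {
    var mapState:MapState[String,Int]=_

    override def open(parameters: Configuration): Unit = {
        val msd = new MapStateDescriptor[String,Int]("wordcount-map",
            createTypeInformation[String],createTypeInformation[Int])
        mapState = getRuntimeContext.getMapState(msd)
    }

    override def map(value: (String, Int)): (String, Int) = {
        // read the previous count for this word (0 if absent) and write back the new total
        val history = if(mapState.contains(value._1)) mapState.get(value._1) else 0
        mapState.put(value._1, history + value._2)
        (value._1, mapState.get(value._1))
    }
})
.print()

fsEnv.execute("MapStateDemo")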

ValueState

val fsEnv = StreamExecutionEnvironment.getExecutionEnvironment
fsEnv.disableOperatorChaining()//
//2. Create the DataStream (source)
val dataStream: DataStream[String] = fsEnv.socketTextStream("Spark",9999)
//3. Transform the data
dataStream.flatMap(_.split("\\s+"))
.map((_,1))
.keyBy(0)
.map(new RichMapFunction[(String,Int),(String,Int)] {
    var valueState:ValueState[Int]=_

    override def open(parameters: Configuration): Unit = {
        val vsd = new ValueStateDescriptor[Int]("wordcount",createTypeInformation[Int])
        valueState= getRuntimeContext.getState(vsd)
    }

    override def map(value: (String, Int)): (String, Int) = {
        var historyValue = valueState.value()
        if(historyValue==null){
            historyValue=0
        }
        // update the historical value
        valueState.update(historyValue+value._2)
        (value._1,valueState.value())
    }
})
.print()

fsEnv.execute("FlinkWordCountsQuickStart")
           

ReducingState

//1. Create the StreamExecutionEnvironment
val fsEnv = StreamExecutionEnvironment.getExecutionEnvironment
fsEnv.disableOperatorChaining()//
//2. Create the DataStream (source)
val dataStream: DataStream[String] = fsEnv.socketTextStream("Spark",9999)
//3. Transform the data
dataStream.flatMap(_.split("\\s+"))
.map((_,1))
.keyBy(0)
.map(new RichMapFunction[(String,Int),(String,Int)] {
    var reduceState:ReducingState[Int]=_

    override def open(parameters: Configuration): Unit = {
        val rsd = new ReducingStateDescriptor[Int]("wordcount",new ReduceFunction[Int] {
            override def reduce(value1: Int, value2: Int): Int = value1+value2
        },createTypeInformation[Int])
        reduceState= getRuntimeContext.getReducingState(rsd)
    }

    override def map(value: (String, Int)): (String, Int) = {
        reduceState.add(value._2)
        (value._1,reduceState.get())
    }
})
.print()

fsEnv.execute("FlinkWordCountsQuickStart")
           

AggregatingState

//1. Create the StreamExecutionEnvironment
    val fsEnv = StreamExecutionEnvironment.getExecutionEnvironment
    fsEnv.disableOperatorChaining()//
    //2. Create the DataStream (source)
    val dataStream: DataStream[String] = fsEnv.socketTextStream("Spark",9999)
    //3. Transform the data; sample input: 1 zhangsan sales 10000
    dataStream.map(_.split("\\s+"))
      .map(ts=>Employee(ts(0),ts(1),ts(2),ts(3).toDouble))
      .keyBy("dept")
      .map(new RichMapFunction[Employee,(String,Double)] {
        var aggregatingState:AggregatingState[Double,Double]= _

        override def open(parameters: Configuration): Unit = {
          val asd=new AggregatingStateDescriptor[Double,(Double,Int),Double]("agggstate",
            new AggregateFunction[Double,(Double,Int),Double] {
              override def createAccumulator(): (Double, Int) = (0.0,0)

              override def add(value: Double, accumulator: (Double, Int)): (Double, Int) = {
                var total=accumulator._1
                var count=accumulator._2
                (total+value,count+1)
              }
              override def merge(a: (Double, Int), b: (Double, Int)): (Double, Int) = {
                (a._1+b._1,a._2+b._2)
              }
              override def getResult(accumulator: (Double, Int)): Double = {
                accumulator._1/accumulator._2
              }

            }
            ,createTypeInformation[(Double,Int)])

          aggregatingState=getRuntimeContext.getAggregatingState(asd)
        }

        override def map(value: Employee): (String, Double) = {
          aggregatingState.add(value.salary)
          (value.dept,aggregatingState.get())
        }
      })
      .print()
    
    fsEnv.execute("FlinkWordCountsQuickStart")
           

List State

//1. Create the StreamExecutionEnvironment
val fsEnv = StreamExecutionEnvironment.getExecutionEnvironment
//2. Create the DataStream (source)
val dataStream: DataStream[String] = fsEnv.socketTextStream("Spark",9999)
//3. Transform the data; sample input: zhangsan 123456
dataStream.map(_.split("\\s+"))
.map(ts=>(ts(0),ts(1)))
.keyBy(0)
.map(new RichMapFunction[(String,String),String] {
    var historyPasswords:ListState[String]=_

    override def open(parameters: Configuration): Unit = {
        val lsd = new ListStateDescriptor[String]("pwdstate",createTypeInformation[String])
        historyPasswords=getRuntimeContext.getListState(lsd)
    }
    override def map(value: (String, String)): String = {
        var list = historyPasswords.get().asScala.toList
        list= list.::(value._2)
        list = list.distinct // deduplicate
        historyPasswords.update(list.asJava)

        value._1+"\t"+list.mkString(",")
    }
})
.print()
fsEnv.execute("FlinkWordCountsQuickStart")
}
           

Managed Operator State

To use Operator State, implement the generic CheckpointedFunction interface, or implement ListCheckpointed<T extends Serializable>.

CheckpointedFunction

Currently CheckpointedFunction only supports list-style state. Each operator instance maintains a sub-list, and the system logically concatenates the sub-lists of all operator instances into the complete state. On recovery, the system redistributes the state across the operator instances following one of two redistribution schemes: even-split or union (broadcast).

public interface CheckpointedFunction {
    void snapshotState(FunctionSnapshotContext var1) throws Exception;
    void initializeState(FunctionInitializationContext var1) throws Exception;
}
           
class UserDefineBufferSink(threshold: Int = 0) extends SinkFunction[String] with CheckpointedFunction {
    @transient
    private var checkpointedState: ListState[String] = _
    private val bufferedElements = ListBuffer[String]()

    override def invoke(value: String): Unit = {
        bufferedElements += value
        println(bufferedElements.size+ " " + (bufferedElements.size >= threshold))
        if(bufferedElements.size == threshold){
            bufferedElements.foreach(item => println(item))
            bufferedElements.clear()
        }
    }
    //将資料儲存起來
    override def snapshotState(functionSnapshotContext: FunctionSnapshotContext): Unit = {
        checkpointedState.clear()
        for(i <- bufferedElements){
            checkpointedState.add(i)
        }
    }

    override def initializeState(context: FunctionInitializationContext): Unit = {
        val descriptor = new ListStateDescriptor[String]("buffered-elements",createTypeInformation[String])
        checkpointedState = context.getOperatorStateStore.getListState(descriptor)
        if(context.isRestored){
            for(element <- checkpointedState.get().asScala) {
                bufferedElements += element
            }
        }
    }
}
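
A hedged usage sketch for the sink above (the threshold of 3 and the socket source are illustrative):

val fsEnv = StreamExecutionEnvironment.getExecutionEnvironment
// the operator state is only snapshotted when checkpointing is enabled
fsEnv.enableCheckpointing(5000,CheckpointingMode.EXACTLY_ONCE)

fsEnv.socketTextStream("Spark",9999)
.addSink(new UserDefineBufferSink(3)) // flush the buffer every 3 elements

fsEnv.execute("OperatorStateBufferSink")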
           

ListCheckpointed

This interface is a variant of CheckpointedFunction that only supports list-style state with the even-split redistribution scheme.

public interface ListCheckpointed<T extends Serializable> {
    // the return value is the state to be stored
    List<T> snapshotState(long var1, long var3) throws Exception;
    // state initialization / restore logic
    void restoreState(List<T> var1) throws Exception;
}
           
class UserDefineCounterSource extends RichParallelSourceFunction[Long] with ListCheckpointed[JLong]{

    @volatile
    private var isRunning = true
    private var offset = 0L

    override def snapshotState(checkpointId: Long, timestamp: Long): util.List[JLong] = {
        println("snapshotting offset: "+offset)
        Collections.singletonList(offset)// the stored state is the offset
    }

    override def restoreState(list: util.List[JLong]): Unit = {
        for (i <- list.asScala){
            println("restoring offset: "+i)
            offset=i
        }
    }

    override def run(ctx: SourceFunction.SourceContext[Long]): Unit = {
        val lock = ctx.getCheckpointLock

        while (isRunning) {
            Thread.sleep(1000)
            lock.synchronized({
                ctx.collect(offset) // emit the offset downstream
                offset += 1
            })

        }
    }
    override def cancel(): Unit =  isRunning=false
}
           
//1. Create the StreamExecutionEnvironment
val fsEnv = StreamExecutionEnvironment.getExecutionEnvironment

//2. Create the DataStream (source)
val dataStream: DataStream[Long] = fsEnv.addSource(new UserDefineCounterSource)

dataStream.map(counter => "offset: "+counter)
.print()
fsEnv.execute("FlinkWordCountsQuickStart")
           

State Time-To-Live(TTL)

A TTL (time to live) can be assigned to keyed state of any type. If a TTL is configured and a stored value has expired, Flink will remove the expired state on a best-effort basis. Besides single-value TTLs, the elements of collection types such as MapState and ListState each have their own TTL expiration time.

Basic usage

val vsd = new ValueStateDescriptor[Int]("wordcount",createTypeInformation[Int])

val ttlConfig = StateTtlConfig
.newBuilder(Time.seconds(5))// state time-to-live
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)// when the TTL is refreshed (default)
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)// never return expired data (default)
.build
// enable the TTL feature
vsd.enableTimeToLive(ttlConfig)
           

Cleanup of Expired State

By default, expired values are removed only when the state is read.

This means that by default if expired state is not read, it won’t be removed, possibly leading to ever growing state.
  • Cleanup in full snapshot

When the job is restarted, the state snapshot is read back from disk and expired entries can be filtered out at that point; during normal processing, however, expired entries still cannot be removed, so users need to restart the streaming job periodically to free memory.

val ttlConfig = StateTtlConfig
    .newBuilder(Time.seconds(5))// state time-to-live
    .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)// when the TTL is refreshed
    .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)// never return expired data
    .cleanupFullSnapshot()
    .build
           
  • Cleanup in background

Besides cleaning up expired data when taking a full snapshot at restart, expired data can also be cleaned up in the background at runtime. The feature is enabled with .cleanupInBackground(); the concrete cleanup strategy depends on the state backend implementation.

val ttlConfig = StateTtlConfig
    .newBuilder(Time.seconds(5))// state time-to-live
    .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)// when the TTL is refreshed
    .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)// never return expired data
    .cleanupFullSnapshot()
    .cleanupInBackground()// use the default cleanup strategy of the configured state backend
    .build
           

If users know which state backend they are using, they can also configure the backend-specific cleanup strategy themselves. Currently the heap-based backends only support incremental cleanup, and the RocksDB backend supports cleanup via its compaction filter.

Incremental cleanup

val ttlConfig = StateTtlConfig
    .newBuilder(Time.seconds(5))// state time-to-live
    .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)// when the TTL is refreshed
    .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)// never return expired data
    .cleanupFullSnapshot()
    // .cleanupInBackground()
    // process 10 entries per cleanup pass, and run a cleanup on every record processed (defaults: 5, false)
    .cleanupIncrementally(10,true)
    .build
    // enable the TTL feature
vsd.enableTimeToLive(ttlConfig)
           

Cleanup during RocksDB compaction

If the RocksDB state backend is used, another cleanup strategy is to activate Flink's specific compaction filter. RocksDB periodically runs asynchronous compactions to merge state updates and reduce storage. The Flink compaction filter checks the expiration timestamp of state entries using the TTL and excludes expired values. The feature is disabled by default; it must first be activated for the RocksDB backend by setting the Flink configuration option state.backend.rocksdb.ttl.compaction.filter.enabled or by calling RocksDBStateBackend::enableTtlCompactionFilter.

val ttlConfig = StateTtlConfig
    .newBuilder(Time.seconds(5))// state time-to-live
    .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)// when the TTL is refreshed
    .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)// never return expired data
    .cleanupFullSnapshot()
    // .cleanupInBackground()
    //  .cleanupIncrementally(10,true)
    // after RocksDB has merged 1000 entries, check expiration timestamps and drop expired entries
    .cleanupInRocksdbCompactFilter(1000)
    .build
// enable the TTL feature
vsd.enableTimeToLive(ttlConfig)
           

Broadcast State

Besides Operator State and Keyed State, Flink has a third kind of state called broadcast state. Broadcast state allows the results computed on stream A to be broadcast to stream B; stream B can only read stream A's state in a read-only fashion, while stream A can update that state in real time (see the sketch after the list below). Two variants exist:

  • non-keyed: a DataStream connected with a BroadcastStream
  • keyed: a KeyedStream connected with a BroadcastStream
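
A minimal non-keyed sketch, assuming stream A carries a single filter keyword and stream B carries log lines (the ports, the rules MapStateDescriptor and the keyword logic are illustrative assumptions):

val fsEnv = StreamExecutionEnvironment.getExecutionEnvironment

// stream A: keyword updates, stream B: log lines
val ruleStream = fsEnv.socketTextStream("Spark",8888)
val logStream  = fsEnv.socketTextStream("Spark",9999)

val ruleDescriptor = new MapStateDescriptor[String,String]("rules",
    createTypeInformation[String],createTypeInformation[String])

// stream A becomes a broadcast stream; only this side may write the broadcast state
val broadcastRules = ruleStream.broadcast(ruleDescriptor)

logStream.connect(broadcastRules)
.process(new BroadcastProcessFunction[String,String,String] {
    // non-broadcast side: read-only access to the broadcast state
    override def processElement(line: String,
                                ctx: BroadcastProcessFunction[String,String,String]#ReadOnlyContext,
                                out: Collector[String]): Unit = {
        val keyword = ctx.getBroadcastState(ruleDescriptor).get("keyword")
        if (keyword == null || line.contains(keyword)) {
            out.collect(line)
        }
    }
    // broadcast side: updates the state in real time
    override def processBroadcastElement(rule: String,
                                         ctx: BroadcastProcessFunction[String,String,String]#Context,
                                         out: Collector[String]): Unit = {
        ctx.getBroadcastState(ruleDescriptor).put("keyword", rule)
    }
})
.print()

fsEnv.execute("BroadcastStateDemo")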

Checkpoint & Savepoint

A checkpoint is a mechanism by which Flink periodically stores the state of the stream computation; checkpointing is coordinated by the JobManager. The JobManager periodically sends barrier signals downstream. When a downstream task receives a barrier, it pre-commits (snapshots) its own state, forwards the barrier further downstream, and reports the result of the state persistence to the JobManager. Only when every downstream task has reported a successful state commit does the JobManager mark the current checkpoint as successful. (This is an automatic process that requires no manual intervention.)

A savepoint is a manually triggered checkpoint and requires manual intervention:

flink cancel --withSavepoint

Checkpointing is disabled by default; users must configure it for each job:
val fsEnv = StreamExecutionEnvironment.getExecutionEnvironment

// checkpoint every 5s (12 checkpoints per minute)
fsEnv.enableCheckpointing(5000,CheckpointingMode.EXACTLY_ONCE)
// each checkpoint must complete within 4s
fsEnv.getCheckpointConfig.setCheckpointTimeout(4000)
// at least 2s between checkpoints; only one checkpoint at a time
fsEnv.getCheckpointConfig.setMinPauseBetweenCheckpoints(2000);
// if the job is cancelled without --withSavepoint, retain the checkpoint data
fsEnv.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
// fail the job if a checkpoint error occurs
fsEnv.getCheckpointConfig.setFailOnCheckpointingErrors(true);

val dataStream: DataStream[String] = fsEnv.socketTextStream("Spark",9999)
//3. Transform the data
dataStream.flatMap(_.split("\\s+"))
.map((_,1))
.keyBy(0)
.map(new RichMapFunction[(String,Int),(String,Int)] {
    var valueState:ValueState[Int]=_

    override def open(parameters: Configuration): Unit = {
        val vsd = new ValueStateDescriptor[Int]("wordcount",createTypeInformation[Int])
        valueState= getRuntimeContext.getState(vsd)
    }

    override def map(value: (String, Int)): (String, Int) = {
        var historyValue = valueState.value()
        if(historyValue==null){
            historyValue=0
        }
        // update the historical value
        valueState.update(historyValue+value._2)
        (value._1,valueState.value())
    }
})
.print()

fsEnv.execute("FlinkWordCountsQuickStart")
           

State Backends

Reference: https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/state/state_backends.html

MemoryStateBackend - stores snapshot data in the JobManager's memory. Each individual state may not exceed 5 MB by default, and the total state must not exceed the JobManager's memory. Generally used during development and testing, with small state.

FsStateBackend - keeps the working state in the TaskManager's memory; when a checkpoint is taken, the data is written asynchronously to a file system, and the JobManager keeps only a small amount of metadata in memory. Generally used in production when large state must be stored.

RocksDBStateBackend - stores the working state in a RocksDB database local to the TaskManager and can complete checkpoints incrementally. On a checkpoint, the TaskManager asynchronously writes the local RocksDB data to a remote file system, and the JobManager keeps only a small amount of metadata in memory. Generally used in production when very large state must be stored. (A single key or value may not exceed 2^31 bytes.)

FsStateBackend vs. RocksDBStateBackend: FsStateBackend is limited by the TaskManager's memory but is efficient; RocksDBStateBackend is limited only by the TaskManager's local disk, but because the data lives on disk and must be serialized and deserialized, performance may drop somewhat.
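
A minimal sketch of selecting a state backend in code (the HDFS path is a placeholder; the RocksDB variant additionally assumes the flink-statebackend-rocksdb dependency is on the classpath; the backend can equally be set cluster-wide via state.backend in flink-conf.yaml):

val fsEnv = StreamExecutionEnvironment.getExecutionEnvironment

// FsStateBackend: working state in TaskManager memory, snapshots in a file system
fsEnv.setStateBackend(new FsStateBackend("hdfs://Spark:9000/flink-checkpoints"))

// RocksDBStateBackend: working state in local RocksDB, with incremental checkpoints
// fsEnv.setStateBackend(new RocksDBStateBackend("hdfs://Spark:9000/flink-checkpoints", true))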

Window (windowed computation)

Window computation is the core of stream processing: windows split an unbounded stream into buckets of finite size, and computations are performed over the elements that fall into the same bucket (window). Flink divides window computations into two broad categories.

The first operates on keyed streams:

stream
       .keyBy(...)               <-  key the stream
       .window(...)              <-  required: "assigner" (window assigner)
      [.trigger(...)]            <-  optional: "trigger" (each window type has a default trigger)
      [.evictor(...)]            <-  optional: "evictor" (can remove elements from the window)
      [.allowedLateness(...)]    <-  optional: "lateness" (handles late data)
      [.sideOutputLateData(...)] <-  optional: "output tag" (late elements available via side output)
       .reduce/aggregate/fold/apply()      <-  required: "function"
      [.getSideOutput(...)]      <-  optional: fetch side-output data, e.g. late records
           

The second operates directly on non-keyed streams:

stream
       .windowAll(...)           <-  required: "assigner"
      [.trigger(...)]            <-  optional: "trigger" (else default trigger)
      [.evictor(...)]            <-  optional: "evictor" (else no evictor)
      [.allowedLateness(...)]    <-  optional: "lateness" (else zero)
      [.sideOutputLateData(...)] <-  optional: "output tag" (else no side output for late data)
       .reduce/aggregate/fold/apply()      <-  required: "function"
      [.getSideOutput(...)]      <-  optional: "output tag"
           

Window Lifecycle

In short, a window is created as soon as the first element that should belong to it arrives, and the window is removed completely once the time (the watermark for event time, or the processing time) passes its end timestamp plus the user-specified allowed lateness. A window only fires after the watermark passes the window's end time; at that point the window is ready, and only then does Flink actually compute and emit the window's output.

Trigger: monitors the window; the window only fires when the trigger's condition is satisfied (for example, based on the watermark).

Evictor: removes elements from the window after the trigger fires, before and/or after the window function is applied.

Window Assigners

A window assigner defines how elements are assigned to windows. After defining the window, users can aggregate over it with operators such as reduce/aggregate/fold/apply.

  • Tumbling Windows: the window length equals the slide interval, so windows do not overlap (time-based).
dataStream.flatMap(_.split("\\s+"))
    .map((_,1))
    .keyBy(0)
    .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
    .reduce((v1,v2)=>(v1._1,v1._2+v2._2))
    .print()
           
  • Sliding Windows: the window length is greater than the slide interval, so windows overlap and share data (time-based).
dataStream.flatMap(_.split("\\s+"))
    .map((_,1))
    .keyBy(0)
    .window(SlidingProcessingTimeWindows.of(Time.seconds(4),Time.seconds(2)))
    .fold(("",0))((z,v)=>(v._1,z._2+v._2))
    .print()
           
  • Session Windows: session windows have no fixed size; each element initially forms its own window, and windows whose gap is smaller than the specified duration are merged (time-based).
dataStream.flatMap(_.split("\\s+"))
.map((_,1))
.keyBy(0)
.window(ProcessingTimeSessionWindows.withGap(Time.seconds(5)))
.aggregate(new AggregateFunction[(String,Int),(String,Int),(String,Int)] {
    override def createAccumulator(): (String, Int) = {
        ("",0)
    }

    override def add(value: (String, Int), accumulator: (String, Int)): (String, Int) = {
        (value._1,value._2+accumulator._2)
    }

    override def getResult(accumulator: (String, Int)): (String, Int) = {
        accumulator
    }

    override def merge(a: (String, Int), b: (String, Int)): (String, Int) = {
        (a._1,a._2+b._2)
    }
})
.print()
           
  • Global Windows: global windows are not divided by time, so there is no notion of window length or time. A user-defined trigger policy is required for the window to fire.
dataStream.flatMap(_.split("\\s+"))
.map((_,1))
.keyBy(_._1)
.window(GlobalWindows.create())
.trigger(CountTrigger.of(4))
.apply(new WindowFunction[(String,Int),(String,Int),String, GlobalWindow] {
    override def apply(key: String, window: GlobalWindow, inputs: Iterable[(String, Int)],
                       out: Collector[(String, Int)]): Unit = {
        println("key:"+key+" w:"+window)
        inputs.foreach(t=>println(t))
        out.collect((key,inputs.map(_._2).sum))
    }
})
.print()
           

Window Function

After defining the window assigner, we need to specify the computation to perform on each window. This is the responsibility of the window function: once the system determines that a window is ready for processing, the window function is applied to the elements of that window. Flink provides the following window functions:

  • ReduceFunction
new ReduceFunction[(String, Int)] {
    override def reduce(v1: (String, Int), v2: (String, Int)): (String, Int) = {
        (v1._1,v1._2+v2._2)
    }
}
           
  • AggregateFunction
new AggregateFunction[(String,Int),(String,Int),(String,Int)] {
    override def createAccumulator(): (String, Int) = {
        ("",0)
    }
    override def add(value: (String, Int), accumulator: (String, Int)): (String, Int) = {
        (value._1,value._2+accumulator._2)
    }
    override def getResult(accumulator: (String, Int)): (String, Int) = {
        accumulator
    }
    override def merge(a: (String, Int), b: (String, Int)): (String, Int) = {
        (a._1,a._2+b._2)
    }
}
           
  • FoldFunction (deprecated)
new FoldFunction[(String,Int),(String,Int)] {
    override def fold(accumulator: (String, Int), value: (String, Int)): (String, Int) = {
        (value._1,accumulator._2+value._2)
    }
}
           
It cannot be used with merging windows, i.e. it cannot be used with session windows.
  • apply/WindowFunction (legacy - generally not recommended)

Provides access to all elements in the window plus some metadata, but cannot manipulate window state.

new WindowFunction[(String,Int),(String,Int),String, GlobalWindow] {
    override def apply(key: String, window: GlobalWindow, inputs: Iterable[(String, Int)],
                       out: Collector[(String, Int)]): Unit = {
        println("key:"+key+" w:"+window)
        inputs.foreach(t=>println(t))
        out.collect((key,inputs.map(_._2).sum))
    }
}
           
When calling keyBy here, you cannot use a tuple index; you must use keyBy(_._1).

  • ProcessWindowFunction (key - must master)

Provides access to all elements in the window plus metadata. It is the replacement for WindowFunction, because this interface can directly manipulate the window state and the global (per-key) state.

Accessing the window information

val fsEnv = StreamExecutionEnvironment.getExecutionEnvironment
//2. Create the DataStream (source)
val dataStream: DataStream[String] = fsEnv.socketTextStream("Spark",9999)

//3. Transform the data
dataStream.flatMap(_.split("\\s+"))
.map((_,1))
.keyBy(_._1)
.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
.process(new ProcessWindowFunction[(String,Int),(String,Int),String,TimeWindow] {

    override def process(key: String,
                         context: Context,
                         elements: Iterable[(String, Int)],
                         out: Collector[(String, Int)]): Unit = {

        val w = context.window
        val sdf = new SimpleDateFormat("HH:mm:ss")

        println(sdf.format(w.getStart)+" ~ "+ sdf.format(w.getEnd))

        val total = elements.map(_._2).sum
        out.collect((key,total))
    }
})
.print()

fsEnv.execute("FlinkWordCountsQuickStart")
           

Combined with a Reduce|Aggregate|FoldFunction

val fsEnv = StreamExecutionEnvironment.getExecutionEnvironment
//2. Create the DataStream (source)
val dataStream: DataStream[String] = fsEnv.socketTextStream("Spark",9999)

//3. Transform the data
dataStream.flatMap(_.split("\\s+"))
.map((_,1))
.keyBy(_._1)
.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
.reduce((v1:(String,Int),v2:(String,Int))=>(v1._1,v1._2+v2._2),
        new ProcessWindowFunction[(String,Int),(String,Int),String,TimeWindow] {

    override def process(key: String,
                         context: Context,
                         elements: Iterable[(String, Int)],
                         out: Collector[(String, Int)]): Unit = {

        val w = context.window
        val sdf = new SimpleDateFormat("HH:mm:ss")

        println(sdf.format(w.getStart)+" ~ "+ sdf.format(w.getEnd))

        val total = elements.map(_._2).sum
        out.collect((key,total))
    }
})
.print()

fsEnv.execute("FlinkWordCountsQuickStart")

           

Operating on WindowState | GlobalState

val fsEnv = StreamExecutionEnvironment.getExecutionEnvironment
//2. Create the DataStream (source)
val dataStream: DataStream[String] = fsEnv.socketTextStream("Spark",9999)

//3. Transform the data
dataStream.flatMap(_.split("\\s+"))
.map((_,1))
.keyBy(_._1)
.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
.reduce((v1:(String,Int),v2:(String,Int))=>(v1._1,v1._2+v2._2),
        new ProcessWindowFunction[(String,Int),String,String,TimeWindow] {
            var windowStateDescriptor:ReducingStateDescriptor[Int]=_
            var globalStateDescriptor:ReducingStateDescriptor[Int]=_

            override def open(parameters: Configuration): Unit = {
                windowStateDescriptor = new ReducingStateDescriptor[Int]("wcs",new ReduceFunction[Int] {
                    override def reduce(value1: Int, value2: Int): Int = value1+value2
                },createTypeInformation[Int])
                globalStateDescriptor = new ReducingStateDescriptor[Int]("gcs",new ReduceFunction[Int] {
                    override def reduce(value1: Int, value2: Int): Int = value1+value2
                },createTypeInformation[Int])
            }

            override def process(key: String,
                                 context: Context,
                                 elements: Iterable[(String, Int)],
                                 out: Collector[String]): Unit = {

                val w = context.window
                val sdf = new SimpleDateFormat("HH:mm:ss")

                val windowState = context.windowState.getReducingState(windowStateDescriptor)
                val globalState = context.globalState.getReducingState(globalStateDescriptor)

                elements.foreach(t=>{
                    windowState.add(t._2)
                    globalState.add(t._2)
                })
                out.collect(key+"\t"+windowState.get()+"\t"+globalState.get())
            }
        })
.print()

fsEnv.execute("FlinkWordCountsQuickStart")
           

Trigger

A trigger determines when a window (as formed by the window assigner) is ready to be processed by the window function. Each window assigner comes with a default trigger. If the default trigger does not fit your needs, you can specify a custom trigger with trigger(…).

Window type | Trigger | When it fires
event-time window (Tumbling/Sliding/Session) | EventTimeTrigger | fires once the watermark passes the end of the window
processing-time window (Tumbling/Sliding/Session) | ProcessingTimeTrigger | fires once the system time passes the end of the window
GlobalWindow (not time-based) | NeverTrigger | never fires
public class UserDefineDeltaTrigger<T, W extends Window> extends Trigger<T, W> {

    private final DeltaFunction<T> deltaFunction;
    private final double threshold;
    private final ValueStateDescriptor<T> stateDesc;

    private UserDefineDeltaTrigger(double threshold, DeltaFunction<T> deltaFunction, TypeSerializer<T> stateSerializer) {
        this.deltaFunction = deltaFunction;
        this.threshold = threshold;
        this.stateDesc = new ValueStateDescriptor("last-element", stateSerializer);
    }

    public TriggerResult onElement(T element, long timestamp, W window, TriggerContext ctx) throws Exception {
        ValueState<T> lastElementState = (ValueState)ctx.getPartitionedState(this.stateDesc);
        if (lastElementState.value() == null) {
            lastElementState.update(element);
            return TriggerResult.CONTINUE;
        } else if (this.deltaFunction.getDelta(lastElementState.value(), element) > this.threshold) {
            lastElementState.update(element);
            return TriggerResult.FIRE_AND_PURGE;
        } else {
            return TriggerResult.CONTINUE;
        }
    }

    public TriggerResult onEventTime(long time, W window, TriggerContext ctx) {
        return TriggerResult.CONTINUE;
    }

    public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception {
        return TriggerResult.CONTINUE;
    }

    public void clear(W window, TriggerContext ctx) throws Exception {
        ((ValueState)ctx.getPartitionedState(this.stateDesc)).clear();
    }

    public String toString() {
        return "DeltaTrigger(" + this.deltaFunction + ", " + this.threshold + ")";
    }

    public static <T, W extends Window> UserDefineDeltaTrigger<T, W> of(double threshold, DeltaFunction<T> deltaFunction, TypeSerializer<T> stateSerializer) {
        return new UserDefineDeltaTrigger(threshold, deltaFunction, stateSerializer);
    }
}

           
val fsEnv = StreamExecutionEnvironment.getExecutionEnvironment
//2. Create the DataStream (source)
val dataStream: DataStream[String] = fsEnv.socketTextStream("Spark",9999)

var deltaTrigger=UserDefineDeltaTrigger.of[(String,Double),GlobalWindow](10.0,new DeltaFunction[(String, Double)] {
    override def getDelta(lastData: (String, Double), newData: (String, Double)): Double = {
        newData._2-lastData._2
    }
},createTypeInformation[(String,Double)].createSerializer(fsEnv.getConfig))

//3. Transform the data; the delta threshold is 10
// sample input: a  100.0
dataStream.map(_.split("\\s+"))
.map(ts=>(ts(0),ts(1).toDouble))
.keyBy(_._1)
.window(GlobalWindows.create())
.trigger(deltaTrigger)
.apply(new WindowFunction[(String,Double),(String,Int),String, GlobalWindow] {
    override def apply(key: String, window: GlobalWindow, inputs: Iterable[(String, Double)],
                       out: Collector[(String, Int)]): Unit = {
        println("key:"+key+" w:"+window)
        inputs.foreach(t=>println(t))
    }
})
.print()

fsEnv.execute("FlinkWordCountsQuickStart")
           

Evictors

An evictor can remove elements from a window after the trigger fires, before and/or after the window function is applied. For this, the Evictor interface has two methods:

public interface Evictor<T, W extends Window> extends Serializable {

	/**
	 * Optionally evicts elements. Called before windowing function.
	 *
	 * @param elements The elements currently in the pane.
	 * @param size The current number of elements in the pane.
	 * @param window The {@link Window}
	 * @param evictorContext The context for the Evictor
     */
	void evictBefore(Iterable<TimestampedValue<T>> elements, int size, W window, EvictorContext evictorContext);

	/**
	 * Optionally evicts elements. Called after windowing function.
	 *
	 * @param elements The elements currently in the pane.
	 * @param size The current number of elements in the pane.
	 * @param window The {@link Window}
	 * @param evictorContext The context for the Evictor
	 */
	void evictAfter(Iterable<TimestampedValue<T>> elements, int size, W window, EvictorContext evictorContext);
}
           
public class UserDefineErrorEvictor<W extends  Window> implements Evictor<String, W> {
    private  boolean isEvictorBefore;
    private  String  content;

    public UserDefineErrorEvictor(boolean isEvictorBefore, String content) {
        this.isEvictorBefore = isEvictorBefore;
        this.content=content;
    }

    public void evictBefore(Iterable<TimestampedValue<String>> elements, int size, W window, EvictorContext evictorContext) {
        if(isEvictorBefore){
            evict(elements,  size,  window,  evictorContext);
        }
    }

    public void evictAfter(Iterable<TimestampedValue<String>> elements, int size, W window, EvictorContext evictorContext) {
        if(!isEvictorBefore){
            evict(elements,  size,  window,  evictorContext);
        }
    }
    private  void evict(Iterable<TimestampedValue<String>> elements, int size, W window, EvictorContext evictorContext) {
        Iterator<TimestampedValue<String>> iterator = elements.iterator();
        while(iterator.hasNext()){
            TimestampedValue<String> next = iterator.next();
            String value = next.getValue();
            if(value.contains(content)){
                iterator.remove();
            }
        }
    }
}
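
A hedged usage sketch for the evictor above (the keying, the 5s processing-time window, and the "error" keyword are illustrative):

val fsEnv = StreamExecutionEnvironment.getExecutionEnvironment

fsEnv.socketTextStream("Spark",9999)
.keyBy(line => line.split("\\s+")(0))
.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
.evictor(new UserDefineErrorEvictor[TimeWindow](true,"error")) // drop lines containing "error" before the window function runs
.apply(new WindowFunction[String,String,String,TimeWindow] {
    override def apply(key: String, window: TimeWindow, input: Iterable[String],
                       out: Collector[String]): Unit = {
        out.collect(key + "\t" + input.mkString(" | "))
    }
})
.print()

fsEnv.execute("EvictorDemo")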
           

EventTime Window

Flink supports different notions of time in streaming programs: Processing Time, Event Time and Ingestion Time.

If the user does not specify a time characteristic, Flink uses ProcessingTime by default. Both Ingestion Time and Processing Time are generated by the system; the difference is that Ingestion Time is assigned by the Source Function while Processing Time is assigned by the processing node, and neither requires a user-defined timestamp extraction strategy.

The mechanism Flink uses to measure progress in event time is the watermark. Watermarks flow as part of the data stream and carry a timestamp t. A Watermark(t) declares that event time has reached time t in that stream, meaning there should be no more elements with timestamp t' <= t. For example, with a maximum out-of-orderness of 2s, after seeing an event with timestamp 12:00:10 the watermark is 12:00:08, so any window ending at or before 12:00:08 can fire.

watermark(t) = max event time seen by the processing node - maxOutOfOrderness
           

Watermark computation

.assignTimestampsAndWatermarks(AssignerWithPeriodicWatermarks|AssignerWithPunctuatedWatermarks)
           
  • AssignerWithPeriodicWatermarks: the watermark is computed periodically
// compute the watermark every 1s
fsEnv.getConfig.setAutoWatermarkInterval(1000)
           
class UserDefineAssignerWithPeriodicWatermarks extends AssignerWithPeriodicWatermarks[(String,Long)] {

    var  maxOrderness=2000L
    var  maxSeenTime=0L
    var sdf=new SimpleDateFormat("HH:mm:ss")
    override def getCurrentWatermark: Watermark = {
        // println("watermarker:"+sdf.format(maxSeenTime-maxOrderness))
        new Watermark(maxSeenTime-maxOrderness)
    }

    override def extractTimestamp(element: (String, Long), previousElementTimestamp: Long): Long = {
        maxSeenTime=Math.max(element._2,maxSeenTime)
        element._2
    }
}
           
  • AssignerWithPunctuatedWatermarks: the watermark is computed for every element the system receives
class UserDefineAssignerWithPunctuatedWatermarks extends AssignerWithPunctuatedWatermarks[(String,Int)]{
    var  maxOrderness=2000L // maximum out-of-orderness
    var  maxSeenTime=0L
    override def checkAndGetNextWatermark(lastElement: (String, Int), extractedTimestamp: Long): Watermark = {
        new Watermark(maxSeenTime-maxOrderness)
    }

    override def extractTimestamp(element: (String, Int), previousElementTimestamp: Long): Long = {
        maxSeenTime=Math.max(element._2,maxSeenTime)
        element._2
    }
}
           

Basic example

val fsEnv = StreamExecutionEnvironment.getExecutionEnvironment
fsEnv.setParallelism(1)
// set the time characteristic
fsEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
// compute the watermark every 1s
fsEnv.getConfig.setAutoWatermarkInterval(1000)
//2. Create the DataStream (source)
val dataStream: DataStream[String] = fsEnv.socketTextStream("Spark",9999)
//3. Transform the data
// input format: a <timestamp>
dataStream.map(_.split("\\s+"))
.map(ts=>(ts(0),ts(1).toLong))
.assignTimestampsAndWatermarks(new UserDefineAssignerWithPeriodicWatermarks)
.windowAll(TumblingEventTimeWindows.of(Time.seconds(5)))
.apply(new AllWindowFunction[(String,Long),String,TimeWindow] {
    var sdf=new SimpleDateFormat("HH:mm:ss")

    override def apply(window: TimeWindow,
                       input: Iterable[(String, Long)],
                       out: Collector[String]): Unit = {
        println(sdf.format(window.getStart)+" ~ "+ sdf.format(window.getEnd))
        out.collect(input.map(t=>t._1+"->" +sdf.format(t._2)).reduce((v1,v2)=>v1+" | "+v2))
    }
})
.print()

fsEnv.execute("FlinkWordCountsQuickStart")
           

Handling late data

val fsEnv = StreamExecutionEnvironment.getExecutionEnvironment
fsEnv.setParallelism(1)
//set the time characteristic to event time
fsEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
//set the watermark generation interval to 1s
fsEnv.getConfig.setAutoWatermarkInterval(1000)

//2. Create the DataStream
val dataStream: DataStream[String] = fsEnv.socketTextStream("Spark",9999)

//3. Transform the data
//input format: <word> <timestamp>
dataStream.map(_.split("\\s+"))
.map(ts=>(ts(0),ts(1).toLong))
.assignTimestampsAndWatermarks(new UserDefineAssignerWithPeriodicWatermarks)
.windowAll(TumblingEventTimeWindows.of(Time.seconds(5)))
.allowedLateness(Time.seconds(2)) // while watermark - window end < 2s, late data can still be added to the window
.apply(new AllWindowFunction[(String,Long),String,TimeWindow] {
    var sdf=new SimpleDateFormat("HH:mm:ss")

    override def apply(window: TimeWindow,
                       input: Iterable[(String, Long)],
                       out: Collector[String]): Unit = {
        println(sdf.format(window.getStart)+" ~ "+ sdf.format(window.getEnd))
        out.collect(input.map(t=>t._1+"->" +sdf.format(t._2)).reduce((v1,v2)=>v1+" | "+v2))
    }
})
.print()

fsEnv.execute("FlinkWordCountsQuickStart")
           
When window end time < watermark < window end time + allowed lateness and data falls into the already-fired window, the system treats it as late data, and it can still participate in the window's computation.
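A worked timeline for the window [0 s, 5 s) with maxOrderness = 2 s and allowedLateness = 2 s (inputs are <word> <timestamp in ms>, illustrative):

a 7000    -> watermark 5000, the window [0,5000) fires with the data seen so far
b 3000    -> late, but watermark 5000 < 5000 + 2000, so the window fires again including b
c 9500    -> watermark 7500 >= 5000 + 2000, the window state is discarded
d 2000    -> too late: dropped by default (or captured via a side output, see the next section)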

Too-Late Data

If the current watermark T minus the window's end time >= the maximum allowed lateness, any data that then falls into that window is discarded by Flink by default. To capture these records that never took part in the computation, use a side output (sideOutputLateData) to collect the too-late data.

val fsEnv = StreamExecutionEnvironment.getExecutionEnvironment
fsEnv.setParallelism(1)
//set the time characteristic to event time
fsEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
//set the watermark generation interval to 1s
fsEnv.getConfig.setAutoWatermarkInterval(1000)

//2. Create the DataStream
val dataStream: DataStream[String] = fsEnv.socketTextStream("Spark",9999)
val lateTag = new OutputTag[(String,Long)]("late")
//3. Transform the data
//input format: <word> <timestamp>
val stream = dataStream.map(_.split("\\s+"))
.map(ts => (ts(0), ts(1).toLong))
.assignTimestampsAndWatermarks(new UserDefineAssignerWithPeriodicWatermarks)
.windowAll(TumblingEventTimeWindows.of(Time.seconds(5)))
.allowedLateness(Time.seconds(2)) // while watermark - window end < 2s, late data can still be added
.sideOutputLateData(lateTag) //route data that is too late to the side output
.apply(new AllWindowFunction[(String, Long), String, TimeWindow] {
    var sdf = new SimpleDateFormat("HH:mm:ss")

    override def apply(window: TimeWindow,
                       input: Iterable[(String, Long)],
                       out: Collector[String]): Unit = {
        println(sdf.format(window.getStart) + " ~ " + sdf.format(window.getEnd))
        out.collect(input.map(t => t._1 + "->" + sdf.format(t._2)).reduce((v1, v2) => v1 + " | " + v2))
    }
})
stream.print("window")
stream.getSideOutput(lateTag).print("too-late data")

fsEnv.execute("FlinkWordCountsQuickStart")
           

Watermarks in Parallel Streams

Watermarks are generated at, or directly after, the Source Function. Each parallel subtask of a Source Function usually generates its watermarks independently. As watermarks flow through the streaming program, they advance the event time at the operators they reach. Whenever an operator advances its event time, it emits a new watermark downstream for its successor operators. When a downstream operator receives watermark values from multiple inputs, it takes the minimum of them.
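A worked example of the minimum rule (values illustrative): when an operator reads from two input channels, its event time is the smallest watermark it has received from them.

watermark on input 1 = 29
watermark on input 2 = 14
operator event time  = min(29, 14) = 14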


Join

Window Join

stream.join(otherStream)
    .where(<KeySelector>)
    .equalTo(<KeySelector>)
    .window(<WindowAssigner>)
    .apply(<JoinFunction>)
           

Tumbling Window Join

When performing a tumbling window join, all elements with a common key and a common tumbling window are joined as pairwise combinations and passed on to a JoinFunction or FlatJoinFunction. Because this behaves like an inner join, elements of one stream that do not have elements from the other stream in their tumbling window are not emitted!

val fsEnv = StreamExecutionEnvironment.getExecutionEnvironment
fsEnv.setParallelism(1)
//set the time characteristic to event time
fsEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
//set the watermark generation interval to 1s
fsEnv.getConfig.setAutoWatermarkInterval(1000)

// user stream, input format: 001 zhangsan <timestamp>
val userStrem: DataStream[(String,String,Long)] = fsEnv.socketTextStream("Spark",9999)
.map(_.split("\\s+"))
.map(ts=>(ts(0),ts(1),ts(2).toLong))
.assignTimestampsAndWatermarks(new UserAssignerWithPunctuatedWatermarks)
// order stream, input format: 001 100.0 <timestamp>
val orderStream: DataStream[(String,Double,Long)] = fsEnv.socketTextStream("Spark",8888)
.map(_.split("\\s+"))
.map(ts=>(ts(0),ts(1).toDouble,ts(2).toLong))
.assignTimestampsAndWatermarks(new OrderAssignerWithPunctuatedWatermarks)

userStrem.join(orderStream)
.where(_._1)
.equalTo(_._1)
.window(TumblingEventTimeWindows.of(Time.seconds(2)))
.apply((v1,v2,out:Collector[String])=>{
    out.collect(v1._1+"\t"+v1._2+"\t"+v2._2)
})
.print()

fsEnv.execute("FlinkWordCountsQuickStart")
           
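A usage sketch for the tumbling window join, assuming both punctuated assigners use a 2 s maximum out-of-orderness like the earlier examples (timestamps in epoch milliseconds):

nc -lk 9999 (users)          nc -lk 8888 (orders)
001 zhangsan 1000            001 100.0 1500
002 lisi 5000                002 200.0 5000

Once both streams have seen the 5000-timestamp lines, both watermarks reach 3000, the window [0 s, 2 s) fires, and the join prints 001 zhangsan 100.0.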

Sliding Window Join

When performing a sliding window join, all elements with a common key and a common sliding window are joined as pairwise combinations and passed on to the JoinFunction or FlatJoinFunction. Elements of one stream that do not have elements from the other stream in the current sliding window are not emitted!

val fsEnv = StreamExecutionEnvironment.getExecutionEnvironment
fsEnv.setParallelism(1)
//set the time characteristic to event time
fsEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
//set the watermark generation interval to 1s
fsEnv.getConfig.setAutoWatermarkInterval(1000)

// user stream, input format: 001 zhangsan <timestamp>
val userStrem: DataStream[(String,String,Long)] = fsEnv.socketTextStream("Spark",9999)
.map(_.split("\\s+"))
.map(ts=>(ts(0),ts(1),ts(2).toLong))
.assignTimestampsAndWatermarks(new UserAssignerWithPunctuatedWatermarks)
// order stream, input format: 001 100.0 <timestamp>
val orderStream: DataStream[(String,Double,Long)] = fsEnv.socketTextStream("Spark",8888)
.map(_.split("\\s+"))
.map(ts=>(ts(0),ts(1).toDouble,ts(2).toLong))
.assignTimestampsAndWatermarks(new OrderAssignerWithPunctuatedWatermarks)

userStrem.join(orderStream)
.where(_._1)
.equalTo(_._1)
.window(SlidingEventTimeWindows.of(Time.seconds(2),Time.seconds(1)))
.apply((v1,v2,out:Collector[String])=>{
    out.collect(v1._1+"\t"+v1._2+"\t"+v2._2)
})
.print()

fsEnv.execute("FlinkWordCountsQuickStart")
           

Session Window Join

When performing a session window join, all elements with the same key that, when "combined", fulfill the session criteria are joined in pairwise combinations and passed on to the JoinFunction or FlatJoinFunction. Again this performs an inner join, so if there is a session window that only contains elements from one stream, no output will be emitted!

val fsEnv = StreamExecutionEnvironment.getExecutionEnvironment
fsEnv.setParallelism(1)
//set the time characteristic to event time
fsEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
//set the watermark generation interval to 1s
fsEnv.getConfig.setAutoWatermarkInterval(1000)

// user stream, input format: 001 zhangsan <timestamp>
val userStrem: DataStream[(String,String,Long)] = fsEnv.socketTextStream("Spark",9999)
.map(_.split("\\s+"))
.map(ts=>(ts(0),ts(1),ts(2).toLong))
.assignTimestampsAndWatermarks(new UserAssignerWithPunctuatedWatermarks)
// order stream, input format: 001 100.0 <timestamp>
val orderStream: DataStream[(String,Double,Long)] = fsEnv.socketTextStream("Spark",8888)
.map(_.split("\\s+"))
.map(ts=>(ts(0),ts(1).toDouble,ts(2).toLong))
.assignTimestampsAndWatermarks(new OrderAssignerWithPunctuatedWatermarks)

userStrem.join(orderStream)
.where(_._1)
.equalTo(_._1)
.window(EventTimeSessionWindows.withGap(Time.seconds(2)))
.apply((v1,v2,out:Collector[String])=>{
    out.collect(v1._1+"\t"+v1._2+"\t"+v2._2)
})
.print()

fsEnv.execute("FlinkWordCountsQuickStart")
           

Interval Join

The interval join joins elements of two streams (we’ll call them A & B for now) with a common key and where elements of stream B have timestamps that lie in a relative time interval to timestamps of elements in stream A.

This can also be expressed more formally as

b.timestamp ∈ [a.timestamp + lowerBound; a.timestamp + upperBound]

or

a.timestamp + lowerBound <= b.timestamp <= a.timestamp + upperBound

where a and b are elements of A and B that share a common key. Both the lower and upper bound can be either negative or positive as long as the lower bound is always smaller or equal to the upper bound. The interval join currently only performs inner joins.
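A worked example of the bound condition with lowerBound = -2 s and upperBound = 2 s, matching the code below:

a.timestamp = 10 s
b joins a  <=>  10 s - 2 s <= b.timestamp <= 10 s + 2 s, i.e. b.timestamp ∈ [8 s, 12 s]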

When a pair of elements are passed to the ProcessJoinFunction, they will be assigned the larger timestamp of the two elements (which can be accessed via the ProcessJoinFunction.Context).

val fsEnv = StreamExecutionEnvironment.getExecutionEnvironment
fsEnv.setParallelism(1)
//set the time characteristic to event time
fsEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
//set the watermark generation interval to 1s
fsEnv.getConfig.setAutoWatermarkInterval(1000)

// user stream, input format: 001 zhangsan <timestamp>
val userkeyedStrem: KeyedStream[(String,String,Long),String] = fsEnv.socketTextStream("Spark",9999)
.map(_.split("\\s+"))
.map(ts=>(ts(0),ts(1),ts(2).toLong))
.assignTimestampsAndWatermarks(new UserAssignerWithPunctuatedWatermarks)
.keyBy(t=>t._1)
// order stream, input format: 001 100.0 <timestamp>
val orderStream: KeyedStream[(String,Double,Long),String] = fsEnv.socketTextStream("Spark",8888)
.map(_.split("\\s+"))
.map(ts=>(ts(0),ts(1).toDouble,ts(2).toLong))
.assignTimestampsAndWatermarks(new OrderAssignerWithPunctuatedWatermarks)
.keyBy(t=>t._1)

userkeyedStrem.intervalJoin(orderStream)
.between(Time.seconds(-2),Time.seconds(2))
.process(new ProcessJoinFunction[(String,String,Long),(String,Double,Long),String] {
    override def processElement(left: (String, String, Long),
                                right: (String, Double, Long),
                                ctx: ProcessJoinFunction[(String, String, Long), (String, Double, Long), String]#Context,
                                out: Collector[String]): Unit = {
        val leftTimestamp = ctx.getLeftTimestamp
        val rightTimestamp = ctx.getRightTimestamp
        val timestamp = ctx.getTimestamp
        println(s"left:${leftTimestamp},right:${rightTimestamp},timestamp:${timestamp}")
        out.collect(left._1+"\t"+left._2+"\t"+right._2)
    }
})
.print()

fsEnv.execute("FlinkWordCountsQuickStart")
           

Flink HA Setup

The general idea of JobManager high availability for standalone clusters is that there is a single leading JobManager at any time and multiple standby JobManagers to take over leadership in case the leader fails. This guarantees that there is no single point of failure and programs can make progress as soon as a standby JobManager has taken leadership. There is no explicit distinction between standby and master JobManager instances. Each JobManager can take the role of master or standby.


Prerequisites

  • Clock synchronization
  • IP and hostname mapping
  • Passwordless SSH login
  • Disable the firewall
  • Install JDK 8
  • Install ZooKeeper
  • Install HDFS HA
  • Set up Flink HA (see the configuration sketch below)
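A minimal configuration sketch for standalone HA, assuming a ZooKeeper quorum on Spark01/Spark02/Spark03 and an HDFS nameservice named mycluster (host names, ports and paths are illustrative):

# conf/flink-conf.yaml
high-availability: zookeeper
high-availability.zookeeper.quorum: Spark01:2181,Spark02:2181,Spark03:2181
high-availability.storageDir: hdfs://mycluster/flink/ha/
high-availability.cluster-id: /default_ns

# conf/masters - one (standby) JobManager per line, as host:webui-port
Spark01:8081
Spark02:8081

With this in place, bin/start-cluster.sh starts every JobManager listed in conf/masters, and ZooKeeper elects one of them as leader while the others remain on standby.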

