天天看點

Flink Source、Sink

程式部署

本地執行
//1.建立流計算執行環境
    val env = StreamExecutionEnvironment.createLocalEnvironment(3)

    //2.建立DataStream
    val text = env.socketTextStream("train",9999)

    //3.執行DataStream的轉換算子
    val counts = text.flatMap(line=>line.split(" "))
      .map(word=>(word,1))
      .keyBy(0)
      .sum(1)

    //4.将計算的結果在控制台列印
    counts.print()

    //5.執行流計算任務
    env.execute("Window Stream WordCount")
           
遠端部署
//1.建立流計算執行環境
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    //2.建立DataStream
    val text = env.socketTextStream("train",9999)

    //3.執行DataStream的轉換算子
    val counts = text.flatMap(line=>line.split(" "))
      .map(word=>(word,1))
      .keyBy(0)
      .sum(1)

    //4.将計算的結果在控制台列印
    counts.print()

    //5.執行流計算任務
    env.execute("Window Stream WordCount")
           
StreamExecutionEnvironment.getExecutionEnvironment自動識别運作環境,如果運作環境是idea,系統會自動切換成本地模式,預設系統的并行度使用系統最大線程數,等價于Spark中設定的

local[*]

,如果是生産環境,需要使用者在送出任務的時候指定并行度

--parallelism

  • 部署方式
    • WEB UI部署(略)
    • 通過腳本部署
[[email protected] ~]# cd /usr/soft/flink-1.10.0/
[[email protected] flink-1.10.0]# ./bin/flink run 
						--class com.baizhi.quickstart.FlinkWordCountTestLocal 
						--detached --parallelism 4 
						--jobmanager train:8081 /root/flink-1.0-SNAPSHOT.jar
Job has been submitted with JobID 808021820a80d008e8fcd7e72bba1029
           

檢視現有任務

[[email protected] flink-1.10.0]# ./bin/flink list --running --jobmanager train:8081
Waiting for response...
------------------ Running/Restarting Jobs -------------------
29.02.2020 05:06:41 : 808021820a80d008e8fcd7e72bba1029 : Window Stream WordCount (RUNNING)
--------------------------------------------------------------

[[email protected] flink-1.10.0]# ./bin/flink list --all --jobmanager train:8081
Waiting for response...
------------------ Running/Restarting Jobs -------------------
29.02.2020 05:06:41 : 808021820a80d008e8fcd7e72bba1029 : Window Stream WordCount (RUNNING)
--------------------------------------------------------------
No scheduled jobs.
           

取消指定任務

[[email protected] flink-1.10.0]# ./bin/flink cancel --jobmanager train:8081 808021820a80d008e8fcd7e72bba1029
Cancelling job 808021820a80d008e8fcd7e72bba1029.
Cancelled job 808021820a80d008e8fcd7e72bba1029.
           

檢視程式執行計劃

[[email protected] flink-1.10.0]# ./bin/flink info --class com.baizhi.quickstart.FlinkWordCountTestLocal --parallelism 4 /root/flink-1.0-SNAPSHOT.jar
----------------------- Execution Plan -----------------------
{"nodes":[{"id":1,"type":"Source: Socket Stream","pact":"Data Source","contents":"Source: Socket Stream","parallelism":1},{"id":2,"type":"Flat Map","pact":"Operator","contents":"Flat Map","parallelism":4,"predecessors":[{"id":1,"ship_strategy":"REBALANCE","side":"second"}]},{"id":3,"type":"Map","pact":"Operator","contents":"Map","parallelism":4,"predecessors":[{"id":2,"ship_strategy":"FORWARD","side":"second"}]},{"id":5,"type":"aggregation","pact":"Operator","contents":"aggregation","parallelism":4,"predecessors":[{"id":3,"ship_strategy":"HASH","side":"second"}]},{"id":6,"type":"Sink: Print to Std. Out","pact":"Data Sink","contents":"Sink: Print to Std. Out","parallelism":4,"predecessors":[{"id":5,"ship_strategy":"FORWARD","side":"second"}]}]}
--------------------------------------------------------------

No description provided.
           

⽤戶可以通路:

https://flink.apache.org/visualizer/

将json資料粘貼過去,檢視Flink執⾏計劃圖

Flink Source、Sink
跨平台釋出
//1.建立流計算執行環境
    var jars = "D:\\ideacores\\Flink\\flink\\target\\flink-1.0-SNAPSHOT.jar"

    val env = StreamExecutionEnvironment.createRemoteEnvironment("train",8081,jars)
    
    //設定預設并行度
    env.setParallelism(4)

    //2.建立DataStream
    val text = env.socketTextStream("train",9999)

    //3.執行DataStream的轉換算子
    val counts = text.flatMap(line=>line.split(" "))
      .map(word=>(word,1))
      .keyBy(0)
      .sum(1)

    //4.将計算的結果在控制台列印
    counts.print()

    //5.執行流計算任務
    env.execute("Window Stream WordCount")
           
在運作之前需要使用mvn重新打包程式。直接運作main函數即可
Streaming(DataStream API)
DataSource

資料源是程式讀取資料的來源,使用者可以通過

env.addSource(SourceFunction)

,将SourceFunction添加到程式中。Flink内置許多已知實作的SourceFunction,但是使用者可以自定義實作

SourceFunction

(非并行化的接口)接口或者實作

ParallelSourceFunction

(并行化)接口,如果需要有狀态管理還可以繼承

RichParallelSourceFunction

File-based

  • readTextFile(path)

    - 讀取文本檔案(一次) 并以字元串的形式傳回它們
//1.建立流計算執行環境
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    //2.建立DataStream - 細化
    val text = env.readTextFile("hdfs://train:9000/demo/words")


    //3.執行DataStreaem的轉換算子
    val counts = text.flatMap(line=>line.split("\\s+"))
      .map(word=>(word,1))
      .keyBy(0)
      .sum(1)

    //4.将計算的結果在控制台列印
    counts.print()

    //5.執行流計算任務
    env.execute("Window Stream WordCount")
           
  • readFile(fileInputFormat, path)

    -根據指定的檔案輸入讀取(一次)檔案

    格式

//1.建立流計算執行環境
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    //2.建立DataStream - 細化
    val inputFormat = new TextInputFormat(null)
    
    val text = env.readFile(inputFormat,"hdfs://train:9000/demo/words")


    //3.執行DataStreaem的轉換算子
    val counts = text.flatMap(line=>line.split("\\s+"))
      .map(word=>(word,1))
      .keyBy(0)
      .sum(1)

    //4.将計算的結果在控制台列印
    counts.print()

    //5.執行流計算任務
    env.execute("Window Stream WordCount")
           
  • readFile(fileInputFormat, path, watchType, interval, pathFilter, typeInfo)

    -

    這是前兩個方法在内部調用的方。 它讀取路徑中的檔案

    fileInputFormat

    . 根據所提供的watchType,此源可以定期調用

    監視(每隔ms)新資料的路徑

    (

    FileProcessingMode.PROCESS_CONTINUOUSLY

    ), 或處理目前路徑中的資料

    退出檔案處理模式。

    PROCESS_ONCE

    )。使用路徑過濾器,使用者可以進一步

    排除正在處理的檔案。

//1.建立流計算執行環境
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    //2.建立DataStream - 細化
    val inputFormat = new TextInputFormat(null)

    val text = env.readFile(inputFormat,"hdfs://train:9000/demo/words",FileProcessingMode.PROCESS_CONTINUOUSLY,1000)


    //3.執行DataStreaem的轉換算子
    val counts = text.flatMap(line=>line.split("\\s+"))
      .map(word=>(word,1))
      .keyBy(0)
      .sum(1)

    //4.将計算的結果在控制台列印
    counts.print()

    //5.執行流計算任務
    env.execute("Window Stream WordCount")
           
該方法會檢查采集目錄下的檔案,如果檔案發生變化系統會重新采集。此時可能會導緻檔案的重複計算。一般來說不建議修改檔案内容,直接上傳新檔案即可。

Socket Based

  • socketTextStream

    - Reads from a socket. Elements can be separated by a delimiter.
//1.建立流計算執行環境
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    //2.建立DataStream - 細化
    val text = env.socketTextStream("train",9999,'\n',3)


    //3.執行DataStreaem的轉換算子
    val counts = text.flatMap(line=>line.split("\\s+"))
      .map(word=>(word,1))
      .keyBy(0)
      .sum(1)

    //4.将計算的結果在控制台列印
    counts.print()

    //5.執行流計算任務
    env.execute("Window Stream WordCount")
           

Collection-based

//1.建立流計算執行環境
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    //2.建立DataStream - 細化
    val inputFormat = new TextInputFormat(null)

    val text = env.fromCollection(List("this is a demo","hello word"))


    //3.執行DataStreaem的轉換算子
    val counts = text.flatMap(line=>line.split("\\s+"))
      .map(word=>(word,1))
      .keyBy(0)
      .sum(1)

    //4.将計算的結果在控制台列印
    counts.print()

    //5.執行流計算任務
    env.execute("Window Stream WordCount")
           
UserDefinedSource
  • SourceFunction(非并行化)
import java.util.Random

import org.apache.flink.streaming.api.functions.source.SourceFunction

class UserDefinedNonParallelSourceFunction extends SourceFunction[String]{
  @volatile //防止線程拷貝變量
  var isRunning = true
  var lines = Array("this is a demo","hello world","ni hao ma")
  //在該方法中啟動線程,通過sourceContext的collect方法發送資料
  override def run(sourceContext: SourceFunction.SourceContext[String]): Unit = {
    while (isRunning){
      Thread.sleep(100)
      //輸送資料給下遊
      sourceContext.collect(lines(new Random().nextInt(lines.size)))
    }
  }
  //釋放資源
  override def cancel(): Unit = {
    isRunning = false
  }
}
           

測試

//1.建立流計算執行環境
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    //2.建立DataStream - 細化
    val inputFormat = new TextInputFormat(null)

    val text = env.addSource(new UserDefinedNonParallelSourceFunction)


    //3.執行DataStreaem的轉換算子
    val counts = text.flatMap(line=>line.split("\\s+"))
      .map(word=>(word,1))
      .keyBy(0)
      .sum(1)

    //4.将計算的結果在控制台列印
    counts.print()

    //5.執行流計算任務
    env.execute("Window Stream WordCount")
           
  • ParallelSourceFunction(并行化)
import java.util.Random

import org.apache.flink.streaming.api.functions.source.{ParallelSourceFunction, SourceFunction}

class UserDefinedNonParallelSourceFunction extends ParallelSourceFunction[String]{
  @volatile //防止線程拷貝變量
  var isRunning = true
  var lines = Array("this is a demo","hello world","ni hao ma")
  //在該方法中啟動線程,通過sourceContext的collect方法發送資料
  override def run(sourceContext: SourceFunction.SourceContext[String]): Unit = {
    while (isRunning){
      Thread.sleep(100)
      //輸送資料給下遊
      sourceContext.collect(lines(new Random().nextInt(lines.size)))
    }
  }
  //釋放資源
  override def cancel(): Unit = {
    isRunning = false
  }
}
           

測試

//1.建立流計算執行環境
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    //2.建立DataStream - 細化
    val inputFormat = new TextInputFormat(null)

    val text = env.addSource(new UserDefinedNonParallelSourceFunction)


    //3.執行DataStreaem的轉換算子
    val counts = text.flatMap(line=>line.split("\\s+"))
      .map(word=>(word,1))
      .keyBy(0)
      .sum(1)

    //4.将計算的結果在控制台列印
    counts.print()

    //5.執行流計算任務
    env.execute("Window Stream WordCount")
           
Kafka Source
  • 引入maven
<dependency>
 <groupId>org.apache.flink</groupId>
 <artifactId>flink-connector-kafka_2.11</artifactId>
 <version>1.10.0</version>
</dependency>
           
  • SimpleStringSchema

    該SimpleStringSchema方案隻會反序列化kafka中的value

//1.建立流計算執⾏環境
 val env = StreamExecutionEnvironment.getExecutionEnvironment
 //2.建立DataStream - 細化
 val props = new Properties()
 props.setProperty("bootstrap.servers", "CentOS:9092")
 props.setProperty("group.id", "g1")
 val text = env.addSource(new FlinkKafkaConsumer[String]("topic01",new
SimpleStringSchema(),props))
 //3.執⾏DataStream的轉換算⼦
 val counts = text.flatMap(line=>line.split("\\s+"))
 .map(word=>(word,1))
 .keyBy(0)
 .sum(1)
 //4.将計算的結果在控制列印
 counts.print()
 //5.執⾏流計算任務
 env.execute("Window Stream WordCount")
           
  • KafkaDeserializationSchema
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.flink.api.scala._
class UserKafkaDeserialization extends KafkaDeserializationSchema[(String,String,Int,Long)]{
  override def isEndOfStream(t: (String, String, Int, Long)): Boolean = false

  override def deserialize(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]]): (String, String, Int, Long) = {
    if(consumerRecord.key()!=null){
      (new String(consumerRecord.key()),new String(consumerRecord.value()),consumerRecord.partition(),consumerRecord.offset())
    }else{
      (null,new String(consumerRecord.value()),consumerRecord.partition(),consumerRecord.offset())
    }
  }

  override def getProducedType: TypeInformation[(String, String, Int, Long)] = {
    createTypeInformation[(String,String,Int,Long)]
  }
}
           
//1.建立流計算執行環境
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val prop = new Properties()
    prop.setProperty("bootstrap.servers","train:9092")
    prop.setProperty("group.id", "g1")

   

    val text = env.addSource(new FlinkKafkaConsumer[(String,String,Int,Long)]("topic01",new UserKafkaDeserialization,prop))

    text.flatMap(line=>line._2.split(" "))
      .map(word=>(word,1))
      .keyBy(0)
      .sum(1)
      .print()


    env.execute("Window Stream")
           
  • JSONKeyValueNodeDeserializationSchema
  • 要求Kafka中的topic的key和value都必須是json格式,也可以在使⽤的時候,指定是否讀取中繼資料

    (topic、分區、o!set等)

//1.建立流計算執行環境
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val prop = new Properties()
    prop.setProperty("bootstrap.servers","train:9092")
    prop.setProperty("group.id", "g1")



    val text = env.addSource(new FlinkKafkaConsumer[ObjectNode]("topic01",new JSONKeyValueDeserializationSchema(true),prop))
      //t:{"value":{"id":1,"name":"zhangsan"},"metadata":{"offset":0,"topic":"topic01","partition":13}}
    text.map(word=>(word.get("value").get("id").asInt(),word.get("value").get("name").asText()))
      .print()


    env.execute("Window Stream")
           

參考:

https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/connectors/kafka.html

Data Sinks

Data Sink使用DataStreams并将其轉發到檔案,Socket,外部系統或列印它們。Flink帶有多種内置輸出格式,這些格式封裝在DataStreams的操作後面。

File-based

  • writeAsText()/

    TextOutputFormat

    :将元素按行寫入為字元串。這些字元串是通過調用每個元素的toString()方法獲得的。
  • writeAsCsv(…) /

    CsvOutputFormat

    :将元組寫入逗号分隔的值檔案。行和字段分隔符是可配置的。每個字段的值來自對象的toString()方法
  • writeUsingOutputFormat/

    FileOutputFormat

    :方法和自定義檔案輸出的基類。支援自定義對象到位元組的轉換。
注意DataStream上的write*()方法主要用于調試目的。
//1.建立流計算執行環境
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    //2.建立DataStream - 細化
    val text = env.fromCollection(List("this is a demo","hello word"))


    //3.執行DataStreaem的轉換算子
    val counts = text.flatMap(line=>line.split("\\s+"))
      .map(word=>(word,1))
      .keyBy(0)
      .sum(1)

    //4.将計算的結果在控制台列印
    counts.writeUsingOutputFormat(new TextOutputFormat[(String, Int)](new Path("hdfs://train:9000/flink-results")))

    //5.執行流計算任務
    env.execute("Window Stream WordCount")
           
注意事項:如果改成HDFS,需要使用者自己産生大量資料,才能看到測試效果,原因是因為HDFS檔案系統寫入時的緩沖區比較大、以上寫入檔案系統的Sink不能夠參與系統檢查點,如果在生産環境下通常使用flink-connector-filesystem寫入到外圍系統。
<dependency>
 <groupId>org.apache.flink</groupId>
 <artifactId>flink-connector-filesystem_2.11</artifactId>
 <version>1.10.0</version>
</dependency>
           
val env = StreamExecutionEnvironment.getExecutionEnvironment

    val text = env.readTextFile("hdfs://train:9000/demo/words")

    val bucketingSink = StreamingFileSink.forRowFormat(new Path("hdfs://train:9000/bucket-results"),
      new SimpleStringEncoder[(String, Int)]("UTF-8")
    ).withBucketAssigner(new DateTimeBucketAssigner[(String, Int)]("yyyy-MM-dd")) //動态産生寫入路徑
      .build()

    val counts = text.flatMap(line=>line.split(" "))
      .map(word=>(word,1))
      .keyBy(0)
      .sum(1)

    counts.addSink(bucketingSink)
           

老版本寫法

val env = StreamExecutionEnvironment.getExecutionEnvironment

    val text = env.readTextFile("hdfs://train:9000/demo/words")

    val bucketingSink = new BucketingSink[(String,Int)]("hdfs://train:9000/bucket-results")
    bucketingSink.setBucketer(new DateTimeBucketer[(String,Int)]("yyyy-MM-dd"))
    bucketingSink.setBatchSize(1024)

    val counts = text.flatMap(line=>line.split(" "))
      .map(word=>(word,1))
      .keyBy(0)
      .sum(1)

    counts.addSink(bucketingSink)
           
print()/printToErr()

Prints the toString() value of each element on the standard out / standard error stream. Optionally, aprefix (msg) can be provided which is prepended to the output. This can help to distinguish between different calls to print . If the parallelism is greater than 1, the output will also be prepended with theidentifier of the task which produced the output.

列印标準輸出/标準錯誤流中每個元素的toString()值。可選地,可以提供aprefix (msg),它預先寫入輸出。這有助于區分不同的列印調用。如果并行度大于1,輸出也将以産生輸出的任務的辨別符作為字首。

val env = StreamExecutionEnvironment.getExecutionEnvironment

    val text = env.readTextFile("hdfs://train:9000/demo/words")


    val counts = text.flatMap(line=>line.split(" "))
      .map(word=>(word,1))
      .keyBy(0)
      .sum(1)

    counts.setParallelism(2).print("測試")

    env.execute("Window Stream")
           
UserDefinedSinkFunction
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}

class UserDefinedSinkFunction extends RichSinkFunction[(String,Int)]{
  override def open(parameters: Configuration): Unit = {
    println("打開連結...")
  }

  override def invoke(value: (String, Int), context: SinkFunction.Context[_]): Unit = {
    println("輸出:"+value)
  }

  override def close(): Unit = {
    println("釋放連結")
  }

}
           
val env = StreamExecutionEnvironment.getExecutionEnvironment

    val text = env.readTextFile("hdfs://train:9000/demo/words")


    val counts = text.flatMap(line=>line.split(" "))
      .map(word=>(word,1))
      .keyBy(0)
      .sum(1)

    counts.addSink(new UserDefinedSinkFunction)

    env.execute("Window Stream WordCount")
           
RedisSink

參考:

https://bahir.apache.org/docs/flink/current/flink-streaming-redis/

<dependency>
 <groupId>org.apache.bahir</groupId>
 <artifactId>flink-connector-redis_2.11</artifactId>
 <version>1.0</version>
</dependency>
           
//1.建立流計算執⾏環境
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    //2.建立DataStream - 細化
    val text = env.readTextFile("hdfs://train:9000/demo/words")

    var flinkJeidsConf = new FlinkJedisPoolConfig.Builder()
      .setHost("train")
      .setPort(6379)
      .build()

    //3.執⾏DataStream的轉換算⼦
    val counts = text.flatMap(line=>line.split("\\s+"))
      .map(word=>(word,1))
      .keyBy(0)
      .sum(1)

    counts.addSink(new RedisSink(flinkJeidsConf,new UserDefinedSinkFunction()))

    env.execute("Window Stream")
           
import org.apache.flink.streaming.connectors.redis.common.mapper.{RedisCommand, RedisCommandDescription, RedisMapper}

class UserDefinedSinkFunction extends RedisMapper[(String,Int)]{
  override def getCommandDescription: RedisCommandDescription = {
    new RedisCommandDescription(RedisCommand.HSET,"wordcounts")
  }

  override def getKeyFromData(t: (String, Int)): String = t._1

  override def getValueFromData(t: (String, Int)): String = t._2+""
}
           

繼續閱讀