
Flink Source & Sink

Program Deployment

Local Execution
//1. Create the stream execution environment
    val env = StreamExecutionEnvironment.createLocalEnvironment(3)

    //2. Create the DataStream
    val text = env.socketTextStream("train",9999)

    //3. Apply transformation operators to the DataStream
    val counts = text.flatMap(line=>line.split(" "))
      .map(word=>(word,1))
      .keyBy(0)
      .sum(1)

    //4. Print the results to the console
    counts.print()

    //5. Execute the streaming job
    env.execute("Window Stream WordCount")
           
Remote Deployment
//1. Create the stream execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    //2. Create the DataStream
    val text = env.socketTextStream("train",9999)

    //3. Apply transformation operators to the DataStream
    val counts = text.flatMap(line=>line.split(" "))
      .map(word=>(word,1))
      .keyBy(0)
      .sum(1)

    //4. Print the results to the console
    counts.print()

    //5. Execute the streaming job
    env.execute("Window Stream WordCount")
           
StreamExecutionEnvironment.getExecutionEnvironment detects the runtime environment automatically. If the program is run from an IDE such as IDEA, it switches to local mode and defaults the parallelism to the maximum number of available threads, which is equivalent to Spark's local[*] setting. In a production environment, the user specifies the parallelism when submitting the job with --parallelism.
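
The parallelism can also be fixed in code. A minimal sketch (the value 4 is only illustrative), keeping in mind that a parallelism set on the environment normally takes precedence over the --parallelism flag passed at submission:

    //Sketch: set the default parallelism on the environment (illustrative value)
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(4)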

  • Deployment methods
    • WEB UI deployment (omitted)
    • Deployment via script
[root@train ~]# cd /usr/soft/flink-1.10.0/
[root@train flink-1.10.0]# ./bin/flink run \
						--class com.baizhi.quickstart.FlinkWordCountTestLocal \
						--detached --parallelism 4 \
						--jobmanager train:8081 /root/flink-1.0-SNAPSHOT.jar
Job has been submitted with JobID 808021820a80d008e8fcd7e72bba1029
           

List existing jobs

[root@train flink-1.10.0]# ./bin/flink list --running --jobmanager train:8081
Waiting for response...
------------------ Running/Restarting Jobs -------------------
29.02.2020 05:06:41 : 808021820a80d008e8fcd7e72bba1029 : Window Stream WordCount (RUNNING)
--------------------------------------------------------------

[root@train flink-1.10.0]# ./bin/flink list --all --jobmanager train:8081
Waiting for response...
------------------ Running/Restarting Jobs -------------------
29.02.2020 05:06:41 : 808021820a80d008e8fcd7e72bba1029 : Window Stream WordCount (RUNNING)
--------------------------------------------------------------
No scheduled jobs.
           

Cancel a specific job

[root@train flink-1.10.0]# ./bin/flink cancel --jobmanager train:8081 808021820a80d008e8fcd7e72bba1029
Cancelling job 808021820a80d008e8fcd7e72bba1029.
Cancelled job 808021820a80d008e8fcd7e72bba1029.
           

View the program's execution plan

[root@train flink-1.10.0]# ./bin/flink info --class com.baizhi.quickstart.FlinkWordCountTestLocal --parallelism 4 /root/flink-1.0-SNAPSHOT.jar
----------------------- Execution Plan -----------------------
{"nodes":[{"id":1,"type":"Source: Socket Stream","pact":"Data Source","contents":"Source: Socket Stream","parallelism":1},{"id":2,"type":"Flat Map","pact":"Operator","contents":"Flat Map","parallelism":4,"predecessors":[{"id":1,"ship_strategy":"REBALANCE","side":"second"}]},{"id":3,"type":"Map","pact":"Operator","contents":"Map","parallelism":4,"predecessors":[{"id":2,"ship_strategy":"FORWARD","side":"second"}]},{"id":5,"type":"aggregation","pact":"Operator","contents":"aggregation","parallelism":4,"predecessors":[{"id":3,"ship_strategy":"HASH","side":"second"}]},{"id":6,"type":"Sink: Print to Std. Out","pact":"Data Sink","contents":"Sink: Print to Std. Out","parallelism":4,"predecessors":[{"id":5,"ship_strategy":"FORWARD","side":"second"}]}]}
--------------------------------------------------------------

No description provided.
           

Users can open https://flink.apache.org/visualizer/ and paste the JSON above into it to view the Flink execution plan graph.

Cross-platform Submission
//1. Create the stream execution environment
    var jars = "D:\\ideacores\\Flink\\flink\\target\\flink-1.0-SNAPSHOT.jar"

    val env = StreamExecutionEnvironment.createRemoteEnvironment("train",8081,jars)

    //Set the default parallelism
    env.setParallelism(4)

    //2. Create the DataStream
    val text = env.socketTextStream("train",9999)

    //3. Apply transformation operators to the DataStream
    val counts = text.flatMap(line=>line.split(" "))
      .map(word=>(word,1))
      .keyBy(0)
      .sum(1)

    //4. Print the results to the console
    counts.print()

    //5. Execute the streaming job
    env.execute("Window Stream WordCount")
           
Before running, repackage the program with mvn; then simply run the main method.
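
A typical packaging command would be something along these lines (assuming a standard Maven project layout; -DskipTests is optional):

mvn clean package -DskipTests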
Streaming (DataStream API)
DataSource

A data source is where the program reads its input from. A SourceFunction is added to the program via env.addSource(SourceFunction). Flink ships with many ready-made SourceFunction implementations, but users can also write their own by implementing the SourceFunction interface (non-parallel) or the ParallelSourceFunction interface (parallel); if state management is needed, they can extend RichParallelSourceFunction instead (a sketch of this variant appears at the end of the UserDefinedSource section below).

File-based

  • readTextFile(path)

    - Reads a text file (once) and returns its lines as strings.
//1. Create the stream execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    //2. Create the DataStream
    val text = env.readTextFile("hdfs://train:9000/demo/words")

    //3. Apply transformation operators to the DataStream
    val counts = text.flatMap(line=>line.split("\\s+"))
      .map(word=>(word,1))
      .keyBy(0)
      .sum(1)

    //4. Print the results to the console
    counts.print()

    //5. Execute the streaming job
    env.execute("Window Stream WordCount")
           
  • readFile(fileInputFormat, path)

    - Reads files in the path (once) using the specified file input format.

//1. Create the stream execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    //2. Create the DataStream
    val inputFormat = new TextInputFormat(null)

    val text = env.readFile(inputFormat,"hdfs://train:9000/demo/words")

    //3. Apply transformation operators to the DataStream
    val counts = text.flatMap(line=>line.split("\\s+"))
      .map(word=>(word,1))
      .keyBy(0)
      .sum(1)

    //4. Print the results to the console
    counts.print()

    //5. Execute the streaming job
    env.execute("Window Stream WordCount")
           
  • readFile(fileInputFormat, path, watchType, interval, pathFilter, typeInfo)

    - This is the method that the two methods above call internally. It reads the files in the path using the given fileInputFormat. Depending on the provided watchType, the source either periodically monitors the path for new data every interval milliseconds (FileProcessingMode.PROCESS_CONTINUOUSLY), or processes the data currently in the path once and then exits (FileProcessingMode.PROCESS_ONCE). With the pathFilter, the user can further exclude files from being processed.

//1. Create the stream execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    //2. Create the DataStream
    val inputFormat = new TextInputFormat(null)

    val text = env.readFile(inputFormat,"hdfs://train:9000/demo/words",FileProcessingMode.PROCESS_CONTINUOUSLY,1000)

    //3. Apply transformation operators to the DataStream
    val counts = text.flatMap(line=>line.split("\\s+"))
      .map(word=>(word,1))
      .keyBy(0)
      .sum(1)

    //4. Print the results to the console
    counts.print()

    //5. Execute the streaming job
    env.execute("Window Stream WordCount")
           
This method keeps checking the files under the monitored directory; if a file changes, the system reads it again, which can lead to that file being processed more than once. In general, do not modify an existing file's contents; upload a new file instead.
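
If re-processing is unacceptable, a minimal sketch of the one-shot alternative (same inputFormat and path as above) is to pass FileProcessingMode.PROCESS_ONCE, which scans the directory a single time and then finishes:

    //Sketch: scan the directory once and stop watching it (the interval argument is not used in this mode)
    val text = env.readFile(inputFormat,"hdfs://train:9000/demo/words",FileProcessingMode.PROCESS_ONCE,1000)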

Socket-based

  • socketTextStream

    - Reads from a socket. Elements can be separated by a delimiter.
//1. Create the stream execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    //2. Create the DataStream
    val text = env.socketTextStream("train",9999,'\n',3)

    //3. Apply transformation operators to the DataStream
    val counts = text.flatMap(line=>line.split("\\s+"))
      .map(word=>(word,1))
      .keyBy(0)
      .sum(1)

    //4. Print the results to the console
    counts.print()

    //5. Execute the streaming job
    env.execute("Window Stream WordCount")
           

Collection-based

//1. Create the stream execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    //2. Create the DataStream from an in-memory collection
    val text = env.fromCollection(List("this is a demo","hello word"))

    //3. Apply transformation operators to the DataStream
    val counts = text.flatMap(line=>line.split("\\s+"))
      .map(word=>(word,1))
      .keyBy(0)
      .sum(1)

    //4. Print the results to the console
    counts.print()

    //5. Execute the streaming job
    env.execute("Window Stream WordCount")
           
UserDefinedSource
  • SourceFunction (non-parallel)
import java.util.Random

import org.apache.flink.streaming.api.functions.source.SourceFunction

class UserDefinedNonParallelSourceFunction extends SourceFunction[String]{
  @volatile //prevent threads from caching a stale copy of this flag
  var isRunning = true
  var lines = Array("this is a demo","hello world","ni hao ma")
  //emit records in this method; data is sent downstream via sourceContext.collect
  override def run(sourceContext: SourceFunction.SourceContext[String]): Unit = {
    while (isRunning){
      Thread.sleep(100)
      //send a record downstream
      sourceContext.collect(lines(new Random().nextInt(lines.size)))
    }
  }
  //stop the source and release resources
  override def cancel(): Unit = {
    isRunning = false
  }
}
           

Test

//1. Create the stream execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    //2. Create the DataStream from the custom source
    val text = env.addSource(new UserDefinedNonParallelSourceFunction)

    //3. Apply transformation operators to the DataStream
    val counts = text.flatMap(line=>line.split("\\s+"))
      .map(word=>(word,1))
      .keyBy(0)
      .sum(1)

    //4. Print the results to the console
    counts.print()

    //5. Execute the streaming job
    env.execute("Window Stream WordCount")
           
  • ParallelSourceFunction (parallel)
import java.util.Random

import org.apache.flink.streaming.api.functions.source.{ParallelSourceFunction, SourceFunction}

class UserDefinedParallelSourceFunction extends ParallelSourceFunction[String]{
  @volatile //prevent threads from caching a stale copy of this flag
  var isRunning = true
  var lines = Array("this is a demo","hello world","ni hao ma")
  //emit records in this method; data is sent downstream via sourceContext.collect
  override def run(sourceContext: SourceFunction.SourceContext[String]): Unit = {
    while (isRunning){
      Thread.sleep(100)
      //send a record downstream
      sourceContext.collect(lines(new Random().nextInt(lines.size)))
    }
  }
  //stop the source and release resources
  override def cancel(): Unit = {
    isRunning = false
  }
}
           

Test

//1. Create the stream execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    //2. Create the DataStream from the custom parallel source
    val text = env.addSource(new UserDefinedParallelSourceFunction)

    //3. Apply transformation operators to the DataStream
    val counts = text.flatMap(line=>line.split("\\s+"))
      .map(word=>(word,1))
      .keyBy(0)
      .sum(1)

    //4. Print the results to the console
    counts.print()

    //5. Execute the streaming job
    env.execute("Window Stream WordCount")
           
Kafka Source
  • Add the Maven dependency
<dependency>
 <groupId>org.apache.flink</groupId>
 <artifactId>flink-connector-kafka_2.11</artifactId>
 <version>1.10.0</version>
</dependency>
           
  • SimpleStringSchema

    SimpleStringSchema only deserializes the value of the Kafka record.

//1. Create the stream execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    //2. Create the DataStream from Kafka
    val props = new Properties()
    props.setProperty("bootstrap.servers", "CentOS:9092")
    props.setProperty("group.id", "g1")
    val text = env.addSource(new FlinkKafkaConsumer[String]("topic01",new SimpleStringSchema(),props))

    //3. Apply transformation operators to the DataStream
    val counts = text.flatMap(line=>line.split("\\s+"))
      .map(word=>(word,1))
      .keyBy(0)
      .sum(1)

    //4. Print the results to the console
    counts.print()

    //5. Execute the streaming job
    env.execute("Window Stream WordCount")
           
  • KafkaDeserializationSchema
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.flink.api.scala._
class UserKafkaDeserialization extends KafkaDeserializationSchema[(String,String,Int,Long)]{
  override def isEndOfStream(t: (String, String, Int, Long)): Boolean = false

  override def deserialize(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]]): (String, String, Int, Long) = {
    if(consumerRecord.key()!=null){
      (new String(consumerRecord.key()),new String(consumerRecord.value()),consumerRecord.partition(),consumerRecord.offset())
    }else{
      (null,new String(consumerRecord.value()),consumerRecord.partition(),consumerRecord.offset())
    }
  }

  override def getProducedType: TypeInformation[(String, String, Int, Long)] = {
    createTypeInformation[(String,String,Int,Long)]
  }
}
           
//1. Create the stream execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val prop = new Properties()
    prop.setProperty("bootstrap.servers","train:9092")
    prop.setProperty("group.id", "g1")

   

    val text = env.addSource(new FlinkKafkaConsumer[(String,String,Int,Long)]("topic01",new UserKafkaDeserialization,prop))

    text.flatMap(line=>line._2.split(" "))
      .map(word=>(word,1))
      .keyBy(0)
      .sum(1)
      .print()


    env.execute("Window Stream")
           
  • JSONKeyValueDeserializationSchema

    Requires both the key and the value of the Kafka topic to be in JSON format. When constructing it, you can also specify whether to read the record metadata (topic, partition, offset, etc.).

//1. Create the stream execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val prop = new Properties()
    prop.setProperty("bootstrap.servers","train:9092")
    prop.setProperty("group.id", "g1")



    val text = env.addSource(new FlinkKafkaConsumer[ObjectNode]("topic01",new JSONKeyValueDeserializationSchema(true),prop))
      //t:{"value":{"id":1,"name":"zhangsan"},"metadata":{"offset":0,"topic":"topic01","partition":13}}
    text.map(word=>(word.get("value").get("id").asInt(),word.get("value").get("name").asText()))
      .print()


    env.execute("Window Stream")
           

Reference:

https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/connectors/kafka.html

Data Sinks

Data sinks consume DataStreams and forward them to files, sockets, external systems, or print them. Flink comes with a variety of built-in output formats, which are encapsulated behind operations on DataStreams.

File-based

  • writeAsText() / TextOutputFormat

    Writes elements line by line as strings. The strings are obtained by calling each element's toString() method.
  • writeAsCsv(…) / CsvOutputFormat

    Writes tuples as comma-separated value files. Row and field delimiters are configurable. The value of each field comes from the object's toString() method.
  • writeUsingOutputFormat() / FileOutputFormat

    Method and base class for custom file output. Supports custom object-to-bytes conversion.

Note: the write*() methods on DataStream are mainly intended for debugging purposes.
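
As a quick illustration of the first two methods, a minimal sketch (counts stands for the (String, Int) word-count stream used in the surrounding examples; the output paths are only placeholders):

    //Sketch: debug-oriented file sinks; the output paths are placeholders
    counts.writeAsText("file:///tmp/wordcount-text")   //one line per element, via toString()
    counts.writeAsCsv("file:///tmp/wordcount-csv")     //tuples written as comma-separated rows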
//1. Create the stream execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    //2. Create the DataStream from an in-memory collection
    val text = env.fromCollection(List("this is a demo","hello word"))

    //3. Apply transformation operators to the DataStream
    val counts = text.flatMap(line=>line.split("\\s+"))
      .map(word=>(word,1))
      .keyBy(0)
      .sum(1)

    //4. Write the results to HDFS using a TextOutputFormat
    counts.writeUsingOutputFormat(new TextOutputFormat[(String, Int)](new Path("hdfs://train:9000/flink-results")))

    //5. Execute the streaming job
    env.execute("Window Stream WordCount")
           
Notes: when writing to HDFS, the user has to generate a fairly large amount of data before any output becomes visible, because the HDFS client uses a large write buffer. Also, the file-system sinks above do not participate in Flink's checkpointing; in a production environment you would normally use flink-connector-filesystem to write to external systems.
<dependency>
 <groupId>org.apache.flink</groupId>
 <artifactId>flink-connector-filesystem_2.11</artifactId>
 <version>1.10.0</version>
</dependency>
           
val env = StreamExecutionEnvironment.getExecutionEnvironment

    val text = env.readTextFile("hdfs://train:9000/demo/words")

    val bucketingSink = StreamingFileSink.forRowFormat(new Path("hdfs://train:9000/bucket-results"),
      new SimpleStringEncoder[(String, Int)]("UTF-8")
    ).withBucketAssigner(new DateTimeBucketAssigner[(String, Int)]("yyyy-MM-dd")) //derive the output sub-path (bucket) from the processing time
      .build()

    val counts = text.flatMap(line=>line.split(" "))
      .map(word=>(word,1))
      .keyBy(0)
      .sum(1)

    counts.addSink(bucketingSink)

    env.execute("Window Stream WordCount")
           

Legacy API (older versions)

val env = StreamExecutionEnvironment.getExecutionEnvironment

    val text = env.readTextFile("hdfs://train:9000/demo/words")

    val bucketingSink = new BucketingSink[(String,Int)]("hdfs://train:9000/bucket-results")
    bucketingSink.setBucketer(new DateTimeBucketer[(String,Int)]("yyyy-MM-dd"))
    bucketingSink.setBatchSize(1024)

    val counts = text.flatMap(line=>line.split(" "))
      .map(word=>(word,1))
      .keyBy(0)
      .sum(1)

    counts.addSink(bucketingSink)

    env.execute("Window Stream WordCount")
           
print()/printToErr()

Prints the toString() value of each element to the standard output / standard error stream. Optionally, a prefix (msg) can be provided that is prepended to the output; this helps distinguish between different calls to print. If the parallelism is greater than 1, the output is also prepended with the identifier of the task that produced it.

val env = StreamExecutionEnvironment.getExecutionEnvironment

    val text = env.readTextFile("hdfs://train:9000/demo/words")


    val counts = text.flatMap(line=>line.split(" "))
      .map(word=>(word,1))
      .keyBy(0)
      .sum(1)

    counts.setParallelism(2).print("test")

    env.execute("Window Stream")
           
UserDefinedSinkFunction
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}

class UserDefinedSinkFunction extends RichSinkFunction[(String,Int)]{
  override def open(parameters: Configuration): Unit = {
    println("opening connection...")
  }

  override def invoke(value: (String, Int), context: SinkFunction.Context[_]): Unit = {
    println("output: "+value)
  }

  override def close(): Unit = {
    println("closing connection")
  }

}
           
val env = StreamExecutionEnvironment.getExecutionEnvironment

    val text = env.readTextFile("hdfs://train:9000/demo/words")


    val counts = text.flatMap(line=>line.split(" "))
      .map(word=>(word,1))
      .keyBy(0)
      .sum(1)

    counts.addSink(new UserDefinedSinkFunction)

    env.execute("Window Stream WordCount")
           
RedisSink

Reference:

https://bahir.apache.org/docs/flink/current/flink-streaming-redis/

<dependency>
 <groupId>org.apache.bahir</groupId>
 <artifactId>flink-connector-redis_2.11</artifactId>
 <version>1.0</version>
</dependency>
           
//1. Create the stream execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    //2. Create the DataStream
    val text = env.readTextFile("hdfs://train:9000/demo/words")

    val flinkJedisConf = new FlinkJedisPoolConfig.Builder()
      .setHost("train")
      .setPort(6379)
      .build()

    //3. Apply transformation operators to the DataStream
    val counts = text.flatMap(line=>line.split("\\s+"))
      .map(word=>(word,1))
      .keyBy(0)
      .sum(1)

    counts.addSink(new RedisSink(flinkJedisConf,new UserDefinedRedisMapper()))

    env.execute("Window Stream")
           
import org.apache.flink.streaming.connectors.redis.common.mapper.{RedisCommand, RedisCommandDescription, RedisMapper}

class UserDefinedRedisMapper extends RedisMapper[(String,Int)]{
  //write the word counts into a Redis hash named "wordcounts" via HSET
  override def getCommandDescription: RedisCommandDescription = {
    new RedisCommandDescription(RedisCommand.HSET,"wordcounts")
  }

  override def getKeyFromData(t: (String, Int)): String = t._1

  override def getValueFromData(t: (String, Int)): String = t._2+""
}
           
