
Flink Basics (Scala): Data Output with Sinks

Contents

        • 1. Introduction
        • 2. Data Output
          • 2.1 Writing to a File
          • 2.2 Writing to Kafka
          • 2.3 Writing to Redis
          • 2.4 Writing to ElasticSearch
          • 2.5 Writing to MySQL
        • 3. Summary

1. Introduction

In the previous articles we covered reading data into Flink and transforming it. This article looks at how to output the transformed data, i.e. how to persist it so that third-party systems can consume it.

2. Data Output

There are several ways to write data out of Flink; the sections below walk through the most common sinks.

2.1 Writing to a File

import org.apache.flink.api.common.serialization.SimpleStringEncoder
import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
import org.apache.flink.streaming.api.scala._

object StreamSink {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    val dataStream = env.fromCollection(List(1, 2, 3))
    // writeAsText writes the stream straight to a file and is simple to use, but it is deprecated and may be removed in the future; addSink is the recommended approach
    // dataStream.writeAsText("/opt/out.txt")
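    // Note: the Path passed to forRowFormat is treated as an output directory; part files are created under it (in time-based bucket subdirectories by default)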
    dataStream.addSink(
      StreamingFileSink.forRowFormat(new Path("/opt/out.txt"),
        new SimpleStringEncoder[Int]()
      ).build()
    )
    env.execute("Stream Sink")
  }
}
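If you need control over when part files roll, the row-format builder also accepts a rolling policy. The sketch below shows one possible configuration, assuming Flink 1.12, where DefaultRollingPolicy's builder takes intervals in milliseconds and sizes in bytes; the output path /opt/out is just a placeholder.

import org.apache.flink.api.common.serialization.SimpleStringEncoder
import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy
import org.apache.flink.streaming.api.scala._

object StreamFileSinkWithRollingPolicy {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    val dataStream = env.fromCollection(List(1, 2, 3))
    dataStream.addSink(
      StreamingFileSink
        .forRowFormat(new Path("/opt/out"), new SimpleStringEncoder[Int]())
        .withRollingPolicy(
          DefaultRollingPolicy.builder()
            .withRolloverInterval(15 * 60 * 1000L)   // roll a new part file every 15 minutes
            .withInactivityInterval(5 * 60 * 1000L)  // or after 5 minutes without new data
            .withMaxPartSize(128 * 1024 * 1024L)     // or once a part file reaches 128 MB
            .build()
        )
        .build()
    )
    env.execute("Stream Sink")
  }
}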
           
2.2 Writing to Kafka

Add the dependency for Flink's Kafka connector:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.11</artifactId>
    <version>1.12.0</version>
</dependency>
           
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer

object StreamSink {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    val dataStream = env.fromCollection(List("hello", "flink", "are"))

    dataStream.addSink(new FlinkKafkaProducer[String]("localhost:9092", "test", new SimpleStringSchema()))
    env.execute("Stream Sink")
  }
}
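The simple constructor used above (broker list, topic, serialization schema) writes with at-least-once guarantees. If you need extra producer properties or exactly-once semantics, there is also a constructor that takes a KafkaSerializationSchema, a Properties object, and a delivery semantic. The following is a rough sketch under the same Flink 1.12 / flink-connector-kafka assumptions; the topic name test and the timeout value are placeholders.

import java.nio.charset.StandardCharsets
import java.util.Properties

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaProducer, KafkaSerializationSchema}
import org.apache.kafka.clients.producer.ProducerRecord

object StreamKafkaSinkExactlyOnce {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    // Exactly-once delivery relies on checkpointing being enabled
    env.enableCheckpointing(5000)

    val dataStream = env.fromCollection(List("hello", "flink", "are"))

    val props = new Properties()
    props.setProperty("bootstrap.servers", "localhost:9092")
    // Must be lower than the broker's transaction.max.timeout.ms
    props.setProperty("transaction.timeout.ms", "60000")

    // Build the ProducerRecord ourselves, so we control topic, key and value
    val schema = new KafkaSerializationSchema[String] {
      override def serialize(element: String, timestamp: java.lang.Long): ProducerRecord[Array[Byte], Array[Byte]] =
        new ProducerRecord[Array[Byte], Array[Byte]]("test", element.getBytes(StandardCharsets.UTF_8))
    }

    dataStream.addSink(
      new FlinkKafkaProducer[String]("test", schema, props, FlinkKafkaProducer.Semantic.EXACTLY_ONCE)
    )
    env.execute("Stream Sink")
  }
}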
           
2.3 Writing to Redis

Add the dependency for the Flink Redis connector (provided by Apache Bahir):

<dependency>
   <groupId>org.apache.bahir</groupId>
   <artifactId>flink-connector-redis_2.11</artifactId>
   <version>1.0</version>
</dependency>
           
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.redis.RedisSink
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig
import org.apache.flink.streaming.connectors.redis.common.mapper.{RedisCommand, RedisCommandDescription, RedisMapper}

case class student(name: String, age: Int)

object StreamSink {

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    val dataStream = env.fromCollection(List(
      student("zhang", 20),
      student("li", 27)
    ))
    val conf = new FlinkJedisPoolConfig.Builder()
        .setHost("localhost")
        .setPort(6379)
        .build()

    dataStream.addSink(new RedisSink[student](conf, new MyRedisMapper))
    env.execute("Stream Sink")
  }
}

class MyRedisMapper extends RedisMapper[student] {
  // Which Redis command to use when writing
  override def getCommandDescription: RedisCommandDescription =
    new RedisCommandDescription(RedisCommand.SET)
  // Field used as the Redis key
  override def getKeyFromData(t: student): String = t.name
  // Field used as the Redis value
  override def getValueFromData(t: student): String = t.age.toString
}
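SET writes each record as a plain key/value pair. If you would rather group all records under a single Redis hash, the mapper can use HSET with an additional key (the hash name). The sketch below assumes the same Bahir connector and reuses the student case class from the example above; the hash name student_hash is chosen arbitrarily.

// Alternative mapper: records become fields of one Redis hash instead of separate keys
class MyRedisHashMapper extends RedisMapper[student] {
  // HSET needs an additional key: the name of the hash to write into
  override def getCommandDescription: RedisCommandDescription =
    new RedisCommandDescription(RedisCommand.HSET, "student_hash")
  // Hash field
  override def getKeyFromData(t: student): String = t.name
  // Hash value
  override def getValueFromData(t: student): String = t.age.toString
}

Plugging this mapper into the RedisSink above would run HSET student_hash <name> <age> for each record.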
           
2.4 Writing to ElasticSearch

Add the dependency for Flink's Elasticsearch 6 connector:

<dependency>
   <groupId>org.apache.flink</groupId>
   <artifactId>flink-connector-elasticsearch6_2.11</artifactId>
   <version>1.12.0</version>
</dependency>
           
import java.util

import org.apache.flink.api.common.functions.RuntimeContext
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.elasticsearch.{ElasticsearchSinkFunction, RequestIndexer}
import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink
import org.apache.http.HttpHost
import org.elasticsearch.client.Requests


case class student(name: String, age: Int)

object StreamSink {

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    val dataStream = env.fromCollection(List(
      student("zhang", 27),
      student("li", 24)
    ))

    val httpHosts = new util.ArrayList[HttpHost]
    httpHosts.add(new HttpHost("localhost", 9200, "http"))
    val esSinkBuilder = new ElasticsearchSink.Builder[student](
      httpHosts,
      new ElasticsearchSinkFunction[student] {
        override def process(t: student, runtimeContext: RuntimeContext, requestIndexer: RequestIndexer): Unit = {
          val json = new util.HashMap[String, String]()
          json.put("name", t.name)
          json.put("age", t.age.toString)
          val request = Requests.indexRequest().index("test").`type`("info").source(json)
          requestIndexer.add(request)
        }
      }
    )
    dataStream.addSink(esSinkBuilder.build())

    env.execute("Stream Sink")
  }
}
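The Elasticsearch sink buffers actions and sends them in bulk, so a small test stream may appear not to write anything until the buffer flushes. The builder exposes bulk-flush settings; below is a sketch of one way to force a flush after every action, assuming flink-connector-elasticsearch6 1.12. These lines would replace the addSink call in the example above, and the values are illustrative only.

    // Configure the builder from the example above before calling build()
    esSinkBuilder.setBulkFlushMaxActions(1)   // send each index request immediately (useful for testing)
    esSinkBuilder.setBulkFlushInterval(5000)  // otherwise flush at least every 5 seconds
    dataStream.addSink(esSinkBuilder.build())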
           
2.5 Writing to MySQL

Add the MySQL JDBC driver dependency:

<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.39</version>
</dependency>
           
import java.sql.{Connection, DriverManager, PreparedStatement}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
import org.apache.flink.streaming.api.scala._


case class student(name: String, age: Int)

object StreamSink {

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    val dataStream = env.fromCollection(List(
      student("zhang", 27),
      student("li", 24)
    ))

    dataStream.addSink(new JdbcSink)
    
    env.execute("Stream Sink")
  }
}

class JdbcSink extends RichSinkFunction[student] {
  var conn:Connection = _
  var insertStmt:PreparedStatement = _

  override def open(parameters: Configuration): Unit = {
    super.open(parameters)
    conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/test", "root", "123456")
    insertStmt = conn.prepareStatement("insert into stu(name, age) values(?,?)")
  }

  override def invoke(value: student, context: SinkFunction.Context): Unit = {
    insertStmt.setString(1, value.name)
    insertStmt.setInt(2, value.age)
    insertStmt.execute()
  }

  override def close(): Unit = {
    insertStmt.close()
    conn.close()
  }

}
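Writing your own RichSinkFunction works, but note that each execute() call above is a single auto-committed statement with no batching or retry. Since Flink 1.11 there is also an official JDBC connector (artifact flink-connector-jdbc_2.11, here assumed at version 1.12.0) that handles batching and connection setup. The following is a rough sketch of the same insert using it; the table stu, URL and credentials are the same assumptions as above.

import java.sql.PreparedStatement

import org.apache.flink.connector.jdbc.{JdbcConnectionOptions, JdbcStatementBuilder, JdbcSink => FlinkJdbcSink}
import org.apache.flink.streaming.api.scala._

case class student(name: String, age: Int)

object StreamJdbcConnectorSink {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    val dataStream = env.fromCollection(List(
      student("zhang", 27),
      student("li", 24)
    ))

    dataStream.addSink(
      FlinkJdbcSink.sink[student](
        "insert into stu(name, age) values(?, ?)",
        // Fills the PreparedStatement parameters for each record
        new JdbcStatementBuilder[student] {
          override def accept(stmt: PreparedStatement, s: student): Unit = {
            stmt.setString(1, s.name)
            stmt.setInt(2, s.age)
          }
        },
        new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
          .withUrl("jdbc:mysql://localhost:3306/test")
          .withDriverName("com.mysql.jdbc.Driver")
          .withUsername("root")
          .withPassword("123456")
          .build()
      )
    )
    env.execute("Stream Sink")
  }
}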
           

3. Summary

That covers today's content. This article walked through examples of Flink's most commonly used sinks; Flink provides many convenient connectors and methods for getting data out of a job.
