Table of Contents

- I. Preface
- II. Data Output
  - 2.1 Output to a File
  - 2.2 Output to Kafka
  - 2.3 Output to Redis
  - 2.4 Output to ElasticSearch
  - 2.5 Output to MySQL
- III. Summary
I. Preface
In the previous articles we covered reading data into Flink and transforming it. This article looks at how to write the transformed data out, i.e. how to land (sink) the data so that downstream systems can consume it.
II. Data Output
There are many ways to land data; we will walk through the following common sinks.
2.1 Output to a File
import org.apache.flink.api.common.serialization.SimpleStringEncoder
import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
import org.apache.flink.streaming.api.scala._

object StreamSink {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    val dataStream = env.fromCollection(List(1, 2, 3))
    // writeAsText writes the stream straight to a file and is simple to use,
    // but it is deprecated and may be removed; addSink is recommended instead
    // dataStream.writeAsText("/opt/out.txt")
    dataStream.addSink(
      StreamingFileSink.forRowFormat(
        new Path("/opt/out.txt"), // base path; part files are created beneath it
        new SimpleStringEncoder[Int]()
      ).build()
    )
    env.execute("Stream Sink")
  }
}
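StreamingFileSink treats the given Path as a base directory and rolls part files beneath it, by default on checkpoints. If you need explicit control over when files roll, the builder accepts a rolling policy. A minimal sketch; the thresholds (15-minute rollover, 5 idle minutes, 128 MB part size) are illustrative assumptions, not values from the example above:

import java.util.concurrent.TimeUnit
import org.apache.flink.api.common.serialization.SimpleStringEncoder
import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy

// Sketch: roll part files on time/size instead of relying only on checkpoints.
// All thresholds below are illustrative assumptions.
val rollingPolicy = DefaultRollingPolicy.builder()
  .withRolloverInterval(TimeUnit.MINUTES.toMillis(15))  // roll at least every 15 minutes
  .withInactivityInterval(TimeUnit.MINUTES.toMillis(5)) // roll after 5 idle minutes
  .withMaxPartSize(128 * 1024 * 1024)                   // roll once a part file reaches 128 MB
  .build[Int, String]()

val fileSink = StreamingFileSink
  .forRowFormat(new Path("/opt/out.txt"), new SimpleStringEncoder[Int]())
  .withRollingPolicy(rollingPolicy)
  .build()
// dataStream.addSink(fileSink)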
2.2 Output to Kafka
Add the Flink Kafka connector dependency:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.11</artifactId>
    <version>1.12.0</version>
</dependency>
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer

object StreamSink {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    val dataStream = env.fromCollection(List("hello", "flink", "are"))
    // arguments: broker list, target topic, serialization schema
    dataStream.addSink(new FlinkKafkaProducer[String]("localhost:9092", "test", new SimpleStringSchema()))
    env.execute("Stream Sink")
  }
}
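The constructor above (broker list, topic, serialization schema) is the quickest way to get going. For real jobs you would typically configure the producer through a java.util.Properties object, which also lets you set arbitrary Kafka producer options. A minimal sketch under the same localhost setup (the property values are assumptions):

import java.util.Properties
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer

// Sketch: configure the producer through Properties instead of a broker string
val props = new Properties()
props.setProperty("bootstrap.servers", "localhost:9092")
val kafkaSink = new FlinkKafkaProducer[String]("test", new SimpleStringSchema(), props)
// dataStream.addSink(kafkaSink)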
2.3 Output to Redis
Add the Flink Redis connector dependency (from Apache Bahir):
<dependency>
    <groupId>org.apache.bahir</groupId>
    <artifactId>flink-connector-redis_2.11</artifactId>
    <version>1.0</version>
</dependency>
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.redis.RedisSink
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig
import org.apache.flink.streaming.connectors.redis.common.mapper.{RedisCommand, RedisCommandDescription, RedisMapper}

case class student(name: String, age: Int)

object StreamSink {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    val dataStream = env.fromCollection(List(
      student("zhang", 20),
      student("li", 27)
    ))
    val conf = new FlinkJedisPoolConfig.Builder()
      .setHost("localhost")
      .setPort(6379)
      .build()
    dataStream.addSink(new RedisSink[student](conf, new MyRedisMapper))
    env.execute("Stream Sink")
  }
}

class MyRedisMapper extends RedisMapper[student] {
  // the Redis command used for writes
  override def getCommandDescription: RedisCommandDescription =
    new RedisCommandDescription(RedisCommand.SET)
  // which field becomes the Redis key
  override def getKeyFromData(t: student): String = t.name
  // which field becomes the Redis value
  override def getValueFromData(t: student): String = t.age.toString
}
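RedisCommand.SET stores each student as a separate top-level key. If you would rather keep all students inside a single Redis hash, HSET can be used; its RedisCommandDescription takes an additional key naming the hash. A sketch of an alternative mapper (the hash name "students" is an assumption for illustration):

import org.apache.flink.streaming.connectors.redis.common.mapper.{RedisCommand, RedisCommandDescription, RedisMapper}

// Sketch: write each student as one field of a single Redis hash via HSET
class MyRedisHashMapper extends RedisMapper[student] {
  // the additional key names the hash (assumed here to be "students")
  override def getCommandDescription: RedisCommandDescription =
    new RedisCommandDescription(RedisCommand.HSET, "students")
  override def getKeyFromData(t: student): String = t.name            // hash field
  override def getValueFromData(t: student): String = t.age.toString  // hash value
}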
2.4 Output to ElasticSearch
Add the Flink ElasticSearch connector dependency:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-elasticsearch6_2.11</artifactId>
    <version>1.12.0</version>
</dependency>
import java.util

import org.apache.flink.api.common.functions.RuntimeContext
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.elasticsearch.{ElasticsearchSinkFunction, RequestIndexer}
import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink
import org.apache.http.HttpHost
import org.elasticsearch.client.Requests

case class student(name: String, age: Int)

object StreamSink {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    val dataStream = env.fromCollection(List(
      student("zhang", 27),
      student("li", 24)
    ))
    // the ES nodes to connect to
    val httpHosts = new util.ArrayList[HttpHost]
    httpHosts.add(new HttpHost("localhost", 9200, "http"))
    val esSinkBuilder = new ElasticsearchSink.Builder[student](
      httpHosts,
      new ElasticsearchSinkFunction[student] {
        // build one index request per element
        override def process(t: student, runtimeContext: RuntimeContext, requestIndexer: RequestIndexer): Unit = {
          val json = new util.HashMap[String, String]()
          json.put("name", t.name)
          json.put("age", t.age.toString)
          val request = Requests.indexRequest().index("test").`type`("info").source(json)
          requestIndexer.add(request)
        }
      })
    dataStream.addSink(esSinkBuilder.build())
    env.execute("Stream Sink")
  }
}
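The builder also exposes bulk-request tuning, which must be set before build() is called. Flushing after every single action makes documents visible immediately, which helps when verifying a small demo like this one; for production you would keep batching enabled. A sketch (the value 1 is a test-only assumption):

// Sketch: flush each request immediately so documents appear in ES right away
// (demo/test setting; keep the default batching for real workloads)
esSinkBuilder.setBulkFlushMaxActions(1)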
2.5 Output to MySQL
Add the MySQL JDBC driver dependency:
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.39</version>
</dependency>
import java.sql.{Connection, DriverManager, PreparedStatement}

import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
import org.apache.flink.streaming.api.scala._

case class student(name: String, age: Int)

object StreamSink {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    val dataStream = env.fromCollection(List(
      student("zhang", 27),
      student("li", 24)
    ))
    dataStream.addSink(new JdbcSink)
    env.execute("Stream Sink")
  }
}

class JdbcSink extends RichSinkFunction[student] {
  var conn: Connection = _
  var insertStmt: PreparedStatement = _

  // open the connection and prepare the statement once per task
  override def open(parameters: Configuration): Unit = {
    super.open(parameters)
    conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/test", "root", "123456")
    insertStmt = conn.prepareStatement("insert into stu(name, age) values(?,?)")
  }

  // called once for every element in the stream
  override def invoke(value: student, context: SinkFunction.Context): Unit = {
    insertStmt.setString(1, value.name)
    insertStmt.setInt(2, value.age)
    insertStmt.execute()
  }

  // release resources when the task shuts down
  override def close(): Unit = {
    insertStmt.close()
    conn.close()
  }
}
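Writing a RichSinkFunction by hand works, but Flink also ships an official JDBC connector with a ready-made sink. A minimal sketch, assuming the flink-connector-jdbc_2.11 artifact (version 1.12.0) has been added as a dependency; the import is renamed so it does not clash with the custom JdbcSink class above:

import java.sql.PreparedStatement
import org.apache.flink.connector.jdbc.{JdbcConnectionOptions, JdbcStatementBuilder, JdbcSink => FlinkJdbcSink}

// Sketch: the official JDBC sink; connection details mirror the example above
val mysqlSink = FlinkJdbcSink.sink[student](
  "insert into stu(name, age) values (?, ?)",
  new JdbcStatementBuilder[student] {
    override def accept(stmt: PreparedStatement, s: student): Unit = {
      stmt.setString(1, s.name)
      stmt.setInt(2, s.age)
    }
  },
  new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
    .withUrl("jdbc:mysql://localhost:3306/test")
    .withDriverName("com.mysql.jdbc.Driver")
    .withUsername("root")
    .withPassword("123456")
    .build()
)
// dataStream.addSink(mysqlSink)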
III. Summary
That is all for today. This article demonstrated Flink's most common sinks with examples; Flink provides many convenient methods for landing data.