Preface
This article shows how to use Flink sinks to write data to Redis and MySQL.
Flink Sinks: Writing to Redis and MySQL
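When a Flink job needs a sink, you can write your own. You can either implement SinkFunction directly or extend RichSinkFunction. RichSinkFunction implements SinkFunction and extends AbstractRichFunction. What it adds comes from AbstractRichFunction, which provides lifecycle methods.

These methods matter a great deal for sinks. They let you set up resources such as database connections once in open() and release them in close(), instead of doing that work for every record. These are the default (empty) implementations in AbstractRichFunction: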
// --------------------------------------------------------------------------------------------
// Default life cycle methods
// --------------------------------------------------------------------------------------------
@Override
public void open(Configuration parameters) throws Exception {}
@Override
public void close() throws Exception {}
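The value of these hooks is how often they run. The sketch below is a rough, Flink-free illustration of the order in which a runtime drives a rich sink:

- open() runs once per parallel instance.
- invoke() runs once per record.
- close() runs once at the end.

The names LifecycleSink, RecordingSink and SinkRunner are hypothetical and are not part of Flink. The real runtime also handles checkpoints, failures and restarts, which this sketch ignores.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical interface mirroring the open/invoke/close shape of a rich sink.
// This is NOT Flink's API; it only illustrates the call order.
interface LifecycleSink<T> {
    void open();
    void invoke(T value);
    void close();
}

// Records each call so the order can be inspected.
class RecordingSink implements LifecycleSink<String> {
    final List<String> calls = new ArrayList<>();

    @Override
    public void open() {
        // One-time setup, e.g. opening a database connection.
        calls.add("open");
    }

    @Override
    public void invoke(String value) {
        // Per-record work that reuses what open() created.
        calls.add("invoke:" + value);
    }

    @Override
    public void close() {
        // One-time teardown, releasing what open() acquired.
        calls.add("close");
    }
}

// Simplified driver: open once, invoke once per record, always close.
class SinkRunner {
    static <T> void run(LifecycleSink<T> sink, List<T> records) {
        sink.open();
        try {
            for (T record : records) {
                sink.invoke(record);
            }
        } finally {
            sink.close();
        }
    }
}
```

With three records, the recorded calls are open, three invocations, then close. A connection opened in open() is therefore shared by all records instead of being reopened for each one.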
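To find out whether your database is supported, check the Flink website, which lists whether each connector provides a source, a sink, or both. The table below is based on the 1.15 documentation. The community may add more connectors later, so see the Overview | Apache Flink page for the current list.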
| Connector | Source | Sink |
|---|---|---|
| Kafka | Yes | Yes |
| Cassandra | No | Yes |
| Kinesis | Yes | Yes |
| Elasticsearch | No | Yes |
| FileSystem | No | Yes |
| RabbitMQ | Yes | Yes |
| Google PubSub | Yes | Yes |
| Hybrid Source | Yes | No |
| NiFi | Yes | Yes |
| Pulsar | Yes | No |
| JDBC | No | Yes |
| ActiveMQ | Yes | Yes |
| Flume | No | Yes |
| Redis | No | Yes |
| Akka | No | Yes |
| Netty | Yes | No |
Sink
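Let's walk through an example. The input is an access log whose data was simulated for this example. Each line holds a timestamp, a domain, and a traffic value: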
202512120010,c.com,2000
202512120010,c.com,5000
202512120010,a.com,6000
202512120010,c.com,1000
202512120010,b.com,2000
202512120010,a.com,2000
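The code below contains two sinks: one writes to Redis and the other, commented out, writes to MySQL. The job sums traffic per domain in four steps:

1. Each line is mapped to an Access entity.
2. flatMap turns each entity into a (domain, traffic) Tuple2.
3. The tuples are keyed by domain.
4. A reduce adds up the traffic values for each domain.

Attaching a sink is simply a call to stream.addSink().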
public static void toMySql(StreamExecutionEnvironment env) {
    // Read the simulated access log; each line is "time,domain,traffic".
    DataStreamSource<String> source = env.readTextFile("D:/code/flink/coding510/com.dy.flink/data/access.log");

    // Parse each line into an Access entity.
    SingleOutputStreamOperator<Access> mapStream = source.map(new MapFunction<String, Access>() {
        @Override
        public Access map(String value) throws Exception {
            String[] splits = value.split(",");
            Long time = Long.parseLong(splits[0].trim());
            String domain = splits[1].trim();
            Double traffic = Double.parseDouble(splits[2].trim());
            return new Access(time, domain, traffic);
        }
    });

    // Keep (domain, traffic), key by domain, and sum the traffic per domain.
    SingleOutputStreamOperator<Tuple2<String, Double>> reduceStream = mapStream.flatMap(new FlatMapFunction<Access, Tuple2<String, Double>>() {
        @Override
        public void flatMap(Access value, Collector<Tuple2<String, Double>> out) throws Exception {
            out.collect(Tuple2.of(value.getDomain(), value.getTraffic()));
        }
    }).keyBy(new KeySelector<Tuple2<String, Double>, String>() {
        @Override
        public String getKey(Tuple2<String, Double> value) throws Exception {
            return value.f0;
        }
    }).reduce(new ReduceFunction<Tuple2<String, Double>>() {
        @Override
        public Tuple2<String, Double> reduce(Tuple2<String, Double> value1, Tuple2<String, Double> value2) throws Exception {
            return Tuple2.of(value1.f0, value1.f1 + value2.f1);
        }
    });

    // Connection settings for the Redis server.
    FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder().setHost("127.0.0.1").setPassword("123456")
            .setPort(6379).build();
    // Write the per-domain totals to Redis; uncomment the next line to also write them to MySQL.
    reduceStream.addSink(new RedisSink<Tuple2<String, Double>>(conf, new PkRedisSink()));
    //reduceStream.addSink(new PkMySqlSink());
    // This method only builds the pipeline; the caller still has to call env.execute().
}
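In the default streaming execution mode, a keyed reduce does not emit just one final total per domain. It emits the updated running value every time a record for that key arrives. The sketch below reproduces that behavior over the sample log in plain Java, with no Flink dependency. RunningReduce is a hypothetical name, and because the article does not show its Access class, the parser returns a simple (domain, traffic) entry instead.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class RunningReduce {
    // Parses "time,domain,traffic" like the map() step and keeps only
    // the (domain, traffic) pair that flatMap() passes downstream.
    static Map.Entry<String, Double> parse(String line) {
        String[] splits = line.split(",");
        String domain = splits[1].trim();
        Double traffic = Double.parseDouble(splits[2].trim());
        return Map.entry(domain, traffic);
    }

    // Simulates keyBy(domain).reduce(sum) in streaming mode: every input
    // record yields one output carrying its key's running total.
    static List<String> rollingSums(List<String> lines) {
        Map<String, Double> state = new HashMap<>(); // per-key reduce state
        List<String> emitted = new ArrayList<>();
        for (String line : lines) {
            Map.Entry<String, Double> e = parse(line);
            Double sum = state.merge(e.getKey(), e.getValue(), Double::sum);
            emitted.add(e.getKey() + "=" + sum);
        }
        return emitted;
    }
}
```

For the six sample lines this yields six outputs: c.com=2000.0, c.com=7000.0, a.com=6000.0, c.com=8000.0, b.com=2000.0, a.com=8000.0. Because every record produces an updated total, both sinks receive several values per domain. They therefore need to overwrite the stored value rather than append a new one.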
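Let's start with the Redis sink. It uses the RedisSink from Apache Bahir's flink-connector-redis, not a connector bundled with Flink. It also implements RedisMapper, which tells RedisSink which Redis command to issue and how to derive the key and value from each record: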
public class PkRedisSink implements RedisMapper<Tuple2<String, Double>> {
    // Write each record with HSET into the Redis hash named "pk-traffic".
    @Override
    public RedisCommandDescription getCommandDescription() {
        return new RedisCommandDescription(RedisCommand.HSET, "pk-traffic");
    }

    // Hash field: the domain.
    @Override
    public String getKeyFromData(Tuple2<String, Double> data) {
        return data.f0;
    }

    // Hash value: the running traffic total, converted to a string.
    @Override
    public String getValueFromData(Tuple2<String, Double> data) {
        return data.f1 + "";
    }
}
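With RedisCommand.HSET and the additional key "pk-traffic", each record becomes `HSET pk-traffic <domain> <traffic>`. The domains are therefore fields of a single Redis hash, and a later write for the same domain overwrites the earlier one.

The plain-Java simulation below assumes those semantics. FakeRedisHash and RedisMappingDemo are hypothetical helpers; an in-memory map stands in for Redis, and no Redis client is involved. The simulation shows what the hash holds after a few running totals arrive, and that converting the Double to a string stores "7000.0" rather than "7000".

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical in-memory stand-in for Redis hashes.
class FakeRedisHash {
    private final Map<String, Map<String, String>> hashes = new LinkedHashMap<>();

    // Mimics HSET key field value: creates the hash if needed, overwrites the field.
    void hset(String key, String field, String value) {
        hashes.computeIfAbsent(key, k -> new LinkedHashMap<>()).put(field, value);
    }

    // Mimics HGET key field; returns null when absent (Redis replies nil).
    String hget(String key, String field) {
        Map<String, String> h = hashes.get(key);
        return h == null ? null : h.get(field);
    }

    // Mimics HLEN key: number of fields in the hash.
    int hlen(String key) {
        Map<String, String> h = hashes.get(key);
        return h == null ? 0 : h.size();
    }
}

class RedisMappingDemo {
    // The additional key passed to RedisCommandDescription.
    static final String HASH_NAME = "pk-traffic";

    // Applies the same field/value mapping as PkRedisSink to one (domain, traffic) pair.
    static void write(FakeRedisHash redis, String domain, Double traffic) {
        String field = domain;        // getKeyFromData -> data.f0
        String value = traffic + "";  // getValueFromData -> data.f1 + ""
        redis.hset(HASH_NAME, field, value);
    }
}
```

Against a real server, running `HGETALL pk-traffic` in redis-cli after the job finishes should list each domain with its latest total.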
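Next, the MySQL sink. It extends RichSinkFunction so that the connection and the prepared statements are created once in open(), reused by invoke() for every record, and released in close():
public class PkMySqlSink extends RichSinkFunction<Tuple2<String, Double>> {
    // Created in open(), so they are not serialized with the function.
    private transient Connection connection;
    private transient PreparedStatement insertPstmt;
    private transient PreparedStatement updatePstmt;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        // Assign the field, not a local variable, so that close() can release the connection.
        // MySQLUtils is a project helper that returns a JDBC Connection.
        connection = MySQLUtils.getConnection();
        insertPstmt = connection.prepareStatement("insert into traffic(domain, traffic) values(?, ?)");
        updatePstmt = connection.prepareStatement("update traffic set traffic = ? where domain = ?");
    }

    @Override
    public void close() throws Exception {
        super.close();
        // Release the statements first, then the connection.
        if (null != insertPstmt) {
            insertPstmt.close();
        }
        if (null != updatePstmt) {
            updatePstmt.close();
        }
        if (null != connection) {
            connection.close();
        }
    }

    @Override
    public void invoke(Tuple2<String, Double> value, Context context) throws Exception {
        System.out.println("=====invoke======" + value.f0 + "==>" + value.f1);
        // Try to update the existing row for this domain first.
        updatePstmt.setDouble(1, value.f1);
        updatePstmt.setString(2, value.f0);
        // No row matched, so this domain is new: insert it.
        if (updatePstmt.executeUpdate() == 0) {
            insertPstmt.setString(1, value.f0);
            insertPstmt.setDouble(2, value.f1);
            insertPstmt.execute();
        }
    }
}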
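The invoke() method implements an "update first, insert if nothing matched" upsert. The plain-Java sketch below replays that control flow on the running totals from the sample log, without JDBC. InMemoryTrafficTable is a hypothetical stand-in for the traffic table. The result is one row per domain, each holding its latest total.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical in-memory stand-in for the traffic(domain, traffic) table.
class InMemoryTrafficTable {
    final Map<String, Double> rows = new LinkedHashMap<>();

    // Like "update traffic set traffic = ? where domain = ?": returns the matched row count.
    int update(String domain, double traffic) {
        if (!rows.containsKey(domain)) {
            return 0;
        }
        rows.put(domain, traffic);
        return 1;
    }

    // Like "insert into traffic(domain, traffic) values(?, ?)".
    void insert(String domain, double traffic) {
        rows.put(domain, traffic);
    }

    // Same control flow as PkMySqlSink.invoke(): UPDATE first, INSERT only if no row matched.
    void upsert(String domain, double traffic) {
        if (update(domain, traffic) == 0) {
            insert(domain, traffic);
        }
    }
}
```

Assume domain has a PRIMARY KEY or UNIQUE index; the article does not show the table schema. In that case MySQL can do the same work in a single statement with `INSERT ... ON DUPLICATE KEY UPDATE`, which avoids issuing two statements for every new domain.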