天天看點

flink之Sink to MySQL和Redis

前言

下面這篇文章是使用Flink的Sink 寫出資料到Redis和MySQL

Flink之Sink寫入Redis和MySQL

Flink需要添加Sink的時候,需要自己去添加寫Sink,我們可以實作SinkFunction,或者我們也可以繼承RichSinkFunction,RichSinkFunction是實作了SinkFunction和繼承了一個AbstractRichFunction,而增強主要是在AbstractRichFunction裡面是有生命周期函數,這個對我們使用Sink的時候非常重要

// --------------------------------------------------------------------------------------------
    //  Default life cycle methods
    // --------------------------------------------------------------------------------------------

    @Override
    public void open(Configuration parameters) throws Exception {}

    @Override
    public void close() throws Exception {}      

具體是否可以使用我們可以在官網裡面查詢你的資料庫是否可以支援Source和Sink,下面這個是在1.15的文檔下,可能後面社群會推出更多的支援,大家可以去官網中去看​​Overview | Apache Flink​​

Connectors source sink
Kafka 支援 支援
Cassandra 不支援 支援
Kinesis 支援 支援
Elasticsearch 不支援 支援
FileSystem 不支援 支援
RabbitMQ 支援 支援
Google PubSub 支援 支援
Hybrid Source 支援 不支援
NiFi 支援 支援
Pulsar 支援 不支援
JDBC 支援 不支援
ActiveMQ 支援 支援
Flume 不支援 支援
Redis 不支援 支援
Akka 不支援 支援
Netty 支援 不支援

Sink

下面我們來看一個例子吧,這個是日志資料,本次例子也是自己來模拟的

202512120010,c.com,2000
202512120010,c.com,5000
202512120010,a.com,6000
202512120010,c.com,1000
202512120010,b.com,2000
202512120010,a.com,2000      

下面的是一個例子,裡面有兩個例子,一個是寫入MySQL的,具體整個函數的處理就是根據域名進行點選量的統計,首先我們需要對資料進行轉化成一個Access實體,然後再進行FlatMap轉化,你可以看到添加一個Sink寫出資料也是通過stream.addSink()添加一個Sink來寫出資料。

public static void toMySql(StreamExecutionEnvironment env) {
        DataStreamSource<String> source = env.readTextFile("D:/code/flink/coding510/com.dy.flink/data/access.log");
        SingleOutputStreamOperator<Access> mapStream = source.map(new MapFunction<String, Access>() {
            @Override
            public Access map(String value) throws Exception {
                String[] splits = value.split(",");
                Long time = Long.parseLong(splits[0].trim());
                String domain = splits[1].trim();
                Double traffic = Double.parseDouble(splits[2].trim());
                return new Access(time, domain, traffic);
            }
        });
        SingleOutputStreamOperator<Tuple2<String, Double>> reduceStream = mapStream.flatMap(new FlatMapFunction<Access, Tuple2<String, Double>>() {
            @Override
            public void flatMap(Access value, Collector<Tuple2<String, Double>> out) throws Exception {
                out.collect(Tuple2.of(value.getDomain(), value.getTraffic()));
            }
        }).keyBy(new KeySelector<Tuple2<String, Double>, String>() {
            @Override
            public String getKey(Tuple2<String, Double> value) throws Exception {
                return value.f0;
            }
        }).reduce(new ReduceFunction<Tuple2<String, Double>>() {
            @Override
            public Tuple2<String, Double> reduce(Tuple2<String, Double> value1, Tuple2<String, Double> value2) throws Exception {
                return Tuple2.of(value1.f0, value1.f1 + value2.f1);
            }
        });
        FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder().setHost("127.0.0.1").setPassword("123456")
                .setPort(6379).build();
        reduceStream.addSink(new RedisSink<Tuple2<String, Double>>(conf, new PkRedisSink()));
        //reduceStream.addSink(new PkMySqlSink());

    }      

下面先來看Redis的Sink,這種采用的是實作RedisMapper來實作Redis的寫出

public class PkRedisSink implements RedisMapper<Tuple2<String, Double>> {
    @Override
    public RedisCommandDescription getCommandDescription() {
        return new RedisCommandDescription(RedisCommand.HSET, "pk-traffic");
    }

    @Override
    public String getKeyFromData(Tuple2<String, Double> data) {
        return data.f0;
    }

    @Override
    public String getValueFromData(Tuple2<String, Double> data) {
        return data.f1 + "";
    }
}      
public class PkMySqlSink extends RichSinkFunction<Tuple2<String, Double>> {

    Connection connection;
    PreparedStatement insertPstmt;
    PreparedStatement updatePstmt;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        Connection connection = MySQLUtils.getConnection();
        insertPstmt = connection.prepareStatement("insert into traffic(domain, traffic) values(?, ?)");
        updatePstmt = connection.prepareStatement("update traffic set traffic = ? where domain = ?");

    }

    @Override
    public void close() throws Exception {
        super.close();
        if (null != insertPstmt) {
            insertPstmt.close();
        }

        if (null != updatePstmt) {
            updatePstmt.close();
        }

        if (null != connection) {
            connection.close();
        }

    }

    @Override
    public void invoke(Tuple2<String, Double> value, Context context) throws Exception {

        System.out.println("=====invoke======" + value.f0 + "==>" +value.f1);
        updatePstmt.setString(2, value.f0);
        updatePstmt.setDouble(1, value.f1);
        updatePstmt.execute();

        if (updatePstmt.getUpdateCount() == 0) {
            insertPstmt.setString(1, value.f0);
            insertPstmt.setDouble(2, value.f1);
            insertPstmt.execute();
        }

    }      

最後