
Flink SQL in Practice: Background, Code, and Caveats

I have been studying Flink SQL streaming recently, so here is a simple hands-on example.

Background

Every minute, compute the purchase count of each item over the past hour.

The data format is {"behavior":"cart","itemId":19,"nowTime":1562314387553}

behavior is the user action, one of cart, pv, or buy. Here we only care about buy.

Code

import com.alibaba.fastjson.JSON;
import com.closeli.writetohbase.fun.SQLKEYPro;
import com.closeli.writetohbase.fun.UDFToTime;
import com.closeli.writetohbase.fun.UserWaterMark;
import com.closeli.writetohbase.pojo.UserData;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment;


import java.util.Properties;

public class SQLTOKafka {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        Properties prop = new Properties();
        prop.put("group.id","aaaaaaa");
        prop.put("bootstrap.servers","node223:6667,node224:6667,node225:6667");

        FlinkKafkaConsumer010<String> consumer = new FlinkKafkaConsumer010<>("mydata", new SimpleStringSchema(), prop);
        DataStreamSource<String> dataSource = env.addSource(consumer);


//        dataSource.print();

        SingleOutputStreamOperator<UserData> filterData = dataSource.map(new MapFunction<String, UserData>() {
            @Override
            public UserData map(String value) throws Exception {
                UserData userData = null;
                try {
                    userData = JSON.parseObject(value, UserData.class);
                } catch (Exception e) {
                    e.printStackTrace();
                }
                return userData;
            }
        }).filter(x -> x != null && "buy".equals(x.getBehavior())) // keep only purchase events, per the requirement
                .assignTimestampsAndWatermarks(new UserWaterMark());

        // custom UDF that shifts the timestamp to UTC+8
        tEnv.registerFunction("timetoformat",new UDFToTime());
        
        tEnv.registerDataStream("userTable", filterData, "behavior,itemId,nowTime.rowtime");
//        Table table = tEnv.sqlQuery("select url,count(1) from userTable group by TUMBLE(nowTime,INTERVAL '5' SECOND),url");
        // 1-hour sliding window that fires every minute, matching the stated requirement
        Table table = tEnv.sqlQuery(
                "SELECT itemId, COUNT(1), timetoformat(HOP_END(nowTime, INTERVAL '1' MINUTE, INTERVAL '1' HOUR)) " +
                "FROM userTable " +
                "GROUP BY HOP(nowTime, INTERVAL '1' MINUTE, INTERVAL '1' HOUR), itemId");

//        DataStream<Tuple> resultData = tEnv.toAppendStream(table, Types.TUPLE(Types.INT, Types.LONG ,Types.SQL_TIMESTAMP));
//        DataStream<Tuple> resultData = tEnv.toAppendStream(table, Types.TUPLE(Types.INT, Types.LONG ,Types.LONG));

        DataStream<Tuple3<Integer, Long, Long>> resultData = tEnv.toAppendStream(table, TypeInformation.of(new TypeHint<Tuple3<Integer, Long, Long>>() {}));

        resultData.print();

        
        env.execute("SQLTOKafka");
    }
}
           

UserData

A POJO class with the fields behavior, itemId, and nowTime.
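The class itself is not shown in the post; a minimal sketch consistent with the JSON above might look like this (the no-arg constructor and getters/setters are required both for fastjson parsing and for Flink to treat the type as a POJO):

```java
public class UserData {
    private String behavior; // user action: cart, pv, or buy
    private int itemId;      // item identifier
    private long nowTime;    // event timestamp in epoch milliseconds

    public UserData() {}     // no-arg constructor required for POJO deserialization

    public String getBehavior() { return behavior; }
    public void setBehavior(String behavior) { this.behavior = behavior; }
    public int getItemId() { return itemId; }
    public void setItemId(int itemId) { this.itemId = itemId; }
    public long getNowTime() { return nowTime; }
    public void setNowTime(long nowTime) { this.nowTime = nowTime; }
}
```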

UDFToTime

This custom UDF converts a timestamp to UTC+8. Time values in Flink SQL are 8 hours off from China Standard Time, so you can apply this conversion downstream if needed.

import org.apache.flink.table.functions.ScalarFunction;

import java.sql.Timestamp;

public class UDFToTime extends ScalarFunction {

    public Long eval(Timestamp s) {
        // shift by 8 hours (28,800,000 ms) to convert to UTC+8
        return s.getTime() + 8 * 60 * 60 * 1000;
    }

}
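The offset used above is just 8 hours expressed in milliseconds; the arithmetic can be sanity-checked in plain Java, with Flink's ScalarFunction base class left out (the class and method names here are made up for the check):

```java
import java.sql.Timestamp;

public class UdfOffsetCheck {
    // same arithmetic as UDFToTime.eval, without the Flink dependency
    static long toUtc8(Timestamp s) {
        return s.getTime() + 8 * 60 * 60 * 1000; // 28,800,000 ms = 8 hours
    }
}
```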
           

Caveats

1. At the moment, Flink SQL does not support LIMIT in streaming queries. As for ORDER BY, the docs say sorting is only possible on a time attribute, but when I tried it, it did not work. Support still seems spotty, or perhaps I was using it wrong.

2. Regarding time: if you want to use event time, assign timestamps and watermarks on the DataStream first, then append .rowtime to the corresponding field when registering the table; that marks the field as event time. If you use processing time instead, add a proctime.proctime entry to the field list when registering the table; proctime then carries the current system time and can be used directly.
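The UserWaterMark class used before the table registration is not listed in the post either. In this Flink version it would typically be a BoundedOutOfOrdernessTimestampExtractor that reads nowTime and lets the watermark lag the largest timestamp seen by some allowed delay. A sketch of that bookkeeping, stripped of the Flink types so it runs standalone (class and method names here are hypothetical):

```java
// Mirrors what a bounded-out-of-orderness watermark generator does:
// track the maximum event timestamp seen and keep the watermark a
// fixed delay behind it.
public class WatermarkTracker {
    private final long maxOutOfOrdernessMs; // allowed lateness, e.g. 5000 ms
    private long currentMaxTimestamp = Long.MIN_VALUE;

    public WatermarkTracker(long maxOutOfOrdernessMs) {
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
    }

    // called for every record, like extractTimestamp(element, previous)
    public long extractTimestamp(long nowTime) {
        currentMaxTimestamp = Math.max(currentMaxTimestamp, nowTime);
        return nowTime;
    }

    // called periodically, like getCurrentWatermark()
    public long currentWatermark() {
        return currentMaxTimestamp - maxOutOfOrdernessMs;
    }
}
```

With a 5-second delay, a record timestamped t only advances the watermark to t - 5000, so events arriving up to 5 seconds late can still land in their window.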

For other topics, the official documentation is worth a look:

https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/sql.html
