天天看點

【Flink-需求】KafkaSource根據經緯度查詢高德API關聯位置資訊

一、需求分析

1.KafkaSource根據經緯度查詢高德API關聯位置資訊

2.查詢一條資料時,即使沒有及時傳回結果,也可以使用Flink的異步I/O(Async I/O)來處理

1.1 資料樣式

user001,A1,2020-10-10 10:10:10,1,115.963,39.3659

user002,A2,2020-10-10 10:10:10,1,123.544,36.2356

user003,A3,2020-10-10 10:10:10,1,123.568,47.3215

二、高德地圖key準備

2.1 進入開發者中心

【Flink-需求】KafkaSource根據經緯度查詢高德API關聯位置資訊
【Flink-需求】KafkaSource根據經緯度查詢高德API關聯位置資訊

2.2 引入http的依賴

<!-- Add the Apache HttpClient dependency (used for the blocking HTTP lookup) -->
        <dependency>
            <groupId>org.apache.httpcomponents</groupId>
            <artifactId>httpclient</artifactId>
            <version>4.5.6</version>
        </dependency>      

三、Flink開發

3.1 ActivityBean.java

/**
 * Mutable data holder describing one activity event together with the
 * coordinates it happened at and the province resolved for them.
 *
 * <p>All fields are public and a public no-argument constructor is provided,
 * so instances can be created empty and filled in field by field.
 */
public class ActivityBean {
    /** User identifier. */
    public String uid;
    /** Activity identifier. */
    public String aid;
    /** Human-readable activity name. */
    public String activityName;
    /** Event time, kept as the raw text it arrived in. */
    public String time;
    /** Numeric event type code. */
    public int eventType;
    /** Province name; may be {@code null} when it was not resolved. */
    public String province;

    /** Longitude of the event. */
    public double longitude;
    /** Latitude of the event. */
    public double latitude;

    /** Creates a bean with every field left at its Java default value. */
    public ActivityBean() {
    }

    /**
     * Creates a fully populated bean.
     *
     * @param uid          user identifier
     * @param aid          activity identifier
     * @param activityName activity name
     * @param time         event time text
     * @param eventType    event type code
     * @param longitude    longitude of the event
     * @param latitude     latitude of the event
     * @param province     province name, may be {@code null}
     */
    public ActivityBean(String uid, String aid, String activityName, String time, int eventType, double longitude,double latitude,String province) {
        this.uid = uid;
        this.aid = aid;
        this.activityName = activityName;
        this.time = time;
        this.eventType = eventType;
        this.longitude = longitude;
        this.latitude = latitude;
        this.province = province;
    }

    /**
     * Static factory equivalent to calling the full constructor.
     *
     * @return a new bean carrying exactly the given values
     */
    public static ActivityBean of(String uid,String aid,String activityName,String time,int eventType,double longitude,double latitude,String province){
        return new ActivityBean(uid, aid, activityName, time, eventType, longitude, latitude, province);
    }

    /**
     * Renders every field as {@code ActivityBean{name=value, ...}}; text
     * fields are wrapped in single quotes.
     */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("ActivityBean{");
        text.append("uid='").append(uid).append('\'');
        text.append(", aid='").append(aid).append('\'');
        text.append(", activityName='").append(activityName).append('\'');
        text.append(", time='").append(time).append('\'');
        text.append(", eventType=").append(eventType);
        text.append(", province='").append(province).append('\'');
        text.append(", longitude=").append(longitude);
        text.append(", latitude=").append(latitude);
        return text.append('}').toString();
    }
}

3.2 RichMapFunction.java

/**
 * Maps one comma-separated activity record
 * ({@code uid,aid,time,eventType,longitude,latitude}) to an {@link ActivityBean},
 * looking up the province for the coordinates through the Amap reverse-geocoding
 * HTTP endpoint.
 *
 * <p>The lookup is a blocking HTTP call made once per record. When the endpoint
 * does not answer with HTTP 200, or the answer carries no address component, the
 * bean is still emitted with a {@code null} province.
 */
public class GeoToActivityBeanFunction extends RichMapFunction<String,ActivityBean> {

    /** Number of comma-separated fields {@link #map(String)} reads from each line. */
    private static final int EXPECTED_FIELDS = 6;

    // NOTE(review): the API key is hard-coded in this URL; consider supplying it
    // through job configuration instead of source code.
    private static final String REGEO_URL =
            "https://restapi.amap.com/v3/geocode/regeo?key=32451f2e804ded152cef65c3cd16452d&location=";

    // transient, like the async variant: the client is created in open() and is
    // not meant to be part of the function's serialized state.
    private transient CloseableHttpClient httpClient = null;

    /** Creates the HTTP client that {@link #map(String)} uses. */
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        httpClient = HttpClients.createDefault();
    }

    /**
     * Parses the record, queries the province for its coordinates and builds the bean.
     *
     * @param line one record in the form {@code uid,aid,time,eventType,longitude,latitude}
     * @return the bean for this record; its province is {@code null} when the lookup
     *         produced no usable answer
     * @throws IllegalArgumentException if the line has fewer than six fields
     *         (a {@link NumberFormatException} is raised for non-numeric
     *         event type or coordinates)
     * @throws Exception if the HTTP request fails
     */
    @Override
    public ActivityBean map(String line) throws Exception {
        String[] fields = line.split(",");
        if (fields.length < EXPECTED_FIELDS) {
            throw new IllegalArgumentException("Expected at least " + EXPECTED_FIELDS
                    + " comma-separated fields but got " + fields.length + ": " + line);
        }
        String uid = fields[0];
        String aid = fields[1];
        String time = fields[2];
        int eventType = Integer.parseInt(fields[3]);
        String longitude = fields[4];
        String latitude = fields[5];

        // Parse the coordinates before the request so a malformed record fails
        // without spending an HTTP call on it.
        double lon = Double.parseDouble(longitude);
        double lat = Double.parseDouble(latitude);

        String province = null;
        HttpGet httpGet = new HttpGet(REGEO_URL + longitude + "," + latitude);
        // try-with-resources closes the response on every path.
        try (CloseableHttpResponse response = httpClient.execute(httpGet)) {
            if (response.getStatusLine().getStatusCode() == 200) {
                // Fall back to UTF-8 when the response declares no charset, so
                // Chinese province names are not decoded as ISO-8859-1.
                String result = EntityUtils.toString(response.getEntity(), "UTF-8");
                System.out.println(result);
                province = extractProvince(result);
            }
        }

        return ActivityBean.of(uid,aid,"未查詢資料庫",time,eventType,lon,lat,province);
    }

    /** Releases the HTTP client if {@link #open(Configuration)} created one. */
    @Override
    public void close() throws Exception {
        try {
            super.close();
        } finally {
            if (httpClient != null) {
                httpClient.close();
            }
        }
    }

    /** Reads {@code regeocode.addressComponent.province} from the JSON body; {@code null} if any level is absent. */
    private static String extractProvince(String json) {
        JSONObject root = JSONObject.parseObject(json);
        if (root == null) {
            return null;
        }
        JSONObject regeocode = root.getJSONObject("regeocode");
        if (regeocode == null || regeocode.isEmpty()) {
            return null;
        }
        JSONObject address = regeocode.getJSONObject("addressComponent");
        return address == null ? null : address.getString("province");
    }
}

3.3 Flink執行程式

/**
 * Job entry point: reads text records from the Kafka topic {@code activity},
 * converts each one with {@link GeoToActivityBeanFunction} and prints the
 * resulting beans.
 */
public class GeoQueryLocationActivityCount {

    /** Builds the Kafka consumer settings used by this job. */
    private static Properties kafkaConsumerProperties() {
        Properties settings = new Properties();
        // Several brokers may be listed here, separated by commas.
        settings.setProperty("bootstrap.servers", "192.168.52.200:9092");
        settings.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        settings.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        settings.setProperty("auto.offset.reset", "earliest");
        settings.setProperty("group.id", "consumer1");
        return settings;
    }

    /**
     * Wires source, map and print sink together, then runs the job.
     *
     * @param args command-line arguments (not used)
     * @throws Exception if the job cannot be built or executed
     */
    public static void main(String[] args) throws Exception {
        // Execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Kafka source reading each record as a plain string
        FlinkKafkaConsumer<String> kafkaSource =
                new FlinkKafkaConsumer<>("activity", new SimpleStringSchema(), kafkaConsumerProperties());
        DataStream<String> records = env.addSource(kafkaSource);

        // Enrich every record with its province and print the result
        records.map(new GeoToActivityBeanFunction()).print();

        // Launch the job
        env.execute("GeoQueryLocationActivityCount");
    }
}

3.4 測試

1.開啟kafka生産者

# Start a console producer for the "activity" topic (second broker address corrected from 1.168.52.201)
bin/kafka-console-producer.sh --broker-list 192.168.52.200:9092,192.168.52.201:9092,192.168.52.202:9092 --topic activity      

測試結果:

【Flink-需求】KafkaSource根據經緯度查詢高德API關聯位置資訊

這裡我們想一個問題:每次Flink處理一條資料時,都需要等待高德地圖API的傳回結果;如果遲遲沒有傳回結果,那麼Flink程式是不是就會一直卡在這裡,無法繼續執行下去?

四、Flink異步Async I/O

4.1 引入異步的httpclient依賴

<!-- Add the efficient asynchronous HttpClient dependency -->
        <dependency>
            <groupId>org.apache.httpcomponents</groupId>
            <artifactId>httpasyncclient</artifactId>
            <version>4.1.4</version>
        </dependency>      

4.2 RichAsyncFunction.java

public class AsyncGeoToActivityBeanFunction extends RichAsyncFunction<String,ActivityBean> {

    private transient CloseableHttpAsyncClient httpAsyncClient;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        //1.初始化異步的httpclient
        RequestConfig requestConfig = RequestConfig.custom()
                .setSocketTimeout(3000)
                .setConnectTimeout(3000)
                .build();
        httpAsyncClient = HttpAsyncClients.custom()
                .setMaxConnTotal(20)
                .setDefaultRequestConfig(requestConfig).build();
        httpAsyncClient.start();
    }

    @Override
    public void asyncInvoke(String line, ResultFuture<ActivityBean> resultFuture) throws Exception {
        String[] fields = line.split(",");
        String uid = fields[0];
        String aid = fields[1];
        String time = fields[2];
        int eventType = Integer.parseInt(fields[3]);
        String longitude = fields[4];
        String latitude = fields[5];
        String url = "https://restapi.amap.com/v3/geocode/regeo?key=32451f2e804ded152cef65c3cd16452d&location=" + longitude + "," + latitude;

        HttpGet httpGet = new HttpGet(url);
        Future<HttpResponse> future = httpAsyncClient.execute(httpGet, null);
        CompletableFuture.supplyAsync(new Supplier<String>() {
            @Override
            public String get() {
                try{
                    HttpResponse response = future.get();
                    String province = null;
                    if (response.getStatusLine().getStatusCode() == 200) {
                        //擷取請求的json字元串
                        String result = EntityUtils.toString(response.getEntity());
                        System.out.println(result);
                        JSONObject jsonObject = JSONObject.parseObject(result);
                        JSONObject regeocode = jsonObject.getJSONObject("regeocode");

                        if (regeocode != null && !regeocode.isEmpty()) {
                            JSONObject address = regeocode.getJSONObject("addressComponent");
                            //擷取省市區
                            province = address.getString("province");
                            //String city = address.getString("city");
                            //String businessAreas = address.getString("businessAreas");
                        }
                    }
                    return province;
                } catch (Exception e) {
                    return null;
                }
            }
        }).thenAccept((String province) ->{
            resultFuture.complete(Collections.singleton(ActivityBean.of(uid,aid,null,time,eventType,0,0,province)));
        });
    }

    @Override
    public void close() throws Exception {
        super.close();
        httpAsyncClient.close();
    }
}      

4.3 Flink主程式

/**
 * Job entry point for the asynchronous variant: reads text records from the
 * Kafka topic {@code activity}, resolves each record's province through
 * {@link AsyncGeoToActivityBeanFunction} using Flink's async I/O operator, and
 * prints the resulting beans.
 */
public class AsyncGeoQueryLocationActivityCount {

    /** Upper bound, in milliseconds, on how long one asynchronous lookup may stay pending. */
    private static final long ASYNC_TIMEOUT_MS = 10_000L;

    /** Maximum number of lookups allowed in flight at the same time. */
    private static final int ASYNC_CAPACITY = 10;

    /**
     * Wires source, async lookup and print sink together, then runs the job.
     *
     * @param args command-line arguments (not used)
     * @throws Exception if the job cannot be built or executed
     */
    public static void main(String[] args) throws Exception {
        // 1. Execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 2. Kafka configuration
        String topic = "activity";
        Properties prop = new Properties();
        // Several brokers may be listed here, separated by commas.
        prop.setProperty("bootstrap.servers", "192.168.52.200:9092");
        prop.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        prop.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        prop.setProperty("auto.offset.reset", "earliest");
        prop.setProperty("group.id", "consumer1");

        FlinkKafkaConsumer<String> myConsumer = new FlinkKafkaConsumer<>(topic, new SimpleStringSchema(), prop);

        // 3. Read the records
        DataStream<String> lines = env.addSource(myConsumer);

        // 4. Unordered async lookup: results are emitted as they complete.
        // The timeout was previously "0, TimeUnit.MICROSECONDS", which leaves a
        // lookup that never completes pending indefinitely; a real timeout in
        // milliseconds bounds that wait. (NOTE(review): when it fires, Flink's
        // default timeout handling fails the record — confirm this is acceptable
        // or override timeout() in the async function.)
        SingleOutputStreamOperator<ActivityBean> beans = AsyncDataStream.unorderedWait(
                lines, new AsyncGeoToActivityBeanFunction(), ASYNC_TIMEOUT_MS, TimeUnit.MILLISECONDS, ASYNC_CAPACITY);

        beans.print();

        // 5. Launch the job
        env.execute("AsyncGeoQueryLocationActivityCount");
    }
}

4.4 總結