
Flink Tutorial, Lesson 17: Counting PageView (PV) and UniqueVisitor (UV)

Data file:

The data file used in this lesson:
Link: https://pan.baidu.com/s/1uCk-IF4wWVfUkuuTAKaD0w
Extraction code: 2hmu
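
For reference, each row of UserBehavior.csv carries five comma-separated fields in the order userId, itemId, categoryId, behavior, timestamp (in seconds), which is exactly the order the map functions below parse. A hypothetical sample row:

543462,1715,1464116,pv,1511658000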

Input bean

package com.atguigu.networkflow_analysis.beans;

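/**
 * Input POJO: one parsed row of UserBehavior.csv. The public no-arg constructor
 * plus getters/setters let Flink recognize and serialize it as a POJO type.
 */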
public class UserBehavior {
    public Long userId;
    public Long itemId;
    public Integer categoryId;
    public String behavior;
    public Long timestamp;

    public UserBehavior() {
    }

    public UserBehavior(Long userId, Long itemId, Integer categoryId, String behavior, Long timestamp) {
        this.userId = userId;
        this.itemId = itemId;
        this.categoryId = categoryId;
        this.behavior = behavior;
        this.timestamp = timestamp;
    }

    public Long getUserId() {
        return userId;
    }

    public void setUserId(Long userId) {
        this.userId = userId;
    }

    public Long getItemId() {
        return itemId;
    }

    public void setItemId(Long itemId) {
        this.itemId = itemId;
    }

    public Integer getCategoryId() {
        return categoryId;
    }

    public void setCategoryId(Integer categoryId) {
        this.categoryId = categoryId;
    }

    public String getBehavior() {
        return behavior;
    }

    public void setBehavior(String behavior) {
        this.behavior = behavior;
    }

    public Long getTimestamp() {
        return timestamp;
    }

    public void setTimestamp(Long timestamp) {
        this.timestamp = timestamp;
    }

    @Override
    public String toString() {
        return "UserBehavior{" +
                "userId=" + userId +
                ", itemId=" + itemId +
                ", categoryId=" + categoryId +
                ", behavior='" + behavior + '\'' +
                ", timestamp=" + timestamp +
                '}';
    }
}

Output bean

package com.atguigu.networkflow_analysis.beans;

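/**
 * Output POJO: the count for one key (stored in "url") in the window ending at windowEnd.
 */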
public class PageViewCount {
    private String url;
    private Long windowEnd;
    private Long count;

    public PageViewCount() {
    }

    public PageViewCount(String url, Long windowEnd, Long count) {
        this.url = url;
        this.windowEnd = windowEnd;
        this.count = count;
    }

    public String getUrl() {
        return url;
    }

    public Long getWindowEnd() {
        return windowEnd;
    }

    public Long getCount() {
        return count;
    }

    public void setUrl(String url) {
        this.url = url;
    }

    public void setWindowEnd(Long windowEnd) {
        this.windowEnd = windowEnd;
    }

    public void setCount(Long count) {
        this.count = count;
    }

    @Override
    public String toString() {
        return "PageViewCount{" +
                "url='" + url + '\'' +
                ", windowEnd=" + windowEnd +
                ", count=" + count +
                '}';
    }
}

PageView (PV) statistics

package com.atguigu.networkflow_analysis.Ahotpages;

import com.atguigu.networkflow_analysis.beans.PageViewCount;
import com.atguigu.networkflow_analysis.beans.UserBehavior;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.net.URL;
import java.util.Random;

/**
 *  Emits the PageView (PV) count once per hour.
 *      If one user visits 10 pages within an hour, the pv is 10 and the uv is 1.
 */
public class PageView {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        URL resource = PageView.class.getResource("/UserBehavior.csv");// locate the data file on the classpath
        DataStreamSource<String> inputStream = env.readTextFile(resource.getPath());

        DataStream<UserBehavior> dataStream = inputStream.map(
                line -> {
                    String[] words = line.split(",");
                    return new UserBehavior(Long.parseLong(words[0]), Long.parseLong(words[1]),
                            Integer.parseInt(words[2]), words[3], Long.parseLong(words[4]));
                })
                .assignTimestampsAndWatermarks(
                        new AscendingTimestampExtractor<UserBehavior>() { // timestamps are ascending
                            @Override
                            public long extractAscendingTimestamp(UserBehavior userBehavior) {// event timestamp, seconds -> milliseconds
                                return userBehavior.getTimestamp()*1000L;
                            }
                        });

        // key, window and aggregate to get a partial count per key and window
        SingleOutputStreamOperator<PageViewCount> pvCountStream = dataStream
                .filter(data -> "pv".equals(data.getBehavior())) // keep only "pv" behavior
                .map(new MapFunction<UserBehavior, Tuple2<Integer, Long>>() {// assign a random key in [1,4] so data spreads evenly over the 4 partitions
                    private final Random random = new Random(); // one Random per task, not one per record

                    @Override
                    public Tuple2<Integer, Long> map(UserBehavior value) throws Exception {
                        return new Tuple2<>(random.nextInt(4) + 1, 1L);
                    }
                })
                .keyBy(data -> data.f0)
                .timeWindow(Time.minutes(60)) // 1-hour tumbling window
                .aggregate(new PvCountAgg(), new PvCountResult());

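        // second stage: re-key the partial counts by window end and merge them into one total per window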
        SingleOutputStreamOperator<PageViewCount> countPvResult = pvCountStream
                .keyBy(PageViewCount::getWindowEnd)
                .process(new TotalPvCount());

        // print and execute
        countPvResult.print();
        env.execute("page view analysis");

    }

    // type params: input, accumulator (aggregation state), output
    public static class PvCountAgg implements AggregateFunction<Tuple2<Integer, Long>,Long,Long> {

        @Override
        public Long createAccumulator() {
            return 0L;
        }

        @Override
        public Long add(Tuple2<Integer, Long> value, Long accumulator) {
            return accumulator+1;
        }

        @Override
        public Long getResult(Long accumulator) {
            return accumulator;
        }

        @Override
        public Long merge(Long a, Long b) {
            return a+b;
        }
    }

    // type params: input, output, key (the value returned by keyBy), window type
    public static class PvCountResult implements WindowFunction<Long,PageViewCount,Integer,TimeWindow> {

        @Override
        public void apply(Integer integer, TimeWindow window, Iterable<Long> iterable, Collector<PageViewCount> collector) throws Exception {
            collector.collect(new PageViewCount(integer.toString(),window.getEnd(),iterable.iterator().next()));
        }
    }

    // type params: key (from keyBy), input, output
    public static class TotalPvCount extends KeyedProcessFunction<Long,PageViewCount,PageViewCount> {
        ValueState<Long> totalCountState; // running total count for the current window

        @Override
        public void open(Configuration parameters) throws Exception {
            totalCountState=getRuntimeContext().getState(new ValueStateDescriptor<Long>("total_count",Long.class,0L));
        }

        @Override
        public void processElement(PageViewCount pageViewCount, Context context, Collector<PageViewCount> collector) throws Exception {
            totalCountState.update(pageViewCount.getCount() + totalCountState.value()); // add this partial count to the running total
            context.timerService().registerEventTimeTimer(pageViewCount.getWindowEnd());// register an event-time timer at the window end
        }

        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<PageViewCount> out) throws Exception {
            Long totalCount = totalCountState.value();
            out.collect(new PageViewCount("pv_count",ctx.getCurrentKey(),totalCount));

            //清空狀态
            totalCountState.clear();
        }

        @Override
        public void close() throws Exception {
            totalCountState.clear();
        }
    }
}
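
Why the random keys? With a single constant key, every record would be routed to the same subtask and the parallelism of 4 would buy nothing. As a contrast, here is a minimal sketch (not part of the lesson's code) of that naive single-key version; it reuses the same dataStream and imports as the job above:

        // Naive PV count: every record shares one key, so the hourly window is
        // evaluated on a single subtask no matter how high the parallelism is.
        SingleOutputStreamOperator<Tuple2<String, Long>> naivePvStream = dataStream
                .filter(data -> "pv".equals(data.getBehavior()))
                .map(new MapFunction<UserBehavior, Tuple2<String, Long>>() {
                    @Override
                    public Tuple2<String, Long> map(UserBehavior value) throws Exception {
                        return new Tuple2<>("pv", 1L);
                    }
                })
                .keyBy(data -> data.f0)
                .timeWindow(Time.minutes(60))
                .sum(1);

The random-key version instead spreads the windowed pre-aggregation across all four subtasks and then merges the four partial counts per window in TotalPvCount, keyed by the window end.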

UniqueVisitor (UV) statistics

package com.atguigu.networkflow_analysis.Ahotpages;

import com.atguigu.networkflow_analysis.beans.PageViewCount;
import com.atguigu.networkflow_analysis.beans.UserBehavior;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import redis.clients.jedis.Jedis;

import java.net.URL;

/**
 * UniqueVisitorVersion1 kept the userIds in a set, i.e. in memory, which is costly.
 *      We could store the userIds in redis instead, but with very many users (say a billion),
 *      just storing the userIds already takes an enormous amount of space.
 *      That is where a Bloom filter helps: instead of storing each userId, it only answers
 *      whether a userId has been seen before. Storing a userId takes tens of bytes, but the
 *      seen/not-seen flag is a single 0 or 1, i.e. one bit.
 *
 * This lesson implements the UV count with redis plus a Bloom filter. It needs the jedis dependency:
 *      g:redis.clients  a:jedis  v:2.8.1
 */
public class UniqueVisitorVersion2 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        URL resource = UniqueVisitorVersion2.class.getResource("/UserBehavior.csv");// locate the data file on the classpath
        DataStreamSource<String> inputStream = env.readTextFile(resource.getPath());

        DataStream<UserBehavior> dataStream = inputStream.map(
                line -> {
                    String[] words = line.split(",");
                    return new UserBehavior(Long.parseLong(words[0]), Long.parseLong(words[1]),
                            Integer.parseInt(words[2]), words[3], Long.parseLong(words[4]));
                })
                .assignTimestampsAndWatermarks(
                        new AscendingTimestampExtractor<UserBehavior>() { // timestamps are ascending
                            @Override
                            public long extractAscendingTimestamp(UserBehavior userBehavior) {// event timestamp, seconds -> milliseconds
                                return userBehavior.getTimestamp()*1000L;
                            }
                        });

        // window the whole stream (parallelism 1), firing per element, and count with the Bloom filter
        SingleOutputStreamOperator<PageViewCount> uvCountStream = dataStream
                .filter(data -> "pv".equals(data.getBehavior()))
                .timeWindowAll(Time.minutes(60))
                .trigger(new MyTrigger())
                .process(new UvCountResultWithBloomFilter());
        // print and execute
        uvCountStream.print();
        env.execute("uv statistics");
    }

    /**
     * Custom trigger.
     * Type params: input type, window type.
     */
    public static class MyTrigger extends Trigger<UserBehavior, TimeWindow>{
        // called once for every element
        @Override
        public TriggerResult onElement(UserBehavior userBehavior, long l, TimeWindow window, TriggerContext triggerContext) throws Exception {
            return TriggerResult.FIRE_AND_PURGE; // fire the window computation AND purge the window contents on every element
        }
        // custom processing-time behavior
        @Override
        public TriggerResult onProcessingTime(long l, TimeWindow window, TriggerContext triggerContext) throws Exception {
            return TriggerResult.CONTINUE;// do nothing
        }
        // custom event-time behavior
        @Override
        public TriggerResult onEventTime(long l, TimeWindow window, TriggerContext triggerContext) throws Exception {
            return TriggerResult.CONTINUE;// do nothing
        }
        //主要用來清除一些自定義狀态
        @Override
        public void clear(TimeWindow window, TriggerContext triggerContext) throws Exception {
        }
    }

    /**
     * Custom Bloom filter.
     */
    public static class MyBloomFilter{
        private Integer cap;// size of the bitmap in bits; must be a power of two so that cap - 1 works as a bit mask

        public MyBloomFilter(Integer cap){
            this.cap=cap;
        }

        /**
         * @param value the string to hash
         * @param seed hash seed
         * @return the bit offset of the string in the bitmap
         */
        public Long hashCode(String value,Integer seed){
            long result = 1L;
            for(int i = 0; i < value.length(); i++){
                result = result * seed + value.charAt(i);
            }
            return result & (cap - 1); // keep only the low bits, i.e. result mod cap
        }
    }

    /**
     * Type params: input, output, window type.
     */
    public static class UvCountResultWithBloomFilter extends ProcessAllWindowFunction<UserBehavior, PageViewCount,TimeWindow> {
        private Jedis jedis;
        private MyBloomFilter mybloomFilter;

        @Override
        public void open(Configuration parameters) throws Exception {
            jedis=new Jedis("localhost",6379);
            mybloomFilter=new MyBloomFilter(1<<29);// 2^29 bits, i.e. a 64 MB bitmap
        }

        @Override
        public void process(Context context, Iterable<UserBehavior> iterable, Collector<PageViewCount> collector) throws Exception {
            Long windowEnd = context.window().getEnd();
            String bitMapKey = windowEnd.toString();

            String countHashName="uv_count";
            String countKey=windowEnd.toString();

            // take the current userId (the trigger fires per element, so the window holds exactly one)
            Long userId = iterable.iterator().next().userId;
            // compute this userId's offset in the bitmap
            Long offset = mybloomFilter.hashCode(userId.toString(), 61);
            // use redis GETBIT to check whether that bit is already set
            Boolean isExist = jedis.getbit(bitMapKey, offset);

            if(!isExist){
                // the bit was not set, so treat this userId as new and set the bit to 1
                jedis.setbit(bitMapKey,offset,true);

                // update the count stored in redis
                Long uvCount=0L;
                String uvCountString = jedis.hget(countHashName, countKey);// fetch the current count
                if(uvCountString !=null && !"".equals(uvCountString)){
                    uvCount = Long.valueOf(uvCountString);
                }
                jedis.hset(countHashName,countKey,String.valueOf(uvCount+1L));// write back the incremented count

                // emit
                collector.collect(new PageViewCount("uv",windowEnd,uvCount+1L));
            }
        }

        @Override
        public void close() throws Exception {
            jedis.close();
        }
    }

}
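
To see why cap must be a power of two, note that cap - 1 = 2^29 - 1 is a 29-bit all-ones mask, so result & (cap - 1) always yields an offset in [0, 2^29). Here is a small standalone check (a hypothetical demo class added for illustration, not part of the lesson) that prints a few offsets:

package com.atguigu.networkflow_analysis.Ahotpages;

public class BloomFilterHashDemo {
    public static void main(String[] args) {
        UniqueVisitorVersion2.MyBloomFilter filter =
                new UniqueVisitorVersion2.MyBloomFilter(1 << 29); // 2^29 bits = 64 MB bitmap
        for (String userId : new String[]{"1", "42", "987654321"}) {
            // same seed (61) that UvCountResultWithBloomFilter uses
            System.out.println(userId + " -> offset " + filter.hashCode(userId, 61));
        }
    }
}

Note that a production Bloom filter would combine several independent hash functions to drive the false-positive rate down; this single-hash version keeps the lesson simple, at the cost of occasionally misjudging a new userId as already seen and slightly undercounting UV.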


Dependencies

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.atguigu</groupId>
    <artifactId>UserBehaviorAnalysis</artifactId>
    <packaging>pom</packaging>
    <version>1.0-SNAPSHOT</version>
    <modules>
        <module>HotItemsAnalysis</module>
        <module>NetworkFlowAnalysis</module>
        <module>KafkaDemo</module>
        <module>JavaReview</module>
    </modules>
    <!-- dependency versions shared by all modules -->
    <properties>
        <flink.version>1.10.1</flink.version>
        <scala.binary.version>2.11</scala.binary.version>
        <kafka.version>2.2.0</kafka.version>
    </properties>
    <!-- the dependencies actually pulled in -->
        <dependencies>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-clients_${scala.binary.version}</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-java</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka_${scala.binary.version}</artifactId>
                <version>${kafka.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <!-- Flink 1.11+ defaults to the Blink planner; this is the Blink planner artifact -->
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <!-- the default planner up to and including Flink 1.10 -->
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <!-- dependency Flink needs to read CSV files -->
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-csv</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <!-- Kafka connector: 1.10.1 is the Flink version, 0.11 the Kafka connector version, 2.11 the Scala version -->
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-connector-kafka-0.11_${scala.binary.version}</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-connector-redis_${scala.binary.version}</artifactId>
                <version>1.1.5</version>
            </dependency>
            <!-- Elasticsearch connector -->
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-connector-elasticsearch6_${scala.binary.version}</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <!-- mysql-connector dependency for a custom Flink MySQL sink -->
            <dependency>
                <groupId>mysql</groupId>
                <artifactId>mysql-connector-java</artifactId>
                <version>5.1.44</version>
            </dependency>
            <!-- Kafka client dependency -->
            <dependency>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka-clients</artifactId>
                <version>0.11.0.0</version>
            </dependency>
            <!-- redis (jedis) dependency -->
            <dependency>
                <groupId>redis.clients</groupId>
                <artifactId>jedis</artifactId>
                <version>2.8.1</version>
            </dependency>
        </dependencies>
    <!-- build plugins -->
    <build>
        <plugins>
            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
