
Flink from Getting Started to Practice (5): Flink Stream Processing from 0 to 1

I. DataStream API: Data Sources

Introduction:

A source is the data input of your program. You add a source with StreamExecutionEnvironment.addSource(sourceFunction). Flink ships with a large number of ready-made sources, and you can also write your own: implement the SourceFunction interface for a source without parallelism, or implement the ParallelSourceFunction interface / extend RichParallelSourceFunction for a source with parallelism.

Types:

File-based

readTextFile(path): reads a text file according to the TextInputFormat rules, line by line, and returns each line.

Socket-based

socketTextStream: reads data from a socket; elements can be separated by a delimiter.

Collection-based

fromCollection(Collection): creates a data stream from a Java collection; all elements in the collection must be of the same type.

Custom input

addSource: reads data from third-party systems. Flink ships with a set of built-in connectors that provide the corresponding source support, e.g. Kafka. A short sketch of the file- and socket-based sources follows; fromCollection and addSource have full examples below.
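readTextFile and socketTextStream are not covered by the full examples below, so here is a minimal sketch of both. It is not part of the original code; the HDFS path, host name and port are placeholders.

package xuwei.tech.streaming;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * Minimal sketch of the file- and socket-based sources described above.
 * The path and host/port values are placeholders.
 */
public class BuiltInSourcesSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //file-based source: reads the file line by line following TextInputFormat rules
        DataStream<String> fileLines = env.readTextFile("hdfs://namenode:9000/input/words.txt");

        //socket-based source: reads from a socket, elements separated by '\n'
        DataStream<String> socketLines = env.socketTextStream("hadoop100", 9000, "\n");

        fileLines.print();
        socketLines.print();

        env.execute("BuiltInSourcesSketch");
    }
}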

Code examples:

1. fromCollection

package xuwei.tech.streaming;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.ArrayList;

/**
 * 把collection集合作為資料源
 *
 * Created by xuwei.tech on 2018/10/23.
 */
public class StreamingFromCollection {

    public static void main(String[] args) throws Exception {
        //擷取Flink的運作環境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        ArrayList<Integer> data = new ArrayList<>();
        data.add(10);
        data.add(15);
        data.add(20);
        


        //指定資料源
        DataStreamSource<Integer> collectionData = env.fromCollection(data);

        //通過map對資料進行處理
        DataStream<Integer> num = collectionData.map(new MapFunction<Integer, Integer>() {
            @Override
            public Integer map(Integer value) throws Exception {
                return value + 1;
            }
        });

        //直接列印
        num.print().setParallelism(1);

        env.execute("StreamingFromCollection");


    }
}


           

2. Custom SourceFunction with parallelism 1 (addSource)

① Define a SourceFunction with parallelism 1

package xuwei.tech.streaming.custormSource;

import org.apache.flink.streaming.api.functions.source.SourceFunction;

/**
 * 自定義實作并行度為1的source
 *
 * 模拟産生從1開始的遞增數字
 *
 *
 * 注意:
 * SourceFunction 和 SourceContext 都需要指定資料類型,如果不指定,代碼運作的時候會報錯
 * Caused by: org.apache.flink.api.common.functions.InvalidTypesException:
 * The types of the interface org.apache.flink.streaming.api.functions.source.SourceFunction could not be inferred.
 * Support for synthetic interfaces, lambdas, and generic or raw types is limited at this point
 *
 *
 * Created by xuwei.tech on 2018/10/23.
 */
public class MyNoParalleSource implements SourceFunction<Long>{

    private long count = 1L;

    private boolean isRunning = true;

    /**
     * 主要的方法
     * 啟動一個source
     * 大部分情況下,都需要在這個run方法中實作一個循環,這樣就可以循環産生資料了
     *
     * @param ctx
     * @throws Exception
     */
    @Override
    public void run(SourceContext<Long> ctx) throws Exception {
        while(isRunning){
            ctx.collect(count);
            count++;
            //每秒産生一條資料
            Thread.sleep(1000);
        }
    }

    /**
     * 取消一個cancel的時候會調用的方法
     *
     */
    @Override
    public void cancel() {
        isRunning = false;
    }
}




② Use the custom SourceFunction with parallelism 1

package xuwei.tech.streaming.custormSource;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;

/**
 * 使用并行度為1的source
 *
 * Created by xuwei.tech on 2018/10/23.
 */
public class StreamingDemoWithMyNoPralalleSource {

    public static void main(String[] args) throws Exception {
        //擷取Flink的運作環境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //擷取資料源
        DataStreamSource<Long> text = env.addSource(new MyNoParalleSource()).setParallelism(1);//注意:針對此source,并行度隻能設定為1

        DataStream<Long> num = text.map(new MapFunction<Long, Long>() {
            @Override
            public Long map(Long value) throws Exception {
                System.out.println("接收到資料:" + value);
                return value;
            }
        });

        //每2秒鐘處理一次資料
        DataStream<Long> sum = num.timeWindowAll(Time.seconds(2)).sum(0);

        //列印結果
        sum.print().setParallelism(1);

        String jobName = StreamingDemoWithMyNoPralalleSource.class.getSimpleName();
        env.execute(jobName);
    }
}

           

3. Custom parallel ParallelSourceFunction (addSource)

① Define a ParallelSourceFunction that supports a parallelism greater than 1

package xuwei.tech.streaming.custormSource;

import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;

/**
 * 自定義實作一個支援并行度的source
 * Created by xuwei.tech on 2018/10/23.
 */
public class MyParalleSource implements ParallelSourceFunction<Long> {

    private long count = 1L;

    private boolean isRunning = true;

    /**
     * 主要的方法
     * 啟動一個source
     * 大部分情況下,都需要在這個run方法中實作一個循環,這樣就可以循環産生資料了
     *
     * @param ctx
     * @throws Exception
     */
    @Override
    public void run(SourceContext<Long> ctx) throws Exception {
        while(isRunning){
            ctx.collect(count);
            count++;
            //每秒産生一條資料
            Thread.sleep(1000);
        }
    }

    /**
     * 取消一個cancel的時候會調用的方法
     *
     */
    @Override
    public void cancel() {
        isRunning = false;
    }
}




           

② Use the custom ParallelSourceFunction

package xuwei.tech.streaming.custormSource;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;

/**
 * 使用多并行度的source
 *
 * Created by xuwei.tech on 2018/10/23.
 */
public class StreamingDemoWithMyPralalleSource {

    public static void main(String[] args) throws Exception {
        //擷取Flink的運作環境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //擷取資料源
        DataStreamSource<Long> text = env.addSource(new MyParalleSource()).setParallelism(2);

        DataStream<Long> num = text.map(new MapFunction<Long, Long>() {
            @Override
            public Long map(Long value) throws Exception {
                System.out.println("接收到資料:" + value);
                return value;
            }
        });

        //每2秒鐘處理一次資料
        DataStream<Long> sum = num.timeWindowAll(Time.seconds(2)).sum(0);

        //列印結果
        sum.print().setParallelism(1);

        String jobName = StreamingDemoWithMyPralalleSource.class.getSimpleName();
        env.execute(jobName);
    }
}



           

4. Custom parallel RichParallelSourceFunction (addSource)

① Define a RichParallelSourceFunction that supports a parallelism greater than 1

package xuwei.tech.streaming.custormSource;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

/**
 * 自定義實作一個支援并行度的source
 *
 * RichParallelSourceFunction 會額外提供open和close方法
 * 針對source中如果需要擷取其他連結資源,那麼可以在open方法中擷取資源連結,在close中關閉資源連結
 *
 * Created by xuwei.tech on 2018/10/23.
 */
public class MyRichParalleSource extends RichParallelSourceFunction<Long> {

    private long count = 1L;

    private boolean isRunning = true;

    /**
     * 主要的方法
     * 啟動一個source
     * 大部分情況下,都需要在這個run方法中實作一個循環,這樣就可以循環産生資料了
     *
     * @param ctx
     * @throws Exception
     */
    @Override
    public void run(SourceContext<Long> ctx) throws Exception {
        while(isRunning){
            ctx.collect(count);
            count++;
            //每秒産生一條資料
            Thread.sleep(1000);
        }
    }

    /**
     * 取消一個cancel的時候會調用的方法
     *
     */
    @Override
    public void cancel() {
        isRunning = false;
    }


    /**
     * 這個方法隻會在最開始的時候被調用一次
     * 實作擷取連結的代碼
     * @param parameters
     * @throws Exception
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        System.out.println("open.............");
        super.open(parameters);
    }

    /**
     * 實作關閉連結的代碼
     * @throws Exception
     */
    @Override
    public void close() throws Exception {
        super.close();
    }
}




② Use the custom RichParallelSourceFunction

package xuwei.tech.streaming.custormSource;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;

/**
 * 使用多并行度的source
 *
 * Created by xuwei.tech on 2018/10/23.
 */
public class StreamingDemoWithMyRichPralalleSource {

    public static void main(String[] args) throws Exception {
        //擷取Flink的運作環境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //擷取資料源
        DataStreamSource<Long> text = env.addSource(new MyRichParalleSource()).setParallelism(2);

        DataStream<Long> num = text.map(new MapFunction<Long, Long>() {
            @Override
            public Long map(Long value) throws Exception {
                System.out.println("接收到資料:" + value);
                return value;
            }
        });

        //每2秒鐘處理一次資料
        DataStream<Long> sum = num.timeWindowAll(Time.seconds(2)).sum(0);

        //列印結果
        sum.print().setParallelism(1);

        String jobName = StreamingDemoWithMyRichPralalleSource.class.getSimpleName();
        env.execute(jobName);
    }
}

           

II. DataStream API: Transformations

Introduction:

  1. map: takes one element and returns one element; useful for cleaning and transforming data
  2. flatmap: takes one element and returns zero, one, or more elements
  3. filter: evaluates a condition on each element; elements that satisfy it are kept
  4. keyBy: partitions the stream by the given key; records with the same key end up in the same partition
  5. reduce: aggregates the stream by combining the current element with the previously reduced value and returning a new value (a short sketch of flatmap, keyBy and reduce follows this list)
  6. aggregations: sum(), min(), max(), etc.
  7. window: covered in detail later
  8. Union: merges multiple streams into one that contains all of their elements; the one restriction is that all unioned streams must have the same type
  9. Connect: similar to union, but connects only two streams; the two streams may have different types, and a different processing function is applied to each of them
  10. CoMap, CoFlatMap: the map/flatMap counterparts used on ConnectedStreams
  11. Split: splits one stream into several streams according to a rule
  12. Select: used together with split to pick one of the split streams
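The demos below cover filter, split, union, connect and broadcast. The following is a minimal sketch, not part of the original article, of flatmap, keyBy and reduce from the list above; host name and port are placeholders.

package xuwei.tech.streaming.streamAPI;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
 * Minimal sketch of flatMap + keyBy + reduce (word count with a rolling reduce).
 */
public class FlatMapKeyByReduceSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<String> text = env.socketTextStream("hadoop100", 9000, "\n");

        DataStream<Tuple2<String, Integer>> counts = text
                //flatMap: one input line, zero or more (word, 1) outputs
                .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public void flatMap(String line, Collector<Tuple2<String, Integer>> out) {
                        for (String word : line.split("\\s")) {
                            out.collect(new Tuple2<>(word, 1));
                        }
                    }
                })
                //keyBy: records with the same word go to the same partition
                .keyBy(0)
                //rolling reduce: combine the current element with the previously reduced value
                .reduce(new ReduceFunction<Tuple2<String, Integer>>() {
                    @Override
                    public Tuple2<String, Integer> reduce(Tuple2<String, Integer> a, Tuple2<String, Integer> b) {
                        return new Tuple2<>(a.f0, a.f1 + b.f1);
                    }
                });

        counts.print().setParallelism(1);
        env.execute("FlatMapKeyByReduceSketch");
    }
}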

Code examples:

1. filter

package xuwei.tech.streaming.streamAPI;

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import xuwei.tech.streaming.custormSource.MyNoParalleSource;

/**
 * Filter示範
 *
 * Created by xuwei.tech on 2018/10/23.
 */
public class StreamingDemoFilter {

    public static void main(String[] args) throws Exception {
        //擷取Flink的運作環境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //擷取資料源
        DataStreamSource<Long> text = env.addSource(new MyNoParalleSource()).setParallelism(1);//注意:針對此source,并行度隻能設定為1

        DataStream<Long> num = text.map(new MapFunction<Long, Long>() {
            @Override
            public Long map(Long value) throws Exception {
                System.out.println("原始接收到資料:" + value);
                return value;
            }
        });

        //執行filter過濾,滿足條件的資料會被留下
        DataStream<Long> filterData = num.filter(new FilterFunction<Long>() {
            //把所有的奇數過濾掉
            @Override
            public boolean filter(Long value) throws Exception {
                return value % 2 == 0;
            }
        });

        DataStream<Long> resultData = filterData.map(new MapFunction<Long, Long>() {
            @Override
            public Long map(Long value) throws Exception {
                System.out.println("過濾之後的資料:" + value);
                return value;
            }
        });


        //每2秒鐘處理一次資料
        DataStream<Long> sum = resultData.timeWindowAll(Time.seconds(2)).sum(0);

        //列印結果
        sum.print().setParallelism(1);

        String jobName = StreamingDemoFilter.class.getSimpleName();
        env.execute(jobName);
    }
}


2. Split

package xuwei.tech.streaming.streamAPI;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.collector.selector.OutputSelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SplitStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import xuwei.tech.streaming.custormSource.MyNoParalleSource;

import java.util.ArrayList;

/**
 * split
 *
 * 根據規則把一個資料流切分為多個流
 *
 * 應用場景:
 * 可能在實際工作中,源資料流中混合了多種類似的資料,多種類型的資料處理規則不一樣,是以就可以根據一定的規則,
 * 把一個資料流切分成多個資料流,這樣每個資料流就可以使用不同的處理邏輯了
 *
 * Created by xuwei.tech on 2018/10/23.
 */
public class StreamingDemoSplit {

    public static void main(String[] args) throws Exception {
        //擷取Flink的運作環境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //擷取資料源
        DataStreamSource<Long> text = env.addSource(new MyNoParalleSource()).setParallelism(1);//注意:針對此source,并行度隻能設定為1

        //對流進行切分,按照資料的奇偶性進行區分
        SplitStream<Long> splitStream = text.split(new OutputSelector<Long>() {
            @Override
            public Iterable<String> select(Long value) {
                ArrayList<String> outPut = new ArrayList<>();
                if (value % 2 == 0) {
                    outPut.add("even");//偶數
                } else {
                    outPut.add("odd");//奇數
                }
                return outPut;
            }
        });
        
        //選擇一個或者多個切分後的流
        DataStream<Long> evenStream = splitStream.select("even");
        DataStream<Long> oddStream = splitStream.select("odd");

        DataStream<Long> moreStream = splitStream.select("odd","even");


        //列印結果
        moreStream.print().setParallelism(1);

        String jobName = StreamingDemoSplit.class.getSimpleName();
        env.execute(jobName);
    }
}



3. union (note: the streams being merged must have the same type)

package xuwei.tech.streaming.streamAPI;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import xuwei.tech.streaming.custormSource.MyNoParalleSource;

/**
 * union
 * 合并多個流,新的流會包含所有流中的資料,但是union是一個限制,就是所有合并的流類型必須是一緻的
 *
 * Created by xuwei.tech on 2018/10/23.
 */
public class StreamingDemoUnion {

    public static void main(String[] args) throws Exception {
        //擷取Flink的運作環境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //擷取資料源
        DataStreamSource<Long> text1 = env.addSource(new MyNoParalleSource()).setParallelism(1);//注意:針對此source,并行度隻能設定為1

        DataStreamSource<Long> text2 = env.addSource(new MyNoParalleSource()).setParallelism(1);

        //把text1和text2組裝到一起
        DataStream<Long> text = text1.union(text2);

        DataStream<Long> num = text.map(new MapFunction<Long, Long>() {
            @Override
            public Long map(Long value) throws Exception {
                System.out.println("原始接收到資料:" + value);
                return value;
            }
        });



        //每2秒鐘處理一次資料
        DataStream<Long> sum = num.timeWindowAll(Time.seconds(2)).sum(0);

        //列印結果
        sum.print().setParallelism(1);

        String jobName = StreamingDemoUnion.class.getSimpleName();
        env.execute(jobName);
    }
}


           

4. Connect (can combine two streams of different types)

package xuwei.tech.streaming.streamAPI;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.ConnectedStreams;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import xuwei.tech.streaming.custormSource.MyNoParalleSource;

/**
 * connect
 * 和union類似,但是隻能連接配接兩個流,兩個流的資料類型可以不同,會對兩個流中的資料應用不同的處理方法
 *
 * Created by xuwei.tech on 2018/10/23.
 */
public class StreamingDemoConnect {

    public static void main(String[] args) throws Exception {
        //擷取Flink的運作環境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //擷取資料源
        DataStreamSource<Long> text1 = env.addSource(new MyNoParalleSource()).setParallelism(1);//注意:針對此source,并行度隻能設定為1

        DataStreamSource<Long> text2 = env.addSource(new MyNoParalleSource()).setParallelism(1);
        SingleOutputStreamOperator<String> text2_str = text2.map(new MapFunction<Long, String>() {
            @Override
            public String map(Long value) throws Exception {
                return "str_" + value;
            }
        });

        ConnectedStreams<Long, String> connectStream = text1.connect(text2_str);

        SingleOutputStreamOperator<Object> result = connectStream.map(new CoMapFunction<Long, String, Object>() {
            @Override
            public Object map1(Long value) throws Exception {
                return value;
            }

            @Override
            public Object map2(String value) throws Exception {
                return value;
            }
        });


        //列印結果
        result.print().setParallelism(1);

        String jobName = StreamingDemoConnect.class.getSimpleName();
        env.execute(jobName);
    }
}


5. broadcast (the broadcast partitioning strategy)

package xuwei.tech.streaming.streamAPI;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import xuwei.tech.streaming.custormSource.MyNoParalleSource;

/**
 *  broadcast分區規則
 *
 * Created by xuwei.tech on 2018/10/23.
 */
public class StreamingDemoWithMyNoPralalleSourceBroadcast {

    public static void main(String[] args) throws Exception {
        //擷取Flink的運作環境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4);

        //擷取資料源
        DataStreamSource<Long> text = env.addSource(new MyNoParalleSource()).setParallelism(1);//注意:針對此source,并行度隻能設定為1

        DataStream<Long> num = text.broadcast().map(new MapFunction<Long, Long>() {
            @Override
            public Long map(Long value) throws Exception {
                long id = Thread.currentThread().getId();
                System.out.println("線程id:"+id+",接收到資料:" + value);
                return value;
            }
        });

        //每2秒鐘處理一次資料
        DataStream<Long> sum = num.timeWindowAll(Time.seconds(2)).sum(0);

        //列印結果
        sum.print().setParallelism(1);

        String jobName = StreamingDemoWithMyNoPralalleSourceBroadcast.class.getSimpleName();
        env.execute(jobName);
    }
}

           

Flink State Management and Recovery

一. 狀态(State)

Introduction:

  • The word-count example we wrote earlier has no state management. If a task fails while processing, its in-memory state is lost and all data has to be recomputed. To provide fault tolerance and message-processing semantics (at-least-once, exactly-once), Flink introduces state and checkpoints.
  • First, two concepts to keep apart:
  • state generally refers to the state of a specific task/operator (state data is kept in the JVM heap by default)
  • a checkpoint (think of it as persisted state data) represents a global snapshot of a Flink job at a specific point in time, i.e. it contains the state of every task/operator
  • Note: a task is the basic unit of execution in Flink; an operator is a transformation.
  • State can be recorded, so data can be recovered after a failure
  • Flink has two basic kinds of state
  • Keyed State
  • Operator State
  • Both Keyed State and Operator State can exist in two forms:
  • raw state
  • managed state
  • Managed state is state managed by the Flink framework
  • With raw state, the user manages the concrete data structure; when checkpointing, the framework reads and writes the state as byte[] and knows nothing about its internal structure.
  • Managed state is recommended for state on DataStream; raw state is usually only needed when implementing a custom operator.

1.1 Keyed State

  • As the name suggests, this is state on a KeyedStream. The state is bound to a specific key: every key in the KeyedStream has its own state.
  • stream.keyBy(…)
  • Data structures available for keyed state:
  • ValueState<T>: a single value of type T bound to the key; the simplest kind of state. It is updated with update() and read with value().
  • ListState<T>: the state for a key is a list. Values are appended with add(); get() returns an Iterable<T> over the stored values.
  • ReducingState<T>: backed by a user-supplied reduceFunction; every add() call invokes the reduceFunction and folds the new value into a single state value.
  • MapState<UK, UV>: the state is a map; elements are added with put or putAll.
  • Note that the State objects above are only handles for interacting with the state (update, delete, clear, ...); the actual state values may live in memory, on disk, or in another distributed storage system. We merely hold a handle to the state (a small sketch follows this list).
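As a concrete illustration of the keyed-state handles listed above, here is a minimal ValueState sketch. It is not part of the original code; the class and state names are made up. It keeps a running count per key and would be used as stream.keyBy(0).flatMap(new CountWithValueState()).

package xuwei.tech.streaming.state;

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

/**
 * Minimal keyed-state sketch: keeps a running count per key in a ValueState
 * and emits (key, countSoFar) for every input element. Must be used on a KeyedStream.
 */
public class CountWithValueState extends RichFlatMapFunction<Tuple2<String, Long>, Tuple2<String, Long>> {

    private transient ValueState<Long> countState;

    @Override
    public void open(Configuration parameters) throws Exception {
        //register the state handle; the actual value lives in the configured state backend
        ValueStateDescriptor<Long> descriptor = new ValueStateDescriptor<>("count", Long.class);
        countState = getRuntimeContext().getState(descriptor);
    }

    @Override
    public void flatMap(Tuple2<String, Long> value, Collector<Tuple2<String, Long>> out) throws Exception {
        Long current = countState.value();      //null for the first element of this key
        long newCount = (current == null ? 0L : current) + 1;
        countState.update(newCount);            //write back through the state handle
        out.collect(new Tuple2<>(value.f0, newCount));
    }
}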

1.2 Operator State

  • State that is not tied to a key but bound to an operator; the whole operator (instance) has exactly one such state
  • Data structure used to store it
  • ListState<T>
  • For example, Flink's Kafka connector uses operator state: each connector instance stores the (partition, offset) mapping of every topic partition it consumes (a much-simplified sketch of operator state follows)
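The following sketch is not from the original article and is far simpler than the real Kafka connector: a sink that buffers elements and keeps the unflushed buffer in a ListState via the CheckpointedFunction interface, so the buffer survives failures.

package xuwei.tech.streaming.state;

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

import java.util.ArrayList;
import java.util.List;

/**
 * Minimal operator-state sketch: a buffering sink that stores unflushed elements
 * in operator ListState on every checkpoint and restores them after a failure.
 */
public class BufferingSinkSketch implements SinkFunction<String>, CheckpointedFunction {

    private transient ListState<String> checkpointedBuffer;
    private final List<String> buffer = new ArrayList<>();

    @Override
    public void invoke(String value) throws Exception {
        buffer.add(value);
        if (buffer.size() >= 10) {
            //flush to the external system here (omitted in this sketch)
            buffer.clear();
        }
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        //copy the in-memory buffer into the operator state on every checkpoint
        checkpointedBuffer.clear();
        for (String element : buffer) {
            checkpointedBuffer.add(element);
        }
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        ListStateDescriptor<String> descriptor = new ListStateDescriptor<>("buffered-elements", String.class);
        checkpointedBuffer = context.getOperatorStateStore().getListState(descriptor);
        if (context.isRestored()) {
            //restore the buffer after a failure
            for (String element : checkpointedBuffer.get()) {
                buffer.add(element);
            }
        }
    }
}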

二、狀态容錯

2.1 Checkpoints

  • relies on the checkpoint mechanism
  • guarantees exactly-once
  • but only within the Flink system itself
  • for sources and sinks, exactly-once additionally depends on the external components

About checkpoints:

  • To make state fault tolerant, Flink needs to checkpoint it.
  • Checkpointing is the core of Flink's fault-tolerance mechanism: it periodically takes snapshots of the state of every operator/task in the stream according to the configuration and persists this state data at regular intervals; if the Flink program crashes unexpectedly, it can be restarted from a chosen snapshot, correcting any data inconsistencies caused by the failure
  • Prerequisites for Flink's checkpoint mechanism to interact with persistent storage (for the stream and the state):
  • a persistent source that can replay events for a certain amount of time; typical examples are durable message queues (such as Apache Kafka or RabbitMQ) or file systems (such as HDFS, S3 or GFS)
  • persistent storage for the state, for example a distributed file system (such as HDFS, S3 or GFS)

Checkpoint configuration:

  • checkpointing is disabled by default and has to be enabled explicitly before use
  • once enabled, the default checkpoint mode is exactly-once
  • there are two checkpoint modes: exactly-once and at-least-once
  • exactly-once is the right choice for most applications; at-least-once may be useful for applications that need extremely low latency (consistently a few milliseconds)

Checkpoint configuration in code:

  • checkpointing is disabled by default and has to be enabled explicitly
  • StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  • // start a checkpoint every 1000 ms (the checkpoint interval)
  • env.enableCheckpointing(1000);
  • // advanced options:
  • // set the mode to exactly-once (this is the default)
  • env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
  • // make sure at least 500 ms pass between two checkpoints (minimum pause between checkpoints)
  • env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
  • // checkpoints have to complete within one minute or they are discarded (checkpoint timeout)
  • env.getCheckpointConfig().setCheckpointTimeout(60000);
  • // allow only one checkpoint to be in progress at a time
  • env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
  • // retain the checkpoint data after the job is cancelled, so that it can later be restored from a specific checkpoint (see the notes in the full example below)
  • env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

2.2 State Backends (where state is stored)

Introduction:

  • By default, state is kept in the TaskManager's memory, and checkpoints are stored in the JobManager's memory.
  • Where the state is stored and where checkpoints go depends on the configured State Backend
  • env.setStateBackend(…)
  • There are three State Backends in total
  • MemoryStateBackend
  • FsStateBackend
  • RocksDBStateBackend

Types:

1. MemoryStateBackend

  • state is kept in the JVM heap; when a checkpoint runs, the state snapshot is stored in the JobManager's memory
  • the memory-based state backend is not recommended for production use

2. FsStateBackend

  • state is kept in the TaskManager's memory; when a checkpoint runs, the state snapshot is written to the configured file system
  • a distributed file system such as HDFS can be used

3. RocksDBStateBackend

  • RocksDB works a little differently from the other two: state is maintained on the local file system and written directly into a local RocksDB instance. In addition, a remote file-system URI (usually HDFS) has to be configured; when a checkpoint runs, the local data is copied to that file system, and on failover it is restored from the file system back to the local disk
  • RocksDB removes the limitation that state has to fit in memory while still persisting it to a remote file system, which makes it a good fit for production use

State backend configuration:

Two ways to change the State Backend

  • Option 1: per job
  • in the job code:
  • env.setStateBackend(new FsStateBackend("hdfs://namenode:9000/flink/checkpoints"));
  • or new MemoryStateBackend()
  • or new RocksDBStateBackend(filebackend, true); (requires an additional third-party dependency)
  • Option 2: globally
  • in flink-conf.yaml:
  • state.backend: filesystem
  • state.checkpoints.dir: hdfs://namenode:9000/flink/checkpoints
  • Note: state.backend accepts the values jobmanager (MemoryStateBackend), filesystem (FsStateBackend) and rocksdb (RocksDBStateBackend)

3. Restart Strategies

Introduction:

  • Flink supports different restart strategies, which control how a job is restarted after a failure
  • The cluster starts with a default restart strategy, which is used whenever no job-specific strategy is defined; a strategy passed at job submission overrides the cluster default
  • The default restart strategy is specified in Flink's configuration file flink-conf.yaml; the restart-strategy configuration key determines which strategy is used.
  • Commonly used restart strategies
  • fixed delay
  • failure rate
  • no restart
  • If checkpointing is not enabled, the no-restart strategy is used.
  • If checkpointing is enabled but no restart strategy is configured, the fixed-delay strategy is used with Integer.MAX_VALUE as the number of restart attempts
  • A restart strategy can be configured globally in flink-conf.yaml or set dynamically in the application code, which overrides the global configuration

Types:

  • 1. Fixed delay
  • Option 1: global configuration in flink-conf.yaml
  • restart-strategy: fixed-delay
  • restart-strategy.fixed-delay.attempts: 3
  • restart-strategy.fixed-delay.delay: 10 s
  • Option 2: in application code
  • env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
  • 3, // number of restart attempts
  • Time.of(10, TimeUnit.SECONDS) // delay between attempts ));

2. Failure rate

  • Option 1: global configuration in flink-conf.yaml
  • restart-strategy: failure-rate
  • restart-strategy.failure-rate.max-failures-per-interval: 3
  • restart-strategy.failure-rate.failure-rate-interval: 5 min
  • restart-strategy.failure-rate.delay: 10 s
  • Option 2: in application code
  • env.setRestartStrategy(RestartStrategies.failureRateRestart(
  • 3, // maximum number of failures within the interval
  • Time.of(5, TimeUnit.MINUTES), // the interval over which failures are measured
  • Time.of(10, TimeUnit.SECONDS) // delay between attempts ));

3. No restart

  • Option 1: global configuration in flink-conf.yaml
  • restart-strategy: none
  • Option 2: in application code
  • env.setRestartStrategy(RestartStrategies.noRestart());

4. Retaining multiple checkpoints

  • By default, if checkpointing is enabled, Flink keeps only the most recent successfully completed checkpoint, and when the program fails it can be recovered from that one. If we want to keep several checkpoints and be able to pick one of them to restore from, we gain flexibility; for example, if we find that the data processed during the last four hours is wrong, we can roll the whole state back to four hours ago
  • Flink can retain multiple checkpoints; add the following setting to conf/flink-conf.yaml to specify the maximum number of checkpoints to keep
  • state.checkpoints.num-retained: 20
  • Afterwards you can list the checkpoint directories stored on HDFS
  • hdfs dfs -ls hdfs://namenode:9000/flink/checkpoints
  • To roll back to a particular checkpoint, simply point the job at the corresponding checkpoint path

6、從Checkpoint進行恢複

  • If the Flink program fails abnormally, or data has been processed incorrectly during some recent period, the program can be restored from a specific checkpoint
  • bin/flink run -s hdfs://namenode:9000/flink/checkpoints/467e17d2cc343e6c56255d222bae3421/chk-56/_metadata flink-job.jar
  • Once the job is running normally again, it keeps creating checkpoints according to its checkpoint configuration

6. Savepoints

Introduction:

  • With the savepoint feature, a program can be upgraded and then continue the computation from the point it had reached before the upgrade, so no data is lost
  • A savepoint is a global, consistent snapshot; it can store source offsets, operator state and other information
  • The application can resume consumption from any point in the past at which a savepoint was taken

Configuration and usage:

  1. Configure the savepoint location in flink-conf.yaml
  • not mandatory, but once it is set you do not have to specify the savepoint location by hand every time you create a savepoint for a job
  • state.savepoints.dir: hdfs://namenode:9000/flink/savepoints
  2. Trigger a savepoint (directly, or while cancelling a job)
  • bin/flink savepoint jobId [targetDirectory] [-yid yarnAppId] (the -yid parameter is required in on-YARN mode)
  • bin/flink cancel -s [targetDirectory] jobId [-yid yarnAppId] (the -yid parameter is required in on-YARN mode)
  3. Start a job from a given savepoint: bin/flink run -s savepointPath [runArgs]

Summary: checkpoint vs savepoint

  1. checkpoint
  • triggered automatically and periodically by the application, used to persist state, and expires over time
  • used internally when the application restarts after a failure
  2. savepoint
  • triggered manually by the user; it is a pointer to a checkpoint and does not expire
  • used when upgrading
  • Note: to upgrade smoothly between different versions of a job, and between different Flink versions, it is strongly recommended to give operators IDs manually via uid(String); these IDs determine the state scope of each operator. If you do not set IDs, Flink generates one automatically for every operator, and the program can only be restored from a savepoint as long as those IDs stay the same. The automatically generated IDs depend on the structure of the program and are very sensitive to code changes, which is why setting the IDs manually is strongly recommended (a small sketch follows).
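A minimal sketch of the uid() recommendation; it is not from the original article, and the uid strings, host name and port are placeholders.

package xuwei.tech.streaming;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * Minimal sketch: give every operator a stable ID so its state can be matched
 * when the job is restored from a savepoint after an upgrade.
 */
public class UidForSavepointsSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<String> text = env.socketTextStream("hadoop100", 9000, "\n")
                .uid("socket-source");          //stable ID used to match state on restore

        text.map(new MapFunction<String, String>() {
            @Override
            public String map(String value) throws Exception {
                return value.toUpperCase();
            }
        }).uid("uppercase-map")                 //stable ID for the map operator
          .print();

        env.execute("UidForSavepointsSketch");
    }
}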

7. Checkpoint example code

package xuwei.tech.streaming;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

/**
 * checkpoint
 *
 * Created by xuwei.tech on 2018/10/8.
 */
public class SocketWindowWordCountJavaCheckPoint {

    public static void main(String[] args) throws Exception{
        //擷取需要的端口号
        int port;
        try {
            ParameterTool parameterTool = ParameterTool.fromArgs(args);
            port = parameterTool.getInt("port");
        }catch (Exception e){
            System.err.println("No port set. use default port 9000--java");
            port = 9000;
        }

        //擷取flink的運作環境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 每隔1000 ms進行啟動一個檢查點【設定checkpoint的周期】
        env.enableCheckpointing(1000);
        // 進階選項:
        // 設定模式為exactly-once (這是預設值)
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        // 確定檢查點之間有至少500 ms的間隔【checkpoint最小間隔】
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
        // 檢查點必須在一分鐘内完成,或者被丢棄【checkpoint的逾時時間】
        env.getCheckpointConfig().setCheckpointTimeout(60000);
        // 同一時間隻允許進行一個檢查點
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        // 表示一旦Flink處理程式被cancel後,會保留Checkpoint資料,以便根據實際需要恢複到指定的Checkpoint【詳細解釋見備注】
        //ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION:表示一旦Flink處理程式被cancel後,會保留Checkpoint資料,以便根據實際需要恢複到指定的Checkpoint
        //ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION: 表示一旦Flink處理程式被cancel後,會删除Checkpoint資料,隻有job執行失敗的時候才會儲存checkpoint
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);


        //設定statebackend

        //env.setStateBackend(new MemoryStateBackend());
        //env.setStateBackend(new FsStateBackend("hdfs://hadoop100:9000/flink/checkpoints"));
        //env.setStateBackend(new RocksDBStateBackend("hdfs://hadoop100:9000/flink/checkpoints",true));

        String hostname = "hadoop100";
        String delimiter = "\n";
        //連接配接socket擷取輸入的資料
        DataStreamSource<String> text = env.socketTextStream(hostname, port, delimiter);

        // a a c

        // a 1
        // a 1
        // c 1
        DataStream<WordWithCount> windowCounts = text.flatMap(new FlatMapFunction<String, WordWithCount>() {
            public void flatMap(String value, Collector<WordWithCount> out) throws Exception {
                String[] splits = value.split("\\s");
                for (String word : splits) {
                    out.collect(new WordWithCount(word, 1L));
                }
            }
        }).keyBy("word")
                .timeWindow(Time.seconds(2), Time.seconds(1))//指定時間視窗大小為2秒,指定時間間隔為1秒
                .sum("count");//在這裡使用sum或者reduce都可以
                /*.reduce(new ReduceFunction<WordWithCount>() {
                                    public WordWithCount reduce(WordWithCount a, WordWithCount b) throws Exception {

                                        return new WordWithCount(a.word,a.count+b.count);
                                    }
                                })*/
        //把資料列印到控制台并且設定并行度
        windowCounts.print().setParallelism(1);

        //這一行代碼一定要實作,否則程式不執行
        env.execute("Socket window count");

    }

    public static class WordWithCount{
        public String word;
        public long count;
        public  WordWithCount(){}
        public WordWithCount(String word,long count){
            this.word = word;
            this.count = count;
        }
        @Override
        public String toString() {
            return "WordWithCount{" +
                    "word='" + word + '\'' +
                    ", count=" + count +
                    '}';
        }
    }
}


III. DataStream API: Partitioning

Introduction:

  1. Random partitioning: dataStream.shuffle()
  2. Rebalancing: repartitions the data evenly, removing data skew: dataStream.rebalance()
  3. Rescaling: distributes elements round-robin to a subset of the downstream subtasks: dataStream.rescale() (see the sketch after this list)
  4. Custom partitioning: implement the Partitioner interface, then dataStream.partitionCustom(partitioner, "someKey") or dataStream.partitionCustom(partitioner, 0)
  5. Broadcasting: covered in detail separately later
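A minimal sketch, not from the original article, of the built-in repartitioning operators from the list; it reuses the MyNoParalleSource class defined earlier, and the parallelism values are arbitrary.

package xuwei.tech.streaming.custormPartition;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import xuwei.tech.streaming.custormSource.MyNoParalleSource;

/**
 * Minimal sketch of shuffle/rebalance/rescale; only one of the repartitioned
 * streams is printed, the others are shown for the API shape.
 */
public class BuiltInPartitioningSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4);

        DataStream<Long> source = env.addSource(new MyNoParalleSource()).setParallelism(1);

        DataStream<Long> shuffled = source.shuffle();     //random partitioning
        DataStream<Long> rebalanced = source.rebalance(); //round-robin over all downstream subtasks
        DataStream<Long> rescaled = source.rescale();     //round-robin over a subset of downstream subtasks

        rebalanced.print();

        env.execute("BuiltInPartitioningSketch");
    }
}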

Code examples:

1. Partitioner

① Define the partitioner class

package xuwei.tech.streaming.custormPartition;

import org.apache.flink.api.common.functions.Partitioner;

/**
 * Created by xuwei.tech on 2018/10/23.
 */
public class MyPartition implements Partitioner<Long> {
    @Override
    public int partition(Long key, int numPartitions) {
        System.out.println("分區總數:"+numPartitions);
        if(key % 2 == 0){
            return 0;
        }else{
            return 1;
        }
    }
}


           

② Use the custom partitioner

package xuwei.tech.streaming.custormPartition;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import xuwei.tech.streaming.custormSource.MyNoParalleSource;

/**
 *
 * 使用自定義分區
 * 根據數字的奇偶性來分區
 *
 * Created by xuwei.tech on 2018/10/23.
 */
public class SteamingDemoWithMyParitition {

    public static void main(String[] args) throws Exception{

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);
        DataStreamSource<Long> text = env.addSource(new MyNoParalleSource());

        //對資料進行轉換,把long類型轉成tuple1類型
        DataStream<Tuple1<Long>> tupleData = text.map(new MapFunction<Long, Tuple1<Long>>() {
            @Override
            public Tuple1<Long> map(Long value) throws Exception {
                return new Tuple1<>(value);
            }
        });
        //分區之後的資料
        DataStream<Tuple1<Long>> partitionData= tupleData.partitionCustom(new MyPartition(), 0);

        DataStream<Long> result = partitionData.map(new MapFunction<Tuple1<Long>, Long>() {
            @Override
            public Long map(Tuple1<Long> value) throws Exception {
                System.out.println("目前線程id:" + Thread.currentThread().getId() + ",value: " + value);
                return value.getField(0);
            }
        });

        result.print().setParallelism(1);

        env.execute("SteamingDemoWithMyParitition");

    }
}


           

IV. DataStream API: Data Sinks

Introduction:

  1. writeAsText(): writes elements line by line as strings, obtained by calling toString() on each element
  2. print() / printToErr(): prints the toString() value of each element to standard output or standard error
  3. custom output via addSink (e.g. Kafka, Redis)

1. Built-in connectors

  1. Apache Kafka (source/sink)
  2. Apache Cassandra (sink)
  3. Elasticsearch (sink)
  4. Hadoop FileSystem (sink)
  5. RabbitMQ (source/sink)
  6. Apache ActiveMQ (source/sink)
  7. Redis (sink)

2. Custom sinks

  1. To implement a custom sink,
  2. implement the SinkFunction interface
  3. or extend RichSinkFunction
  4. see org.apache.flink.streaming.connectors.redis.RedisSink for a reference implementation (a minimal hand-written sink sketch follows this list)
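A minimal hand-written sink sketch, not from the original article, extending RichSinkFunction as described above; it only prints, and the open/close bodies stand in for real connection handling. Attach it with text.addSink(new MyPrintSink()).

package xuwei.tech.streaming.sink;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

/**
 * Minimal custom sink: open()/close() are where connections to the external
 * system would be acquired and released; invoke() writes one record.
 */
public class MyPrintSink extends RichSinkFunction<String> {

    @Override
    public void open(Configuration parameters) throws Exception {
        //acquire connections/resources to the external system here
        System.out.println("open sink ...");
    }

    @Override
    public void invoke(String value) throws Exception {
        //write one record to the external system; here we just print it
        System.out.println("sink received: " + value);
    }

    @Override
    public void close() throws Exception {
        //release resources here
        System.out.println("close sink ...");
    }
}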

Code examples:

1. Sinking to Redis

package xuwei.tech.streaming.sink;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;

/**
 * 接收socket資料,把資料儲存到redis中
 *
 * list
 *
 * lpush list_key value
 *
 * Created by xuwei.tech on 2018/10/23.
 */
public class StreamingDemoToRedis {

    public static void main(String[] args) throws Exception{
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource<String> text = env.socketTextStream("hadoop100", 9000, "\n");

        //lpush l_words word

        //對資料進行組裝,把string轉化為tuple2<String,String>
        DataStream<Tuple2<String, String>> l_wordsData = text.map(new MapFunction<String, Tuple2<String, String>>() {
            @Override
            public Tuple2<String, String> map(String value) throws Exception {
                return new Tuple2<>("l_words", value);
            }
        });

        //建立redis的配置
        FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder().setHost("hadoop110").setPort(6379).build();

        //建立redissink
        RedisSink<Tuple2<String, String>> redisSink = new RedisSink<>(conf, new MyRedisMapper());

        l_wordsData.addSink(redisSink);

        env.execute("StreamingDemoToRedis");
    }

    public static class MyRedisMapper implements RedisMapper<Tuple2<String, String>>{
        //表示從接收的資料中擷取需要操作的redis key
        @Override
        public String getKeyFromData(Tuple2<String, String> data) {
            return data.f0;
        }
        //表示從接收的資料中擷取需要操作的redis value
        @Override
        public String getValueFromData(Tuple2<String, String> data) {
            return data.f1;
        }

        @Override
        public RedisCommandDescription getCommandDescription() {
            return new RedisCommandDescription(RedisCommand.LPUSH);
        }
    }

}



2. Sinking to Kafka (producer)

package xuwei.tech.streaming;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;

import java.util.Properties;

/**
 * kafkaSink
 *
 * Created by xuwei.tech on 2018/10/23.
 */
public class StreamingKafkaSink {

    public static void main(String[] args) throws Exception {
        //擷取Flink的運作環境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();


        //checkpoint配置
        env.enableCheckpointing(5000);
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
        env.getCheckpointConfig().setCheckpointTimeout(60000);
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        //設定statebackend

        //env.setStateBackend(new RocksDBStateBackend("hdfs://hadoop100:9000/flink/checkpoints",true));


        DataStreamSource<String> text = env.socketTextStream("hadoop100", 9001, "\n");

        String brokerList = "hadoop110:9092";
        String topic = "t1";

        Properties prop = new Properties();
        prop.setProperty("bootstrap.servers",brokerList);

        //第一種解決方案,設定FlinkKafkaProducer011裡面的事務逾時時間
        //設定事務逾時時間
        //prop.setProperty("transaction.timeout.ms",60000*15+"");

        //第二種解決方案,設定kafka的最大事務逾時時間

        //FlinkKafkaProducer011<String> myProducer = new FlinkKafkaProducer011<>(brokerList, topic, new SimpleStringSchema());

        //使用僅一次語義的kafkaProducer
        FlinkKafkaProducer011<String> myProducer = new FlinkKafkaProducer011<>(topic, new KeyedSerializationSchemaWrapper<String>(new SimpleStringSchema()), prop, FlinkKafkaProducer011.Semantic.EXACTLY_ONCE);
        text.addSink(myProducer);


        env.execute("StreamingFromCollection");


    }
}

3. Consuming data from Kafka

package xuwei.tech.streaming;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;

import java.util.ArrayList;
import java.util.Properties;

/**
 * kafkaSource
 *
 * Created by xuwei.tech on 2018/10/23.
 */
public class StreamingKafkaSource {

    public static void main(String[] args) throws Exception {
        //擷取Flink的運作環境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //checkpoint配置
        env.enableCheckpointing(5000);
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
        env.getCheckpointConfig().setCheckpointTimeout(60000);
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        //設定statebackend

        //env.setStateBackend(new RocksDBStateBackend("hdfs://hadoop100:9000/flink/checkpoints",true));


        String topic = "t1";
        Properties prop = new Properties();
        prop.setProperty("bootstrap.servers","hadoop110:9092");
        prop.setProperty("group.id","con1");

        FlinkKafkaConsumer011<String> myConsumer = new FlinkKafkaConsumer011<>(topic, new SimpleStringSchema(), prop);

        myConsumer.setStartFromGroupOffsets();//預設消費政策

        DataStreamSource<String> text = env.addSource(myConsumer);

        text.print().setParallelism(1);

        env.execute("StreamingFromCollection");


    }
}



V. DataStream API: Watermarks

Introduction:

  • How do we handle out-of-order data when using event time?
  • There is always some delay between the moment an event is produced, flows through the source, and reaches an operator. Although in most cases the data arrives at the operator in the order the events were produced, out-of-order arrival cannot be ruled out, for example because of network delays; with Kafka in particular, ordering across multiple partitions cannot be guaranteed. When computing windows we cannot wait indefinitely, so there has to be a mechanism that guarantees the window is triggered after a specific amount of time. That special mechanism is the watermark; watermarks exist to handle out-of-order events.
  • The term watermark can be translated literally as "water level line"

Code examples:

1. Watermark example 1

package xuwei.tech.streaming.watermark;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import javax.annotation.Nullable;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;


/**
 *
 * Watermark 案例
 *
 * Created by xuwei.tech.
 */
public class StreamingWindowWatermark {

    public static void main(String[] args) throws Exception {
        //定義socket的端口号
        int port = 9000;
        //擷取運作環境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //設定使用eventtime,預設是使用processtime
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);


        //設定并行度為1,預設并行度是目前機器的cpu數量
        env.setParallelism(1);

        //連接配接socket擷取輸入的資料
        DataStream<String> text = env.socketTextStream("hadoop100", port, "\n");

        //解析輸入的資料
        DataStream<Tuple2<String, Long>> inputMap = text.map(new MapFunction<String, Tuple2<String, Long>>() {
            @Override
            public Tuple2<String, Long> map(String value) throws Exception {
                String[] arr = value.split(",");
                return new Tuple2<>(arr[0], Long.parseLong(arr[1]));
            }
        });

        //抽取timestamp和生成watermark
        DataStream<Tuple2<String, Long>> waterMarkStream = inputMap.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<Tuple2<String, Long>>() {

            Long currentMaxTimestamp = 0L;
            final Long maxOutOfOrderness = 10000L;// 最大允許的亂序時間是10s

            SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
            /**
             * 定義生成watermark的邏輯
             * 預設100ms被調用一次
             */
            @Nullable
            @Override
            public Watermark getCurrentWatermark() {
                return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
            }

            //定義如何提取timestamp
            @Override
            public long extractTimestamp(Tuple2<String, Long> element, long previousElementTimestamp) {
                long timestamp = element.f1;
                currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
                long id = Thread.currentThread().getId();
                System.out.println("currentThreadId:"+id+",key:"+element.f0+",eventtime:["+element.f1+"|"+sdf.format(element.f1)+"],currentMaxTimestamp:["+currentMaxTimestamp+"|"+
                        sdf.format(currentMaxTimestamp)+"],watermark:["+getCurrentWatermark().getTimestamp()+"|"+sdf.format(getCurrentWatermark().getTimestamp())+"]");
                return timestamp;
            }
        });

        DataStream<String> window = waterMarkStream.keyBy(0)
                .window(TumblingEventTimeWindows.of(Time.seconds(3)))//按照消息的EventTime分配視窗,和調用timeWindow效果一樣
                .apply(new WindowFunction<Tuple2<String, Long>, String, Tuple, TimeWindow>() {
                    /**
                     * 對window内的資料進行排序,保證資料的順序
                     * @param tuple
                     * @param window
                     * @param input
                     * @param out
                     * @throws Exception
                     */
                    @Override
                    public void apply(Tuple tuple, TimeWindow window, Iterable<Tuple2<String, Long>> input, Collector<String> out) throws Exception {
                        String key = tuple.toString();
                        List<Long> arrarList = new ArrayList<Long>();
                        Iterator<Tuple2<String, Long>> it = input.iterator();
                        while (it.hasNext()) {
                            Tuple2<String, Long> next = it.next();
                            arrarList.add(next.f1);
                        }
                        Collections.sort(arrarList);
                        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
                        String result = key + "," + arrarList.size() + "," + sdf.format(arrarList.get(0)) + "," + sdf.format(arrarList.get(arrarList.size() - 1))
                                + "," + sdf.format(window.getStart()) + "," + sdf.format(window.getEnd());
                        out.collect(result);
                    }
                });
        //測試-把結果列印到控制台即可
        window.print();

        //注意:因為flink是懶加載的,是以必須調用execute方法,上面的代碼才會執行
        env.execute("eventtime-watermark");

    }



}

2. Watermark example 2

package xuwei.tech.streaming.watermark;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

import javax.annotation.Nullable;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;


/**
 *
 * Watermark 案例
 *
 * sideOutputLateData 收集遲到的資料
 *
 * Created by xuwei.tech.
 */
public class StreamingWindowWatermark2 {

    public static void main(String[] args) throws Exception {
        //定義socket的端口号
        int port = 9000;
        //擷取運作環境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //設定使用eventtime,預設是使用processtime
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        //設定并行度為1,預設并行度是目前機器的cpu數量
        env.setParallelism(1);

        //連接配接socket擷取輸入的資料
        DataStream<String> text = env.socketTextStream("hadoop100", port, "\n");

        //解析輸入的資料
        DataStream<Tuple2<String, Long>> inputMap = text.map(new MapFunction<String, Tuple2<String, Long>>() {
            @Override
            public Tuple2<String, Long> map(String value) throws Exception {
                String[] arr = value.split(",");
                return new Tuple2<>(arr[0], Long.parseLong(arr[1]));
            }
        });

        //抽取timestamp和生成watermark
        DataStream<Tuple2<String, Long>> waterMarkStream = inputMap.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<Tuple2<String, Long>>() {

            Long currentMaxTimestamp = 0L;
            final Long maxOutOfOrderness = 10000L;// 最大允許的亂序時間是10s

            SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
            /**
             * 定義生成watermark的邏輯
             * 預設100ms被調用一次
             */
            @Nullable
            @Override
            public Watermark getCurrentWatermark() {
                return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
            }

            //定義如何提取timestamp
            @Override
            public long extractTimestamp(Tuple2<String, Long> element, long previousElementTimestamp) {
                long timestamp = element.f1;
                currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
                System.out.println("key:"+element.f0+",eventtime:["+element.f1+"|"+sdf.format(element.f1)+"],currentMaxTimestamp:["+currentMaxTimestamp+"|"+
                        sdf.format(currentMaxTimestamp)+"],watermark:["+getCurrentWatermark().getTimestamp()+"|"+sdf.format(getCurrentWatermark().getTimestamp())+"]");
                return timestamp;
            }
        });

        //儲存被丢棄的資料
        OutputTag<Tuple2<String, Long>> outputTag = new OutputTag<Tuple2<String, Long>>("late-data"){};
        //注意,由于getSideOutput方法是SingleOutputStreamOperator子類中的特有方法,是以這裡的類型,不能使用它的父類dataStream。
        SingleOutputStreamOperator<String> window = waterMarkStream.keyBy(0)
                .window(TumblingEventTimeWindows.of(Time.seconds(3)))//按照消息的EventTime分配視窗,和調用timeWindow效果一樣
                //.allowedLateness(Time.seconds(2))//允許資料遲到2秒
                .sideOutputLateData(outputTag)
                .apply(new WindowFunction<Tuple2<String, Long>, String, Tuple, TimeWindow>() {
                    /**
                     * 對window内的資料進行排序,保證資料的順序
                     * @param tuple
                     * @param window
                     * @param input
                     * @param out
                     * @throws Exception
                     */
                    @Override
                    public void apply(Tuple tuple, TimeWindow window, Iterable<Tuple2<String, Long>> input, Collector<String> out) throws Exception {
                        String key = tuple.toString();
                        List<Long> arrarList = new ArrayList<Long>();
                        Iterator<Tuple2<String, Long>> it = input.iterator();
                        while (it.hasNext()) {
                            Tuple2<String, Long> next = it.next();
                            arrarList.add(next.f1);
                        }
                        Collections.sort(arrarList);
                        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
                        String result = key + "," + arrarList.size() + "," + sdf.format(arrarList.get(0)) + "," + sdf.format(arrarList.get(arrarList.size() - 1))
                                + "," + sdf.format(window.getStart()) + "," + sdf.format(window.getEnd());
                        out.collect(result);
                    }
                });
        //把遲到的資料暫時列印到控制台,實際中可以儲存到其他存儲媒體中
        DataStream<Tuple2<String, Long>> sideOutput = window.getSideOutput(outputTag);
        sideOutput.print();
        //測試-把結果列印到控制台即可
        window.print();

        //注意:因為flink是懶加載的,是以必須調用execute方法,上面的代碼才會執行
        env.execute("eventtime-watermark");

    }



}


VI. Flink Windows and Time in Detail

① Window operations and overview

Introduction:

  1. Windows
  • Aggregating events (for example counting or summing) works differently on streams than in batch processing.
  • For instance, counting all elements of a stream is impossible, because streams are usually infinite (unbounded). Aggregations on streams therefore have to be scoped by a window, such as "the count over the last 5 minutes" or "the sum of the last 100 elements".
  • A window is a mechanism for cutting an unbounded stream into finite chunks of data
  • Windows can be time-driven (Time Window, e.g. every 30 seconds) or data-driven (Count Window, e.g. every 100 elements).
  2. Window types
  • Windows are usually classified as:
  • tumbling windows (no overlap)
  • sliding windows (with overlap)
  • session windows (a short sketch of these window types follows this list)
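A minimal sketch, not from the original article, of the three window types; host name and port are placeholders, and processing-time session windows are used so the example runs without any event-time setup.

package xuwei.tech.streaming;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

/**
 * Minimal sketch of tumbling, sliding and session windows on a keyed stream
 * of (word, 1) pairs read from a socket.
 */
public class WindowTypesSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        KeyedStream<Tuple2<String, Long>, Tuple> keyed = env
                .socketTextStream("hadoop100", 9000, "\n")
                .map(new MapFunction<String, Tuple2<String, Long>>() {
                    @Override
                    public Tuple2<String, Long> map(String word) throws Exception {
                        return new Tuple2<>(word, 1L);
                    }
                })
                .keyBy(0);

        //tumbling window: fixed 10-second windows, no overlap
        keyed.timeWindow(Time.seconds(10)).sum(1).print();

        //sliding window: 10-second windows evaluated every 5 seconds, overlapping
        keyed.timeWindow(Time.seconds(10), Time.seconds(5)).sum(1).print();

        //session window: a window for a key closes after a 30-second gap without events
        keyed.window(ProcessingTimeSessionWindows.withGap(Time.seconds(30))).sum(1).print();

        env.execute("WindowTypesSketch");
    }
}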

Code examples:

1. Full window aggregation

Overview:

  • Full aggregation
  • waits until all data belonging to the window has arrived before computing, which makes it possible, for example, to sort the data inside the window
  • apply(windowFunction)
  • process(processWindowFunction)
  • processWindowFunction provides more context information than windowFunction.

package xuwei.tech.streaming;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

/**
 * window 全量聚合
 */
public class SocketDemoFullCount {

    public static void main(String[] args) throws Exception{
        //擷取需要的端口号
        int port;
        try {
            ParameterTool parameterTool = ParameterTool.fromArgs(args);
            port = parameterTool.getInt("port");
        }catch (Exception e){
            System.err.println("No port set. use default port 9000--java");
            port = 9000;
        }

        //擷取flink的運作環境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        String hostname = "hadoop100";
        String delimiter = "\n";
        //連接配接socket擷取輸入的資料
        DataStreamSource<String> text = env.socketTextStream(hostname, port, delimiter);

        DataStream<Tuple2<Integer,Integer>> intData = text.map(new MapFunction<String, Tuple2<Integer,Integer>>() {
            @Override
            public Tuple2<Integer,Integer> map(String value) throws Exception {
                return new Tuple2<>(1,Integer.parseInt(value));
            }
        });

        intData.keyBy(0)
                .timeWindow(Time.seconds(5))
                .process(new ProcessWindowFunction<Tuple2<Integer,Integer>, String, Tuple, TimeWindow>() {
                    @Override
                    public void process(Tuple key, Context context, Iterable<Tuple2<Integer, Integer>> elements, Collector<String> out)
                            throws Exception {
                        System.out.println("執行process。。。");
                        long count = 0;
                        for(Tuple2<Integer,Integer> element: elements){
                            count++;
                        }
                        out.collect("window:"+context.window()+",count:"+count);
                    }
                }).print();


        //這一行代碼一定要實作,否則程式不執行
        env.execute("Socket window count");

    }
}


2. Incremental window aggregation

Introduction:

  • Incremental aggregation
  • A computation is performed each time a record enters the window
  • reduce(reduceFunction)
  • aggregate(aggregateFunction) (a sketch follows the example below)
  • sum(), min(), max()
package xuwei.tech.streaming;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;

/**
 * incremental window aggregation
 */
public class SocketDemoIncrAgg {

    public static void main(String[] args) throws Exception{
        //get the port to listen on
        int port;
        try {
            ParameterTool parameterTool = ParameterTool.fromArgs(args);
            port = parameterTool.getInt("port");
        }catch (Exception e){
            System.err.println("No port set. use default port 9000--java");
            port = 9000;
        }

        //get the Flink execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        String hostname = "hadoop100";
        String delimiter = "\n";
        //connect to the socket and read the input data
        DataStreamSource<String> text = env.socketTextStream(hostname, port, delimiter);

        DataStream<Tuple2<Integer,Integer>> intData = text.map(new MapFunction<String, Tuple2<Integer,Integer>>() {
            @Override
            public Tuple2<Integer,Integer> map(String value) throws Exception {
                return new Tuple2<>(1,Integer.parseInt(value));
            }
        });

        intData.keyBy(0)
                .timeWindow(Time.seconds(5))
                .reduce(new ReduceFunction<Tuple2<Integer, Integer>>() {
                    @Override
                    public Tuple2<Integer, Integer> reduce(Tuple2<Integer, Integer> value1, Tuple2<Integer, Integer> value2) throws Exception {
                        System.out.println("執行reduce操作:"+value1+","+value2);
                        return new Tuple2<>(value1.f0,value1.f1+value2.f1);
                    }
                }).print();


        //this line is required; otherwise the program will not run
        env.execute("Socket window count");

    }
}
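The list above also mentions aggregate(aggregateFunction). The sketch below (not from the original article; host and port are assumptions matching the examples above) computes the average of the values in each 5-second window incrementally: add() runs once per incoming record, so only the small accumulator is kept as state instead of the whole window content.

package xuwei.tech.streaming;

import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;

/**
 * Sketch: incremental aggregation with aggregate(AggregateFunction),
 * computing the average of the values seen in each 5-second window.
 */
public class SocketDemoAggregate {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.socketTextStream("hadoop100", 9000, "\n")
                .map(new MapFunction<String, Tuple2<Integer, Integer>>() {
                    @Override
                    public Tuple2<Integer, Integer> map(String value) throws Exception {
                        return new Tuple2<>(1, Integer.parseInt(value));
                    }
                })
                .keyBy(0)
                .timeWindow(Time.seconds(5))
                // accumulator is Tuple2<sum, count>; add() is invoked once per element (incremental)
                .aggregate(new AggregateFunction<Tuple2<Integer, Integer>, Tuple2<Long, Long>, Double>() {
                    @Override
                    public Tuple2<Long, Long> createAccumulator() {
                        return new Tuple2<>(0L, 0L);
                    }

                    @Override
                    public Tuple2<Long, Long> add(Tuple2<Integer, Integer> value, Tuple2<Long, Long> acc) {
                        return new Tuple2<>(acc.f0 + value.f1, acc.f1 + 1);
                    }

                    @Override
                    public Double getResult(Tuple2<Long, Long> acc) {
                        return acc.f1 == 0 ? 0.0 : (double) acc.f0 / acc.f1;
                    }

                    @Override
                    public Tuple2<Long, Long> merge(Tuple2<Long, Long> a, Tuple2<Long, Long> b) {
                        return new Tuple2<>(a.f0 + b.f0, a.f1 + b.f1);
                    }
                })
                .print();

        env.execute("SocketDemoAggregate");
    }
}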


3、Sliding window computation

package xuwei.tech.streaming;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

/**
 * Sliding window computation
 *
 * Word data is simulated via a socket
 * Flink aggregates and counts the words
 *
 * Requirement: every 1 second, aggregate the data of the last 2 seconds
 *
 *
 * Created by xuwei.tech on 2018/10/8.
 */
public class SocketWindowWordCountJava {

    public static void main(String[] args) throws Exception{
        //get the port to listen on
        int port;
        try {
            ParameterTool parameterTool = ParameterTool.fromArgs(args);
            port = parameterTool.getInt("port");
        }catch (Exception e){
            System.err.println("No port set. use default port 9000--java");
            port = 9000;
        }

        //get the Flink execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        String hostname = "hadoop100";
        String delimiter = "\n";
        //connect to the socket and read the input data
        DataStreamSource<String> text = env.socketTextStream(hostname, port, delimiter);

        // a a c

        // a 1
        // a 1
        // c 1
        DataStream<WordWithCount> windowCounts = text.flatMap(new FlatMapFunction<String, WordWithCount>() {
            public void flatMap(String value, Collector<WordWithCount> out) throws Exception {
                String[] splits = value.split("\\s");
                for (String word : splits) {
                    out.collect(new WordWithCount(word, 1L));
                }
            }
        }).keyBy("word")
                .timeWindow(Time.seconds(2), Time.seconds(1))//window size of 2 seconds, sliding every 1 second
                .sum("count");//在這裡使用sum或者reduce都可以
                /*.reduce(new ReduceFunction<WordWithCount>() {
                                    public WordWithCount reduce(WordWithCount a, WordWithCount b) throws Exception {

                                        return new WordWithCount(a.word,a.count+b.count);
                                    }
                                })*/
        //print the data to the console and set the parallelism
        windowCounts.print().setParallelism(1);

        //this line is required; otherwise the program will not run
        env.execute("Socket window count");

    }

    public static class WordWithCount{
        public String word;
        public long count;
        public  WordWithCount(){}
        public WordWithCount(String word,long count){
            this.word = word;
            this.count = count;
        }
        @Override
        public String toString() {
            return "WordWithCount{" +
                    "word='" + word + '\'' +
                    ", count=" + count +
                    '}';
        }
    }

}
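The window-type list in the introduction also mentions data-driven Count Windows. The sketch below is an assumed variation of the word count above (it reuses its WordWithCount POJO and the same host/port assumptions): instead of a time window, every 2 new elements per key the last 5 elements are summed.

package xuwei.tech.streaming;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
 * Sketch: the same word count driven by element count instead of time
 * (the data-driven Count Window from the window-type list).
 */
public class SocketCountWindowWordCountJava {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.socketTextStream("hadoop100", 9000, "\n")
                .flatMap(new FlatMapFunction<String, SocketWindowWordCountJava.WordWithCount>() {
                    @Override
                    public void flatMap(String value, Collector<SocketWindowWordCountJava.WordWithCount> out) {
                        for (String word : value.split("\\s")) {
                            out.collect(new SocketWindowWordCountJava.WordWithCount(word, 1L));
                        }
                    }
                })
                .keyBy("word")
                // sliding count window: every 2 new elements, sum the last 5 elements per key
                .countWindow(5, 2)
                .sum("count")
                .print()
                .setParallelism(1);

        env.execute("Socket count window word count");
    }
}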


Window summary: use full aggregation (apply/process) when the computation needs every element of the window at once (e.g. sorting), and incremental aggregation (reduce/aggregate/sum/min/max) when the result can be updated record by record, which keeps the window state small.

② Time operations and introduction

Introduction:

  • Time in stream data can be divided into the following three kinds (see the sketch after this list)
  • Event Time: the time the event was produced, usually described by a timestamp carried in the event.
  • Ingestion Time: the time the event enters Flink
  • Processing Time: the current system time of the machine when the event is processed
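A minimal sketch of how each time characteristic is selected on the execution environment. In the Flink version used in this article (pre-1.12) the default is Processing Time; a real job would keep only one of these calls.

package xuwei.tech.streaming;

import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * Sketch: selecting the time characteristic for a streaming job.
 */
public class TimeCharacteristicDemo {

    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Processing Time: wall-clock time of the machine executing the operator (the default)
        env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);

        // Ingestion Time: timestamp assigned when the record enters Flink at the source
        env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);

        // Event Time: timestamp carried in the event itself; requires
        // assignTimestampsAndWatermarks(...) on the stream (see the watermark example earlier)
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    }
}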

Code:

1、Processing Time

package myflink.job;

import com.alibaba.fastjson.JSON;
import myflink.model.UrlInfo;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;

import java.util.Date;
import java.util.Properties;

public class WindowTest {

    public static void main(String[] args) throws Exception {

        // read data from Kafka
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties properties = new Properties();
        properties.put("bootstrap.servers", "localhost:9092");
        properties.put("zookeeper.connect", "localhost:2181");
        properties.put("group.id", "metric-group");
        properties.put("auto.offset.reset", "latest");
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        SingleOutputStreamOperator<UrlInfo> dataStreamSource = env.addSource(
                new FlinkKafkaConsumer010<String>(
                        "testjin",// topic
                        new SimpleStringSchema(),
                        properties
                )
        ).setParallelism(1)
                // map: transform one stream into another, here from String --> UrlInfo
                .map(string -> {
                    UrlInfo urlInfo = JSON.parseObject(string, UrlInfo.class);
                    urlInfo.setDomain(urlInfo.generateDomain());
                    return urlInfo;
                });

        // keyBy on domain
        KeyedStream<UrlInfo, String> keyedStream = dataStreamSource.keyBy(new KeySelector<UrlInfo, String>() {
            @Override
            public String getKey(UrlInfo urlInfo) throws Exception {
                return urlInfo.getDomain();
            }
        });

        // set the time characteristic to Processing Time
        env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);

        // use a 30-second timeWindow
        SingleOutputStreamOperator<UrlInfo> windowReduceStream = keyedStream.timeWindow(Time.seconds(30))
        .reduce((ReduceFunction<UrlInfo>) (t1, t2) -> {
            UrlInfo urlInfo = new UrlInfo();

            // records with the same domain land in the same partition, so the domain is identical
            urlInfo.setDomain(t1.getDomain());
            urlInfo.setUrl(urlInfo.getDomain() + "/reduce/" + DateFormatUtils.format(new Date(),"yyyy-MM-dd'T'HH:mm:ss"));
            urlInfo.setHash(DigestUtils.md5Hex(urlInfo.getUrl()));

            urlInfo.setCount(t1.getCount() + 1);// accumulate the count in reduce

            return urlInfo;
        }).returns(UrlInfo.class);

        windowReduceStream.addSink(new PrintSinkFunction<>());

        env.execute("execute window reduce info");
    }
}           
