天天看點

Flink從入門到放棄(入門篇3)-DataSetAPI

首先我們來看一下程式設計結構:

程式設計結構

public class SocketTextStreamWordCount {

    public static void main(String[] args) throws Exception {
        if (args.length != 2){
System.err.println("USAGE:\nSocketTextStreamWordCount <hostname> <port>");
            return;
        }
        String hostName = args[0];
        Integer port = Integer.parseInt(args[1]);
        final StreamExecutionEnvironment env = StreamExecutionEnvironment
                .getExecutionEnvironment();
        DataStream<String> text = env.socketTextStream(hostName, port);

        DataStream<Tuple2<String, Integer>> counts 
        text.flatMap(new LineSplitter())
                .keyBy(0)
                .sum(1);
        counts.print();
        env.execute("Java WordCount from SocketTextStream Example");
    }           

上面的

SocketTextStreamWordCount

是一個典型的Flink程式,他由一下及格部分構成:

  • 獲得一個execution environment,
  • 加載/建立初始資料,
  • 指定此資料的轉換,
  • 指定放置計算結果的位置,
  • 觸發程式執行

DataSet API

分類:

  • Source: 資料源建立初始資料集,例如來自檔案或Java集合
  • Transformation: 資料轉換将一個或多個DataSet轉換為新的DataSet
  • Sink: 将計算結果存儲或傳回

DataSet Sources

基于檔案的

  • readTextFile(path)/ TextInputFormat

    - 按行讀取檔案并将其作為字元串傳回。
  • readTextFileWithValue(path)/ TextValueInputFormat

    - 按行讀取檔案并将它們作為StringValues傳回。StringValues是可變字元串。
  • readCsvFile(path)/ CsvInputFormat

    - 解析逗号(或其他字元)分隔字段的檔案。傳回元組或POJO的DataSet。支援基本java類型及其Value對應作為字段類型。
  • readFileOfPrimitives(path, Class)/ PrimitiveInputFormat

    - 解析新行(或其他字元序列)分隔的原始資料類型(如String或)的檔案Integer。
  • readFileOfPrimitives(path, delimiter, Class)/ PrimitiveInputFormat

    - 解析新行(或其他字元序列)分隔的原始資料類型的檔案,例如String或Integer使用給定的分隔符。
  • readSequenceFile(Key, Value, path)/ SequenceFileInputFormat

    - 建立一個JobConf并從類型為SequenceFileInputFormat,Key class和Value類的指定路徑中讀取檔案,并将它們作為Tuple2 傳回。

基于集合

  • fromCollection(Collection)

    - 從Java Java.util.Collection建立資料集。集合中的所有資料元必須屬于同一類型。
  • fromCollection(Iterator, Class)

    - 從疊代器建立資料集。該類指定疊代器傳回的資料元的資料類型。
  • fromElements(T ...)

    - 根據給定的對象序列建立資料集。所有對象必須屬于同一類型。
  • fromParallelCollection(SplittableIterator, Class)

    - 并行地從疊代器建立資料集。該類指定疊代器傳回的資料元的資料類型。
  • generateSequence(from, to)

    - 并行生成給定間隔中的數字序列。

通用方法

  • readFile(inputFormat, path)/ FileInputFormat

    - 接受檔案輸入格式。
  • createInput(inputFormat)/ InputFormat

    - 接受通用輸入格式。

代碼示例

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// 從本地檔案系統讀
DataSet<String> localLines = env.readTextFile("file:///path/to/my/textfile");

// 讀取HDFS檔案
DataSet<String> hdfsLines = env.readTextFile("hdfs://nnHost:nnPort/path/to/my/textfile");

// 讀取CSV檔案
DataSet<Tuple3<Integer, String, Double>> csvInput = env.readCsvFile("hdfs:///the/CSV/file").types(Integer.class, String.class, Double.class);

// 讀取CSV檔案中的部分
DataSet<Tuple2<String, Double>> csvInput = env.readCsvFile("hdfs:///the/CSV/file").includeFields("10010").types(String.class, Double.class);

// 讀取CSV映射為一個java類
DataSet<Person>> csvInput = env.readCsvFile("hdfs:///the/CSV/file").pojoType(Person.class, "name", "age", "zipcode");

// 讀取一個指定位置序列化好的檔案
DataSet<Tuple2<IntWritable, Text>> tuples =
 env.readSequenceFile(IntWritable.class, Text.class, "hdfs://nnHost:nnPort/path/to/file");

// 從輸入字元建立
DataSet<String> value = env.fromElements("Foo", "bar", "foobar", "fubar");

// 建立一個數字序列
DataSet<Long> numbers = env.generateSequence(1, 10000000);

// 從關系型資料庫讀取
DataSet<Tuple2<String, Integer> dbData =
env.createInput(JDBCInputFormat.buildJDBCInputFormat()                    .setDrivername("org.apache.derby.jdbc.EmbeddedDriver")                   .setDBUrl("jdbc:derby:memory:persons")
.setQuery("select name, age from persons")
.setRowTypeInfo(new RowTypeInfo(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO))
.finish());           

DataSet Transformation

詳細可以參考官網: https://flink.sojb.cn/dev/batch/dataset_transformations.html#filter
  • Map

采用一個資料元并生成一個資料元。

data.map(new MapFunction<String, Integer>() {
  public Integer map(String value) { return Integer.parseInt(value); }
});           
  • FlatMap

采用一個資料元并生成零個,一個或多個資料元。

data.flatMap(new FlatMapFunction<String, String>() {
  public void flatMap(String value, Collector<String> out) {
    for (String s : value.split(" ")) {
      out.collect(s);
    }
  }
});           
  • MapPartition

在單個函數調用中轉換并行分區。該函數将分區作為Iterable流來擷取,并且可以生成任意數量的結果值。每個分區中的資料元數量取決于并行度和先前的 算子操作。

data.mapPartition(new MapPartitionFunction<String, Long>() {
  public void mapPartition(Iterable<String> values, Collector<Long> out) {
    long c = 0;
    for (String s : values) {
      c++;
    }
    out.collect(c);
  }
});           
  • Filter

計算每個資料元的布爾函數,并儲存函數傳回true的資料元。

重要資訊:系統假定該函數不會修改應用謂詞的資料元。違反此假設可能會導緻錯誤的結果。

data.filter(new FilterFunction<Integer>() {
  public boolean filter(Integer value) { return value > 1000; }
});           
  • Reduce

通過将兩個資料元重複組合成一個資料元,将一組資料元組合成一個資料元。Reduce可以應用于完整資料集或分組資料集。

data.reduce(new ReduceFunction<Integer> {
  public Integer reduce(Integer a, Integer b) { return a + b; }
});           

如果将reduce應用于分組資料集,則可以通過提供CombineHintto 來指定運作時執行reduce的組合階段的方式 setCombineHint。在大多數情況下,基于散列的政策應該更快,特别是如果不同鍵的數量與輸入資料元的數量相比較小(例如1/10)。

  • ReduceGroup

将一組資料元組合成一個或多個資料元。ReduceGroup可以應用于完整資料集或分組資料集。

data.reduceGroup(new GroupReduceFunction<Integer, Integer> {
  public void reduce(Iterable<Integer> values, Collector<Integer> out) {
    int prefixSum = 0;
    for (Integer i : values) {
      prefixSum += i;
      out.collect(prefixSum);
    }
  }
});           
  • Aggregate

将一組值聚合為單個值。聚合函數可以被認為是内置的reduce函數。聚合可以應用于完整資料集或分組資料集。

Dataset<Tuple3<Integer, String, Double>> input = // [...]
DataSet<Tuple3<Integer, String, Double>> output = input.aggregate(SUM, 0).and(MIN, 2);           

您還可以使用簡寫文法進行最小,最大和總和聚合。

Dataset<Tuple3<Integer, String, Double>> input = // [...]
DataSet<Tuple3<Integer, String, Double>> output = input.sum(0).andMin(2);           
  • Distinct

傳回資料集的不同資料元。它相對于資料元的所有字段或字段子集從輸入DataSet中删除重複條目。

data.distinct();           

使用reduce函數實作Distinct。您可以通過提供CombineHintto 來指定運作時執行reduce的組合階段的方式 setCombineHint。在大多數情況下,基于散列的政策應該更快,特别是如果不同鍵的數量與輸入資料元的數量相比較小(例如1/10)。

  • Join

通過建立在其鍵上相等的所有資料元對來連接配接兩個資料集。可選地使用JoinFunction将資料元對轉換為單個資料元,或使用FlatJoinFunction将資料元對轉換為任意多個(包括無)資料元。請參閱鍵部分以了解如何定義連接配接鍵。

result = input1.join(input2)
               .where(0)       // key of the first input (tuple field 0)
               .equalTo(1);    // key of the second input (tuple field 1)           

您可以通過Join Hints指定運作時執行連接配接的方式。提示描述了通過分區或廣播進行連接配接,以及它是使用基于排序還是基于散列的算法。

如果未指定提示,系統将嘗試估算輸入大小,并根據這些估計選擇最佳政策。

// This executes a join by broadcasting the first data set
// using a hash table for the broadcast data
result = input1.join(input2, JoinHint.BROADCAST_HASH_FIRST)
               .where(0).equalTo(1);           

請注意,連接配接轉換僅适用于等連接配接。其他連接配接類型需要使用OuterJoin或CoGroup表示。

  • OuterJoin

在兩個資料集上執行左,右或全外連接配接。外連接配接類似于正常(内部)連接配接,并建立在其鍵上相等的所有資料元對。此外,如果在另一側沒有找到比對的Keys,則儲存“外部”側(左側,右側或兩者都滿)的記錄。比對資料元對(或一個資料元和null另一個輸入的值)被賦予JoinFunction以将資料元對轉換為單個資料元,或者轉換為FlatJoinFunction以将資料元對轉換為任意多個(包括無)資料元。請參閱鍵部分以了解如何定義連接配接鍵。

input1.leftOuterJoin(input2) // rightOuterJoin or fullOuterJoin for right or full outer joins
      .where(0)              // key of the first input (tuple field 0)
      .equalTo(1)            // key of the second input (tuple field 1)
      .with(new JoinFunction<String, String, String>() {
          public String join(String v1, String v2) {
             // NOTE:
             // - v2 might be null for leftOuterJoin
             // - v1 might be null for rightOuterJoin
             // - v1 OR v2 might be null for fullOuterJoin
          }
      });
           
  • CoGroup

reduce 算子操作的二維變體。将一個或多個字段上的每個輸入分組,然後關聯組。每對組調用轉換函數。

data1.coGroup(data2)
     .where(0)
     .equalTo(1)
     .with(new CoGroupFunction<String, String, String>() {
         public void coGroup(Iterable<String> in1, Iterable<String> in2, Collector<String> out) {
           out.collect(...);
         }
      });           
  • Cross

建構兩個輸入的笛卡爾積(交叉乘積),建立所有資料元對。可選擇使用CrossFunction将資料元對轉換為單個資料元

DataSet<Integer> data1 = // [...]
DataSet<String> data2 = // [...]
DataSet<Tuple2<Integer, String>> result = data1.cross(data2);           

注:交叉是一個潛在的非常計算密集型 算子操作它甚至可以挑戰大的計算叢集!建議使用crossWithTiny()和crossWithHuge()來提示系統的DataSet大小。

  • Union

生成兩個資料集的并集。

DataSet<String> data1 = // [...]
DataSet<String> data2 = // [...]
DataSet<String> result = data1.union(data2);           
  • Rebalance

均勻地Rebalance 資料集的并行分區以消除資料偏差。隻有類似Map的轉換可能會遵循Rebalance 轉換。

DataSet<String> in = // [...]
DataSet<String> result = in.rebalance()
                           .map(new Mapper());
                                      
  • Hash-Partition

散列分區給定鍵上的資料集。鍵可以指定為位置鍵,表達鍵和鍵選擇器函數。

DataSet<Tuple2<String,Integer>> in = // [...]
DataSet<Integer> result = in.partitionByHash(0)
                            .mapPartition(new PartitionMapper());           
  • Range-Partition

Range-Partition給定鍵上的資料集。鍵可以指定為位置鍵,表達鍵和鍵選擇器函數。

DataSet<Tuple2<String,Integer>> in = // [...]
DataSet<Integer> result = in.partitionByRange(0)
                            .mapPartition(new PartitionMapper());           
  • Custom Partitioning

手動指定資料分區。

注意:此方法僅适用于單個字段鍵。

DataSet<Tuple2<String,Integer>> in = // [...]
DataSet<Integer> result = in.partitionCustom(Partitioner<K> partitioner, key)           
  • Sort Partition

本地按指定順序對指定字段上的資料集的所有分區進行排序。可以将字段指定為元組位置或字段表達式。通過連結sortPartition()調用來完成對多個字段的排序。

DataSet<Tuple2<String,Integer>> in = // [...]
DataSet<Integer> result = in.sortPartition(1, Order.ASCENDING)
                            .mapPartition(new PartitionMapper());           
  • First-n

傳回資料集的前n個(任意)資料元。First-n可以應用于正常資料集,分組資料集或分組排序資料集。分組鍵可以指定為鍵選擇器函數或字段位置鍵。

DataSet<Tuple2<String,Integer>> in = // [...]
// regular data set
DataSet<Tuple2<String,Integer>> result1 = in.first(3);
// grouped data set
DataSet<Tuple2<String,Integer>> result2 = in.groupBy(0)                                     .first(3);
// grouped-sorted data set
DataSet<Tuple2<String,Integer>> result3 = in.groupBy(0)                                     .sortGroup(1, Order.ASCENDING)                     .first(3);
           

DataSet Sink

資料接收器使用DataSet用于存儲或傳回。使用OutputFormat描述資料接收器算子操作 。Flink帶有各種内置輸出格式,這些格式封裝在DataSet上的算子操作中:

  • writeAsText()/ TextOutputFormat- 按字元串順序寫入資料元。通過調用每個資料元的toString()方法獲得字元串。
  • writeAsFormattedText()/ TextOutputFormat- 按字元串順序寫資料元。通過為每個資料元調用使用者定義的format()方法來擷取字元串。
  • writeAsCsv(...)/ CsvOutputFormat- 将元組寫為逗号分隔值檔案。行和字段分隔符是可配置的。每個字段的值來自對象的toString()方法。
  • print()/ printToErr()/ print(String msg)/ printToErr(String msg)- 在标準輸出/标準錯誤流上列印每個資料元的toString()值。可選地,可以提供字首(msg),其字首為輸出。這有助于區分不同的列印調用。如果并行度大于1,則輸出也将與生成輸出的任務的辨別符一起添加。
  • write()/ FileOutputFormat- 自定義檔案輸出的方法和基類。支援自定義對象到位元組的轉換。
  • output()/ OutputFormat- 大多數通用輸出方法,用于非基于檔案的資料接收器(例如将結果存儲在資料庫中)。

可以将DataSet輸入到多個 算子操作。程式可以編寫或列印資料集,同時對它們執行其他轉換。

示例:

// text data
DataSet<String> textData = // [...]

// write DataSet to a file on the local file system
textData.writeAsText("file:///my/result/on/localFS");

// write DataSet to a file on a HDFS with a namenode running at nnHost:nnPort
textData.writeAsText("hdfs://nnHost:nnPort/my/result/on/localFS");

// write DataSet to a file and overwrite the file if it exists
textData.writeAsText("file:///my/result/on/localFS", WriteMode.OVERWRITE);

// tuples as lines with pipe as the separator "a|b|c"
DataSet<Tuple3<String, Integer, Double>> values = // [...]
values.writeAsCsv("file:///path/to/the/result/file", "\n", "|");

// this writes tuples in the text formatting "(a, b, c)", rather than as CSV lines
values.writeAsText("file:///path/to/the/result/file");

// this writes values as strings using a user-defined TextFormatter object
values.writeAsFormattedText("file:///path/to/the/result/file",
    new TextFormatter<Tuple2<Integer, Integer>>() {
        public String format (Tuple2<Integer, Integer> value) {
            return value.f1 + " - " + value.f0;
        }
    });           

使用自定義輸出格式:

DataSet<Tuple3<String, Integer, Double>> myResult = [...]

// write Tuple DataSet to a relational database
myResult.output(
    // build and configure OutputFormat
    JDBCOutputFormat.buildJDBCOutputFormat()
                    .setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
                    .setDBUrl("jdbc:derby:memory:persons")
                    .setQuery("insert into persons (name, age, height) values (?,?,?)")
                    .finish()
    );           

序列化器

  • Flink自帶了針對諸如int,long,String等标準類型的序列化器
  • 針對Flink無法實作序列化的資料類型,我們可以交給Avro和Kryo
  • 使用方法:ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
使用avro序列化:env.getConfig().enableForceAvro();
使用kryo序列化:env.getConfig().enableForceKryo();
使用自定義序列化:env.getConfig().addDefaultKryoSerializer(Class<?> type, Class<? extends Serializer<?>> serializerClass)
           

資料類型

  • Java Tuple 和 Scala case class
  • Java POJOs:java實體類
  • Primitive Types

    預設支援java和scala基本資料類型

  • General Class Types

    預設支援大多數java和scala class

  • Hadoop Writables

    支援hadoop中實作了org.apache.hadoop.Writable的資料類型

  • Special Types

    例如scala中的Either Option 和Try