天天看點

Apache Flink流處理(五)

本章介紹Flink的DataStream API的基礎知識。我們将展示一個标準的Flink流式應用程式的結構群組件,還會讨論Flink的類型系統及其支援的資料類型,并給出資料轉換和分區轉換。我們将在下一章将讨論視窗操作符【windows operator】、基于時間的轉換【time-based transformations】、有狀态操作符【stateful operators】和連接配接器【connectors】。在閱讀本章之後,您将了解如何實作一個具有基本功能的流處理應用程式。我們的執行個體代碼使用的是Scala,但是Java API基本上是類似的(例外或特殊情況,我們會特别指出的)。我們還在Github存儲庫中提供了用Java和Scala實作的完整示例應用程式。

5.1 Hello Flink

讓我們從一個簡單的例子開始,初步了解用DataStream API編寫流應用程式是什麼樣的。 我們将使用此示例來展示一個Flink程式的基本結構,并介紹DataStream API的一些重要特性。 我們的這個示例應用程式從多個傳感器攝取溫度測量的資料流。

首先,讓我們看一看用來表示傳感器讀數的資料類型:

case class SensorReading(
	  id: String,
	  timestamp: Long,
	  temperature: Double)
           

Example 5-1. Scala case class for sensor data(表示傳感器資料的Scala樣本類)

下面的程式将溫度從華氏溫度轉換為攝氏溫度,并為每個傳感器每五秒鐘計算一次平均溫度。

//V1.7 
// Scala object that defines the DataStream program in the main() method.
object AverageSensorReadings {

 // main() defines and executes the DataStream program
 def main(args: Array[String]) {

   // set up the streaming execution environment
   val env = StreamExecutionEnvironment.getExecutionEnvironment

   // use event time for the application
   env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

   // create a DataStream[SensorReading] from a stream source
   val sensorData: DataStream[SensorReading] = env
     // ingest sensor readings with a SensorSource SourceFunction
     .addSource(new SensorSource)
     // assign timestamps and watermarks (required for event time)
     .assignTimestampsAndWatermarks(new SensorTimeAssigner)

   val avgTemp: DataStream[SensorReading] = sensorData
     // convert Fahrenheit to Celsius with an inline lambda function
     .map( r => {
         val celsius = (r.temperature - 32) * (5.0 / 9.0)
         SensorReading(r.id, r.timestamp, celsius)
       } )
     // organize readings by sensor id
     .keyBy(_.id)
     // group readings in 5 second tumbling windows
     .timeWindow(Time.seconds(5))
     // compute average temperature using a user-defined function
     .apply(new TemperatureAverager)

   // print result stream to standard out
   avgTemp.print()

   // execute application
   env.execute("Compute average sensor temperature")
 }
}
           

Example 5-2. Compute the average temperature every 5 seconds for a stream of sensors

您可能已經注意到Flink程式是用正常的Scala或Java方法定義并送出執行的。 最常見的做法是在靜态main方法中完成。 在我們的示例中,我們定義了一個Object類:AverageSensorReadings ,并在main()中包含了大部分應用程式邏輯。

标準的流式Flink應用程式的結構由下面的這幾點組成:

  1. 設定執行環境
  2. 從資料源讀取一個或多個流
  3. 應用流式轉換來實作應用程式邏輯
  4. 可選地将結果輸出到一個或者多個資料接收器【data sinks】
  5. 執行程式(run)

現在我們使用上面的示例詳細研究流式Flink應用程式的各個組成部分。

5.1.1 Set up the Execution Environment:設定執行環境

Flink應用程式需要做的第一件事是設定其執行環境。執行環境決定該程式是在本地機器上運作還是在叢集上運作。在DataStream API中,應用程式的執行環境由StreamExecutionEnvironment來表現的。在我們的示例中,我們通過調用靜态getExecutionEnvironment()來檢出執行環境。該方法将傳回的是一個本地環境或遠端環境,具體取決于調用該方法的上下文。如果該方法的調用者是一個連接配接到遠端叢集的負責送出的用戶端,那麼該方法傳回的是一個遠端執行環境。否則,它傳回一個本地環境。

當然Flink也可以顯式地建立本地或遠端執行環境,如下所示:

// create a local stream execution environment
val localEnv: StreamExecutionEnvironment.createLocalEnvironment()

// create a remote stream execution environment
val remoteEnv = StreamExecutionEnvironment.createRemoteEnvironment(
  "host",                // hostname of JobManager
  1234,                  // port of JobManager process
  "path/to/jarFile.jar)  // JAR file to ship to the JobManager
           

Example 5-3. Create a local or remote execution environment

傳輸到JobManager的JAR檔案必須包含執行流式應用程式所需的所有資源.

接下來,我們使用env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)來訓示我們的程式使用事件時間【event time】來解釋時間語義。執行環境還允許更多的配置選項,例如設定程式并行度和啟用容錯功能。

5.1.2 Read an input stream:讀取輸入流

一旦配置了執行環境,就應該執行一些實際工作并開始處理流了。StreamExecutionEnvironment提供了方法用于建立流源/發生器【stream source】,該流源/發生器可以将資料流【data streams】攝入到應用程式中。資料流【data streams】可以從消息隊列、檔案等源上擷取,也可以動态生成。

在本例中,我們這樣使用:

val sensorData: DataStream[SensorReading] = env.addSource(new SensorSource)
           

通過這樣的方法調用,我們可以連接配接到傳感器測量資料源/發生器并建立一個資料類型為SensorReading的初始DataStream。Flink支援多種資料類型,我們将在下一節中詳細講解。現在,我們使用在前面定義的Scala樣本類:SensorReading,來作為資料的類型。該樣本類SensorReading 包含:①傳感器的ID ②表示溫度測量時的時間戳 ③測量的溫度值。

接下來我們通過調用setParallelism(4)将輸入資料源的并行度配置為4,并使用 assignTimestampsAndWatermarks(new SensorTimeAssigner)配置設定事件時間【event time】所需的時間戳和水印。SensorTimeAssigner 的實作細節目前不需要關注,我們後面會講解。

5.1.3 應用轉換【Apply transformations】

一旦我們有了一個DataStream,我們就可以對它應用轉換。這裡有多種不同類型的轉換。有的轉換可以生成新的DataStream,甚至可能是生成不同類型的DataStream,而其他的轉換則不會修改DataStream中的記錄,而是通過分區或分組對其進行重新組織。應用程式的邏輯是通過串鍊轉換來定義的。

在我們的這個樣例中,我們首先應用了一個map()轉換,将每個傳感器讀數中的溫度讀數由華氏度轉換為攝氏度。然後,我們使用 keyBy()轉換來根據傳感器的ID對傳感器讀數進行分區。随後,我們定義了一個timeWindow()轉換,它将每個傳感器ID分區的傳感器讀數分組為每5秒一撮的滾動視窗。

val avgTemp: DataStream[SensorReading] = sensorData
     .map( r => {
         val celsius = (r.temperature - 32) * (5.0 / 9.0)
         SensorReading(r.id, r.timestamp, celsius)
       } )
     .keyBy(_.id)
     .timeWindow(Time.seconds(5))
     .apply(new TemperatureAverager)
           

下一章我們将較長的描述視窗轉換。最後,我們應用了一個使用者定義函數(UDF)來計算每個視窗的平均溫度。我們将在本章後面讨論如何在DataStream API中定義使用者定義函數(UDF)。

5.1.4 結果輸出

流式應用程式通常将結果釋出到一些外部系統,如Apache Kafka、檔案系統或資料庫。Flink提供了一個維護良好的流接收器【stream sinks】集合,可用于将資料寫入不同的系統。當然Flink也允許我們實作自己的流式接收器。還有一些應用程式不發出結果,而是将結果保留在内部,以便通過Flink的可查詢狀态特性【queryable state feature】提供結果。

在我們的示例中,結果是DataStream[SensorReading],其中的值是每5秒内的平均測量溫度。結果流會可以通過調用print()寫入到标準輸出。

avgTemp.print()
           

請注意,流式接收器的選擇會影響應用程式的端到端一緻性,即,應用程式的結果是具有至少一次語義【at-least once semantics】還是具有唯一語義【exactly-once semantics】。應用程式的端到端一緻性取決于所選流接收器與Flink檢查點算法的內建。我們将在第7章的“Application Consistency Guarantees”中更詳細地讨論這個主題。

5.1.5 執行

當應用程式被完全定義之後,可以通過調用StreamExecutionEnvironment.execute()來執行它。

env.execute("Compute average sensor temperature")
           

Flink程式是惰性執行的。也就是說,到目前為止,所有建立流資料源和轉換的方法都沒有産生任何資料處理。相反,執行環境建構了一個執行計劃,該計劃從從環境建立的所有流資料源開始,并包括對這些資料源臨時應用的所有轉換。隻有在調用execute()時,系統才觸發對執行計劃的執行。根據執行環境的類型,應用程式或者在本地執行,或者被發送到遠端JobManager執行。

構造的計劃被轉換成JobGraph ,并送出給JobManager 執行。根據執行環境的類型,JobManager 将作為本地線程(本地執行環境)啟動,或者将JobGraph 發送到遠端JobManager。如果JobManager在遠端運作,則必須将JobGraph與一個Jar檔案一起提供,該Jar檔案包含應用程式的所有類和所需的依賴項。

5.2 類型【Types】

Flink DataStream應用程式處理事件流,其中事件是以資料對象來表現的。在資料對象上應用函數,并發射出現資料對象。在内部,Flink需要能夠處理這些對象。需要對事件進行序列化和反序列化,以便通過網絡傳輸它們,或者将它們與狀态後端、檢查點和儲存點之間執行寫入和讀取互動。為了有效地做到這一點,Flink需要關于應用程式處理的資料類型的詳細知識。Flink具有類型資訊的概念,用于表示資料類型并為每種資料類型生成特定的序列化器、反序列化器和比較器。

Flink提供了一個類型提取系統,它分析函數的輸入和傳回類型,進而自動獲得類型資訊,進而獲得序列化器和反序列化器。然而,在某些情況下,例如lambda函數或泛型類型,有必要顯式地提供類型資訊,以使應用程式工作或提高其性能。

在本節中,我們将讨論Flink支援的類型,如何為資料類型建立類型資訊,以及如何在Flink的類型系統無法自動推斷函數的傳回類型時使用提示【hints 】來幫助它進行類型推斷。

5.2.1 支援的資料類型

Flink支援許多常見的資料類型。最廣泛使用的類型大緻可分為以下幾類:

  • Primitives
  • Java and Scala tuples
  • Scala case classes
  • POJOs, including classes generated by Apache Avro
  • Flink Value types
  • Some special types

沒有特别處理的類型被視為泛型類型,并使用Kryo序列化架構進行序列化。

讓我們通過示例研究每個類型類别。

5.2.1.1 PRIMITIVES(基本資料類型)

所有Java和Scala所支援的基本類型,如Int(或Java的Integer)、String和Double,都支援作為DataStream的類型。下面例子中處理Long類型記錄的資料流,并對流中的記錄應用一個遞增操作。

val numbers: DataStream[Long] = env.fromElements(1L, 2L, 3L, 4L)
numbers.map( n => n + 1)
           

Example 5-4. Increment a stream of Long values

5.2.1.2 JAVA AND SCALA TUPLES

元組是由固定數量的類型化字段組成的複合資料類型。

Scala DataStream API使用規範的Scala元組。下面是一個過濾二進制元組類型的DataStream的示例。我們将在下一節中讨論filter轉換的語義:

// DataStream of Tuple2[String, Integer] for Person(name, age)
val persons: DataStream[(String, Integer)] = env.fromElements(
  ("Adam", 17), 
  ("Sarah", 23))

// filter for persons of age > 18
persons.filter(p => p._2 > 18)
           

Example 5-5. Filtering tuples in the Scala DataStream API

Flink提供了Java元組的有效實作。Flink的Java元組最多可以支援有25個屬性字段,每種長度的元組都通過一個單獨的類來實作,即, Tuple1,Tuple2,直到Tuple25。元組類是強類型的。

我們可以在Java DataStream API中重寫上面的過濾示例:

// DataStream of Tuple2<String, Integer> for Person(name, age)
DataStream<Tuple2<String, Integer>> persons = env.fromElements(
  Tuple2.of("Adam", 17), 
  Tuple2.of("Sarah", 23));

// filter for persons of age > 18
persons.filter(new FilterFunction<Tuple2<String, Integer>() {
   @Override
   public boolean filter(Tuple2<String, Integer> p) throws Exception {
       return p.f1 > 18;
   }
})
           

Example 5-6. Filtering tuples in the Java DataStream API

Tuple中的屬性字段可以通過其公共屬性字段的名稱來進行類型安全的通路,如f0、f1、f2等,如上面例子中這樣;除此之外,我們還也可以使用Object getField(int pos)方法,通過指定索引進行通路,其中索引從0開始:

Tuple2<String, Integer> personTuple = Tuple2.of("Alex", "42");
Integer age = personTuple.getField(1); // age = 42
           

Example 5-7. Accessing a tuple field in the Java DataStream API

與Scala的元組不同,Flink支援的Java元組是可變的,這樣我們可以重新配置設定屬性字段的值。是以,函數可以重用Java元組,以減少對垃圾回收的壓力:

personTuple.f1 = 42;         // set the 2nd field to 42
personTuple.setField(43, 1); // set the 2nd field to 43
           

Example 5-8. Setting tuple fields in the Java DataStream API

5.2.1.3 SCALA CASE CLASSES

Flink支援Scala的樣本類;即可以通過模式比對分解的類。樣本類的屬性字段是按名稱通路的。在下面的例子中,我們定義了一個樣本類:Person,它有兩個字段,即name和age。與元組類似,我們按年齡篩選DataStream:

case class Person(name: String, age: Int)

val persons: DataStream[Person] = env.fromElements(
  Person("Adam", 17), 
  Person("Sarah", 23))

// filter for persons with age > 18
persons.filter(p => p.age > 18)
           

Example 5-9. Filtering persons by age using a Scala case class

5.2.1.4 POJOS

Flink對不屬于任何類别的每種類型進行分析,并檢查是否可以将其辨別為POJO類型并作為POJO類型處理。當一個類滿足下面的條件時,Flink接受其作為POJO:

  • 該類是一個public類
  • 該類有public的無參構造器
  • 所有屬性需要是public聲明,或者可以通過getter和setter通路到。而且getter和setter方法必須遵守預設的命名方案,比如屬性字段x的類型是Y,那麼他的getter和setter方案應該是Y getX() 和 setX(Y x)
  • 所有屬性字段都具有Flink支援的類型

例如,下面的Java類将被Flink辨別為POJO:

public class Person {
  // both fields are public
  public String name;
  public int age;

  // default constructor is present
  public Person() {}

  public Person(String name, int age) {
      this.name = name;
      this.age = age;
  }
}

DataStream<Person> persons = env.fromElements(
   new Person("Alex", 42),
   new Person("Wendy", 23)); 
           

Example 5-10. A Java class which is identified by Flink as a POJO

Avro生成的類由Flink自動識别,并作為POJO處理.

5.2.1.5 FLINK VALUE TYPES

實作了org.apache.flink.types.Value接口的類型被視為Flink值類型。該接口由read()和write()兩個方法組成,用于實作序列化和反序列化邏輯。例如,可以利用這些方法實作比通用序列化器更有效地對公共值進行編碼。

Flink提供了一些内置的值類型,例如IntValue、DoubleValue和StringValue,它們為Java和Scala的不可變基本類型提供了一種可變的替代選擇。

5.2.1.6 ARRAYS, LISTS, MAPS, ENUMS, AND OTHER SPECIAL TYPES

Flink支援幾種特殊用途的類型,例如Scala的Either、Option和Try類型,以及Flink的Java版本的Either類型。與Scala的Either類型類似,它表示兩種可能類型之一的值,Left或者Right。此外,Flink還支援基本類型和對象Array類型、Java的Enum類型和Hadoop的Writable類型。

5.2.2 TypeInformation

Flink類型系統的核心類是TypeInformation。 它為系統提供了用于生成序列化器和比較器所需的必要資訊。 例如,當您通過某個鍵加入(join)或分組(group)時,這個類允許Flink執行語義檢查,檢查被用作鍵的字段是否有效。

當應用程式送出執行時,Flink的類型系統嘗試為架構處理的每個資料類型自動派生類型資訊。所謂的類型提取器分析所有函數的泛型類型和傳回類型,以獲得相應的類型資訊對象。是以,您可以暫時使用Flink,而不必擔心資料類型的類型資訊。然而,有時類型提取器會失敗,或者您可能希望定義自己的類型并告訴Flink如何有效地處理它們。在這種情況下,需要為特定的資料類型生成類型資訊。

Flink使用靜态方法為Java和Scala提供了兩個實用程式類來生成類型資訊。對于Java, 這個助手類是org.apache.flink.api.common.typeinfo.Types,如下面的例子所示:

// TypeInformation for primitive types
TypeInformation<Integer> intType = Types.INT;

// TypeInformation for Java Tuples
TypeInformation<Tuple2<Long, String>> tupleType = 
  Types.TUPLE(Types.LONG, Types.STRING);

// TypeInformation for POJOs
TypeInformation<Person> personType = Types.POJO(Person.class);
           

對于Scala API,類型資訊助手類是org.apache.flink.api.scala.typeutils。類型和它的使用如下代碼示例所示:

// TypeInformation for primitive types
val stringType: TypeInformation[String] = Types.STRING

// TypeInformation for Scala Tuples
val tupleType: TypeInformation[(Int, Long)] = Types.TUPLE[(Int, Long)]

// TypeInformation for case classes
val caseClassType: TypeInformation[Person] = Types.CASE_CLASS[Person]
           

NOTE:在Scala API中,Flink使用在編譯時運作的宏。 要通路’createTypeInformation’宏函數,請確定始終添加以下import語句:

import org.apache.flink.streaming.api.scala._
           

顯式地提供TypeInformation

在大多數情況下,Flink能夠自動推斷類型并生成正确的TypeInformation。 Flink的類型提取器利用反射和對函數的簽名于子類資訊進行分許,來擷取使用者定義函數的正确輸出類型。但是,有時無法提取必要的資訊,例如Java會擦除泛型類型資訊。此外,在某些情況下,Flink可能不會選擇生成最有效的序列化器和反序列化器的TypeInformation。

是以,對于應用程式中使用的某些資料類型,您可能需要向Flink顯式提供TypeInformation對象。有兩種方法可以提供TypeInformation。

首先,您可以通過實作ResultTypeQueryable接口來擴充函數類,以顯式提供其傳回類型的TypeInformation。以下示例顯示了一個提供其傳回類型的MapFunction。

class Tuple2ToPersonMapper extends MapFunction[(String, Int), Person]
    with ResultTypeQueryable[Person] {

  override def map(v: (String, Int)): Person = Person(v._1, v._2)

  // provide the TypeInformation for the output data type
  override def getProducedType: TypeInformation[Person] = Types.CASE_CLASS[Person]
}
           

在Java DataStream API中,還可以使用returns()方法在定義資料流時顯式指定操作符的傳回類型,如下例所示:

DataStream<Tuple2<String, Integer>> tuples = ...
DataStream<Person> persons = tuples
   .map(t -> new Person(t.f0, t.f1))
   // provide TypeInformation for the map lambda function's return type
   .returns(Types.POJO(Person.class));
           

可以用Class類來提供類型提示,如下例所示:

DataStream<MyType> result = input
	   .map(new MyMapFunction<Long, MyType>())
	   .returns(MyType.class);
           

Example 5-11. Providing type hints with a class in the Java API

如果函數在傳回類型中使用不能從輸入類型推斷的泛型類型變量,則需要提供一個TypeHint:

DataStream<Integer> result = input
   .flatMap(new MyFlatMapFunction<String, Integer>())
   .returns(new TypeHint<Integer>(){});
           
class MyFlatMapFunction<T, O> implements FlatMapFunction<T, O> {

   public void flatMap(T value, Collector<O> out) { ... }
}
           

Example 5-12. Providing type hints with a TypeHint in the Java API

5.3 轉換

在本節中,我們将概述DataStream API的基本的轉換。我們将在下面的章節中介紹與時間相關的操作符(如視窗運算符),以及進一步的特定轉換。 流轉換可以被應用于一個或多個輸入流上,并将它們轉換為一個或多個輸出流。 編寫DataStream API程式本質上歸結為組合這些轉換以建立實作應用程式邏輯的資料流圖。

大多數流轉換都是基于使用者定義的函數的(UDF)。 UDF封裝使用者應用程式邏輯,并定義了如何将輸入流的元素轉換為輸出流的元素。 UDF通過類的形式定義,該類需要擴充自特定轉換的函數接口,例如以下示例中的FilterFunction和MapFunction:

class MyFilterFunction extends FilterFunction[Int] {
  override def filter(value: Int): Boolean = {
    value > 0;
  }
}

class MyMapFunction extends MapFunction[Int, Int] {
  override def map(value: Int): Int =  value + 1
}
           

Example 5-14. A DataStream UDF class

函數接口中定義了需要由使用者實作的轉換方法,比如上例中的filter()、map()方法。

大多數函數接口設計為SAM(單一抽象方法)接口。是以,它們可以在Java 8中實作為lambda函數。Scala DataStream API也提供了對lanmbda函數的内置支援。在介紹DataStream API的轉換時,我們将展示所有實作了的函數類的接口,但為了簡潔起見,我們在代碼中還是使用更簡潔的lambda,而不是函數類。

DataStream API為常見的資料轉換操作提供了轉換。如果您熟悉批處理資料API、函數式程式設計語言或SQL,就會發現API概念非常容易掌握。下面,我們将DataStream API的轉換分為四組:

  1. 基本轉換【Basic transformations】是對單個事件的轉換
  2. KeyedStream轉換【KeyedStream transformations】是應用于鍵上下文中的事件的轉換
  3. 多流轉換【Multi-stream transformations】将多個流合并到一個流中,或者将一個流拆分為多個流
  4. 分區轉換則會重新組織流事件

5.3.1 Basic transformations

基本轉換用于處理單個事件。我們下面來解釋它們的語義并展示代碼示例:

FILTER [DATASTREAM -> DATASTREAM]

過濾器轉換通過計算每個輸入事件的布爾條件,來決定丢棄或轉發流中的事件。 傳回值為true會保留輸入事件并将其轉發到輸出,而如果傳回值為false則會導緻事件被丢棄。 通過調用DataStream.filter()方法來指定對一個過濾器轉換的使用。 圖5.1顯示了一個僅保留白色方塊的過濾操作。

Apache Flink流處理(五)

Figure 5-1. A filter operation that only retains white values.

布爾條件可以使用FilterFunction接口或lambda函數任一方式實作為一個UDF。 FilterFunction根據輸入流的類型進行類型化,并定義filter()方法,該方法使用輸入事件調用,并傳回一個布爾值。

// T: the type of elements
FilterFunction[T]
    > filter(T): Boolean
           

下面的例子展示了一個過濾器,它将溫度低于25度的所有傳感器測量資料丢棄掉:

val readings: DataStream[SensorReadings] = ...
val filteredSensors = readings
    .filter( r =>  r.temperature >= 25 )
           

Example 5-15. A filter transformation that drops sensor measurements with temperature below 25 degrees

MAP [DATASTREAM -> DATASTREAM]

映射轉換是通過調用DataStream.map()方法來指定的。它将每個傳入事件傳遞給一個使用者定義的映射器【mapper】,該映射器恰好傳回一個輸出事件,但是它可能是不同類型的輸出事件。圖5.1展示了一個将每個正方形轉換為圓行的映射轉換。

Apache Flink流處理(五)

Figure 5.2 A map operation that transforms every square into a circle of the same color.

映射器【mapper】的類型是根據輸入和輸出事件的類型來類型化的,可以使用MapFunction接口指定。 該接口定義了一個map()方法,該方法将輸入事件轉換為一個輸出事件。

// T: the type of input elements
// O: the type of output elements
MapFunction[T, O]
    > map(T): O
           

下面是一個簡單的映射器,它在輸入流中投影每個SensorReading 中的id屬性:

val readings: DataStream[SensorReading] = ...
val sensorIds: DataStream[String] = readings.map(new MyMapFunction)

class MyMapFunction extends ProjectionMap[SensorReading, String] {
  override def map(r: SensorReading): String = r.id
}
           

Example 5-16. A mapper that projects the first field of each SensorReading in the input stream

當使用Scala API或者Java8,該映射器可以使用lambda函數來表達。

val readings: DataStream[SensorReading] = ...
val sensorIds: DataStream[String] = readings.map(r => r.id)
           

Example 5-17. A mapper using a lambda function

FLATMAP [DATASTREAM -> DATASTREAM]

FlatMap與map類似,但是它可以為每個傳入事件生成0個、1個或多個輸出事件。事實上,flatMap是filter和map的一種泛化,可以用來實作這兩種操作。圖5.3展示了一個flatMap操作,該操作根據傳入事件的顔色區分其輸出。如果輸入是白色方塊,則輸出未經修改的事件。黑色方塊将會被Copy,灰色方塊則被過濾掉。

Apache Flink流處理(五)

Figure 5-3. A flatMap operation that outputs white squares, duplicates black squares, and drops gray squares.

flatMap轉換對每個傳入事件應用UDF。對應的FlatMapFunction定義了一個flatMap()方法,該方法可以通過将它們傳遞給Collector對象來傳回零個,一個或多個事件。

// T: the type of input elements
// O: the type of output elements
FlatMapFunction[T, O]
    > flatMap(T, Collector[O]): Unit
           

下面的示例展示了flatMap轉換,它被用于轉換傳感器的字元串類型的ID。用于生成傳感器讀數的簡版事件源所生成的ID的形式為“sensor_N”,其中N是整數。 下面的flatMap函數則根據“_”将該ID分割為字首“sensor”和傳感器編号,并将二者都發射出來:

val sensorIds: DataStream[String] = ...
val splitIds: DataStream[String] = sensorIds
  .flatMap(id => id.split("_"))
           

Example 5-18. A flatMap that splits the incoming sensor ids into their prefix and their number.

注意,每個字元串都将作為一個單獨的記錄發出,即flatMap扁平化了輸出集合。

5.3.2 KeyedStream transformations

許多應用程式的一個常見需求是一起處理共享某個屬性的事件組。DataStream API具有KeyedStream的抽象,KeyedStream是一個DataStream,它在邏輯上被分區為 共享相同鍵(key)的事件 的不相交子流。

應用到KeyedStream上的有狀态的轉換【Stateful transformations 】,會從目前處理的事件的鍵【key】的上下文中讀取和寫入狀态。這意味着具有相同鍵【key】的所有事件可以通路到相同的狀态,進而一起處理。需要注意的是,必須謹慎使用有狀态的轉換【Stateful transformations 】和鍵控聚合【keyed aggregates 】。如果鍵域【key domain 】持續增長,例如因為鍵【key】是一個惟一的事務ID,那麼應用程式最終可能會遇到記憶體問題。請參閱第8章:Implementing Stateful Functions” ,其中詳細讨論了有狀态函數。

可以使用之前看到的map,flatMap和filter轉換來處理KeyedStream。 在下文中,您将了解如何使用keyBy轉換将DataStream轉換為KeyedStream以及鍵控轉換【keyed transformations】(如滾動聚合和reduce)。

KEYBY [DATASTREAM -> KEYEDSTREAM]

keyBy轉換使用指定的鍵【key】将DataStream轉換為KeyedStream。它根據鍵【key】,将流中的事件配置設定給不同的分區。雖然具有不同鍵【key】的事件可以配置設定給同一分區,但我們可以保證具有相同鍵【key】的元素始終位于同一分區。 是以,一個分區可能由多個邏輯子流組成,每個子流具有一個唯一的鍵【key】。

假設以輸入事件中的顔色屬性作為鍵【key】,下面的圖5.4将白色和灰色事件配置設定給一個分區,将黑色事件配置設定給另一個分區:

Apache Flink流處理(五)

Figure 5-4. A keyBy operation that partitions together events based on their color.

keyBy()方法接收一個參數,該參數指定用于分組的鍵【key】(或keys)并傳回一個KeyedStream。 有不同的方法來指定這個鍵【key】。 我們将在本章後面的“Defining Keys”部分中看到它們。以下示例根據ID對傳感器讀數流進行分組:

val readings: DataStream[SensorReading] = ...
val keyed: KeyedStream[SensorReading, String] = readings
  .keyBy(r => r.id)
           

Example 5-19. Group a DataStream of SensorReadings by id

ROLLING AGGREGATIONS [KEYEDSTREAM -> DATASTREAM]

滾動聚合轉換應用于一個KeyedStream并生成聚合流,這種轉換有sum、minimum和maximum。 滾動聚合操作符為每個觀察到的鍵【key】存放一個聚合值。 對于每個傳入事件,操作符更新相應的聚合值,并使用更新後的值發出事件。 滾動聚合不需要使用者定義的函數,但它也接收一個參數,該參數用于指定使用哪個字段來計算聚合。 DataStream API提供以下滾動聚合方法:

  • sum(): 輸入流在指定字段上滾動求和
  • min(): 輸入流在指定字段上滾動求得最小值
  • max(): 輸入流在指定字段上滾動求得最大值
  • minBy(): 輸入流的滾動最小值,傳回到目前為止觀察到的具有最低值的事件
  • maxBy(): 輸入流的滾動最大值,傳回到目前為止觀察到的具有最高值的事件

不可以組合多個滾動聚合方法,即,一次隻能計算一個滾動聚合。

考慮下面例子:

val inputStream: DataStream[(Int, Int, Int)] = env.fromElements(
  (1, 2, 2), (2, 3, 1), (2, 2, 4), (1, 5, 3))

val resultStream: DataStream[(Int, Int, Int)] = inputStream
  .keyBy(0) // key on first field of the tuple
  .sum(1)   // sum the second field of the tuple in place

resultStream.print()
           

Example 5-20. Computing a rolling sum on a tuple and printing the result

在本例中,元組輸入流根據第一個字段鍵控分組,并基于第二個字段進行滾動求和計算。對于key=1的分組中,輸出是(1,2,2),然後是(1,7,2);對于key=2的分組中,輸出則是(2,3,1),然後是(2,5,1)。示例中inputStream流所輸出元組中:第一個字段是公共的鍵【key】,第二個字段是待求和的值,第三個字段沒有定義。

NOTE隻在限界的鍵域上才使用滾動聚合:滾動聚合操作符為處理的每個鍵儲存一個狀态。由于這種狀态從未被清除,是以您應該隻在具有限界鍵域的流上應用滾動聚合操作符

REDUCE [KEYEDSTREAM -> DATASTREAM]

歸約【reduce】轉換是滾動聚合的一種推廣。它在KeyedStream上應用了一個使用者定義的函數,該函數将每個傳入事件與目前歸約得到的值結合起來。歸約【reduce】轉換不會改變流的類型,即,輸出流的類型與輸入流的類型相同。

可以使用實作了ReduceFunction接口的類來指定UDF。 ReduceFunction定義了educe()方法,該方法接受兩個輸入事件并傳回相同類型的事件。

// T: the element type
ReduceFunction[T]
    > reduce(T, T): T

val readings: DataStream[SensorReading] = ...

// a rolling reduce that computes the highest temperature of the each group's readings
val reducedSensors = readings
  .keyBy(_.id)
  .reduce((r1, r2) => {
    val highestTimestamp = Math.max(r1.timestamp, r2.timestamp)
    SensorReading(r1.id, highestTimestamp, r1.temperature)
  })
           

在下面的例子中,流是根據語言【language】鍵控分組的,結果是關于每種語言不斷的不斷更新的單詞清單:

val inputStream: DataStream[(String, List[String])] = env.fromElements(
  ("en", List("tea")), ("fr", List("vin")), ("en", List("cake")))

val resultStream: DataStream[(String, List[String])] = inputStream
  .keyBy(0)
  .reduce((x, y) => (x._1, x._2 ::: y._2))
           

Example 5-21. A rolling reduce that computes the highest timestamp of the each sensor group’s readings

5.3.3 Multi-stream transformations

許多應用程式會攝取需要聯合處理的多個流,或者需要将流拆分以便将對不同的子流應用不同的邏輯。 在下文中,我們将讨論處理多個輸入流或發出多個輸出流的DataStream API轉換。

UNION [DATASTREAM* -> DATASTREAM]

Union将一個或多個輸入流合并到一個輸出流中。圖5.5展示了一個union操作,它将黑白事件合并到單個輸出流中:

Apache Flink流處理(五)

Figure 5-5. A union operation that merges two input streams into one.

DataStream.union()方法接收一個或者多個具有相同輸入類型的DataStreams ,并生成一個相同類型的新DataStream, 随後的轉換會處理輸入流中所有的元素。

事件以先入先出的方式合并,即,則操作符不會生成特定的順序或事件。此外,union操作符不執行重複消除。每個輸入事件都被發送給下一個操作符。

下面的例子展示了如何将三種SensorReading類型的流讀入到一個流中:

val parisStream: DataStream[SensorReading] = ...
val tokyoStream: DataStream[SensorReading] = ...
val rioStream: DataStream[SensorReading] = ...
val allCities: DataStream[SensorReading] = parisStream
  .union(tokyoStream, rioStream)
           

Example 5-22. A union transformation on three sensor streams

CONNECT, COMAP, AND COFLATMAP [CONNECTEDSTREAMS -> DATASTREAM] 本章翻譯比較水,可以暫時忽略,尤其最後兩段

在流進行中,組合兩個流的事件是一個非常常見的需求。考慮這樣一個應用程式,該應用程式監視森林區域,并在發生火災的高風險時輸出警報。該應用程式接收您之前看到的溫度傳感器讀數流和一個額外的煙霧水準測量流。當溫度超過給定的門檻值且煙霧水準很高時,應用程式發出火災警報。

DataStream API提供了聯結【connect】轉換來支援這種用例。DataStream.connect()方法接收一個DataStream并傳回一個ConnectedStreams對象,該傳回值對象表示一個由兩個流聯結後的流。

// first stream
val first: DataStream[Int] = ...
// second stream
val second: DataStream[String] = ...

// connect streams
val connected: ConnectedStreams[Int, String] = first.connect(second)
           

ConnectedStreams提供map()和flatMap()方法,它們分别需要CoMapFunction和CoFlatMapFunction作為參數。

這兩個函數都是根據第一個和第二個輸入流的類型以及輸出流的類型來類型化的(即接口的泛型類型),并定義了兩個方法,每個方法對應一個輸入流。調用map1()和flatMap1()來處理第一個輸入中的事件,調用map2()和flatMap2()來處理第二個輸入中的事件。

// IN1: the type of the first input stream
// IN2: the type of the second input stream
// OUT: the type of the output elements
CoMapFunction[IN1, IN2, OUT]
    > map1(IN1): OUT
    > map2(IN2): OUT
           
// IN1: the type of the first input stream
// IN2: the type of the second input stream
// OUT: the type of the output elements
CoFlatMapFunction[IN1, IN2, OUT]
    > flatMap1(IN1, Collector[OUT]): Unit
    > flatMap2(IN2, Collector[OUT]): Unit
           

NOTE 函數不能選擇要從哪個已連接配接的流中讀取:請注意,我們無法控制調用CoMapFunction和CoFlatMapFunction方法的順序。相反,隻要事件通過相應的輸入到達,就會調用相應的方法。

兩個流的聯合處理通常要求在這兩個流上的事件,基于某些條件被确切的路由,使得可以在操作符所并行運作的同一個執行個體(其實就是任務)上被執行。 預設情況下,connect()不會在兩個流的事件之間建立關系,這樣兩個流中的事件會被随機配置設定給操作符執行個體(其實就是任務)。 但是這種行為會産生不确定性的結果,而這通常是我們所不希望看到的。為了在ConnectedStreams上實作确定性的轉換,connect()可以與keyBy()或broadcast()結合使用,如下所示:

val one: DataStream[(Int, Long)] = ...
val two: DataStream[(Int, String)] = ...

// keyBy two connected streams
val keyedConnect1: ConnectedStreams[(Int, Long), (Int, String)] = one
  .connect(two)
  .keyBy(0, 0) // key both input streams on first attribute

// alternative: connect two keyed streams
val keyedConnect2: ConnectedStreams[(Int, Long), (Int, String)] = one.keyBy(0)
  .connect(two.keyBy(0)
           

無論您是對ConnectedStreams應用了keyBy()還是對KeyedStreams應用了connect(),connect() 轉換都會将來自兩個流中的使用相同鍵的資料記錄路由到相同的操作符執行個體上。注意,這兩個流的鍵應該引用相同的實體類,就像SQL查詢中的join謂詞一樣。應用于既聯結又鍵控的流上的操作符可以通路鍵控狀态【keyed state】(第八章詳細講解)

下一個例子展示了如何将一個(非鍵控的)DataStream 與一個廣播流聯結起來:

val first: DataStream[(Int, Long)] = ...
val second: DataStream[(Int, String)] = ...

// connect streams with broadcast
val keyedConnect: ConnectedStreams[(Int, Long), (Int, String)] = first
  // broadcast second input stream
  .connect(second.broadcast())
           

将廣播流的所有事件複制并發送到後續處理函數的所有并行操作符執行個體上。而非廣播流中的事件隻是簡單的被轉發。 是以,可以聯合處理兩個輸入流的所有元素。

NOTE:USE BROADCAST STATE IF YOU NEED CONNECT A KEYED AND A BROADCASTED STREAM

Broadcast state是Broadcast ()-connect()轉換的改進版本。它還支援聯結鍵控和廣播流,并将廣播事件存儲在托管狀态。有了它,您可以實作通過資料流動态配置的操作符,例如,添加或删除過濾規則或更新機器學習模型。在“第七章:Using Connected Broadcast State”.中詳細讨論了廣播狀态。

下面的示例代碼展示了火災警報場景的簡化版實作:

// ingest sensor stream
val tempReadings: DataStream[SensorReading] = env
  .addSource(new SensorSource)
  .assignTimestampsAndWatermarks(new SensorTimeAssigner)

// ingest smoke level stream
val smokeReadings: DataStream[SmokeLevel] = env
  .addSource(new SmokeLevelSource)
  .setParallelism(1)

// group sensor readings by their id
val keyed: KeyedStream[SensorReading, String] = tempReadings
  .keyBy(_.id)

// connect the two streams and raise an alert
// if the temperature and smoke levels are high
val alerts = keyed
  .connect(smokeReadings.broadcast)
  .flatMap(new RaiseAlertFlatMap)

alerts.print()
           

Example 5-23. Fire alert application using connected streams

class RaiseAlertFlatMap extends CoFlatMapFunction[SensorReading, SmokeLevel, Alert] {

  var smokeLevel = SmokeLevel.Low

  override def flatMap1(in1: SensorReading, collector: Collector[Alert]): Unit = {
    // high chance of fire => true
    if (smokeLevel.equals(SmokeLevel.High) && in1.temperature > 100) {
      collector.collect(Alert("Risk of fire!", in1.timestamp))
    }
  }

  override def flatMap2(in2: SmokeLevel, collector: Collector[Alert]): Unit = {
    smokeLevel = in2
  }
}
           

Example 5-24. A CoFlatMapFunction that raises an alert if the temperature and the smoke level are both high

請注意,此示例中的狀态(smokeLevel)沒有被檢查點,那麼在發生故障時将會丢失。

SPLIT [DATASTREAM -> SPLITSTREAM] AND SELECT [SPLITSTREAM -> DATASTREAM]

Split是對union轉換的逆轉換。 它将輸入流拆分為兩個或多個輸出流。 每個傳入的事件都可以路由到0個,一個或多個輸出流。 是以,split也可用于過濾或複制事件。 圖5.6展示了一個操作符,它将所有白色事件放入一個單獨的流中。

Apache Flink流處理(五)

Figure 5-6. A split operation that splits the input stream into a stream of white events and a stream of others.

DataStream.split() 方法接收一個OutputSelector,它定義如何将流元素配置設定給指定名字的輸出【named outputs】。 OutputSelector中定義了一個為每個輸入事件調用的select()方法,該方法傳回一個 java.lang.Iterable [String]。 其中的字元串表示的是元素路由到的輸出【output】的名稱。

// IN: the type of the split elements
OutputSelector[IN]
    > select(IN): Iterable[String]
           

DataStream.split() 方法傳回一個SplitStream,它提供一個select()方法,我們可以通過指定一個輸出名稱【output name】的清單作為該方法的入參,以從SplitStream中選擇一個或多個流。

下面的示例将數字流分為大數流和小數流。

Example 5-25. Split a tuple stream into a stream with large numbers and a stream with small numbers.

val inputStream: DataStream[(Int, String)] = ...

val splitted: SplitStream[(Int, String)] = inputStream
  .split(t => if (t._1 > 1000) Seq("large") else Seq("small"))

val large: DataStream[(Int, String)] = splitted.select("large")
val small: DataStream[(Int, String)] = splitted.select("small")
val all: DataStream[(Int, String)] = splitted.select("small", "large") 
           

Split轉換的一個限制是所有輸出流都與輸入類型相同。在“Emitting to Side Outputs”中,我們給出了ProcessFunction的side output特性,它能夠從一個函數發射多個不同類型的流

5.2.4 Partitioning transformations

分區轉換對應于我們在第2章中介紹的資料交換政策。這些操作【operations】定義了如何将事件配置設定給任務。在使用DataStream API建構應用程式時,系統根據操作語義和配置的并行度自動選擇資料分區政策,并将資料路由到正确的目的地。有時,有必要或希望在應用程式級别中控制分區政策或定義自定義分區器。 例如,如果我們知道DataStream的并行分區的負載是傾斜的,我們可能希望重新對資料進行負載均衡,以便均勻地配置設定後續操作符的計算負載。 或者,應用程式邏輯可能要求操作的所有任務接收相同的資料,或者按照自定義政策分發事件。 在本節中,我們将介紹DataStream中相應的方法,使使用者能夠控制分區政策或定義自己的分區政策。

請注意,keyBy()與本節中讨論的分區轉換不同。 此部分中的所有轉換都會生成DataStream,而keyBy()生成的是KeyedStream,可以在其上應用可以通路鍵控狀态的轉換。

RANDOM

随機資料交換政策是通過DataStream API的shuffle()方法實作的。該方法将資料事件呈均勻分布的随機分布到下遊操作符的并行任務中。

ROUND-ROBIN

rebalance()方法對輸入流進行分區,以便事件以輪詢【round-robin】的方式均勻地分布到後繼任務【successor task】上。

RESCALE

rescale()方法亦以輪詢【round-robin】的方式分發事件,但僅将任務配置設定給後繼任務【successor task】的子集上。 從本質上講,重新縮放分區政策【rescale partitioning strategy】提供了一種在資料流圖包含扇出模式時執行輕量級負載再平衡的方法。 rebalance()和rescale()之間的根本差別在于形成的任務連接配接的方式。 雖然rebalance()将在所有發送端任務與所有接收端任務之間建立通信通道【communication channels 】,但rescale()将僅建立從每個任務到下遊操作符的某些任務的通信通道【communication channels 】。rebalance和rescale之間的連接配接模式差異如下圖所示:

Apache Flink流處理(五)

Figure 5-7. Rebalance transformation

Apache Flink流處理(五)

Figure 5-8. Rescale transformation

BROADCAST

broadcast()方法複制輸入資料流,以便将所有事件發送到下遊操作符的所有并行任務上。

GLOBAL

global()方法将輸入資料流的所有事件發送到下遊操作符的第一個并行任務。必須小心使用這種分區政策,因為将所有事件路由到同一個任務可能會影響應用程式的性能。

CUSTOM

當預定義的分區政策都不合适時,可以使用partitionCustom()方法定義自己的自定義分區政策。該方法接收一個Partitioner對象,該對象實作分區邏輯以及對流進行分區所使用的字段(field)或鍵位置(例如清單的索引)。下面的示例對整數流進行分區,以便将所有負數發送到第一個任務,并将所有其他數字發送到一個随機任務:

val numbers: DataStream[(Int)] = ...
numbers.partitionCustom(myPartitioner, 0)

object myPartitioner extends Partitioner[Int] {
  val r = scala.util.Random

  override def partition(key: Int, numPartitions: Int): Int = {
    if (key < 0) 0 else r.nextInt(numPartitions)
  }
}
           

Example 5-26. A custom partitioning example

5.2.4.Setting the parallelism

Flink應用程式通常在并行環境下執行,例如機器叢集。 當将使用DataStream編寫的程式送出給JobManager執行時,系統會建立資料流圖【dataflow graph】并準備操作符以供執行。 每個操作符被并行化為一個或多個并行任務,每個任務處理輸入流的一個子集。 操作符的并行任務數稱為 操作符的并行度。它決定了操作符的處理工作量可以配置設定多少,以及它可以處理多少資料。

您可以通過在執行環境中設定并行度或通過設定各個操作符的并行度來控制Flink應用程式的操作符并行度。執行環境為它所執行的所有操作符、資料發生器/資料源和資料接收器定義預設的并行性。環境的并行性(以及所有運算符的預設并行性)根據應用程式啟動的上下文自動初始化。如果應用程式在本地執行環境中運作,則将并行性設定為與CPU核心數相比對。當向正在運作的Flink叢集送出應用程式時,除非通過送出客戶機顯式指定環境并行度,否則将環境并行度設定為叢集的預設并行度

通常,将操作符的并行度配置為一個相對于環境的預設并行度的值,這是一個好主意。這使得您可以通過送出用戶端【submission client】,來調整并行度輕松地擴充應用程式。您可以通路環境的預設并行性,如下例所示:

val env: StreamExecutionEnvironment.getExecutionEnvironment
// get default parallelism as configured in the cluster config or 
//   explicitly specified via the submission client.
val defaultP = env.env.getParallelism
           

Example 5-27. Setting the default parallelism for all operators to 4

您還可以覆寫環境的預設并行性,但是您将無法再通過送出用戶端【submission client】來控制應用程式的并行度。它是使用StreamExecutionEnvironment.setParallelism()方法設定的。下面的例子展示了如何将所有操作符的預設并行度設定為32:

val env: StreamExecutionEnvironment.getExecutionEnvironment
// set parallelism of the environment
env.setParallelism(32)
           

您可以通過設定單個操作符的并行度來覆寫執行環境的預設并行度。在下面的例子中,資料發生器/資料源操作符将以預設并行度執行,而映射轉換的并行度為預設并行度的2倍,接收器操作将由2個并行任務執行:

val env = StreamExecutionEnvironment.getExecutionEnvironment

// get default parallelism
val defaultP = env.getParallelism

// the source runs with the default parallelism
val result: = env.addSource(new CustomSource)
  // the map parallelism is set to double the default parallelism
  .map(new MyMapper).setParallelism(defaultP * 2)
  // the print sink parallelism is fixed to 2
  .print().setParallelism(2)
           

Example 5-28. Setting different parallelism for different operators

當您通過送出用戶端送出應用程式并指定并行度為16時,資料源/發生器程式将以并行度為16運作,映射程式将以32個任務運作,接收器将以2個任務運作。如果您在本地環境中運作應用程式,例如在IDE中,在一台有8個核心的機器上,源任務将運作8個任務,映射器運作16個任務,接收器運作2個任務。

5.2.5 Defining keys and referencing fields

您在上一節中看到的某些轉換需要建立在輸入流類型基礎上的鍵規範或字段引用。在Flink中,keys并不像使用鍵值對的系統那樣在輸入流中預定義, 而是将key定義為建立在輸入資料之上的函數。 是以,沒有必要定義資料類型來儲存鍵和值,這避免了許多樣闆代碼。

下面我們将讨論引用字段的不同方法和定義資料類型的鍵。

FIELD POSITIONS

如果資料類型是元組,則可以通過使用對應元組元素的屬性字段位置來定義鍵。以下示例通過輸入元組的第二個屬性字段鍵入輸入流:

val input: DataStream[(Int, String, Long)] = ...
val keyed = input.keyBy(1)
           

Example 5-29. Key by field position

還可以定義由多個元組字段組成的組合鍵。在這種情況下,我們要提供這些元組屬性字段的位置清單,一個接一個。我們可以通過下面方式,使用元組的第二個和第三個屬性字段來鍵控輸入流:

val keyed2 = input.keyBy(1, 2)
           

FIELD EXPRESSIONS

另一種定義鍵【define keys】和選擇屬性字段【select fields】的方法是使用基于字元串的字段表達式。字段表達式适用于元組、POJOs和樣本類。它們還支援嵌套字段的選擇。

在本章的介紹性示例中,我們定義了以下樣本類:

case class SensorReading(
  id: String, 
  timestamp: Long, 
  temperature: Double)
           

我們就可以将字段名“id”傳遞給keyBy()函數,以指定我們希望通過傳感器id對輸入流進行鍵控。

val sensorStream: DataStream[SensorReading] = ...
val keyedSensors = sensorStream.keyBy("id")
           

POJO或樣本類是根據它們的屬性字段名來選擇字段的,就如上面的示例所示。而元組則可以通過字段名(Scala中元組是以_1開始,而java版的元組則以_0開始)或字段偏移索引(均從0開始偏移)來選擇字段。

val input: DataStream[(Int, String, Long)] = ...
val keyed1 = input.keyBy("2") // key by 3rd field
val keyed2 = input.keyBy("_1") // key by 1st field
           
DataStream<Tuple3<Integer, String, Long>> javaInput = ...
javaInput.keyBy(“f2”) // key Java tuple by 3rd field
           

通過用“.”表示嵌套級别,可以選擇POJO和元組中的嵌套字段。例如,考慮下面這個樣本類:

case class Address(
  address: String, 
  zip: String
  country: String)

case class Person(
  name: String,
  birthday: (Int, Int, Int), // year, month, day
  address: Address)
           

如果我們想引用一個人的郵政編碼(ZIP code),我們可以使用字段表達式address.zip。如下所示:

val persons: DataStream[Person] = ...
persons.keyBy("address.zip") // key by nested POJO field
           

也可以在混合類型上嵌套表達式:比如birthday._1表達式引用的則是birthday 這個元組的第一個字段,即,也就是出生的年份。

persons.keyBy("birthday._1") // key by field of nested tuple
           

可以使用通配符字段表達式:,來選擇完整的資料類型。例如birthday.,引用的是整個birthday 元組。通配符字段表達式對于Flink所有支援的資料類型都是有效的。

persons.keyBy("birthday._") // key by all fields of nested tuple
           

KEY SELECTORS

指定鍵的第三種方式是KeySelector函數。KeySelector 函數用于從輸入事件中提取鍵。

// T: the type of input elements
// KEY: the type of the key
KeySelector[IN, KEY]
  > getKey(IN): KEY
           

這個介紹性示例實際上在keyBy()方法中使用了一個簡單的KeySelector函數:

val sensorData: DataStream[SensorReading] = ...
val byId: KeyedStream[SensorReading, String] = sensorData.keyBy(_.id)
           

KeySelector函數接收一個輸入條目并傳回一個鍵。鍵并不需要是輸入事件中的某一字段,它可以由任意計算派生出。在以下代碼示例中,KeySelector函數傳回元組字段中的最大值作為鍵:

val input : DataStream[(Int, Int)] = ...
val keyedStream = input.keyBy(value => math.max(value._1, value._2))
           

與字段位置【field positions】和字段表達式【field expressions】這兩種方法相比,KeySelector函數的優點是:得益于KeySelector類的泛型類型,生成的鍵是強類型的。

5.2.6 Defining UDFs

到目前為止,您已經在本章的代碼示例中看到了使用者定義函數【UDF】的作用。在本節中,我們将更詳細地解釋在DataStream API中定義和參數化UDF的不同方法。

Function Classes

Flink将使用者定義函數(如MapFunction、FilterFunction和ProcessFunction)的所有接口公開為接口或抽象類。

函數是通過實作接口或擴充抽象類來實作的。在下面的例子中,我們實作了一個過濾器函數,用于過濾包含單詞“flink”的字元串:

class FlinkFilter extends FilterFunction[String] {
  override def filter(value: String): Boolean = {
    value.contains("flink") 
  }
}
           

然後,函數類的一個執行個體可以作為參數傳遞給filter轉換:

val flinkTweets = tweets.filter(new FlinkFilter)
           

函數也可以實作為匿名類:

val flinkTweets = tweets.filter(
  new RichFilterFunction[String] {
    override def filter(value: String): Boolean = {
      value.contains("flink") 
    } 
  })
           

函數可以通過它們的構造函數接收參數。我們可以對上面的例子進行參數化,并将字元串“flink”作為參數傳遞給KeywordFilter構造函數,如下所示:

val tweets: DataStream[String] = ???
val flinkTweets = tweets.filter(new KeywordFilter("flink"))

class KeywordFilter(keyWord: String) extends FilterFunction[String] {
  override def filter(value: String): Boolean = {
    value.contains(keyWord)
  }
}
           

當一個程式被送出執行時,所有函數對象都使用Java序列化進行序列化,并被傳送到其相應操作符的所有并行任務中。是以,在對象反序列化之後,所有配置值都将保留。

NOTE: FUNCTIONS MUST BE JAVA SERIALIZABLE

Flink使用Java序列化序列化所有函數對象,将它們發送到工作程序【worker processes】。使用者函數中包含的所有内容都必須是可序列化的。

如果函數需要一個不可序列化的對象執行個體,則可以将其實作為一個富函數并在open()方法中初始化不可序列化字段,或者覆寫Java序列化和反序列化方法。

LAMBDA FUNCTIONS

大多數DataStream API方法都接受lambda函數形式的UDF。Lambda函數可用于Scala和Java 8,當不需要通路狀态和配置等這樣的進階操作時,它提供了一種簡單而簡潔的方式來實作應用程式邏輯:

val tweets: DataStream[String] = ...
// a filter lambda function that checks if tweets contains the 
// word “flink”
val flinkTweets = tweets.filter(_.contains("flink"))
           

RICH FUNCTIONS CLASSES

定義UDF的一種更強大的方法是富函數。富函數定義了額外的方法,用于UDF的初始化和拆解,并提供鈎子【hooks】來通路UDF執行的上下文。前面的lambda函數示例可以使用富函數來重寫,如下所示:

class FlinkFilterFunction extends RichFilterFunction[String] {
  override def filter(value: String): Boolean = {
    value.contains("flink") 
  }
}
           

然後,可以将富函數實作的執行個體對象,作為參數傳遞給filter轉換:

val flinkTweets = tweets.filter(new FlinkFilterFunction)
           

另一種定義富函數的方法是作為匿名類:

val flinkTweets = tweets.filter(
  new RichFilterFunction[String] {
    override def filter(value: String): Boolean = {
      value.contains(“flink”) 
    }
  })
           

所有DataStream API轉換函數都有富函數的版本,是以您可以在可以使用lambda函數的地方使用它們。命名約定是,函數名以Rich開頭,後面跟着轉換名,例如Filter,并以Function結尾,比如RichMapFunction, RichFlatMapFunction,等等。

UDF可以通過其構造函數接收參數。 這些參數将作為函數對象的一部分使用正常Java序列化進行序列化,并傳遞到将執行該函數的所有并行任務執行個體上。

PS: UDF SERIALIZATION

Flink使用Java序列化序列化所有udf,将它們發送給工作者程序【worker processes】。使用者函數中包含的所有内容都必須是可序列化的。

我們可以對上面的例子進行參數化,并将字元串“flink”作為參數傳遞給FlinkFilterFunction構造函數,如下所示:

val tweets: DataStream[String] = …
val flinkTweets = tweets.filter(new MyFilterFunction(“flink”))

class MyFilterFunction(keyWord: String) extends RichFilterFunction[String] {
  override def filter(value: String): Boolean = {
    value.contains(keyWord)
  }
}
           

當使用富函數時,您可以實作兩個額外的方法,它們提供對函數生命周期的通路:

  • open()方法是富函數的初始化方法。在調用像filter、map和fold這樣的轉換方法之前,每個任務都會調用一次open方法。open()通常用于隻需要完成一次的設定工作。請注意,Configuration參數隻可被DataSet API使用,而不能被DataStream API使用。是以,在這裡它應該被忽略。
  • close()方法是函數的終結方法,在最後一次調用轉換方法之後,每個任務都會調用一次close方法。 是以,它通常用于清理和釋放資源

此外,getRuntimeContext()方法提供對函數的RuntimeContext的通路。RuntimeContext可用于檢索諸如函數并行度、子任務索引以及目前執行UDF的任務的名稱等資訊。此外,它還包括通路分區狀态的方法。Flink中的有狀态的流處理将在第8章中詳細讨論。下面的示例代碼展示了如何使用RichFlatMapFunction的方法:

class MyFlatMap extends RichFlatMapFunction[Int, (Int, Int)] {
  var subTaskIndex = 0

  override def open(configuration: Configuration): Unit = {
    subTaskIndex = getRuntimeContext.getIndexOfThisSubtask
    // do some initialization
    // e.g. establish a connection to an external system
  }

  override def flatMap(in: Int, out: Collector[(Int, Int)]): Unit = {
    // subtasks are 0-indexed
    if(in % 2 == subTaskIndex) {
      out.collect((subTaskIndex, in))
    }
    // do some more processing
  }

  override def close(): Unit = {
    // do some cleanup, e.g. close connections to external systems

  }
}
           

Example 5-30. The open() and close() methods of a RichFlatMapFunction

open()和getRuntimeContext()方法還可以通過環境ExecutionConfig用于配置。我們可以使用RuntimeContext的getExecutionConfig()方法檢出ExecutionConfig,它允許我們設定可以被所有富UDF通路的全局配置選項。

下面的示例程式使用全局配置将“keyWord”參數設定為“flink”,然後在RichFilterFunction中讀取該參數:

def main(args: Array[String]) : Unit = {

  val env = StreamExecutionEnvironment.getExecutionEnvironment

  // create a configuration object
  val conf = new Configuration()

  // set the parameter “keyWord” to “flink”
  conf.setString("keyWord", "flink")

  // set the configuration as global
  env.getConfig.setGlobalJobParameters(conf)

  // create some data
  val input: DataStream[String] = env.fromElements(
   "I love flink", "bananas", "apples", "flinky")

  // filter the input stream and print it to stdout
  input.filter(new MyFilterFunction).print()

  env.execute()
}

class MyFilterFunction extends RichFilterFunction[String] {
  var keyWord = ""

  override def open(configuration: Configuration): Unit = {
    // retrieve the global configuration
    val globalParams = getRuntimeContext.getExecutionConfig.getGlobalJobParameters

    // cast to a Configuration object
    val globConf = globalParams.asInstanceOf[Configuration]

    // retrieve the keyWord parameter
    keyWord = globConf.getString("keyWord", null)
  }

  override def filter(value: String): Boolean = {
    // use the keyWord parameter to filter out elements
    value.contains(keyWord)
  }
}
           

Example 5-31. Using the global configuration to set parameters in a RichFilterFunction

Including External and Flink Dependencies

在實作Flink應用程式時,添加外部依賴項是常見的要求。 有許多流行的庫,例如Apache Commons或Google Guava,它們可以解決和簡化各種用例。 此外,大多數Flink應用程式依賴于Flink的一個或多個連接配接器以用來從外部系統(如Apache Kafka,檔案系統或Apache Cassandra)中提取資料或向其發送資料。 某些應用程式還利用Flink的特定領域的庫,例如Table API,SQL或CEP庫。 是以,大多數Flink應用程式不僅依賴于Flink的DataStream API依賴項和Java SDK,還依賴于其他第三方和Flink内部包。

當一個應用程式被執行時,它的所有依賴關系必須對應用程式可用。預設情況下,Flink叢集隻加載核心API依賴項(DataStream和DataSet API)。應用程式需要的所有其他依賴項都必須顯式地提供。

這種設計的原因是将預設依賴項的數量保持在較低的水準。大多數連接配接器和庫依賴于一個或多個庫,這些庫通常具有多個額外的傳遞依賴項。這些通常包括經常使用的庫,如Apache Commons或谷歌的Guava。許多問題源自同一庫的不同版本之間的不相容性,這些版本從不同的連接配接器或直接從使用者應用程式引入。

有兩種方法可以確定應用程式在執行時所有依賴項都可用:

  • 将所有依賴的庫綁定到應用程式的JAR檔案中。這将生成一個自包含的、但通常相當大的應用程式JAR檔案。
  • 依賴的JAR檔案可以添加到Flink設定的./lib檔案夾中。在這種情況下,當Flink程序啟動時,依賴項被加載到類路徑中。像這樣添加到類路徑的依賴項對于在Flink設定上運作的所有應用程式都是可用的(并且可能會幹擾)

建構所謂的胖JAR檔案是處理應用程式依賴關系的首選方法。我們在第4章中介紹的使用Flink的Maven原型生成的Maven項目,這些項目被配置為生成包含所有必需依賴項的應用程式胖JAR。預設情況下包含在Flink程序類路徑【classpath】中的依賴項将自動從JAR檔案中排除。pom.xmll檔案包含解釋如何添加額外依賴項的注釋。

Summary

在本章中,我們介紹了Flink的DataStream API的基礎知識。 您已經檢查了Flink程式的結構,并且已經學習如何組合資料和分區轉換以建構流應用程式。 您還研究了受支援的資料類型以及指定鍵和使用者定義函數的不同方法。 如果您現在退後一步并再次閱讀介紹性示例,那麼您就有了一個清楚的概念。 在下一章中,随着您學習如何使用視窗運算符和時間語義來豐富我們的程式,事情會變得更加有趣。

  • 有關鍵控狀态的詳細資訊,請參閱第8章。
  • Flink還旨在将其自身的外部依賴降到最低限度,并對使用者應用程式隐藏大部分依賴(包括傳遞依賴),以防止版本沖突。

繼續閱讀