天天看點

Flink Kafka Connector 與 Exactly Once 剖析

作者:史天舒

Flink Kafka Connector 是 Flink 内置的 Kafka 連接配接器,它包含了從 Kafka Topic 讀入資料的 Flink Kafka Consumer 以及向 Kafka Topic 寫出資料的 Flink Kafka Producer,除此之外 Flink Kafa Connector 基于 Flink Checkpoint 機制提供了完善的容錯能力。本文從 Flink Kafka Connector 的基本使用到 Kafka 在 Flink 中端到端的容錯原理展開讨論。

1.Flink Kafka 的使用

在 Flink 中使用 Kafka Connector 時需要依賴 Kafka 的版本,Flink 針對不同的 Kafka 版本提供了對應的 Connector 實作。

1.1 版本依賴

既然 Flink 對不同版本的 Kafka 有不同實作,在使用時需要注意區分,根據使用環境引入正确的依賴關系。

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>${flink_kafka_connector_version}</artifactId>
  <version>${flink_version}</version>
</dependency>           

在上面的依賴配置中 ${flink_version} 指使用 Flink 的版本,${flink_connector_kafka_version} 指依賴的 Kafka connector 版本對應的 artifactId。下表描述了截止目前為止 Kafka 服務版本與 Flink Connector 之間的對應關系。

Flink 官網内容 Apache Kafka Connector 中也有詳細的說明。

連結:

https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/connectors/kafka.html
Flink Kafka Connector 與 Exactly Once 剖析

從 Flink 1.7 版本開始為 Kafka 1.0.0 及以上版本提供了全新的 Kafka Connector 支援,如果使用的 Kafka 版本在 1.0.0 及以上可以忽略因 Kafka 版本差異帶來的依賴變化。

1.2 基本使用

明确了使用的 Kafka 版本後就可以編寫一個基于 Flink Kafka 讀/寫的應用程式「本文讨論内容全部基于 Flink 1.7 版本和 Kafka 1.1.0 版本」。根據上面描述的對應關系在工程中添加 Kafka Connector 依賴。

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-kafka_2.11</artifactId>
  <version>1.7.0</version>
</dependency>           

下面的代碼片段是從 Kafka Topic「flink_kafka_poc_input」中消費資料,再寫入 Kafka Topic「flink_kafka_poc_output」的簡單示例。示例中除了讀/寫 Kafka Topic 外,沒有做其他的邏輯處理。

public static void main(String[] args) {
  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

  /** 初始化 Consumer 配置 */
  Properties consumerConfig = new Properties();
  consumerConfig.setProperty("bootstrap.servers", "127.0.0.1:9091");
  consumerConfig.setProperty("group.id", "flink_poc_k110_consumer");

  /** 初始化 Kafka Consumer */
  FlinkKafkaConsumer<String> flinkKafkaConsumer = 
    new FlinkKafkaConsumer<String>(
      "flink_kafka_poc_input", 
      new SimpleStringSchema(), 
      consumerConfig
    );
  /** 将 Kafka Consumer 加入到流處理 */
  DataStream<String> stream = env.addSource(flinkKafkaConsumer);

  /** 初始化 Producer 配置 */
  Properties producerConfig = new Properties();
  producerConfig.setProperty("bootstrap.servers", "127.0.0.1:9091");

  /** 初始化 Kafka Producer */
  FlinkKafkaProducer<String> myProducer = 
    new FlinkKafkaProducer<String>(
      "flink_kafka_poc_output", 
      new MapSerialization(), 
      producerConfig
    );
  /** 将 Kafka Producer 加入到流處理 */
  stream.addSink(myProducer);

  /** 執行 */
  env.execute();
}

class MapSerialization implements SerializationSchema<String> {
  public byte[] serialize(String element) {
    return element.getBytes();
  }
}           

Flink API 使用起來确實非常簡單,調用 addSource 方法和 addSink 方法就可以将初始化好的 FlinkKafkaConsumer 和 FlinkKafkaProducer 加入到流進行中。execute 執行後,KafkaConsumer 和 KafkaProducer 就可以開始正常工作了。

2.Flink Kafka 的容錯

衆所周知,Flink 支援 Exactly-once semantics。什麼意思呢?翻譯過來就是「恰好一次語義」。流處理系統中,資料源源不斷的流入到系統、被處理、最後輸出結果。我們都不希望系統因人為或外部因素産生任何意想不到的結果。對于 Exactly-once 語義達到的目的是指即使系統被人為停止、因故障 shutdown、無故關機等任何因素停止運作狀态時,對于系統中的每條資料不會被重複處理也不會少處理。

2.1 Flink Exactly-once

Flink 宣稱支援 Exactly-once 其針對的是 Flink 應用内部的資料流處理。但 Flink 應用内部要想處理資料首先要有資料流入到 Flink 應用,其次 Flink 應用對資料處理完畢後也理應對資料做後續的輸出。在 Flink 中資料的流入稱為 Source,資料的後續輸出稱為 Sink,對于 Source 和 Sink 完全依靠外部系統支撐(比如 Kafka)。

Flink 自身是無法保證外部系統的 Exactly-once 語義。但這樣一來其實并不能稱為完整的 Exactly-once,或者說 Flink 并不能保證端到端 Exactly-once。而對于資料精準性要求極高的系統必須要保證端到端的 Exactly-once,所謂端到端是指 Flink 應用從 Source 一端開始到 Sink 一端結束,資料必經的起始和結束兩個端點。

那麼如何實作端到端的 Exactly-once 呢?Flink 應用所依賴的外部系統需要提供 Exactly-once 支撐,并結合 Flink 提供的 Checkpoint 機制和 Two Phase Commit 才能實作 Flink 端到端的 Exactly-once。對于 Source 和 Sink 的容錯保障,Flink 官方給出了具體說明:

Fault Tolerance Guarantees of Data Sources and Sinks:

https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/connectors/guarantees.html

2.2 Flink Checkpoint

在讨論基于 Kafka 端到端的 Exactly-once 之前先簡單了解一下 Flink Checkpoint。Flink Checkpoint 是 Flink 用來實作應用一緻性快照的核心機制,當 Flink 因故障或其他原因重新開機後可以通過最後一次成功的 Checkpoint 将應用恢複到當時的狀态。如果在應用中啟用了 Checkpoint,會由 JobManager 按指定時間間隔觸發 Checkpoint,Flink 應用内所有帶狀态的 Operator 會處理每一輪 Checkpoint 生命周期内的幾個狀态。

  • initializeState

由 CheckpointedFunction 接口定義。Task 啟動時擷取應用中所有實作了CheckpointedFunction 的 Operator,并觸發執行 initializeState 方法。在方法的實作中一般都是從狀态後端将快照狀态恢複。

  • snapshotState

由 CheckpointedFunction 接口定義。JobManager 會定期發起 Checkpoint,Task 接收到 Checkpoint 後擷取應用中所有實作了 CheckpointedFunction 的 Operator 并觸發執行對應的 snapshotState 方法。

JobManager 每發起一輪 Checkpoint 都會攜帶一個自增的 checkpointId,這個 checkpointId 代表了快照的輪次。

public interface CheckpointedFunction {
  void snapshotState(FunctionSnapshotContext context) throws Exception;
  void initializeState(FunctionInitializationContext context) throws Exception;
}           
  • notifyCheckpointComplete

由 CheckpointListener 接口定義。當基于同一個輪次(checkpointId 相同)的Checkpoint 快照全部處理成功後擷取應用中所有實作了 CheckpointListener 的 Operator 并觸發執行 notifyCheckpointComplete 方法。觸發 notifyCheckpointComplete 方法時攜帶的 checkpointId 參數用來告訴 Operator 哪一輪 Checkpoint 已經完成。

public interface CheckpointListener {
  void notifyCheckpointComplete(long checkpointId) throws Exception;
}           

3. Flink Kafka 端到端 Exactly-once

Kafka 是非常受歡迎的分布式消息系統,在 Flink 中它可以作為 Source,同時也可以作為 Sink。Kafka 0.11.0 及以上版本提供了對事務的支援,這讓 Flink 應用搭載 Kafka 實作端到端的 exactly-once 成為了可能。下面我們就來深入了解提供了事務支援的 Kafka 是如何與 Flink 結合實作端到端 exactly-once 的。

本文忽略了 Barrier 機制,是以示例和圖中都以單線程為例。Barrier 在《Flink Checkpoint 原理》有較多讨論。

3.1 Flink Kafka Consumer

Kafka 自身提供了可重複消費消息的能力,Flink 結合 Kafka 的這個特性以及自身 Checkpoint 機制,得以實作 Flink Kafka Consumer 的容錯。

Flink Kafka Consumer 是 Flink 應用從 Kafka 擷取資料流消息的一個實作。除了資料流擷取、資料發送下遊算子這些基本功能外它還提供了完善的容錯機制。這些特性依賴了其内部的一些元件以及内置的資料結構協同處理完成。這裡,我們先簡單了解這些元件和内置資料結構的職責,再結合 Flink 運作時 和 故障恢複時 兩個不同的處理時機來看一看它們之間是如何協同工作的。

Kafka Topic 中繼資料

從 Kafka 消費資料的前提是需要知道消費哪個 topic,這個 topic 有多少個 partition。元件 AbstractPartitionDiscoverer 負責獲得指定 topic 的中繼資料資訊,并将擷取到的 topic 中繼資料資訊封裝成 KafkaTopicPartition 集合。

  • KafkaTopicPartition

KafkaTopicPartition 結構用于記錄 topic 與 partition 的對應關系,内部定義了 String topic 和 int partition 兩個主要屬性。假設 topic A 有 2 個分區,通過元件 AbstractPartitionDiscoverer 處理後将得到由兩個 KafkaTopicPartition 對象組成的集合:KafkaTopicPartition(topic:A, partition:0) 和 KafkaTopicPartition(topic:A, partition:1)

  • Kafka 資料消費

作為 Flink Source,Flink Kafka Consumer 最主要的職責就是能從 Kafka 中擷取資料,交給下遊處理。在 Kafka Consumer 中 AbstractFetcher 元件負責完成這部分功能。除此之外 Fetcher 還負責 offset 的送出、KafkaTopicPartitionState 結構的資料維護。

  • KafkaTopicPartitionState

KafkaTopicPartitionState 是一個非常核心的資料結構,基于内部的 4 個基本屬性,Flink Kafka Consumer 維護了 topic、partition、已消費 offset、待送出 offset 的關聯關系。Flink Kafka Consumer 的容錯機制依賴了這些資料。

除了這 4 個基本屬性外 KafkaTopicPartitionState 還有兩個子類,一個是支援 PunctuatedWatermark 的實作,另一個是支援 PeriodicWatermark 的實作,這兩個子類在原有基礎上擴充了對水印的支援,我們這裡不做過多讨論。

Flink Kafka Connector 與 Exactly Once 剖析
  • 狀态持久化

Flink Kafka Consumer 的容錯性依靠的是狀态持久化,也可以稱為狀态快照。對于Flink Kafka Consumer 來說,這個狀态持久化具體是對 topic、partition、已消費 offset 的對應關系做持久化。

在實作中,使用 ListState> 定義了狀态存儲結構,在這裡 Long 表示的是 offset 類型,是以實際上就是使用 KafkaTopicPartition 和 offset 組成了一個對兒,再添加到狀态後端集合。

  • 狀态恢複

當狀态成功持久化後,一旦應用出現故障,就可以用最近持久化成功的快照恢複應用狀态。在實作中,狀态恢複時會将快照恢複到一個 TreeMap 結構中,其中 key 是 KafkaTopicPartition,value 是對應已消費的 offset。恢複成功後,應用恢複到故障前 Flink Kafka Consumer 消費的 offset,并繼續執行任務,就好像什麼都沒發生一樣。

3.1.1 運作時

我們假設 Flink 應用正常運作,Flink Kafka Consumer 消費 topic 為 Topic-A,Topic-A 隻有一個 partition。在運作期間,主要做了這麼幾件事:

KafkaFetcher 不斷的從 Kafka 消費資料,消費的資料會發送到下遊算子并在内部記錄已消費過的 offset。下圖描述的是 Flink Kafka Consumer 從消費 Kafka 消息到将消息發送到下遊算子的一個處理過程。

Flink Kafka Connector 與 Exactly Once 剖析

接下來我們再結合消息真正開始處理後,KafkaTopicPartitionState 結構中的資料變化。

Flink Kafka Connector 與 Exactly Once 剖析

可以看到,随着應用的運作,KafkaTopicPartitionState 中的 offset 屬性值發生了變化,它記錄了已經發送到下遊算子消息在 Kafka 中的 offset。在這裡由于消息 P0-C 已經發送到下遊算子,是以 KafkaTopicPartitionState.offset 變更為 2。

  • 狀态快照處理

如果 Flink 應用開啟了 Checkpoint,JobManager 會定期觸發 Checkpoint。FlinkKafkaConsumer 實作了 CheckpointedFunction,是以它具備快照狀态(snapshotState)的能力。在實作中,snapshotState 具體幹了這麼兩件事。

下圖描述當一輪 Checkpoint 開始時 FlinkKafkaConsumer 的處理過程。在例子中,FlinkKafkaConsumer 已經将 offset=3 的 P0-D 消息發送到下遊,當checkpoint 觸發時将 topic=Topic-A;partition=0;offset=3 作為最後的狀态持久化到外部存儲。

  • 将目前快照輪次(CheckpointId)與 topic、partition、offset 寫入到一個待送出 offset 的 Map 集合,其中 key 是 CheckpointId。
  • 将 FlinkKafkaConsumer 目前運作狀态持久化,即将 topic、partition、offset 持久化。一旦出現故障,就可以根據最新持久化的快照進行恢複。

下圖描述當一輪 Checkpoint 開始時 FlinkKafkaConsumer 的處理過程。在例子中,FlinkKafkaConsumer 已經将 offset=3 的 P0-D 消息發送到下遊,當 checkpoint 觸發時将 topic=Topic-A;partition=0;offset=3 作為最後的狀态持久化到外部存儲。

Flink Kafka Connector 與 Exactly Once 剖析
  • 快照結束處理

當所有算子基于同一輪次快照處理結束後,會調用 CheckpointListener.notifyCheckpointComplete(checkpointId) 通知算子 Checkpoint 完成,參數 checkpointId 指明了本次通知是基于哪一輪 Checkpoint。在 FlinkKafkaConsumer 的實作中,接到 Checkpoint 完成通知後會變更 KafkaTopicPartitionState.commitedOffset 屬性值。最後再将變更後的 commitedOffset 送出到 Kafka brokers 或 Zookeeper。

在這個例子中,commitedOffset 變更為 4,因為在快照階段,将 topic=Topic-A;partition=0;offset=3 的狀态做了快照,在真正送出 offset 時是将快照的 offset + 1 作為結果送出的。「源代碼 KafkaFetcher.java 207 行 doCommitInternalOffsetsToKafka 方法」。

Flink Kafka Connector 與 Exactly Once 剖析

3.1.2 故障恢複

Flink 應用崩潰後,開始進入恢複模式。假設 Flink Kafka Consumer 最後一次成功的快照狀态是 topic=Topic-A;partition=0;offset=3,在恢複期間按照下面的先後順序執行處理。

  • 狀态初始化

狀态初始化階段嘗試從狀态後端加載出可以用來恢複的狀态。它由 CheckpointedFunction.initializeState 接口定義。在 FlinkKafkaConsumer 的實作中,從狀态後端獲得快照并寫入到内部存儲結構 TreeMap,其中 key 是由 KafkaTopicPartition 表示的 topic 與 partition,value 為 offset。下圖描述的是故障恢複的第一個階段,從狀态後端獲得快照,并恢複到内部存儲。

Flink Kafka Connector 與 Exactly Once 剖析
  • function 初始化

function 初始化階段除了初始化 OffsetCommitMode 和 partitionDiscoverer 外,還會初始化一個 Map 結構,該結構用來存儲應用待消費資訊。如果應用需要從快照恢複狀态,則從待恢複狀态中初始化這個 Map 結構。下圖是該階段從快照恢複的處理過程。

Flink Kafka Connector 與 Exactly Once 剖析

function 初始化階段相容了正常啟動和狀态恢複時 offset 的初始化。對于正常啟動過程,StartupMode 的設定決定待消費資訊中的結果。該模式共有 5 種,預設為 StartupMode.GROUP_OFFSETS。

Flink Kafka Connector 與 Exactly Once 剖析
  • 開始執行

在該階段中,将 KafkaFetcher 初始化、初始化内部消費狀态、啟動消費線程等等,其目的是為了将 FlinkKafkaConsumer 運作起來,下圖描述了這個階段的處理流程。

Flink Kafka Connector 與 Exactly Once 剖析

這裡對圖中兩個步驟做個描述:

  • 步驟 3,使用狀态後端的快照結果 topic=Topic-A;partition=0;offset=3 初始化 Flink Kafka Consumer 内部維護的 Kafka 處理狀态。因為是恢複流程,是以這個内部維護的處理狀态也應該随着快照恢複。
  • 步驟 4,在真正消費 Kafka 資料前(指調用 KafkaConsumer.poll 方法),使用Kafka 提供的 seek 方法将 offset 重置到指定位置,而這個 offset 具體算法就是狀态後端 offset + 1。在例子中,消費 Kafka 資料前将 offset 重置為 4,是以狀态恢複後 KafkaConsumer 是從 offset=4 位置開始消費。「源代碼 KafkaConsumerThread.java 428 行」

3.1.3 總結

上述的 3 個步驟是恢複期間主要的處理流程,一旦恢複邏輯執行成功,後續處理流程與正常運作期間一緻。最後對 FlinkKafkaConsumer 用一句話做個總結。

「将 offset 送出權交給 FlinkKafkaConsumer,其内部維護 Kafka 消費及送出的狀态。基于 Kafka 可重複消費能力并配合 Checkpoint 機制和狀态後端存儲能力,就能實作 FlinkKafkaConsumer 容錯性,即 Source 端的 Exactly-once 語義」。

3.2 Flink Kafka Producer

Flink Kafka Producer 是 Flink 應用向 Kafka 寫出資料的一個實作。在 Kafka 0.11.0 及以上版本中提供了事務支援,這讓 Flink 搭載 Kafka 的事務特性可以輕松實作 Sink 端的 Exactly-once 語義。關于 Kafka 事務特性在《Kafka 幂等與事務》中做了詳細讨論。

在 Flink Kafka Producer 中,有一個非常重要的元件 FlinkKafkaInternalProducer,這個元件代理了 Kafka 用戶端 org.apache.kafka.clients.producer.KafkaProducer,它為 Flink Kafka Producer 操作 Kafka 提供了強有力的支撐。在這個元件内部,除了代理方法外,還提供了一些關鍵操作。個人認為,Flink Kafka Sink 能夠實作 Exactly-once 語義除了需要 Kafka 支援事務特性外,同時也離不開

FlinkKafkaInternalProducer 元件提供的支援,尤其是下面這些關鍵操作:

  • 事務重置 FlinkKafkaInternalProducer 元件中最關鍵的處理當屬事務重置,事務重置由 resumeTransaction 方法實作「源代碼 FlinkKafkaInternalProducer.java 144 行」。由于 Kafka 用戶端未暴露針對事務操作的 API,是以在這個方法内部,大量的使用了反射。方法中使用反射獲得 KafkaProducer 依賴的 transactionManager 對象,并将狀态後端快照的屬性值恢複到 transactionManager 對象中,這樣以達到讓 Flink Kafka Producer 應用恢複到重新開機前的狀态。

下面我們結合 Flink 運作時 和 故障恢複 兩個不同的處理時機來了解 Flink Kafka Producer 内部如何工作。

3.2.1 運作時

我們假設 Flink 應用正常運作,Flink Kafka Producer 正常接收上遊資料并寫到 Topic-B 的 Topic 中,Topic-B 隻有一個 partition。在運作期間,主要做以下幾件事:

  • 資料發送到 Kafka

上遊算子不斷的将資料 Sink 到 FlinkKafkaProducer,FlinkKafkaProducer 接到資料後封裝 ProducerRecord 對象并調用 Kafka 用戶端 KafkaProducer.send 方法将 ProducerRecord 對象寫入緩沖「源代碼 FlinkKafkaProducer.java 616 行」。下圖是該階段的描述:

Flink Kafka Connector 與 Exactly Once 剖析

Flink 1.7 及以上版本使用 FlinkKafkaProducer 作為 Kafka Sink,它繼承抽象類 TwoPhaseCommitSinkFunction,根據名字就能知道,這個抽象類主要實作兩階段送出。為了內建 Flink Checkpoint 機制,抽象類實作了 CheckpointedFunction 和 CheckpointListener,是以它具備快照狀态(snapshotState)能力。狀态快照處理具體做了下面三件事:

調用 KafkaProducer 用戶端 flush 方法,将緩沖區内全部記錄發送到 Kafka,但不送出。這些記錄寫入到 Topic-B,此時這些資料的事務隔離級别為 UNCOMMITTED,也就是說如果有個服務消費 Topic-B,并且設定的 isolation.level=read_committed,那麼此時這個消費端還無法 poll 到 flush 的資料,因為這些資料尚未 commit。什麼時候 commit 呢?在快照結束處理階段進行 commit,後面會提到。

将快照輪次與目前事務記錄到一個 Map 表示的待送出事務集合中,key 是目前快照輪次的 CheckpointId,value 是由 TransactionHolder 表示的事務對象。TransactionHolder 對象内部記錄了 transactionalId、producerId、epoch 以及 Kafka 用戶端 kafkaProducer 的引用。

持久化目前事務處理狀态,也就是将目前處理的事務詳情存入狀态後端,供應用恢複時使用。

下圖是狀态快照處理階段處理過程。

Flink Kafka Connector 與 Exactly Once 剖析

TwoPhaseCommitSinkFunction 實作了 CheckpointListener,應用中所有算子的快照處理成功後會收到基于某輪 Checkpoint 完成的通知。當 FlinkKafkaProducer 收到通知後,主要任務就是送出上一階段産生的事務,而具體要送出哪些事務是從上一階段生成的待送出事務集合中擷取的。

Flink Kafka Connector 與 Exactly Once 剖析

圖中第 4 步執行成功後,flush 到 Kafka 的資料從 UNCOMMITTED 變更為 COMMITTED,這意味着此時消費端可以 poll 到這批資料了。

2PC(兩階段送出)理論的兩個階段分别對應了 FlinkKafkaProducer 的狀态快照處理階段和快照結束處理階段,前者是通過 Kafka 的事務初始化、事務開啟、flush 等操作預送出事務,後者是通過 Kafka 的 commit 操作真正執行事務送出。

3.2.2 故障恢複

Flink 應用崩潰後,FlinkKafkaProducer 開始進入恢複模式。下圖為應用崩潰前的狀态描述:

Flink Kafka Connector 與 Exactly Once 剖析

在恢複期間主要的處理在狀态初始化階段。當 Flink 任務重新開機時會觸發狀态初始化,此時應用與 Kafka 已經斷開了連接配接。但在運作期間可能存在資料 flush 尚未送出的情況。

如果想重新送出這些資料需要從狀态後端恢複當時 KafkaProducer 持有的事務對象,具體一點就是恢複當時事務的 transactionalId、producerId、epoch。這個時候就用到了 FlinkKafkaInternalProducer 元件中的事務重置,在狀态初始化時從狀态後端獲得這些事務資訊,并重置到目前 KafkaProducer 中,再執行 commit 操作。這樣就可以恢複任務重新開機前的狀态,Topic-B 的消費端依然可以 poll 到應用恢複後送出的資料。

需要注意的是:如果這個重置并送出的動作失敗了,可能會造成資料丢失。下圖描述的是狀态初始化階段的處理流程:

Flink Kafka Connector 與 Exactly Once 剖析

3.2.3 總結

FlinkKafkaProducer 故障恢複期間,狀态初始化是比較重要的處理階段。這個階段在 Kafka 事務特性的強有力支撐下,實作了事務狀态的恢複,并且使得狀态存儲占用空間最小。依賴 Flink 提供的 TwoPhaseCommitSinkFunction 實作類,我們自己也可以對 Sink 做更多的擴充。

▼ Apache Flink 社群推薦 ▼

Apache Flink 及大資料領域頂級盛會 Flink Forward Asia 2019 大會議程重磅釋出,參與

問卷調研

就有機會免費擷取門票!

https://developer.aliyun.com/special/ffa2019

首屆 Apache Flink 極客挑戰賽重磅開啟,聚焦機器學習與性能優化兩大熱門領域,40萬獎金等你拿,加入挑戰請點選:

https://tianchi.aliyun.com/markets/tianchi/flink2019