天天看點

Flink 原理與實作:Aysnc I/O

原文連結: http://wuchong.me/blog/2017/05/17/flink-internals-async-io/

背景

Async I/O 是阿裡巴巴貢獻給社群的一個呼聲非常高的特性,于1.2版本引入。主要目的是為了解決與外部系統互動時網絡延遲成為了系統瓶頸的問題。

流計算系統中經常需要與外部系統進行互動,比如需要查詢外部資料庫以關聯上使用者的額外資訊。通常,我們的實作方式是向資料庫發送使用者

a

的查詢請求,然後等待結果傳回,在這之前,我們無法發送使用者

b

的查詢請求。這是一種同步通路的模式,如下圖左邊所示。

Flink 原理與實作:Aysnc I/O

圖檔來自官方文檔

圖中棕色的長條表示等待時間,可以發現網絡等待時間極大地阻礙了吞吐和延遲。為了解決同步通路的問題,異步模式可以并發地處理多個請求和回複。也就是說,你可以連續地向資料庫發送使用者

a

b

c

等的請求,與此同時,哪個請求的回複先傳回了就處理哪個回複,進而連續的請求之間不需要阻塞等待,如上圖右邊所示。這也正是 Async I/O 的實作原理。

Async I/O

使用 Async I/O 的前提是需要一個支援異步請求的用戶端。當然,沒有異步請求用戶端的話也可以将同步用戶端丢到線程池中執行作為異步用戶端。Flink 提供了非常簡潔的API,讓使用者隻需要關注業務邏輯,一些髒活累活比如消息順序性和一緻性保證都由架構處理了,多麼棒的事情!

使用方式如下方代碼片段所示(來自官網文檔):

/** 'AsyncFunction' 的一個實作,向資料庫發送異步請求并設定回調 */
class AsyncDatabaseRequest extends AsyncFunction[String, (String, String)] {

    /** 可以異步請求的特定資料庫的用戶端 */
    lazy val client: DatabaseClient = new DatabaseClient(host, post, credentials)

    /** future 的回調的執行上下文(目前線程) */
    implicit lazy val executor: ExecutionContext = ExecutionContext.fromExecutor(Executors.directExecutor())

    override def asyncInvoke(str: String, asyncCollector: AsyncCollector[(String, String)]): Unit = {

        // 發起一個異步請求,傳回結果的 future
        val resultFuture: Future[String] = client.query(str)

        // 設定請求完成時的回調: 将結果傳遞給 collector
        resultFuture.onSuccess {
            case result: String => asyncCollector.collect(Iterable((str, result)));
        }
    }
}

// 建立一個原始的流
val stream: DataStream[String] = ...

// 添加一個 async I/O 的轉換
val resultStream: DataStream[(String, String)] =
    AsyncDataStream.(un)orderedWait(
      stream, new AsyncDatabaseRequest(),
      1000, TimeUnit.MILLISECONDS, // 逾時時間
      100)  // 進行中的異步請求的最大數量
      

AsyncDataStream

 有兩個靜态方法,

orderedWait

 和 

unorderedWait

,對應了兩種輸出模式:有序和無序。

  • 有序:消息的發送順序與接受到的順序相同(包括 watermark ),也就是先進先出。
  • 無序:
    • 在 ProcessingTime 的情況下,完全無序,先傳回的結果先發送。
    • 在 EventTime 的情況下,watermark 不能超越消息,消息也不能超越 watermark,也就是說 watermark 定義的順序的邊界。在兩個 watermark 之間的消息的發送是無序的,但是在watermark之後的消息不能先于該watermark之前的消息發送。

原理實作

AsyncDataStream.(un)orderedWait

 的主要工作就是建立了一個 

AsyncWaitOperator

AsyncWaitOperator

 是支援異步 IO 通路的算子實作,該算子會運作 

AsyncFunction

 并處理異步傳回的結果,其内部原理如下圖所示。

Flink 原理與實作:Aysnc I/O

如圖所示,

AsyncWaitOperator

 主要由兩部分組成:

StreamElementQueue

Emitter

。StreamElementQueue 是一個 Promise 隊列,所謂 Promise 是一種異步抽象表示将來會有一個值(參考 Scala Promise 了解更多),這個隊列是未完成的 Promise 隊列,也就是進行中的請求隊列。Emitter 是一個單獨的線程,負責發送消息(收到的異步回複)給下遊。

圖中

E5

表示進入該算子的第五個元素(”Element-5”),在執行過程中首先會将其包裝成一個 “Promise” 

P5

,然後将

P5

放入隊列。最後調用 

AsyncFunction

 的 

ayncInvoke

 方法,該方法會向外部服務發起一個異步的請求,并注冊回調。該回調會在異步請求成功傳回時調用 

AsyncCollector.collect

 方法将傳回的結果交給架構處理。實際上 

AsyncCollector

 是一個 Promise ,也就是 

P5

,在調用 

collect

 的時候會标記 Promise 為完成狀态,并通知 Emitter 線程有完成的消息可以發送了。Emitter 就會從隊列中拉取完成的 Promise ,并從 Promise 中取出消息發送給下遊。

消息的順序性

上文提到 Async I/O 提供了兩種輸出模式。其實細分有三種模式: 有序,ProcessingTime 無序,EventTime 無序。Flink 使用隊列來實作不同的輸出模式,并抽象出一個隊列的接口(

StreamElementQueue

),這種分層設計使得

AsyncWaitOperator

Emitter

不用關心消息的順序問題。

StreamElementQueue

有兩種具體實作,分别是 

OrderedStreamElementQueue

UnorderedStreamElementQueue

UnorderedStreamElementQueue

 比較有意思,它使用了一套邏輯巧妙地實作完全無序和 EventTime 無序。

有序

有序比較簡單,使用一個隊列就能實作。所有新進入該算子的元素(包括 watermark),都會包裝成 Promise 并按到達順序放入該隊列。如下圖所示,盡管

P4

的結果先傳回,但并不會發送,隻有 

P1

 (隊首)的結果傳回了才會觸發 Emitter 拉取隊首元素進行發送。

Flink 原理與實作:Aysnc I/O

ProcessingTime 無序

ProcessingTime 無序也比較簡單,因為沒有 watermark,不需要協調 watermark 與消息的順序性,是以使用兩個隊列就能實作,一個 

uncompletedQueue

 一個 

completedQueue

。所有新進入該算子的元素,同樣的包裝成 Promise 并放入 

uncompletedQueue

 隊列,當

uncompletedQueue

隊列中任意的Promise傳回了資料,則将該 Promise 移到 

completedQueue

 隊列中,并通知 Emitter 消費。如下圖所示:

Flink 原理與實作:Aysnc I/O

EventTime 無序

EventTime 無序類似于有序與 ProcessingTime 無序的結合體。因為有 watermark,需要協調 watermark 與消息之間的順序性,是以

uncompletedQueue

中存放的元素從原先的 Promise 變成了 Promise 集合。如果進入算子的是消息元素,則會包裝成 Promise 放入隊尾的集合中。如果進入算子的是 watermark,也會包裝成 Promise 并放到一個獨立的集合中,再将該集合加入到 

uncompletedQueue

 隊尾,最後再建立一個空集合加到 

uncompletedQueue

 隊尾。這樣,watermark 就成了消息順序的邊界。隻有處在隊首的集合中的 Promise 傳回了資料,才能将該 Promise 移到 

completedQueue

 隊列中,由 Emitter 消費發往下遊。隻有隊首集合空了,才能處理第二個集合。這樣就保證了當且僅當某個 watermark 之前所有的消息都已經被發送了,該 watermark 才能被發送。過程如下圖所示:

Flink 原理與實作:Aysnc I/O

快照與恢複

分布式快照機制是為了保證狀态的一緻性。我們需要分析哪些狀态是需要快照的,哪些是不需要的。首先,已經完成回調并且已經發往下遊的元素是不需要快照的。否則,會導緻重發,那就不是 exactly-once 了。而已經完成回調且未發往下遊的元素,加上未完成回調的元素,就是上述隊列中的所有元素。

是以快照的邏輯也非常簡單,(1)清空原有的狀态存儲,(2)周遊隊列中的所有 Promise,從中取出 

StreamElement

(消息或 watermark)并放入狀态存儲中,(3)執行快照操作。

恢複的時候,從快照中讀取所有的元素全部再處理一次,當然包括之前已完成回調的元素。是以在失敗恢複後,會有元素重複請求外部服務,但是每個回調的結果隻會被發往下遊一次。

本文的原理和實作分析基于 Flink 1.3 版本。

參考資料

繼續閱讀