天天看點

Flink 和 Pulsar 的批流融合

簡介:StreamNative 聯合創始人翟佳在本次演講中介紹了下一代雲原生消息流平台 Apache Pulsar,并講解如何通過 Apache Pulsar 原生的存儲計算分離的架構提供批流融合的基礎,以及 Apache Pulsar 如何與 Flink 結合,實作批流一體的計算。

GitHub 位址

https://github.com/apache/flink

歡迎大家給 Flink 點贊送 star~

Apache Pulsar 相對比較新,它于 2017 年加入 Apache 軟體基金會,2018 年才從 Apache 軟體基金會畢業并成為一個頂級項目。Pulsar 由于原生采用了存儲計算分離的架構,并且有專門為消息和流設計的存儲引擎 BookKeeper,結合 Pulsar 本身的企業級特性,得到了越來越多開發者的關注。今天的分享分為 3 個部分:

  • Apache Pulsar 是什麼;
  • Pulsar 的資料視圖;
  • Pulsar 與 Flink 的批流融合。

一、Apache Pulsar 是什麼

下圖是屬于消息領域的開源工具,從事消息或者基礎設施的開發者對這些一定不會陌生。雖然 Pulsar 在 2012 年開始開發,直到 2016 年才開源,但它在跟大家見面之前已經在雅虎的線上運作了很長時間。這也是為什麼它一開源就得到了很多開發者關注的原因,它已經是一個經過線上檢驗的系統。

Flink 和 Pulsar 的批流融合

Pulsar 跟其他消息系統最根本的不同在于兩個方面:

  • 一方面,Pulsar 采用存儲計算分離的雲原生架構;
  • 另一方面,Pulsar 有專門為消息而設計的存儲引擎,Apache BookKeeper。

架構

下圖展示了 Pulsar 存儲計算分離的架構:

  • 首先在計算層,Pulsar Broker 不儲存任何狀态資料、不做任何資料存儲,我們也稱之為服務層。
  • 其次,Pulsar 擁有一個專門為消息和流設計的存儲引擎 BookKeeper,我們也稱之為資料層。
Flink 和 Pulsar 的批流融合

這個分層的架構對使用者的叢集擴充十分友善:

  • 如果想要支援更多的 Producer 和 Consumer,可以擴充上面無狀态的 Broker 層;
  • 如果要做更多的資料存儲,可以單獨擴充底層存儲層。

這個雲原生的架構有兩個主要特點:

  • 第一個是存儲計算的分離;
  • 另外一個特點是每一層都是一個節點對等的架構。

從節點對等來說,Broker 層不存儲資料,是以很容易實作節點對等。但是 Pulsar 在底層的存儲也是節點對等狀态:在存儲層,BookKeeper 沒有采用 master/slave 這種主從同步的方式,而是通過 Quorum 的方式。

如果是要保持多個資料備份,使用者通過一個 broker 并發地寫三個存儲節點,每一份資料都是一個對等狀态,這樣在底層的節點也是一個對等的狀态,使用者要做底層節點的擴容和管理就會很容易。有這樣節點對等的基礎,會給使用者帶來很大的雲原生的便捷,友善使用者在每一層單獨擴容,也會提高使用者的線上系統的可用性和維護性。

同時,這種分層的架構為我們在 Flink 做批流融合打好了基礎。因為它原生分成了兩層,可以根據使用者的使用場景和批流的不同通路模式,來提供兩套不同的 API。

  • 如果是實時資料的通路,可以通過上層 Broker 提供的 Consumer 接口;
  • 如果是曆史資料的通路,可以跳過 Broker,用存儲層的 reader 接口,直接通路底層存儲層。

存儲 BookKeeper

Pulsar 另一個優勢是有專門為流和消息設計的存儲引擎 Apache BookKeeper。它是一個簡單的 write-ahead-log 抽象。Log 抽象和流的抽象類似,所有的資料都是源源不斷地從尾部直接追加。

它給使用者帶來的好處就是寫入模式比較簡單,可以帶來比較高的吞吐。在一緻性方面,BookKeeper 結合了 PAXOS 和 ZooKeeper ZAB 這兩種協定。BookKeeper 暴露給大家的就是一個 log 抽象。你可以簡單認為它的一緻性很高,可以實作類似 Raft 的 log 層存儲。BookKeeper 的誕生是為了服務我們在 HDFS naming node 的 HA,這種場景對一緻性要求特别高。這也是為什麼在很多關鍵性的場景裡,大家會選擇 Pulsar 和 BookKeeper 做存儲的原因。

BookKeeper 的設計中,有專門的讀寫隔離,簡單了解就是,讀和寫是發生在不同的磁盤。這樣的好處是在批流融合的場景可以減少與曆史資料讀取的互相幹擾,很多時候使用者讀最新的實時資料時,不可避免會讀到曆史資料,如果有一個專門為曆史資料而準備的單獨的磁盤,曆史資料和實時資料的讀寫不會有 IO 的争搶,會對批流融合的 IO 服務帶來更好的體驗。

Flink 和 Pulsar 的批流融合

應用場景

Pulsar 場景應用廣泛。下面是 Pulsar 常見的幾種應用場景:

  • 第一,因為 Pulsar 有 BookKeeper,資料一緻性特别高,Pulsar 可以用在計費平台、支付平台和交易系統等,對資料服務品質,一緻性和可用性要求很高的場景。
  • 第二種應用場景是 Worker Queue / Push Notifications / Task Queue,主要是為了實作系統之間的互相解耦。
  • 第三種場景,與 Pulsar 對消息和隊列兩種場景的支援比較相關。Pulsar 支援 Queue 消費模式,也支援 Kafka 高帶寬的消費模型。後面我會專門講解 Queue 消費模型與 Flink 結合的優勢。
  • 第四個場景是 IoT 應用,因為 Pulsar 在服務端有 MQTT 協定的解析,以及輕量級的計算 Pulsar Functions。
  • 第五個方面是 unified data processing,把 Pulsar 作為一個批流融合的存儲的基礎。

我們在 2020 年 11 月底的 Pulsar Summit 亞洲峰會,邀請 40 多位講師來分享他們的 Pulsar 落地案例。如果大家對 Pulsar 應用場景比較感興趣,可以關注 B 站上 StreamNative 的賬号,觀看相關視訊。

Flink 和 Pulsar 的批流融合

二、Pulsar 的資料視圖

在這些應用場景中,Unified Data Processing 尤為重要。關于批流融合,很多國内使用者的第一反應是選擇 Flink。我們來看 Pulsar 和 Flink 結合有什麼樣的優勢?為什麼使用者會選擇 Pulsar 和 Flink 做批流融合。

首先,我們先從 Pulsar 的資料視圖來展開。跟其他的消息系統一樣,Pulsar 也是以消息為主體,以 Topic 為中心。所有的資料都是 producer 交給 topic,然後 consumer 從 topic 訂閱消費消息。

Flink 和 Pulsar 的批流融合

Partition 分區

為了友善擴充,Pulsar 在 topic 内部也有分區的概念,這跟很多消息系統都類似。上面提到 Pulsar 是一個分層的架構,它采用分區把 topic 暴露給使用者,但是在内部,實際上每一個分區又可以按照使用者指定的時間或者大小切成一個分片。一個 Topic 最開始建立的時候隻有一個 active 分片,随着使用者指定的時間到達以後,會再切一個新的分片。在新開一個分片的過程中,存儲層可以根據各個節點的容量,選擇容量最多的節點來存儲這個新的分片。

這樣的好處是,topic 的每一個分片都會均勻地散布在存儲層的各個節點上,實作資料存儲的均衡。如果使用者願意,就可以用整個存儲叢集來存儲分區,不再被單個節點容量所限制。如下圖所示,該 Topic 有 4 個分區,每一個分區被拆成多個分片,使用者可以按照時間(比如 10 分鐘或者一個小時),也可以按照大小(比如 1G 或者 2G)切一個分片。分片本身有順序性,按照 ID 逐漸遞增,分片内部所有消息按照 ID 單調遞增,這樣很容易保證順序性。

Flink 和 Pulsar 的批流融合

Stream 流存儲

我們再從單個分片來看一下,在常見流(stream)資料處理的概念。使用者所有的資料都是從流的尾部不斷追加,跟流的概念相似,Pulsar 中 Topic 的新資料不斷的添加在 Topic 的最尾部。不同的是,Pulsar 的 Topic 抽象提供了一些優勢:

  • 首先,它采用了存儲和計算分離的架構。在計算層,它更多的是一個消息服務層,可以快速地通過 consumer 接口,把最新的資料傳回給使用者,使用者可以實時的擷取最新的資料;
  • 另外一個好處是,它分成多個分片,如果使用者指定時間,從中繼資料可以找到對應的分片,使用者可以繞過實時的流直接讀取存儲層的分片;
  • 還有一個優勢是,Pulsar 可以提供無限的流存儲。

做基礎設施的同學,如果看到按照時間分片的架構,很容易想到把老的分片搬到二級存儲裡面去,在 Pulsar 裡也是這樣做的。使用者可以根據 topic 的消費熱度,設定把老的,或者超過時限或大小的資料自動搬到二級存儲中。使用者可以選擇使用 Google,微軟的 Azure 或者 AWS 來存儲老的分片,同時也支援 HDFS 存儲。

這樣的好處是:對最新的資料可以通過 BookKeeper 做快速傳回,對于老的冷資料可以利用網絡存儲雲資源做一個無限的流存儲。這就是 Pulsar 可以支援無限流存儲的原因,也是批流融合的一個基礎。

總體來說,Pulsar 通過存儲計算分離,為大家提供了實時資料和曆史資料兩套不同的通路接口。使用者可以依據内部不同的分片位置,根據 metadata 來選擇使用哪種接口來通路資料。同時根據分片機制可以把老的分片放到二級存儲中,這樣可以支撐無限的流存儲。

Pulsar 的統一展現在對分片中繼資料管理的方面。每個分片可以按照時間存放成不同的存儲媒體或格式,但 Pulsar 通過對每個分片的 metadata 管理,來對外提供一個分區的邏輯概念。在通路分區中的一個分片的時候我可以拿到它的中繼資料,知道它的在分區中的順序,資料的存放位置和儲存類型 Pulsar 對每一個分片的 metadata 的管理,提供了統一的 topic 的抽象。

Flink 和 Pulsar 的批流融合

三、Pulsar 和 Flink 的批流融合

在 Flink 中,流是一個基礎的概念,Pulsar 可以作為流的載體來存儲資料。如果使用者做一個批的計算,可以認為它是一個有界的流。對 Pulsar 來說,這就是一個 Topic 有界範圍内的分片。

在圖中我們可以看到,topic 有很多的分片,如果确定了起止的時間,使用者就可以根據這個時間來确定要讀取的分片範圍。對實時的資料,對應的是一個連續的查詢或通路。對 Pulsar 的場景來說就是不停的去消費 Topic 的尾部資料。這樣,Pulsar 的 Topic 的模型就可以和 Flink 流的概念很好的結合,Pulsar 可以作為 Flink 流計算的載體。

  • 有界的計算可以視為一個有界的流,對應 Pulsar 一些限定的分片;
  • 實時的計算就是一個無界的流,對 Topic 中最新的資料做查詢和通路。
Flink 和 Pulsar 的批流融合

對有界的流和無界的流,Pulsar 采取不同的響應模式:

  • 第一種是對曆史資料的響應。如下圖所示,左下角是使用者的 query,給定起止的時間限定流的範圍。對 Pulsar 的響應分為幾步:
    • 第一步,找到 Topic,根據我們統一管理的 metadata,可以擷取這個 topic 裡面所有分片的 metadata 的清單;
    • 第二步,根據時間限定在 metadata 清單中,通過兩分查找的方式來擷取起始分片和終止的分片,選擇需要掃的分片;
    • 第三步,找到這些分片以後通過底層存儲層的接口通路需要通路的這些分片,完成一次曆史資料的查找。
Flink 和 Pulsar 的批流融合
  • 對實時資料的查找,Pulsar 也提供和 Kafka 相同的接口,可以通過 consumer 的方式來讀取最尾端分片(也就是最新的資料),通過 consumer 接口對資料進行實時通路。它不停地查找最新的資料,完成之後再進行下一次查找。這種情況下,使用 Pulsar Pub/Sub 接口是一種最直接最有效的方式。
Flink 和 Pulsar 的批流融合

簡單來說,Flink 提供了統一的視圖讓使用者可以用統一的 API 來處理 streaming 和曆史資料。以前,資料科學家可能需要編寫兩套應用分别用來處理實時資料和曆史資料,現在隻需要一套模型就能夠解決這種問題。

Pulsar 主要提供一個資料的載體,通過基于分區分片的架構為上面的計算層提供流的存儲載體。因為 Pulsar 采用了分層分片的架構,它有針對流的最新資料通路接口,也有針對批的對并發有更高要求的存儲層通路接口。同時它提供無限的流存儲和統一的消費模型。

Flink 和 Pulsar 的批流融合

四、Pulsar 現有能力和進展

最後我們額外說一下 Pulsar 現在有怎樣的能力和最近的一些進展。

現有能力

schema

在大資料中,schema 是一個特别重要的抽象。在消息領域裡面也是一樣,在 Pulsar 中,如果 producer 和 consumer 可以通過 schema 來簽訂一套協定,那就不需要生産端和消費端的使用者再線下溝通資料的發送和接收的格式。在計算引擎中我們也需要同樣的支援。

在 Pulsar-Flink connector 中,我們借用 Flink schema 的 interface,對接 Pulsar 自帶的 Schema,Flink 能夠直接解析存儲在Pulsar 資料的 schema。這個 schema 包括兩種:

  • 第一種是我們常見的對每一個消息的中繼資料(meatdata)包括消息的 key、消息産生時間、或是其他中繼資料的資訊。
  • 另一種是對消息的内容的資料結構的描述,常見的是 Avro 格式,在使用者通路的時候就可以通過Schema知道每個消息對應的資料結構。

同時我們結合 Flip-107,整合 Flink metadata schema 和 Avro 的 metadata,可以将兩種 Schema 結合在一起做更複雜的查詢。

Flink 和 Pulsar 的批流融合

source

有了這個 schema,使用者可以很容易地把它作為一個 source,因為它可以從 schema 的資訊了解每個消息。

Flink 和 Pulsar 的批流融合

Pulsar Sink

我們也可以把在 Flink 中的計算結果傳回給 Pulsar 把它做為 Sink。

Flink 和 Pulsar 的批流融合

Streaming Tables

有了 Sink 和 Source 的支援,我們就可以把 Flink table 直接暴露給使用者。使用者可以很簡單的把 Pulsar 作為 Flink 的一個 table,查找資料。

Flink 和 Pulsar 的批流融合

write to straming tables

下圖展示如何把計算結果或資料寫到 Pulsar 的 Topic 中去。

Flink 和 Pulsar 的批流融合

Pulsar Catalog

Pulsar 自帶了很多企業流的特性。Pulsar 的 topic(e.g. persistent://tenant_name/namespace_name/topic_name)不是一個平鋪的概念,而是分很多級别。有 tenant 級别,還有 namespace 級别。這樣可以很容易得與 Flink 常用的 Catalog 概念結合。

如下圖所示,定義了一個 Pulsar Catalog,database 是 tn/ns,這是一個路徑表達,先是 tenant,然後是 namespace,最後再挂一個 topic。這樣就可以把Pulsar 的 namespace 當作 Flink 的 Catalog,namespace 下面會有很多 topic,每個 topic 都可以是 Catalog 的 table。這就可以很容易地跟 Flink Cataglog 做很好的對應。在下圖中,上方的是 Catalog 的定義,下方則示範如何使用這個 Catalog。不過,這裡還需要進一步完善,後邊也有計劃做 partition 的支援。

Flink 和 Pulsar 的批流融合

FLIP-27

FLIP-27 是 Pulsar - Flink 批流融合的一個代表。前面介紹了 Pulsar 提供統一的視圖,管理所有 topic 的 metadata。在這個視圖中,根據 metadata 标記每個分片的資訊,再依靠 FLIP-27 的 framework 達到批流融合的目的。FLIP-27 中有兩個概念:Splitter 和 reader。

它的工作原理是這樣的,首先會有一個 splitter 把資料源做切割,之後交給 reader 讀取資料。對 Pulsar 來說,splitter 處理的還是 Pulsar 的一個 topic。抓到 Pulsar topic 的 metadata 之後,根據每個分片的中繼資料來判斷這個分片存儲在什麼位置,再選最合适的 reader 進行通路。Pulsar 提供統一的存儲層,Flink 根據 splitter 對每個分區的不同位置和格式的資訊,選擇不同的 reader 讀取 Pulsar 中的資料。

Flink 和 Pulsar 的批流融合

Source 高并發

另一個和 Pulsar 消費模式緊密相關的是。很多 Flink 使用者面臨的問題是如何讓 Flink 更快地執行任務。例如,使用者給了 10 個并發度,它會有 10 個 job 并發,但假如一個 Kafka 的 topic 隻有 5 個分區,由于每個分區隻能被一個 job 消費,就會有 5 個 Flink job 是空閑的。如果想要加快消費的并發度,隻能跟業務方協調多開幾個分區。這樣的話,從消費端到生産端和後邊的運維方都會覺得特别複雜。并且它很難做到實時的按需更新。

而 Pulsar 不僅支援 Kafka 這種每個分區隻能被一個 active 的 consumer 消費的情況,也支援 Key-Shared 的模式,多個 consumer 可以共同對一個分區進行消費,同時保證每個 key 的消息隻發給一個 consumer,這樣就保證了 consumer 的并發,又同時保證了消息的有序。

對前面的場景,我們在 Pulsar Flink 裡做了 Key-shared 消費模式的支援。同樣是 5 個分區,10 個并發 Flink job。但是我可以把 key 的範圍拆成 10 個。每一個 Flink 的子任務消費在 10 個 key 範圍中的一個。這樣從使用者消費端來說,就可以很好解耦分區的數量和 Flink 并發度之間的關系,也可以更好提供資料的并發。

Flink 和 Pulsar 的批流融合

自動 Reader 選擇

另外一個方向是上文提到的 Pulsar 已經有統一的存儲基礎。我們可以在這個基礎上根據使用者不同的 segment metadata 選擇不同的 reader。目前,我們已經實作該功能。

Flink 和 Pulsar 的批流融合

近期工作

最近,我們也在做和 Flink 1.12 整合相關的工作。Pulsar-Flink 項目也在不停地做疊代,比如我們增加了對 Pulsar 2.7 中事務的支援,并且把端到端的 Exactly-Once 整合到 Pulsar Flink repo 中;另外的工作是如何讀取 Parquet 格式的二級存儲的列資料;以及使用 Pulsar 存儲層做 Flink 的 state 存儲等。

Flink 和 Pulsar 的批流融合

更多 Flink 相關技術交流,可掃碼加入社群釘釘大群~

Flink 和 Pulsar 的批流融合

活動推薦一

Flink 和 Pulsar 的批流融合

報名連結:

https://1712399719478.huodongxing.com/event/1594531547711

活動推薦二

阿裡雲基于 Apache Flink 建構的企業級産品-實時計算Flink版現開啟活動:

99元試用

實時計算Flink版

(包年包月、10CU)即有機會獲得 Flink 獨家定制T恤;另包3個月及以上還有85折優惠!

了解活動詳情:

https://www.aliyun.com/product/bigdata/sc
Flink 和 Pulsar 的批流融合

繼續閱讀