天天看點

DataLeap的Catalog系統近實時消息同步能力優化

更多技術交流、求職機會,歡迎關注位元組跳動資料平台微信公衆号,回複【1】進入官方交流群

摘要

位元組資料中台DataLeap的Data Catalog系統通過接收MQ中的近實時消息來同步部分中繼資料。Apache Atlas對于實時消息的消費處理不滿足性能要求,内部使用Flink任務的處理方案在ToB場景中也存在諸多限制,是以團隊自研了輕量級異步消息處理架構,很好的支援了位元組内部和火山引擎上同步中繼資料的訴求。本文定義了需求場景,并詳細介紹架構的設計與實作。

背景

動機

位元組資料中台DataLeap的Data Catalog系統基于Apache Atlas搭建,其中Atlas通過Kafka擷取外部系統的中繼資料變更消息。在開源版本中,每台伺服器支援的Kafka Consumer數量有限,在每日百萬級消息體量下,經常有長延時等問題,影響使用者體驗。

在2020年底,我們針對Atlas的消息消費部分做了重構,将消息的消費和處理從後端服務中剝離出來,并編寫了Flink任務承擔這部分工作,比較好的解決了擴充性和性能問題。然而,到2021年年中,團隊開始重點投入私有化部署和火山公有雲支援,對于Flink叢集的依賴引入了可維護性的痛點。

在仔細的分析了使用場景和需求,并調研了現成的解決方案後,我們決定投入人力自研一個消息處理架構。目前這個架構很好的支援了位元組内部以及ToB場景中Data Catalog對于消息消費和處理的場景。

本文會詳細介紹架構解決的問題,整體的設計,以及實作中的關鍵決定。

需求定義

使用下面的表格将具體場景定義清楚。

DataLeap的Catalog系統近實時消息同步能力優化

相關工作

在啟動自研之前,我們評估了兩個比較相關的方案,分别是Flink和Kafka Streaming。

Flink是我們之前生産上使用的方案,在能力上是符合要求的,最主要的問題是長期的可維護性。在公有雲場景,那個階段Flink服務在火山雲上還沒有釋出,我們自己的服務又有嚴格的時間線,是以必須考慮替代;在私有化場景,我們不确認客戶的環境一定有Flink叢集,即使部署的資料底座中帶有Flink,後續的維護也是個頭疼的問題。另外一個角度,作為通用流式處理架構,Flink的大部分功能其實我們并沒有用到,對于單條消息的流轉路徑,其實隻是簡單的讀取和處理,使用Flink有些“殺雞用牛刀”了。

另外一個比較标準的方案是Kafka Streaming。作為Kafka官方提供的架構,對于流式處理的語義有較好的支援,也滿足我們對于輕量的訴求。最終沒有采用的主要考慮點是兩個:

  • 對于Offset的維護不夠靈活:我們的場景不能使用自動送出(會丢消息),而對于同一個Partition中的資料又要求一定程度的并行處理,使用Kafka Streaming的原生接口較難支援。
  • 與Kafka強綁定:大部分場景下,我們團隊不是中繼資料消息隊列的擁有者,也有團隊使用RocketMQ等提供中繼資料變更,在應用層,我們希望使用同一套架構相容。

設計

概念說明

  • MQ Type:Message Queue的類型,比如Kafka與RocketMQ。後續内容以Kafka為主,設計一定程度相容其他MQ。
  • Topic:一批消息的集合,包含多個Partition,可以被多個Consumer Group消費。
  • Consumer Group:一組Consumer,同一Group内的Consumer資料不會重複消費。
  • Consumer:消費消息的最小機關,屬于某個Consumer Group。
  • Partition:Topic中的一部分資料,同一Partition内消息有序。同一Consumer Group内,一個Partition隻會被其中一個Consumer消費。
  • Event:由Topic中的消息轉換而來,部分屬性如下。

    Event Type:消息的類型定義,會與Processor有對應關系;

    Event Key:包含消息Topic、Partition、Offset等中繼資料,用來對消息進行Hash操作;

  • Processor:消息處理的單元,針對某個Event Type定制的業務邏輯。
  • Task:消費消息并處理的一條Pipeline,Task之間資源是互相獨立的。

架構架構

DataLeap的Catalog系統近實時消息同步能力優化

整個架構主要由MQ Consumer, Message Processor和State Manager組成。

  • MQ Consumer:負責從Kafka Topic拉取消息,并根據Event Key将消息投放到内部隊列,如果消息需要延時消費,會被投放到對應的延時隊列;該子產品還負責定時查詢State Manager中記錄的消息狀态,并根據傳回送出消息Offset;上報與消息消費相關的Metric。
  • Message Processor:負責從隊列中拉取消息并異步進行處理,它會将消息的處理結果更新給State Manager,同時上報與消息處理相關的Metric。
  • State Manager:負責維護每個Kafka Partition的消息狀态,并暴露目前應送出的Offset資訊給MQ Consumer。

實作

線程模型

DataLeap的Catalog系統近實時消息同步能力優化

每個Task可以運作在一台或多台執行個體,建議部署到多台機器,以獲得更好的性能和容錯能力。

每台執行個體中,存在兩組線程池:

  • Consumer Pool:負責管理MQ Consumer Thread的生命周期,當服務啟動時,根據配置拉起一定規模的線程,并在服務關閉時確定每個Thread安全退出或者逾時停止。整體有效Thread的上限與Topic的Partition的總數有關。
  • Processor Pool:負責管理Message Processor Thread的生命周期,當服務啟動時,根據配置拉起一定規模的線程,并在服務關閉時確定每個Thread安全退出或者逾時停止。可以根據Event Type所需要處理的并行度來靈活配置。

兩類Thread的性質分别如下:

  • Consumer Thread:每個MQ Consumer會封裝一個Kafka Consumer,可以消費0個或者多個Partition。根據Kafka的機制,當MQ Consumer Thread的個數超過Partition的個數時,目前Thread不會有實際流量。
  • Processor Thread:唯一對應一個内部的隊列,并以FIFO的方式消費和處理其中的消息。

StateManager

DataLeap的Catalog系統近實時消息同步能力優化

在State Manager中,會為每個Partition維護一個優先隊列(最小堆),隊列中的資訊是Offset,兩個優先隊列的職責如下:

  • 進行中的隊列:一條消息轉化為Event後,MQ Consumer會調用StateManager接口,将消息Offset 插入該隊列。
  • 處理完的隊列:一條消息處理結束或最終失敗,Message Processor會調用StateManager接口,将消息Offset插入該隊列。
  1. MQ Consumer會周期性的檢查目前可以Commit的Offset,情況枚舉如下:
  • 進行中的隊列堆頂 < 處理完的隊列堆頂或者處理完的隊列為空:代表目前消費回來的消息還在處理過程中,本輪不做Offset送出。
  • 進行中的隊列堆頂 = 處理完的隊列堆頂:表示目前消息已經處理完,兩邊同時出隊,并記錄目前堆頂為可送出的Offset,重複檢查過程。
  • 進行中的隊列堆頂 > 處理完的隊列堆頂:異常情況,通常是資料回放到某些中間狀态,将處理完的隊列堆頂出堆。

注意:當發生Consumer的Rebalance時,需要将對應Partition的隊列清空

KeyBy與Delay Processing的支援

因源頭的Topic和消息格式有可能不可控制,是以MQ Consumer的職責之一是将消息統一封裝為Event。

根據需求,會從原始消息中拼裝出Event Key,對Key取Hash後,相同結果的Event會進入同一個隊列,可以保證分區内的此類事件處理順序的穩定,同時将消息的消費與處了解耦,支援增大内部隊列數量來增加吞吐。

Event中也支援設定是否延遲處理屬性,可以根據Event Time延遲固定時間後處理,需要被延遲處理的事件會被發送到有界延遲隊列中,有界延遲隊列的實作繼承了DelayQueue,限制DelayQueue長度, 達到限定值入隊會被阻塞。

異常處理

Processor在消息處理過程中,可能遇到各種異常情況,設計架構的動機之一就是為業務邏輯的編寫者屏蔽掉這種複雜度。Processor相關架構的邏輯會與State Manager協作,處理異常并充分暴露狀态。比較典型的異常情況以及處理政策如下:

  • 處理消息失敗:自動觸發重試,重試到使用者設定的最大次數或預設值後會将消息失敗狀态通知State Manager。
  • 處理消息逾時:逾時對于吞吐影響較大,且通常重試的效果不明顯,是以目前政策是不會對消息重試,直接通知State Manager 消息處理失敗。
  • 處理消息較慢:上遊Topic存在Lag,Message Consumer消費速率大于Message Processor處理速率時,消息會堆積在隊列中,達到隊列最大長度,Message Consumer 會被阻塞在入隊操作,停止拉取消息,類似Flink架構中的背壓。

監控

為了友善運維,在架構層面暴露了一組監控名額,并支援使用者自定義Metrics。其中預設支援的Metrics如下表所示:

線上運維Case舉例

實際生産環境運作時,偶爾需要做些運維操作,其中最常見的是消息堆積和消息重放。

  1. 對于Conusmer Lag這類問題的處理步驟大緻如下:
  • 檢視Enqueue Time,Queue Length的監控确定服務内隊列是否有堆積。
  • 如果隊列有堆積,檢視Process Time名額,确定是否是某個Processor處理慢,如果是,根據名額中的Tag 确定事件類型等屬性特征,判斷業務邏輯或者Key設定是否合理;全部Processor 處理慢,可以通過增加Processor并行度來解決。
  • 如果隊列無堆積,排除網絡問題後,可以考慮增加Consumer并行度至Topic Partition 上限。

消息重放被觸發的原因通常有兩種,要麼是業務上需要重放部分資料做補全,要麼是遇到了事故需要修複資料。為了應對這種需求,我們在架構層面支援了根據時間戳重置Offset的能力。具體操作時的步驟如下:

  • 使用服務測暴露的API,啟動一台執行個體使用新的Consumer GroupId: {newConsumerGroup} 從某個startupTimestamp開始消費
  • 更改全部配置中的 Consumer GroupId 為 {newConsumerGroup}
  • 分批重新開機所有執行個體

總結

為了解決位元組資料中台DataLeap中Data Catalog系統消費近實時中繼資料變更的業務場景,我們自研了輕量級消息處理架構。目前該架構已在位元組内部生産環境穩定運作超過1年,并支援了火山引擎上的資料地圖服務的中繼資料同步場景,滿足了我們團隊的需求。