作者:閑魚技術-劍辛
一、業務背景
在電商營運工作中,營銷活動是非常重要的部分,對使用者增長和GMV都有很大幫助。對電商營運來說,如何從龐大的商品庫中篩選出賣家優質商品并推送給有需要的買家購買是每時每刻都要思索的問題,而且這個過程需要盡可能快和實時。保證快和實時就可以提升買賣雙方的使用者體驗,提高使用者粘性。
二、實時選品
為了解決上面提到的問題,閑魚研發了馬赫系統。馬赫是一個實時高性能的商品選品系統,解決在億級别商品中通過規則篩選優質商品并進行投放的場景。有了馬赫系統之後,閑魚的營運同學可以在馬赫系統上建立篩選規則,比如商品标題包含“小豬佩奇”、類目為“玩具”、價格不超過100元且商品狀态為未賣出。在營運建立規則後,馬赫系統會同時進行兩步操作,第一步是從存量商品資料篩選符合條件的商品進行打标;第二步是對商品實時變更進行規則計算,實時同步規則命中結果。
馬赫系統最大的特點是快而實時,展現在命中規模為100w的規則可以在10分鐘之内完成打标;商品本身變更導緻的規則命中結果同步時間為1秒鐘。營運可以通過馬赫系統快速篩選商品向使用者投放,閑魚的流量也可以精準投給符合條件的商品并且将流量利用到最大化。
那麼馬赫系統是如何解決這一典型的電商問題的呢,馬赫系統和流計算有什麼關系呢,這是下面要詳細說明的部分。
三、流計算
流計算是持續、低延遲、事件觸發的資料處理模型。流計算模型是使用實時資料內建工具,将資料實時變化傳輸到流式資料存儲,此時資料的傳輸變成實時化,将長時間累積大量的資料平攤到每個時間點不停地小批量實時傳輸;流計算會将計算邏輯封裝為常駐計算服務,一旦啟動就一直處于等待事件觸發狀态,當有資料流入後會觸發計算迅速得到結果;當流計算得到計算結果後可以立刻将資料輸出,無需等待整體資料的計算結果。

閑魚實時選品系統使用的流計算架構是Blink,Blink是阿裡巴巴基于開源流計算架構Flink定制研發的企業級流計算架構,可以認為是Flink的加強版,現在已經開源。Flink是一個高吞吐、低延遲的計算引擎,同時還提供很多進階功能。比如它提供有狀态的計算,支援狀态管理,支援強一緻性的資料語義以及支援Event Time,WaterMark對消息亂序的處理等特性,為閑魚實時選品系統的超低延時選品提供了有力支援。
3.1、Blink之State
State是指流計算過程中計算節點的中間計算結果或中繼資料屬性,比如在aggregation過程中要在state中記錄中間聚合結果,比如Apache Kafka作為資料源時候,我們也要記錄已經讀取記錄的offset,這些State資料在計算過程中會進行持久化(插入或更新)。是以Blink中的State就是與時間相關的,Blink任務的内部資料(計算資料和中繼資料屬性)的快照。
馬赫系統會在State中儲存商品合并之後的全部資料和規則運作結果資料。當商品發生變更後,馬赫系統會将商品變更資訊與State儲存的商品資訊進行合并,并将合并的資訊作為入參運作所有規則,最後将規則運作結果與State儲存的規則運作結果進行Diff後得到最終有效的運作結果。是以Blink的State特性是馬赫系統依賴的關鍵特性。
3.2、Blink之Window
Blink的Window特性特指流計算系統特有的資料分組方式,Window的建立是資料驅動的,也就是說,視窗是在屬于此視窗的第一個元素到達時建立。當視窗結束時候删除視窗及狀态資料。Blink的Window主要包括兩種,分别為滾動視窗(Tumble)和滑動視窗(Hop)。
滾動視窗有固定大小,在每個視窗結束時進行一次資料計算,也就是說滾動視窗任務每經過一次固定周期就會進行一次資料計算,例如每分鐘計算一次總量。
滑動視窗與滾動視窗類似,視窗有固定的size,與滾動視窗不同的是滑動視窗可以通過slide參數控制滑動視窗的建立頻率。是以當slide值小于視窗size的值的時候多個滑動視窗會重疊,此時資料會被配置設定給多個視窗,如下圖所示:
Blink的Window特性在資料計算統計方面有很多使用場景,馬赫系統主要使用視窗計算系統處理資料的實時速度和延時,用來進行資料統計和監控告警。
3.3、Blink之UDX
UDX是Blink中使用者自定義函數,可以在任務中調用以實作一些定制邏輯。Blink的UDX包括三種,分别為:
-
UDF - User-Defined Scalar Function
UDF是最簡單的自定義函數,輸入是一行資料的任意字段,輸出是一個字段,可以實作資料比較、資料轉換等操作。
-
UDTF - User-Defined Table-Valued Function
UDTF 是表值函數,每個輸入(單column或多column)傳回N(N>=0)Row資料,Blink架構提供了少量的UDTF,比如:STRING_SPLIT,JSON_TUPLE和GENERATE_SERIES3個built-in的UDTF。
-
UDAF - User-Defined Aggregate Function
UDAF是聚合函數,輸入是多行資料,輸出是一個字段。Blink架構Built-in的UDAF包括MAX,MIN,AVG,SUM,COUNT等,基本滿足了80%常用的集合場景,但仍有一定比例的複雜業務場景,需要定制自己的聚合函數。
馬赫系統中使用了大量的UDX進行邏輯定制,包括消息解析、資料處理等。而馬赫系統最核心的商品資料合并、規則運作和結果Diff等流程就是通過UDAF實作的。
四、秒級選品方案
選品系統在項目立項後也設計有多套技術方案。經過多輪讨論後,最終決定對兩套方案實施驗證後決定最終實作方案。
第一套方案是基于PostgreSQL的方案,PostgreSQL可以很便捷的定義Function進行資料合并操作,在PostgreSQL的trigger上定義執行規則邏輯。基于PostgreSQL的技術實作較複雜,但能滿足功能需求。不過性能測試結果顯示PostgreSQL處理小資料量(百萬級)性能較好;當trigger數量多、trigger邏輯複雜或處理億級别資料時,PostgreSQL的性能會有較大下滑,不能滿足秒級選品的性能名額。是以基于PostgreSQL的方案被否決(在閑魚小商品池場景中仍在使用)。
第二套方案是基于Blink流計算方案,通過驗證發現Blink SQL很适合用來表達資料處理邏輯而且Blink性能很好,綜合對比之後最終選擇Blink流計算方案作為實際實施的技術方案。
為了配合使用流計算方案,馬赫系統經過設計和解耦,無縫對接Blink計算引擎。其中資料處理子產品是馬赫系統核心功能子產品,負責接入商品相關各類資料、校驗資料、合并資料、執行規則和處理執行結果并輸出等步驟,是以資料處理子產品的處理速度和延時在很大程度上能代表馬赫系統資料處理速度和延時。接下來我們看下資料處理子產品如何與Blink深度結合将資料處理延遲降到秒級。
資料處理子產品結構如上圖,包含資料接入層、資料合并層、規則運作層和規則運作結果處理層。每層都針對流計算處理模式進行了單獨設計。
4.1、資料接入層
資料接入層是資料處理子產品前置,負責對接多管道各種類型的業務資料,主要邏輯如下:
- 資料接入層對接多個管道多種類型的業務資料;
- 解析業務資料并做簡單校驗;
- 統計各管道業務資料量級并進行監控,包括總量和同比變化量;
- 通過中繼資料中心擷取字段級别的Metadata配置。中繼資料中心是用來儲存和管理所有字段的MetaData配置資訊元件。Metadata配置代表字段中繼資料配置,包括字段值類型,值範圍和值格式等基礎資訊;
- 根據Metadata配置進行字段級别資料校驗;
- 按照馬赫定義的标準資料範式組裝資料。
這樣設計的考慮是因為業務資料是多種多樣的,比如商品資訊包括資料庫的商品表記錄、商品變更的MQ消息和算法産生的離線資料,如果直接通過Blink對接這些業務資料源的話,需要建立多個Blink任務來對接不同類型業務資料源,這種處理方式太重,而且資料接入邏輯與Blink緊耦合,不夠靈活。
資料接入層可以很好的解決上述問題,資料接入層可以靈活接入多種業務資料,并且将資料接入與Blink解耦,最終通過同一個Topic發出消息。而Blink任務隻要監聽對應的Topic就可以連續不斷的收到業務資料流,觸發接下來的資料處理流程。
4.2、資料合并層
資料合并是資料處理流程的重要步驟,資料合并的主要作用是将商品的最新資訊與記憶體中儲存的商品資訊合并供後續規則運作使用。資料合并主要邏輯是:
- 監聽指定消息隊列Topic,擷取業務資料消息;
- 解析消息,并将消息内容按照字段重新組裝資料,格式為
,key是字段名稱,value是字段值,timestamp為字段資料産生時間戳;{key:[timestamp, value]}
- 将組裝後的資料和記憶體中儲存的曆史資料根據timestamp進行字段級别資料合并,合并算法為比較timestamp大小取最新字段值,具體邏輯見下圖。
Pick!閑魚億級商品庫中的秒級實時選品一、業務背景二、實時選品三、流計算四、秒級選品方案五、結論參考資料
資料合并有幾個前提:
-
記憶體可以儲存存量資料;
這個是Blink提供的特性,Blink可以将任務運作過程中産生的存量資料儲存在記憶體中,在下一次運作時從記憶體中取出繼續處理。
-
合并後的資料能代表商品的最新狀态;
這點需要一個巧妙設計:商品資訊有很多字段,每個字段的值是數組,不僅要記錄實際值,還要記錄目前值的修改時間戳。在合并商品資訊時,按照字段進行合并,合并規則是取時間戳最大的值為準。
舉例來說,記憶體中儲存的商品ID=1的資訊是{"desc": [1, "描述1"], "price": [4, 100.5]},資料流中商品ID=1的資訊是{"desc": [2, "描述2"], "price": [3, 99.5]},那麼合并結果就是{"desc": [2, "描述2"], "price": [4, 100.5]},每個字段的值都是最新的,代表商品目前最新資訊。
當商品資訊發生變化後,最新資料由資料接入層流入,通過資料合并層将資料合并到記憶體,Blink記憶體中儲存的是商品目前最新的全部資料。
4.3、規則運作層
規則運作層是資料處理流程核心子產品,通過規則運算得出商品對各規則命中結果,邏輯如下:
- 規則運作層接受輸入為經過資料合并後的資料;
- 通過中繼資料中心擷取字段級别Metadata配置;
- 根據字段Metadata配置解析資料;
- 通過規則中心擷取有效規則清單,規則中心是指建立和管理規則生命周期的元件;
- 循環規則清單,運作單項規則,将規則命中結果儲存在記憶體;
- 記錄運作規則抛出異常的資料,并進行監控告警。
這裡的規則指的是營運建立的業務規則,比如商品價格大于50且狀态為線上。規則的輸入是經過資料合并後的商品資料,輸出是true或false,即是否命中規則條件。規則代表的是業務投放場景,馬赫系統的業務價值就是在商品發生變更後盡快判斷是否命中之前未命中的規則或是不命中之前已經命中的規則,并将命中和不命中結果盡快展現到投放場景中。
規則運作需利用Blink強大算力來保證快速執行,馬赫系統目前有将近300條規則,而且還在快速增長。這意味着每個商品發生變更後要在Blink上運作成百上千條規則,閑魚每天有上億商品發生變更,這背後需要的運算量是非常驚人的。
4.4、運作結果處理層
讀者讀到這裡可能會奇怪,明明經過規則運作之後直接把運作結果輸出到投放場景就可以了,不需要運作結果處理層。實際上運作結果處理層是資料處理子產品最重要的部分。
因為在實際場景中,商品的變更在大部分情況隻會命中很少一部分規則,而且命中結果也很少會變化。也就是說商品對很多規則的命中結果是沒有意義的,如果将這些命中結果也輸出的話,隻會增加操作TPS,對實際結果沒有任何幫助。而篩選出有效的運作結果,這就是運作結果處理層的作用。運作結果處理層邏輯如下:
- 擷取商品資料的規則運作結果;
- 按照是否命中規則解析運作結果;
- 将運作結果與記憶體中儲存的曆史運作結果進行diff,diff作用是排除新老結果中相同的命中子項,邏輯見下圖。
Pick!閑魚億級商品庫中的秒級實時選品一、業務背景二、實時選品三、流計算四、秒級選品方案五、結論參考資料
運作結果處理層利用Blink記憶體儲存商品上一次變更後規則運作結果,并将目前變更後規則運作結果與記憶體中結果進行比較,計算出有效運作結果。舉例來說,商品A上一次變更後規則命中結果為{"rule1":true, "rule2":true, "rule3":false, "rule4":false},目前變更後規則命中結果為{"rule1":true, "rule2":false, "rule3":false, "rule4":true}。因為商品A變更後對rule1和rule3的命中結果沒有變化,是以實際有效的命中結果是{"rule2":false, "rule4":true},通過運作結果處理層處理後輸出的是有效結果的最小集,可以極大減小無效結果輸出,提高資料處理的整體性能和效率。
4.5、難點解析
雖然閑魚實時選品系統在立項之初經過預研和論證,但因為使用很多新技術架構和流計算思路,在開發過程中遇到一些難題,包括設計和功能實作方面的,很多是設計流計算系統的典型問題。我們就其中一個問題與各位讀者探讨-規則公式轉換。
4.5.1、規則公式轉換
這個問題的業務場景是:營運同學在馬赫系統頁面上篩選商品字段後儲存規則,服務端是已有的老系統,邏輯是根據規則生成一段SQL,SQL的where條件和營運篩選條件相同。SQL有兩方面的作用,一方面是作為離線規則,在離線資料庫中執行SQL篩選符合規則的離線商品資料;另一方面是轉換成線上規則,在Blink任務中對實時商品變更資料執行規則以判斷是否命中。
因為實時規則運作使用的是MVEL表達式引擎,MVEL表達式是類Java文法的,是以問題就是将離線規則的SQL轉換成線上規則的Java表達式,兩者邏輯需一緻,并且需兼顧性能和效率。問題的解決方案很明确,解析SQL後将SQL操作符轉換成Java操作符,并将SQL特有文法轉成Java文法,例如A like '%test%'轉成A.contains('test')。
這個問題的難點是如何解析SQL和将解析後的語義轉成Java語句。經過調研之後給出了簡單而優雅的解決方案,主要步驟如下:
- 使用Druid架構解析SQL語句,轉成一個二叉樹,單獨取出其中的where條件子樹;
- 通過後序周遊算法周遊where條件子樹;
-
将SQL操作符換成對應的Java操作符;
目前支援且、或、等于、不等于、大于、大于等于、小于、小于等于、like、not like和in等操作。
-
将SQL文法格式轉成Java文法;
将in文法改成Java的或文法,例如A in ('hello', 'world')轉成(A == 'hello') || (A == 'world')。
-
實際運作結果如下:
代碼邏輯如下(主要是二叉樹後續周遊和操作符轉換,不再詳細解釋):
五、結論
馬赫系統上線以來,已經支援近400場活動和投放場景,每天處理近1.4億條消息,峰值TPS達到50000。馬赫系統已經成為閑魚選品投放的重要支撐。
本文主要闡述馬赫系統中資料處理的具體設計方案,說明整體設計的來龍去脈。雖然閑魚實時選品系統針對的是商品選品,但資料處理流計算技術方案的輸入是MQ消息,輸出也是MQ消息,不與具體業務綁定,是以資料處理流計算技術方案不隻适用于商品選品,也适合其他類似實時篩選業務場景。希望我們的技術方案和設計思路能給你帶來一些想法和思考,也歡迎和我們留言讨論,謝謝。
參考資料
- 閑魚實時選品系統: https://mp.weixin.qq.com/s/8ROsZniYD7nIQssC14mn3w
- Blink: https://github.com/apache/flink/tree/blink
- PostgreSQL: https://www.postgresql.org/
- druid: https://github.com/alibaba/druid