背景
Count Distinct是SQL查詢中經常使用的聚合統計方式,用于計算非重複結果的數目。由于需要去除重複結果,Count Distinct的計算通常非常耗時。
以如下查詢為例,Count Distinct的實作方式主要有兩種:
SELECT region, COUNT(DISTINCT userId) FROM orders GROUP BY region
- 對訂單表的資料按照region進行shuffle分區,在每個分區中使用一個類似HashTable的資料結構,存儲所有的非重複userId的值,最後統計所有key的數量。
- 對表t的資料按照(region, userId)進行shuffle分區,第一步的結果即為非重複的(region, userId)對,對于第一步的結果再按照region分區,統計每個分區中的Row數量。
第一種方式隻需要一次shuffle,但是需要在記憶體中維護一個資料結構,占用大量記憶體,甚至導緻OOM。第二種方式多了一次shuffle,但是更加穩定可靠。Spark采用第二種方式實作Count Distinct。在多元分析或報表等場景中,使用者可能需要秒級的互動響應,在大資料量的情況下,很難通過單純地擴充資源滿足要求。本文主要介紹在Spark中如何基于重聚合實作互動式響應的COUNT DISTINCT支援。
預聚合和重聚合
預計算是資料倉庫領域常見的一種提升查詢效率的方式,通過将全部或部分計算結果提前計算好并存儲下來,對于後續的相關的查詢可以直接重用之前的預計算結果,進而加速查詢速度。在多元分析或報表等查詢模式相對比較固定的場景中,我們可以通過預聚合,将需要處理的資料量下降成百上千倍。此外對于預計算來說,由于使用者的查詢次元,過濾條件,統計方式非常多,考慮到預計算的計算和存儲代價,不太可能把每種可能的查詢條件都進行預計算,通常的方式是按照較細粒度進行分組聚合,然後對于後續更粗粒度的分組聚合查詢,可以使用預計算的結果進行重聚合,如下所示:
// 原始SQL
SELECT SUM(userId) AS sum_user FROM orders;
// 預計算SQL
INSERT OVERWRITE pre_sum_orders AS SELECT region, sum(userId) as pre_sum_user FROM orders GROUP BY region;
// 重聚合SQL
SELECT sum(pre_sum_user) AS sum_user FROM pre_sum_orders;
由于pre_sum_orders中的資料已經經過了region的分組聚合,資料量相對于原始表可能會從100萬條下降到100條,重聚合可以非常快的完成。
Count Distinct的重聚合
SUM/COUNT/MIN/MAX等聚合函數都滿足結合律,可以非常容易的支援重聚合,例如Count 可以在通過 SUM 再聚合,最小值可以通過 MIN 再聚合,最大值也可以通過 MAX 再聚合。但是對于AVERAGE或COUNT DISTINCT等許多聚合函數卻并非如此。以AVG為例,每個region平均訂單金額和平均值并不等于全國的訂單平均值,對于COUNT DISTINCT也是如此。對于這類函數,我們也需要找到一種支援重聚合的中間結果。對于AVG,這個中間結果可以是(SUM, COUNT),可以通過如下方式重聚合,
// 原始SQL
SELECT avg(col2) FROM t;
// 預計算SQL
INSERT OVERWRITE pre_avg_t AS SELECT col1, sum(col2) as pre_sum_col2, count(col2)as pre_count_col2 FROM t GROUP BY col1;
// 重聚合SQL
SELECT sum(pre_sum_col2)/sum(pre_count_col2) FROM pre_avg_t
對于COUNT DISTINCT來說,由于需要計算非重複結果的數目,Bitset存儲與計算效率很高,而且可以用來去重,是一個比較合适存儲COUNT DISTINCT預計算結果的資料結構。
Bitmap
Bitmap是一個常見的資料結構,存儲元素到bit的映射關系,邏輯上可以了解為一個bit數組,每個bit有0和1兩個狀态,将元素映射到數組下标,在插入新元素時,修改該元素對應的數組下标的bit位置為1,重複元素出現時也不會有影響,由于每個元素僅需1個bit位表示,可以大大減少存儲空間。此外Bitmap還可以通過位運算支援高效的計算,例如使用或運算合并兩個Bitmap。實際實踐中,為了操作效率,通常會用long/in t array作為Bitmap的存儲,也有很多高效的開源實作可以直接使用,例如Google的EWAHCompressedBitmap,RoaringBitmap等等,還會對bitmap進行壓縮,對較為稀疏的bitmap節省更多的存儲空間。COUNT DISTINCT會使用的Bitmap接口主要包括:
- add(element: Int): 向Bitmap中插入新的值。
- or(other: Bitmap): 将兩個Bitmap按bit位進行或操作。
- getCardinality(): 擷取Bitmap中1出現的次數。
使用Bitmap進行重聚合
我們可以基于Bitmap實作對COUNT DISTINCT的重聚合,這主要包含兩部分工作:1. 在預聚合時将COUNT DISTINCT字段值寫入Bitmap,并将Bitmap序列化成Binary類型字段儲存。2. 在重聚合時,讀取并反序列化Bitmap字段,在合并後統計最終count數量。這兩步工作都需要新增自定義Spark UDF實作:
-
: 接受Integer類型字段作為參數,内部維護Bitmap資料結構,将輸入資料插入Bitmap中,并把Bitmap序列化二進制資料作為輸出結果。bit_mapping
-
: 接受二進制資料作為參數,反序列化位Bitmap,merge同一分區的多個Bitmap,把Bitmap的cardinality作為結果輸出。re_count_distinct
下面的SQL展示了如何使用這兩個UDF:
// 原始SQL
SELECT region, COUNT(DISTINCT userId) as dist_count_user FROM orders GROUP BY region;
// 預計算SQL
INSERT OVERWRITE pre_count_orders AS SELECT region, bit_mapping(userId) as userId_bitmap FROM orders GROUP BY region;
// 改寫後SQL
SELECT region, re_count_distinct(userId_bitmap) as dist_count_user FROM pre_count_orders;
原始SQL需要全量掃描orders表的資料,并經過partition key為(region, userId)和region的兩輪shuffle。而改寫後的SQL,不需要任何shuffle操作,而且pre_count_orders由于經過了region的分組聚合,表的行數相對于orders表大大降低,整體執行速度可能有十倍甚至百倍以上的提升。
對于更粗粒度的COUNT DISTINCT,也可以通過重聚合支援:
// 原始SQL
SELECT COUNT(DISTINCT userId) as dist_count_user FROM orders;
// 改寫後SQL
SELECT re_count_distinct(userId_bitmap) as dist_count_user FROM pre_count_orders;
Global Dictionary
在上面bit_mapping的實作中,由于Bitmap的輸入要求是從0開始的自然數類型,以對應bit數組的下标,但是我們實際統計的字段,例如上面的userId的具體值很可能不是一個簡單的自然數,是以需要一個映射函數,能夠将統計字段的取值範圍映射成自然數。此外,由于Spark SQL分布式執行的特性,這個映射必須是一緻的,即同一個字段值在不同的spark task必須映射成同一個自然數,否則在merge Bitmap的時候,同一個字段對應的值就會在合并後的Bitmap中出現兩次,違反COUNT DISTINCT的語義要求。一個維護了統計字段到自然數的全局字典可以幫助我們滿足這個要求
Spark在1.5版本後加入了對開窗函數的支援,其中的row_number()函數的作用是根據表中字段分組排序,然後給排序好的記錄添加一個從1開始的序号,我們可以使用這個函數實作一個全局字典的功能。例如,對于任意類型的userId,我們可以通過如下的方式映射為從1開始的自然數。
SELECT userId, row_number() over(order by userId) as bitmap_index FROM orders GROUP BY userId;
有了userId到bitmap_index的映射,我們就可以通過Join把全局字典和orders表關聯起來,使用bitmap_index字段去建構bitmap了,COUNT DISTINCT預聚合的SQL示例如下:
WITH globalDict AS (SELECT userId, row_number() over(order by userId) as bitmap_index FROM orders GROUP BY userId)
INSERT OVERWRITE pre_count_orders
AS SELECT region, bit_mapping(bitmap_index) as userId_bitmap
FROM orders, globalDict
WHERE orders.userId = globalDict.userId
GROUP BY region;
最後,建構全局字典是一個較為耗時的工作,當有新的訂單加入orders表後,如果我們建構新的預聚合結果時能夠重用已有的全局字典,隻是增加新的userId到字典中,顯然可以提高執行預聚合的效率。
其他
Approximate Count Distinct可以用更小的代價統計count distinct的估計值,在不需要精确值的場景中被大量使用,Approximate Count Distinct主要基于Hyperloglog資料結構實作,HLL sketch作為hll的中間結果,也是可以重聚合的。
Swoop開源了高性能的 HLL native 函數工具包,作為
spark-alchemy項目的一部分,提供了預聚合和重聚合的函數,使用者可以直接使用,實作Approximate Count Distinct的重聚合。詳細資訊可以參考:
Spark-Alchemy:HyperLogLog的使用介紹在EMR Spark中通過Relational Cache支援了Count Distinct的預聚合和重聚合,提供了pre_count_distinct和re_count_distinct函數的實作,還提供了自定義的優化規則,将pre_count_distinct函數自動轉化為基于Global Dictionary和bit_mapping的執行計劃,不需要使用者手工拼寫複雜的預聚合SQL邏輯。
總結
本文主要介紹了在Spark中實作Count Distinct重聚合的基本思路和實作方式,通過基于Bitmap的重聚合,可以實作幾個數量級的性能提升,滿足互動式分析場景統計Count Distinct的需求。