大資料實時計算高性能解決方案
假設有這樣一個場景:百萬級資料量,每天千萬級增量,幾十字段比對,單值/多值/範圍,毫秒級别耗時,同時需要考慮後續資料量遞增情況下,整體性能不能線性下降。我們應該采用哪種索引技術來實作比對計算呢?
業務場景
以機票搜尋場景為例,有效資料量百萬+,每天增量千萬+,單docker執行個體TPS要求1000+。下面的表格是計算機票所需要的一種規則資料,每條規則涉及到許多字段需要進行索引比對,其中字段有多種資料類型:單值、多值、範圍,多字段比對關系支援:且、或、非。
規則資料
- 行程類型,總共有三種枚舉值:1-單程,2-往返,3-不限
- 開票航司,支援多個,用/分隔
- 出發地/目的地/排除出發地/排除目的地,支援多個,用/分隔,支援多元度,同時支援全球(GLOBAL)
- 班期,支援多個,用/分隔
- 艙位,支援多個,用/分隔,同時支援全部(ALL)
- 旅行日期,支援日期範圍
索引選型
索引選擇原則:算法穩定、查詢時間複雜度低、空間複雜度适中、支援多元查詢。
綜合對比,我們選擇反向索引+BitMap組合方式來建構索引。
BitMap特點
- BitMap是一種資料結構,通常用于存儲大量的布爾值或者整數。它将每個元素映射到一個二進制位上,使用0或1來表示該元素的狀态。在實際使用中,BitMap通常被用來解決一些需要高效查找或者去重的問題。
- BitMap的基本操作包括設定某個元素為1或0,查詢某個元素的狀态,以及對多個BitMap進行位運算,例如求并集、交集、差集等。由于BitMap的實作采用了位運算,是以它的操作非常高效,可以在常數時間内完成。
- 在計算機科學中,BitMap通常使用一個長整型數組來實作。在Java中,可以使用BitSet類來實作BitMap,它提供了一系列高效的操作方法,例如set、get、and、or等等。
- BitMap的應用非常廣泛,例如在資料庫中用于去重、在網絡中用于IP位址的過濾、在圖像進行中用于表示像素的顔色等等。
索引建構
基于規則資料,在順序建立索引時,先采用哈希表進行雙向映射,便于後續提取規則ID,結構如下:
按屬性名逐一建構索引模型,結構如下:
- 航程類型屬性,按枚舉拆分成0-不限,1-單程,2-往返,三個值索引,其中不限單獨做為一個值進行索引
- 開票航司屬性,按/分隔符進行拆分,拆分為AA,CI,LH,MH,四個值索引
- 出發地屬性,按/分割符進行拆分,拆分為SHA,CN,HKG,GLOBAL,四個值索引,區域各層級單獨作為一個值進行索引
- 目的地屬性,按/分割符進行拆分,拆分為US,TPE,EUR,NOA,GLOBAL,五個值索引,同出發地屬性
- 排除出發地屬性,按/分隔符進行拆分,拆分為DFW,CN兩個值索引,沒有值不建索引,同出發地屬性
- 排除目的地屬性,按/分隔符進行拆分,拆分為CN一個值索引,沒有值不建索引,同出發地屬性
- 艙位屬性,按/分隔符進行拆分,拆分為N,S,V,L,ALL,五個值索引,其中不限單獨作為一個值進行索引
- 旅行日期屬性,按範圍拆分到天
索引比對
所有規則資料建立索引完成後,那如何進行索引比對的呢,下面我們以正常搜尋場景為例來進行介紹:
例如查詢請求3月25日從上海(SHA)飛往洛杉矶(LAX)的單程航班,有一條達美航空(AA)的艙位代碼為V艙的運價。按照請求參數逐一取對應屬性名的索引模型,用傳入資料進行BitMap索引比對:
- 航程類型:值為單程(1)和不限(0)進行比對,比對結果分别為1001、0010,再做或運算,結果為1011
- 出發地:值為SHA、CN、GLOBAL進行比對,比對結果分别為1100、0010、0001,再做或運算,結果為1111
- 排除出發地:值為SHA、CN、GLOBAL進行比對,比對結果為0001,再做取反運算,結果為1110
- 目的地:值為LAX、US、NOA、GLOBAL進行比對,比對結果分别為1000、0010、0001,再做或運算,結果為1011
- 排除目的地:值為LAX、US、NOA、GLOBAL進行比對,沒有對應的索引模型,忽略目前屬性
- 出發日期:值為2023-03-25進行比對,比對結果為1111開票航司:值為AA進行比對,比對結果為1010艙位:值為V、ALL進行比對,分别比對結果為1000、0011,做或運算,結果為1011單屬性計算結果分别為:航程類型-1011,出發地-1110,目的地-1011,出發日期-1111,開票航司-1010,艙位-1011所有屬性進行與運算,計算公式為:1011 & 1110 & 1011 & 1111 & 1010 & 1011,最終位運算結果為:1010最終取号位為0和2的規則ID,分别為100001和100003,作為目前搜尋命中的兩條規則ID。
以上就是索引比對邏輯,涉及到BitMap的and、or、not等操作,通過位運算來提升比對性能,降低資料量對查詢性能線性影響。
代碼實作
1.規則模型
Rule類:提供規則字段。
@Data
public class Rule implements IndexKey {
/** 規則ID */
private String ruleId;
/** 開票航司 */
@Index(name = "AIRLINE")
private String airline;
/** 出發地 */
@Index(name = "DEP_CITY")
private String depCity;
/** 排除出發地 */
@Index(name = "FORBID_DEP_CITY")
private String forbidDepCity;
/** 目的地 */
@Index(name = "ARR_CITY")
private String arrCity;
/** 排除目的地 */
@Index(name = "FORBID_ARR_CITY")
private String forbidArrCity;
/** 起飛日期範圍 */
@Index(name = "DEP_DATE")
private String depDate;
/** 艙位 */
@Index(name = "CABIN")
private String cabin;
public Rule(String ruleId) {
this.ruleId = ruleId;
}
@Override
public String getIndexKey() {
return ruleId;
}
}
2.索引資料倉庫接口
Repository接口:提供索引新增、删除和查詢服務。
public interface Repository<T> {
/**
* 儲存 索引 和 資料 的關系
*/
void put(Integer index, T data);
/**
* 删除Key
*/
void remove(String key);
/**
* 根據資料ID查找對應的索引ID
*/
Integer findIndexId(String key);
/**
* 根據索引ID查找對應的資料ID
*/
String findDataId(Integer index);
}
3.索引資料倉庫接口實作類
預設實作類:DefaultRepository,資料預設存儲在記憶體。
public class DefaultRepository<T extends IndexKey> implements Repository<T> {
private final Map<Integer, String> dataMap = Maps.newConcurrentMap();
private final Map<String, Integer> indexMap = Maps.newConcurrentMap();
@Override
public void put(Integer index, T data) {
dataMap.put(index, data.getIndexKey());
indexMap.put(data.getIndexKey(), index);
}
@Override
public void remove(String key) {
Integer index = indexMap.get(key);
dataMap.remove(index);
indexMap.remove(key);
}
@Override
public Integer findIndexId(String key) {
return indexMap.get(key);
}
@Override
public String findDataId(Integer index) {
return dataMap.get(index);
}
}
4.索引模型接口
IndexModel接口:提供指定屬性值建構、删除和查詢索引,同時提供索引and、andIn、or和orIn位運算操作。
public interface IndexModel {
/**
* 建構 屬性值對應的索引模型
*/
void createIndex(String fieldValue, int index);
/**
* 清除 指定屬性值對應的索引模型
*/
void clear(String fieldValue, Integer index);
/**
* 擷取 屬性值 對應的索引模型
*/
BitMapWrapper get(String fieldValue);
/**
* 與 運算
*/
BitMapWrapper and(BitMapWrapper left, String fieldValue);
/**
* 與 運算
*/
BitMapWrapper and(BitMapWrapper left, BitMapWrapper right);
/**
* 與 運算
*/
BitMapWrapper and(BitMapWrapper left, String fieldValue, BitMapWrapper defaultLeft, BitMapWrapper defaultRight);
/**
* 與 運算
*/
BitMapWrapper and(BitMapWrapper left, BitMapWrapper right, BitMapWrapper defaultLeft, BitMapWrapper defaultRight);
/**
* 或 運算
*/
BitMapWrapper or(BitMapWrapper left, String fieldValue);
/**
* 或 運算
*/
BitMapWrapper or(BitMapWrapper left, BitMapWrapper right);
/**
* 或 運算
*/
BitMapWrapper or(BitMapWrapper left, String fieldValue, BitMapWrapper defaultLeft, BitMapWrapper defaultRight);
/**
* 或 運算
*/
BitMapWrapper or(BitMapWrapper left, BitMapWrapper right, BitMapWrapper defaultLeft, BitMapWrapper defaultRight);
}
5.索引模型接口實作類
預設實作類:DefaultIndexModel
public class DefaultIndexModel implements IndexModel {
private final Map<String, BitMapWrapper> valueMap = new ConcurrentHashMap<>();
@Override
public void createIndex(String fieldValue, int index) {
valueMap.computeIfAbsent(fieldValue, k -> buildBitMapWrapper()).set(index);
}
@Override
public void clear(String fieldValue, Integer index) {
if (!valueMap.containsKey(fieldValue)) {
return;
}
valueMap.get(fieldValue).clear(index);
}
@Override
public BitMapWrapper get(String fieldValue) {
return valueMap.get(fieldValue);
}
@Override
public BitMapWrapper and(BitMapWrapper left, String fieldValue) {
return and(left, getBitMapWrapper(fieldValue));
}
@Override
public BitMapWrapper and(BitMapWrapper left, BitMapWrapper right) {
if (right == null) {
return left;
}
if (left == null) {
left = buildBitMapWrapper();
left.or(right);
} else {
left.and(right);
}
return left;
}
@Override
public BitMapWrapper and(BitMapWrapper left, String fieldValue, BitMapWrapper defaultLeft, BitMapWrapper defaultRight) {
BitMapWrapper right = valueMap.getOrDefault(fieldValue,defaultRight);
if (left == null) {
left = defaultLeft;
left.or(right);
} else {
left.and(right);
}
return left;
}
@Override
public BitMapWrapper and(BitMapWrapper left, BitMapWrapper right, BitMapWrapper defaultLeft, BitMapWrapper defaultRight) {
if (right == null) {
right = defaultRight;
}
if (left == null) {
left = defaultLeft;
left.or(right);
} else {
left.and(right);
}
return left;
}
@Override
public BitMapWrapper or(BitMapWrapper left, String fieldValue) {
return or(left, getBitMapWrapper(fieldValue));
}
@Override
public BitMapWrapper or(BitMapWrapper left, BitMapWrapper right) {
if (right == null) {
return left;
}
if (left == null) {
left = buildBitMapWrapper();
}
left.or(right);
return left;
}
@Override
public BitMapWrapper or(BitMapWrapper left, String fieldValue, BitMapWrapper defaultLeft, BitMapWrapper defaultRight) {
if (left == null) {
left = defaultLeft;
}
if (!valueMap.containsKey(fieldValue)) {
return left;
}
BitMapWrapper right = valueMap.get(fieldValue);
left.or(right);
return left;
}
@Override
public BitMapWrapper or(BitMapWrapper left, BitMapWrapper right, BitMapWrapper defaultLeft, BitMapWrapper defaultRight) {
if (left == null) {
left = defaultLeft;
}
if (right == null) {
return left;
}
left.or(right);
return left;
}
public BitMapWrapper getBitMapWrapper(String fieldValue) {
return valueMap.computeIfAbsent(fieldValue,k -> buildBitMapWrapper());
}
private BitMapWrapper buildBitMapWrapper() {
return new DefaultBitMapWrapper();
}
}
6.BitMap包裝類接口
BitMapWrapper接口:提供BitSet位運算基礎操作,且、或、翻轉等運算,同時提供設定、清除、查詢業務資料ID等服務。
public interface BitMapWrapper {
/**
* 且 運算
*/
void and(BitMapWrapper wrapper);
/**
* 或 運算
*/
void or(BitMapWrapper wrapper);
/**
* 翻轉 運算
*/
void flip(int fromIndex, int toIndex);
/**
* 指定号位 設定 為1
*/
void set(int bitIndex);
/**
* 清除指定号位(設定 為0)
*/
void clear(int bitIndex);
/**
* 清除 BitMap
*/
void clear();
/**
* 判斷是否為空
*/
boolean isEmpty();
/**
* 擷取源目标對象
*/
Object getTarget();
/**
* 擷取業務資料ID
*/
void getDataId(Consumer<Integer> consumer);
}
7.BitMap包裝類接口實作類
預設實作類:DefaultBitMapWrapper
public class DefaultBitMapWrapper implements BitMapWrapper {
private final BitSet bitSet = new BitSet();
@Override
public void and(BitMapWrapper wrapper) {
bitSet.and((BitSet) wrapper.getTarget());
}
@Override
public void or(BitMapWrapper wrapper) {
bitSet.or((BitSet) wrapper.getTarget());
}
@Override
public void flip(int fromIndex, int toIndex) {
bitSet.flip(fromIndex, toIndex);
}
@Override
public void set(int bitIndex) {
bitSet.set(bitIndex);
}
@Override
public void clear(int bitIndex) {
bitSet.clear(bitIndex);
}
@Override
public void clear() {
bitSet.clear();
}
@Override
public boolean isEmpty() {
return bitSet.isEmpty();
}
@Override
public Object getTarget() {
return bitSet;
}
@Override
public void getDataId(Consumer<Integer> consumer) {
int i = bitSet.nextSetBit(0);
if (i != -1) {
consumer.accept(i);
for (i = bitSet.nextSetBit(i + 1); i >= 0; i = bitSet.nextSetBit(i + 1)) {
int endOfRun = bitSet.nextClearBit(i);
do {
consumer.accept(i);
} while (++i < endOfRun);
}
}
}
}
8.索引建構器接口
Build接口:提供建立、擷取目前規則所有屬性名對應的索引模型和查詢目前索引長度。
public interface Build<T> {
/**
* 建立索引
*/
void create(T data);
/**
* 擷取所有屬性名對應的索引模型
*/
Map<String,IndexModel> getIndexModels();
/**
* 擷取目前索引長度
*/
AtomicInteger getIndexLength();
}
9.索引建構器接口實作類:
抽象實作類:AbstractBuild,實作接口相關服務,提供添加索引模型抽象方法,需自定義實作。
public abstract class AbstractBuild<T extends IndexKey> implements Build<T> {
private final Map<String, IndexModel> INDEX_MODELS = Maps.newConcurrentMap();
private final AtomicInteger indexGenerator = new AtomicInteger();
private Repository<T> repository;
public AbstractBuild(Repository<T> repository){
this.repository = repository;
}
@Override
public void create(T data) {
doCreateIndex(data,indexGenerator.getAndIncrement());
}
/**
* Do create index.
*
* @param data the data
* @param index the index
*/
private void doCreateIndex(T data, int index){
// 擷取目前模型所有屬性,根據标記索引屬性名建構對應的索引模型,添加到INDEX_MODELS中
putIndexModel(data,index);
repository.put(index,data);
}
/**
* 添加索引模型,此處不提供預設實作
*
* @param data the data
* @param index the index
*/
protected abstract void putIndexModel(T data, int index);
@Override
public Map<String, IndexModel> getIndexModels() {
return INDEX_MODELS;
}
@Override
public AtomicInteger getIndexLength() {
return indexGenerator;
}
}
10.索引比對接口
Match接口:提供單值和多值的與、或BitMap運算服務,以及擷取最終比對的業務資料ID集合,
public interface Match {
/**
* 與 運算操作
*
* @param wrapper 左值
* @param fieldName 屬性名
* @param fieldValue 屬性值
* @return the bit map wrapper
*/
BitMapWrapper and(BitMapWrapper wrapper, String fieldName, String fieldValue);
/**
* in 與 運算操作
*
* @param wrapper 左值
* @param fieldName 屬性名
* @param fieldValues 屬性值
* @return the bit map wrapper
*/
BitMapWrapper andIn(BitMapWrapper wrapper, String fieldName, String... fieldValues);
/**
* Not in 與 運算操作
*
* @param wrapper 左值
* @param fieldName 屬性名
* @param fieldValues 屬性值
* @return the bit map wrapper
*/
BitMapWrapper notIn(BitMapWrapper wrapper, String fieldName, String... fieldValues);
/**
* 或 運算操作
*
* @param wrapper 左值
* @param fieldName 屬性名
* @param fieldValue 屬性值
* @return the bit map wrapper
*/
BitMapWrapper or(BitMapWrapper wrapper, String fieldName, String fieldValue);
/**
* in 或 運算操作
*
* @param wrapper 左值
* @param fieldName 屬性名
* @param fieldValues 屬性值
* @return the bit map wrapper
*/
BitMapWrapper orIn(BitMapWrapper wrapper, String fieldName, String... fieldValues);
/**
* 送出運算,計算出索引比對後的資料ID集合
*
* @param bitMap the bit map
* @return the list
*/
List<String> commit(BitMapWrapper bitMap);
}
11 索引比對接口實作類
預設實作類:DefaultMatch
@Slf4j
public class DefaultMatch implements Match {
/**
* 索引模型集合,Key:屬性名,Val:索引模型
*/
private Map<String, IndexModel> indexModels;
/**
* 目前規則索引自增記錄器
*/
private AtomicInteger indexGenerator;
/**
* 目前運算結果緩存
*/
private BitMapWrapper resultWrapper;
/**
* OR運算結果緩存
*/
private BitMapWrapper orResultWrapper;
/**
* 空結果緩存
*/
private BitMapWrapper emptyWrapper;
/**
* 索引&業務資料倉庫
*/
private Repository repository;
/**
* Instantiates a new Default match.
*
* @param build the build
* @param repository the repository
*/
public DefaultMatch(Build build, Repository repository){
this.resultWrapper = new DefaultBitMapWrapper();
this.orResultWrapper = new DefaultBitMapWrapper();
this.emptyWrapper = new DefaultBitMapWrapper();
this.indexModels = build.getIndexModels();
this.repository = repository;
this.indexGenerator = build.getIndexLength();
}
@Override
public BitMapWrapper and(BitMapWrapper wrapper, String fieldName, String fieldValue) {
if (StringUtils.isBlank(fieldValue)) {
return wrapper;
}
wrapper = andIn(wrapper, fieldName, fieldValue);
return wrapper;
}
@Override
public BitMapWrapper andIn(BitMapWrapper wrapper, String fieldName, String... fieldValues) {
IndexModel indexModel = indexModels.get(fieldName);
if (indexModel == null) {
if (wrapper == null) {
wrapper = resultWrapper;
}
wrapper.clear();
return wrapper;
}
if (wrapper != null && wrapper.isEmpty()) {
return wrapper;
}
try {
// 屬性值集合做 或 運算
for (String value : fieldValues) {
orResultWrapper = indexModel.or(orResultWrapper, value, null, emptyWrapper);
}
// 原結果 和 翻轉後結果做 與 運算
wrapper = indexModel.and(wrapper, orResultWrapper, resultWrapper, emptyWrapper);
return wrapper;
} finally {
orResultWrapper.clear();
}
}
@Override
public BitMapWrapper notIn(BitMapWrapper wrapper, String fieldName, String... fieldValues) {
IndexModel indexModel = indexModels.get(fieldName);
if (indexModel == null) {
return wrapper;
}
if (wrapper != null && wrapper.isEmpty()) {
return wrapper;
}
try {
// 屬性值集合做 或 運算
for (String value : fieldValues) {
orResultWrapper = indexModel.or(orResultWrapper, value, null, emptyWrapper);
}
// 對結果翻轉
orResultWrapper.flip(0, indexGenerator.get());
// 原結果 和 翻轉後結果做 與 運算
wrapper = indexModel.and(wrapper, orResultWrapper, resultWrapper, emptyWrapper);
return wrapper;
} finally {
orResultWrapper.clear();
}
}
@Override
public BitMapWrapper or(BitMapWrapper wrapper, String fieldName, String fieldValue) {
if (StringUtils.isBlank(fieldValue)) {
return wrapper;
}
wrapper = orIn(wrapper, fieldName, fieldValue);
return wrapper;
}
@Override
public BitMapWrapper orIn(BitMapWrapper wrapper, String fieldName, String... fieldValues) {
IndexModel indexModel = indexModels.get(fieldName);
if (indexModel == null) {
return wrapper;
}
// 屬性值集合做 或 運算
for (String value : fieldValues) {
wrapper = indexModel.or(wrapper, value, resultWrapper, emptyWrapper);
}
return wrapper;
}
@Override
public List<String> commit(BitMapWrapper bitMap) {
try {
if (bitMap == null) {
return null;
}
if (bitMap.isEmpty()) {
return null;
}
// 提取目前索引運算後業務資料ID集合
return keyMapping(bitMap);
} finally {
resultWrapper.clear();
}
}
/**
* 提取目前索引運算後業務資料ID集合
*
* @param bitMap the bit map
* @return the list
*/
private List<String> keyMapping(BitMapWrapper bitMap) {
List<String> keys = new ArrayList<>();
bitMap.getDataId(i -> {
CollectionUtils.addIgnoreNull(keys, repository.findDataId(i));
});
return keys;
}
}
12 測試類:Test
public class Test {
public static void main(String[] args) {
// 資料倉庫
Repository repository = new DefaultRepository();
// 索引建構器
Build build = new AbstractBuild(repository) {
@Override
protected void putIndexModel(IndexKey data, int index) {
// TODO
}
};
// 建立索引(模拟)
build.create(new Rule("100001"));
build.create(new Rule("100002"));
build.create(new Rule("100003"));
build.create(new Rule("100004"));
// 索引比對
Match match = new DefaultMatch(build,repository);
// 航程類型,枚舉值:1-單程,0-不限
BitMapWrapper wrapper = match.andIn(null,"TRAVEL_TYPE","1","0");
// 開票航司,枚舉值:AA
match.and(wrapper,"AIRLINE","AA");
// 起飛地,枚舉值:上海(SHA),中國(CN),全球(GLOBAL)
match.andIn(wrapper,"DEP_CITY","SHA","CN","GLOBAL");
// 排除起飛地,枚舉值:上海(SHA),中國(CN),全球(GLOBAL)
match.notIn(wrapper,"FORBID_DEP_CITY","SHA","CN","GLOBAL");
// 目的地,枚舉值:洛杉矶(LAX),美國(US),北美(NOA),全球(GLOBAL)
match.andIn(wrapper,"ARR_CITY","LAX","US","NOA","GLOBAL");
// 排除目的地,枚舉值:洛杉矶(LAX),美國(US),北美(NOA),全球(GLOBAL)
match.notIn(wrapper,"FORBID_ARR_CITY","LAX","US","NOA","GLOBAL");
// 出發日期,枚舉值:2023-03-25
match.and(wrapper,"DEP_DATE","2023-03-25");
// 艙位,枚舉值:V,不限(ALL)
match.andIn(wrapper,"CABIN","V","ALL");
List<String> ruleIds = match.commit(wrapper);
System.out.println(ruleIds);
}
}
寫在最後
- 沒有完美架構方案,隻有最适合解決方案,大家需結合業務特點甄别篩選。
- 如何大家喜歡這篇文章或者感覺對自己有所幫助,盼大家多多點贊并關注。
- 如果大家有任何疑問和建議,歡迎大家評論區留言!!!