前提
通過id去重,而不是整條資料
id由SnowFlake算法生成,參考之前的文章SnowFlake算法在資料鍊路中的應用
需求
在實時平台的各個環節中,由于網絡或其他問題,有時會出現資料重複的情況,本質上是由于at least once保障機制造成的。例如
flume agent之間的資料傳輸,如果網絡不穩定,有可能出現src_agent發送資料逾時而導緻重發,但實際上dest_agent已經收到,造成了資料重複
kafka producer發送資料且設定acks=all,在replication完成之間就由于逾時而傳回失敗,如果retries不為0,那麼重發之後資料也會有重複
通常我們會在業務端通過幂等性來保證資料的唯一性,比如Mysql的primary key,或者是HBase的rowkey。但在流式計算或某些存儲媒體中,沒有辦法天然的實作資料去重,這時就需要在資料計算/存儲之前将重複的資料移除或忽略
思路
我司的實時資料都是通過Flume采集,并且通過SnowFlake算法給每條資料配置設定一個全局唯一長整型的id,這個id會被帶到整條資料鍊路中,是以考慮開發一個去重子產品,對實時資料進行預處理。又由于id是數字類型,可以考慮用BitSet進行存儲以提高查詢效率和減小開銷,但java.util.BitSet的最大長度是Integer.MAX_VALUE(2GB),再長的話記憶體開銷就會非常巨大,是以需要對id進行分段存儲

原始的id是由41位時間戳,8位機器資訊和12位序列号組合而成
将時間戳拆分成秒和毫秒兩部分
重新将各部分組合成新的key-value pair,秒數和機器資訊拼接為一個long型的key,序列号和毫秒數拼接成一個int型的value。假設對n分鐘内的資料進行過濾,則key的最大個數為n*60*256,value最大個數為4096*1000
将相同key的資料放到同一個BitSet中,并緩存到LoadingCache中
代碼
注意:DuplicationEliminator中的常量必須與SnowFlake算法中一緻,否則會解析錯位
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86import java.util.BitSet;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.tuple.Pair;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
public class DuplicationEliminator{
private final static long SEQUENCE_BIT = 12; // 序列号占用的位數
private final static long MACHINE_BIT = 8; // 機器辨別占用的位數
private final static long MILLI_BIT = 10; // 毫秒占用的位數
private final static long MACHINE_LEFT = SEQUENCE_BIT;
private final static long TIMESTMP_LEFT = MACHINE_LEFT + MACHINE_BIT;
private final static long START_STMP = 1388505600000L;
private static final int RETENTION_MINUTES = 1;
private LoadingCache cache = CacheBuilder.newBuilder()//
.refreshAfterWrite(RETENTION_MINUTES, TimeUnit.MINUTES)// 給定時間内沒有被讀/寫通路,則回收。
.build(new CacheLoader() {
private final static int MAX_MILLI = (int) (-1L ^ (-1L << MILLI_BIT));
@Override
public BitSet load(Long key) throws ExecutionException{
// BitSet的值由sequence和毫秒數組合而成,每秒并發不超過1000的topic一般sequence都是0,是以這裡設定初始size是MAX_MILLI,避免過多的擴容開銷
return new BitSet(MAX_MILLI);
}
});
public boolean putIfAbsent(long id) throws ExecutionException{
Pair pair = idToPair(id);
long key = pair.getKey();
int value = pair.getValue();
BitSet existingValues = cache.get(key);
if (existingValues.get(value)) {
return false;
} else {
existingValues.set(value);
return true;
}
}
private Pair idToPair(long id){
int seq = (int) ((id) & ~(-1L << SEQUENCE_BIT));
long machineId = (id >> MACHINE_LEFT) & ~(-1L << MACHINE_BIT);
long timestamp = (id >> TIMESTMP_LEFT) + START_STMP;
long sec = timestamp / 1000;
int milli = (int) (timestamp % 1000);
return Pair.of(sec << MACHINE_BIT | machineId, seq << MILLI_BIT | milli);
}
private long pairToId(Pair pair){
long key = pair.getKey();
int value = pair.getValue();
long sequence = (value) >> MILLI_BIT;
long machineId = (key) & ~(-1L << MACHINE_BIT);
long timestamp = (key >> MACHINE_BIT) * 1000 + (value & ~(-1L << MILLI_BIT));
return (timestamp - START_STMP) << TIMESTMP_LEFT // 時間戳部分
| machineId << MACHINE_LEFT // 機器辨別部分
| sequence; // 序列号部分
}
}
測試
測試寫入一億個id,耗時48.4s,QPS=206.6萬,記憶體占用68MB,性能和開銷都還可以
1
2
3
4
5
6
7
8
9
10
11public static void main(String[] args) throws ExecutionException, InterruptedException{
DuplicationEliminator eliminator = new DuplicationEliminator();
Stopwatch watch = Stopwatch.createStarted();
for (int i = 0; i < 1_0000_0000; i++) {
long id = IdGenerator.generateId();
if (!eliminator.putIfAbsent(id)) {
System.out.println("duplicated: " + id);
}
}
System.out.println(watch.elapsed(TimeUnit.MILLISECONDS));
}