天天看點

海量資料去重 oracle,海量資料去重

前提

通過id去重,而不是整條資料

id由SnowFlake算法生成,參考之前的文章SnowFlake算法在資料鍊路中的應用

需求

在實時平台的各個環節中,由于網絡或其他問題,有時會出現資料重複的情況,本質上是由于at least once保障機制造成的。例如

flume agent之間的資料傳輸,如果網絡不穩定,有可能出現src_agent發送資料逾時而導緻重發,但實際上dest_agent已經收到,造成了資料重複

kafka producer發送資料且設定acks=all,在replication完成之間就由于逾時而傳回失敗,如果retries不為0,那麼重發之後資料也會有重複

通常我們會在業務端通過幂等性來保證資料的唯一性,比如Mysql的primary key,或者是HBase的rowkey。但在流式計算或某些存儲媒體中,沒有辦法天然的實作資料去重,這時就需要在資料計算/存儲之前将重複的資料移除或忽略

思路

我司的實時資料都是通過Flume采集,并且通過SnowFlake算法給每條資料配置設定一個全局唯一長整型的id,這個id會被帶到整條資料鍊路中,是以考慮開發一個去重子產品,對實時資料進行預處理。又由于id是數字類型,可以考慮用BitSet進行存儲以提高查詢效率和減小開銷,但java.util.BitSet的最大長度是Integer.MAX_VALUE(2GB),再長的話記憶體開銷就會非常巨大,是以需要對id進行分段存儲

海量資料去重 oracle,海量資料去重

原始的id是由41位時間戳,8位機器資訊和12位序列号組合而成

将時間戳拆分成秒和毫秒兩部分

重新将各部分組合成新的key-value pair,秒數和機器資訊拼接為一個long型的key,序列号和毫秒數拼接成一個int型的value。假設對n分鐘内的資料進行過濾,則key的最大個數為n*60*256,value最大個數為4096*1000

将相同key的資料放到同一個BitSet中,并緩存到LoadingCache中

代碼

注意:DuplicationEliminator中的常量必須與SnowFlake算法中一緻,否則會解析錯位

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86import java.util.BitSet;

import java.util.concurrent.ExecutionException;

import java.util.concurrent.TimeUnit;

import org.apache.commons.lang3.tuple.Pair;

import com.google.common.cache.CacheBuilder;

import com.google.common.cache.CacheLoader;

import com.google.common.cache.LoadingCache;

public class DuplicationEliminator{

private final static long SEQUENCE_BIT = 12; // 序列号占用的位數

private final static long MACHINE_BIT = 8; // 機器辨別占用的位數

private final static long MILLI_BIT = 10; // 毫秒占用的位數

private final static long MACHINE_LEFT = SEQUENCE_BIT;

private final static long TIMESTMP_LEFT = MACHINE_LEFT + MACHINE_BIT;

private final static long START_STMP = 1388505600000L;

private static final int RETENTION_MINUTES = 1;

private LoadingCache cache = CacheBuilder.newBuilder()//

.refreshAfterWrite(RETENTION_MINUTES, TimeUnit.MINUTES)// 給定時間内沒有被讀/寫通路,則回收。

.build(new CacheLoader() {

private final static int MAX_MILLI = (int) (-1L ^ (-1L << MILLI_BIT));

@Override

public BitSet load(Long key) throws ExecutionException{

// BitSet的值由sequence和毫秒數組合而成,每秒并發不超過1000的topic一般sequence都是0,是以這裡設定初始size是MAX_MILLI,避免過多的擴容開銷

return new BitSet(MAX_MILLI);

}

});

public boolean putIfAbsent(long id) throws ExecutionException{

Pair pair = idToPair(id);

long key = pair.getKey();

int value = pair.getValue();

BitSet existingValues = cache.get(key);

if (existingValues.get(value)) {

return false;

} else {

existingValues.set(value);

return true;

}

}

private Pair idToPair(long id){

int seq = (int) ((id) & ~(-1L << SEQUENCE_BIT));

long machineId = (id >> MACHINE_LEFT) & ~(-1L << MACHINE_BIT);

long timestamp = (id >> TIMESTMP_LEFT) + START_STMP;

long sec = timestamp / 1000;

int milli = (int) (timestamp % 1000);

return Pair.of(sec << MACHINE_BIT | machineId, seq << MILLI_BIT | milli);

}

private long pairToId(Pair pair){

long key = pair.getKey();

int value = pair.getValue();

long sequence = (value) >> MILLI_BIT;

long machineId = (key) & ~(-1L << MACHINE_BIT);

long timestamp = (key >> MACHINE_BIT) * 1000 + (value & ~(-1L << MILLI_BIT));

return (timestamp - START_STMP) << TIMESTMP_LEFT // 時間戳部分

| machineId << MACHINE_LEFT // 機器辨別部分

| sequence; // 序列号部分

}

}

測試

測試寫入一億個id,耗時48.4s,QPS=206.6萬,記憶體占用68MB,性能和開銷都還可以

1

2

3

4

5

6

7

8

9

10

11public static void main(String[] args) throws ExecutionException, InterruptedException{

DuplicationEliminator eliminator = new DuplicationEliminator();

Stopwatch watch = Stopwatch.createStarted();

for (int i = 0; i < 1_0000_0000; i++) {

long id = IdGenerator.generateId();

if (!eliminator.putIfAbsent(id)) {

System.out.println("duplicated: " + id);

}

}

System.out.println(watch.elapsed(TimeUnit.MILLISECONDS));

}