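In user-profile scenarios, many tags are typically developed, and each tag is stored as its own qualifier. Some of them eventually fall out of use and need to be retired, but the delete APIs that HBase provides all operate on a single row, so clearing all the data for one qualifier is not easy. This post presents an implementation based on a coprocessor.
HBase exposes the following five hooks in the compaction process where custom code can be plugged in: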
- preCompactSelection
- postCompactSelection
- preCompactScannerOpen
- preCompact
- postCompact
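Of these, preCompact is called after the StoreScanner has been created and before any data is read. The idea here is therefore to proxy the scanner: create a new scanner that implements its own next method and post-processes the raw data read by the original one.
The code is below; it is modeled on the ValueRewritingObserver class in the hbase-examples module: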
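The proxy idea can be sketched without any HBase dependency. In the minimal sketch below, `BatchScanner` is a hypothetical stand-in for HBase's `InternalScanner`, and cells are reduced to plain `qualifier=value` strings; these names and the string encoding are assumptions made for illustration, and only the wrapping pattern carries over to the real observer.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class ScannerProxySketch {
    /** Hypothetical stand-in for a scanner that fills `out` with one row per call. */
    public interface BatchScanner {
        /** Appends the next row's cells to out; returns true if more rows remain. */
        boolean next(List<String> out);
    }

    /** Wraps a scanner and drops every cell whose qualifier matches. */
    public static BatchScanner dropping(BatchScanner delegate, String qualifier) {
        String prefix = qualifier + "=";
        return out -> {
            boolean more = delegate.next(out);              // let the real scanner read first
            out.removeIf(cell -> cell.startsWith(prefix));  // then post-process the batch
            return more;                                    // pass the delegate's answer through
        };
    }

    /** Simple in-memory scanner that returns one prepared row per call. */
    public static BatchScanner over(List<List<String>> rows) {
        Iterator<List<String>> it = rows.iterator();
        return out -> {
            if (it.hasNext()) {
                out.addAll(it.next());
            }
            return it.hasNext();
        };
    }

    /** Drains a scanner row by row, roughly the way a compaction consumes it. */
    public static List<List<String>> drain(BatchScanner scanner) {
        List<List<String>> rows = new ArrayList<>();
        boolean more;
        do {
            List<String> batch = new ArrayList<>();
            more = scanner.next(batch);
            rows.add(batch);
        } while (more);
        return rows;
    }

    public static void main(String[] args) {
        List<List<String>> rows = Arrays.asList(
            Arrays.asList("q1=123", "q2=123"),
            Arrays.asList("q1=123", "q2=123", "q3=123"));
        // Prints [[q2=123], [q2=123, q3=123]]
        System.out.println(drain(dropping(over(rows), "q1")));
    }
}
```

The wrapper never decides when scanning ends; it only edits the batch and returns whatever the delegate returned, which is why the caller needs no changes.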
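import java.io.IOException;
import java.util.List;
import java.util.Optional;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.regionserver.ScannerContext;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.util.Bytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class QualifierDeletingObserver implements RegionObserver, RegionCoprocessor {
  private static final Logger LOG = LoggerFactory.getLogger(QualifierDeletingObserver.class);

  // Qualifier whose cells are dropped during compaction; set from the coprocessor arguments.
  private byte[] qualifierToDelete = null;

  @Override
  public Optional<RegionObserver> getRegionObserver() {
    // Extremely important to be sure that the coprocessor is invoked as a RegionObserver
    return Optional.of(this);
  }

  @Override
  public void start(
      @SuppressWarnings("rawtypes") CoprocessorEnvironment env) throws IOException {
    RegionCoprocessorEnvironment renv = (RegionCoprocessorEnvironment) env;
    String value = renv.getConfiguration().get("qualifier.to.delete");
    if (value == null) {
      // Fail with a clear message instead of a NullPointerException from Bytes.toBytes
      throw new IOException("Missing coprocessor argument: qualifier.to.delete");
    }
    qualifierToDelete = Bytes.toBytes(value);
    LOG.info("Cells with qualifier '{}' will be dropped during compaction", value);
  }

  @Override
  public InternalScanner preCompact(
      ObserverContext<RegionCoprocessorEnvironment> c, Store store,
      final InternalScanner scanner, ScanType scanType, CompactionLifeCycleTracker tracker,
      CompactionRequest request) {
    // Proxy the compaction scanner: delegate the read, then filter the batch it returned.
    return new InternalScanner() {
      @Override
      public boolean next(List<Cell> result, ScannerContext scannerContext) throws IOException {
        boolean ret = scanner.next(result, scannerContext);
        // removeIf avoids the skipped-element problem of calling remove(i) in a forward
        // index loop; matchingQualifier compares in place without copying the qualifier.
        result.removeIf(cell -> CellUtil.matchingQualifier(cell, qualifierToDelete));
        return ret;
      }

      @Override
      public void close() throws IOException {
        scanner.close();
      }
    };
  }
}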
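One detail worth checking in any scanner that filters its batch in place is how elements are removed. Calling `remove(i)` inside a forward index loop shifts the following element into slot `i`, and the loop then steps past it, so two adjacent matches (which may occur, for example, when a column keeps more than one version) leave one behind. The sketch below uses plain strings instead of cells and a hypothetical class name to contrast that loop with `removeIf`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class RemoveWhileIterating {
    /** Forward index loop: after remove(i) the next element slides into slot i and is skipped. */
    public static List<String> forwardLoop(List<String> input, String target) {
        List<String> result = new ArrayList<>(input);
        for (int i = 0; i < result.size(); i++) {
            if (result.get(i).equals(target)) {
                result.remove(i);
            }
        }
        return result;
    }

    /** removeIf visits every element exactly once, so no match is skipped. */
    public static List<String> withRemoveIf(List<String> input, String target) {
        List<String> result = new ArrayList<>(input);
        result.removeIf(target::equals);
        return result;
    }

    public static void main(String[] args) {
        List<String> qualifiers = Arrays.asList("q1", "q1", "q2");
        System.out.println(forwardLoop(qualifiers, "q1"));   // prints [q1, q2]
        System.out.println(withRemoveIf(qualifiers, "q1"));  // prints [q2]
    }
}
```

Iterating from the last index down to zero, or decrementing `i` after each removal, would also avoid the skip.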
Package the class as a jar and upload it to HDFS.
A simple test run is shown below.
create 'cp_test','f'
put 'cp_test','rk1','f:q1','123'
put 'cp_test','rk1','f:q2','123'
put 'cp_test','rk2','f:q1','123'
put 'cp_test','rk2','f:q2','123'
put 'cp_test','rk2','f:q3','123'
hbase(main):015:0> scan 'cp_test'
ROW        COLUMN+CELL
 rk1       column=f:q1, timestamp=1590567958995, value=123
 rk1       column=f:q2, timestamp=1590567959023, value=123
 rk2       column=f:q1, timestamp=1590567959048, value=123
 rk2       column=f:q2, timestamp=1590567959073, value=123
 rk2       column=f:q3, timestamp=1590567959842, value=123
alter 'cp_test' \
, METHOD => 'table_att', 'coprocessor'=>'hdfs://xxx.jar|xxx.QualifierDeletingObserver|1024|qualifier.to.delete=q1'
flush 'cp_test'
major_compact 'cp_test'
hbase(main):017:0> scan 'cp_test'
ROW        COLUMN+CELL
 rk1       column=f:q2, timestamp=1590567959023, value=123
 rk2       column=f:q2, timestamp=1590567959073, value=123
 rk2       column=f:q3, timestamp=1590567959842, value=123
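The `coprocessor` table attribute set by the `alter` command packs several fields into one string separated by `|`: the jar path, the class name, the priority, and comma-separated `key=value` arguments, which the observer reads back from its configuration. The following is a minimal sketch of splitting such a string, useful for sanity-checking a value before running `alter`. It is not HBase's own parser; the `CoprocessorSpec` class is hypothetical, and requiring the priority field to be present is a simplification of this sketch.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

public class CoprocessorSpec {
    public final String jarPath;
    public final String className;
    public final int priority;
    public final Map<String, String> args;

    private CoprocessorSpec(String jarPath, String className, int priority,
                            Map<String, String> args) {
        this.jarPath = jarPath;
        this.className = className;
        this.priority = priority;
        this.args = args;
    }

    /** Splits "path|class|priority|k1=v1,k2=v2"; the argument field may be absent or empty. */
    public static CoprocessorSpec parse(String spec) {
        String[] parts = spec.split("\\|", -1);
        if (parts.length < 3 || parts.length > 4) {
            throw new IllegalArgumentException("expected path|class|priority[|args]: " + spec);
        }
        int priority = Integer.parseInt(parts[2].trim());
        Map<String, String> args = new LinkedHashMap<>();
        if (parts.length == 4 && !parts[3].trim().isEmpty()) {
            for (String pair : parts[3].split(",")) {
                String[] kv = pair.split("=", 2);
                if (kv.length != 2) {
                    throw new IllegalArgumentException("expected key=value: " + pair);
                }
                args.put(kv[0].trim(), kv[1].trim());
            }
        }
        return new CoprocessorSpec(parts[0].trim(), parts[1].trim(), priority,
            Collections.unmodifiableMap(args));
    }

    public static void main(String[] argv) {
        // Same masked value as in the alter command of the test session.
        CoprocessorSpec spec = parse(
            "hdfs://xxx.jar|xxx.QualifierDeletingObserver|1024|qualifier.to.delete=q1");
        System.out.println(spec.className);                        // xxx.QualifierDeletingObserver
        System.out.println(spec.priority);                         // 1024
        System.out.println(spec.args.get("qualifier.to.delete"));  // q1
    }
}
```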