
HBase 2: Deleting All Data for a Specific Qualifier with a Coprocessor

In user-profiling scenarios, teams often develop many tags, with each tag stored as a qualifier. Some tags eventually stop being used and need to be retired, but the delete APIs HBase provides all operate on a single row, so purging every value of a given qualifier across the table is not straightforward. This post presents a coprocessor-based solution.
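To see the limitation, in the HBase shell a delete must always name a specific row; there is no single command that removes one qualifier across all rows:

```
# Deletes f:q1 for row 'rk1' only; every other row keeps its f:q1 value.
delete 'cp_test', 'rk1', 'f:q1'
```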

hbase對于compact過程提供了以下5個hook可以嵌入自定義代碼:

  • preCompactSelection
  • postCompactSelection
  • preCompactScannerOpen
  • preCompact
  • postCompact

preCompact is invoked after the StoreScanner has been created but before any data is read. The idea, therefore, is to proxy that scanner: wrap it in a new scanner whose next method post-processes the raw cells as they are read.

The code below is adapted from the ValueRewritingObserver class in the hbase-examples module:

public class QualifierDeletingObserver implements RegionObserver, RegionCoprocessor {
  private static final Logger LOG = LoggerFactory.getLogger(QualifierDeletingObserver.class);

  private byte[] qualifierToDelete = null;
  private Bytes.ByteArrayComparator comparator;

  @Override
  public Optional<RegionObserver> getRegionObserver() {
    // Extremely important to be sure that the coprocessor is invoked as a RegionObserver
    return Optional.of(this);
  }

  @Override
  public void start(
      @SuppressWarnings("rawtypes") CoprocessorEnvironment env) throws IOException {
    RegionCoprocessorEnvironment renv = (RegionCoprocessorEnvironment) env;
    qualifierToDelete = Bytes.toBytes(renv.getConfiguration().get("qualifier.to.delete"));
    comparator = new Bytes.ByteArrayComparator();
  }

  @Override
  public InternalScanner preCompact(
      ObserverContext<RegionCoprocessorEnvironment> c, Store store,
      final InternalScanner scanner, ScanType scanType, CompactionLifeCycleTracker tracker,
      CompactionRequest request) {
    InternalScanner modifyingScanner = new InternalScanner() {
      @Override
      public boolean next(List<Cell> result, ScannerContext scannerContext) throws IOException {
        boolean ret = scanner.next(result, scannerContext);
        // Remove matching cells via an iterator; calling List.remove(i) while
        // advancing i by one would skip the element that follows each removal.
        Iterator<Cell> iter = result.iterator();
        while (iter.hasNext()) {
          Cell cell = iter.next();
          byte[] qualifier = CellUtil.cloneQualifier(cell);
          if (comparator.compare(qualifier, qualifierToDelete) == 0) {
            iter.remove();
          }
        }
        return ret;
      }

      @Override
      public void close() throws IOException {
        scanner.close();
      }
    };

    return modifyingScanner;
  }
}
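The iterator-based removal in next above matters: deleting from a list by ascending index silently skips the element that follows each hit. The same filter logic can be exercised in isolation with a minimal, self-contained sketch over plain strings (class and method names here are illustrative, not HBase API):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class QualifierFilterDemo {
  // Remove every occurrence of `target` from `cells`, mirroring the
  // iterator-based removal used in the coprocessor's next() method.
  static List<String> dropQualifier(List<String> cells, String target) {
    List<String> copy = new ArrayList<>(cells);
    Iterator<String> iter = copy.iterator();
    while (iter.hasNext()) {
      if (iter.next().equals(target)) {
        iter.remove();
      }
    }
    return copy;
  }

  public static void main(String[] args) {
    List<String> cells = Arrays.asList("q1", "q2", "q1", "q3");
    // Adjacent "q1" entries are both removed, which an index-based
    // loop with remove(i) and i++ would get wrong.
    System.out.println(dropQualifier(cells, "q1")); // [q2, q3]
  }
}
```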

Package the class into a jar and upload it to HDFS.

The following is a simple walkthrough of the test procedure.

create 'cp_test','f'

put 'cp_test','rk1','f:q1','123'
put 'cp_test','rk1','f:q2','123'
put 'cp_test','rk2','f:q1','123'
put 'cp_test','rk2','f:q2','123'
put 'cp_test','rk2','f:q3','123'


hbase(main):015:0> scan 'cp_test'
ROW                                    COLUMN+CELL                                                                                                     
 rk1                                   column=f:q1, timestamp=1590567958995, value=123                                                                 
 rk1                                   column=f:q2, timestamp=1590567959023, value=123                                                                 
 rk2                                   column=f:q1, timestamp=1590567959048, value=123                                                                 
 rk2                                   column=f:q2, timestamp=1590567959073, value=123                                                                 
 rk2                                   column=f:q3, timestamp=1590567959842, value=123        
 

The coprocessor attribute takes the form jar path|class name|priority|key=value arguments:

alter 'cp_test' \
, METHOD => 'table_att', 'coprocessor'=>'hdfs://xxx.jar|xxx.QualifierDeletingObserver|1024|qualifier.to.delete=q1'

Flushing persists the memstore to HFiles, and the major compaction then rewrites every cell through the wrapped scanner, dropping the q1 cells:

flush 'cp_test'

major_compact 'cp_test'

hbase(main):017:0> scan 'cp_test'
ROW                                    COLUMN+CELL                                                                                                     
 rk1                                   column=f:q2, timestamp=1590567959023, value=123                                                                 
 rk2                                   column=f:q2, timestamp=1590567959073, value=123                                                                 
 rk2                                   column=f:q3, timestamp=1590567959842, value=123
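Once the cleanup is done, the coprocessor can be detached so it no longer runs on future compactions. A sketch, assuming it was registered as coprocessor$1 (verify the attribute name with describe 'cp_test' first):

```
alter 'cp_test', METHOD => 'table_att_unset', NAME => 'coprocessor$1'
```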