接口:
package com.core.hbase;
import org.apache.hadoop.hbase.client.Result;
import java.io.IOException;
import java.util.List;
import java.util.Map;
public interface IHbaseDao {
/**
* 创建表
*
* @param tableName
* @param familyNames 列簇
* @throws IOException
*/
public void createTable(String tableName, String... familyNames) throws IOException;
/**
* 删除表
*
* @param tableName
* @throws IOException
*/
public void deleteTable(String tableName) throws IOException;
/**
* 查询全表的数据
*
* @param tablename
* @return
*/
public List<Result> scaner(String tablename);
/**
* 根据rowKey查询单条记录
*
* @param tableName
* @param rowKey
* @return
*/
public Result getRow(String tableName, String rowKey);
/**
* 根据regxKey正则匹配数据
*
* @param tableName
* @param regxKey
* @return
*/
public List<Result> getRegexRow(String tableName, String regxKey);
/**
* 根据regxKey正则匹配数据,取出num条
*
* @param tableName
* @param regxKey
* @param num
* @return
*/
public List<Result> getRegexRow(String tableName, String regxKey, int num);
/**
* 根据startKey和endKey的范围匹配数据
*
* @param tableName
* @param startKey
* @param stopKey
* @return
*/
public List<Result> getStartRowAndEndRow(String tableName, String startKey, String stopKey);
/**
* 确定startKey和endKey的范围,根据regKey匹配数据
*
* @param tableName
* @param startKey
* @param stopKey
* @param regxKey
* @return
*/
public List<Result> getRegexRow(String tableName, String startKey, String stopKey, String regxKey);
/**
* 确定startKey和endKey的范围,根据regKey匹配数据,取出num条
*
* @param tableName
* @param startKey
* @param stopKey
* @param regxKey
* @param num
* @return
*/
public List<Result> getRegexRow(String tableName, String startKey, String stopKey, String regxKey, int num);
/**
* 添加数据
*
* @param rowKey
* @param tableName
* @param column
* @param value
*/
public Boolean addData(String rowKey, String tableName, String familyName, String[] column, String[] value);
/**
* 批量添加数据
*
* @param tableName
* @param familyName
* @param list
* @return
*/
public Boolean addDataBatch(final String tableName, final String familyName, List<Map<String, String>> list);
/**
* 删除记录
*
* @param tableName
* @param rowKeys
*/
public Boolean delRecord(String tableName, String... rowKeys);
/**
* 修改一条数据
*
* @param tableName
* @param rowKey
* @param familyName
* @param column
* @param value
* @throws IOException
*/
public Boolean updateTable(String tableName, String rowKey, String familyName, String column[], String value[]) throws IOException;
/**
* 批量修改数据
*
* @param tableName
* @param familyName
* @param list
* @return
*/
public Boolean updateTableBatch(final String tableName, final String familyName, List<Map<String, String>> list);
/**
* 查找最新的一条数据,或者说倒序查询
*
* @param tableName
* @return
*/
public Result getNewRow(String tableName);
/**
* 正则查出所有匹配的key
*
* @param tableName
* @param regxKey
* @return
*/
public List<String> queryKeys(String tableName, String regxKey);
/**
* 增加表中对应字段的值
*
* @param tableName
* @param cf
* @param rowKey
* @param column
* @param num
* @return
*/
long incrQualifier(String tableName, String cf, String rowKey, String column, long num);
/**
* 查找最新的一条数据,或者说倒序查询
*
* @param tableName
* @param regxKey
* @return
*/
Result getNewRow(String tableName, String regxKey);
/**
* 根据列值查询(类似外键关联)
*
* @param tableName
* @param cf
* @param column
* @param value
* @return
*/
List<Result> getBySingleColumnValueFilter(String tableName, String cf, String column, String value);
/**
* 根据多列列值查询(类似外键关联)
*
* @param tableName
* @param cf
* @param column
* @param value
* @return
*/
List<Result> getByColumnsValueFilter(String tableName, String cf, String[] column, String[] value);
/**
* 根据多列列值查询分页
*
* @param tableName
* @param cf
* @param column
* @param value
* @return
*/
List<Result> getByColumnsValueFilterByPage(String startRowKey, int pageSize, String tableName, String cf, String[] column, String[] value);
/**
* 根据时间查询数据
*
* @param tableName
* @param cf
* @param column
* @param value
* @return
*/
List<Result> getByTimeValue(String tableName, String cf, String[] column, String[] value);
/**
* 利用HBase协处理器Coprocessor,统计行
*
* @param tablename
* @return
* @throws Throwable
*/
public Long rowCountByCoprocessor(String tablename,String cf,String[] column, String[] value) throws Throwable;
/**
* Result数据转成Map
*
* @param result
* @param rowNum
* @return
*/
public Map<String, String> mapRow(Result result, int rowNum);
/**
* @Description: 通过pageNumber,pageSize,查询起始页
* @Param: [tableName, pageNumber, pageSize]
* @return: java.lang.String
* @Author: zhoujie
* @Date: 2018/11/8
*/
String getStartRowKey(String tableName, Integer pageNumber, Integer pageSize) throws Throwable;
/**
* 列表所有数据
*
* @param tableName
* @return
*/
public List<Map<String, String>> listAll(String tableName);
}
实现类:
package com.swstsoft.core.hbase;
import com.swstsoft.constant.ConstantUtil;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.client.coprocessor.AggregationClient;
import org.apache.hadoop.hbase.client.coprocessor.LongColumnInterpreter;
import org.apache.hadoop.hbase.filter.*;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.util.Bytes;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.hadoop.hbase.HbaseTemplate;
import org.springframework.data.hadoop.hbase.TableCallback;
import org.springframework.stereotype.Component;
import org.springframework.util.StopWatch;
import java.io.IOException;
import java.util.*;
@Component
public class HbaseDaoImpl implements IHbaseDao {
@Autowired
private HbaseTemplate hbaseTemplate;
//编码常量
private static final String ENCODING = "utf-8";
@Override
public void createTable(String tableName, String... familyNames) throws IOException {
Configuration configuration = hbaseTemplate.getConfiguration();
HBaseAdmin admin = new HBaseAdmin(configuration);
HTableDescriptor htd = new HTableDescriptor(tableName);
for (String st : familyNames) {
htd.addFamily(new HColumnDescriptor(st));
}
admin.createTable(htd);
admin.close();
}
@Override
public void deleteTable(String tableName) throws IOException {
Configuration configuration = hbaseTemplate.getConfiguration();
HBaseAdmin admin = new HBaseAdmin(configuration);
if (!admin.isTableDisabled(tableName)) {
admin.disableTable(tableName);
}
admin.deleteTable(tableName);
admin.close();
}
@Override
public List<Result> scaner(final String tableName) {
return hbaseTemplate.execute(tableName, new TableCallback<List<Result>>() {
List<Result> list = new ArrayList<>();
@Override
public List<Result> doInTable(HTableInterface table) throws Throwable {
Scan scan = new Scan();
ResultScanner rs = table.getScanner(scan);
for (Result result : rs) {
list.add(result);
}
return list;
}
});
}
@Override
public Result getRow(final String tableName, final String rowKey) {
return hbaseTemplate.execute(tableName, new TableCallback<Result>() {
@Override
public Result doInTable(HTableInterface table) throws Throwable {
Get get = new Get(rowKey.getBytes(ENCODING));
return table.get(get);
}
});
}
@Override
public List<Result> getRegexRow(final String tableName, final String regxKey) {
return hbaseTemplate.execute(tableName, new TableCallback<List<Result>>() {
List<Result> list = new ArrayList<>();
@Override
public List<Result> doInTable(HTableInterface table) throws Throwable {
RegexStringComparator rc = new RegexStringComparator(regxKey);
RowFilter rowFilter = new RowFilter(CompareOp.EQUAL, rc);
Scan scan = new Scan();
scan.setFilter(rowFilter);
ResultScanner rs = table.getScanner(scan);
for (Result result : rs) {
list.add(result);
}
return list;
}
});
}
@Override
public List<Result> getRegexRow(final String tableName, final String regxKey, final int num) {
return hbaseTemplate.execute(tableName, new TableCallback<List<Result>>() {
List<Result> list = new ArrayList<>();
@Override
public List<Result> doInTable(HTableInterface table) throws Throwable {
FilterList fl = new FilterList(FilterList.Operator.MUST_PASS_ALL);
RegexStringComparator rc = new RegexStringComparator(regxKey);
RowFilter rf = new RowFilter(CompareOp.EQUAL, rc);
if (num > 0) {
// 过滤获取的条数
Filter filterNum = new PageFilter(num);// 每页展示条数
fl.addFilter(filterNum);
} // 过滤器的添加
fl.addFilter(rf);
Scan scan = new Scan();
scan.setFilter(fl);// 为查询设置过滤器的list
ResultScanner rscanner = table.getScanner(scan);
for (Result result : rscanner) {
list.add(result);
}
return list;
}
});
}
@Override
public List<Result> getStartRowAndEndRow(final String tableName, final String startKey, final String stopKey) {
return hbaseTemplate.execute(tableName, new TableCallback<List<Result>>() {
List<Result> list = new ArrayList<>();
@Override
public List<Result> doInTable(HTableInterface table) throws Throwable {
// 过滤器的添加
Scan scan = new Scan();
scan.setStartRow(startKey.getBytes(ENCODING));// 开始的key
scan.setStopRow(stopKey.getBytes(ENCODING));// 结束的key
ResultScanner rscanner = table.getScanner(scan);
for (Result result : rscanner) {
list.add(result);
}
return list;
}
});
}
@Override
public List<Result> getRegexRow(final String tableName, final String startKey, final String stopKey, final String regxKey) {
return hbaseTemplate.execute(tableName, new TableCallback<List<Result>>() {
List<Result> list = new ArrayList<>();
@Override
public List<Result> doInTable(HTableInterface table) throws Throwable {
// 设置正则过滤器
RegexStringComparator rc = new RegexStringComparator(regxKey);
RowFilter rf = new RowFilter(CompareOp.EQUAL, rc);
// 过滤器的添加
Scan scan = new Scan();
scan.setStartRow(startKey.getBytes(ENCODING));// 开始的key
scan.setStopRow(stopKey.getBytes(ENCODING));// 结束的key
scan.setFilter(rf);// 为查询设置过滤器的list
ResultScanner rscanner = table.getScanner(scan);
for (Result result : rscanner) {
list.add(result);
}
return list;
}
});
}
@Override
public List<Result> getRegexRow(final String tableName, final String startKey, final String stopKey, final String regxKey, final int num) {
return hbaseTemplate.execute(tableName, new TableCallback<List<Result>>() {
List<Result> list = new ArrayList<>();
@Override
public List<Result> doInTable(HTableInterface table) throws Throwable {
FilterList fl = new FilterList(FilterList.Operator.MUST_PASS_ALL);
// 设置正则过滤器
RegexStringComparator rc = new RegexStringComparator(regxKey);
RowFilter rf = new RowFilter(CompareOp.EQUAL, rc);
if (num > 0) {
// 过滤获取的条数
Filter filterNum = new PageFilter(num);// 每页展示条数
fl.addFilter(filterNum);
}
// 过滤器的添加
fl.addFilter(rf);
// 过滤器的添加
Scan scan = new Scan();
scan.setStartRow(startKey.getBytes(ENCODING));// 开始的key
scan.setStopRow(stopKey.getBytes(ENCODING));// 结束的key
scan.setFilter(fl);// 为查询设置过滤器的list
ResultScanner rscanner = table.getScanner(scan);
for (Result result : rscanner) {
list.add(result);
}
return list;
}
});
}
@Override
public Boolean addData(final String rowKey, final String tableName, final String familyName, final String[] column, final String[] value) {
return hbaseTemplate.execute(tableName, new TableCallback<Boolean>() {
@Override
public Boolean doInTable(HTableInterface table) throws Throwable {
Put put = new Put(Bytes.toBytes(rowKey));
// 设置rowkey
for (int j = 0; j < column.length; j++) {
put.add(Bytes.toBytes(familyName), Bytes.toBytes(column[j]), Bytes.toBytes(value[j]));
}
table.put(put);
return true;
}
});
}
@Override
public Boolean addDataBatch(final String tableName, final String familyName, List<Map<String, String>> list) {
return hbaseTemplate.execute(tableName, new TableCallback<Boolean>() {
@Override
public Boolean doInTable(HTableInterface table) throws Throwable {
List<Put> puts = new ArrayList<>();
for (Map<String, String> map : list) {
// 设置rowkey
Put put = new Put(Bytes.toBytes(map.get("rowKey")));
Set<Map.Entry<String, String>> entries = map.entrySet();
for (Map.Entry<String, String> entry : entries) {
if ("rowKey".equals(entry.getKey()))
continue;
put.add(Bytes.toBytes(familyName), Bytes.toBytes(entry.getKey()), Bytes.toBytes(entry.getValue()));
}
puts.add(put);
}
table.put(puts);
return true;
}
});
}
@Override
public Boolean delRecord(final String tableName, final String... rowKeys) {
return hbaseTemplate.execute(tableName, new TableCallback<Boolean>() {
@Override
public Boolean doInTable(HTableInterface table) throws Throwable {
List<Delete> list = new ArrayList<>();
for (String rowKey : rowKeys) {
Delete del = new Delete(Bytes.toBytes(rowKey));
list.add(del);
}
table.delete(list);
return true;
}
});
}
@Override
public Boolean updateTable(final String tableName, final String rowKey, final String familyName, final String[] column, final String[] value) throws IOException {
return hbaseTemplate.execute(tableName, new TableCallback<Boolean>() {
@Override
public Boolean doInTable(HTableInterface table) throws Throwable {
Put put = new Put(Bytes.toBytes(rowKey));
for (int j = 0; j < column.length; j++) {
put.add(Bytes.toBytes(familyName), Bytes.toBytes(column[j]), Bytes.toBytes(value[j]));
}
table.put(put);
return true;
}
});
}
@Override
public Boolean updateTableBatch(final String tableName, final String familyName, List<Map<String, String>> list) {
return hbaseTemplate.execute(tableName, new TableCallback<Boolean>() {
@Override
public Boolean doInTable(HTableInterface table) throws Throwable {
List<Put> puts = new ArrayList<>();
for (Map<String, String> map : list) {
// 设置rowkey
Put put = new Put(Bytes.toBytes(map.get("rowKey")));
Set<Map.Entry<String, String>> entries = map.entrySet();
for (Map.Entry<String, String> entry : entries) {
put.add(Bytes.toBytes(familyName), Bytes.toBytes(entry.getKey()), Bytes.toBytes(entry.getValue()));
}
puts.add(put);
}
table.put(puts);
return true;
}
});
}
@Override
public Result getNewRow(final String tableName) {
return hbaseTemplate.execute(tableName, new TableCallback<Result>() {
@Override
public Result doInTable(HTableInterface table) throws Throwable {
Filter filterNum = new PageFilter(1);// 每页展示条数
Scan scan = new Scan();
scan.setFilter(filterNum);
scan.setReversed(true);
ResultScanner scanner = table.getScanner(scan);
return scanner.next();
}
});
}
@Override
public Result getNewRow(final String tableName, final String regxKey) {
return hbaseTemplate.execute(tableName, new TableCallback<Result>() {
@Override
public Result doInTable(HTableInterface table) throws Throwable {
FilterList fl = new FilterList(FilterList.Operator.MUST_PASS_ALL);
RegexStringComparator rc = new RegexStringComparator(regxKey);
RowFilter rf = new RowFilter(CompareOp.EQUAL, rc);
Filter filterNum = new PageFilter(1);// 每页展示条数
fl.addFilter(rf);
fl.addFilter(filterNum);
Scan scan = new Scan();
scan.setFilter(fl);
scan.setReversed(true);
ResultScanner scanner = table.getScanner(scan);
return scanner.next();
}
});
}
@Override
public List<String> queryKeys(final String tableName, final String regxKey) {
return hbaseTemplate.execute(tableName, new TableCallback<List<String>>() {
List<String> list = new ArrayList<>();
@Override
public List<String> doInTable(HTableInterface table) throws Throwable {
PrefixFilter filter = new PrefixFilter(regxKey.getBytes(ENCODING));
Scan scan = new Scan();
scan.setFilter(filter);
ResultScanner scanner = table.getScanner(scan);
for (Result rs : scanner) {
list.add(new String(rs.getRow()));
}
return list;
}
});
}
@Override
public long incrQualifier(final String tableName, final String cf, final String rowKey, final String column, final long num) {
return hbaseTemplate.execute(tableName, new TableCallback<Long>() {
@Override
public Long doInTable(HTableInterface table) throws Throwable {
long qualifie = table.incrementColumnValue(rowKey.getBytes(ENCODING), cf.getBytes(ENCODING), column.getBytes(ENCODING), num);
return qualifie;
}
});
}
@Override
public List<Result> getByColumnsValueFilter(String tableName, String cf, String[] column, String[] value) {
return hbaseTemplate.execute(tableName, new TableCallback<List<Result>>() {
List<Result> list = new ArrayList<>();
FilterList filterList = new FilterList();
@Override
public List<Result> doInTable(HTableInterface table) throws Throwable {
// 要查询的字段数组集
Scan scan = new Scan();
for (int i = 0; i < column.length; i++) {
SingleColumnValueFilter b = new SingleColumnValueFilter(Bytes.toBytes(cf), Bytes.toBytes(column[i]), CompareOp.EQUAL, new RegexStringComparator(value[i]));
filterList.addFilter(b);
}
scan.setFilter(filterList);
ResultScanner rscanner = table.getScanner(scan);
for (Result result : rscanner) {
list.add(result);
}
return list;
}
});
}
@Override
public List<Result> getByColumnsValueFilterByPage(String startRowKey, int pageSize, String tableName, String cf, String[] column, String[] value) {
return hbaseTemplate.execute(tableName, new TableCallback<List<Result>>() {
List<Result> list = new ArrayList<>();
FilterList filterList = new FilterList();
@Override
public List<Result> doInTable(HTableInterface table) throws Throwable {
// 要查询的字段数组集
Scan scan = new Scan();
//与时间相关的提出来
for (int i = 0; i < column.length; i++) {
// 起始时间和开始时间的范围查询
if (column[i].equals("startTime")) {
SingleColumnValueFilter startTime = new SingleColumnValueFilter(Bytes.toBytes(cf), Bytes.toBytes("createTimeString"), CompareOp.GREATER_OR_EQUAL, Bytes.toBytes(value[i]));
filterList.addFilter(startTime);
} else if (column[i].equals("endTime")) {
SingleColumnValueFilter endTime = new SingleColumnValueFilter(Bytes.toBytes(cf), Bytes.toBytes("createTimeString"), CompareOp.LESS_OR_EQUAL, Bytes.toBytes(value[i]));
filterList.addFilter(endTime);
} else {
SingleColumnValueFilter b = new SingleColumnValueFilter(Bytes.toBytes(cf), Bytes.toBytes(column[i]), CompareOp.EQUAL, new RegexStringComparator(value[i]));
filterList.addFilter(b);
}
}
scan.setStartRow(Bytes.toBytes(startRowKey));
PageFilter pageFilter = new PageFilter(pageSize);
filterList.addFilter(pageFilter);
scan.setFilter(filterList);
ResultScanner rscanner = table.getScanner(scan);
for (Result result : rscanner) {
list.add(result);
}
return list;
}
});
}
@Override
public List<Result> getByTimeValue(String tableName, String cf, String[] column, String[] value) {
return hbaseTemplate.execute(tableName, new TableCallback<List<Result>>() {
List<Result> list = new ArrayList<>();
FilterList filterList = new FilterList();
@Override
public List<Result> doInTable(HTableInterface hTableInterface) throws Throwable {
// 要查询的字段数组集
Scan scan = new Scan();
//与时间相关的提出来
for (int i = 0; i < column.length; i++) {
// 起始时间和开始时间的范围查询
if (column[i].equals("startTime")) {
SingleColumnValueFilter startTime = new SingleColumnValueFilter(Bytes.toBytes(cf), Bytes.toBytes("createTimeString"), CompareOp.GREATER_OR_EQUAL, Bytes.toBytes(value[i]));
filterList.addFilter(startTime);
} else if (column[i].equals("endTime")) {
SingleColumnValueFilter endTime = new SingleColumnValueFilter(Bytes.toBytes(cf), Bytes.toBytes("createTimeString"), CompareOp.LESS_OR_EQUAL, Bytes.toBytes(value[i]));
filterList.addFilter(endTime);
} else {
SingleColumnValueFilter b = new SingleColumnValueFilter(Bytes.toBytes(cf), Bytes.toBytes(column[i]), CompareOp.EQUAL, new RegexStringComparator(value[i]));
filterList.addFilter(b);
}
}
scan.setFilter(filterList);
ResultScanner rscanner = hTableInterface.getScanner(scan);
for (Result result : rscanner) {
list.add(result);
}
return list;
}
});
}
@Override
public List<Result> getBySingleColumnValueFilter(final String tableName, final String cf, final String column, final String value) {
return hbaseTemplate.execute(tableName, new TableCallback<List<Result>>() {
List<Result> list = new ArrayList<>();
@Override
public List<Result> doInTable(HTableInterface table) throws Throwable {
// 要查询的字段
Scan scan = new Scan();
SingleColumnValueFilter b = new SingleColumnValueFilter(Bytes.toBytes(cf), Bytes.toBytes(column), CompareOp.EQUAL, new BinaryComparator(Bytes.toBytes(value)));
FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ALL, b);
scan.setFilter(filterList);
ResultScanner rscanner = table.getScanner(scan);
for (Result result : rscanner) {
list.add(result);
}
return list;
}
});
}
/**
* 分页获取StartRowKey
*
* @param tableName
* @param pageNumber
* @param pageSize
* @return
* @throws Exception
*/
public String getStartRowKey(String tableName, Integer pageNumber, Integer pageSize) throws Exception {
if (pageNumber <= 1) {
return null;
}
String startRowKey = null;
for (int i = 1; i < pageNumber; i++) {
String finalStartRowKey = startRowKey;
// TODO 待优化
List<Result> results = hbaseTemplate.execute(tableName, new TableCallback<List<Result>>() {
@Override
public List<Result> doInTable(HTableInterface table) throws Throwable {
Scan scan = new Scan();
if (!StringUtils.isBlank(finalStartRowKey)) {
scan.setStartRow(finalStartRowKey.getBytes());
}
Filter pageFilter = new PageFilter(pageSize + 1);
scan.setFilter(pageFilter);
ResultScanner scanner = table.getScanner(scan);
Iterator<Result> iterator = scanner.iterator();
List<Result> list = new ArrayList<>();
while (iterator.hasNext()) {
list.add(iterator.next());
}
return list;
}
});
for (Result result : results) {
startRowKey = Bytes.toString(result.getRow());
}
}
return startRowKey;
}
/**
* 列表所有数据
*
* @param tableName
* @return
*/
@Override
public List<Map<String, String>> listAll(String tableName) {
List<Result> scaner = scaner(tableName);
List<Map<String, String>> list = new ArrayList<>();
for (int i = 0; i < scaner.size(); i++) {
Map<String, String> map = mapRow(scaner.get(i), i);
list.add(map);
}
return list;
}
/**
* 利用HBase协处理器Coprocessor,统计行
*
* @param
* @return
* @throws Throwable
*/
public Long rowCountByCoprocessor(String tableName,String cf,String[] column, String[] value) throws Throwable {
Configuration configuration = hbaseTemplate.getConfiguration();
// //提前创建connection和conf
// HBaseAdmin admin = new HBaseAdmin(configuration);
// TableName name = TableName.valueOf(tablename);
// //先disable表,添加协处理器后再enable表
// admin.disableTable(name);
// HTableDescriptor descriptor = admin.getTableDescriptor(name);
// String coprocessorClass = "org.apache.hadoop.hbase.coprocessor.AggregateImplementation";
// if (!descriptor.hasCoprocessor(coprocessorClass)) {
// descriptor.addCoprocessor(coprocessorClass);
// }
// admin.modifyTable(name, descriptor);
// admin.enableTable(name);
Scan scan = new Scan();
FilterList filterList = new FilterList();
//与时间相关的提出来
for (int i = 0; i < column.length; i++) {
// 起始时间和开始时间的范围查询
if (column[i].equals("startTime")) {
SingleColumnValueFilter startTime = new SingleColumnValueFilter(Bytes.toBytes(cf), Bytes.toBytes("createTimeString"), CompareOp.GREATER_OR_EQUAL, Bytes.toBytes(value[i]));
filterList.addFilter(startTime);
} else if (column[i].equals("endTime")) {
SingleColumnValueFilter endTime = new SingleColumnValueFilter(Bytes.toBytes(cf), Bytes.toBytes("createTimeString"), CompareOp.LESS_OR_EQUAL, Bytes.toBytes(value[i]));
filterList.addFilter(endTime);
} else {
SingleColumnValueFilter b = new SingleColumnValueFilter(Bytes.toBytes(cf), Bytes.toBytes(column[i]), CompareOp.EQUAL, new RegexStringComparator(value[i]));
filterList.addFilter(b);
}
}
scan.setFilter(filterList);
AggregationClient aggregationClient = new AggregationClient(configuration);
long rowCount = aggregationClient.rowCount(TableName.valueOf(tableName), new LongColumnInterpreter(), scan);
return rowCount;
}
/**
* Result数据转成Map
*
* @param result
* @param rowNum
* @return
*/
public Map<String, String> mapRow(Result result, int rowNum) {
if (result.isEmpty()) {
return null;
}
Map<String, String> map = new HashMap<>();
map.put(ConstantUtil.ROW_KRY, Bytes.toString(result.getRow()));
NavigableMap<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> ResultMap = result.getMap();
Set<Map.Entry<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>>> resultEntries = ResultMap.entrySet();
for (Map.Entry<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> resultEntry : resultEntries) {
NavigableMap<byte[], NavigableMap<Long, byte[]>> familyMap = resultEntry.getValue();
Set<Map.Entry<byte[], NavigableMap<Long, byte[]>>> familyEntries = familyMap.entrySet();
for (Map.Entry<byte[], NavigableMap<Long, byte[]>> family : familyEntries) {
NavigableMap<Long, byte[]> columnMap = family.getValue();
Set<Map.Entry<Long, byte[]>> columnEntries = columnMap.entrySet();
for (Map.Entry<Long, byte[]> column : columnEntries) {
map.put(Bytes.toString(family.getKey()), Bytes.toString(column.getValue()));
}
}
}
return map;
}
}