我在这篇focus在两个主题:如何支持多表同步共用一个jar包,如何持续稳定的与ES交互写入数据。 在 《 使用Hbase协作器(Coprocessor)同步数据到ElasticSearch 》中作者把两个关键组件中的属性和方法都声明为static,这意味什么?类方法和属性在所有的线程中共享,源代码请参考该博客。 问题出来了,当你用如下传参数的方式绑定到多个表:
alter 'test_record', METHOD => 'table_att', 'coprocessor' => 'hdfs:///hbase_es/hbase-observer-elasticsearch-1.0-SNAPSHOT-zcestestrecord.jar|org.eminem.hbase.observer.HbaseDataSyncEsObserver|1001|es_cluster=zcits,es_type=zcestestrecord,es_index=zcestestrecord,es_port=9100,es_host=master'
Hbase中的多个表同步到ES会串数据,什么意思? 比如说,同步Hbase中的A、B表到ES中A`、B`,A表的数据都到B`中了。造成这种错误的原因就是上述两个构件使用了静态的方法和属性。如何改正,就是都改为非静态的方法和类,用到该构件的时候实例化。代码如下:
EsClient构件:
public class EsClient {
// ElasticSearch的集群名称
private String clusterName;
// ElasticSearch的host
private String[] nodeHost;
// ElasticSearch的端口(Java API用的是Transport端口,也就是TCP)
private int nodePort;
private TransportClient client = null;
private static final Log LOG = LogFactory.getLog(EsClient.class);
/**
* get Es config
*
* @return
*/
public EsClient(String clusterName, String nodeHost, int nodePort) {
this.clusterName = clusterName;
this.nodeHost = nodeHost.split("-");
this.nodePort = nodePort;
this.client = initEsClient();
}
public String getInfo() {
List<String> fields = new ArrayList<String>();
try {
for (Field f : EsClient.class.getDeclaredFields()) {
fields.add(f.getName() + "=" + f.get(this));
}
} catch (IllegalAccessException ex) {
ex.printStackTrace();
}
return StringUtils.join(fields, ", ");
}
public String getOneNodeHost() {
if (this.nodeHost == null || this.nodeHost.length == 0) {
return "";
}
Random rand = new Random();
return nodeHost[rand.nextInt(this.nodeHost.length)];
}
/**
* init ES client
*/
public TransportClient initEsClient() {
LOG.info("---------- Init ES Client " + this.clusterName + " -----------");
TransportClient client = null;
Settings settings = Settings.builder().put("cluster.name", this.clusterName).put("client.transport.sniff", true).build();
try {
client = new PreBuiltTransportClient(settings).addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(getOneNodeHost()), this.nodePort));
} catch (UnknownHostException e) {
e.printStackTrace();
}
return client;
}
public void repeatInitEsClient() {
this.client = initEsClient();
}
/**
* @return the clusterName
*/
public String getClusterName() {
return clusterName;
}
/**
* @param clusterName the clusterName to set
*/
public void setClusterName(String clusterName) {
this.clusterName = clusterName;
}
/**
* @return the nodePort
*/
public int getNodePort() {
return nodePort;
}
/**
* @param nodePort the nodePort to set
*/
public void setNodePort(int nodePort) {
this.nodePort = nodePort;
}
/**
* @return the client
*/
public TransportClient getClient() {
return client;
}
/**
* @param client the client to set
*/
public void setClient(TransportClient client) {
this.client = client;
}
}
ElasticSearchBulkOperator构件:
public class ElasticSearchBulkOperator {
private static final Log LOG = LogFactory.getLog(ElasticSearchBulkOperator.class);
private static final int MAX_BULK_COUNT = 5000;
private BulkRequestBuilder bulkRequestBuilder = null;
private Lock commitLock = new ReentrantLock();
private ScheduledExecutorService scheduledExecutorService = null;
private EsClient esClient = null;
public ElasticSearchBulkOperator(final EsClient esClient) {
LOG.info("----------------- Init Bulk Operator for Table: " + " ----------------");
this.esClient = esClient;
// init es bulkRequestBuilder
this.bulkRequestBuilder = esClient.getClient().prepareBulk();
// init thread pool and set size 1
this.scheduledExecutorService = Executors.newScheduledThreadPool(1);
// create beeper thread( it will be sync data to ES cluster)use a commitLock to protected bulk es as thread-save
Runnable beeper = new Runnable() {
@Override
public void run() {
commitLock.lock();
try {
LOG.info("Scheduled Thread start run for ");
bulkRequest(0);
} catch (Exception ex) {
LOG.error("Time Bulk " + " index error : " + ex.getMessage());
} finally {
commitLock.unlock();
}
}
};
// set beeper thread(15 second to delay first execution , 25 second period between successive executions)
scheduledExecutorService.scheduleAtFixedRate(beeper, 15, 25, TimeUnit.SECONDS);
}
/**
* shutdown time task immediately
*/
public void shutdownScheduEx() {
if (null != scheduledExecutorService && !scheduledExecutorService.isShutdown()) {
scheduledExecutorService.shutdown();
}
}
/**
* bulk request when number of builders is grate then threshold
*
* @param threshold
*/
public void bulkRequest(int threshold) {
int count = bulkRequestBuilder.numberOfActions();
if (bulkRequestBuilder.numberOfActions() > threshold) {
try {
LOG.info("Bulk Request Run " + ", the row count is: " + count);
BulkResponse bulkItemResponse = bulkRequestBuilder.execute().actionGet();
if (bulkItemResponse.hasFailures()) {
LOG.error("------------- Begin: Error Response Items of Bulk Requests to ES ------------");
LOG.error(bulkItemResponse.buildFailureMessage());
LOG.error("------------- End: Error Response Items of Bulk Requests to ES ------------");
}
bulkRequestBuilder = esClient.getClient().prepareBulk();
} catch (Exception e) {// two cause: 1. transport client is closed 2. None of the configured nodes are available
LOG.error(" Bulk Request " + " index error : " + e.getMessage());
LOG.error("Reconnect the ES server...");
List<DocWriteRequest> tempRequests = bulkRequestBuilder.request().requests();
esClient.getClient().close();
esClient.repeatInitEsClient();
bulkRequestBuilder = esClient.getClient().prepareBulk();
bulkRequestBuilder.request().add(tempRequests);
}
}
}
/**
* add update builder to bulk use commitLock to protected bulk as
* thread-save
*
* @param builder
*/
public void addUpdateBuilderToBulk(UpdateRequestBuilder builder) {
commitLock.lock();
try {
bulkRequestBuilder.add(builder);
bulkRequest(MAX_BULK_COUNT);
} catch (Exception ex) {
LOG.error(" Add Bulk index error : " + ex.getMessage());
} finally {
commitLock.unlock();
}
}
/**
* add delete builder to bulk use commitLock to protected bulk as
* thread-save
*
* @param builder
*/
public void addDeleteBuilderToBulk(DeleteRequestBuilder builder) {
commitLock.lock();
try {
bulkRequestBuilder.add(builder);
bulkRequest(MAX_BULK_COUNT);
} catch (Exception ex) {
LOG.error(" delete Bulk index error : " + ex.getMessage());
} finally {
commitLock.unlock();
}
}
}
注意:我在TransportClient的setting中用了 "client.transport.sniff"=true,这对持续同步的稳定性至关重要,前提ES是多台机器的集群。这样就可以实现多个表同时绑定一个jar包传入不同参数时,不发生串表的奇怪现象。 ElasticSearchBulkOperator构件的bulkRequest方法至关重要,写不好轻则同步数据丢失,重则Hbase挂掉。 比如,导致Hbase中RegionServer的堆积的RPC过多,导致数据不能写入Hbase,如下图所示:
为什么会出现上图这种情况,我建议研究一下Coprocessor运行机制,以及RegionServer与Master的交互机制。在这里就不多说了。我们有必要花点时间研究一下bulkRequest方法: 我采取定时定量的方式来执行一次BulkRequest方法, BulkResponse bulkItemResponse = bulkRequestBuilder.execute().actionGet(); 该次交互有序的请求多条Requests,返回对应的多依次次对应的Responses结果,可能某些条数据没有成功,很大原因是ES的Mapping类型抛出异常,导致数据插入失败,题外话就是一定要进行数据的校验和帅选。言归正传,这些没有成功的数据你如何处理这要看你的处理机制——直接舍弃,还是记在某些地方。千万不要像《 使用Hbase协作器(Coprocessor)同步数据到ElasticSearch 》中那样: if (!bulkItemResponse.hasFailures()) { bulkRequestBuilder = ESClient.client.prepareBulk(); } 这样你会死的很惨。 esclient不能保证一直连接不失败吧,所以要有重连机制,这对单台的ES服务器至关重要。上述代码列出了两种esclient连接断掉的原因:1. transport client is closed 2. None of the configured nodes are available。为了不丢失上次请求失败的数据,我们要把这些数据加入到新建的esclient中的bulkRequestBuilder,重新发送,逻辑如下:
LOG.error("Reconnect the ES server...");
List<DocWriteRequest> tempRequests = bulkRequestBuilder.request().requests();
esClient.getClient().close();
esClient.repeatInitEsClient();
bulkRequestBuilder = esClient.getClient().prepareBulk();
bulkRequestBuilder.request().add(tempRequests);
上述都是必须要做的,你也可以扩展,比如设计重连机制。 第二篇就讲到这吧,下一篇写一些如何把Coprocessor的侵入性给Hbase带来的宕机风险降到最低,如何高效的实现同步,以及心得。