
Toward High Stability and High Performance: Syncing HBase Data to Elasticsearch in Real Time (Part 2)

This post focuses on two topics: how to let multiple tables share a single jar for synchronization, and how to keep writes to ES stable and continuous. In 《使用Hbase協作器(Coprocessor)同步資料到ElasticSearch》, the author declares the properties and methods of the two key components as static. What does that imply? Static methods and fields are shared across all threads of the class; see that post for the source code. The problem shows up when you bind the coprocessor to multiple tables by passing parameters like this:

alter 'test_record', METHOD => 'table_att', 'coprocessor' => 'hdfs:///hbase_es/hbase-observer-elasticsearch-1.0-SNAPSHOT-zcestestrecord.jar|org.eminem.hbase.observer.HbaseDataSyncEsObserver|1001|es_cluster=zcits,es_type=zcestestrecord,es_index=zcestestrecord,es_port=9100,es_host=master'
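
Binding a second table to the same jar is just another alter with that table's own parameters. For illustration (the table and index names below are made up), it might look like:

alter 'test_record2', METHOD => 'table_att', 'coprocessor' => 'hdfs:///hbase_es/hbase-observer-elasticsearch-1.0-SNAPSHOT-zcestestrecord.jar|org.eminem.hbase.observer.HbaseDataSyncEsObserver|1001|es_cluster=zcits,es_type=zcestestrecord2,es_index=zcestestrecord2,es_port=9100,es_host=master'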

Multiple HBase tables synced to ES end up with their data crossed. What does that mean? Say you sync HBase tables A and B to ES as A' and B': table A's rows all land in B'. The cause of this error is precisely the static methods and fields in those two components. The fix is to make everything non-static and to instantiate the components wherever they are used. The code follows:

The EsClient component:

import java.lang.reflect.Field;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.transport.client.PreBuiltTransportClient;

public class EsClient {

	// Elasticsearch cluster name
	private String clusterName;
	// Elasticsearch hosts, separated by "-" (commas already delimit the coprocessor's key=value pairs)
	private String[] nodeHost;
	// Elasticsearch port (the Java API talks to the transport port, i.e. TCP)
	private int nodePort;
	private TransportClient client = null;

	private static final Log LOG = LogFactory.getLog(EsClient.class);

	/**
	 * Build a client from the coprocessor parameters.
	 */
	public EsClient(String clusterName, String nodeHost, int nodePort) {
		this.clusterName = clusterName;
		this.nodeHost = nodeHost.split("-");
		this.nodePort = nodePort;
		this.client = initEsClient();

	}

	public String getInfo() {
		List<String> fields = new ArrayList<String>();
		try {
			for (Field f : EsClient.class.getDeclaredFields()) {
				fields.add(f.getName() + "=" + f.get(this));
			}
		} catch (IllegalAccessException ex) {
			LOG.error("Failed to read EsClient fields: " + ex.getMessage());
		}
		return StringUtils.join(fields, ", ");
	}

	public String getOneNodeHost() {
		if (this.nodeHost == null || this.nodeHost.length == 0) {
			return "";
		}
		Random rand = new Random();
		return nodeHost[rand.nextInt(this.nodeHost.length)];

	}

	/**
	 * init ES client
	 */
	public TransportClient initEsClient() {
		LOG.info("---------- Init ES Client " + this.clusterName + " -----------");
		TransportClient client = null;
		Settings settings = Settings.builder().put("cluster.name", this.clusterName).put("client.transport.sniff", true).build();

		try {
			client = new PreBuiltTransportClient(settings).addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(getOneNodeHost()), this.nodePort));
		} catch (UnknownHostException e) {
			LOG.error("Unknown ES host: " + e.getMessage());
		}
		return client;
	}

	public void repeatInitEsClient() {
		this.client = initEsClient();
	}

	/**
	 * @return the clusterName
	 */
	public String getClusterName() {
		return clusterName;
	}

	/**
	 * @param clusterName the clusterName to set
	 */
	public void setClusterName(String clusterName) {
		this.clusterName = clusterName;
	}

	/**
	 * @return the nodePort
	 */
	public int getNodePort() {
		return nodePort;
	}

	/**
	 * @param nodePort the nodePort to set
	 */
	public void setNodePort(int nodePort) {
		this.nodePort = nodePort;
	}

	/**
	 * @return the client
	 */
	public TransportClient getClient() {
		return client;
	}

	/**
	 * @param client the client to set
	 */
	public void setClient(TransportClient client) {
		this.client = client;
	}

}

The ElasticSearchBulkOperator component:

import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequestBuilder;
import org.elasticsearch.action.update.UpdateRequestBuilder;

public class ElasticSearchBulkOperator {

	private static final Log LOG = LogFactory.getLog(ElasticSearchBulkOperator.class);

	private static final int MAX_BULK_COUNT = 5000;

	private BulkRequestBuilder bulkRequestBuilder = null;

	private Lock commitLock = new ReentrantLock();

	private ScheduledExecutorService scheduledExecutorService = null;
	private EsClient esClient = null;

	public ElasticSearchBulkOperator(final EsClient esClient) {
		LOG.info("----------------- Init Bulk Operator -----------------");
		this.esClient = esClient;
		// init the ES bulkRequestBuilder
		this.bulkRequestBuilder = esClient.getClient().prepareBulk();
		// init a thread pool of size 1
		this.scheduledExecutorService = Executors.newScheduledThreadPool(1);

		// create the beeper thread (it flushes pending data to the ES cluster);
		// commitLock keeps access to the bulk builder thread-safe
		Runnable beeper = new Runnable() {
			@Override
			public void run() {
				commitLock.lock();
				try {
					LOG.info("Scheduled thread: flushing pending bulk requests");
					bulkRequest(0);
				} catch (Exception ex) {
					LOG.error("Timed bulk index error: " + ex.getMessage());
				} finally {
					commitLock.unlock();
				}
			}
		};

		// schedule the beeper thread (15-second initial delay, then every 25 seconds)
		scheduledExecutorService.scheduleAtFixedRate(beeper, 15, 25, TimeUnit.SECONDS);

	}

	/**
	 * shut down the scheduled flush task immediately
	 */
	public void shutdownScheduEx() {
		if (null != scheduledExecutorService && !scheduledExecutorService.isShutdown()) {
			scheduledExecutorService.shutdown();
		}
	}

	/**
	 * execute the bulk request when the number of pending actions is greater than the threshold
	 * 
	 * @param threshold
	 */
	public void bulkRequest(int threshold) {
		int count = bulkRequestBuilder.numberOfActions();
		if (count > threshold) {
			try {
				LOG.info("Bulk request running, row count: " + count);
				BulkResponse bulkItemResponse = bulkRequestBuilder.execute().actionGet();
				if (bulkItemResponse.hasFailures()) {
					LOG.error("------------- Begin: Error Response Items of Bulk Requests to ES ------------");
					LOG.error(bulkItemResponse.buildFailureMessage());
					LOG.error("------------- End: Error Response Items of Bulk Requests to ES ------------");
				}
				bulkRequestBuilder = esClient.getClient().prepareBulk();
			} catch (Exception e) {
				// two common causes: 1) the transport client was closed;
				// 2) none of the configured nodes are available
				LOG.error("Bulk request index error: " + e.getMessage());
				LOG.error("Reconnect the ES server...");
				// keep the unsent requests, rebuild the client, and re-queue them
				List<DocWriteRequest> tempRequests = bulkRequestBuilder.request().requests();
				esClient.getClient().close();
				esClient.repeatInitEsClient();
				bulkRequestBuilder = esClient.getClient().prepareBulk();
				bulkRequestBuilder.request().add(tempRequests);
			}
		}
	}

	/**
	 * Add an update builder to the bulk; commitLock keeps the bulk
	 * builder thread-safe.
	 * 
	 * @param builder
	 */
	public void addUpdateBuilderToBulk(UpdateRequestBuilder builder) {
		commitLock.lock();
		try {
			bulkRequestBuilder.add(builder);
			bulkRequest(MAX_BULK_COUNT);
		} catch (Exception ex) {
			LOG.error(" Add Bulk index error : " + ex.getMessage());
		} finally {
			commitLock.unlock();
		}
	}

	/**
	 * Add a delete builder to the bulk; commitLock keeps the bulk
	 * builder thread-safe.
	 * 
	 * @param builder
	 */
	public void addDeleteBuilderToBulk(DeleteRequestBuilder builder) {
		commitLock.lock();
		try {
			bulkRequestBuilder.add(builder);
			bulkRequest(MAX_BULK_COUNT);
		} catch (Exception ex) {
			LOG.error(" delete Bulk index error : " + ex.getMessage());
		} finally {
			commitLock.unlock();
		}
	}
}
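
For context, here is a minimal sketch of how the observer class named in the alter command (org.eminem.hbase.observer.HbaseDataSyncEsObserver) can wire the two components together per table. This is not the original source of that class: the hook signatures follow the HBase 1.x RegionObserver API, the parameter names mirror the alter command above, and the fallback port and the document-building loop are illustrative assumptions.

import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.util.Bytes;

public class HbaseDataSyncEsObserver extends BaseRegionObserver {

	private EsClient esClient = null;
	private ElasticSearchBulkOperator bulkOperator = null;
	private String esIndex;
	private String esType;

	@Override
	public void start(CoprocessorEnvironment env) throws IOException {
		// Each table binding gets its own observer instance, hence its own
		// EsClient and bulk operator -- no static state shared across tables.
		Configuration conf = env.getConfiguration();
		this.esIndex = conf.get("es_index");
		this.esType = conf.get("es_type");
		this.esClient = new EsClient(conf.get("es_cluster"), conf.get("es_host"),
				conf.getInt("es_port", 9300)); // the 9300 fallback is an assumption
		this.bulkOperator = new ElasticSearchBulkOperator(esClient);
	}

	@Override
	public void stop(CoprocessorEnvironment env) throws IOException {
		// Stop the flush timer and release the client when the region closes.
		bulkOperator.shutdownScheduEx();
		esClient.getClient().close();
	}

	@Override
	public void postPut(ObserverContext<RegionCoprocessorEnvironment> e, Put put,
			WALEdit edit, Durability durability) throws IOException {
		// Flatten the Put into a field -> value map (illustrative; adapt to your schema).
		Map<String, Object> doc = new HashMap<String, Object>();
		for (List<Cell> cells : put.getFamilyCellMap().values()) {
			for (Cell cell : cells) {
				doc.put(Bytes.toString(CellUtil.cloneQualifier(cell)),
						Bytes.toString(CellUtil.cloneValue(cell)));
			}
		}
		String rowKey = Bytes.toString(put.getRow());
		bulkOperator.addUpdateBuilderToBulk(esClient.getClient()
				.prepareUpdate(esIndex, esType, rowKey).setDocAsUpsert(true).setDoc(doc));
	}

	@Override
	public void postDelete(ObserverContext<RegionCoprocessorEnvironment> e, Delete delete,
			WALEdit edit, Durability durability) throws IOException {
		bulkOperator.addDeleteBuilderToBulk(esClient.getClient()
				.prepareDelete(esIndex, esType, Bytes.toString(delete.getRow())));
	}
}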

Note: in the TransportClient settings I set "client.transport.sniff" to true, which is crucial for the stability of continuous syncing, provided ES runs as a multi-node cluster. With per-table instances, multiple tables can bind the same jar with different parameters and the strange cross-table contamination no longer occurs. The bulkRequest method of ElasticSearchBulkOperator is just as critical: get it wrong and, at best, synced data is lost; at worst, HBase goes down. For example, RPC calls can pile up on the HBase RegionServer until data can no longer be written to HBase, as shown below:

[Figure: RPC call backlog on an HBase RegionServer blocking writes]

Why does the situation in the figure arise? I suggest studying how Coprocessors execute and how RegionServers interact with the Master; I won't go into that here. It is worth spending some time on the bulkRequest method instead.

I trigger a bulk both on a timer and on a size threshold. The call

BulkResponse bulkItemResponse = bulkRequestBuilder.execute().actionGet();

sends multiple requests in order and returns a response for each, in the same order. Some items may fail, very often because the ES mapping type rejects the document and the insert throws; as an aside, always validate and filter your data beforehand. Back to the point: what you do with the failed items depends on your policy, whether you discard them outright or record them somewhere. Just don't do what 《使用Hbase協作器(Coprocessor)同步資料到ElasticSearch》 does:

if (!bulkItemResponse.hasFailures()) { bulkRequestBuilder = ESClient.client.prepareBulk(); }

Because the builder is only reset on success, a failed batch is never cleared: it is resent forever while new requests pile on top of it. That will end very badly for you.

The ES client cannot be expected to stay connected forever either, so there must be a reconnect mechanism; this matters all the more with a single ES server. The code above notes the two ways the client connection drops: 1) the transport client is closed; 2) none of the configured nodes are available. To avoid losing the data of the failed request, we add it to the bulkRequestBuilder of the newly created client and resend, with the following logic:

LOG.error("Reconnect the ES server...");
List<DocWriteRequest> tempRequests = bulkRequestBuilder.request().requests();
esClient.getClient().close();
esClient.repeatInitEsClient();
bulkRequestBuilder = esClient.getClient().prepareBulk();
bulkRequestBuilder.request().add(tempRequests);
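
If your policy is to record the failed items rather than drop them, one possible shape is a helper inside ElasticSearchBulkOperator that walks the per-item responses right after execute().actionGet() and before the builder is reset. This is a sketch: the saveForRetry hook is hypothetical and stands in for whatever queue or file you choose, and it additionally needs org.elasticsearch.action.bulk.BulkItemResponse imported.

	/**
	 * Record every failed item of a bulk so it can be replayed later.
	 * Sketch only: saveForRetry is a hypothetical hook, not defined above.
	 */
	private void recordBulkFailures(BulkResponse bulkItemResponse, List<DocWriteRequest> requests) {
		for (BulkItemResponse item : bulkItemResponse.getItems()) {
			if (item.isFailed()) {
				// getItemId() is the position of the request within this bulk
				DocWriteRequest failed = requests.get(item.getItemId());
				LOG.error("Doc " + item.getId() + " failed: " + item.getFailureMessage());
				saveForRetry(failed); // hypothetical: persist for later replay
			}
		}
	}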

Everything above is mandatory; you can extend it further, for example with a more elaborate reconnection strategy. That's it for part two. The next post will look at how to minimize the downtime risk that the Coprocessor's intrusiveness poses to HBase, how to make the sync efficient, and some lessons learned along the way.
