Bulk insert (_bulk) implementation options for ES 5.5.3 in Java

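ES 5.5.3 sits in a famously awkward spot. RestHighLevelClient is not available for it, and although TransportClient works, advice online widely discourages using it. In fact TransportClient was only deprecated in 7.0.0 and was not removed until 8.0.0, so using it on 5.5.3 is entirely legitimate. Still, with later upgrades in mind, I ended up making a compromise.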

Here are the two approaches:

Option 1: use TransportClient:

1. pom file

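The repository must be specified, because the artifact is not available on mvnrepository. Also, most examples online do not pull in x-pack-transport and use transport directly. That works fine against self-hosted MySQL data, but syncing from Alibaba Cloud RDS fails with an error.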

...
<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>transport</artifactId>
    <version>5.5.3</version>
</dependency>
<dependency>
    <groupId>org.elasticsearch.plugin</groupId>
    <artifactId>transport-netty3-client</artifactId>
    <version>5.5.3</version>
</dependency>
<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>x-pack-transport</artifactId>
    <version>5.5.3</version>
</dependency>
...
<repository>
    <id>elasticsearch-releases</id>
    <url>https://artifacts.elastic.co/maven</url>
    <releases>
        <enabled>true</enabled>
    </releases>
    <snapshots>
        <enabled>false</enabled>
    </snapshots>
</repository>
...

2. Configuration

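Two things to watch out for: (1) most TransportClient examples online build the client with PreBuiltTransportClient, which does not work on RDS either; you need PreBuiltXPackTransportClient instead. (2) On Alibaba Cloud, clusterName is the ES instance id, not the instance name.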

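import java.net.InetAddress;
import java.net.UnknownHostException;

import lombok.extern.log4j.Log4j2;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.xpack.client.PreBuiltXPackTransportClient;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
@Log4j2
public class ElasticSearchConfig {

    @Value("${icec.elasticsearch.host}")
    private String host;

    @Value("${icec.elasticsearch.tcpPort}")
    private int tcpPort;

    // On Alibaba Cloud this is the ES instance id, not the instance name.
    @Value("${icec.elasticsearch.clusterName}")
    private String clusterName;

    @Value("${icec.elasticsearch.username}")
    private String username;

    @Value("${icec.elasticsearch.password}")
    private String password;

    @Bean
    public TransportClient transportClient() {
        TransportClient transportClient = null;
        // The X-Pack client is required so that the security credentials are sent.
        TransportClient preBuiltTransportClient = new PreBuiltXPackTransportClient(Settings.builder()
                .put("cluster.name", clusterName)
                .put("xpack.security.user", username + ":" + password)
                .put("client.transport.sniff", false)
                .build());
        try {
            transportClient = preBuiltTransportClient
                    .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(host), tcpPort));
        } catch (UnknownHostException e) {
            // The bean stays null if the host cannot be resolved.
            log.warn(e);
        }
        return transportClient;
    }
}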

3. Usage example:

public void batchInsert(List<Map> datas) {
    if (CollectionUtils.isEmpty(datas)) {
        return;
    }
    // Queue one index request per record, using the order id as the document id.
    BulkRequestBuilder bulkRequestBuilder = transportClient.prepareBulk();
    datas.forEach(data -> bulkRequestBuilder.add(
            transportClient.prepareIndex(ES_INDEX, ES_TYPE, (String) data.get(OrderEsConstant.ORDER_ID)).setSource(data)));
    BulkResponse bulkResponse = bulkRequestBuilder.execute().actionGet();
    // Collect the failure message of every item that did not succeed.
    List<String> failureMessages = Arrays.stream(bulkResponse.getItems())
            .filter(BulkItemResponse::isFailed)
            .map(BulkItemResponse::getFailureMessage)
            .collect(Collectors.toList());
    if (CollectionUtils.isNotEmpty(failureMessages)) {
        log.warn("Orders that failed to sync -> {}", failureMessages);
    }
    log.info("Batch finished, total records: {}", datas.size());
}

Option 2: hand-write the bulk operation on top of RestClient

// BASIC_FORMAT, POST, ES_INDEX, ES_TYPE and getErrorMsg are defined elsewhere in the class.
public void batchInsert(List<Map> orders) {
    StringBuilder bulkRequestBody = new StringBuilder();
    int count = 1;
    for (Map order : orders) {
        // The _bulk body is newline-delimited JSON: an action line, then the document source.
        String actionMetaData = String.format("{ \"index\" : {\"_id\" : \"%s\"} }\n", order.get(OrderEsConstant.ORDER_ID));
        String orderJson = JSON.toJSONString(order, SerializerFeature.WriteNullStringAsEmpty);
        bulkRequestBody.append(actionMetaData);
        bulkRequestBody.append(orderJson);
        bulkRequestBody.append("\n");
        // Flush every 5000 documents, and once more for the final partial batch.
        if (count % 5000 == 0 || count == orders.size()) {
            Map result = Maps.newHashMap();
            String router = String.format(BASIC_FORMAT, ES_INDEX, ES_TYPE, "_bulk");
            try {
                Response response = restClient.performRequest(POST, router, Collections.emptyMap(),
                        new StringEntity(bulkRequestBody.toString(), ContentType.APPLICATION_JSON));
                result = objectMapper.readValue(response.getEntity().getContent(), Map.class);
            } catch (IOException e) {
                log.warn("ES bulk insert failed", e);
            }
            // Start the next batch from an empty body so documents are not sent twice.
            bulkRequestBody.setLength(0);
            List<Map> items = (List<Map>) result.getOrDefault("items", Collections.emptyList());
            List<Map> itemResults = items.stream().map(p -> (Map) p.get("index")).collect(Collectors.toList());
            // An index action returns 201 for a newly created document and 200 for an overwrite,
            // so anything outside the 2xx range is treated as a failure.
            List errorMsgs = itemResults.stream().filter(p -> {
                int status = (Integer) p.get("status");
                return status < 200 || status >= 300;
            }).map(this::getErrorMsg).collect(Collectors.toList());
            if (CollectionUtils.isNotEmpty(errorMsgs)) {
                log.warn("Failed to insert data -> {}", errorMsgs);
            }
            log.info("Batch finished, total records: {}", itemResults.size());
        }
        count++;
    }
}
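The counter-based flushing in Option 2 can also be expressed by splitting the list into fixed-size chunks first and building one request body per chunk. The following is only a minimal sketch under stated assumptions: the class name BulkChunks and both helper methods are hypothetical (they are not part of any Elasticsearch client), the documents are assumed to be already serialized to JSON, and the ids are assumed to contain no characters that need JSON escaping.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of any Elasticsearch client library.
final class BulkChunks {

    /** Splits items into consecutive sublists of at most size elements. */
    static <T> List<List<T>> partition(List<T> items, int size) {
        if (size <= 0) {
            throw new IllegalArgumentException("size must be positive");
        }
        List<List<T>> chunks = new ArrayList<>();
        for (int from = 0; from < items.size(); from += size) {
            int to = Math.min(from + size, items.size());
            chunks.add(new ArrayList<>(items.subList(from, to)));
        }
        return chunks;
    }

    /**
     * Builds a newline-delimited _bulk body: one action line and one source
     * line per document. Keys are document ids (assumed to need no escaping),
     * values are documents already serialized to JSON.
     */
    static String bulkBody(Map<String, String> jsonById) {
        StringBuilder body = new StringBuilder();
        for (Map.Entry<String, String> entry : jsonById.entrySet()) {
            body.append("{ \"index\" : {\"_id\" : \"").append(entry.getKey()).append("\"} }\n");
            body.append(entry.getValue()).append('\n');
        }
        return body.toString();
    }

    public static void main(String[] args) {
        // Five ids split into chunks of two.
        System.out.println(partition(Arrays.asList(1, 2, 3, 4, 5), 2));

        Map<String, String> docs = new LinkedHashMap<>();
        docs.put("1001", "{\"status\":\"PAID\"}");
        docs.put("1002", "{\"status\":\"NEW\"}");
        System.out.print(bulkBody(docs));
    }
}
```

With this shape, each chunk gets a fresh body, so there is no shared StringBuilder to reset between requests.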

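Option 1 is a once-and-for-all solution: any advanced operation needed later can go through TransportClient. It does become a problem when ES is upgraded, though. Officially the client is only removed in 8.0.0, but Alibaba Cloud's data subscription only supports up to 5.5.3.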

Option 2 does not have to worry about later upgrades for now, but every advanced operation has to be implemented by hand, upsert being one example.
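To give an idea of what hand-writing an upsert involves under Option 2, here is a rough sketch of the two lines that a _bulk body needs per document: an update action line followed by a line carrying the partial document with doc_as_upsert set to true. The class and method names are hypothetical, the document is assumed to be already serialized to JSON, and the id is assumed to need no JSON escaping.

```java
// Hypothetical helper, not part of any Elasticsearch client library.
final class BulkUpsertLines {

    /**
     * Returns the action line and the payload line for one upsert in a
     * newline-delimited _bulk body. With doc_as_upsert, the document is
     * inserted when the id does not exist yet and merged otherwise.
     */
    static String upsertLines(String id, String docJson) {
        return "{ \"update\" : {\"_id\" : \"" + id + "\"} }\n"
                + "{ \"doc\" : " + docJson + ", \"doc_as_upsert\" : true }\n";
    }

    public static void main(String[] args) {
        System.out.print(upsertLines("1001", "{\"status\":\"PAID\"}"));
    }
}
```

These lines would replace the index action line and source line in the Option 2 loop. The response items are then keyed by "update" rather than "index", so the result parsing has to change accordingly.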