
Implementing bulk inserts (_bulk) for ES 5.5.3 in Java

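As is well known, ES 5.5.3 sits in an awkward spot. RestHighLevelClient is not available for it (the high-level REST client only arrived in 5.6). TransportClient works, but it is widely discouraged online. In fact, TransportClient was only deprecated in 7.0.0 and is removed in 8.0.0, so there is nothing wrong with using it on 5.5.3. Still, with future upgrades in mind, we made a compromise.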

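Here are two approaches.

Approach 1: TransportClient

1. pom file

You must declare the repository, because these artifacts are not available on mvnrepository. Also, most examples online leave out x-pack-transport and use only transport. That works in the self-hosted MySQL setup, but syncing Alibaba Cloud RDS fails with an error.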

```xml
...
<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>transport</artifactId>
    <version>5.5.3</version>
</dependency>
<dependency>
    <groupId>org.elasticsearch.plugin</groupId>
    <artifactId>transport-netty3-client</artifactId>
    <version>5.5.3</version>
</dependency>
<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>x-pack-transport</artifactId>
    <version>5.5.3</version>
</dependency>
...
<repository>
    <id>elasticsearch-releases</id>
    <url>https://artifacts.elastic.co/maven</url>
    <releases>
        <enabled>true</enabled>
    </releases>
    <snapshots>
        <enabled>false</enabled>
    </snapshots>
</repository>
...
```

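2. Configuration

Two things to note. (1) Most TransportClient examples online build the client with PreBuiltTransportClient. That does not work with RDS either; you need PreBuiltXPackTransportClient. (2) On Alibaba Cloud, clusterName is the ES instance ID, not the instance name.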

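```java
import java.net.InetAddress;
import java.net.UnknownHostException;

import lombok.extern.log4j.Log4j2;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.xpack.client.PreBuiltXPackTransportClient;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
@Log4j2
public class ElasticSearchConfig {

    @Value("${icec.elasticsearch.host}")
    private String host;

    @Value("${icec.elasticsearch.tcpPort}")
    private int tcpPort;

    // On Alibaba Cloud this is the ES instance ID, not the instance name
    @Value("${icec.elasticsearch.clusterName}")
    private String clusterName;

    @Value("${icec.elasticsearch.username}")
    private String username;

    @Value("${icec.elasticsearch.password}")
    private String password;

    @Bean
    public TransportClient transportClient() {
        TransportClient transportClient = null;
        // PreBuiltXPackTransportClient rather than PreBuiltTransportClient, so the X-Pack credentials are sent
        TransportClient preBuiltTransportClient = new PreBuiltXPackTransportClient(Settings.builder()
                .put("cluster.name", clusterName)
                .put("xpack.security.user", username + ":" + password)
                // no sniffing: talk only to the configured address instead of discovering other nodes
                .put("client.transport.sniff", false)
                .build());
        try {
            transportClient = preBuiltTransportClient
                    .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(host), tcpPort));
        } catch (UnknownHostException e) {
            log.warn("Unable to resolve Elasticsearch host {}", host, e);
            // release the client's resources since it will not be returned
            preBuiltTransportClient.close();
        }
        return transportClient;
    }
}
```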

3. Usage example:

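```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import org.apache.commons.collections4.CollectionUtils;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;

// transportClient, ES_INDEX, ES_TYPE and OrderEsConstant are defined elsewhere in the class
public void batchInsert(List<Map> datas) {
    if (CollectionUtils.isEmpty(datas)) {
        return;
    }
    BulkRequestBuilder bulkRequestBuilder = transportClient.prepareBulk();
    // one index request per document, using the order ID as the document _id
    datas.forEach(data -> bulkRequestBuilder.add(transportClient
            .prepareIndex(ES_INDEX, ES_TYPE, (String) data.get(OrderEsConstant.ORDER_ID))
            .setSource(data)));
    BulkResponse bulkResponse = bulkRequestBuilder.execute().actionGet();
    // collect the failure message of every item that did not succeed
    List<String> failureMessages = Arrays.stream(bulkResponse.getItems())
            .filter(BulkItemResponse::isFailed)
            .map(BulkItemResponse::getFailureMessage)
            .collect(Collectors.toList());
    if (CollectionUtils.isNotEmpty(failureMessages)) {
        log.warn("Orders that failed to sync -> {}", failureMessages);
    }
    log.info("Bulk insert finished, total: {}", datas.size());
}
```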
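As written, this method puts every document in `datas` into a single bulk request. If the list can be large, one way to cap the request size, borrowing the 5,000-document chunk size that approach 2 uses, is to split the list first and call `batchInsert` once per chunk. A minimal sketch; `BatchSplitter` is a hypothetical helper, not part of the original code or of the Elasticsearch client:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical helper: splits a list into consecutive sub-lists of at most batchSize elements. */
final class BatchSplitter {

    private BatchSplitter() {
    }

    static <T> List<List<T>> partition(List<T> items, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        if (items == null || items.isEmpty()) {
            return Collections.emptyList();
        }
        List<List<T>> batches = new ArrayList<>();
        for (int from = 0; from < items.size(); from += batchSize) {
            int to = Math.min(from + batchSize, items.size());
            // copy so each batch does not depend on later changes to the source list
            batches.add(new ArrayList<>(items.subList(from, to)));
        }
        return batches;
    }
}
```

The caller could then write `BatchSplitter.partition(datas, 5000).forEach(this::batchInsert);`. Elasticsearch's own `BulkProcessor` (`org.elasticsearch.action.bulk.BulkProcessor`) is another option; it flushes automatically by action count, byte size or time interval.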

Approach 2: hand-written bulk requests through RestClient

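```java
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.serializer.SerializerFeature;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.http.entity.ContentType;
import org.apache.http.entity.StringEntity;
import org.elasticsearch.client.Response;

// restClient (low-level RestClient), objectMapper (Jackson), POST, BASIC_FORMAT,
// ES_INDEX, ES_TYPE, OrderEsConstant and getErrorMsg are defined elsewhere in the class
public void batchInsert(List<Map> orders) {
    StringBuilder bulkRequestBody = new StringBuilder();
    int count = 0;
    for (Map order : orders) {
        count++;
        // action/metadata line; the bulk API needs "\n", whereas %n is "\r\n" on Windows
        String actionMetaData = String.format("{ \"index\" : {\"_id\" : \"%s\"} }\n",
                order.get(OrderEsConstant.ORDER_ID));
        String orderJson = JSON.toJSONString(order, SerializerFeature.WriteNullStringAsEmpty);
        bulkRequestBody.append(actionMetaData);
        bulkRequestBody.append(orderJson);
        bulkRequestBody.append("\n");
        // send every 5000 documents, plus once for the final partial chunk
        if (count % 5000 == 0 || count == orders.size()) {
            String requestBody = bulkRequestBody.toString();
            // clear the buffer so the next request does not re-send this chunk
            bulkRequestBody.setLength(0);
            String router = String.format(BASIC_FORMAT, ES_INDEX, ES_TYPE, "_bulk");
            Map result;
            try {
                Response response = restClient.performRequest(POST, router, Collections.emptyMap(),
                        new StringEntity(requestBody, ContentType.APPLICATION_JSON));
                result = objectMapper.readValue(response.getEntity().getContent(), Map.class);
            } catch (IOException e) {
                log.warn("ES bulk insert failed", e);
                continue;
            }
            List<Map> itemResults = ((List<Map>) result.get("items")).stream()
                    .map(p -> (Map) p.get("index"))
                    .collect(Collectors.toList());
            // 201 (created) and 200 (overwritten) are both successes
            List errorMsgs = itemResults.stream()
                    .filter(p -> (Integer) p.get("status") >= 300)
                    .map(this::getErrorMsg)
                    .collect(Collectors.toList());
            if (CollectionUtils.isNotEmpty(errorMsgs)) {
                log.warn("Data insert failed -> {}", errorMsgs);
            }
            log.info("Bulk insert finished, total: {}", itemResults.size());
        }
    }
}
```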
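The body that approach 2 assembles is newline-delimited JSON: an action/metadata line followed by the document source, each terminated by `\n`, so the whole body ends with a newline. In the response, each item's `status` is 201 when a document is created and 200 when an existing one is overwritten, so treating only 200 as success would flag every new document. The sketch below pulls the body format and the response check away from the HTTP call so they can be tested on their own. `BulkBodies` and its methods are hypothetical names, and documents are assumed to already be serialized as single-line JSON (as `JSON.toJSONString` produces by default):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Hypothetical helpers illustrating the _bulk body format and the per-item response check. */
final class BulkBodies {

    private BulkBodies() {
    }

    /** Builds one NDJSON body per chunk of at most maxDocs documents; map keys are the _id values. */
    static List<String> buildIndexBodies(Map<String, String> docsById, int maxDocs) {
        if (maxDocs <= 0) {
            throw new IllegalArgumentException("maxDocs must be positive");
        }
        List<String> bodies = new ArrayList<>();
        StringBuilder body = new StringBuilder();
        int inChunk = 0;
        for (Map.Entry<String, String> doc : docsById.entrySet()) {
            // action/metadata line, then the source line, each ending in "\n"
            body.append("{\"index\":{\"_id\":\"").append(escape(doc.getKey())).append("\"}}\n");
            body.append(doc.getValue()).append('\n');
            if (++inChunk == maxDocs) {
                bodies.add(body.toString());
                body.setLength(0); // start a fresh body so documents are not re-sent
                inChunk = 0;
            }
        }
        if (inChunk > 0) {
            bodies.add(body.toString());
        }
        return bodies;
    }

    /** Returns the item results that failed, given the parsed JSON map of a _bulk response. */
    @SuppressWarnings("unchecked")
    static List<Map<String, Object>> failedItems(Map<String, Object> response) {
        List<Map<String, Object>> failed = new ArrayList<>();
        Object items = response.get("items");
        if (!(items instanceof List)) {
            return failed;
        }
        for (Object item : (List<Object>) items) {
            // each item looks like {"<action>": {...}}, e.g. {"index": {...}}
            for (Object value : ((Map<String, Object>) item).values()) {
                Map<String, Object> result = (Map<String, Object>) value;
                Object status = result.get("status");
                // any 2xx status (200 or 201) counts as success
                boolean ok = status instanceof Number && ((Number) status).intValue() < 300;
                if (!ok || result.containsKey("error")) {
                    failed.add(result);
                }
            }
        }
        return failed;
    }

    /** Escapes backslashes and quotes so an _id can be embedded in a JSON string. */
    private static String escape(String s) {
        return s.replace("\\", "\\\\").replace("\"", "\\\"");
    }
}
```

Because `failedItems` iterates over the item's values instead of looking up `"index"`, it also works for responses to other bulk actions such as `update`.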

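Approach 1 is a one-time investment: more advanced operations can later also be implemented through TransportClient. However, it becomes a problem once ES is upgraded. Officially, TransportClient is only removed in 8.0.0, but Alibaba Cloud's data subscription only supports up to 5.5.3.

Approach 2 means we don't have to worry about future upgrades for now, but every advanced operation, such as upsert, has to be implemented by hand.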
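For the upsert case, one option that stays within `_bulk` is the `update` action with `"doc_as_upsert": true`: Elasticsearch merges the partial document into the existing one, or indexes it as a new document if the `_id` does not exist yet. A minimal sketch, assuming (as the `router` in approach 2 appears to) that the index and type are already in the URL, so the metadata line only needs `_id`. `BulkUpsert` is a hypothetical helper name:

```java
/** Hypothetical helper that appends a bulk "update" action with doc_as_upsert. */
final class BulkUpsert {

    private BulkUpsert() {
    }

    /**
     * Appends one upsert to body: docJson (a single-line JSON object) is merged into
     * document id if it exists, otherwise indexed as a new document.
     */
    static StringBuilder appendUpsert(StringBuilder body, String id, String docJson) {
        // metadata line: the "update" action with an escaped _id
        body.append("{\"update\":{\"_id\":\"")
            .append(id.replace("\\", "\\\\").replace("\"", "\\\""))
            .append("\"}}\n");
        // payload line: partial document plus the upsert flag
        body.append("{\"doc\":").append(docJson).append(",\"doc_as_upsert\":true}\n");
        return body;
    }
}
```

The response items for these actions are keyed by `update` rather than `index`, so a check that reads `p.get("index")` needs adjusting accordingly.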