作者簡介:《RocketMQ技術内幕》作者、中間件興趣圈微信公衆号維護者。
本文将詳細介紹elasticsearch批量擷取API(Multi Get API)和Bulk API的使用。
1、Multi Get API
詳細API如下:
- public final MultiGetResponse mget(MultiGetRequest multiGetRequest, RequestOptions options) throws IOException
- public final void mgetAsync(MultiGetRequest multiGetRequest, RequestOptions options, ActionListener listener)
其核心需要關注MultiGetRequest 。

從上面所知,mget及批量擷取文檔,通過add方法添加多個Item,每一個item代表一個檔案擷取請求,其相關字段已在get API中詳細介紹,這裡就不做過多詳解。
Mget API使用示例
public static void testMget() {
RestHighLevelClient client = EsClient.getClient();
try {
MultiGetRequest request = new MultiGetRequest();
request.add("twitter", "_doc", "10");
request.add("twitter", "_doc", "11");
request.add("twitter", "_doc", "12");
request.add("gisdemo", "_doc", "10");
MultiGetResponse result = client.mget(request, RequestOptions.DEFAULT);
System.out.println(result);
} catch (Throwable e) {
e.printStackTrace();
} finally {
EsClient.close(client);
}
}
傳回的結果其本質是一個 GetResponse的數組,不會因為其中一個失敗,整個請求失敗,但其結果中會标明每一個是否成功。
其傳回結果類圖如下:
其字段過濾(Source filtering)、路由等機制與Get API相同,詳情請參考:
Elasticsearch Document Get API詳解、原理與示例2、Elasticsearch Bulk API
Bulk API可以在一次API調用中包含多個索引操作,例如更新索引,删除索引等,類比批量操作。
詳細API如下:
- public final BulkResponse bulk(BulkRequest bulkRequest, RequestOptions options) throws IOException
- public final void bulkAsync(BulkRequest bulkRequest, RequestOptions options, ActionListener listener)
2.1 BulkRequest詳解
我們先一一來看一下其核心屬性與與典型方法:
- final List requests = new ArrayList<>():單個指令容器,DocWriteRequest的子類包括:IndexRequest、UpdateRequest、DeleteRequest。
-
private final Set indices = new HashSet<>():List requests涉及到的索引。
List
- protected TimeValue timeout = BulkShardRequest.DEFAULT_TIMEOUT:timeout機制,針對一個Bulk請求生效。
- private ActiveShardCount waitForActiveShards = ActiveShardCount.DEFAULT: waitForActiveShards,針對一個Bulk請求生效,各個請求中waitForActiveShards優先。
- private RefreshPolicy refreshPolicy = RefreshPolicy.NONE:重新整理政策。
- private long sizeInBytes = 0:整個Bulk請求的大小。
通過add api為BulkRequest添加一個請求。
2.2 Bulk API請求格式詳解
Bulk Rest請求協定基于如下格式:
POST _bulk
{ "index" : { "_index" : "test", "_type" : "_doc", "_id" : "1" } }
{ "field1" : "value1" }
{ "delete" : { "_index" : "test", "_type" : "_doc", "_id" : "2" } }
{ "create" : { "_index" : "test", "_type" : "_doc", "_id" : "3" } }
{ "field1" : "value3" }
{ "update" : {"_id" : "1", "_type" : "_doc", "_index" : "test"} }
{ "doc" : {"field2" : "value2"} }
請求格式如下(restfull):
- POST請求,其Content-Type為application/x-ndjson。
- 每一個指令占用兩行,每行的結束字元為rn。
- 第一行為中繼資料,"opType" : {中繼資料}。
- 第二行為有效載體(非必選),例如Index操作,其有效載荷為IndexRequest#source字段。
- opType可選值 index、create、update、delete。
公用中繼資料(index、create、update、delete)如下
1)_index :索引名
2)_type:類型名
3)_id:文檔ID
4)routing:路由值
5)parent
6)version:資料版本号
7)version_type:版本類型
各操作特有中繼資料
1、index | create
1)pipeline
2、update
1)retry_on_conflict :更新沖突時重試次數。
2)_source:字段過濾。
有效載荷說明
其有效載荷為_source字段。
其有效載荷為:partial doc, upsert and script。
3、delete
沒有有效載荷。
請求格式為什麼要設計成metdata+有效載體的方式,主要是為了在接收端節點(所謂的接收端節點是指收到指令的第一節點),隻需解析 metadata,然後将請求直接轉發給對應的資料節點。
2.3 bulk API通用特性分析
2.3.1 版本管理
每一個Bulk條目擁有獨自的version,存在于請求條目的item的中繼資料中。
2.3.2 路由
每一個Bulk條目各自生效。
2.3.3 Wait For Active Shards
通常可以設定BulkRequest#waitForActiveShards來要求Bulk批量執行之前要求處于激活的最小副本數。
2.3.4 Bulk Demo
public static final void testBulk() {
RestHighLevelClient client = EsClient.getClient();
try {
IndexRequest indexRequest = new IndexRequest("twitter", "_doc", "12")
.source(buildTwitter("dingw", "2009-11-18T14:12:12", "test bulk"));
UpdateRequest updateRequest = new UpdateRequest("twitter", "_doc", "11")
.doc(new IndexRequest("twitter", "_doc", "11")
.source(buildTwitter("dingw", "2009-11-18T14:12:12", "test bulk update")));
BulkRequest request = new BulkRequest();
request.add(indexRequest);
request.add(updateRequest);
BulkResponse bulkResponse = client.bulk(request, RequestOptions.DEFAULT);
for (BulkItemResponse bulkItemResponse : bulkResponse) {
if (bulkItemResponse.isFailed()) {
BulkItemResponse.Failure failure = bulkItemResponse.getFailure();
System.out.println(failure);
continue;
}
DocWriteResponse itemResponse = bulkItemResponse.getResponse();
if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.INDEX
|| bulkItemResponse.getOpType() == DocWriteRequest.OpType.CREATE) {
IndexResponse indexResponse = (IndexResponse) itemResponse;
System.out.println(indexRequest);
} else if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.UPDATE) {
UpdateResponse updateResponse = (UpdateResponse) itemResponse;
System.out.println(updateRequest);
} else if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.DELETE) {
DeleteResponse deleteResponse = (DeleteResponse) itemResponse;
System.out.println(deleteResponse);
}
}
} catch (Exception e) {
e.printStackTrace();
} finally {
EsClient.close(client);
}
}
批量更新bulk api就介紹到這裡了。