天天看點

elasticsearch使用指南之Elasticsearch Document Multi Get API、Bulk API詳解、原理與示例

作者簡介:《RocketMQ技術内幕》作者、中間件興趣圈微信公衆号維護者。

本文将詳細介紹elasticsearch批量擷取API(Multi Get API)和Bulk API的使用。

1、Multi Get API

詳細API如下:

  • public final MultiGetResponse mget(MultiGetRequest multiGetRequest, RequestOptions options) throws IOException
  • public final void mgetAsync(MultiGetRequest multiGetRequest, RequestOptions options, ActionListener listener)

其核心需要關注MultiGetRequest 。

elasticsearch使用指南之Elasticsearch Document Multi Get API、Bulk API詳解、原理與示例

從上面所知,mget及批量擷取文檔,通過add方法添加多個Item,每一個item代表一個檔案擷取請求,其相關字段已在get API中詳細介紹,這裡就不做過多詳解。

Mget API使用示例

public static void testMget() {
        RestHighLevelClient client = EsClient.getClient();
        try {
            MultiGetRequest request = new MultiGetRequest();
            request.add("twitter", "_doc", "10");
            request.add("twitter", "_doc", "11");
            request.add("twitter", "_doc", "12");
            request.add("gisdemo", "_doc", "10");
            MultiGetResponse result = client.mget(request, RequestOptions.DEFAULT);
            System.out.println(result);
        } catch (Throwable e) {
            e.printStackTrace();
        } finally {
            EsClient.close(client);
        }
    }           

傳回的結果其本質是一個 GetResponse的數組,不會因為其中一個失敗,整個請求失敗,但其結果中會标明每一個是否成功。

其傳回結果類圖如下:

elasticsearch使用指南之Elasticsearch Document Multi Get API、Bulk API詳解、原理與示例

其字段過濾(Source filtering)、路由等機制與Get API相同,詳情請參考:

Elasticsearch Document Get API詳解、原理與示例

2、Elasticsearch Bulk API

Bulk API可以在一次API調用中包含多個索引操作,例如更新索引,删除索引等,類比批量操作。

詳細API如下:

  • public final BulkResponse bulk(BulkRequest bulkRequest, RequestOptions options) throws IOException
  • public final void bulkAsync(BulkRequest bulkRequest, RequestOptions options, ActionListener listener)

2.1 BulkRequest詳解

elasticsearch使用指南之Elasticsearch Document Multi Get API、Bulk API詳解、原理與示例

我們先一一來看一下其核心屬性與與典型方法:

  • final List requests = new ArrayList<>():單個指令容器,DocWriteRequest的子類包括:IndexRequest、UpdateRequest、DeleteRequest。
  • private final Set indices = new HashSet<>():List requests涉及到的索引。

    List

  • protected TimeValue timeout = BulkShardRequest.DEFAULT_TIMEOUT:timeout機制,針對一個Bulk請求生效。
  • private ActiveShardCount waitForActiveShards = ActiveShardCount.DEFAULT: waitForActiveShards,針對一個Bulk請求生效,各個請求中waitForActiveShards優先。
  • private RefreshPolicy refreshPolicy = RefreshPolicy.NONE:重新整理政策。
  • private long sizeInBytes = 0:整個Bulk請求的大小。

通過add api為BulkRequest添加一個請求。

2.2 Bulk API請求格式詳解

Bulk Rest請求協定基于如下格式:

POST _bulk
{ "index" : { "_index" : "test", "_type" : "_doc", "_id" : "1" } }
{ "field1" : "value1" }
{ "delete" : { "_index" : "test", "_type" : "_doc", "_id" : "2" } }
{ "create" : { "_index" : "test", "_type" : "_doc", "_id" : "3" } }
{ "field1" : "value3" }
{ "update" : {"_id" : "1", "_type" : "_doc", "_index" : "test"} }
{ "doc" : {"field2" : "value2"} }           

請求格式如下(restfull):

  • POST請求,其Content-Type為application/x-ndjson。
  • 每一個指令占用兩行,每行的結束字元為rn。
  • 第一行為中繼資料,"opType" : {中繼資料}。
  • 第二行為有效載體(非必選),例如Index操作,其有效載荷為IndexRequest#source字段。
  • opType可選值 index、create、update、delete。

公用中繼資料(index、create、update、delete)如下

1)_index :索引名

2)_type:類型名

3)_id:文檔ID

4)routing:路由值

5)parent

6)version:資料版本号

7)version_type:版本類型

各操作特有中繼資料

1、index | create

1)pipeline

2、update

1)retry_on_conflict :更新沖突時重試次數。

2)_source:字段過濾。

有效載荷說明

其有效載荷為_source字段。

其有效載荷為:partial doc, upsert and script。

3、delete

沒有有效載荷。

請求格式為什麼要設計成metdata+有效載體的方式,主要是為了在接收端節點(所謂的接收端節點是指收到指令的第一節點),隻需解析 metadata,然後将請求直接轉發給對應的資料節點。

2.3 bulk API通用特性分析

2.3.1 版本管理

每一個Bulk條目擁有獨自的version,存在于請求條目的item的中繼資料中。

2.3.2 路由

每一個Bulk條目各自生效。

2.3.3 Wait For Active Shards

通常可以設定BulkRequest#waitForActiveShards來要求Bulk批量執行之前要求處于激活的最小副本數。

2.3.4 Bulk Demo

public static final void testBulk() {
        RestHighLevelClient client = EsClient.getClient();
        try {
            IndexRequest indexRequest = new IndexRequest("twitter", "_doc", "12")
                    .source(buildTwitter("dingw", "2009-11-18T14:12:12", "test bulk"));
            UpdateRequest updateRequest = new UpdateRequest("twitter", "_doc", "11")
                        .doc(new IndexRequest("twitter", "_doc", "11")
                                .source(buildTwitter("dingw", "2009-11-18T14:12:12", "test bulk update")));
            BulkRequest request = new BulkRequest();
            request.add(indexRequest);
            request.add(updateRequest);
            BulkResponse bulkResponse = client.bulk(request, RequestOptions.DEFAULT);
            for (BulkItemResponse bulkItemResponse : bulkResponse) { 
                if (bulkItemResponse.isFailed()) { 
                    BulkItemResponse.Failure failure = bulkItemResponse.getFailure(); 
                    System.out.println(failure);
                    continue;
                }
                DocWriteResponse itemResponse = bulkItemResponse.getResponse(); 
                if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.INDEX
                        || bulkItemResponse.getOpType() == DocWriteRequest.OpType.CREATE) { 
                    IndexResponse indexResponse = (IndexResponse) itemResponse;
                    System.out.println(indexRequest);
                } else if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.UPDATE) {
                    UpdateResponse updateResponse = (UpdateResponse) itemResponse;
                    System.out.println(updateRequest);
                } else if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.DELETE) { 
                    DeleteResponse deleteResponse = (DeleteResponse) itemResponse;
                    System.out.println(deleteResponse);
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            EsClient.close(client);
        }
    }           

批量更新bulk api就介紹到這裡了。

繼續閱讀