[ElasticSearch] Java API: Scroll Search (Scroll API)

Copyright notice: this is an original article by the blogger and may not be reproduced without the blogger's permission. https://blog.csdn.net/SunnyYoona/article/details/52810397

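An ordinary search request returns a single "page" of results, whereas the Scroll API lets us retrieve a large number of results (even all of them) from a single search request. It lets us run an initial search and then keep pulling batches of results from Elasticsearch until none are left. This is somewhat like a cursor in a traditional database.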

Scrolling is not intended for real-time user requests, but rather for processing large amounts of data. The results that are returned from a scroll request reflect the state of the index at the time that the initial search request was made, like a snapshot in time. Subsequent changes to documents (index, update, or delete) will only affect later search requests.
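
To make the cursor and snapshot analogy concrete, here is a minimal sketch in plain Java with no Elasticsearch dependency. The class SnapshotCursor and its methods are hypothetical names invented for illustration. It only mimics the behaviour described above (a fixed view of the data, read batch by batch) and is not how Elasticsearch implements scrolling.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical toy model of a scroll: a frozen copy of the data, read in batches.
class SnapshotCursor {
    private final List<String> snapshot; // state of the "index" when the cursor was opened
    private final int batchSize;
    private int position = 0;

    SnapshotCursor(List<String> index, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        this.snapshot = new ArrayList<>(index); // copy, so later changes are not visible
        this.batchSize = batchSize;
    }

    // Returns the next batch; an empty list means there is nothing left.
    List<String> nextBatch() {
        if (position >= snapshot.size()) {
            return Collections.emptyList();
        }
        int end = Math.min(position + batchSize, snapshot.size());
        List<String> batch = new ArrayList<>(snapshot.subList(position, end));
        position = end;
        return batch;
    }
}
```

With three documents and a batch size of 2, a document added to the list after the cursor is opened never appears in any of its batches, and the third call returns an empty batch.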

1. Ordinary request

Suppose we want to return a large amount of data in one go. The code below asks for 58,000 documents in a single request:

    import org.elasticsearch.action.search.SearchRequestBuilder;
    import org.elasticsearch.action.search.SearchResponse;
    import org.elasticsearch.client.Client;
    import org.elasticsearch.search.SearchHit;

    /**
     * Ordinary search
     * @param client
     */
    public static void search(Client client) {
        String index = "simple-index";
        String type = "simple-type";
        // Search criteria
        SearchRequestBuilder searchRequestBuilder = client.prepareSearch();
        searchRequestBuilder.setIndices(index);
        searchRequestBuilder.setTypes(type);
        // Ask for 58,000 hits in a single page
        searchRequestBuilder.setSize(58000);
        // Execute
        SearchResponse searchResponse = searchRequestBuilder.get();
        // Search results
        SearchHit[] searchHits = searchResponse.getHits().getHits();
        for (SearchHit searchHit : searchHits) {
            String source = searchHit.getSource().toString();
            logger.info("--------- search source {}", source);
        } // for
    }

Result:

    Caused by: QueryPhaseExecutionException[Result window is too large, from + size must be less than or equal to: [10000] but was [58000]. See the scroll api for a more efficient way to request large data sets. This limit can be set by changing the [index.max_result_window] index level parameter.]
        at org.elasticsearch.search.internal.DefaultSearchContext.preProcess(DefaultSearchContext.java:212)
        at org.elasticsearch.search.query.QueryPhase.preProcess(QueryPhase.java:103)
        at org.elasticsearch.search.SearchService.createContext(SearchService.java:676)
        at org.elasticsearch.search.SearchService.createAndPutContext(SearchService.java:620)
        at org.elasticsearch.search.SearchService.executeQueryPhase(SearchService.java:371)
        at org.elasticsearch.search.action.SearchServiceTransportAction$SearchQueryTransportHandler.messageReceived(SearchServiceTransportAction.java:368)
        at org.elasticsearch.search.action.SearchServiceTransportAction$SearchQueryTransportHandler.messageReceived(SearchServiceTransportAction.java:365)
        at org.elasticsearch.transport.TransportRequestHandler.messageReceived(TransportRequestHandler.java:33)
        at org.elasticsearch.transport.RequestHandlerRegistry.processMessageReceived(RequestHandlerRegistry.java:75)
        at org.elasticsearch.transport.TransportService$4.doRun(TransportService.java:376)
        at org.elasticsearch.common.util.concurrent.AbstractRunnable.run(AbstractRunnable.java:37)
        ... 3 more

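From the above we can see that a single search request is limited to a result window (from + size) of at most 10,000. Our request exceeds that limit, hence the error, and the exception message advises using the Scroll API when requesting large amounts of data.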

2. Requesting with the Scroll API

To use scrolling, the initial search request should specify the scroll parameter, which tells Elasticsearch how long it should keep the search context alive (the scroll time).

    searchRequestBuilder.setScroll(new TimeValue(60000));

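The code below sets the search criteria and the scroll properties, such as how long the scroll stays valid (via the setScroll() method). We obtain the scroll ID from the SearchResponse object's getScrollId() method. The scroll ID is used in the next request.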

    import org.elasticsearch.common.unit.TimeValue;

    /**
     * Search using scroll
     * @param client
     * @return the scroll ID to use for the next request
     */
    public static String searchByScroll(Client client) {
        String index = "simple-index";
        String type = "simple-type";
        // Search criteria
        SearchRequestBuilder searchRequestBuilder = client.prepareSearch();
        searchRequestBuilder.setIndices(index);
        searchRequestBuilder.setTypes(type);
        // Keep the search context alive for 30 seconds (value in milliseconds)
        searchRequestBuilder.setScroll(new TimeValue(30000));
        // Execute
        SearchResponse searchResponse = searchRequestBuilder.get();
        String scrollId = searchResponse.getScrollId();
        logger.info("--------- searchByScroll scrollID {}", scrollId);
        // The initial request already returns the first batch of hits
        SearchHit[] searchHits = searchResponse.getHits().getHits();
        for (SearchHit searchHit : searchHits) {
            String source = searchHit.getSource().toString();
            logger.info("--------- searchByScroll source {}", source);
        } // for
        return scrollId;
    }

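The scroll ID returned by the request above can be passed to the scroll API to retrieve the next batch of results. This time the request does not need to specify the index and type, since those were already given in the original search request.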

Each call to the scroll API returns the next batch of results until there are no more results left to return, i.e. the hits array is empty.

    import org.elasticsearch.action.search.SearchScrollRequestBuilder;

    /**
     * Fetch documents by scroll ID
     * @param client
     * @param scrollId
     */
    public static void searchByScrollId(Client client, String scrollId){
        TimeValue timeValue = new TimeValue(30000);
        SearchScrollRequestBuilder searchScrollRequestBuilder;
        SearchResponse response;
        // Results
        while (true) {
            logger.info("--------- searchByScroll scrollID {}", scrollId);
            searchScrollRequestBuilder = client.prepareSearchScroll(scrollId);
            // Reset the scroll time
            searchScrollRequestBuilder.setScroll(timeValue);
            // Request
            response = searchScrollRequestBuilder.get();
            // Each call returns the next batch; stop when the hits array is empty
            if (response.getHits().getHits().length == 0) {
                break;
            } // if
            // This batch of results
            SearchHit[] searchHits = response.getHits().getHits();
            for (SearchHit searchHit : searchHits) {
                String source = searchHit.getSource().toString();
                logger.info("--------- searchByScroll source {}", source);
            } // for
            // Only the most recent scroll ID should be used
            scrollId = response.getScrollId();
        } // while
    }
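
As a rough worked example of what this loop costs: the scroll code in this article never calls setSize(), so the default batch size of 10 hits applies, and fetching the 58,000 documents from section 1 would take thousands of round trips. The helper below is plain Java arithmetic. ScrollMath and its method names are hypothetical, the batch size of 1,000 is only an example value, and the count assumes every batch comes back full until the data runs out and that the loop stops at the first empty batch.

```java
// Hypothetical helper: how many requests a full scroll takes for a given batch size.
class ScrollMath {
    // Number of non-empty batches needed to return totalHits documents.
    static long nonEmptyBatches(long totalHits, int batchSize) {
        if (totalHits < 0 || batchSize <= 0) {
            throw new IllegalArgumentException("totalHits must be >= 0 and batchSize > 0");
        }
        return (totalHits + batchSize - 1) / batchSize; // ceiling division
    }

    // Total requests: the non-empty batches plus the final empty one that ends the loop.
    static long totalRequests(long totalHits, int batchSize) {
        return nonEmptyBatches(totalHits, batchSize) + 1;
    }
}
```

Under these assumptions, 58,000 documents need 5,800 non-empty batches at the default size of 10, but only 58 at a batch size of 1,000 (59 requests in total, counting the final empty response).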

Note:

The initial search request and each subsequent scroll request returns a new _scroll_id; only the most recent _scroll_id should be used.

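In my tests every subsequent scroll request returned the same scroll ID, so I do not fully understand the note above. If you know why, please let me know, thanks. Either way, the loop above always uses the ID from the latest response, which is what the note asks for.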
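
The rule in the note can be sketched without Elasticsearch at all. In the plain-Java model below, Page, ScrollDrainer, and drain are hypothetical names, and fetch stands in for whatever call returns the next batch for a given scroll ID. The loop carries forward the ID from the latest response, so it behaves the same whether the server keeps returning one ID or issues a new one each time.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical stand-in for one scroll response: the ID to use next plus this batch of hits.
class Page {
    final String scrollId;
    final List<String> hits;

    Page(String scrollId, List<String> hits) {
        this.scrollId = scrollId;
        this.hits = hits;
    }
}

class ScrollDrainer {
    // Collects every hit, always passing the most recent scroll ID to the next fetch.
    static List<String> drain(Page first, Function<String, Page> fetch) {
        List<String> all = new ArrayList<>();
        Page page = first;
        while (!page.hits.isEmpty()) {
            all.addAll(page.hits);
            page = fetch.apply(page.scrollId); // ID from the latest response, never an older one
        }
        return all;
    }
}
```

Here first plays the role of the initial search response, and the loop ends on the first empty batch, just like the while loop in searchByScrollId.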

If the scroll time has elapsed and we keep searching with that scroll ID, an error is raised:

    Caused by: SearchContextMissingException[No search context found for id [2861]]
        at org.elasticsearch.search.SearchService.findContext(SearchService.java:613)
        at org.elasticsearch.search.SearchService.executeQueryPhase(SearchService.java:403)
        at org.elasticsearch.search.action.SearchServiceTransportAction$SearchQueryScrollTransportHandler.messageReceived(SearchServiceTransportAction.java:384)
        at org.elasticsearch.search.action.SearchServiceTransportAction$SearchQueryScrollTransportHandler.messageReceived(SearchServiceTransportAction.java:381)
        at org.elasticsearch.transport.TransportRequestHandler.messageReceived(TransportRequestHandler.java:33)
        at org.elasticsearch.transport.RequestHandlerRegistry.processMessageReceived(RequestHandlerRegistry.java:75)
        at org.elasticsearch.transport.TransportService$4.doRun(TransportService.java:376)
        at org.elasticsearch.common.util.concurrent.AbstractRunnable.run(AbstractRunnable.java:37)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
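
A small plain-Java model may help show why the context goes missing. KeepAliveContext and its methods are hypothetical, and time is passed in explicitly as milliseconds so the behaviour is easy to follow. It assumes, as the loop above does by calling setScroll() on every request, that each scroll request sets a fresh expiry measured from the time of that request. The exact moment at which a real node discards an expired context is simplified away.

```java
// Hypothetical model of a search context whose keep-alive is renewed by each scroll request.
class KeepAliveContext {
    private long expiresAtMillis;

    KeepAliveContext(long nowMillis, long keepAliveMillis) {
        this.expiresAtMillis = nowMillis + keepAliveMillis;
    }

    // A scroll request arriving in time sets a new expiry; a late one fails.
    void scroll(long nowMillis, long keepAliveMillis) {
        if (nowMillis > expiresAtMillis) {
            throw new IllegalStateException("No search context found");
        }
        expiresAtMillis = nowMillis + keepAliveMillis;
    }
}
```

With a 30-second keep-alive, requests at 20 s and 45 s both succeed because each one renews the context, while a request at 80 s fails. The keep-alive therefore only has to cover the processing of one batch, not the whole scroll.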

3. Clearing the scroll ID

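Although the search context is cleared automatically once the scroll time has elapsed, keeping scrolls open is costly, so as soon as we are no longer using a scroll we should clear it with the Clear-Scroll API.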

    import java.util.List;
    import org.elasticsearch.action.search.ClearScrollRequestBuilder;
    import org.elasticsearch.action.search.ClearScrollResponse;

    /**
     * Clear a list of scroll IDs
     * @param client
     * @param scrollIdList
     * @return true if the scroll contexts were cleared
     */
    public static boolean clearScroll(Client client, List<String> scrollIdList){
        ClearScrollRequestBuilder clearScrollRequestBuilder = client.prepareClearScroll();
        clearScrollRequestBuilder.setScrollIds(scrollIdList);
        ClearScrollResponse response = clearScrollRequestBuilder.get();
        return response.isSucceeded();
    }

    /**
     * Clear a single scroll ID
     * @param client
     * @param scrollId
     * @return true if the scroll context was cleared
     */
    public static boolean clearScroll(Client client, String scrollId){
        ClearScrollRequestBuilder clearScrollRequestBuilder = client.prepareClearScroll();
        clearScrollRequestBuilder.addScrollId(scrollId);
        ClearScrollResponse response = clearScrollRequestBuilder.get();
        return response.isSucceeded();
    }
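
One way to make sure the clear call is never skipped, even when processing a batch throws, is try-with-resources. The sketch below is plain Java with hypothetical names (ScrollSession, currentId, update). The clearer callback stands in for a call such as the clearScroll(client, scrollId) method above.

```java
import java.util.function.Consumer;

// Hypothetical wrapper that clears the latest scroll ID when the try block exits.
class ScrollSession implements AutoCloseable {
    private String scrollId;
    private final Consumer<String> clearer;

    ScrollSession(String scrollId, Consumer<String> clearer) {
        this.scrollId = scrollId;
        this.clearer = clearer;
    }

    String currentId() {
        return scrollId;
    }

    // Record the ID returned by the most recent response.
    void update(String latestScrollId) {
        this.scrollId = latestScrollId;
    }

    @Override
    public void close() {
        clearer.accept(scrollId); // runs on normal exit and on exceptions
    }
}
```

Usage might look like try (ScrollSession session = new ScrollSession(scrollId, id -> clearScroll(client, id))) { ... }, calling session.update(response.getScrollId()) after each scroll request.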

4. References:

https://www.elastic.co/guide/en/elasticsearch/reference/2.4/search-request-scroll.html
http://www.jianshu.com/p/14aa8b09c789

5. Notes

This code is based on ElasticSearch 2.4.1.