Elasticsearch Scroll和Slice Scroll查詢API使用案例
the best elasticsearch highlevel java rest api-----[url=https://my.oschina.net/bboss/blog/1942562]bboss[/url]
[b]本文内容[/b]
[list]
[*]基本scroll api使用
[*]基本scroll api與自定義scorll結果集handler函數結合使用
[*]slice api使用(并行/串行)
[*]slice api使用與自定義scorll結果集handler函數結合使用(并行/串行)
[/list]
本文對應的maven源碼工程:
[url]https://gitee.com/bbossgroups/eshelloword-booter[/url]
[b][size=medium]1.dsl配置檔案定義[/size][/b]
首先定義本文需要的dsl配置檔案
esmapper/scroll.xml
<properties>
<!--
簡單的scroll query案例,複雜的條件修改query dsl即可
-->
<property name="scrollQuery">
<![CDATA[
{
"size":#[size],
"query": {"match_all": {}}
}
]]>
</property>
<!--
簡單的slice scroll query案例,複雜的條件修改query dsl即可
-->
<property name="scrollSliceQuery">
<![CDATA[
{
"slice": {
"id": #[sliceId], ## 必須使用sliceId作為變量名稱
"max": #[sliceMax] ## 必須使用sliceMax作為變量名稱
},
"size":#[size],
"query": {"match_all": {}}
}
]]>
</property>
</properties>
下面介紹scroll各種用法,對應的測試類檔案為:TestScrollAPIQuery
[size=medium][b] 2.基本scroll api使用[/b][/size]
@Test
public void testSimleScrollAPI(){
ClientInterface clientUtil = ElasticSearchHelper.getConfigRestClientUtil("esmapper/scroll.xml");
//scroll分頁檢索
Map params = new HashMap();
params.put("size", 10000);//每頁10000條記錄
//scroll上下文有效期1分鐘,每次scroll檢索的結果都會合并到總得結果集中;資料量大時存在oom記憶體溢出風險,大資料量時可以采用handler函數來處理每次scroll檢索的結果(後面介紹)
ESDatas<Map> response = clientUtil.scroll("demo/_search","scrollQuery","1m",params,Map.class);
List<Map> datas = response.getDatas();
long realTotalSize = datas.size();
long totalSize = response.getTotalSize();
System.out.println("totalSize:"+totalSize);
System.out.println("realTotalSize:"+realTotalSize);
System.out.println("countAll:"+clientUtil.countAll("demo"));
}
[size=medium][b]3.基本scroll api與自定義scorll結果集handler函數結合使用[/b][/size]
@Test
public void testSimleScrollAPIHandler(){
ClientInterface clientUtil = ElasticSearchHelper.getConfigRestClientUtil("esmapper/scroll.xml");
//scroll分頁檢索
Map params = new HashMap();
params.put("size", 5000);//每頁5000條記錄
//采用自定義handler函數處理每個scroll的結果集後,response中隻會包含總記錄數,不會包含記錄集合
//scroll上下文有效期1分鐘;大資料量時可以采用handler函數來處理每次scroll檢索的結果,規避資料量大時存在的oom記憶體溢出風險
ESDatas<Map> response = clientUtil.scroll("demo/_search", "scrollQuery", "1m", params, Map.class, new ScrollHandler<Map>() {
public void handle(ESDatas<Map> response) throws Exception {//自己處理每次scroll的結果
List<Map> datas = response.getDatas();
long totalSize = response.getTotalSize();
System.out.println("totalSize:"+totalSize+",datas.size:"+datas.size());
}
});
System.out.println("response realzie:"+response.getTotalSize());
}
[size=medium][b]4.slice api使用[/b][/size]
[b] 串行[/b]
/**
* 串行方式執行slice scroll操作
*/
@Test
public void testSimpleSliceScrollApi() {
ClientInterface clientUtil = ElasticSearchHelper.getConfigRestClientUtil("esmapper/scroll.xml");
//scroll slice分頁檢索,max對應并行度,一般設定為與索引表的shards數一緻
int max = 6;
Map params = new HashMap();
params.put("sliceMax", max);//建議不要大于索引表的shards數
params.put("size", 100);//每頁100條記錄
//scroll上下文有效期1分鐘,每次scroll檢索的結果都會合并到總得結果集中;資料量大時存在oom記憶體溢出風險,大資料量時可以采用handler函數來處理每次slice scroll檢索的結果(後面介紹)
ESDatas<Map> sliceResponse = clientUtil.scrollSlice("demo/_search",
"scrollSliceQuery", params,"1m",Map.class,
false);//false表示串行;如果資料量大,建議采用并行方式來執行
System.out.println("totalSize:"+sliceResponse.getTotalSize());
System.out.println("realSize size:"+sliceResponse.getDatas().size());
}
[b] 并行[/b]
/**
* 并行方式執行slice scroll操作
*/
@Test
public void testSimpleSliceScrollApiParral() {
ClientInterface clientUtil = ElasticSearchHelper.getConfigRestClientUtil("esmapper/scroll.xml");
//scroll slice分頁檢索,max對應并行度,一般設定為與索引表的shards數一緻
int max = 6;
Map params = new HashMap();
params.put("sliceMax", max);//這裡設定6個slice,建議不要大于索引表的shards數,必須使用sliceMax作為變量名稱
params.put("size", 100);//每頁100條記錄
//scroll上下文有效期2分鐘,每次scroll檢索的結果都會合并到總得結果集中;資料量大時存在oom記憶體溢出風險,大資料量時可以采用handler函數來處理每次scroll檢索的結果(後面介紹)
ESDatas<Map> sliceResponse = clientUtil.scrollSlice("demo/_search",
"scrollSliceQuery", params,"2m",Map.class,
true);//true表示并行,會從slice scroll線程池中申請sliceMax個線程來并行執行slice scroll檢索操作,大資料量多個shared分片的情況下建議采用并行模式
System.out.println("totalSize:"+sliceResponse.getTotalSize());
System.out.println("realSize size:"+sliceResponse.getDatas().size());
}
[size=medium][b]5.slice api使用與自定義scorll結果集handler函數結合使用[/b][/size]
[b]串行[/b]
/**
* 串行方式執行slice scroll操作
*/
@Test
public void testSimpleSliceScrollApiHandler() {
ClientInterface clientUtil = ElasticSearchHelper.getConfigRestClientUtil("esmapper/scroll.xml");
//scroll slice分頁檢索,max對應并行度,一般設定為與索引表的shards數一緻
int max = 6;
Map params = new HashMap();
params.put("sliceMax", max);//這裡設定6個slice,建議不要大于索引表的shards數,必須使用sliceMax作為變量名稱
params.put("size", 1000);//每頁1000條記錄
//采用自定義handler函數處理每個slice scroll的結果集後,sliceResponse中隻會包含總記錄數,不會包含記錄集合
//scroll上下文有效期1分鐘,大資料量時可以采用handler函數來處理每次scroll檢索的結果,規避資料量大時存在的oom記憶體溢出風險
ESDatas<Map> sliceResponse = clientUtil.scrollSlice("demo/_search",
"scrollSliceQuery", params,"1m",Map.class, new ScrollHandler<Map>() {
public void handle(ESDatas<Map> response) throws Exception {//自己處理每次scroll的結果
List<Map> datas = response.getDatas();
long totalSize = response.getTotalSize();
System.out.println("totalSize:"+totalSize+",datas.size:"+datas.size());
}
},
false);//false表示串行,如果資料量大建議采用并行模式
long totalSize = sliceResponse.getTotalSize();
System.out.println("totalSize:"+totalSize);
}
[b]并行[/b]
/**
* 并行方式執行slice scroll操作
*/
@Test
public void testSimpleSliceScrollApiParralHandler() {
ClientInterface clientUtil = ElasticSearchHelper.getConfigRestClientUtil("esmapper/scroll.xml");
//scroll slice分頁檢索,max對應并行度,一般設定為與索引表的shards數一緻
int max = 6;
Map params = new HashMap();
params.put("sliceMax", max);//這裡設定6個slice,建議不要大于索引表的shards數,必須使用sliceMax作為變量名稱
params.put("size", 1000);//每頁1000條記錄
//采用自定義handler函數處理每個slice scroll的結果集後,sliceResponse中隻會包含總記錄數,不會包含記錄集合
//scroll上下文有效期1分鐘,大資料量時可以采用handler函數來處理每次scroll檢索的結果,規避資料量大時存在的oom記憶體溢出風險
ESDatas<Map> sliceResponse = clientUtil.scrollSlice("demo/_search",
"scrollSliceQuery", params,"1m",Map.class, new ScrollHandler<Map>() {
public void handle(ESDatas<Map> response) throws Exception {//自己處理每次scroll的結果,注意結果是異步檢索的
List<Map> datas = response.getDatas();
long totalSize = response.getTotalSize();
System.out.println("totalSize:"+totalSize+",datas.size:"+datas.size());
}
},
true);//true表示并行,會從slice scroll線程池中申請sliceMax個線程來并行執行slice scroll檢索操作,大資料量多個shared分片的情況下建議采用并行模式
long totalSize = sliceResponse.getTotalSize();
System.out.println("totalSize:"+totalSize);
}
我們可以在application.properties檔案中增加以下配置來設定slice scroll查詢線程池線程數和等待隊列長度:
[color=blue]elasticsearch.sliceScrollThreadCount 預設值500
elasticsearch.sliceScrollThreadQueue 預設值500[/color]
[b]6 開發交流[/b]
elasticsearch技術交流群:166471282
elasticsearch微信公衆号:
[img]https://static.oschina.net/uploads/space/2017/0617/094201_QhWs_94045.jpg[/img]