一、介紹
此工具類涵蓋的功能較為廣泛:不僅封裝了常用的 Elasticsearch(ES)文件增刪改查與搜尋 API,也封裝了索引(index)的建立、刪除與存在性判斷等操作。
二、完整代碼示例
注:ES 用戶端(RestHighLevelClient)執行個體的建立可在 config 配置類中完成。
package com.xw.sun.train.util;
import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.HttpHost;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.indices.CreateIndexRequest;
import org.elasticsearch.client.indices.CreateIndexResponse;
import org.elasticsearch.client.indices.GetIndexRequest;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.text.Text;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
import org.elasticsearch.search.fetch.subphase.highlight.HighlightBuilder;
import org.elasticsearch.search.fetch.subphase.highlight.HighlightField;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import javax.annotation.PostConstruct;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.UUID;
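/**
* @author sunxw
* @create_time 2021/7/13 9:03
* @since 1.0-SNAPSHOT
* @description Elasticsearch utility class: index management, document create/update/delete/get, bulk insert and highlighted search
*/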
@Slf4j
//@Component
public class ElasticSearchUtils {
@Value("${spring.elasticsearch.rest.uris}")
private String uris;
@Autowired
private RestHighLevelClient restHighLevelClient;
/**
 * Replaces the injected client with one built from {@code spring.elasticsearch.rest.uris}.
 *
 * <p>Runs as a {@link PostConstruct} callback, i.e. after the {@code uris} and
 * {@code restHighLevelClient} fields have been injected. The property is read as a
 * comma-separated list whose entries are either {@code host:port} or
 * {@code scheme://host:port}. Entries that cannot be parsed, or that carry no port, are
 * logged and skipped. When the property is blank or yields no usable host, the injected
 * client is left untouched so the bean never ends up holding a closed client.
 */
@PostConstruct
private void init() {
    if (StringUtils.isBlank(uris)) {
        log.error("spring.elasticsearch.rest.uris is blank");
        return;
    }
    // Collect into a list so that skipped entries never leave null slots in the host array
    // (RestClient.builder rejects null hosts).
    List<HttpHost> hosts = new ArrayList<>();
    for (String uri : uris.split(",")) {
        String candidate = uri.trim();
        if (candidate.isEmpty()) {
            continue;
        }
        try {
            // HttpHost.create understands both "host:port" and "scheme://host:port".
            HttpHost host = HttpHost.create(candidate);
            if (host.getPort() < 0) {
                log.error("Elasticsearch uri '{}' has no port, skipping it", candidate);
                continue;
            }
            hosts.add(host);
        } catch (IllegalArgumentException e) {
            log.error("Elasticsearch uri '{}' is invalid, skipping it", candidate, e);
        }
    }
    if (hosts.isEmpty()) {
        log.error("spring.elasticsearch.rest.uris contains no usable host: {}", uris);
        return;
    }
    RestHighLevelClient previous = restHighLevelClient;
    RestClientBuilder builder = RestClient.builder(hosts.toArray(new HttpHost[0]));
    restHighLevelClient = new RestHighLevelClient(builder);
    // Close the old client only once its replacement exists; a failure to close must not
    // prevent the new client from being used.
    if (previous != null) {
        try {
            previous.close();
        } catch (IOException e) {
            log.error("Failed to close the previously injected RestHighLevelClient", e);
        }
    }
}
/**
 * Creates an index, without explicit settings or mappings, unless it already exists.
 *
 * @param index name of the index to create
 * @return {@code true} if the create request was acknowledged; {@code false} if the index
 *         already exists or the request was not acknowledged
 * @throws IOException if the existence check or the create request fails
 */
public boolean createIndex(String index) throws IOException {
    if (isIndexExist(index)) {
        // Name the index in the message so the log line is actionable.
        log.error("Index already exists: {}", index);
        return false;
    }
    // 1. Build the create-index request.
    CreateIndexRequest request = new CreateIndexRequest(index);
    // 2. Execute it through the high-level client.
    CreateIndexResponse response = restHighLevelClient.indices()
            .create(request, RequestOptions.DEFAULT);
    return response.isAcknowledged();
}
/**
 * Tells whether an index with the given name exists.
 *
 * @param index name of the index to look up
 * @return the result of the client's index {@code exists} call
 * @throws IOException if the request fails
 */
public boolean isIndexExist(String index) throws IOException {
    final GetIndexRequest lookup = new GetIndexRequest(index);
    final boolean present = restHighLevelClient.indices().exists(lookup, RequestOptions.DEFAULT);
    return present;
}
/**
 * Deletes an index if it exists.
 *
 * @param index name of the index to delete
 * @return {@code true} if the delete request was acknowledged; {@code false} if the index
 *         does not exist or the request was not acknowledged
 * @throws IOException if the existence check or the delete request fails
 */
public boolean deleteIndex(String index) throws IOException {
    if (!isIndexExist(index)) {
        // Name the index in the message so the log line is actionable.
        log.error("Index does not exist: {}", index);
        return false;
    }
    DeleteIndexRequest request = new DeleteIndexRequest(index);
    AcknowledgedResponse delete = restHighLevelClient.indices()
            .delete(request, RequestOptions.DEFAULT);
    return delete.isAcknowledged();
}
/**
 * Inserts or updates a document.
 *
 * <p>The outcome depends on {@code id}: a {@code null} id stores the document under a
 * generated id, an id that already exists is updated, and any other id is indexed as a new
 * document.
 *
 * @param object the data to insert or update; it is serialised to JSON
 * @param index  target index
 * @param type   document type used for the existence check and the update
 * @param id     document id, or {@code null} to have one generated
 * @return the id of the stored document
 * @throws IOException if a request to Elasticsearch fails
 */
public String submitData(Object object, String index, String type, String id) throws IOException {
    // addData(object, index, type, id) already performs exactly this null/exists/insert
    // decision; delegating avoids a second, redundant existence round trip.
    return addData(object, index, type, id);
}
/**
 * Stores a document under a caller-supplied id.
 *
 * <p>A {@code null} id is forwarded to the three-argument overload, which generates one.
 * If a document with the id already exists it is updated through
 * {@code updateDataByIdNoRealTime} instead of being indexed again.
 *
 * <p>NOTE(review): {@code type} is used for the existence check and the update, but the
 * index request below is built from {@code index} and {@code id} only - confirm this is
 * intended.
 *
 * @param object the data to store; it is serialised to JSON
 * @param index  target index
 * @param type   document type
 * @param id     document id, or {@code null} to have one generated
 * @return the id reported by the index (or update) response
 * @throws IOException if a request to Elasticsearch fails
 */
public String addData(Object object, String index, String type, String id) throws IOException {
    if (id == null) {
        return addData(object, index, type);
    }
    final boolean alreadyStored = this.existsById(index, type, id);
    if (alreadyStored) {
        return this.updateDataByIdNoRealTime(object, index, type, id);
    }
    // Build the index request: explicit id, one-second timeout, JSON body.
    final IndexRequest indexRequest = new IndexRequest(index)
            .id(id)
            .timeout(TimeValue.timeValueSeconds(1))
            .source(JSON.toJSONString(object), XContentType.JSON);
    final IndexResponse indexResponse = restHighLevelClient.index(indexRequest, RequestOptions.DEFAULT);
    final String storedId = indexResponse.getId();
    log.info("添加資料成功 索引為: {}, response 狀态: {}, id為: {}", index, indexResponse.status().getStatus(), storedId);
    return storedId;
}
/**
 * Stores a document under a generated id.
 *
 * <p>The id is a random UUID with the dashes removed, in upper case.
 *
 * @param object the data to store
 * @param index  target index
 * @param type   document type
 * @return the id returned by the four-argument overload
 * @throws IOException if a request to Elasticsearch fails
 */
public String addData(Object object, String index, String type) throws IOException {
    final String generatedId = UUID.randomUUID().toString().replace("-", "").toUpperCase();
    return addData(object, index, type, generatedId);
}
/**
 * Deletes one document, addressed by index, type and id.
 *
 * @param index index holding the document
 * @param type  document type
 * @param id    document id
 * @return the id reported by the delete response
 * @throws IOException if the request fails
 */
public String deleteDataById(String index, String type, String id) throws IOException {
    final DeleteRequest removal = new DeleteRequest(index, type, id);
    return restHighLevelClient.delete(removal, RequestOptions.DEFAULT).getId();
}
/**
 * Applies a partial-document update to the document with the given id.
 *
 * @param object the data to write; it is serialised to JSON and sent as the update doc
 * @param index  index holding the document
 * @param type   document type
 * @param id     document id
 * @return the id reported by the update response
 * @throws IOException if the request fails
 */
public String updateDataById(Object object, String index, String type, String id) throws IOException {
    final UpdateRequest change = new UpdateRequest(index, type, id);
    change.doc(JSON.toJSONString(object), XContentType.JSON);
    change.timeout("1s");
    final UpdateResponse outcome = restHighLevelClient.update(change, RequestOptions.DEFAULT);
    final String updatedId = outcome.getId();
    log.info("索引為: {}, id為: {},updateResponseID:{}, 更新資料成功", index, id, updatedId);
    return updatedId;
}
/**
 * Updates a document by id with the refresh policy set to {@code wait_for}.
 *
 * <p>Apart from the refresh policy, this sends the same request as {@code updateDataById}.
 * Note that the method name says "NoRealTime" while the request explicitly sets
 * {@code wait_for}.
 *
 * @param object the data to write; it is serialised to JSON and sent as the update doc
 * @param index  index holding the document
 * @param type   document type
 * @param id     document id
 * @return the id reported by the update response
 * @throws IOException if the request fails
 */
public String updateDataByIdNoRealTime(Object object, String index, String type, String id) throws IOException {
    final UpdateRequest change = new UpdateRequest(index, type, id);
    change.doc(JSON.toJSONString(object), XContentType.JSON);
    change.timeout("1s");
    // Ask Elasticsearch to apply the "wait_for" refresh policy to this write.
    change.setRefreshPolicy("wait_for");
    final UpdateResponse outcome = restHighLevelClient.update(change, RequestOptions.DEFAULT);
    final String updatedId = outcome.getId();
    log.info("索引為: {}, id為: {},updateResponseID:{}, 實時更新資料成功", index, id, updatedId);
    return updatedId;
}
/**
 * Fetches the source of one document by id.
 *
 * @param index  index holding the document
 * @param type   document type
 * @param id     document id
 * @param fields comma-separated names of the source fields to return; when {@code null} or
 *               empty no source filter is set on the request
 * @return the value of {@code GetResponse.getSource()} for the request
 * @throws IOException if the request fails
 */
public Map<String, Object> searchDataById(String index, String type, String id, String fields) throws IOException {
    final GetRequest lookup = new GetRequest(index, type, id);
    final boolean restrictFields = StringUtils.isNotEmpty(fields);
    if (restrictFields) {
        // Include only the listed fields; nothing is explicitly excluded.
        final String[] includes = fields.split(",");
        lookup.fetchSourceContext(new FetchSourceContext(true, includes, Strings.EMPTY_ARRAY));
    }
    return restHighLevelClient.get(lookup, RequestOptions.DEFAULT).getSource();
}
/**
 * Tells whether a document with the given id exists.
 *
 * @param index index to look in
 * @param type  document type
 * @param id    document id
 * @return the result of the client's document {@code exists} call
 * @throws IOException if the request fails
 */
public boolean existsById(String index, String type, String id) throws IOException {
    final GetRequest probe = new GetRequest(index, type, id);
    // Only existence matters, so request neither stored fields nor the _source.
    probe.storedFields("_none_");
    probe.fetchSourceContext(new FetchSourceContext(false));
    final boolean found = restHighLevelClient.exists(probe, RequestOptions.DEFAULT);
    return found;
}
/**
 * Bulk-inserts documents into an index, letting Elasticsearch assign the ids.
 *
 * <p>Mind the return value: {@code false} means success and {@code true} means failure.
 * A request that could not be sent at all ({@link IOException}) is reported as a failure.
 * A {@code null} or empty list is treated as nothing to do and reported as success,
 * without contacting Elasticsearch.
 *
 * <p>The original author's note says a batch should not exceed 200,000 documents; this
 * method does not enforce that limit.
 *
 * @param index   target index
 * @param objects documents to insert; each one is serialised to JSON
 * @return {@code false} if every document was inserted, {@code true} if the bulk response
 *         reported failures or the request itself failed
 */
public boolean bulkPost(String index, List<?> objects) {
    if (objects == null || objects.isEmpty()) {
        // A bulk request without actions is rejected during validation, so skip the call.
        return false;
    }
    BulkRequest bulkRequest = new BulkRequest();
    for (Object object : objects) {
        IndexRequest request = new IndexRequest(index);
        request.source(JSON.toJSONString(object), XContentType.JSON);
        bulkRequest.add(request);
    }
    try {
        BulkResponse response = restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);
        return response.hasFailures();
    } catch (IOException e) {
        // Nothing is known to have been written, so report failure instead of success.
        log.error("Bulk insert of {} documents into index {} failed", objects.size(), index, e);
        return true;
    }
}
/**
 * Exposes the low-level REST client that backs the high-level client held by this class.
 *
 * @return the low-level client obtained from {@code restHighLevelClient}
 */
public RestClient getLowLevelClient() {
    final RestClient lowLevel = restHighLevelClient.getLowLevelClient();
    return lowLevel;
}
/**
 * Turns search hits into a list of source maps, substituting the highlighted text.
 *
 * <p>For every hit that carries a highlight for {@code highlightField}, the fragments are
 * concatenated and written over that field in the hit's source map. Hits without such a
 * highlight keep their source unchanged. A resulting map can be converted to a typed object
 * with, for example,
 * {@code JSONObject.parseObject(JSONObject.toJSONString(map), Content.class)}.
 *
 * @param searchResponse the response whose hits are converted
 * @param highlightField name of the field whose value is replaced by its highlight
 * @return one source map per hit, in hit order
 */
private List<Map<String, Object>> setSearchResponse(SearchResponse searchResponse, String highlightField) {
    final List<Map<String, Object>> rows = new ArrayList<>();
    final SearchHit[] hits = searchResponse.getHits().getHits();
    for (final SearchHit hit : hits) {
        final HighlightField highlighted = hit.getHighlightFields().get(highlightField);
        // The hit's original source.
        final Map<String, Object> row = hit.getSourceAsMap();
        if (highlighted != null) {
            // Join all fragments and overwrite the plain value with the highlighted one.
            final StringBuilder merged = new StringBuilder();
            for (final Text fragment : highlighted.fragments()) {
                merged.append(fragment);
            }
            row.put(highlightField, merged.toString());
        }
        rows.add(row);
    }
    return rows;
}
/**
 * Runs a search and returns the hits as source maps, with one field highlighted.
 *
 * <p>This method does not set paging itself; any paging (from/size) must already be
 * configured on {@code query}. When {@code highlightField} is non-blank, a highlighter for
 * that field is set on the passed {@code query}, which is therefore modified. Matches are
 * wrapped in {@code <span style='color:red'>...</span>}. When {@code highlightField} is
 * {@code null} or blank, no highlighter is added and the hits are returned as they are.
 *
 * @param index          name of the index to search
 * @param query          search definition; must not be {@code null}
 * @param highlightField field to highlight, or {@code null}/blank for no highlighting
 * @return one source map per hit, or {@code null} if the response status is not 200
 * @throws IOException if the search request fails
 */
public List<Map<String, Object>> searchListData(String index,
                                                SearchSourceBuilder query,
                                                String highlightField) throws IOException {
    SearchRequest request = new SearchRequest(index);
    // A highlighter for a null/blank field name would be invalid, so only add one when a
    // field was actually requested.
    if (StringUtils.isNotBlank(highlightField)) {
        HighlightBuilder highlight = new HighlightBuilder();
        highlight.field(highlightField);
        // Highlight this field even when the query did not match on it (require_field_match).
        highlight.requireFieldMatch(false);
        highlight.preTags("<span style='color:red'>");
        highlight.postTags("</span>");
        query.highlighter(highlight);
    }
    request.source(query);
    SearchResponse response = restHighLevelClient.search(request, RequestOptions.DEFAULT);
    log.info("totalHits:{}", response.getHits().getTotalHits());
    if (response.status().getStatus() == 200) {
        // Convert the hits, swapping in the highlighted text where present.
        return setSearchResponse(response, highlightField);
    }
    return null;
}
}