天天看点

【ElasticSearch实战】——封装java操作es基础架构

创建ES专栏很久了,但是写的文章非常的少,实在是项目比较忙,2018年最后一天了也该总结一下对es的封装的东西了,这篇博客我们通过java对es批量插入为主流程,来介绍一下java对接es 的全部过程;

需求处理流程:

 从mysql中读取手机号前7位的所有组合,然后通过程序补全后四位的所有可能,组成一个庞大的手机号码库,然后再将手机号加密,为其他的应用提供手机号明密文转换服务;

1、引入jar

<dependency>
            <groupId>org.elasticsearch.client</groupId>
            <artifactId>elasticsearch-rest-high-level-client</artifactId>
            <version>6.0.1</version>
        </dependency>
           

2、根据配置创建索引

    2.1、配置文件内容:

es.cluster.name=elasticsearch
# 集群ip端口,多个节点用逗号分隔(properties文件不支持行尾注释,注释需单独成行)
es.cluster.nodes=39.**.45.**:9201
es.client.useragent=ACS/Search 1.0
es.index.casetel={"settings":{"number_of_shards":2,"number_of_replicas":1,"max_result_window":10000000},"mappings":{"casetel":{"properties":{"telId":{"type":"long"},"tel":{"type":"keyword"},"encryptTel":{"type":"keyword"}}}}}
           

   2.2、 配置文件对应 操作类

package com.zqf.search.es.model;


import com.zqf.common.utils.JsonUtils;

/**
 * Plain settings holder for the Elasticsearch client: cluster name, node
 * addresses, the HTTP user agent and the JSON definition of the
 * {@code casetel} index. Values are supplied through the setters.
 *
 * @author zhenghao
 * @date 10:18 2018/12/9
 */
public class ElasticSearchConfig {
    /** Name of the Elasticsearch cluster. */
    private String clusterName;
    /** Node addresses as a single string. */
    private String nodes;
    /** Value for the {@code User-Agent} header. */
    private String userAgent;
    /** Index definition (settings and mappings) for the case-tel index. */
    private String caseTel;

    /** @return the configured cluster name */
    public String getClusterName() {
        return this.clusterName;
    }

    /** @param clusterName the cluster name to store */
    public void setClusterName(final String clusterName) {
        this.clusterName = clusterName;
    }

    /** @return the configured node addresses */
    public String getNodes() {
        return this.nodes;
    }

    /** @param nodes the node addresses to store */
    public void setNodes(final String nodes) {
        this.nodes = nodes;
    }

    /** @return the configured user agent */
    public String getUserAgent() {
        return this.userAgent;
    }

    /** @param userAgent the user agent to store */
    public void setUserAgent(final String userAgent) {
        this.userAgent = userAgent;
    }

    /** @return the configured case-tel index definition */
    public String getCaseTel() {
        return this.caseTel;
    }

    /** @param caseTel the case-tel index definition to store */
    public void setCaseTel(final String caseTel) {
        this.caseTel = caseTel;
    }

    /**
     * Renders this object through {@code JsonUtils.writeValue}.
     *
     * @return whatever {@code JsonUtils.writeValue(this)} produces
     */
    @Override
    public String toString() {
        final String rendered = JsonUtils.writeValue(this);
        return rendered;
    }
}
           

通过 spring管理配置实体类:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
	   http://www.springframework.org/schema/beans/spring-beans.xsd"
       default-lazy-init="false">

    <!--
      Elasticsearch settings bean. Each property is filled from the placeholder
      key shown in its value attribute: es.cluster.name, es.cluster.nodes,
      es.client.useragent and es.index.casetel.
      NOTE(review): resolving the ${...} placeholders needs a property placeholder
      configurer, which is not declared in this file; presumably it is defined
      elsewhere. Confirm.
    -->
    <bean id="esNodeConfig" class="com.zqf.search.es.model.ElasticSearchConfig">
        <property name="clusterName" value="${es.cluster.name}" />
        <property name="nodes" value="${es.cluster.nodes}" />
        <property name="userAgent" value="${es.client.useragent}" />
        <property name="caseTel" value="${es.index.casetel}" />
    </bean>
</beans>
           

 2.3、系统启动检查是否有索引,如果没有根据配置创建相关索引

package com.zqf.search.es.service;

import java.util.Collections;
import java.util.Map;

import com.sun.org.apache.bcel.internal.generic.NEW;
import com.zqf.search.es.model.ElasticSearchConfig;
import org.apache.http.HttpEntity;
import org.apache.http.HttpHost;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.entity.ContentType;
import org.apache.http.entity.StringEntity;
import org.apache.http.message.BasicHeader;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.main.MainResponse;
import org.elasticsearch.action.search.*;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.stereotype.Service;


/**
 * Wrapper around the Elasticsearch low-level {@link RestClient} and the
 * {@link RestHighLevelClient}.
 *
 * <p>Both clients are created once, when the Spring context is refreshed.
 * The synchronous methods catch every exception, log it and return
 * {@code null} (or {@code false}); they never propagate the failure. The
 * asynchronous methods delegate directly to the client's async call.
 *
 * @author zhenghao
 * @date 10:18 2018/12/9
 */
@Service
public class ElasticSearchService implements ApplicationListener<ContextRefreshedEvent> {
    private static final Logger log = LoggerFactory.getLogger(ElasticSearchService.class);
    /** Connect timeout: one minute. */
    public static final int CONNECT_TIMEOUT_MILLIS = 60 * 1000;
    /** Socket timeout: five minutes. */
    public static final int SOCKET_TIMEOUT_MILLIS = 5 * 60 * 1000;
    public static final int MAX_RETRY_TIMEOUT_MILLIS = SOCKET_TIMEOUT_MILLIS;
    public static final int CONNECTION_REQUEST_TIMEOUT_MILLIS = SOCKET_TIMEOUT_MILLIS;
    // volatile: these fields are published by the double-checked initialisation in
    // initClient() and read by request threads without holding the lock.
    private volatile RestClient restClient;
    private volatile RestHighLevelClient restHighLevelClient;
    private volatile BasicHeader[] basicHeaders;
    @Autowired
    private ElasticSearchConfig  elasticSearchConfig;
    @Autowired
    private IndexService indexService;

    /**
     * Executes a bulk request synchronously.
     *
     * @param bulkRequest the bulk request to send
     * @return the bulk response, or {@code null} if the call threw
     */
    public BulkResponse bulk(BulkRequest bulkRequest) {
        try {
            return restHighLevelClient.bulk(bulkRequest, basicHeaders);
        } catch (Exception e) {
            log.error("bulk exception", e);
        }

        return null;
    }

    /** Executes a bulk request asynchronously, passing {@code listener} to the client. */
    public void bulkAsync(BulkRequest bulkRequest, ActionListener<BulkResponse> listener) {
        restHighLevelClient.bulkAsync(bulkRequest, listener, basicHeaders);
    }

    /**
     * Pings the cluster.
     *
     * @return the client's ping result, or {@code false} if the call threw
     */
    public boolean ping() {
        try {
            return restHighLevelClient.ping(basicHeaders);
        } catch (Exception e) {
            log.error("ping exception", e);
        }

        return false;
    }

    /**
     * Fetches the cluster's main info.
     *
     * @return the info response, or {@code null} if the call threw
     */
    public MainResponse info() {
        try {
            return restHighLevelClient.info(basicHeaders);
        } catch (Exception e) {
            // Was logged as "ping exception" (copy-paste slip), which hid the real origin.
            log.error("info exception", e);
        }

        return null;
    }

    /**
     * Fetches a document synchronously.
     *
     * @return the get response, or {@code null} if the call threw
     */
    public GetResponse get(GetRequest getRequest) {
        try {
            return restHighLevelClient.get(getRequest, basicHeaders);
        } catch (Exception e) {
            log.error("get exception", e);
        }

        return null;
    }

    /** Fetches a document asynchronously, passing {@code listener} to the client. */
    public void getAsync(GetRequest getRequest, ActionListener<GetResponse> listener) {
        restHighLevelClient.getAsync(getRequest, listener, basicHeaders);
    }

    /**
     * Checks whether a document exists.
     *
     * @return the client's result, or {@code false} if the call threw
     */
    public boolean exists(GetRequest getRequest) {
        try {
            return restHighLevelClient.exists(getRequest, basicHeaders);
        } catch (Exception e) {
            log.error("exists exception", e);
        }

        return false;
    }

    /** Asynchronous variant of {@link #exists(GetRequest)}. */
    public void existsAsync(GetRequest getRequest, ActionListener<Boolean> listener) {
        restHighLevelClient.existsAsync(getRequest, listener, basicHeaders);
    }

    /**
     * Indexes a document synchronously.
     *
     * @return the index response, or {@code null} if the call threw
     */
    public IndexResponse index(IndexRequest indexRequest) {
        try {
            return restHighLevelClient.index(indexRequest, basicHeaders);
        } catch (Exception e) {
            log.error("index exception", e);
        }

        return null;
    }

    /** Asynchronous variant of {@link #index(IndexRequest)}. */
    public void indexAsync(IndexRequest indexRequest, ActionListener<IndexResponse> listener) {
        restHighLevelClient.indexAsync(indexRequest, listener, basicHeaders);
    }

    /**
     * Updates a document synchronously.
     *
     * @return the update response, or {@code null} if the call threw
     */
    public UpdateResponse update(UpdateRequest updateRequest) {
        try {
            return restHighLevelClient.update(updateRequest, basicHeaders);
        } catch (Exception e) {
            log.error("update exception", e);
        }

        return null;
    }

    /** Asynchronous variant of {@link #update(UpdateRequest)}. */
    public void updateAsync(UpdateRequest updateRequest, ActionListener<UpdateResponse> listener) {
        restHighLevelClient.updateAsync(updateRequest, listener, basicHeaders);
    }

    /**
     * Deletes a document synchronously.
     *
     * @return the delete response, or {@code null} if the call threw
     */
    public DeleteResponse delete(DeleteRequest deleteRequest) {
        try {
            return restHighLevelClient.delete(deleteRequest, basicHeaders);
        } catch (Exception e) {
            log.error("delete exception", e);
        }

        return null;
    }

    /** Asynchronous variant of {@link #delete(DeleteRequest)}. */
    public void deleteAsync(DeleteRequest deleteRequest, ActionListener<DeleteResponse> listener) {
        restHighLevelClient.deleteAsync(deleteRequest, listener, basicHeaders);
    }

    /**
     * Runs a search synchronously.
     *
     * @return the search response, or {@code null} if the call threw
     */
    public SearchResponse search(SearchRequest searchRequest) {
        try {
            return restHighLevelClient.search(searchRequest, basicHeaders);
        } catch (Exception e) {
            log.error("search exception", e);
        }

        return null;
    }

    /** Asynchronous variant of {@link #search(SearchRequest)}. */
    public void searchAsync(SearchRequest searchRequest, ActionListener<SearchResponse> listener) {
        restHighLevelClient.searchAsync(searchRequest, listener, basicHeaders);
    }

    /**
     * Continues a scroll synchronously.
     *
     * @return the search response, or {@code null} if the call threw
     */
    public SearchResponse searchScroll(SearchScrollRequest searchScrollRequest) {
        try {
            return restHighLevelClient.searchScroll(searchScrollRequest, basicHeaders);
        } catch (Exception e) {
            log.error("searchScroll exception", e);
        }

        return null;
    }

    /** Asynchronous variant of {@link #searchScroll(SearchScrollRequest)}. */
    public void searchScrollAsync(SearchScrollRequest searchScrollRequest, ActionListener<SearchResponse> listener) {
        restHighLevelClient.searchScrollAsync(searchScrollRequest, listener, basicHeaders);
    }

    /**
     * Clears a scroll synchronously.
     *
     * @return the clear-scroll response, or {@code null} if the call threw
     */
    public ClearScrollResponse clearScroll(ClearScrollRequest clearScrollRequest) {
        try {
            return restHighLevelClient.clearScroll(clearScrollRequest, basicHeaders);
        } catch (Exception e) {
            log.error("clearScroll exception", e);
        }

        return null;
    }

    /** Asynchronous variant of {@link #clearScroll(ClearScrollRequest)}. */
    public void clearScrollAsync(ClearScrollRequest clearScrollRequest, ActionListener<ClearScrollResponse> listener) {
        restHighLevelClient.clearScrollAsync(clearScrollRequest, listener, basicHeaders);
    }

    /**
     * Sends a raw request without query parameters or body through the low-level client.
     *
     * @return the response, or {@code null} if the call threw
     */
    public Response performRequest(String method, String endpoint) {
        try {
            return restClient.performRequest(method, endpoint, Collections.emptyMap(), basicHeaders);
        } catch (Exception e) {
            log.error("performRequest exception", e);
        }

        return null;
    }

    /**
     * Sends a raw request with query parameters through the low-level client.
     *
     * @return the response, or {@code null} if the call threw
     */
    public Response performRequest(String method, String endpoint, Map<String, String> params) {
        try {
            return restClient.performRequest(method, endpoint, params, basicHeaders);
        } catch (Exception e) {
            log.error("performRequest exception", e);
        }

        return null;
    }

    /**
     * Sends a raw request with query parameters and a JSON body.
     *
     * @param jsonBody request body, sent with content type {@code application/json}
     * @return the response, or {@code null} if the call (or building the body) threw
     */
    public Response performRequest(String method, String endpoint, Map<String, String> params, String jsonBody) {
        try {
            HttpEntity entity = new StringEntity(jsonBody, ContentType.APPLICATION_JSON);
            return restClient.performRequest(method, endpoint, params, entity, basicHeaders);
        } catch (Exception e) {
            log.error("performRequest exception", e);
        }

        return null;
    }

    /**
     * Sends a raw request with a JSON body and a caller-chosen response consumer factory.
     *
     * @return the response, or {@code null} if the call (or building the body) threw
     */
    public Response performRequest(String method, String endpoint, Map<String, String> params, String jsonBody, HttpAsyncResponseConsumerFactory httpAsyncResponseConsumerFactory) {
        try {
            HttpEntity entity = new StringEntity(jsonBody, ContentType.APPLICATION_JSON);
            return restClient.performRequest(method, endpoint, params, entity, httpAsyncResponseConsumerFactory, basicHeaders);
        } catch (Exception e) {
            log.error("performRequest exception", e);
        }

        return null;
    }

    /** Asynchronous raw request without query parameters or body. */
    public void performRequestAsync(String method, String endpoint, ResponseListener responseListener) {
        restClient.performRequestAsync(method, endpoint, Collections.emptyMap(), responseListener, basicHeaders);
    }

    /** Asynchronous raw request with query parameters. */
    public void performRequestAsync(String method, String endpoint, Map<String, String> params, ResponseListener responseListener) {
        restClient.performRequestAsync(method, endpoint, params, responseListener, basicHeaders);
    }

    /** Asynchronous raw request with a JSON body, using the default response consumer factory. */
    public void performRequestAsync(String method, String endpoint, Map<String, String> params, String jsonBody, ResponseListener responseListener) {
        HttpEntity entity = new StringEntity(jsonBody, ContentType.APPLICATION_JSON);
        restClient.performRequestAsync(method, endpoint, params, entity, HttpAsyncResponseConsumerFactory.DEFAULT, responseListener, basicHeaders);
    }

    /** Asynchronous raw request with a JSON body and a caller-chosen response consumer factory. */
    public void performRequestAsync(String method, String endpoint, Map<String, String> params, String jsonBody, HttpAsyncResponseConsumerFactory httpAsyncResponseConsumerFactory, ResponseListener responseListener) {
        HttpEntity entity = new StringEntity(jsonBody, ContentType.APPLICATION_JSON);
        restClient.performRequestAsync(method, endpoint, params, entity, httpAsyncResponseConsumerFactory, responseListener, basicHeaders);
    }

    /**
     * Runs once the Spring context has been refreshed, so the search clients are
     * created after all beans are available.
     *
     * @param contextRefreshedEvent the refresh event (not inspected)
     */
    @Override
    public void onApplicationEvent(ContextRefreshedEvent contextRefreshedEvent) {
        initClient();
    }

    /**
     * Creates the REST clients on first use and then asks {@code indexService}
     * to verify the index. Repeated calls are no-ops once the high-level
     * client has been set.
     *
     * @throws IllegalStateException if no node address is configured
     */
    private void initClient() {
        if (restHighLevelClient != null) {
            return;
        }
        synchronized (this) {
            if (restHighLevelClient != null) {
                return;
            }

            String nodeList = elasticSearchConfig.getNodes();
            if (nodeList == null || nodeList.trim().isEmpty()) {
                throw new IllegalStateException("Elasticsearch node list (ElasticSearchConfig.nodes) is not configured");
            }
            String[] nodes = nodeList.split(",");
            HttpHost[] hosts = new HttpHost[nodes.length];
            for (int i = 0; i < nodes.length; i++) {
                // trim so that "host1:9200, host2:9200" is accepted as well
                hosts[i] = HttpHost.create(nodes[i].trim());
            }

            basicHeaders = new BasicHeader[] {
                    new BasicHeader("Accept", "application/json; charset=UTF-8"),
                    new BasicHeader("User-Agent", elasticSearchConfig.getUserAgent())
            };

            RestClientBuilder restClientBuilder = RestClient.builder(hosts);
            restClientBuilder.setDefaultHeaders(basicHeaders).setMaxRetryTimeoutMillis(MAX_RETRY_TIMEOUT_MILLIS).setRequestConfigCallback((RequestConfig.Builder requestConfigBuilder) -> {
                    requestConfigBuilder.setConnectTimeout(CONNECT_TIMEOUT_MILLIS);
                    requestConfigBuilder.setSocketTimeout(SOCKET_TIMEOUT_MILLIS);
                    requestConfigBuilder.setConnectionRequestTimeout(CONNECTION_REQUEST_TIMEOUT_MILLIS);
                    return requestConfigBuilder;
            });

            // NOTE(review): the builder is built here and also handed to the high-level
            // client; presumably that creates a second low-level client. Confirm against
            // the client version in use.
            restClient = restClientBuilder.build();
            // Assigned last: a non-null restHighLevelClient is the "initialised" marker.
            restHighLevelClient = new RestHighLevelClient(restClientBuilder);
            // make sure the index exists
            indexService.checkIndexIsExist();
        }
    }
}
           

3、批量向索引 中插入数据service

package com.zqf.search.es.service;

import com.zqf.common.utils.SHA;
import com.zqf.common.utils.service.ThreadPoolService;
import com.zqf.db.urgerobot.enhance.dao.NativeSqlMapper;
import com.zqf.search.es.model.ElasticSearchConfig;
import org.apache.commons.collections.map.HashedMap;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Response;
import org.elasticsearch.common.collect.HppcMaps;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import javax.servlet.http.HttpServletResponse;
import java.util.*;

/**
 * Creates the {@code casetel} index when it is missing and bulk-loads it with
 * phone numbers and their hashes.
 *
 * @author zhenghao
 * @date 2018/12/8 13:12
 */
@Service
public class IndexService {

    private static final Logger log = LoggerFactory.getLogger(IndexService.class);
    /** Name used for both the index and its mapping type. */
    private static final String CASE_TEL_INDEX = "casetel";
    /** Number of documents sent per bulk request. */
    private static final int BULK_BATCH_SIZE = 1000;

    @Autowired
    private ElasticSearchService elasticSearchService;
    @Autowired
    private NativeSqlMapper nativeSqlMapper;
    @Autowired
    public ElasticSearchConfig elasticSearchConfig;
    @Autowired
    private ThreadPoolService threadPoolService;

    /**
     * Checks that the {@code casetel} index exists and creates it from the
     * configured definition when it does not.
     */
    public void checkIndexIsExist() {
        checkAndCreateIndex(CASE_TEL_INDEX,
                elasticSearchConfig.getCaseTel());
    }

    /**
     * Submits {@link #multiDealTel()} to the thread pool and returns immediately.
     */
    public void dealTel() {

        threadPoolService.execute(() -> {
            multiDealTel();
        });

    }

    /**
     * Loads every pending number ({@code state = 1}) from {@code sys_number},
     * splits the numbers into at most {@code getSuggestThreadNum()} chunks of
     * near-equal size and submits one {@link #threadDealTel(List)} task per chunk.
     *
     * <p>Rows whose number is {@code null} are skipped. Values are converted with
     * {@code toString()}, so a numeric column works as well as a string one.
     */
    public void multiDealTel() {
        List<Map<String, Object>> caseCallInfoList =
                nativeSqlMapper.execReadSql("select number tel from sys_number where state = 1");

        if (caseCallInfoList == null || caseCallInfoList.isEmpty()) {
            return;
        }

        List<String> telList = new ArrayList<>(caseCallInfoList.size());
        for (Map<String, Object> row : caseCallInfoList) {
            Object tel = row.get("tel");
            if (tel != null) {
                telList.add(tel.toString());
            }
        }
        if (telList.isEmpty()) {
            return;
        }

        int threadCount = threadPoolService.getSuggestThreadNum();
        log.info("容量线程数{}", threadCount);
        // Never more tasks than numbers, and never zero (avoids a division by zero below).
        threadCount = Math.max(1, Math.min(threadCount, telList.size()));

        // Ceiling division: yields at most threadCount chunks that together cover every number.
        int chunkSize = (telList.size() + threadCount - 1) / threadCount;
        for (int from = 0; from < telList.size(); from += chunkSize) {
            int to = Math.min(from + chunkSize, telList.size());
            // Copy the view so each task owns an independent list.
            final List<String> chunk = new ArrayList<>(telList.subList(from, to));
            threadPoolService.execute(() -> {
                threadDealTel(chunk);
            });
        }
    }


    /**
     * Expands every prefix with all four-digit suffixes ({@code 0000}-{@code 9999}),
     * hashes each resulting number, sends the documents to Elasticsearch in batches
     * and finally marks the prefix as processed ({@code state = 2}).
     *
     * @param caseCallInfoList number prefixes to expand; {@code null} or empty is a no-op
     * @date 10:11 2018/12/9
     * @author zhenghao
     */
    public void threadDealTel(List<String> caseCallInfoList) {

        if ((caseCallInfoList == null) || caseCallInfoList.isEmpty()) {
            return;
        }

        List<Map<String, Object>> batch = new ArrayList<>(BULK_BATCH_SIZE);
        for (String tel : caseCallInfoList) {
            for (int i = 0; i <= 9; i++) {
                for (int l = 0; l <= 9; l++) {
                    for (int m = 0; m <= 9; m++) {
                        for (int n = 0; n <= 9; n++) {
                            String fullTel = tel + i + l + m + n;
                            Map<String, Object> doc = new HashMap<>(4);
                            doc.put("tel", fullTel);
                            doc.put("encryptTel", SHA.getSHA(fullTel));
                            batch.add(doc);

                            if (batch.size() >= BULK_BATCH_SIZE) {
                                this.insertEs(batch);
                                log.info("size:{}", batch.size());
                                // insertEs has finished iterating the list before it
                                // returns, so the list can be reused.
                                batch.clear();
                            }
                        }
                    }
                }
            }
            if (!batch.isEmpty()) {
                this.insertEs(batch);
                batch.clear();
            }
            // NOTE(review): the bulk requests are asynchronous, so the state is set to 2
            // before Elasticsearch has confirmed the inserts. The SQL is also built by
            // concatenation; tel is expected to be a digits-only prefix.
            nativeSqlMapper.execWriteSql("update sys_number set state =2 where number = " + tel);
        }

    }

    /**
     * Sends the given documents to the {@code casetel} index in one asynchronous
     * bulk request. The outcome, including per-item failures, is only logged.
     *
     * @param mapList document sources; {@code null} or empty is a no-op
     */
    public void insertEs(List<Map<String, Object>> mapList) {
        if (mapList == null || mapList.isEmpty()) {
            // Nothing to send; avoid submitting a bulk request without actions.
            return;
        }

        BulkRequest bulkRequest = new BulkRequest();
        for (Map<String, Object> caseCallInfo : mapList) {
            IndexRequest indexRequest = new IndexRequest();
            indexRequest.index(CASE_TEL_INDEX)
                    .type(CASE_TEL_INDEX)
                    .source(caseCallInfo);
            bulkRequest.add(indexRequest);
        }

        elasticSearchService.bulkAsync(bulkRequest, new ActionListener<BulkResponse>() {
            @Override
            public void onResponse(BulkResponse bulkItemResponses) {
                // A bulk call can succeed as a whole while individual items fail.
                if (bulkItemResponses.hasFailures()) {
                    log.error("bulk insert had item failures: {}", bulkItemResponses.buildFailureMessage());
                }
                log.info("使用时间{}", bulkItemResponses.getTook().getMillis());
            }

            @Override
            public void onFailure(Exception e) {
                log.error("发生错误原因{}", e.getMessage(), e);
            }
        });
    }

    /**
     * Issues {@code HEAD /<index>} and, unless that answers 200, creates the
     * index with {@code PUT /<index>/} using the given definition.
     *
     * @param indexName   name of the index
     * @param indexConfig JSON body used to create the index
     */
    private void checkAndCreateIndex(String indexName, String indexConfig) {
        Response response = elasticSearchService.performRequest("HEAD",
                String.format("/%s", indexName),
                Collections.emptyMap());
        if ((response != null) && (response.getStatusLine().getStatusCode() == HttpServletResponse.SC_OK)) {
            return;
        }

        Response created = elasticSearchService.performRequest("PUT",
                String.format("/%s/", indexName),
                Collections.emptyMap(),
                indexConfig);
        if (created == null) {
            // performRequest returns null when the call threw; make the failed creation visible.
            log.error("failed to create index {}", indexName);
        }
    }
}
           

4、查询service

package com.zqf.search.es.service;

import org.apache.commons.lang.StringUtils;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.query.RangeQueryBuilder;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.metrics.sum.Sum;
import org.elasticsearch.search.aggregations.metrics.sum.SumAggregationBuilder;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.sort.FieldSortBuilder;
import org.elasticsearch.search.sort.SortBuilders;
import org.elasticsearch.search.sort.SortOrder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Queries against the {@code casetel} index.
 *
 * @author zhenghao
 * @date 2018/12/9 10:26
 */

@Service
public class QueryService {

    @Autowired
    private ElasticSearchService elasticSearchService;

    /**
     * Counts the documents whose {@code tel} field contains the given text.
     *
     * <p>Every {@code *} is removed from the input and the rest is trimmed before
     * it is wrapped as {@code *text*} in a wildcard query.
     *
     * @param tel text to look for
     * @return the total hit count, or {@code null} when {@code tel} is {@code null}
     *         or the search returned no response
     * @date 10:34 2018/12/9
     * @author zhenghao
     */
    public Long getTelCount(String tel) {
        if (tel == null) {
            return null;
        }

        String temTel = tel.replace("*", "").trim();
        BoolQueryBuilder boolQueryBuilder = new BoolQueryBuilder();
        boolQueryBuilder.filter(QueryBuilders.wildcardQuery("tel", String.format("*%s*", temTel)));

        SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
        searchSourceBuilder.query(boolQueryBuilder);
        // Only the total is needed, so do not fetch any hit documents.
        searchSourceBuilder.size(0);
        SearchRequest searchRequest = new SearchRequest();
        searchRequest.indices("casetel").types("casetel").source(searchSourceBuilder);

        SearchResponse searchResponse = elasticSearchService.search(searchRequest);
        if (searchResponse == null) {
            // ElasticSearchService.search logs the exception and returns null on failure.
            return null;
        }
        return searchResponse.getHits().getTotalHits();
    }

}
           

6、从mysql查询数据

package com.zqf.search.es.service;

import java.util.*;
import java.util.function.IntFunction;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.util.StringUtils;

import com.zqf.db.plugin.Page;
import com.zqf.db.urgerobot.enhance.dao.NativeSqlMapper;
import com.zqf.db.urgerobot.user.dao.RobotUserMapper;
import com.zqf.db.urgerobot.user.model.RobotUser;
import com.zqf.db.urgerobot.user.model.RobotUserExample;


/**
 * Clean-up helper for the {@code management_sys_tel_ownership} table.
 *
 * @author zhenghao
 * @date 2018/5/30 18:29
 */
@Service
public class UserService {

	@Autowired
	private RobotUserMapper userMapper;
	@Autowired
	private NativeSqlMapper nativeSqlMapper;


	/**
	 * Deletes every row of {@code management_sys_tel_ownership} whose number is
	 * not exactly 7 characters long.
	 *
	 * <p>NOTE(review): despite the method name, nothing is looked up by user id
	 * and nothing is returned; the name is kept for existing callers.
	 *
	 * <p>Rows with a {@code null} id or number are left untouched.
	 *
	 * @param userId not used
	 * @date 19:20 2017/9/28
	 * @author zhenghao
	 */
	public void getUserInfoById(Long userId) {
		List<Map<String, Object>> caseCallInfoList = nativeSqlMapper.execReadSql(
				"select id telId, number tel,area_code encryptTel from management_sys_tel_ownership");
		if (caseCallInfoList == null) {
			return;
		}

		for (Map<String, Object> caseInfo : caseCallInfoList) {
			// Read as Object: the driver may return the columns as other types than Long/String.
			Object telId = caseInfo.get("telId");
			Object tel = caseInfo.get("tel");
			if (telId == null || tel == null) {
				continue;
			}
			if (tel.toString().length() != 7) {
				// A DELETE is a write statement, so it goes through execWriteSql.
				nativeSqlMapper.execWriteSql("delete from management_sys_tel_ownership where id = " + telId);
			}
		}

	}

}
           

7、方法调用

/**
     * Handles {@code /dealData}: calls {@code indexService.dealTel()} and then
     * {@code ServletUtils.toJson(req, res)}.
     *
     * NOTE(review): the method name {@code login} and its former "user login"
     * description do not match what the body does; consider renaming.
     * Presumably {@code ServletUtils.toJson} writes the JSON reply; verify.
     *
     * @param req the HTTP request, passed on to {@code ServletUtils.toJson}
     * @param res the HTTP response, passed on to {@code ServletUtils.toJson}
     * @date 0:22 2017/12/20
     * @author zhenghao
     */
    @RequestMapping("/dealData")
    public void login(HttpServletRequest req, HttpServletResponse res) {
         indexService.dealTel();
        ServletUtils.toJson(req,res);
    }
           

总结:

通过上面的几个步骤,我们就可以快速地搭建起一个search项目,所有操作es的服务都由这个项目提供,这样其余的项目直接调用接口即可。从创建索引到插入数据、查询数据,基本都是模板代码,通过本篇博客可以快速构建!

继续阅读