1.es簡介
1.1 起源
https://www.elastic.co/cn/what-is/elasticsearch,es的起源,是因為程式員Shay Banon在使用Apache Lucene發現不太好用,然後手動改造更新的過程中發展起來的。(程式員就是需要有這種動力~)實際上es也是一個java應用,跑在jvm裡面的
1.2 與關系型資料庫的差別
關系型資料庫 | schema(庫) | 表 | 每一行的資料 | 字段columns |
elasticsearch | index(索引) | document | 字段fields |
1.3 為什麼這麼快
索引方式的差別,es主要是利用反向索引(inverted index),這個翻譯可能會讓初次接觸的人産生誤解,誤以為是倒着排序?其實不是這樣,一般關系型資料庫索引是把某個字段建立起一張索引表,傳入這個字段的某個值,再去索引中判斷是否有這個值,進而找到這個值所在資料(id)的位置。而反向索引則是把這個值所在的文檔id記錄下來,當輸入這個值的時候,直接查詢出這個值所比對的文檔id,再取出id。是以我們在建立es索引的時候,有分詞的概念,相當于可以把filed字段内容拆分,然後索引記錄下來。例如“我愛中國”,可以拆分成“我”,“愛”,“中國”,“我愛中國”這五個詞,同時記錄下來這幾個關鍵詞的對應文檔資料id是1,當我們查詢“我”,“中國”時,都能查出這條資料。而如果使用關系型資料庫去查包含“中國”這個關鍵字的資料的時候,則需要like前後通配全表掃描,無法快速找到關鍵詞所在的資料行。
1.4 下載下傳安裝
https://www.elastic.co/cn/start 在這個位址裡面下載下傳最新版本,目前是7.10.2(拖了一個月寫完,我下載下傳的時候是7.9.3- -!)

Windows版是一個壓縮封包件,解壓後(進入bin點開bat)即可使用。Linux版由于是直接在k8s裡拉的鏡像,這裡就不做贅述。
啟動完成之後通路:http://127.0.0.1:9200/,看見如下頁面:You Know, for Search,就算啟動成功啦。
1.5 安裝可視化軟體
像資料庫一樣,可視化界面有Navicat,SQLyog,MySql自帶的Workbench。es也是需要一個可視化ui界面來友善我們操作的。這裡選擇的也是官方的的kibana:
https://www.elastic.co/cn/downloads/kibana :
請注意需要選擇與es比對的版本,如果版本不比對,則會提示你:
或者是其他類似版本不比對的錯誤。
安裝完成後就可以打開kibana玩耍啦,由于我本地沒有資料,拿的是7.6.2版本搭建的elk中kibana界面:
如果需要連接配接環境上的es,則可以在這裡配置使用者名和密碼:
這個工具的搜尋很友善,不需要指定查哪個字段的哪個值,直接在輸入框搜尋想要查詢的字段即可。如果想看他對應的查詢語句,點開F12打開控制台即可研究:
es的查詢條件還是比較複雜的,但是在業務查詢當中,一些比較簡單的查詢就可以滿足大多數的通用分頁查詢了,除非是要開發報表查詢,會複雜一些。
1.6 機器要求
本地跑demo的話還是很容易的,這兩個應用預設占用記憶體都不大,有需求可以自行調小一點:
2.Java中使用Elasticsearch
2.1 使用spring-data提供的封裝
2.1.1 maven依賴
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-elasticsearch</artifactId>
</dependency>
2.1.2 yml參數
2.1.3 代碼中映射索引實體
其中“omsElasticsearchSettings”這一段的意思是像mybatis那樣解析表達式,找到omsElasticsearchSettings這個bean的getSuffix方法擷取前字尾。這樣就可以實作動态的根據環境生成映射對應的索引
1 @Configuration
2 @AllArgsConstructor
3 public class ElasticsearchConfig {
5 private final Environment env;
7 @Bean
8 public ElasticsearchSettings omsElasticsearchSettings(){
9 return new ElasticsearchSettings().setSuffix(env.getActiveProfiles()[0]);
10 }
12 }
13
16 @Data
17 @Accessors(chain = true)
18 public class ElasticsearchSettings {
20 public String suffix;
22 }
2.1.4 索引mapping生成
在啟動項目的時候,SpringData會檢測配置中的es裡是否存在對應索引,如不存在,則會根據@Document實體中配置的@Field字段來生成mapping檔案:
生成的Mapping Demo如下:
PUT om_package_dev/?pretty
{
"settings": {
"number_of_shards" :1,
"number_of_replicas" : 1
},
"mappings": {
"properties": {
"_class": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"actualFreightCost": {
"type": "double"
},
"actualPackageCost": {
"type": "double"
}
}
}
}
2.1.5 增删改
建立一個@Repository像mybatis一樣來做增删改查的映射封裝:
底層是SpringData提供封裝的統一方法:
儲存資料的時候直接調用即可:
一般來說訂單這些重要資料不會删除,要删除也是邏輯删除,是以删除接口基本不調用。直接更新邏輯删除值就好。更新也是調用這個:save/saveAll
2.1.6 查
查是Es的重頭戲,我們打開org.elasticsearch.index.query.AbstractQueryBuilder檢視實作類可以發現,繼承這個抽象類的各種查詢類有四五十個之多,不得不讓人感歎es的查詢強大,(與反人類,學習成本太高了)。
好消息是,如果業務場景不複雜,僅僅是想在分頁查詢上提高速度,那麼隻需要掌握一下幾個類的用法即可:
我們封裝了兩個查詢枚舉,一個用來定義該實體是es查詢條件實體@interface QueryEntity:
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
public @interface QueryEntity {
String[] dbOrders() default {};
String[] esOrders() default {};
String dbLogicField() default "";
String esLogicField() default "";
}
另外一個是用來定義字段,即使用es的哪個條件去查詢@interface QueryField:
@Target({ElementType.FIELD})
@Retention(RetentionPolicy.RUNTIME)
public @interface QueryField {
String esField() default "";
String dbField() default "";
boolean like() default false;
boolean range() default false;
boolean require() default false;
boolean match() default false;
boolean commaSupported() default false;
boolean isBigDecimal() default false;
Class<?> searchTypeEnum() default void.class;
Class<?> sortTypeEnum() default void.class;
}
對應到實體上的用法demo就是:
這樣可以支援區間查詢,字段類型,對應es字段,從設計上規避了根據每個字段,調用每個拼接語句的上百個if/else噩夢。通過一個通用的查詢工具類,來封裝拼接這些查詢條件QueryUtils:
@Slf4j
public class QueryUtils {
private static ConcurrentHashMap<Class<?>, HashMap<String, Field>> classFieldMap = new ConcurrentHashMap<>();
/**
* 建構查詢
*
* @param obj
* @return 若為 null 說明該查詢必定不會傳回結果,無需查詢 ES
*/
public static BoolQueryBuilder boolQuery(Object obj) {
if (obj == null) {
return null;
}
BoolQueryBuilder root = QueryBuilders.boolQuery();
if (!classFieldMap.containsKey(obj.getClass())) {
HashMap<String, Field> filedNameMap = new HashMap<>(obj.getClass().getDeclaredFields().length);
for (Field field : obj.getClass().getDeclaredFields()) {
filedNameMap.put(field.getName(), field);
}
classFieldMap.put(obj.getClass(), filedNameMap);
}
HashMap<String, Field> filedNameMap = classFieldMap.get(obj.getClass());
QueryEntity entitySetting = obj.getClass().getAnnotation(QueryEntity.class);
for (Field field : filedNameMap.values()) {
QueryField fieldSetting;
if ((fieldSetting = field.getAnnotation(QueryField.class)) == null) {
continue;
}
Object value = ReflectionUtil.getValue(field, obj);
if (isNullOrEmpty(value)) {
if (!fieldSetting.require()) {
continue;
}
return null;
}
String fieldName = getEsQueryFieldName(field, fieldSetting);
if (fieldSetting.range()) {
BoolQueryBuilder bool = QueryBuilders.boolQuery();
String[] arr = (String[]) value;
RangeQueryBuilder range = QueryBuilders.rangeQuery(fieldName);
if (arr.length != 2 || (StringUtils.isEmpty(arr[0]) && StringUtils.isEmpty(arr[1]))) {
continue;
}
if (!StringUtils.isEmpty(arr[0]) && StringUtils.isEmpty(arr[1])) {
bool.must(range.from(
fieldSetting.isBigDecimal() ? new BigDecimal(arr[0]) : DateUtil.parseAndGetTimestamp(arr[0])));
} else if (StringUtils.isEmpty(arr[0]) && !StringUtils.isEmpty(arr[1])) {
bool.must(range.to(fieldSetting.isBigDecimal() ? new BigDecimal(arr[1]) : DateUtil.parseAndGetTimestamp(arr[1])));
} else {
bool.must(range.from(fieldSetting.isBigDecimal() ? new BigDecimal(arr[0]) : DateUtil.parseAndGetTimestamp(arr[0])).
to(fieldSetting.isBigDecimal() ? new BigDecimal(arr[1]) : DateUtil.parseAndGetTimestamp(arr[1])));
}
root.must(bool);
} else if (field.getType() == List.class) {
assert value instanceof List<?>;
List<?> list = (List<?>) value;
if (CollectionUtils.isEmpty(list)) {
if (fieldSetting.require()) {
return null;
}
continue;
}
if (list.get(0) instanceof StoreListBO) {
BoolQueryBuilder bool1 = QueryBuilders.boolQuery();
for (Object store : list) {
StoreListBO bo = (StoreListBO) store;
BoolQueryBuilder bool2 = QueryBuilders.boolQuery();
if (!bo.getFlagAll()) {
bool2.must(QueryBuilders.termQuery("platformCode", bo.getPlatformCode()));
bool2.must(QueryBuilders.termsQuery("storeCode", bo.getStoreCodeList()));
}
bool1.should(bool2);
}
root.must(bool1);
} else {
root.must(QueryBuilders.termsQuery(fieldName, (List<?>) value));
}
} else if (fieldSetting.like()) {
root.must(QueryBuilders.wildcardQuery(fieldName, String.format("*%s*", value)));
} else if (fieldSetting.commaSupported()) {
root.must(QueryBuilders.termsQuery(fieldName, StringUtility.splitCommaString((String) value)));
} else if (fieldSetting.match()) {
if (fieldSetting.commaSupported()) {
root.must(QueryBuilders.multiMatchQuery(fieldName, StringUtility.splitCommaString((String) value)));
} else {
root.must(QueryBuilders.matchQuery(fieldName, value));
}
} else if (fieldSetting.searchTypeEnum().isEnum()) {
try {
Object[] objects = fieldSetting.searchTypeEnum().getEnumConstants();
if (objects[0] instanceof IEsSearchTypeEnum) {
IEsSearchTypeEnum searchTypeEnum = (IEsSearchTypeEnum) objects[0];
fieldName = searchTypeEnum.getFiledName((Integer) value);
Field filed = filedNameMap.get(IEsSearchTypeEnum.searchContent);
filed.setAccessible(true);
String searchContent = (String) ReflectUtil.getField(filed, obj);
if (!StringUtils.isEmpty(fieldName) && !StringUtils.isEmpty(searchContent)) {
root.must(QueryBuilders.termsQuery(fieldName, searchContent.split(",")));
}
}
} catch (Exception e) {
e.printStackTrace();
log.error("拼接搜尋類型有誤:", e.getMessage());
}
} else if (fieldSetting.sortTypeEnum().isEnum()) {
continue;
} else {
root.must(QueryBuilders.termQuery(fieldName, value));
}
}
if (entitySetting != null) {
if (!StringUtils.isEmpty(entitySetting.esLogicField())) {
root.must(QueryBuilders.termQuery(entitySetting.esLogicField(), LogicValueConstants.NORMAL));
}
}
root.must(QueryBuilders.termQuery("tenantId", AuthUtil.getTenantId()));
log.info("query : {}", Strings.toString(root));
return root;
}
private static boolean isNullOrEmpty(Object value) {
return Objects.isNull(value) || isEmptyString(value) || isEmptyCollection(value);
}
private static String getEsQueryFieldName(Field field, QueryField fieldSetting) {
return StringUtils.isEmpty(fieldSetting.esField()) ?
field.getName() : fieldSetting.esField();
}
private static boolean isEmptyCollection(Object value) {
return (value instanceof Collection) && CollectionUtils.isEmpty((Collection<?>) value);
}
private static boolean isEmptyString(Object value) {
return (value instanceof String) && StringUtils.isEmpty(value);
}
public static void handlePageable(Object obj, NativeSearchQueryBuilder builder) {
if (obj instanceof PageDTO) {
PageDTO pageDTO = (PageDTO) obj;
builder.withPageable(PageRequest.of(pageDTO.currForEsPaging(), pageDTO.size()));
}
}
public static void dealSort(Object obj, NativeSearchQueryBuilder builder) {
// 預設按最後更新時間倒序
String fieldName = null;
Boolean isAsc = false;
Boolean asc2Desc;
try {
HashMap<String, Field> fieldNameMap = classFieldMap.get(obj.getClass());
Field sortTypeField = fieldNameMap.get(IEsSortTypeEnum.SORT_TYPE);
if (sortTypeField != null) {
QueryField fieldSetting = sortTypeField.getAnnotation(QueryField.class);
if (fieldSetting != null && fieldSetting.sortTypeEnum().isEnum()) {
Object[] objects = fieldSetting.sortTypeEnum().getEnumConstants();
if (objects[0] instanceof IEsSortTypeEnum) {
IEsSortTypeEnum sortTypeEnum = (IEsSortTypeEnum) objects[0];
fieldName = sortTypeEnum.getFiledName((Integer) sortTypeField.get(obj));
asc2Desc = sortTypeEnum.getAsc2Desc((Integer) sortTypeField.get(obj));
Field filed = ReflectUtil.getField(obj.getClass(), IEsSortTypeEnum.SORT_ASC);
filed.setAccessible(true);
isAsc = (Boolean) ReflectUtil.getField(filed, obj);
if (isAsc != null) {
isAsc = asc2Desc ? !isAsc : isAsc;
}
}
}
}
} catch (Exception e) {
e.printStackTrace();
log.error("拼接排序類型有誤:", e);
}
builder.withSort(SortBuilders.fieldSort(fieldName == null ? IEsSortTypeEnum.DEFAULT_SORT_FILED : fieldName).order(isAsc == null ? SortOrder.DESC :
isAsc ? SortOrder.ASC : SortOrder.DESC));
log.info("es 排序參數" + Strings.toString(builder.build().getElasticsearchSorts().get(0)));
}
支援排序拼接、count統計類型拼接、時間區間拼接,金額拼接、list集合查詢拼接,輸入多個單号的時候,通過分隔符分隔拼接
實體搜尋類型
(PS:1.8新增了枚舉類可以實作接口,這樣枚舉用起來也很舒服了)
這兩個枚舉類的作用主要是适配按照不同的搜尋條件以及排序條件排序
3.千萬級資料測試
3.1資料準備
标題寫的那麼誇張,千萬級資料,哈哈,其實就是老套路搞了個存儲過程往資料庫塞一千萬資料,然後同步到es測試啦
DROP PROCEDURE IF EXISTS test;
DELIMITER $$
CREATE PROCEDURE test()
BEGIN
DECLARE v_i INT UNSIGNED DEFAULT 10000001;
WHILE v_i < 10000894 DO
INSERT INTO ‘test’ VALUES('v_i')
SET v_i = v_i+1;
END WHILE;
END $$
DELIMITER ;
CALL test();
言歸正傳,測試的目的有兩個,一個是背景任務的同步代碼情況,驗證資料庫與es的資料一緻性,同時估算性能,做到上線時遷移資料心中有數。二是模拟es在大資料量的情況下會不會有什麼影響。
最終es結果如下:
- 背景任務分頁查資料庫,每頁五千條,加上其他關聯表查詢,5000條差不多1~2秒。機器性能i7 9700 32g,10000000 / 5000 * 2s / 3600s = 大概一個小時左右同步完成。一千萬資料大概占用1.3gb空間,要根據mapping字段多少來看。僅供參考
- 分頁這邊需要調整參數:
要不然es預設隻能查出最大10000條。同時也需要調整es的參數:
PUT om_package_dev/_settings
{
"index" : {"max_result_window" : 10000000}
}
其實這個地方可以從業務角度思考一下,es預設10000也不是沒有道理。對于大資料量,精準點選第666666頁的人都是像我這樣吃飽了沒事幹的。點到那頁去幹嘛?點之前你也不知道那頁有啥呀。。。并且es分頁效率也很低,選最後一頁很慢。大資料量如果需要查詢,一般根據條件精準查詢。目前這點資料量查詢還是非常快的。
4.小結
4.1 資料一緻性
目前我們的方案主要是靠代碼層面實作。當資料有變動時,發送一條消息給mq,由mq異步去同步es。同時,有一個背景任務一直在跑三分鐘(根據資料量決定)以内的資料,以防mq失效有一個兜底任務。當然還有其他方案,比如通過MySQL的binlog寫到es裡面去,這種方案對性能要求高,同時需要引入第三方元件。最終我們選擇了代碼層面自己比較可控的一種方案。
4.2 elasticsearch-sql
從開始看見拼es查詢條件,就在想如果能直接把sql轉化成es就好了。後來搜了一下,果然有這種好東西,是中國自然語言處理開源組織提供的插件。但是已經寫完了通用查詢,就沒有去研究這個插件怎麼用。有興趣的小夥伴可以試試。https://github.com/NLPchina/elasticsearch-sql 另外:kibana的工具控制台也可以直接發送sql請求
POST _sql/translate
{
"query": """
SELECT doc.message FROM "filebeat-7.6.2-2021.01.30"
""",
"fetch_size": 100
}
4.3 Connection Rest By Peer
測試發現,有些時候:早上剛來、中午剛起床、晚上準備下班。也就是很久沒人點了,第一次點選的時候會報這個錯誤。(我們的測試真敬業- -)
出現Connection Rest By Peer的問題,一般是一端關閉了連接配接,而另一端還以為對方在呢,然後傻乎乎的發請求過去,發現對方已經不跟它玩了。檢視了es所在機器的k8s keepalive設定:
可以看見預設的keepalive連接配接逾時時間是7200s,也就是兩小時。吻合了測試發現的報錯時間點,也符合日志中記錄的時間。而es用戶端這邊如果不指定keepalive的話,預設取的是ConnectionKeepAliveStrategy裡面的-1。是以java用戶端這邊-1不會斷開連接配接,而Linux那邊兩小時就會斷開,進而造成了Connection Rest By Peer。研究了一下SpringData配置es參數的地方,發現Spring配置除了擷取配置中的url和密碼之外,沒有可以配置keepalive的地方。隻有重寫RestClientBuilder的建構邏輯,實際上SpringData底層也是用的es提供的用戶端,隻不過在上層再封裝了一下:
package com.zhkj.oms.config;
import org.apache.http.HeaderElement;
import org.apache.http.HeaderElementIterator;
import org.apache.http.HttpHost;
import org.apache.http.HttpResponse;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.conn.ConnectionKeepAliveStrategy;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.message.BasicHeaderElementIterator;
import org.apache.http.protocol.HTTP;
import org.apache.http.protocol.HttpContext;
import org.apache.http.util.Args;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.boot.autoconfigure.elasticsearch.ElasticsearchRestClientProperties;
import org.springframework.boot.autoconfigure.elasticsearch.RestClientBuilderCustomizer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.util.StringUtils;
import java.net.URI;
import java.net.URISyntaxException;
/**
* @author Xxx
* @since 2021/1/19/0019 11:04
*
* 0.運維那邊沒有改keepalive,linux預設7200s 我們這邊預設-1無限制
* 1.SpringData裡面沒有設定keepalive的地方,隻有重寫RestClientBuilder的建構
* 2.再重新實作HttpAsyncClientBuilder裡面的ConnectionKeepAliveStrategy擷取keepalive的方法
*/
@Configuration
public class EsRestClientBuilderConfig {
@Bean
RestClientBuilder elasticsearchRestClientBuilder(ElasticsearchRestClientProperties properties,
ObjectProvider<RestClientBuilderCustomizer> builderCustomizers) {
HttpHost[] hosts = properties.getUris().stream().map(this::createHttpHost).toArray(HttpHost[]::new);
RestClientBuilder builder = RestClient.builder(hosts);
final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(AuthScope.ANY,
new UsernamePasswordCredentials(properties.getUsername(), properties.getPassword()));
builder.setHttpClientConfigCallback((httpClientBuilder) -> {
builderCustomizers.orderedStream().forEach((customizer) -> customizer.customize(httpClientBuilder));
httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
httpClientBuilder.setKeepAliveStrategy(new ConnectionKeepAliveStrategy() {
@Override
public long getKeepAliveDuration(HttpResponse response, HttpContext context) {
Args.notNull(response, "HTTP response");
final HeaderElementIterator it = new BasicHeaderElementIterator(
response.headerIterator(HTTP.CONN_KEEP_ALIVE));
while (it.hasNext()) {
final HeaderElement he = it.nextElement();
final String param = he.getName();
final String value = he.getValue();
if (value != null && param.equalsIgnoreCase("timeout")) {
try {
return Long.parseLong(value) * 1000;
} catch (final NumberFormatException ignore) {
}
}
}
// 三分鐘
return 1 * 60 * 1 * 1000;
}});
return httpClientBuilder;
});
builder.setRequestConfigCallback((requestConfigBuilder) -> {
builderCustomizers.orderedStream().forEach((customizer) -> customizer.customize(requestConfigBuilder));
return requestConfigBuilder;
});
builderCustomizers.orderedStream().forEach((customizer) -> customizer.customize(builder));
return builder;
}
private HttpHost createHttpHost(String uri) {
try {
return createHttpHost(URI.create(uri));
}
catch (IllegalArgumentException ex) {
return HttpHost.create(uri);
}
}
private HttpHost createHttpHost(URI uri) {
if (!StringUtils.hasLength(uri.getUserInfo())) {
return HttpHost.create(uri.toString());
}
try {
return HttpHost.create(new URI(uri.getScheme(), null, uri.getHost(), uri.getPort(), uri.getPath(),
uri.getQuery(), uri.getFragment()).toString());
}
catch (URISyntaxException ex) {
throw new IllegalStateException(ex);
}
}
}
Create By Xxx 2021年1月30日16:10:56 轉載請注明出處,3q!