环境
jdk版本:jdk17
elasticsearch版本:8.3
引入的依赖
<dependencies>
<!--客户端版本-->
<dependency>
<groupId>co.elastic.clients</groupId>
<artifactId>elasticsearch-java</artifactId>
<version>8.3.3</version>
</dependency>
<!--elasticsearch用到的依赖-->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.13.4</version>
</dependency>
<!--elasticsearch用到的依赖-->
<dependency>
<groupId>jakarta.json</groupId>
<artifactId>jakarta.json-api</artifactId>
<version>2.1.1</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>2.0.7</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
索引及文档操作
参考官网的一些例子,以下代码不能直接运行,只是提供一些例子,但这些例子在本地已经过验证,都是可以运行的
package com.yfway.es;
import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.elasticsearch._types.SortOrder;
import co.elastic.clients.elasticsearch._types.aggregations.*;
import co.elastic.clients.elasticsearch._types.query_dsl.*;
import co.elastic.clients.elasticsearch.cat.IndicesResponse;
import co.elastic.clients.elasticsearch.core.*;
import co.elastic.clients.elasticsearch.core.bulk.BulkResponseItem;
import co.elastic.clients.elasticsearch.indices.CreateIndexResponse;
import co.elastic.clients.elasticsearch.indices.DeleteIndexResponse;
import co.elastic.clients.elasticsearch.indices.GetIndexResponse;
import co.elastic.clients.json.JsonData;
import co.elastic.clients.json.JsonpMapper;
import co.elastic.clients.json.jackson.JacksonJsonpMapper;
import co.elastic.clients.transport.ElasticsearchTransport;
import co.elastic.clients.transport.rest_client.RestClientTransport;
import com.yfway.es.dto.Product;
import jakarta.json.spi.JsonProvider;
import lombok.extern.slf4j.Slf4j;
import org.apache.http.HttpHost;
import org.elasticsearch.client.RestClient;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;
import java.io.*;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@SpringBootTest
@Slf4j
class EsApplicationTests {
static ElasticsearchClient client;
static RestClient restClient;
@BeforeEach
void init() {
restClient = RestClient.builder(
new HttpHost("127.0.0.1", 9200)).build();
ElasticsearchTransport transport = new RestClientTransport(
restClient,new JacksonJsonpMapper());
client = new ElasticsearchClient(transport);
log.info("建立连接");
}
@AfterEach
void end() throws IOException {
restClient.close();
log.info("关闭连接");
}
/**
*创建索引
*/
@Test
void createIndex() throws IOException {
CreateIndexResponse response = client.indices().create(c -> c.index("products"));
boolean acknowledged = response.acknowledged();
String index = response.index();
log.info("是否创建成功:"+acknowledged);
log.info("索引名称:"+index);
}
/**
*查看索引
*/
@Test
void getIndex() throws IOException {
//查看指定索引
GetIndexResponse response = client.indices().get(c -> c.index("products"));
response.result().forEach((k,v)->{
log.info("key:"+k);
log.info("value:"+v);
});
//查看所有索引
IndicesResponse indices = client.cat().indices();
indices.valueBody().forEach(info->{
log.info("索引:"+info.index());
});
}
/**
*判断索引是否存在
*/
@Test
void checkIndexExist() throws IOException {
boolean exist = client.indices().exists(c -> c.index("products")).value();
log.info("判断索引是否存在:"+exist);
}
/**
*删除索引
*/
@Test
void deleteIndex() throws IOException {
DeleteIndexResponse response = client.indices().delete(c -> c.index("demo"));
log.info("删除索引:"+response.acknowledged());
}
/**
*添加一篇文档,3种方式
*/
@Test
void addOneDoc() throws IOException {
//方法1
Product product1 = new Product("bk-1", "City bike", 123.0);
IndexResponse response1 = client.index(
//如果不指定ID,则随机生成
d -> d.index("products").id(product1.getSku()).document(product1)
);
log.info(response1.result().jsonValue());
//方法2
Product product2 = new Product("bk-2", "City bike 2", 133.0);
IndexRequest<Product> request = IndexRequest.of(i -> i
.index("products")
.id(product2.getSku())
.document(product2)
);
IndexResponse response2 = client.index(request);
log.info(response2.result().jsonValue());
//方法3
Product product3 = new Product("bk-3", "City bike 3", 93.0);
IndexRequest.Builder<Product> indexReqBuilder = new IndexRequest.Builder<>();
indexReqBuilder.index("products");
indexReqBuilder.id(product3.getSku());
indexReqBuilder.document(product3);
IndexResponse response3 = client.index(indexReqBuilder.build());
log.info(response3.result().jsonValue());
}
/**
*批量新增
*/
@Test
void addAllDoc() throws IOException {
List<Product> products = new ArrayList<>();
products.add(new Product("bk-4", "City bike 4", 213.0));
products.add(new Product("bk-5", "City bike 5", 105.0));
BulkRequest.Builder br = new BulkRequest.Builder();
for (Product product : products) {
br.operations(op -> op
.index(idx -> idx
.index("products")
// .id(product.getSku())
.document(product)
)
);
}
BulkResponse result = client.bulk(br.build());
log.info("批量新增:"+!result.errors());
// 如果有错误,错误日志
if (result.errors()) {
log.error("Bulk had errors");
for (BulkResponseItem item: result.items()) {
if (item.error() != null) {
log.error(item.error().reason());
}
}
}
}
/**
*批量新增,读取json文件
*/
@Test
void addAllDoc2() throws IOException {
File[] files = new File[2];
files[0] = new File("F:\\javaStudy\\es\\product100.json");
files[1] = new File("F:\\javaStudy\\es\\product101.json");
BulkRequest.Builder br = new BulkRequest.Builder();
for (File file: files) {
JsonData json = readJson(new FileInputStream(file), client);
br.operations(op -> op
.index(idx -> idx
.index("products")
.document(json)
)
);
}
BulkResponse result = client.bulk(br.build());
log.info("批量新增:"+!result.errors());
}
public static JsonData readJson(InputStream input, ElasticsearchClient esClient) {
JsonpMapper jsonpMapper = esClient._transport().jsonpMapper();
JsonProvider jsonProvider = jsonpMapper.jsonProvider();
return JsonData.from(jsonProvider.createParser(input), jsonpMapper);
}
/**
*查看文档
*/
@Test
void getDoc() throws IOException {
GetResponse<Product> response = client.get(s -> s.index("products").id("bk-100"), Product.class);
Product product = response.source();
assert product != null;
log.info("商品:"+ product);
}
/**
*分页查询
*/
@Test
void selectPage() throws IOException {
//from:从哪条开始,size:查询几条
SearchResponse<Product> products = client.search(
s -> s.index("products").from(0).size(3)
, Product.class
);
assert products.hits().total() != null;
long total = products.hits().total().value();
log.info("总数:"+total);
products.hits().hits().forEach(item->{
assert item.source() != null;
log.info(item.source().toString());
});
}
/**
*排序
*/
@Test
void selectSort() throws IOException {
SearchResponse<Product> products = client.search(
s -> s.index("products").sort(sort -> sort.field(f -> f.field("price").order(SortOrder.Desc)))
, Product.class
);
products.hits().hits().forEach(item->{
assert item.source() != null;
log.info(item.source().toString());
});
}
/**
*过滤查询字段
*/
@Test
void selectField() throws IOException {
//includes包含,excludes不包含
SearchResponse<Product> products = client.search(
s -> s.index("products").source(source->source.filter(f->f.includes("sku","title","price").excludes("price")))
, Product.class
);
products.hits().hits().forEach(item->{
assert item.source() != null;
log.info(item.source().toString());
});
}
/**
*组合查询-must(类似与and)
*/
@Test
void selectMust() throws IOException {
Query bySku = TermQuery.of(m -> m
.field("sku.keyword")
.value("bk-4")
)._toQuery();
Query byPrice = MatchQuery.of(m -> m
.field("price")
.query("213.0")
)._toQuery();
//bool里面的must相当于and
SearchResponse<Product> products = client.search(
s -> s.index("products").query(q->q.bool(b->b.must(bySku).must(byPrice)))
, Product.class
);
//也可以这样写
// SearchResponse<Product> products = client.search(
// s -> s.index("products").query(q->q.bool(b->b.must(
// m->m.term(term->term.field("sku.keyword").value("bk-1"))
// ).must(
// m->m.match(match->match.field("price").query("123.0"))
// )))
// , Product.class
// );
products.hits().hits().forEach(item->{
assert item.source() != null;
log.info(item.source().toString());
});
}
/**
*组合查询-MustNot(都不满足)
*/
@Test
void selectMustNot() throws IOException {
Query bySku = TermQuery.of(m -> m
.field("sku.keyword")
.value("bk-4")
)._toQuery();
List<Query> queryList = new ArrayList<>();
queryList.add(bySku);
// queryList.add(bySku2);
SearchResponse<Product> products = client.search(
s -> s.index("products").query(q->q.bool(b->b.mustNot(queryList)))
, Product.class
);
products.hits().hits().forEach(item->{
assert item.source() != null;
log.info(item.source().toString());
});
}
/**
*组合查询-should(类似与or,但如果提交里面存在must,should则无效)
*/
@Test
void selectShould() throws IOException {
Query bySku = TermQuery.of(m -> m
.field("sku.keyword")
.value("bk-4")
)._toQuery();
Query bySku2 = TermQuery.of(m -> m
.field("sku.keyword")
.value("bk-5")
)._toQuery();
List<Query> queryList = new ArrayList<>();
queryList.add(bySku);
queryList.add(bySku2);
SearchResponse<Product> products = client.search(
s -> s.index("products").query(q->q.bool(b->b.should(queryList)))
, Product.class
);
products.hits().hits().forEach(item->{
assert item.source() != null;
log.info(item.source().toString());
});
}
/**
*范围查询 range的用法
*/
@Test
void selectRange() throws IOException {
RangeQuery byPrice = RangeQuery.of(r -> r
.field("price")
.gt(JsonData.of(155))
);
Query bySku = TermQuery.of(m -> m
.field("sku.keyword")
.value("bk-5")
)._toQuery();
SearchResponse<Product> products = client.search(
s -> s.index("products").query(q->q
.bool(b->b
.must(bySku)
.must(m->m.range(byPrice))
)
)
, Product.class
);
products.hits().hits().forEach(item->{
assert item.source() != null;
log.info(item.source().toString());
});
}
/**
*模糊查询-具体了解Fuzzy的用法
*/
@Test
void selectFuzzy() throws IOException {
FuzzyQuery bySku = FuzzyQuery.of(f -> f
.field("sku")
.value("100")
.fuzziness("1")
);
SearchResponse<Product> products = client.search(
s -> s.index("products").query(q->q
.fuzzy(bySku)
)
, Product.class
);
products.hits().hits().forEach(item->{
assert item.source() != null;
log.info(item.source().toString());
});
}
/**
*模糊查询-前缀模糊,类似与 like "xx%"
*/
@Test
void selectMatchPhrase() throws IOException {
MatchPhraseQuery bySku = MatchPhraseQuery.of(m -> m
.field("sku")
.query("bk-1")
)._toQuery().matchPhrase();
SearchResponse<Product> products = client.search(
s -> s.index("products").query(q->q
.matchPhrase(bySku)
)
, Product.class
);
products.hits().hits().forEach(item->{
assert item.source() != null;
log.info(item.source().toString());
});
}
/**
*聚合查询-这里演示max方法
*/
@Test
void selectAggregations() throws IOException {
SearchResponse<Product> products = client.search(
s -> s.index("products").aggregations("maxPrice",a->a
.max(m->m.field("price"))
)
, Product.class
);
products.hits().hits().forEach(item->{
assert item.source() != null;
log.info(item.source().toString());
});
for (Map.Entry<String, Aggregate> entry : products.aggregations().entrySet()) {
log.info("Key = " + entry.getKey() + ", Value = " + entry.getValue().max().value());
}
}
/**
* 分组查询->数值型分组
* 使用的关键方法 aggregations 和 terms
*/
@Test
void selectGroup() throws IOException {
SearchResponse<Product> products = client.search(
//priceGroup自定义别名,price分组的字段
s -> s.index("products").aggregations("priceGroup",a->a
.terms(t->t.field("price"))
)
, Product.class
);
//获取分组数据
Aggregate aggregate = products.aggregations().get("priceGroup");
//dterms标识double型,同理如果分组字段是long型,则用lterms,可看源码
DoubleTermsAggregate dterms = aggregate.dterms();
Buckets<DoubleTermsBucket> buckets = dterms.buckets();
for (DoubleTermsBucket b : buckets.array()) {
log.info(b.key() + " : " + b.docCount());
}
}
/**
* 分组查询->字符型分组,text默认不支持分组操作,如果需要支持,则需要使用索引字段,即字段+keyword
*/
@Test
void selectGroup2() throws IOException {
SearchResponse<Product> products = client.search(
//分组字段必须是sku.keyword才生效
s -> s.index("products").aggregations("skuGroup",a->a
.terms(t->t.field("sku.keyword"))
)
, Product.class
);
//获取分组数据
Aggregate aggregate = products.aggregations().get("skuGroup");
StringTermsAggregate sterms = aggregate.sterms();
Buckets<StringTermsBucket> buckets = sterms.buckets();
for (StringTermsBucket b : buckets.array()) {
log.info(b.key() + " : " + b.docCount());
}
}
}