環境
jdk版本:jdk17
elasticsearch版本:8.3
引入的依賴
<dependencies>
<!-- Elasticsearch Java API client; its version is pinned here -->
<dependency>
<groupId>co.elastic.clients</groupId>
<artifactId>elasticsearch-java</artifactId>
<version>8.3.3</version>
</dependency>
<!-- Dependency used by the Elasticsearch client (Jackson; the test code below uses JacksonJsonpMapper) -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.13.4</version>
</dependency>
<!-- Dependency used by the Elasticsearch client (Jakarta JSON API; the test code below imports jakarta.json.spi.JsonProvider) -->
<dependency>
<groupId>jakarta.json</groupId>
<artifactId>jakarta.json-api</artifactId>
<version>2.1.1</version>
</dependency>
<!-- No version given here; presumably managed by a Spring Boot parent/BOM that is not shown in this fragment -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>2.0.7</version>
</dependency>
<!-- Lombok; the test class below uses its @Slf4j annotation -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<!-- Test-scoped Spring Boot test starter; the test class below uses @SpringBootTest -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
索引及文檔操作
參考官網的一些例子。以下代碼不能直接執行,只是提供一些例子;但這些例子已在本地驗證過,都是可以執行的。
package com.yfway.es;
import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.elasticsearch._types.SortOrder;
import co.elastic.clients.elasticsearch._types.aggregations.*;
import co.elastic.clients.elasticsearch._types.query_dsl.*;
import co.elastic.clients.elasticsearch.cat.IndicesResponse;
import co.elastic.clients.elasticsearch.core.*;
import co.elastic.clients.elasticsearch.core.bulk.BulkResponseItem;
import co.elastic.clients.elasticsearch.indices.CreateIndexResponse;
import co.elastic.clients.elasticsearch.indices.DeleteIndexResponse;
import co.elastic.clients.elasticsearch.indices.GetIndexResponse;
import co.elastic.clients.json.JsonData;
import co.elastic.clients.json.JsonpMapper;
import co.elastic.clients.json.jackson.JacksonJsonpMapper;
import co.elastic.clients.transport.ElasticsearchTransport;
import co.elastic.clients.transport.rest_client.RestClientTransport;
import com.yfway.es.dto.Product;
import jakarta.json.spi.JsonProvider;
import lombok.extern.slf4j.Slf4j;
import org.apache.http.HttpHost;
import org.elasticsearch.client.RestClient;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;
import java.io.*;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@SpringBootTest
@Slf4j
class EsApplicationTests {
static ElasticsearchClient client;
static RestClient restClient;
/**
 * Runs before every test: builds the low-level REST client for 127.0.0.1:9200 and
 * wraps it in a typed ElasticsearchClient that uses a Jackson-based JSON mapper.
 * Both objects are stored in the static fields shared by the tests.
 */
@BeforeEach
void init() {
    HttpHost node = new HttpHost("127.0.0.1", 9200);
    restClient = RestClient.builder(node).build();
    JsonpMapper mapper = new JacksonJsonpMapper();
    client = new ElasticsearchClient(new RestClientTransport(restClient, mapper));
    log.info("建立連接配接");
}
/**
 * Runs after every test and closes the low-level REST client.
 *
 * The field is checked for null first: if init() failed before assigning it, calling
 * close() here would raise a NullPointerException on top of the original failure.
 *
 * @throws IOException if closing the REST client fails
 */
@AfterEach
void end() throws IOException {
    if (restClient == null) {
        // Nothing was opened, so there is nothing to close.
        return;
    }
    restClient.close();
    log.info("關閉連接配接");
}
/**
 * Creates the "products" index and logs the acknowledged flag and the index name
 * carried by the response.
 *
 * @throws IOException if the request to Elasticsearch fails
 */
@Test
void createIndex() throws IOException {
    CreateIndexResponse created = client.indices().create(req -> req.index("products"));
    log.info("是否建立成功:" + created.acknowledged());
    log.info("索引名稱:" + created.index());
}
/**
 * Inspects indices: fetches the "products" index and logs every key/value pair of the
 * result map, then logs the name of each index returned by the cat indices API.
 *
 * @throws IOException if a request to Elasticsearch fails
 */
@Test
void getIndex() throws IOException {
    // One specific index.
    GetIndexResponse single = client.indices().get(req -> req.index("products"));
    single.result().forEach((name, state) -> {
        log.info("key:" + name);
        log.info("value:" + state);
    });

    // All indices, via the cat API.
    IndicesResponse all = client.cat().indices();
    all.valueBody().forEach(row -> log.info("索引:" + row.index()));
}
/**
 * Checks whether the "products" index exists and logs the boolean answer.
 *
 * @throws IOException if the request to Elasticsearch fails
 */
@Test
void checkIndexExist() throws IOException {
    boolean present = client.indices()
            .exists(req -> req.index("products"))
            .value();
    log.info("判斷索引是否存在:" + present);
}
/**
 * Deletes the "demo" index and logs whether the deletion was acknowledged.
 *
 * @throws IOException if the request to Elasticsearch fails
 */
@Test
void deleteIndex() throws IOException {
    DeleteIndexResponse deleted = client.indices().delete(req -> req.index("demo"));
    boolean acknowledged = deleted.acknowledged();
    log.info("删除索引:" + acknowledged);
}
/**
 * Indexes single documents into "products", showing three ways to build the request:
 * a builder lambda passed directly to client.index(), a request pre-built with
 * IndexRequest.of(...), and an explicit IndexRequest.Builder. Each document uses its
 * SKU as the document id, and the result of each call is logged.
 *
 * @throws IOException if a request to Elasticsearch fails
 */
@Test
void addOneDoc() throws IOException {
    // Way 1: inline builder lambda.
    // If the id is omitted, a random one is generated.
    Product cityBike = new Product("bk-1", "City bike", 123.0);
    IndexResponse viaLambda = client.index(req -> req
            .index("products")
            .id(cityBike.getSku())
            .document(cityBike));
    log.info(viaLambda.result().jsonValue());

    // Way 2: build the request object first, then send it.
    Product cityBike2 = new Product("bk-2", "City bike 2", 133.0);
    IndexRequest<Product> prebuilt = IndexRequest.of(req -> req
            .index("products")
            .id(cityBike2.getSku())
            .document(cityBike2));
    IndexResponse viaRequest = client.index(prebuilt);
    log.info(viaRequest.result().jsonValue());

    // Way 3: drive the builder explicitly.
    Product cityBike3 = new Product("bk-3", "City bike 3", 93.0);
    IndexRequest<Product> built = new IndexRequest.Builder<Product>()
            .index("products")
            .id(cityBike3.getSku())
            .document(cityBike3)
            .build();
    IndexResponse viaBuilder = client.index(built);
    log.info(viaBuilder.result().jsonValue());
}
/**
 * Bulk-indexes two products into "products" with a single bulk request. No document id
 * is set on the operations. Logs whether the bulk call was error-free and, if not,
 * the reason of every failed item.
 *
 * @throws IOException if the request to Elasticsearch fails
 */
@Test
void addAllDoc() throws IOException {
    List<Product> batch = List.of(
            new Product("bk-4", "City bike 4", 213.0),
            new Product("bk-5", "City bike 5", 105.0));

    BulkRequest.Builder bulk = new BulkRequest.Builder();
    for (Product entry : batch) {
        // One index operation per product; the id is left out on purpose here.
        bulk.operations(op -> op.index(idx -> idx.index("products").document(entry)));
    }

    BulkResponse outcome = client.bulk(bulk.build());
    boolean failed = outcome.errors();
    log.info("批量新增:" + !failed);
    if (!failed) {
        return;
    }

    // At least one item failed: report each failure reason.
    log.error("Bulk had errors");
    outcome.items().stream()
            .filter(item -> item.error() != null)
            .forEach(item -> log.error(item.error().reason()));
}
/**
 * Bulk-indexes documents read from JSON files into "products" and logs whether the
 * bulk call was error-free.
 *
 * Each file is opened in a try-with-resources block so the stream is closed as soon as
 * its content has been handed to the bulk builder; previously the FileInputStream
 * instances were never closed.
 *
 * @throws IOException if a file cannot be opened or the request to Elasticsearch fails
 */
@Test
void addAllDoc2() throws IOException {
    File[] files = new File[2];
    files[0] = new File("F:\\javaStudy\\es\\product100.json");
    files[1] = new File("F:\\javaStudy\\es\\product101.json");
    BulkRequest.Builder br = new BulkRequest.Builder();
    for (File file : files) {
        // NOTE(review): relies on readJson/JsonData.from having consumed the stream
        // before returning, so the JsonData stays usable after the stream is closed.
        try (InputStream input = new FileInputStream(file)) {
            JsonData json = readJson(input, client);
            br.operations(op -> op
                    .index(idx -> idx
                            .index("products")
                            .document(json)
                    )
            );
        }
    }
    BulkResponse result = client.bulk(br.build());
    log.info("批量新增:" + !result.errors());
}
/**
 * Builds a JsonData from a JSON input stream, using the JSON mapper (and that mapper's
 * JSON provider) taken from the given client's transport.
 *
 * @param input    stream to read JSON from; this method does not explicitly close it
 * @param esClient client whose transport supplies the JsonpMapper
 * @return the JsonData created from the stream's content
 */
public static JsonData readJson(InputStream input, ElasticsearchClient esClient) {
    JsonpMapper mapper = esClient._transport().jsonpMapper();
    return JsonData.from(mapper.jsonProvider().createParser(input), mapper);
}
/**
 * Fetches the document with id "bk-100" from "products" and logs it.
 *
 * The missing-document case is checked explicitly through the response's found flag
 * rather than a Java assert statement: assert is skipped when the JVM runs without -ea,
 * in which case the test would log a null product and pass.
 *
 * @throws IOException if the request to Elasticsearch fails
 */
@Test
void getDoc() throws IOException {
    GetResponse<Product> response = client.get(s -> s.index("products").id("bk-100"), Product.class);
    if (!response.found()) {
        throw new AssertionError("Document bk-100 not found in index products");
    }
    Product product = response.source();
    log.info("商品:" + product);
}
/**
 * Paged search on "products": requests 3 hits starting at offset 0, logs the total
 * hit count and then each returned document.
 *
 * @throws IOException if the request to Elasticsearch fails
 */
@Test
void selectPage() throws IOException {
    // from = offset of the first hit, size = number of hits to return
    SearchResponse<Product> page = client.search(
            req -> req.index("products").from(0).size(3),
            Product.class);

    assert page.hits().total() != null;
    log.info("總數:" + page.hits().total().value());

    page.hits().hits().stream()
            .map(hit -> hit.source())
            .forEach(doc -> {
                assert doc != null;
                log.info(doc.toString());
            });
}
/**
 * Searches "products" sorted by the "price" field in descending order and logs each
 * returned document.
 *
 * @throws IOException if the request to Elasticsearch fails
 */
@Test
void selectSort() throws IOException {
    SearchResponse<Product> sorted = client.search(
            req -> req
                    .index("products")
                    .sort(by -> by.field(f -> f.field("price").order(SortOrder.Desc))),
            Product.class);

    sorted.hits().hits().stream()
            .map(hit -> hit.source())
            .forEach(doc -> {
                assert doc != null;
                log.info(doc.toString());
            });
}
/**
 * Searches "products" with source filtering, restricting which fields come back in
 * each hit, and logs each returned document.
 *
 * @throws IOException if the request to Elasticsearch fails
 */
@Test
void selectField() throws IOException {
    // includes = fields to return, excludes = fields to leave out.
    // "price" appears in both lists; NOTE(review): presumably the exclude wins — confirm.
    SearchResponse<Product> filtered = client.search(
            req -> req
                    .index("products")
                    .source(src -> src.filter(f -> f
                            .includes("sku", "title", "price")
                            .excludes("price"))),
            Product.class);

    filtered.hits().hits().stream()
            .map(hit -> hit.source())
            .forEach(doc -> {
                assert doc != null;
                log.info(doc.toString());
            });
}
/**
 * Compound query with bool/must (similar to AND): a term query on "sku.keyword" for
 * "bk-4" combined with a match query on "price" for "213.0". Logs each returned document.
 *
 * The clauses are written inline with builder lambdas; building them up front with
 * TermQuery.of(...)._toQuery() / MatchQuery.of(...)._toQuery() and passing the resulting
 * Query objects to must(...) is an equivalent way to express the same request.
 *
 * @throws IOException if the request to Elasticsearch fails
 */
@Test
void selectMust() throws IOException {
    SearchResponse<Product> matched = client.search(
            req -> req
                    .index("products")
                    .query(q -> q.bool(b -> b
                            // Both must clauses have to match (AND semantics).
                            .must(m -> m.term(t -> t.field("sku.keyword").value("bk-4")))
                            .must(m -> m.match(mt -> mt.field("price").query("213.0"))))),
            Product.class);

    matched.hits().hits().stream()
            .map(hit -> hit.source())
            .forEach(doc -> {
                assert doc != null;
                log.info(doc.toString());
            });
}
/**
 * Compound query with bool/must_not (none of the clauses may match): excludes documents
 * whose "sku.keyword" is "bk-4" and logs each returned document.
 *
 * @throws IOException if the request to Elasticsearch fails
 */
@Test
void selectMustNot() throws IOException {
    Query excludedSku = TermQuery.of(t -> t
            .field("sku.keyword")
            .value("bk-4")
    )._toQuery();

    // Further clauses to exclude can be passed alongside this one.
    SearchResponse<Product> remaining = client.search(
            req -> req.index("products").query(q -> q.bool(b -> b.mustNot(excludedSku))),
            Product.class);

    remaining.hits().hits().stream()
            .map(hit -> hit.source())
            .forEach(doc -> {
                assert doc != null;
                log.info(doc.toString());
            });
}
/**
 * Compound query with bool/should (similar to OR): matches documents whose
 * "sku.keyword" is "bk-4" or "bk-5" and logs each returned document.
 *
 * Original author's note: if the same request also contains must clauses, the should
 * clauses no longer take effect as a filter.
 *
 * @throws IOException if the request to Elasticsearch fails
 */
@Test
void selectShould() throws IOException {
    SearchResponse<Product> eitherSku = client.search(
            req -> req
                    .index("products")
                    .query(q -> q.bool(b -> b
                            .should(sh -> sh.term(t -> t.field("sku.keyword").value("bk-4")))
                            .should(sh -> sh.term(t -> t.field("sku.keyword").value("bk-5"))))),
            Product.class);

    eitherSku.hits().hits().stream()
            .map(hit -> hit.source())
            .forEach(doc -> {
                assert doc != null;
                log.info(doc.toString());
            });
}
/**
 * Range query usage: combines, under bool/must, a term query on "sku.keyword" for
 * "bk-5" with a range query requiring "price" to be greater than 155. Logs each
 * returned document.
 *
 * @throws IOException if the request to Elasticsearch fails
 */
@Test
void selectRange() throws IOException {
    SearchResponse<Product> inRange = client.search(
            req -> req
                    .index("products")
                    .query(q -> q.bool(b -> b
                            .must(m -> m.term(t -> t.field("sku.keyword").value("bk-5")))
                            // gt = strictly greater than
                            .must(m -> m.range(r -> r.field("price").gt(JsonData.of(155)))))),
            Product.class);

    inRange.hits().hits().stream()
            .map(hit -> hit.source())
            .forEach(doc -> {
                assert doc != null;
                log.info(doc.toString());
            });
}
/**
 * Fuzzy query usage: searches "products" for documents whose "sku" field fuzzily
 * matches "100" with fuzziness "1", and logs each returned document.
 *
 * @throws IOException if the request to Elasticsearch fails
 */
@Test
void selectFuzzy() throws IOException {
    SearchResponse<Product> approximate = client.search(
            req -> req
                    .index("products")
                    .query(q -> q.fuzzy(f -> f
                            .field("sku")
                            .value("100")
                            .fuzziness("1"))),
            Product.class);

    approximate.hits().hits().stream()
            .map(hit -> hit.source())
            .forEach(doc -> {
                assert doc != null;
                log.info(doc.toString());
            });
}
/**
 * Phrase query: searches "products" with a match_phrase query for "bk-1" on the "sku"
 * field and logs each returned document.
 *
 * The original note described this as a prefix match similar to SQL LIKE 'xx%'. The
 * query issued here is match_phrase, not a prefix query; if prefix semantics are
 * wanted, see the match_phrase_prefix and prefix queries instead.
 *
 * MatchPhraseQuery.of(...) already yields a MatchPhraseQuery, so the former
 * _toQuery().matchPhrase() wrap-and-unwrap step has been dropped.
 *
 * @throws IOException if the request to Elasticsearch fails
 */
@Test
void selectMatchPhrase() throws IOException {
    MatchPhraseQuery bySku = MatchPhraseQuery.of(m -> m
            .field("sku")
            .query("bk-1")
    );
    SearchResponse<Product> products = client.search(
            s -> s.index("products").query(q -> q.matchPhrase(bySku)),
            Product.class
    );
    products.hits().hits().forEach(item -> {
        assert item.source() != null;
        log.info(item.source().toString());
    });
}
/**
 * Aggregation example using max: requests a "maxPrice" aggregation over the "price"
 * field, logs each returned document, then logs every aggregation name together with
 * its max value.
 *
 * @throws IOException if the request to Elasticsearch fails
 */
@Test
void selectAggregations() throws IOException {
    SearchResponse<Product> result = client.search(
            req -> req
                    .index("products")
                    .aggregations("maxPrice", agg -> agg.max(mx -> mx.field("price"))),
            Product.class);

    result.hits().hits().stream()
            .map(hit -> hit.source())
            .forEach(doc -> {
                assert doc != null;
                log.info(doc.toString());
            });

    // Every aggregation in the response is read as a max aggregate.
    result.aggregations().forEach((name, aggregate) ->
            log.info("Key = " + name + ", Value = " + aggregate.max().value()));
}
/**
 * Grouping on a numeric field, built from aggregations(...) plus terms(...): groups
 * "products" by "price" and logs each bucket's key and document count.
 *
 * @throws IOException if the request to Elasticsearch fails
 */
@Test
void selectGroup() throws IOException {
    // "priceGroup" is a caller-chosen aggregation name; "price" is the grouping field.
    SearchResponse<Product> grouped = client.search(
            req -> req
                    .index("products")
                    .aggregations("priceGroup", agg -> agg.terms(t -> t.field("price"))),
            Product.class);

    // dterms() reads the result as a double-keyed terms aggregate; per the original
    // note, a long-typed grouping field would use lterms() instead.
    DoubleTermsAggregate byPrice = grouped.aggregations().get("priceGroup").dterms();
    byPrice.buckets().array().forEach(bucket ->
            log.info(bucket.key() + " : " + bucket.docCount()));
}
/**
 * Grouping on a string field: groups "products" by "sku.keyword" and logs each bucket's
 * key and document count.
 *
 * Original author's note: text fields do not support grouping by default, so the
 * keyword sub-field (field name + ".keyword") has to be used.
 *
 * @throws IOException if the request to Elasticsearch fails
 */
@Test
void selectGroup2() throws IOException {
    // The grouping field must be sku.keyword for this to work (original note).
    SearchResponse<Product> grouped = client.search(
            req -> req
                    .index("products")
                    .aggregations("skuGroup", agg -> agg.terms(t -> t.field("sku.keyword"))),
            Product.class);

    // sterms() reads the result as a string-keyed terms aggregate.
    StringTermsAggregate bySku = grouped.aggregations().get("skuGroup").sterms();
    bySku.buckets().array().forEach(bucket ->
            log.info(bucket.key() + " : " + bucket.docCount()));
}
}