天天看點

基於 Elasticsearch 8.3.3 的 Java API 操作

作者:小碼搬磚

環境

jdk版本:jdk17

elasticsearch版本:8.3

引入的依賴

<dependencies>
  		<!-- Elasticsearch Java API client; its version is pinned here -->
        <dependency>
            <groupId>co.elastic.clients</groupId>
            <artifactId>elasticsearch-java</artifactId>
            <version>8.3.3</version>
        </dependency>

			<!-- Dependency used by the Elasticsearch client (Jackson data binding) -->
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <version>2.13.4</version>
        </dependency>
			<!-- Dependency used by the Elasticsearch client (Jakarta JSON API) -->
        <dependency>
            <groupId>jakarta.json</groupId>
            <artifactId>jakarta.json-api</artifactId>
            <version>2.1.1</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>2.0.7</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>           

索引及文檔操作

以下例子參考自官網。這些代碼依賴本地的 Elasticsearch 服務和測試資料,無法原樣直接運行,只是作為示例提供;不過每個例子都已在本地驗證過,可以正常運行。

package com.yfway.es;

import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.elasticsearch._types.SortOrder;
import co.elastic.clients.elasticsearch._types.aggregations.*;
import co.elastic.clients.elasticsearch._types.query_dsl.*;
import co.elastic.clients.elasticsearch.cat.IndicesResponse;
import co.elastic.clients.elasticsearch.core.*;
import co.elastic.clients.elasticsearch.core.bulk.BulkResponseItem;
import co.elastic.clients.elasticsearch.indices.CreateIndexResponse;
import co.elastic.clients.elasticsearch.indices.DeleteIndexResponse;
import co.elastic.clients.elasticsearch.indices.GetIndexResponse;
import co.elastic.clients.json.JsonData;
import co.elastic.clients.json.JsonpMapper;
import co.elastic.clients.json.jackson.JacksonJsonpMapper;
import co.elastic.clients.transport.ElasticsearchTransport;
import co.elastic.clients.transport.rest_client.RestClientTransport;
import com.yfway.es.dto.Product;
import jakarta.json.spi.JsonProvider;
import lombok.extern.slf4j.Slf4j;
import org.apache.http.HttpHost;
import org.elasticsearch.client.RestClient;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;

import java.io.*;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/**
 * Runnable examples of the Elasticsearch Java API client: index management,
 * single and bulk document writes, and a range of search and aggregation queries.
 *
 * <p>Every test talks to an Elasticsearch node at {@code 127.0.0.1:9200}; a fresh
 * client is opened before each test and the underlying REST client is closed after it.
 */
@SpringBootTest
@Slf4j
class EsApplicationTests {
    /** Index used by every example except {@link #deleteIndex()}. */
    private static final String INDEX = "products";

    // Both fields are static, but init() reassigns them before every test and
    // end() closes the REST client after every test, so nothing is shared between tests.
    static ElasticsearchClient client;
    static RestClient restClient;

    /** Opens the low-level REST client and wraps it in a typed {@link ElasticsearchClient}. */
    @BeforeEach
    void init() {
        restClient = RestClient.builder(
                new HttpHost("127.0.0.1", 9200)).build();

        ElasticsearchTransport transport = new RestClientTransport(
                restClient, new JacksonJsonpMapper());

        client = new ElasticsearchClient(transport);
        log.info("建立連接配接");
    }

    /**
     * Closes the REST client opened by {@link #init()}.
     *
     * @throws IOException if closing the client fails
     */
    @AfterEach
    void end() throws IOException {
        restClient.close();
        log.info("關閉連接配接");
    }

    /**
     * Creates the index and logs whether the request was acknowledged.
     *
     * @throws IOException if the request cannot be sent or the response cannot be read
     */
    @Test
    void createIndex() throws IOException {
        CreateIndexResponse response = client.indices().create(c -> c.index(INDEX));
        log.info("是否建立成功:{}", response.acknowledged());
        log.info("索引名稱:{}", response.index());
    }

    /**
     * Looks up the index by name, then lists every index through the cat API.
     *
     * @throws IOException if a request cannot be sent or a response cannot be read
     */
    @Test
    void getIndex() throws IOException {
        // A single, named index.
        GetIndexResponse response = client.indices().get(c -> c.index(INDEX));
        response.result().forEach((k, v) -> {
            log.info("key:{}", k);
            log.info("value:{}", v);
        });

        // All indices.
        IndicesResponse indices = client.cat().indices();
        indices.valueBody().forEach(info -> log.info("索引:{}", info.index()));
    }

    /**
     * Checks whether the index exists.
     *
     * @throws IOException if the request cannot be sent or the response cannot be read
     */
    @Test
    void checkIndexExist() throws IOException {
        boolean exist = client.indices().exists(c -> c.index(INDEX)).value();
        log.info("判斷索引是否存在:{}", exist);
    }

    /**
     * Deletes an index. Note that this targets the index named {@code demo},
     * not the index the other examples use.
     *
     * @throws IOException if the request cannot be sent or the response cannot be read
     */
    @Test
    void deleteIndex() throws IOException {
        DeleteIndexResponse response = client.indices().delete(c -> c.index("demo"));
        log.info("删除索引:{}", response.acknowledged());
    }

    /**
     * Indexes one document three times over, each time building the request a different way:
     * inline lambda, {@code IndexRequest.of(...)}, and an explicit {@code IndexRequest.Builder}.
     *
     * @throws IOException if a request cannot be sent or a response cannot be read
     */
    @Test
    void addOneDoc() throws IOException {
        // Variant 1: inline lambda. Leaving out id(...) would let Elasticsearch generate one.
        Product product1 = new Product("bk-1", "City bike", 123.0);
        IndexResponse response1 = client.index(
                d -> d.index(INDEX).id(product1.getSku()).document(product1)
        );
        log.info(response1.result().jsonValue());

        // Variant 2: static factory taking the builder lambda.
        Product product2 = new Product("bk-2", "City bike 2", 133.0);
        IndexRequest<Product> request = IndexRequest.of(i -> i
                .index(INDEX)
                .id(product2.getSku())
                .document(product2)
        );
        IndexResponse response2 = client.index(request);
        log.info(response2.result().jsonValue());

        // Variant 3: explicit builder object.
        Product product3 = new Product("bk-3", "City bike 3", 93.0);
        IndexRequest.Builder<Product> indexReqBuilder = new IndexRequest.Builder<>();
        indexReqBuilder.index(INDEX);
        indexReqBuilder.id(product3.getSku());
        indexReqBuilder.document(product3);
        IndexResponse response3 = client.index(indexReqBuilder.build());
        log.info(response3.result().jsonValue());
    }

    /**
     * Bulk-indexes a list of in-memory products. No document id is supplied,
     * so ids are not derived from the SKU here (unlike {@link #addOneDoc()}).
     *
     * @throws IOException if the request cannot be sent or the response cannot be read
     */
    @Test
    void addAllDoc() throws IOException {
        List<Product> products = new ArrayList<>();
        products.add(new Product("bk-4", "City bike 4", 213.0));
        products.add(new Product("bk-5", "City bike 5", 105.0));

        BulkRequest.Builder br = new BulkRequest.Builder();
        for (Product product : products) {
            br.operations(op -> op
                    .index(idx -> idx
                            .index(INDEX)
                            .document(product)
                    )
            );
        }

        logBulkOutcome(client.bulk(br.build()));
    }

    /**
     * Bulk-indexes raw JSON documents read from files on disk.
     *
     * @throws IOException if a file cannot be read, or the request cannot be sent
     *                     or the response cannot be read
     */
    @Test
    void addAllDoc2() throws IOException {
        File[] files = {
                new File("F:\\javaStudy\\es\\product100.json"),
                new File("F:\\javaStudy\\es\\product101.json")
        };

        BulkRequest.Builder br = new BulkRequest.Builder();
        for (File file : files) {
            JsonData json = readJsonFile(file);
            br.operations(op -> op
                    .index(idx -> idx
                            .index(INDEX)
                            .document(json)
                    )
            );
        }

        logBulkOutcome(client.bulk(br.build()));
    }

    /**
     * Parses a JSON document from a stream using the JSON mapper of the given client.
     * The stream is not closed by this method; the caller owns it.
     *
     * @param input    stream positioned at the start of a JSON value
     * @param esClient client whose transport supplies the JSON mapper and provider
     * @return the parsed JSON value
     */
    public static JsonData readJson(InputStream input, ElasticsearchClient esClient) {
        JsonpMapper jsonpMapper = esClient._transport().jsonpMapper();
        JsonProvider jsonProvider = jsonpMapper.jsonProvider();

        return JsonData.from(jsonProvider.createParser(input), jsonpMapper);
    }

    /** Reads one JSON file through {@link #readJson}, closing the file stream afterwards. */
    private static JsonData readJsonFile(File file) throws IOException {
        try (InputStream input = new FileInputStream(file)) {
            return readJson(input, client);
        }
    }

    /** Logs whether a bulk request succeeded and, if not, the reason of every failed item. */
    private static void logBulkOutcome(BulkResponse result) {
        log.info("批量新增:{}", !result.errors());
        if (result.errors()) {
            log.error("Bulk had errors");
            for (BulkResponseItem item : result.items()) {
                if (item.error() != null) {
                    log.error(item.error().reason());
                }
            }
        }
    }

    /** Logs the source document of every hit in a search response. */
    private static void logHits(SearchResponse<Product> response) {
        response.hits().hits().forEach(item -> {
            assert item.source() != null;
            log.info(item.source().toString());
        });
    }

    /**
     * Fetches a single document by id and logs it.
     *
     * @throws IOException if the request cannot be sent or the response cannot be read
     */
    @Test
    void getDoc() throws IOException {
        GetResponse<Product> response = client.get(s -> s.index(INDEX).id("bk-100"), Product.class);
        Product product = response.source();
        assert product != null;
        log.info("商品:{}", product);
    }

    /**
     * Paged search: {@code from} is the offset of the first hit, {@code size} the page length.
     *
     * @throws IOException if the request cannot be sent or the response cannot be read
     */
    @Test
    void selectPage() throws IOException {
        SearchResponse<Product> products = client.search(
                s -> s.index(INDEX).from(0).size(3)
                , Product.class
        );
        assert products.hits().total() != null;
        long total = products.hits().total().value();
        log.info("總數:{}", total);

        logHits(products);
    }

    /**
     * Search sorted by {@code price}, highest first.
     *
     * @throws IOException if the request cannot be sent or the response cannot be read
     */
    @Test
    void selectSort() throws IOException {
        SearchResponse<Product> products = client.search(
                s -> s.index(INDEX).sort(sort -> sort.field(f -> f.field("price").order(SortOrder.Desc)))
                , Product.class
        );
        logHits(products);
    }

    /**
     * Search that restricts which source fields are returned via includes / excludes.
     *
     * @throws IOException if the request cannot be sent or the response cannot be read
     */
    @Test
    void selectField() throws IOException {
        // "price" is listed in both includes and excludes.
        // NOTE(review): per the Elasticsearch source-filtering docs an excluded field
        // should be dropped even if it is also included - confirm this is the intent.
        SearchResponse<Product> products = client.search(
                s -> s.index(INDEX).source(source -> source.filter(f -> f.includes("sku", "title", "price").excludes("price")))
                , Product.class
        );
        logHits(products);
    }

    /**
     * Bool query with two {@code must} clauses (logical AND): a term query on the SKU
     * and a match query on the price.
     *
     * @throws IOException if the request cannot be sent or the response cannot be read
     */
    @Test
    void selectMust() throws IOException {
        Query bySku = TermQuery.of(m -> m
                .field("sku.keyword")
                .value("bk-4")
        )._toQuery();

        Query byPrice = MatchQuery.of(m -> m
                .field("price")
                .query("213.0")
        )._toQuery();

        // The clauses could equally be written inline, e.g.
        //   b.must(m -> m.term(t -> t.field("sku.keyword").value("bk-4")))
        //    .must(m -> m.match(t -> t.field("price").query("213.0")))
        SearchResponse<Product> products = client.search(
                s -> s.index(INDEX).query(q -> q.bool(b -> b.must(bySku).must(byPrice)))
                , Product.class
        );

        logHits(products);
    }

    /**
     * Bool query with a {@code must_not} clause: returns documents that match none of the clauses.
     *
     * @throws IOException if the request cannot be sent or the response cannot be read
     */
    @Test
    void selectMustNot() throws IOException {
        Query bySku = TermQuery.of(m -> m
                .field("sku.keyword")
                .value("bk-4")
        )._toQuery();

        SearchResponse<Product> products = client.search(
                s -> s.index(INDEX).query(q -> q.bool(b -> b.mustNot(bySku)))
                , Product.class
        );

        logHits(products);
    }

    /**
     * Bool query with only {@code should} clauses (logical OR).
     *
     * <p>When the same bool query also contains {@code must} or {@code filter} clauses,
     * {@code should} clauses become optional by default and only influence scoring.
     *
     * @throws IOException if the request cannot be sent or the response cannot be read
     */
    @Test
    void selectShould() throws IOException {
        Query bySku = TermQuery.of(m -> m
                .field("sku.keyword")
                .value("bk-4")
        )._toQuery();

        Query bySku2 = TermQuery.of(m -> m
                .field("sku.keyword")
                .value("bk-5")
        )._toQuery();

        List<Query> queryList = List.of(bySku, bySku2);

        SearchResponse<Product> products = client.search(
                s -> s.index(INDEX).query(q -> q.bool(b -> b.should(queryList)))
                , Product.class
        );

        logHits(products);
    }

    /**
     * Range query ({@code price > 155}) combined with a term query on the SKU.
     *
     * @throws IOException if the request cannot be sent or the response cannot be read
     */
    @Test
    void selectRange() throws IOException {
        RangeQuery byPrice = RangeQuery.of(r -> r
                .field("price")
                .gt(JsonData.of(155))
        );
        Query bySku = TermQuery.of(m -> m
                .field("sku.keyword")
                .value("bk-5")
        )._toQuery();

        SearchResponse<Product> products = client.search(
                s -> s.index(INDEX).query(q -> q
                        .bool(b -> b
                                .must(bySku)
                                .must(m -> m.range(byPrice))
                        )
                )
                , Product.class
        );
        logHits(products);
    }

    /**
     * Fuzzy query on {@code sku} with a fuzziness of 1.
     *
     * @throws IOException if the request cannot be sent or the response cannot be read
     */
    @Test
    void selectFuzzy() throws IOException {
        FuzzyQuery bySku = FuzzyQuery.of(f -> f
                .field("sku")
                .value("100")
                .fuzziness("1")
        );

        SearchResponse<Product> products = client.search(
                s -> s.index(INDEX).query(q -> q
                        .fuzzy(bySku)
                )
                , Product.class
        );
        logHits(products);
    }

    /**
     * Match-phrase query on {@code sku}. This matches the analyzed terms of the phrase
     * in order; it is not a prefix ({@code like "xx%"}) match.
     *
     * @throws IOException if the request cannot be sent or the response cannot be read
     */
    @Test
    void selectMatchPhrase() throws IOException {
        // MatchPhraseQuery.of already yields the variant type; no need to wrap it
        // in a Query and unwrap it again.
        MatchPhraseQuery bySku = MatchPhraseQuery.of(m -> m
                .field("sku")
                .query("bk-1")
        );

        SearchResponse<Product> products = client.search(
                s -> s.index(INDEX).query(q -> q
                        .matchPhrase(bySku)
                )
                , Product.class
        );
        logHits(products);
    }

    /**
     * Metric aggregation example: the maximum of {@code price}, registered as {@code maxPrice}.
     *
     * @throws IOException if the request cannot be sent or the response cannot be read
     */
    @Test
    void selectAggregations() throws IOException {
        SearchResponse<Product> products = client.search(
                s -> s.index(INDEX).aggregations("maxPrice", a -> a
                        .max(m -> m.field("price"))
                )
                , Product.class
        );
        logHits(products);

        // Only a max aggregation was requested, so every entry is read as one.
        for (Map.Entry<String, Aggregate> entry : products.aggregations().entrySet()) {
            log.info("Key = {}, Value = {}", entry.getKey(), entry.getValue().max().value());
        }
    }

    /**
     * Terms aggregation (grouping) on the numeric field {@code price}.
     *
     * @throws IOException if the request cannot be sent or the response cannot be read
     */
    @Test
    void selectGroup() throws IOException {
        // "priceGroup" is the caller-chosen aggregation name; "price" is the grouped field.
        SearchResponse<Product> products = client.search(
                s -> s.index(INDEX).aggregations("priceGroup", a -> a
                        .terms(t -> t.field("price"))
                )
                , Product.class
        );

        Aggregate aggregate = products.aggregations().get("priceGroup");
        // dterms() is the double-keyed variant; a long-typed field would use lterms().
        DoubleTermsAggregate dterms = aggregate.dterms();
        Buckets<DoubleTermsBucket> buckets = dterms.buckets();
        for (DoubleTermsBucket b : buckets.array()) {
            log.info("{} : {}", b.key(), b.docCount());
        }
    }

    /**
     * Terms aggregation (grouping) on a string field. Text fields do not support
     * aggregation by default, so the {@code sku.keyword} sub-field is used.
     *
     * @throws IOException if the request cannot be sent or the response cannot be read
     */
    @Test
    void selectGroup2() throws IOException {
        SearchResponse<Product> products = client.search(
                s -> s.index(INDEX).aggregations("skuGroup", a -> a
                        .terms(t -> t.field("sku.keyword"))
                )
                , Product.class
        );

        Aggregate aggregate = products.aggregations().get("skuGroup");
        StringTermsAggregate sterms = aggregate.sterms();
        Buckets<StringTermsBucket> buckets = sterms.buckets();
        for (StringTermsBucket b : buckets.array()) {
            log.info("{} : {}", b.key(), b.docCount());
        }
    }
}
           

繼續閱讀