天天看點

ElasticSearch: java API - 基本增删改查和聚合查詢 ElasticSearch版本指定 ip位址建立client添加資料查找一條查找多條資料更新資料删除-指定ID資料删除--指定任意某個字段異步删除範圍查詢聚合查詢

 ElasticSearch版本

elasticsearch-5.4.3.jar      
ElasticSearch: java API - 基本增删改查和聚合查詢 ElasticSearch版本指定 ip位址建立client添加資料查找一條查找多條資料更新資料删除-指定ID資料删除--指定任意某個字段異步删除範圍查詢聚合查詢

指定 ip位址建立client

private TransportClient client = null;
    /**
     *指定 ip位址建立client
     */
    @Before
    public void init() throws Exception {
        //設定叢集名稱
        Settings settings = Settings.builder()
                .put("cluster.name", "my-es")
                //自動感覺的功能(可以通過目前指定的節點擷取所有es節點的資訊)
                .put("client.transport.sniff", true)
                .build();
        //建立client
        client = new PreBuiltTransportClient(settings).addTransportAddresses(
                // Java對應的API操作的端口都是9300,記住是9300
                new InetSocketTransportAddress(InetAddress.getByName("192.168.100.211"), 9300),
                new InetSocketTransportAddress(InetAddress.getByName("192.168.100.212"), 9300),
                new InetSocketTransportAddress(InetAddress.getByName("192.168.100.213"), 9300));
    }      
ElasticSearch: java API - 基本增删改查和聚合查詢 ElasticSearch版本指定 ip位址建立client添加資料查找一條查找多條資料更新資料删除-指定ID資料删除--指定任意某個字段異步删除範圍查詢聚合查詢

添加資料

/**
     *添加資料
     */
    @Test
    public void testCreate() throws IOException {
        // index可以了解為資料庫;type了解為資料表;id相當于資料庫表中記錄的主鍵,是唯一的。
        IndexResponse response = client.prepareIndex("gamelog", "users", "1")
                .setSource(
                        jsonBuilder()
                                .startObject()
                                // field了解為列
                                    .field("username", "老趙")
                                    .field("gender", "male")
                                    .field("birthday", new Date())
                                    .field("fv", 9999)
                                    .field("message", "trying out Elasticsearch")
                                .endObject()
                ).get();
    }      
ElasticSearch: java API - 基本增删改查和聚合查詢 ElasticSearch版本指定 ip位址建立client添加資料查找一條查找多條資料更新資料删除-指定ID資料删除--指定任意某個字段異步删除範圍查詢聚合查詢

查找一條

/**
     *查找一條
     */
    @Test
    public void testGet() throws IOException {
        GetResponse response = client.prepareGet("gamelog", "users", "1").get();
        System.out.println(response.getSourceAsString());
    }      
ElasticSearch: java API - 基本增删改查和聚合查詢 ElasticSearch版本指定 ip位址建立client添加資料查找一條查找多條資料更新資料删除-指定ID資料删除--指定任意某個字段異步删除範圍查詢聚合查詢

查找多條

/**
     * 查找多條
     */
    @Test
    public void testMultiGet() throws IOException {
        MultiGetResponse multiGetItemResponses = client.prepareMultiGet()
                .add("gamelog", "users", "1")
                .add("gamelog", "users", "2", "3")
                .add("news", "fulltext", "1")
                .get();
        for (MultiGetItemResponse itemResponse : multiGetItemResponses) {
            GetResponse response = itemResponse.getResponse();
            if (response.isExists()) {
                String json = response.getSourceAsString();
                System.out.println(json);
            }
        }
    }      
ElasticSearch: java API - 基本增删改查和聚合查詢 ElasticSearch版本指定 ip位址建立client添加資料查找一條查找多條資料更新資料删除-指定ID資料删除--指定任意某個字段異步删除範圍查詢聚合查詢

資料更新

/**
     * 資料更新
     */
    @Test
    public void testUpdate() throws Exception {
        UpdateRequest updateRequest = new UpdateRequest();
        updateRequest.index("gamelog");
        updateRequest.type("users");
        updateRequest.id("2");
        updateRequest.doc(
                jsonBuilder()
                    .startObject()
                        .field("fv", 999.9)
                    .endObject());
        client.update(updateRequest).get();
    }      
ElasticSearch: java API - 基本增删改查和聚合查詢 ElasticSearch版本指定 ip位址建立client添加資料查找一條查找多條資料更新資料删除-指定ID資料删除--指定任意某個字段異步删除範圍查詢聚合查詢

資料删除-指定ID

/**
     * 資料删除-指定ID
     */
    @Test
    public void testDelete() {
        DeleteResponse response = client.prepareDelete("gamelog", "users", "2").get();
        System.out.println(response);
    }      
ElasticSearch: java API - 基本增删改查和聚合查詢 ElasticSearch版本指定 ip位址建立client添加資料查找一條查找多條資料更新資料删除-指定ID資料删除--指定任意某個字段異步删除範圍查詢聚合查詢

資料删除--指定任意某個字段

/**
     * 資料删除--指定任意某個字段
     */
    @Test
    public void testDeleteByQuery() {
        BulkByScrollResponse response =
                DeleteByQueryAction.INSTANCE.newRequestBuilder(client)
                        //指定查詢條件
                        .filter(QueryBuilders.matchQuery("username", "老段"))
                        //指定索引名稱
                        .source("gamelog")
                        .get();
        long deleted = response.getDeleted();
        System.out.println(deleted);
    }      
ElasticSearch: java API - 基本增删改查和聚合查詢 ElasticSearch版本指定 ip位址建立client添加資料查找一條查找多條資料更新資料删除-指定ID資料删除--指定任意某個字段異步删除範圍查詢聚合查詢

異步删除

/**
     * 異步删除
     */
    @Test
    public void testDeleteByQueryAsync() {
        DeleteByQueryAction.INSTANCE.newRequestBuilder(client)
                .filter(QueryBuilders.matchQuery("gender", "male"))
                .source("gamelog")
                .execute(new ActionListener<BulkByScrollResponse>() {
                    @Override
                    public void onResponse(BulkByScrollResponse response) {
                        long deleted = response.getDeleted();
                        System.out.println("資料删除了");
                        System.out.println(deleted);
                    }
                    @Override
                    public void onFailure(Exception e) {
                        e.printStackTrace();
                    }
                });
        try {
            System.out.println("異步删除");
            Thread.sleep(10000);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }      
ElasticSearch: java API - 基本增删改查和聚合查詢 ElasticSearch版本指定 ip位址建立client添加資料查找一條查找多條資料更新資料删除-指定ID資料删除--指定任意某個字段異步删除範圍查詢聚合查詢

範圍查詢

/**
     *  範圍查詢
     */
    @Test
    public void testRange() {
        QueryBuilder qb = rangeQuery("fv")
                // [88.99, 10000)
                .from(88.99)
                .to(10000)
                .includeLower(true)
                .includeUpper(false);
        SearchResponse response = client.prepareSearch("gamelog").setQuery(qb).get();
        System.out.println(response);
    }      
ElasticSearch: java API - 基本增删改查和聚合查詢 ElasticSearch版本指定 ip位址建立client添加資料查找一條查找多條資料更新資料删除-指定ID資料删除--指定任意某個字段異步删除範圍查詢聚合查詢

聚合查詢

先添加一些資料

/**
     * curl -XPUT 'http://192.168.5.251:9200/player_info/player/1' -d '{ "name": "curry", "age": 29, "salary": 3500,"team": "war", "position": "pg"}'
     * curl -XPUT 'http://192.168.5.251:9200/player_info/player/2' -d '{ "name": "thompson", "age": 26, "salary": 2000,"team": "war", "position": "pg"}'
     * curl -XPUT 'http://192.168.5.251:9200/player_info/player/3' -d '{ "name": "irving", "age": 25, "salary": 2000,"team": "cav", "position": "pg"}'
     * curl -XPUT 'http://192.168.5.251:9200/player_info/player/4' -d '{ "name": "green", "age": 26, "salary": 2000,"team": "war", "position": "pf"}'
     * curl -XPUT 'http://192.168.5.251:9200/player_info/player/5' -d '{ "name": "james", "age": 33, "salary": 4000,"team": "cav", "position": "sf"}'
     */      
ElasticSearch: java API - 基本增删改查和聚合查詢 ElasticSearch版本指定 ip位址建立client添加資料查找一條查找多條資料更新資料删除-指定ID資料删除--指定任意某個字段異步删除範圍查詢聚合查詢
@Test
    public void testAddPlayer() throws IOException {
        IndexResponse response = client.prepareIndex("player_info", "player", "1")
                .setSource(
                        jsonBuilder()
                                .startObject()
                                .field("name", "James")
                                .field("age", 33)
                                .field("salary", 3000)
                                .field("team", "cav")
                                .field("position", "sf")
                                .endObject()
                ).get();
    }      
ElasticSearch: java API - 基本增删改查和聚合查詢 ElasticSearch版本指定 ip位址建立client添加資料查找一條查找多條資料更新資料删除-指定ID資料删除--指定任意某個字段異步删除範圍查詢聚合查詢

group by/count

例如要計算每個球隊的球員數,如果使用SQL語句,應表達如下:

select team, count(*) as player_count from player group by team;      
ElasticSearch: java API - 基本增删改查和聚合查詢 ElasticSearch版本指定 ip位址建立client添加資料查找一條查找多條資料更新資料删除-指定ID資料删除--指定任意某個字段異步删除範圍查詢聚合查詢

ES的java api:

@Test
    public void testAgg1() {
        //指定索引和type
        SearchRequestBuilder builder = client.prepareSearch("player_info").setTypes("player");
        //按team分組然後聚合,但是并沒有指定聚合函數
        TermsAggregationBuilder teamAgg = AggregationBuilders.terms("player_count").field("team");
        //添加聚合器
        builder.addAggregation(teamAgg);
        //觸發
        SearchResponse response = builder.execute().actionGet();
        //System.out.println(response);
        //将傳回的結果放入到一個map中
        Map<String, Aggregation> aggMap = response.getAggregations().getAsMap();
//        Set<String> keys = aggMap.keySet();
//
//        for (String key: keys) {
//            System.out.println(key);
//        }
//        //取出聚合屬性
        StringTerms terms = (StringTerms) aggMap.get("player_count");
        //
////        //依次疊代出分組聚合資料
//        for (Terms.Bucket bucket : terms.getBuckets()) {
//            //分組的名字
//            String team = (String) bucket.getKey();
//            //count,分組後一個組有多少資料
//            long count = bucket.getDocCount();
//            System.out.println(team + " " + count);
//        }
        Iterator<Terms.Bucket> teamBucketIt = terms.getBuckets().iterator();
        while (teamBucketIt .hasNext()) {
            Terms.Bucket bucket = teamBucketIt.next();
            String team = (String) bucket.getKey();
            long count = bucket.getDocCount();
            System.out.println(team + " " + count);
        }
    }      
ElasticSearch: java API - 基本增删改查和聚合查詢 ElasticSearch版本指定 ip位址建立client添加資料查找一條查找多條資料更新資料删除-指定ID資料删除--指定任意某個字段異步删除範圍查詢聚合查詢

group by多個field

例如要計算每個球隊每個位置的球員數,如果使用SQL語句,應表達如下:

select team, position, count(*) as pos_count from player group by team, position;      
ElasticSearch: java API - 基本增删改查和聚合查詢 ElasticSearch版本指定 ip位址建立client添加資料查找一條查找多條資料更新資料删除-指定ID資料删除--指定任意某個字段異步删除範圍查詢聚合查詢
/**
     * group by多個field
     * 例如要計算每個球隊每個位置的球員數,如果使用SQL語句
     * select team, position, count(*) as pos_count from player group by team, position;
     */
    @Test
    public void testAgg2() {
        SearchRequestBuilder builder = client.prepareSearch("player_info").setTypes("player");
        //指定别名和分組的字段
        TermsAggregationBuilder teamAgg = AggregationBuilders.terms("team_name").field("team");
        TermsAggregationBuilder posAgg= AggregationBuilders.terms("pos_count").field("position");
        //添加兩個聚合建構器
        builder.addAggregation(teamAgg.subAggregation(posAgg));
        //執行查詢
        SearchResponse response = builder.execute().actionGet();
        //将查詢結果放入map中
        Map<String, Aggregation> aggMap = response.getAggregations().getAsMap();
        //根據屬性名到map中查找
        StringTerms teams = (StringTerms) aggMap.get("team_name");
        //循環查找結果
        for (Terms.Bucket teamBucket : teams.getBuckets()) {
            //先按球隊進行分組
            String team = (String) teamBucket.getKey();
            Map<String, Aggregation> subAggMap = teamBucket.getAggregations().getAsMap();
            StringTerms positions = (StringTerms) subAggMap.get("pos_count");
            //因為一個球隊有很多位置,那麼還要依次拿出位置資訊
            for (Terms.Bucket posBucket : positions.getBuckets()) {
                //拿到位置的名字
                String pos = (String) posBucket.getKey();
                //拿出該位置的數量
                long docCount = posBucket.getDocCount();
                //列印球隊,位置,人數
                System.out.println(team + " " + pos + " " + docCount);
            }
        }
    }      
ElasticSearch: java API - 基本增删改查和聚合查詢 ElasticSearch版本指定 ip位址建立client添加資料查找一條查找多條資料更新資料删除-指定ID資料删除--指定任意某個字段異步删除範圍查詢聚合查詢

max/min/sum/avg

例如要計算每個球隊年齡最大/最小/總/平均的球員年齡,如果使用SQL語句,應表達如下:

select team, max(age) as max_age from player group by team;      
ElasticSearch: java API - 基本增删改查和聚合查詢 ElasticSearch版本指定 ip位址建立client添加資料查找一條查找多條資料更新資料删除-指定ID資料删除--指定任意某個字段異步删除範圍查詢聚合查詢
/**
     * select team, max(age) as max_age from player group by team;
     */
    @Test
    public void testAgg3() {
        SearchRequestBuilder builder = client.prepareSearch("player_info").setTypes("player");
        //指定安球隊進行分組
        TermsAggregationBuilder teamAgg = AggregationBuilders.terms("team_name").field("team");
        //指定分組求最大值
        MaxAggregationBuilder maxAgg = AggregationBuilders.max("max_age").field("age");
        //分組後求最大值
        builder.addAggregation(teamAgg.subAggregation(maxAgg));
        //查詢
        SearchResponse response = builder.execute().actionGet();
        Map<String, Aggregation> aggMap = response.getAggregations().getAsMap();
        //根據team屬性,擷取map中的内容
        StringTerms teams = (StringTerms) aggMap.get("team_name");
        for (Terms.Bucket teamBucket : teams.getBuckets()) {
            //分組的屬性名
            String team = (String) teamBucket.getKey();
            //在将聚合後取最大值的内容取出來放到map中
            Map<String, Aggregation> subAggMap = teamBucket.getAggregations().getAsMap();
            //取分組後的最大值
            InternalMax ages = (InternalMax)subAggMap.get("max_age");
            double max = ages.getValue();
            System.out.println(team + " " + max);
        }
    }      
ElasticSearch: java API - 基本增删改查和聚合查詢 ElasticSearch版本指定 ip位址建立client添加資料查找一條查找多條資料更新資料删除-指定ID資料删除--指定任意某個字段異步删除範圍查詢聚合查詢

對多個field求max/min/sum/avg

例如要計算每個球隊球員的平均年齡,同時又要計算總年薪,如果使用SQL語句,應表達如下:

select team, avg(age)as avg_age, sum(salary) as total_salary from player group by team;      
ElasticSearch: java API - 基本增删改查和聚合查詢 ElasticSearch版本指定 ip位址建立client添加資料查找一條查找多條資料更新資料删除-指定ID資料删除--指定任意某個字段異步删除範圍查詢聚合查詢
/**
     * select team, avg(age) as avg_age, sum(salary) as total_salary from player group by team;
     */
    @Test
    public void testAgg4() {
        SearchRequestBuilder builder = client.prepareSearch("player_info").setTypes("player");
        //指定分組字段
        TermsAggregationBuilder termsAgg = AggregationBuilders.terms("team_name").field("team");
        //指定聚合函數是求平均資料
        AvgAggregationBuilder avgAgg = AggregationBuilders.avg("avg_age").field("age");
        //指定另外一個聚合函數是求和
        SumAggregationBuilder sumAgg = AggregationBuilders.sum("total_salary").field("salary");
        //分組的聚合器關聯了兩個聚合函數
        builder.addAggregation(termsAgg.subAggregation(avgAgg).subAggregation(sumAgg));
        SearchResponse response = builder.execute().actionGet();
        Map<String, Aggregation> aggMap = response.getAggregations().getAsMap();
        //按分組的名字取出資料
        StringTerms teams = (StringTerms) aggMap.get("team_name");
        for (Terms.Bucket teamBucket : teams.getBuckets()) {
            //擷取球隊名字
            String team = (String) teamBucket.getKey();
            Map<String, Aggregation> subAggMap = teamBucket.getAggregations().getAsMap();
            //根據别名取出平均年齡
            InternalAvg avgAge = (InternalAvg)subAggMap.get("avg_age");
            //根據别名取出薪水總和
            InternalSum totalSalary = (InternalSum)subAggMap.get("total_salary");
            double avgAgeValue = avgAge.getValue();
            double totalSalaryValue = totalSalary.getValue();
            System.out.println(team + " " + avgAgeValue + " " + totalSalaryValue);
        }
    }      
ElasticSearch: java API - 基本增删改查和聚合查詢 ElasticSearch版本指定 ip位址建立client添加資料查找一條查找多條資料更新資料删除-指定ID資料删除--指定任意某個字段異步删除範圍查詢聚合查詢

聚合後對Aggregation結果排序

例如要計算每個球隊總年薪,并按照總年薪倒序排列,如果使用SQL語句,應表達如下:

select team, sum(salary) as total_salary from player group by team order by total_salary desc;      
ElasticSearch: java API - 基本增删改查和聚合查詢 ElasticSearch版本指定 ip位址建立client添加資料查找一條查找多條資料更新資料删除-指定ID資料删除--指定任意某個字段異步删除範圍查詢聚合查詢
/**
     * select team, sum(salary) as total_salary from player group by team order by total_salary desc;
     */
    @Test
    public void testAgg5() {
        SearchRequestBuilder builder = client.prepareSearch("player_info").setTypes("player");
        //按team進行分組,然後指定排序規則
        TermsAggregationBuilder termsAgg = AggregationBuilders.terms("team_name").field("team").order(Terms.Order.aggregation("total_salary ", true));
        SumAggregationBuilder sumAgg = AggregationBuilders.sum("total_salary").field("salary");
        builder.addAggregation(termsAgg.subAggregation(sumAgg));
        SearchResponse response = builder.execute().actionGet();
        Map<String, Aggregation> aggMap = response.getAggregations().getAsMap();
        StringTerms teams = (StringTerms) aggMap.get("team_name");
        for (Terms.Bucket teamBucket : teams.getBuckets()) {
            String team = (String) teamBucket.getKey();
            Map<String, Aggregation> subAggMap = teamBucket.getAggregations().getAsMap();
            InternalSum totalSalary = (InternalSum)subAggMap.get("total_salary");
            double totalSalaryValue = totalSalary.getValue();
            System.out.println(team + " " + totalSalaryValue);
        }
    }      
ElasticSearch: java API - 基本增删改查和聚合查詢 ElasticSearch版本指定 ip位址建立client添加資料查找一條查找多條資料更新資料删除-指定ID資料删除--指定任意某個字段異步删除範圍查詢聚合查詢

需要特别注意的是,排序是在TermAggregation處執行的,Order.aggregation函數的第一個參數是aggregation的名字,第二個參數是boolean型,true表示正序,false表示倒序。 

  • Aggregation結果條數的問題

預設情況下,search執行後,僅傳回10條聚合結果,如果想反悔更多的結果,需要在建構TermsBuilder 時指定size:

TermsBuilder teamAgg= AggregationBuilders.terms("team").size(15);      
ElasticSearch: java API - 基本增删改查和聚合查詢 ElasticSearch版本指定 ip位址建立client添加資料查找一條查找多條資料更新資料删除-指定ID資料删除--指定任意某個字段異步删除範圍查詢聚合查詢
  • Aggregation結果的解析/輸出

得到response後:

Map<String, Aggregation> aggMap = response.getAggregations().asMap();
StringTerms teamAgg= (StringTerms) aggMap.get("keywordAgg");
Iterator<Bucket> teamBucketIt = teamAgg.getBuckets().iterator();
while (teamBucketIt .hasNext()) {
Bucket buck = teamBucketIt .next();
//球隊名
String team = buck.getKey();
//記錄數
long count = buck.getDocCount();
//得到所有子聚合
Map subaggmap = buck.getAggregations().asMap();
//avg值擷取方法
double avg_age= ((InternalAvg) subaggmap.get("avg_age")).getValue();
//sum值擷取方法
double total_salary = ((InternalSum) subaggmap.get("total_salary")).getValue();
//...
//max/min以此類推
}      
ElasticSearch: java API - 基本增删改查和聚合查詢 ElasticSearch版本指定 ip位址建立client添加資料查找一條查找多條資料更新資料删除-指定ID資料删除--指定任意某個字段異步删除範圍查詢聚合查詢
  • 總結

綜上,聚合操作主要是調用了SearchRequestBuilder的addAggregation方法,通常是傳入一個TermsBuilder,子聚合調用TermsBuilder的subAggregation方法,可以添加的子聚合有TermsBuilder、SumBuilder、AvgBuilder、MaxBuilder、MinBuilder等常見的聚合操作。

從實作上來講,SearchRequestBuilder在内部保持了一個私有的 SearchSourceBuilder執行個體, SearchSourceBuilder内部包含一個List<AbstractAggregationBuilder>,每次調用addAggregation時會調用 SearchSourceBuilder執行個體,添加一個AggregationBuilder。

同樣的,TermsBuilder也在内部保持了一個List<AbstractAggregationBuilder>,調用addAggregation方法(來自父類addAggregation)時會添加一個AggregationBuilder。有興趣的讀者也可以閱讀源碼的實作。

參考來源:

https://www.elastic.co/guide/en/elasticsearch/client/java-api/5.4/index.html ElasticSearch java API - 聚合查詢 - Elastic 中文社群