ElasticSearch版本
elasticsearch-5.4.3.jar

指定 ip位址建立client
/** Transport client shared by the tests; assigned in {@link #init()}. */
private TransportClient client;

/**
 * Creates the transport client from an explicit list of node IP addresses.
 *
 * @throws Exception if a host address cannot be resolved or the client cannot be built
 */
@Before
public void init() throws Exception {
    Settings.Builder settingsBuilder = Settings.builder();
    // Name of the cluster to connect to.
    settingsBuilder.put("cluster.name", "my-es");
    // Sniffing: the nodes listed below are used to look up the other nodes of the cluster.
    settingsBuilder.put("client.transport.sniff", true);
    Settings settings = settingsBuilder.build();

    TransportClient created = new PreBuiltTransportClient(settings);

    // The Java transport API uses port 9300 on every node -- remember, 9300.
    final int transportPort = 9300;
    InetSocketTransportAddress first =
        new InetSocketTransportAddress(InetAddress.getByName("192.168.100.211"), transportPort);
    InetSocketTransportAddress second =
        new InetSocketTransportAddress(InetAddress.getByName("192.168.100.212"), transportPort);
    InetSocketTransportAddress third =
        new InetSocketTransportAddress(InetAddress.getByName("192.168.100.213"), transportPort);

    client = created.addTransportAddresses(first, second, third);
}
添加資料
/**
 * Adds a document: indexes one user into index "gamelog", type "users", id "1".
 *
 * @throws IOException if the JSON source cannot be built
 */
@Test
public void testCreate() throws IOException {
    // Think of the index as a database, the type as a table, and the id as the
    // unique primary key of a row in that table.
    IndexResponse indexResponse =
        client.prepareIndex("gamelog", "users", "1")
            .setSource(
                // Each field corresponds to a column.
                jsonBuilder().startObject()
                    .field("username", "老趙")
                    .field("gender", "male")
                    .field("birthday", new Date())
                    .field("fv", 9999)
                    .field("message", "trying out Elasticsearch")
                    .endObject())
            .execute()
            .actionGet();
}
查找一條
/**
 * Fetches a single document (gamelog/users/1) and prints its source as a JSON string.
 */
@Test
public void testGet() throws IOException {
    GetResponse fetched = client.prepareGet("gamelog", "users", "1").execute().actionGet();
    String sourceJson = fetched.getSourceAsString();
    System.out.println(sourceJson);
}
查找多條
/**
 * Fetches several documents in one round trip and prints the source of every
 * document that exists.
 *
 * <p>Items that failed are reported on stderr and skipped; a failed item carries
 * no {@code GetResponse}, so it must not be dereferenced.
 */
@Test
public void testMultiGet() throws IOException {
    MultiGetResponse multiGetItemResponses = client.prepareMultiGet()
        .add("gamelog", "users", "1")
        // several ids of the same index/type can be requested at once
        .add("gamelog", "users", "2", "3")
        // a different index can be part of the same request
        .add("news", "fulltext", "1")
        .get();

    for (MultiGetItemResponse itemResponse : multiGetItemResponses) {
        if (itemResponse.isFailed()) {
            System.err.println("multi-get item " + itemResponse.getId()
                + " failed: " + itemResponse.getFailure().getMessage());
            continue;
        }
        GetResponse response = itemResponse.getResponse();
        if (response != null && response.isExists()) {
            String json = response.getSourceAsString();
            System.out.println(json);
        }
    }
}
資料更新
/**
 * Updates a document: sets field "fv" of gamelog/users/2 to 999.9 via a partial document.
 *
 * @throws Exception if the JSON cannot be built or the update does not complete
 */
@Test
public void testUpdate() throws Exception {
    UpdateRequest partialUpdate = new UpdateRequest("gamelog", "users", "2")
        .doc(jsonBuilder().startObject().field("fv", 999.9).endObject());
    // Block until the update has been executed.
    client.update(partialUpdate).get();
}
資料删除-指定ID
/**
 * Deletes a document by id (gamelog/users/2) and prints the delete response.
 */
@Test
public void testDelete() {
    DeleteResponse deleteResponse =
        client.prepareDelete("gamelog", "users", "2").execute().actionGet();
    System.out.println(deleteResponse);
}
資料删除--指定任意某個字段
/**
 * Deletes by query: removes every document of index "gamelog" matching a
 * condition on an arbitrary field, then prints how many documents were deleted.
 */
@Test
public void testDeleteByQuery() {
    BulkByScrollResponse deleteResult = DeleteByQueryAction.INSTANCE
        .newRequestBuilder(client)
        // index to delete from
        .source("gamelog")
        // query selecting the documents to delete
        .filter(QueryBuilders.matchQuery("username", "老段"))
        .get();
    System.out.println(deleteResult.getDeleted());
}
異步删除
/**
 * Asynchronous delete-by-query: removes every document of index "gamelog" whose
 * "gender" matches "male" and reports the outcome from a listener callback.
 *
 * <p>The test thread waits on a latch released by the listener instead of sleeping
 * for a fixed time, so it returns as soon as the callback has run and waits at
 * most 10 seconds.
 */
@Test
public void testDeleteByQueryAsync() {
    // Released by whichever listener callback fires.
    final java.util.concurrent.CountDownLatch finished =
        new java.util.concurrent.CountDownLatch(1);

    DeleteByQueryAction.INSTANCE.newRequestBuilder(client)
        .filter(QueryBuilders.matchQuery("gender", "male"))
        .source("gamelog")
        .execute(new ActionListener<BulkByScrollResponse>() {
            @Override
            public void onResponse(BulkByScrollResponse response) {
                try {
                    long deleted = response.getDeleted();
                    System.out.println("資料删除了");
                    System.out.println(deleted);
                } finally {
                    finished.countDown();
                }
            }

            @Override
            public void onFailure(Exception e) {
                try {
                    e.printStackTrace();
                } finally {
                    finished.countDown();
                }
            }
        });

    // Printed right after submitting: execute(listener) does not block.
    System.out.println("異步删除");
    try {
        if (!finished.await(10, java.util.concurrent.TimeUnit.SECONDS)) {
            System.err.println("delete-by-query did not complete within 10 seconds");
        }
    } catch (InterruptedException e) {
        // Restore the interrupt flag rather than swallowing the interruption.
        Thread.currentThread().interrupt();
    }
}
範圍查詢
/**
 * Range query: searches index "gamelog" for documents whose "fv" lies in
 * [88.99, 10000) and prints the search response.
 */
@Test
public void testRange() {
    // gte = inclusive lower bound, lt = exclusive upper bound
    QueryBuilder fvRange = rangeQuery("fv").gte(88.99).lt(10000);
    SearchResponse found =
        client.prepareSearch("gamelog").setQuery(fvRange).execute().actionGet();
    System.out.println(found);
}
聚合查詢
先添加一些資料
/**
 * Loads the sample data set used by the aggregation examples into
 * player_info/player. It is the Java equivalent of these REST calls:
 *
 * <pre>
 * curl -XPUT 'http://192.168.5.251:9200/player_info/player/1' -d '{ "name": "curry", "age": 29, "salary": 3500,"team": "war", "position": "pg"}'
 * curl -XPUT 'http://192.168.5.251:9200/player_info/player/2' -d '{ "name": "thompson", "age": 26, "salary": 2000,"team": "war", "position": "pg"}'
 * curl -XPUT 'http://192.168.5.251:9200/player_info/player/3' -d '{ "name": "irving", "age": 25, "salary": 2000,"team": "cav", "position": "pg"}'
 * curl -XPUT 'http://192.168.5.251:9200/player_info/player/4' -d '{ "name": "green", "age": 26, "salary": 2000,"team": "war", "position": "pf"}'
 * curl -XPUT 'http://192.168.5.251:9200/player_info/player/5' -d '{ "name": "james", "age": 33, "salary": 4000,"team": "cav", "position": "sf"}'
 * </pre>
 *
 * <p>All five players are indexed under the ids listed above, so the group-by
 * examples have more than one team and position to aggregate over.
 *
 * @throws IOException if a JSON source cannot be built
 */
@Test
public void testAddPlayer() throws IOException {
    indexPlayer("1", "curry", 29, 3500, "war", "pg");
    indexPlayer("2", "thompson", 26, 2000, "war", "pg");
    indexPlayer("3", "irving", 25, 2000, "cav", "pg");
    indexPlayer("4", "green", 26, 2000, "war", "pf");
    indexPlayer("5", "james", 33, 4000, "cav", "sf");
}

/** Indexes one player document into player_info/player under the given id. */
private void indexPlayer(String id, String name, int age, int salary, String team, String position)
        throws IOException {
    client.prepareIndex("player_info", "player", id)
        .setSource(
            jsonBuilder()
                .startObject()
                .field("name", name)
                .field("age", age)
                .field("salary", salary)
                .field("team", team)
                .field("position", position)
                .endObject())
        .get();
}
group by/count
例如要計算每個球隊的球員數,如果使用SQL語句,應表達如下:
select team, count(*) as player_count from player group by team;
ES的java api:
/**
 * group by / count -- the equivalent of
 * {@code select team, count(*) as player_count from player group by team;}
 * Prints one line per team: the team name followed by its document count.
 */
@Test
public void testAgg1() {
    // Terms aggregation on "team"; no metric is attached, so each bucket only
    // carries its document count.
    TermsAggregationBuilder byTeam = AggregationBuilders.terms("player_count").field("team");

    // Search the index/type with the aggregation attached.
    SearchResponse searchResponse = client.prepareSearch("player_info")
        .setTypes("player")
        .addAggregation(byTeam)
        .execute()
        .actionGet();

    // Look the aggregation up by the name given above.
    Map<String, Aggregation> aggregationsByName = searchResponse.getAggregations().getAsMap();
    StringTerms teamTerms = (StringTerms) aggregationsByName.get("player_count");

    // One bucket per group.
    for (Terms.Bucket teamBucket : teamTerms.getBuckets()) {
        // Group key (the team name).
        String teamName = (String) teamBucket.getKey();
        // Number of documents in the group.
        long playerCount = teamBucket.getDocCount();
        System.out.println(teamName + " " + playerCount);
    }
}
group by多個field
例如要計算每個球隊每個位置的球員數,如果使用SQL語句,應表達如下:
select team, position, count(*) as pos_count from player group by team, position;
/**
 * group by several fields -- counts the players per team and position, like
 * {@code select team, position, count(*) as pos_count from player group by team, position;}
 * Prints one line per (team, position) pair: team, position, count.
 */
@Test
public void testAgg2() {
    // Outer grouping by team, inner grouping by position (aggregation name + field).
    TermsAggregationBuilder byTeam = AggregationBuilders.terms("team_name").field("team");
    TermsAggregationBuilder byPosition = AggregationBuilders.terms("pos_count").field("position");
    byTeam.subAggregation(byPosition);

    SearchResponse searchResponse = client.prepareSearch("player_info")
        .setTypes("player")
        .addAggregation(byTeam)
        .execute()
        .actionGet();

    // Top-level aggregations keyed by name.
    Map<String, Aggregation> topLevel = searchResponse.getAggregations().getAsMap();
    StringTerms teamTerms = (StringTerms) topLevel.get("team_name");

    for (Terms.Bucket teamBucket : teamTerms.getBuckets()) {
        String teamName = (String) teamBucket.getKey();

        // A team has several positions, so walk the nested buckets as well.
        Map<String, Aggregation> nested = teamBucket.getAggregations().getAsMap();
        StringTerms positionTerms = (StringTerms) nested.get("pos_count");
        for (Terms.Bucket positionBucket : positionTerms.getBuckets()) {
            String positionName = (String) positionBucket.getKey();
            long playersAtPosition = positionBucket.getDocCount();
            // team, position, number of players
            System.out.println(teamName + " " + positionName + " " + playersAtPosition);
        }
    }
}
max/min/sum/avg
例如要計算每個球隊球員年齡的最大值/最小值/總和/平均值,如果使用SQL語句,應表達如下(以最大值為例):
select team, max(age) as max_age from player group by team;
/**
 * max per group -- the equivalent of
 * {@code select team, max(age) as max_age from player group by team;}
 * Prints one line per team: the team name followed by the maximum age.
 */
@Test
public void testAgg3() {
    // Group by team ...
    TermsAggregationBuilder byTeam = AggregationBuilders.terms("team_name").field("team");
    // ... and compute the maximum age inside each group.
    MaxAggregationBuilder oldest = AggregationBuilders.max("max_age").field("age");
    byTeam.subAggregation(oldest);

    SearchResponse searchResponse = client.prepareSearch("player_info")
        .setTypes("player")
        .addAggregation(byTeam)
        .execute()
        .actionGet();

    Map<String, Aggregation> topLevel = searchResponse.getAggregations().getAsMap();
    // Fetch the team grouping by its aggregation name.
    StringTerms teamTerms = (StringTerms) topLevel.get("team_name");

    for (Terms.Bucket teamBucket : teamTerms.getBuckets()) {
        // Group key (the team name).
        String teamName = (String) teamBucket.getKey();
        // Metrics computed for this group, keyed by name.
        Map<String, Aggregation> metrics = teamBucket.getAggregations().getAsMap();
        InternalMax maxAge = (InternalMax) metrics.get("max_age");
        System.out.println(teamName + " " + maxAge.getValue());
    }
}
對多個field求max/min/sum/avg
例如要計算每個球隊球員的平均年齡,同時又要計算總年薪,如果使用SQL語句,應表達如下:
select team, avg(age)as avg_age, sum(salary) as total_salary from player group by team;
/**
 * Several metrics per group -- the equivalent of
 * {@code select team, avg(age) as avg_age, sum(salary) as total_salary from player group by team;}
 * Prints one line per team: team name, average age, total salary.
 */
@Test
public void testAgg4() {
    // Grouping field.
    TermsAggregationBuilder byTeam = AggregationBuilders.terms("team_name").field("team");
    // First metric: average age.
    AvgAggregationBuilder averageAge = AggregationBuilders.avg("avg_age").field("age");
    // Second metric: sum of salaries.
    SumAggregationBuilder salarySum = AggregationBuilders.sum("total_salary").field("salary");
    // Attach both metrics to the grouping.
    byTeam.subAggregation(averageAge);
    byTeam.subAggregation(salarySum);

    SearchResponse searchResponse = client.prepareSearch("player_info")
        .setTypes("player")
        .addAggregation(byTeam)
        .execute()
        .actionGet();

    Map<String, Aggregation> topLevel = searchResponse.getAggregations().getAsMap();
    // Fetch the grouping by its aggregation name.
    StringTerms teamTerms = (StringTerms) topLevel.get("team_name");

    for (Terms.Bucket teamBucket : teamTerms.getBuckets()) {
        String teamName = (String) teamBucket.getKey();
        Map<String, Aggregation> metrics = teamBucket.getAggregations().getAsMap();
        // Look each metric up by the name it was registered under.
        double avgAgeValue = ((InternalAvg) metrics.get("avg_age")).getValue();
        double totalSalaryValue = ((InternalSum) metrics.get("total_salary")).getValue();
        System.out.println(teamName + " " + avgAgeValue + " " + totalSalaryValue);
    }
}
聚合後對Aggregation結果排序
例如要計算每個球隊總年薪,并按照總年薪倒序排列,如果使用SQL語句,應表達如下:
select team, sum(salary) as total_salary from player group by team order by total_salary desc;
/**
 * Sorting groups by a metric -- the equivalent of
 * {@code select team, sum(salary) as total_salary from player group by team order by total_salary desc;}
 * Prints one line per team, highest total salary first.
 */
@Test
public void testAgg5() {
    SearchRequestBuilder builder = client.prepareSearch("player_info").setTypes("player");
    // Group by team and order the buckets by the "total_salary" sub-aggregation.
    // The order path must match the sub-aggregation name exactly (no stray
    // whitespace); the second argument is "ascending", so false gives the
    // descending order the SQL statement asks for.
    TermsAggregationBuilder termsAgg = AggregationBuilders.terms("team_name")
        .field("team")
        .order(Terms.Order.aggregation("total_salary", false));
    SumAggregationBuilder sumAgg = AggregationBuilders.sum("total_salary").field("salary");
    builder.addAggregation(termsAgg.subAggregation(sumAgg));

    SearchResponse response = builder.execute().actionGet();
    Map<String, Aggregation> aggMap = response.getAggregations().getAsMap();
    StringTerms teams = (StringTerms) aggMap.get("team_name");
    for (Terms.Bucket teamBucket : teams.getBuckets()) {
        String team = (String) teamBucket.getKey();
        Map<String, Aggregation> subAggMap = teamBucket.getAggregations().getAsMap();
        InternalSum totalSalary = (InternalSum) subAggMap.get("total_salary");
        double totalSalaryValue = totalSalary.getValue();
        System.out.println(team + " " + totalSalaryValue);
    }
}
需要特别注意的是,排序是在TermsAggregation處執行的,Order.aggregation函數的第一個參數是用來排序的子aggregation的名字(必須與子聚合的名字完全一致),第二個參數是boolean型,true表示正序,false表示倒序。
- Aggregation結果條數的問題
預設情況下,search執行後,僅傳回10條聚合結果,如果想傳回更多的結果,需要在建構TermsAggregationBuilder(2.x版本中名為TermsBuilder)時指定size:
// Ask for up to 15 buckets. The builder type is TermsAggregationBuilder, as in the
// tests of this file (TermsBuilder is the older name of this class).
TermsAggregationBuilder teamAgg = AggregationBuilders.terms("team").size(15);
- Aggregation結果的解析/輸出
得到response後:
// Top-level aggregations keyed by the name they were registered under.
Map<String, Aggregation> aggMap = response.getAggregations().getAsMap();
StringTerms teamAgg = (StringTerms) aggMap.get("keywordAgg");
Iterator<Terms.Bucket> teamBucketIt = teamAgg.getBuckets().iterator();
while (teamBucketIt.hasNext()) {
    Terms.Bucket buck = teamBucketIt.next();
    // team name (the bucket key; cast as in the tests of this file)
    String team = (String) buck.getKey();
    // number of documents in the bucket
    long count = buck.getDocCount();
    // all sub-aggregations of this bucket, keyed by name
    Map<String, Aggregation> subaggmap = buck.getAggregations().getAsMap();
    // how to read an avg value
    double avg_age = ((InternalAvg) subaggmap.get("avg_age")).getValue();
    // how to read a sum value
    double total_salary = ((InternalSum) subaggmap.get("total_salary")).getValue();
    // ...
    // max/min work the same way
}
- 總結
綜上,聚合操作主要是調用了SearchRequestBuilder的addAggregation方法,通常是傳入一個TermsAggregationBuilder(2.x版本中名為TermsBuilder),子聚合調用TermsAggregationBuilder的subAggregation方法,可以添加的子聚合有TermsAggregationBuilder、SumAggregationBuilder、AvgAggregationBuilder、MaxAggregationBuilder、MinAggregationBuilder等常見的聚合操作。
從實作上來講,SearchRequestBuilder在内部保持了一個私有的 SearchSourceBuilder執行個體, SearchSourceBuilder内部包含一個List<AbstractAggregationBuilder>,每次調用addAggregation時會調用 SearchSourceBuilder執行個體,添加一個AggregationBuilder。
同樣的,TermsBuilder也在内部保持了一個List<AbstractAggregationBuilder>,調用subAggregation方法(來自父類)時會添加一個AggregationBuilder。有興趣的讀者也可以閱讀源碼的實作。
參考來源:
https://www.elastic.co/guide/en/elasticsearch/client/java-api/5.4/index.html ElasticSearch java API - 聚合查詢 - Elastic 中文社群