在 EMR 中使用 ES-Hadoop
ES-Hadoop 是 Elasticsearch(ES) 推出的專門用于對接 Hadoop 生态的工具,使得使用者可以使用 Mapreduce(MR)、Spark、Hive 等工具處理 ES 上的資料(ES-Hadoop 還包含另外一部分:将 ES 的索引 snapshot 到 HDFS,對于該内容本文暫不讨論)。衆所周知,Hadoop 生态的長處是處理大規模資料集,但是其缺點也很明顯,就是當用于互動式分析時,查詢時延會比較長。而 ES 是這方面的好手,對于很多查詢類型,特别是 ad-hoc 查詢,基本可以做到秒級。ES-Hadoop 的推出提供了一種組合兩者優勢的可能性。使用 ES-Hadoop,使用者隻需要對自己代碼做出很小的改動,即可以快速處理存儲在 ES 中的資料,并且能夠享受到 ES 帶來的加速效果。
ES-Hadoop 的邏輯是将 ES 作為 MR/Spark/Hive 等資料處理引擎的“資料源”,在計算存儲分離的架構中扮演存儲的角色。這和 MR/Spark/Hive 的其他資料源并無差異。但相對于其他資料源, ES 具有更快的資料選擇過濾能力。這種能力正是分析引擎最為關鍵的能力之一。
EMR 中已經添加了對 ES-Hadoop 的支援,使用者不需要做任何配置即可使用 ES-Hadoop。下面我們通過幾個例子,介紹如何在 EMR 中使用 ES-Hadoop。
準備
ES 有自動建立索引的功能,能夠根據輸入資料自動推測資料類型。這個功能在某些情況下很友善,避免了使用者很多額外的操作,但是也産生了一些問題。最重要的問題是 ES 推測的類型和我們預期的類型不一緻。比如我們定義了一個字段叫 age,INT 型,在 ES 索引中可能被索引成了 LONG 型。在執行一些操作的時候會帶來類型轉換問題。為此,我們建議手動建立索引。
在下面幾個例子中,我們将使用同一個索引 company 和一個類型 employees(ES 索引可以看成一個 database,類型可以看做 database 下的一張表),該類型定義了四個字段(字段類型均為 ES 定義的類型):
{
"id": long,
"name": text,
"age": integer,
"birth": date
}
在 kibana 中運作如下指令建立索引(或用相應的 curl 指令)
PUT company
{
"mappings": {
"employees": {
"properties": {
"id": {
"type": "long"
},
"name": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"birth": {
"type": "date"
},
"addr": {
"type": "text"
}
}
}
},
"settings": {
"index": {
"number_of_shards": "5",
"number_of_replicas": "1"
}
}
}
其中 settings 中的索引參數可根據需要設定,也可以不具體設定 settings。
準備一個檔案,每一行為一個 json 對象,如下所示,
{"id": 1, "name": "zhangsan", "birth": "1990-01-01", "addr": "No.969, wenyixi Rd, yuhang, hangzhou"}
{"id": 2, "name": "lisi", "birth": "1991-01-01", "addr": "No.556, xixi Rd, xihu, hangzhou"}
{"id": 3, "name": "wangwu", "birth": "1992-01-01", "addr": "No.699 wangshang Rd, binjiang, hangzhou"}
并儲存至 HDFS 指定目錄(如 "/es-hadoop/employees.txt")。
Mapreduce
在下面這個例子中,我們讀取 hdfs 上 /es-hadoop 目錄下的 json 檔案,并将這些 json 檔案中的每一行作為一個 document 寫入 es。寫入過程由
EsOutputFormat
在 map 階段完成。
這裡對 ES 的設定主要是如下幾個選項
- es.nodes: ES 節點,為 host:port 格式。對于阿裡雲托管式 ES,此處應為阿裡雲提供的 ES 通路域名
- es.net.http.auth.user: 使用者名
- es.net.http.auth.pass: 使用者密碼
- es.nodes.wan.only: 對于阿裡雲托管式 ES,此處應設定為 true
- es.resource: ES 索引和類型
- es.input.json: 如果原始檔案為 json 類型,設定為 true,否則,需要在 map 函數中自己解析原始資料,生成相應的 Writable 輸出
注意:
- 關閉 map 和 reduce 的推測執行機制
package com.aliyun.emr;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.elasticsearch.hadoop.mr.EsOutputFormat;
public class Test implements Tool {
private Configuration conf;
@Override
public int run(String[] args) throws Exception {
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
conf.setBoolean("mapreduce.map.speculative", false);
conf.setBoolean("mapreduce.reduce.speculative", false);
conf.set("es.nodes", "<your_es_host>:9200");
conf.set("es.net.http.auth.user", "<your_username>");
conf.set("es.net.http.auth.pass", "<your_password>");
conf.set("es.nodes.wan.only", "true");
conf.set("es.resource", "company/employees");
conf.set("es.input.json", "yes");
Job job = Job.getInstance(conf);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(EsOutputFormat.class);
job.setMapOutputKeyClass(NullWritable.class);
job.setMapOutputValueClass(Text.class);
job.setJarByClass(Test.class);
job.setMapperClass(EsMapper.class);
FileInputFormat.setInputPaths(job, new Path(otherArgs[0]));
return job.waitForCompletion(true) ? 0 : 1;
}
@Override
public void setConf(Configuration conf) {
this.conf = conf;
}
@Override
public Configuration getConf() {
return conf;
}
public static class EsMapper extends Mapper<Object, Text, NullWritable, Text> {
private Text doc = new Text();
@Override
protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
if (value.getLength() > 0) {
doc.set(value);
context.write(NullWritable.get(), doc);
}
}
}
public static void main(String[] args) throws Exception {
int ret = ToolRunner.run(new Test(), args);
System.exit(ret);
}
}
将該代碼編譯打包為 mr-test.jar, 上傳至裝有 emr 用戶端的機器(如 gateway,或者 EMR cluster 任意一台機器)。
在裝有 EMR 用戶端的機器上運作如下指令執行 mapreduce 程式:
hadoop jar mr-test.jar com.aliyun.emr.Test -Dmapreduce.job.reduces=0 -libjars mr-test.jar /es-hadoop
即可完成向 ES 寫資料。具體寫入的資料可以通過 kibana 查詢(或者通過相應的 curl 指令):
GET
{
"query": {
"match_all": {}
}
}
Spark
本示例同 mapreduce 一樣,也是向 ES 的一個索引寫入資料,隻不過是通過 spark 來執行。這裡 spark 借助 JavaEsSpark 類将一份 RDD 持久化到 es。同上述 mapreduce 程式一樣,使用者也需要注意幾個選項的設定。
package com.aliyun.emr;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.elasticsearch.spark.rdd.api.java.JavaEsSpark;
import org.spark_project.guava.collect.ImmutableMap;
public class Test {
public static void main(String[] args) {
SparkConf conf = new SparkConf();
conf.setAppName("Es-test");
conf.set("es.nodes", "<your_es_host>:9200");
conf.set("es.net.http.auth.user", "<your_username>");
conf.set("es.net.http.auth.pass", "<your_password>");
conf.set("es.nodes.wan.only", "true");
SparkSession ss = new SparkSession(new SparkContext(conf));
final AtomicInteger employeesNo = new AtomicInteger(0);
JavaRDD<Map<Object, ?>> javaRDD = ss.read().text("hdfs://emr-header-1:9000/es-hadoop/employees.txt")
.javaRDD().map((Function<Row, Map<Object, ?>>) row -> ImmutableMap.of("employees" + employeesNo.getAndAdd(1), row.mkString()));
JavaEsSpark.saveToEs(javaRDD, "company/employees");
}
}
将其打包成 spark-test.jar,運作如下指令執行寫入過程
spark-submit --master yarn --class com.aliyun.emr.Test spark-test.jar
待任務執行完畢後可以使用 kibana 或者 curl 指令查詢結果。
除了 spark rdd 操作,es-hadoop 還提供了使用 sparksql 來讀寫 ES。詳細請參考 ES-Hadoop
官方頁面。
Hive
這裡展示使用 Hive 通過 SQL 來讀寫 ES 的方法。
首先運作
hive
指令進入互動式環境,先建立一個表
CREATE DATABASE IF NOT EXISTS company;
之後建立一個外部表,表存儲在 ES 上,通過 TBLPROPERTIES 來設定對接 ES 的各個選項:
CREATE EXTERNAL table IF NOT EXISTS employees(
id BIGINT,
name STRING,
birth TIMESTAMP,
addr STRING
)
STORED BY 'org.elasticsearch.hadoop.hive.EsStorageHandler'
TBLPROPERTIES(
'es.resource' = 'tpcds/ss',
'es.nodes' = '<your_es_host>',
'es.net.http.auth.user' = '<your_username>',
'es.net.http.auth.pass' = '<your_password>',
'es.nodes.wan.only' = 'true',
'es.resource' = 'company/employees'
);
注意在 Hive 表中我們将 birth 設定成了 TIMESTAMP 類型,而在 ES 中我們将其設定成了 DATE 型。這是因為 Hive 和 ES 對于資料格式處理不一緻。在寫入時,Hive 将原始 date 轉換後發送給 ES 可能會解析失敗,相反在讀取時,ES 傳回的格式 Hive 也可能解析失敗。參見
這裡往表中插入一些資料:
INSERT INTO TABLE employees VALUES (1, "zhangsan", "1990-01-01","No.969, wenyixi Rd, yuhang, hangzhou");
INSERT INTO TABLE employees VALUES (2, "lisi", "1991-01-01", "No.556, xixi Rd, xihu, hangzhou");
INSERT INTO TABLE employees VALUES (3, "wangwu", "1992-01-01", "No.699 wangshang Rd, binjiang, hangzhou");
執行查詢即可看到結果:
SELECT * FROM employees LIMIT 100;
OK
1 zhangsan 1990-01-01 No.969, wenyixi Rd, yuhang, hangzhou
2 lisi 1991-01-01 No.556, xixi Rd, xihu, hangzhou
3 wangwu 1992-01-01 No.699 wangshang Rd, binjiang, hangzhou