天天看點

在 EMR 中使用 ES-Hadoop在 EMR 中使用 ES-Hadoop

在 EMR 中使用 ES-Hadoop

ES-Hadoop 是 Elasticsearch(ES) 推出的專門用于對接 Hadoop 生态的工具,使得使用者可以使用 Mapreduce(MR)、Spark、Hive 等工具處理 ES 上的資料(ES-Hadoop 還包含另外一部分:将 ES 的索引 snapshot 到 HDFS,對于該内容本文暫不讨論)。衆所周知,Hadoop 生态的長處是處理大規模資料集,但是其缺點也很明顯,就是當用于互動式分析時,查詢時延會比較長。而 ES 是這方面的好手,對于很多查詢類型,特别是 ad-hoc 查詢,基本可以做到秒級。ES-Hadoop 的推出提供了一種組合兩者優勢的可能性。使用 ES-Hadoop,使用者隻需要對自己代碼做出很小的改動,即可以快速處理存儲在 ES 中的資料,并且能夠享受到 ES 帶來的加速效果。

ES-Hadoop 的邏輯是将 ES 作為 MR/Spark/Hive 等資料處理引擎的“資料源”,在計算存儲分離的架構中扮演存儲的角色。這和 MR/Spark/Hive 的其他資料源并無差異。但相對于其他資料源, ES 具有更快的資料選擇過濾能力。這種能力正是分析引擎最為關鍵的能力之一。

EMR 中已經添加了對 ES-Hadoop 的支援,使用者不需要做任何配置即可使用 ES-Hadoop。下面我們通過幾個例子,介紹如何在 EMR 中使用 ES-Hadoop。

準備

ES 有自動建立索引的功能,能夠根據輸入資料自動推測資料類型。這個功能在某些情況下很友善,避免了使用者很多額外的操作,但是也産生了一些問題。最重要的問題是 ES 推測的類型和我們預期的類型不一緻。比如我們定義了一個字段叫 age,INT 型,在 ES 索引中可能被索引成了 LONG 型。在執行一些操作的時候會帶來類型轉換問題。為此,我們建議手動建立索引。

在下面幾個例子中,我們将使用同一個索引 company 和一個類型 employees(ES 索引可以看成一個 database,類型可以看做 database 下的一張表),該類型定義了四個字段(字段類型均為 ES 定義的類型):

{
  "id": long,
  "name": text,
  "age": integer,
  "birth": date
}           

在 kibana 中運作如下指令建立索引(或用相應的 curl 指令)

PUT company
{
  "mappings": {
    "employees": {
      "properties": {
        "id": {
          "type": "long"
        },
        "name": {
          "type": "text",
          "fields": {
            "keyword": {
              "type": "keyword",
              "ignore_above": 256
            }
          }
        },
        "birth": {
          "type": "date"
        },
        "addr": {
          "type": "text"
        }
      }
    }
  },
  "settings": {
    "index": {
      "number_of_shards": "5",
      "number_of_replicas": "1"
    }
  }
}           

其中 settings 中的索引參數可根據需要設定,也可以不具體設定 settings。

準備一個檔案,每一行為一個 json 對象,如下所示,

{"id": 1, "name": "zhangsan", "birth": "1990-01-01", "addr": "No.969, wenyixi Rd, yuhang, hangzhou"}
{"id": 2, "name": "lisi", "birth": "1991-01-01", "addr": "No.556, xixi Rd, xihu, hangzhou"}
{"id": 3, "name": "wangwu", "birth": "1992-01-01", "addr": "No.699 wangshang Rd, binjiang, hangzhou"}           

并儲存至 HDFS 指定目錄(如 "/es-hadoop/employees.txt")。

Mapreduce

在下面這個例子中,我們讀取 hdfs 上 /es-hadoop 目錄下的 json 檔案,并将這些 json 檔案中的每一行作為一個 document 寫入 es。寫入過程由

EsOutputFormat

在 map 階段完成。

這裡對 ES 的設定主要是如下幾個選項

  • es.nodes: ES 節點,為 host:port 格式。對于阿裡雲托管式 ES,此處應為阿裡雲提供的 ES 通路域名
  • es.net.http.auth.user: 使用者名
  • es.net.http.auth.pass: 使用者密碼
  • es.nodes.wan.only: 對于阿裡雲托管式 ES,此處應設定為 true
  • es.resource: ES 索引和類型
  • es.input.json: 如果原始檔案為 json 類型,設定為 true,否則,需要在 map 函數中自己解析原始資料,生成相應的 Writable 輸出

注意:

  • 關閉 map 和 reduce 的推測執行機制
package com.aliyun.emr;

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.elasticsearch.hadoop.mr.EsOutputFormat;

public class Test implements Tool {

  private Configuration conf;

  @Override
  public int run(String[] args) throws Exception {

    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();

    conf.setBoolean("mapreduce.map.speculative", false);
    conf.setBoolean("mapreduce.reduce.speculative", false);
    conf.set("es.nodes", "<your_es_host>:9200");
    conf.set("es.net.http.auth.user", "<your_username>");
    conf.set("es.net.http.auth.pass", "<your_password>");
    conf.set("es.nodes.wan.only", "true");
    conf.set("es.resource", "company/employees");
    conf.set("es.input.json", "yes");

    Job job = Job.getInstance(conf);
    job.setInputFormatClass(TextInputFormat.class);
    job.setOutputFormatClass(EsOutputFormat.class);
    job.setMapOutputKeyClass(NullWritable.class);
    job.setMapOutputValueClass(Text.class);
    job.setJarByClass(Test.class);
    job.setMapperClass(EsMapper.class);

    FileInputFormat.setInputPaths(job, new Path(otherArgs[0]));

    return job.waitForCompletion(true) ? 0 : 1;
  }

  @Override
  public void setConf(Configuration conf) {
    this.conf = conf;
  }

  @Override
  public Configuration getConf() {
    return conf;
  }

  public static class EsMapper extends Mapper<Object, Text, NullWritable, Text> {
    private Text doc = new Text();

    @Override
    protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
      if (value.getLength() > 0) {
        doc.set(value);
        context.write(NullWritable.get(), doc);
      }
    }
  }

  public static void main(String[] args) throws Exception {
    int ret = ToolRunner.run(new Test(), args);
    System.exit(ret);
  }
}
           

将該代碼編譯打包為 mr-test.jar, 上傳至裝有 emr 用戶端的機器(如 gateway,或者 EMR cluster 任意一台機器)。

在裝有 EMR 用戶端的機器上運作如下指令執行 mapreduce 程式:

hadoop jar mr-test.jar com.aliyun.emr.Test -Dmapreduce.job.reduces=0 -libjars mr-test.jar /es-hadoop           

即可完成向 ES 寫資料。具體寫入的資料可以通過 kibana 查詢(或者通過相應的 curl 指令):

GET
{
  "query": {
    "match_all": {}
  }
}           

Spark

本示例同 mapreduce 一樣,也是向 ES 的一個索引寫入資料,隻不過是通過 spark 來執行。這裡 spark 借助 JavaEsSpark 類将一份 RDD 持久化到 es。同上述 mapreduce 程式一樣,使用者也需要注意幾個選項的設定。

package com.aliyun.emr;

import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.elasticsearch.spark.rdd.api.java.JavaEsSpark;
import org.spark_project.guava.collect.ImmutableMap;

public class Test {

  public static void main(String[] args) {
    SparkConf conf = new SparkConf();
    conf.setAppName("Es-test");
    conf.set("es.nodes", "<your_es_host>:9200");
    conf.set("es.net.http.auth.user", "<your_username>");
    conf.set("es.net.http.auth.pass", "<your_password>");
    conf.set("es.nodes.wan.only", "true");

    SparkSession ss = new SparkSession(new SparkContext(conf));
    final AtomicInteger employeesNo = new AtomicInteger(0);
    JavaRDD<Map<Object, ?>> javaRDD = ss.read().text("hdfs://emr-header-1:9000/es-hadoop/employees.txt")
        .javaRDD().map((Function<Row, Map<Object, ?>>) row -> ImmutableMap.of("employees" + employeesNo.getAndAdd(1), row.mkString()));

    JavaEsSpark.saveToEs(javaRDD, "company/employees");
  }
}           

将其打包成 spark-test.jar,運作如下指令執行寫入過程

spark-submit --master yarn --class com.aliyun.emr.Test spark-test.jar           

待任務執行完畢後可以使用 kibana 或者 curl 指令查詢結果。

除了 spark rdd 操作,es-hadoop 還提供了使用 sparksql 來讀寫 ES。詳細請參考 ES-Hadoop

官方頁面

Hive

這裡展示使用 Hive 通過 SQL 來讀寫 ES 的方法。

首先運作

hive

指令進入互動式環境,先建立一個表

CREATE DATABASE IF NOT EXISTS company;           

之後建立一個外部表,表存儲在 ES 上,通過 TBLPROPERTIES 來設定對接 ES 的各個選項:

CREATE EXTERNAL table IF NOT EXISTS employees(
  id BIGINT,
  name STRING,
  birth TIMESTAMP,
  addr STRING
)
STORED BY 'org.elasticsearch.hadoop.hive.EsStorageHandler'
TBLPROPERTIES(
    'es.resource' = 'tpcds/ss',
    'es.nodes' = '<your_es_host>',
    'es.net.http.auth.user' = '<your_username>',
    'es.net.http.auth.pass' = '<your_password>',
    'es.nodes.wan.only' = 'true',
    'es.resource' = 'company/employees'
);           

注意在 Hive 表中我們将 birth 設定成了 TIMESTAMP 類型,而在 ES 中我們将其設定成了 DATE 型。這是因為 Hive 和 ES 對于資料格式處理不一緻。在寫入時,Hive 将原始 date 轉換後發送給 ES 可能會解析失敗,相反在讀取時,ES 傳回的格式 Hive 也可能解析失敗。參見

這裡

往表中插入一些資料:

INSERT INTO TABLE employees VALUES (1, "zhangsan", "1990-01-01","No.969, wenyixi Rd, yuhang, hangzhou");
INSERT INTO TABLE employees VALUES (2, "lisi", "1991-01-01", "No.556, xixi Rd, xihu, hangzhou");
INSERT INTO TABLE employees VALUES (3, "wangwu", "1992-01-01", "No.699 wangshang Rd, binjiang, hangzhou");           

執行查詢即可看到結果:

SELECT * FROM employees LIMIT 100;
OK
1    zhangsan    1990-01-01    No.969, wenyixi Rd, yuhang, hangzhou
2    lisi    1991-01-01    No.556, xixi Rd, xihu, hangzhou
3    wangwu    1992-01-01    No.699 wangshang Rd, binjiang, hangzhou