ES-Hadoop是連接快速查詢和大資料分析的橋梁,它能夠無縫地在Hadoop和ElasticSearch之間移動資料。ES-Hadoop將Hadoop資料索引到Elasticsearch,充分利用其查詢速度、大量聚合能力來使它比以往更快,同時可以使用HDFS作為Elasticsearch的長期存檔。ES-Hadoop可以原生整合Hadoop生態系統上的很多流行元件,比如Spark、Hive、Pig、Storm、MapReduce等。官方有張圖可以很好說明
![](https://img.laitimes.com/img/_0nNw4CM6IyYiwiM6ICdiwiI0gTMx81dsQWZ4lmZf1GLlpXazVmcvwFciV2dsQXYtJ3bm9CX9s2RkBnVHFmb1clWvB3MaVnRtp1XlBXe0xCMy81dvRWYoNHLwEzX5xCMx8FesU2cfdGLwMzX0xiRGZkRGZ0Xy9GbvNGLpZTY1EmMZVDUSFTU4VFRR9Fd4VGdsYTMfVmepNHLrJXYtJXZ0F2dvwVZnFWbp1zczV2YvJHctM3cv1Ce-cmbw5CNyIjN1M2MzMTOjN2NzITZyYzX4IjNxUTM4IzLcdDMyIDMy8CXn9Gbi9CXzV2Zh1WavwVbvNmLvR3YxUjLyM3Lc9CX6MHc0RHaiojIsJye.png)
下面直接看一個簡單的ES與Hadoop之間資料移動的實例
項目依賴的jar包如下
[plain]
view plain
copy
<!-- ES-Hadoop connector: provides EsInputFormat/EsOutputFormat for MapReduce -->
<dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch-hadoop</artifactId>
    <version>2.3.2</version>
</dependency>
ElasticSearch到Hadoop最簡單的實例
[java]
view plain
copy
- import java.io.IOException;
- import org.apache.hadoop.conf.Configuration;
- import org.apache.hadoop.fs.Path;
- import org.apache.hadoop.io.Text;
- import org.apache.hadoop.mapreduce.Job;
- import org.apache.hadoop.mapreduce.Mapper;
- import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
- import org.apache.hadoop.util.GenericOptionsParser;
- import org.elasticsearch.hadoop.mr.EsInputFormat;
- import org.elasticsearch.hadoop.mr.LinkedMapWritable;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- public class E2HJob01 {
- private static Logger LOG = LoggerFactory.getLogger(E2HJob01.class);
- public static void main(String args[]) {
- try {
- new Configuration();
- "mapreduce.map.speculative", false);
- "mapreduce.reduce.speculative", false);
- //ElasticSearch節點
- "es.nodes", "centos.host1:9200");
- //ElaticSearch Index/Type
- "es.resource", "job/51/");
- new GenericOptionsParser(conf, args).getRemainingArgs();
- if (oArgs.length != 1) {
- "error");
- 2);
- }
- "51JOBE2H01");
- class);
- class);
- class);
- class);
- class);
- new Path(oArgs[0]));
- true));
- catch (Exception e) {
- LOG.error(e.getMessage(), e);
- }
- }
- }
- class E2HMapper01 extends Mapper<Text, LinkedMapWritable, Text, LinkedMapWritable> {
- private static final Logger LOG = LoggerFactory.getLogger(E2HMapper01.class);
- @Override
- protected void setup(Context context) throws IOException, InterruptedException {
- super.setup(context);
- }
- @Override
- protected void map(Text key, LinkedMapWritable value, Context context)
- throws IOException, InterruptedException {
- "key {} value {}", key, value);
- context.write(key, value);
- }
- @Override
- protected void cleanup(Context context) throws IOException, InterruptedException {
- super.cleanup(context);
- }
- }
hadoop jar eshadoop.jar E2HJob01 /user/data/es/job/
從hadoop上的資料檔案可以看到第一列是ES的doc id,第二列是doc data
也可以添加ES查詢條件,實例如下
[java]
view plain
copy
- import java.io.IOException;
- import java.util.HashMap;
- import java.util.Map;
- import java.util.Map.Entry;
- import org.apache.commons.lang.StringUtils;
- import org.apache.hadoop.conf.Configuration;
- import org.apache.hadoop.fs.Path;
- import org.apache.hadoop.io.NullWritable;
- import org.apache.hadoop.io.Text;
- import org.apache.hadoop.io.Writable;
- import org.apache.hadoop.mapreduce.Job;
- import org.apache.hadoop.mapreduce.Mapper;
- import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
- import org.apache.hadoop.util.GenericOptionsParser;
- import org.elasticsearch.hadoop.mr.EsInputFormat;
- import org.elasticsearch.hadoop.mr.LinkedMapWritable;
- import org.platform.eshadoop.modules.examples.writable.JobWritable;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- public class E2HJob02 {
- private static Logger LOG = LoggerFactory.getLogger(E2HJob02.class);
- public static void main(String args[]) {
- try {
- new Configuration();
- "mapreduce.map.speculative", false);
- "mapreduce.reduce.speculative", false);
- "es.nodes", "centos.host1:9200");
- "es.resource", "job/51/");
- "es.query", "?q=高*");
- new GenericOptionsParser(conf, args).getRemainingArgs();
- if (oArgs.length != 1) {
- "error");
- 2);
- }
- "51JOBE2H02");
- class);
- class);
- class);
- class);
- class);
- new Path(oArgs[0]));
- true));
- catch (Exception e) {
- LOG.error(e.getMessage(), e);
- }
- }
- }
- class E2HMapper02 extends Mapper<Text, LinkedMapWritable, NullWritable, JobWritable> {
- private static final Logger LOG = LoggerFactory.getLogger(E2HMapper02.class);
- @Override
- protected void setup(Context context) throws IOException, InterruptedException {
- super.setup(context);
- }
- @Override
- protected void map(Text key, LinkedMapWritable value, Context context)
- throws IOException, InterruptedException {
- new JobWritable();
- writable.setId(key);
- new HashMap<String, String>();
- for (Entry<Writable, Writable> entry : value.entrySet()) {
- "key {} value {}", entry.getKey(), entry.getValue());
- map.put(entry.getKey().toString(), entry.getValue().toString());
- }
- "jobName");
- if (StringUtils.isNotBlank(jobName)) {
- new Text(jobName));
- }
- "jobUrl");
- if (StringUtils.isNotBlank(jobUrl)) {
- new Text(jobUrl));
- }
- "companyName");
- if (StringUtils.isNotBlank(companyName)) {
- new Text(companyName));
- }
- "companyUrl");
- if (StringUtils.isNotBlank(companyUrl)) {
- new Text(companyUrl));
- }
- "salary");
- if (StringUtils.isNotBlank(salary)) {
- new Text(salary));
- }
- "workPlace");
- if (StringUtils.isNotBlank(workPlace)) {
- new Text(workPlace));
- }
- "contact");
- if (StringUtils.isNotBlank(contact)) {
- new Text(contact));
- }
- "welfare");
- if (StringUtils.isNotBlank(welfare)) {
- new Text(welfare));
- }
- context.write(NullWritable.get(), writable);
- }
- @Override
- protected void cleanup(Context context) throws IOException, InterruptedException {
- super.cleanup(context);
- }
- }
[java]
view plain
copy
- import java.io.DataInput;
- import java.io.DataOutput;
- import java.io.IOException;
- import org.apache.hadoop.io.Text;
- import org.apache.hadoop.io.Writable;
- public class JobWritable implements Writable, Cloneable {
- private Text id = null;
- private Text jobName = null;
- private Text jobUrl = null;
- private Text companyName = null;
- private Text companyUrl = null;
- private Text salary = null;
- private Text workPlace = null;
- private Text contact = null;
- private Text welfare = null;
- public JobWritable() {
- new Text();
- new Text();
- new Text();
- new Text();
- new Text();
- new Text();
- new Text();
- new Text();
- new Text();
- }
- public void readFields(DataInput dataInput) throws IOException {
- id.readFields(dataInput);
- jobName.readFields(dataInput);
- jobUrl.readFields(dataInput);
- companyName.readFields(dataInput);
- companyUrl.readFields(dataInput);
- salary.readFields(dataInput);
- workPlace.readFields(dataInput);
- contact.readFields(dataInput);
- welfare.readFields(dataInput);
- }
- public void write(DataOutput dataOutput) throws IOException {
- id.write(dataOutput);
- jobName.write(dataOutput);
- jobUrl.write(dataOutput);
- companyName.write(dataOutput);
- companyUrl.write(dataOutput);
- salary.write(dataOutput);
- workPlace.write(dataOutput);
- contact.write(dataOutput);
- welfare.write(dataOutput);
- }
- public Text getId() {
- return id;
- }
- public void setId(Text id) {
- this.id = id;
- }
- public Text getJobName() {
- return jobName;
- }
- public void setJobName(Text jobName) {
- this.jobName = jobName;
- }
- public Text getJobUrl() {
- return jobUrl;
- }
- public void setJobUrl(Text jobUrl) {
- this.jobUrl = jobUrl;
- }
- public Text getCompanyName() {
- return companyName;
- }
- public void setCompanyName(Text companyName) {
- this.companyName = companyName;
- }
- public Text getCompanyUrl() {
- return companyUrl;
- }
- public void setCompanyUrl(Text companyUrl) {
- this.companyUrl = companyUrl;
- }
- public Text getSalary() {
- return salary;
- }
- public void setSalary(Text salary) {
- this.salary = salary;
- }
- public Text getWorkPlace() {
- return workPlace;
- }
- public void setWorkPlace(Text workPlace) {
- this.workPlace = workPlace;
- }
- public Text getContact() {
- return contact;
- }
- public void setContact(Text contact) {
- this.contact = contact;
- }
- public Text getWelfare() {
- return welfare;
- }
- public void setWelfare(Text welfare) {
- this.welfare = welfare;
- }
- @Override
- public String toString() {
- return id + ":" + jobName + ":" + jobUrl + ":" + companyName + ":" + companyUrl +
- ":" + salary + ":" + workPlace + ":" + contact + ":" + welfare;
- }
- }
下面這個實例是每行直接以JSON格式存儲在hadoop上
[java]
view plain
copy
- import java.io.IOException;
- import java.util.HashMap;
- import java.util.Map;
- import java.util.Map.Entry;
- import org.apache.hadoop.conf.Configuration;
- import org.apache.hadoop.fs.Path;
- import org.apache.hadoop.io.NullWritable;
- import org.apache.hadoop.io.Text;
- import org.apache.hadoop.io.Writable;
- import org.apache.hadoop.mapreduce.Job;
- import org.apache.hadoop.mapreduce.Mapper;
- import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
- import org.apache.hadoop.util.GenericOptionsParser;
- import org.elasticsearch.hadoop.mr.EsInputFormat;
- import org.elasticsearch.hadoop.mr.LinkedMapWritable;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import com.google.gson.Gson;
- public class E2HJob03 {
- private static Logger LOG = LoggerFactory.getLogger(E2HJob03.class);
- public static void main(String args[]) {
- try {
- new Configuration();
- "mapreduce.map.speculative", false);
- "mapreduce.reduce.speculative", false);
- "es.nodes", "centos.host1:9200");
- "es.resource", "job/51/");
- new GenericOptionsParser(conf, args).getRemainingArgs();
- if (oArgs.length != 1) {
- "error");
- 2);
- }
- "51JOBE2H03");
- class);
- class);
- class);
- class);
- class);
- new Path(oArgs[0]));
- true));
- catch (Exception e) {
- LOG.error(e.getMessage(), e);
- }
- }
- }
- class E2HMapper03 extends Mapper<Text, LinkedMapWritable, NullWritable, Text> {
- private static final Logger LOG = LoggerFactory.getLogger(E2HMapper02.class);
- private Gson gson = null;
- @Override
- protected void setup(Context context) throws IOException, InterruptedException {
- super.setup(context);
- new Gson();
- }
- @Override
- protected void map(Text key, LinkedMapWritable value, Context context)
- throws IOException, InterruptedException {
- new JobInfo();
- jobInfo.setId(key.toString());
- new HashMap<String, String>();
- for (Entry<Writable, Writable> entry : value.entrySet()) {
- "key {} value {}", entry.getKey(), entry.getValue());
- map.put(entry.getKey().toString(), entry.getValue().toString());
- }
- "jobName"));
- "jobUrl"));
- "companyName"));
- "companyUrl"));
- "salary"));
- "workPlace"));
- "contact"));
- "welfare"));
- new Text(gson.toJson(jobInfo)));
- }
- @Override
- protected void cleanup(Context context) throws IOException, InterruptedException {
- super.cleanup(context);
- }
- }
/**
 * Plain POJO mirroring one job posting; used only as a Gson serialization
 * target in E2HMapper03, so field names must match the desired JSON keys.
 * All fields default to null — absent ES fields simply stay null in the JSON.
 */
class JobInfo {

    private String id = null;
    private String jobName = null;
    private String jobUrl = null;
    private String companyName = null;
    private String companyUrl = null;
    private String salary = null;
    private String workPlace = null;
    private String contact = null;
    private String welfare = null;

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public String getJobName() {
        return jobName;
    }

    public void setJobName(String jobName) {
        this.jobName = jobName;
    }

    public String getJobUrl() {
        return jobUrl;
    }

    public void setJobUrl(String jobUrl) {
        this.jobUrl = jobUrl;
    }

    public String getCompanyName() {
        return companyName;
    }

    public void setCompanyName(String companyName) {
        this.companyName = companyName;
    }

    public String getCompanyUrl() {
        return companyUrl;
    }

    public void setCompanyUrl(String companyUrl) {
        this.companyUrl = companyUrl;
    }

    public String getSalary() {
        return salary;
    }

    public void setSalary(String salary) {
        this.salary = salary;
    }

    public String getWorkPlace() {
        return workPlace;
    }

    public void setWorkPlace(String workPlace) {
        this.workPlace = workPlace;
    }

    public String getContact() {
        return contact;
    }

    public void setContact(String contact) {
        this.contact = contact;
    }

    public String getWelfare() {
        return welfare;
    }

    public void setWelfare(String welfare) {
        this.welfare = welfare;
    }
}
接下來的實例是將hadoop上的資料移動到ElasticSearch上索引,這裡直接用上面存儲的JSON資料試驗
[java]
view plain
copy
1. import java.io.IOException;
2.
3. import org.apache.hadoop.conf.Configuration;
4. import org.apache.hadoop.fs.Path;
5. import org.apache.hadoop.io.LongWritable;
6. import org.apache.hadoop.io.NullWritable;
7. import org.apache.hadoop.io.Text;
8. import org.apache.hadoop.io.Writable;
9. import org.apache.hadoop.mapreduce.Job;
10. import org.apache.hadoop.mapreduce.Mapper;
11. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
12. import org.apache.hadoop.util.GenericOptionsParser;
13. import org.elasticsearch.hadoop.mr.EsOutputFormat;
14. import org.slf4j.Logger;
15. import org.slf4j.LoggerFactory;
16.
17. public class H2EJob {
18.
19. private static Logger LOG = LoggerFactory.getLogger(H2EJob.class);
20.
21. public static void main(String args[]) {
22. try {
23. new Configuration();
24. "mapreduce.map.speculative", false);
25. "mapreduce.reduce.speculative", false);
26. "es.nodes", "centos.host1:9200");
27. "es.resource", "job1/51");
28. //Hadoop上的資料格式為JSON,可以直接導入
29. "es.input.json", "yes");
30. new GenericOptionsParser(conf, args).getRemainingArgs();
31. if (oArgs.length != 1) {
32. "error");
33. 2);
34. }
35. "51JOBH2E");
36. class);
37. class);
38. class);
39. class);
40. class);
41.
42. new Path(oArgs[0]));
43.
44. true));
45. catch (Exception e) {
46. LOG.error(e.getMessage(), e);
47. }
48. }
49.
50. }
51.
52. class H2EMapper extends Mapper<LongWritable, Text, NullWritable, Text> {
53.
54. @Override
55. protected void setup(Context context) throws IOException, InterruptedException {
56. super.setup(context);
57. }
58.
59. @Override
60. public void run(Context context) throws IOException, InterruptedException {
61. super.run(context);
62. }
63.
64. @Override
65. protected void map(LongWritable key, Text value, Context context)
66. throws IOException, InterruptedException {
67. context.write(NullWritable.get(), value);
68. }
69.
70. @Override
71. protected void cleanup(Context context) throws IOException,InterruptedException {
72. super.cleanup(context);
73. }
74.
75. }