天天看點

ES-Hadoop學習筆記-初識

ES-Hadoop是連接配接快速查詢和大資料分析的橋梁,它能夠無間隙的在Hadoop和ElasticSearch上移動資料。ES Hadoop索引Hadoop資料到Elasticsearch,充分利用其查詢速度,大量聚合能力來使它比以往更快,同時可以使用HDFS作為Elasticsearch長期存檔。ES-Hadoop可以本地內建Hadoop生态系統上的很多流行元件,比如Spark、Hive、Pig、Storm、MapReduce等。官方有張圖可以很好說明

ES-Hadoop學習筆記-初識

下面直接看一個簡單的ES與Hadoop之間資料移動的執行個體

項目依賴的jar包如下

[plain] 

​​view plain​​​

 ​​​copy​​

  1. <dependency>
  2. <groupId>org.elasticsearch</groupId>
  3. <artifactId>elasticsearch-hadoop</artifactId>
  4. <version>2.3.2</version>
  5. </dependency>

ElasticSearch到Hadoop最簡單的執行個體

[java] 

​​view plain​​​

 ​​​copy​​

  1. import java.io.IOException;
  2. import org.apache.hadoop.conf.Configuration;
  3. import org.apache.hadoop.fs.Path;
  4. import org.apache.hadoop.io.Text;
  5. import org.apache.hadoop.mapreduce.Job;
  6. import org.apache.hadoop.mapreduce.Mapper;
  7. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  8. import org.apache.hadoop.util.GenericOptionsParser;
  9. import org.elasticsearch.hadoop.mr.EsInputFormat;
  10. import org.elasticsearch.hadoop.mr.LinkedMapWritable;
  11. import org.slf4j.Logger;
  12. import org.slf4j.LoggerFactory;
  13. public class E2HJob01 {
  14. private static Logger LOG = LoggerFactory.getLogger(E2HJob01.class);
  15. public static void main(String args[]) {
  16. try {
  17. new Configuration();
  18. "mapreduce.map.speculative", false);
  19. "mapreduce.reduce.speculative", false);
  20. //ElasticSearch節點
  21. "es.nodes", "centos.host1:9200");
  22. //ElaticSearch Index/Type
  23. "es.resource", "job/51/");
  24. new GenericOptionsParser(conf, args).getRemainingArgs();
  25. if (oArgs.length != 1) {
  26. "error");
  27. 2);
  28. }
  29. "51JOBE2H01");
  30. class);
  31. class);
  32. class);
  33. class);
  34. class);
  35. new Path(oArgs[0]));
  36. true));
  37. catch (Exception e) {
  38. LOG.error(e.getMessage(), e);
  39. }
  40. }
  41. }
  42. class E2HMapper01 extends Mapper<Text, LinkedMapWritable, Text, LinkedMapWritable> {
  43. private static final Logger LOG = LoggerFactory.getLogger(E2HMapper01.class);
  44. @Override
  45. protected void setup(Context context) throws IOException, InterruptedException {
  46. super.setup(context);
  47. }
  48. @Override
  49. protected void map(Text key, LinkedMapWritable value, Context context)
  50. throws IOException, InterruptedException {
  51. "key {} value {}", key, value);
  52. context.write(key, value);
  53. }
  54. @Override
  55. protected void cleanup(Context context) throws IOException, InterruptedException {
  56. super.cleanup(context);
  57. }
  58. }

hadoop jar eshadoop.jar E2HJob01 /user/data/es/job/

從hadoop上的資料檔案可以看到第一列是ES的doc id,第二列是doc data

也可以添加ES查詢條件,執行個體如下

[java] 

​​view plain​​​

 ​​​copy​​

  1. import java.io.IOException;
  2. import java.util.HashMap;
  3. import java.util.Map;
  4. import java.util.Map.Entry;
  5. import org.apache.commons.lang.StringUtils;
  6. import org.apache.hadoop.conf.Configuration;
  7. import org.apache.hadoop.fs.Path;
  8. import org.apache.hadoop.io.NullWritable;
  9. import org.apache.hadoop.io.Text;
  10. import org.apache.hadoop.io.Writable;
  11. import org.apache.hadoop.mapreduce.Job;
  12. import org.apache.hadoop.mapreduce.Mapper;
  13. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  14. import org.apache.hadoop.util.GenericOptionsParser;
  15. import org.elasticsearch.hadoop.mr.EsInputFormat;
  16. import org.elasticsearch.hadoop.mr.LinkedMapWritable;
  17. import org.platform.eshadoop.modules.examples.writable.JobWritable;
  18. import org.slf4j.Logger;
  19. import org.slf4j.LoggerFactory;
  20. public class E2HJob02 {
  21. private static Logger LOG = LoggerFactory.getLogger(E2HJob02.class);
  22. public static void main(String args[]) {
  23. try {
  24. new Configuration();
  25. "mapreduce.map.speculative", false);
  26. "mapreduce.reduce.speculative", false);
  27. "es.nodes", "centos.host1:9200");
  28. "es.resource", "job/51/");
  29. "es.query", "?q=高*");
  30. new GenericOptionsParser(conf, args).getRemainingArgs();
  31. if (oArgs.length != 1) {
  32. "error");
  33. 2);
  34. }
  35. "51JOBE2H02");
  36. class);
  37. class);
  38. class);
  39. class);
  40. class);
  41. new Path(oArgs[0]));
  42. true));
  43. catch (Exception e) {
  44. LOG.error(e.getMessage(), e);
  45. }
  46. }
  47. }
  48. class E2HMapper02 extends Mapper<Text, LinkedMapWritable, NullWritable, JobWritable> {
  49. private static final Logger LOG = LoggerFactory.getLogger(E2HMapper02.class);
  50. @Override
  51. protected void setup(Context context) throws IOException, InterruptedException {
  52. super.setup(context);
  53. }
  54. @Override
  55. protected void map(Text key, LinkedMapWritable value, Context context)
  56. throws IOException, InterruptedException {
  57. new JobWritable();
  58. writable.setId(key);
  59. new HashMap<String, String>();
  60. for (Entry<Writable, Writable> entry : value.entrySet()) {
  61. "key {} value {}", entry.getKey(), entry.getValue());
  62. map.put(entry.getKey().toString(), entry.getValue().toString());
  63. }
  64. "jobName");
  65. if (StringUtils.isNotBlank(jobName)) {
  66. new Text(jobName));
  67. }
  68. "jobUrl");
  69. if (StringUtils.isNotBlank(jobUrl)) {
  70. new Text(jobUrl));
  71. }
  72. "companyName");
  73. if (StringUtils.isNotBlank(companyName)) {
  74. new Text(companyName));
  75. }
  76. "companyUrl");
  77. if (StringUtils.isNotBlank(companyUrl)) {
  78. new Text(companyUrl));
  79. }
  80. "salary");
  81. if (StringUtils.isNotBlank(salary)) {
  82. new Text(salary));
  83. }
  84. "workPlace");
  85. if (StringUtils.isNotBlank(workPlace)) {
  86. new Text(workPlace));
  87. }
  88. "contact");
  89. if (StringUtils.isNotBlank(contact)) {
  90. new Text(contact));
  91. }
  92. "welfare");
  93. if (StringUtils.isNotBlank(welfare)) {
  94. new Text(welfare));
  95. }
  96. context.write(NullWritable.get(), writable);
  97. }
  98. @Override
  99. protected void cleanup(Context context) throws IOException, InterruptedException {
  100. super.cleanup(context);
  101. }
  102. }

[java] 

​​view plain​​​

 ​​​copy​​

  1. import java.io.DataInput;
  2. import java.io.DataOutput;
  3. import java.io.IOException;
  4. import org.apache.hadoop.io.Text;
  5. import org.apache.hadoop.io.Writable;
  6. public class JobWritable implements Writable, Cloneable {
  7. private Text id = null;
  8. private Text jobName = null;
  9. private Text jobUrl = null;
  10. private Text companyName = null;
  11. private Text companyUrl = null;
  12. private Text salary = null;
  13. private Text workPlace = null;
  14. private Text contact = null;
  15. private Text welfare = null;
  16. public JobWritable() {
  17. new Text();
  18. new Text();
  19. new Text();
  20. new Text();
  21. new Text();
  22. new Text();
  23. new Text();
  24. new Text();
  25. new Text();
  26. }
  27. public void readFields(DataInput dataInput) throws IOException {
  28. id.readFields(dataInput);
  29. jobName.readFields(dataInput);
  30. jobUrl.readFields(dataInput);
  31. companyName.readFields(dataInput);
  32. companyUrl.readFields(dataInput);
  33. salary.readFields(dataInput);
  34. workPlace.readFields(dataInput);
  35. contact.readFields(dataInput);
  36. welfare.readFields(dataInput);
  37. }
  38. public void write(DataOutput dataOutput) throws IOException {
  39. id.write(dataOutput);
  40. jobName.write(dataOutput);
  41. jobUrl.write(dataOutput);
  42. companyName.write(dataOutput);
  43. companyUrl.write(dataOutput);
  44. salary.write(dataOutput);
  45. workPlace.write(dataOutput);
  46. contact.write(dataOutput);
  47. welfare.write(dataOutput);
  48. }
  49. public Text getId() {
  50. return id;
  51. }
  52. public void setId(Text id) {
  53. this.id = id;
  54. }
  55. public Text getJobName() {
  56. return jobName;
  57. }
  58. public void setJobName(Text jobName) {
  59. this.jobName = jobName;
  60. }
  61. public Text getJobUrl() {
  62. return jobUrl;
  63. }
  64. public void setJobUrl(Text jobUrl) {
  65. this.jobUrl = jobUrl;
  66. }
  67. public Text getCompanyName() {
  68. return companyName;
  69. }
  70. public void setCompanyName(Text companyName) {
  71. this.companyName = companyName;
  72. }
  73. public Text getCompanyUrl() {
  74. return companyUrl;
  75. }
  76. public void setCompanyUrl(Text companyUrl) {
  77. this.companyUrl = companyUrl;
  78. }
  79. public Text getSalary() {
  80. return salary;
  81. }
  82. public void setSalary(Text salary) {
  83. this.salary = salary;
  84. }
  85. public Text getWorkPlace() {
  86. return workPlace;
  87. }
  88. public void setWorkPlace(Text workPlace) {
  89. this.workPlace = workPlace;
  90. }
  91. public Text getContact() {
  92. return contact;
  93. }
  94. public void setContact(Text contact) {
  95. this.contact = contact;
  96. }
  97. public Text getWelfare() {
  98. return welfare;
  99. }
  100. public void setWelfare(Text welfare) {
  101. this.welfare = welfare;
  102. }
  103. @Override
  104. public String toString() {
  105. return id + ":" + jobName + ":" + jobUrl + ":" + companyName + ":" + companyUrl +
  106. ":" + salary + ":" + workPlace + ":" + contact + ":" + welfare;
  107. }
  108. }

下面這個執行個體是每行直接以json格式存儲在hadoop上

[java] 

​​view plain​​​

 ​​​copy​​

  1. import java.io.IOException;
  2. import java.util.HashMap;
  3. import java.util.Map;
  4. import java.util.Map.Entry;
  5. import org.apache.hadoop.conf.Configuration;
  6. import org.apache.hadoop.fs.Path;
  7. import org.apache.hadoop.io.NullWritable;
  8. import org.apache.hadoop.io.Text;
  9. import org.apache.hadoop.io.Writable;
  10. import org.apache.hadoop.mapreduce.Job;
  11. import org.apache.hadoop.mapreduce.Mapper;
  12. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  13. import org.apache.hadoop.util.GenericOptionsParser;
  14. import org.elasticsearch.hadoop.mr.EsInputFormat;
  15. import org.elasticsearch.hadoop.mr.LinkedMapWritable;
  16. import org.slf4j.Logger;
  17. import org.slf4j.LoggerFactory;
  18. import com.google.gson.Gson;
  19. public class E2HJob03 {
  20. private static Logger LOG = LoggerFactory.getLogger(E2HJob03.class);
  21. public static void main(String args[]) {
  22. try {
  23. new Configuration();
  24. "mapreduce.map.speculative", false);
  25. "mapreduce.reduce.speculative", false);
  26. "es.nodes", "centos.host1:9200");
  27. "es.resource", "job/51/");
  28. new GenericOptionsParser(conf, args).getRemainingArgs();
  29. if (oArgs.length != 1) {
  30. "error");
  31. 2);
  32. }
  33. "51JOBE2H03");
  34. class);
  35. class);
  36. class);
  37. class);
  38. class);
  39. new Path(oArgs[0]));
  40. true));
  41. catch (Exception e) {
  42. LOG.error(e.getMessage(), e);
  43. }
  44. }
  45. }
  46. class E2HMapper03 extends Mapper<Text, LinkedMapWritable, NullWritable, Text> {
  47. private static final Logger LOG = LoggerFactory.getLogger(E2HMapper02.class);
  48. private Gson gson = null;
  49. @Override
  50. protected void setup(Context context) throws IOException, InterruptedException {
  51. super.setup(context);
  52. new Gson();
  53. }
  54. @Override
  55. protected void map(Text key, LinkedMapWritable value, Context context)
  56. throws IOException, InterruptedException {
  57. new JobInfo();
  58. jobInfo.setId(key.toString());
  59. new HashMap<String, String>();
  60. for (Entry<Writable, Writable> entry : value.entrySet()) {
  61. "key {} value {}", entry.getKey(), entry.getValue());
  62. map.put(entry.getKey().toString(), entry.getValue().toString());
  63. }
  64. "jobName"));
  65. "jobUrl"));
  66. "companyName"));
  67. "companyUrl"));
  68. "salary"));
  69. "workPlace"));
  70. "contact"));
  71. "welfare"));
  72. new Text(gson.toJson(jobInfo)));
  73. }
  74. @Override
  75. protected void cleanup(Context context) throws IOException, InterruptedException {
  76. super.cleanup(context);
  77. }
  78. }
  79. class JobInfo {
  80. private String id = null;
  81. private String jobName = null;
  82. private String jobUrl = null;
  83. private String companyName = null;
  84. private String companyUrl = null;
  85. private String salary = null;
  86. private String workPlace = null;
  87. private String contact = null;
  88. private String welfare = null;
  89. public String getId() {
  90. return id;
  91. }
  92. public void setId(String id) {
  93. this.id = id;
  94. }
  95. public String getJobName() {
  96. return jobName;
  97. }
  98. public void setJobName(String jobName) {
  99. this.jobName = jobName;
  100. }
  101. public String getJobUrl() {
  102. return jobUrl;
  103. }
  104. public void setJobUrl(String jobUrl) {
  105. this.jobUrl = jobUrl;
  106. }
  107. public String getCompanyName() {
  108. return companyName;
  109. }
  110. public void setCompanyName(String companyName) {
  111. this.companyName = companyName;
  112. }
  113. public String getCompanyUrl() {
  114. return companyUrl;
  115. }
  116. public void setCompanyUrl(String companyUrl) {
  117. this.companyUrl = companyUrl;
  118. }
  119. public String getSalary() {
  120. return salary;
  121. }
  122. public void setSalary(String salary) {
  123. this.salary = salary;
  124. }
  125. public String getWorkPlace() {
  126. return workPlace;
  127. }
  128. public void setWorkPlace(String workPlace) {
  129. this.workPlace = workPlace;
  130. }
  131. public String getContact() {
  132. return contact;
  133. }
  134. public void setContact(String contact) {
  135. this.contact = contact;
  136. }
  137. public String getWelfare() {
  138. return welfare;
  139. }
  140. public void setWelfare(String welfare) {
  141. this.welfare = welfare;
  142. }
  143. }

接下來的執行個體是将hadoop上的資料移動到ElasticSearch上索引,這裡直接用上面存儲的JSON資料試驗

[java] 

​​view plain​​​

 ​​​copy​​

1. import java.io.IOException;
2. 
3. import org.apache.hadoop.conf.Configuration;
4. import org.apache.hadoop.fs.Path;
5. import org.apache.hadoop.io.LongWritable;
6. import org.apache.hadoop.io.NullWritable;
7. import org.apache.hadoop.io.Text;
8. import org.apache.hadoop.io.Writable;
9. import org.apache.hadoop.mapreduce.Job;
10. import org.apache.hadoop.mapreduce.Mapper;
11. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
12. import org.apache.hadoop.util.GenericOptionsParser;
13. import org.elasticsearch.hadoop.mr.EsOutputFormat;
14. import org.slf4j.Logger;
15. import org.slf4j.LoggerFactory;
16. 
17. public class H2EJob {
18. 
19. private static Logger LOG = LoggerFactory.getLogger(H2EJob.class);
20. 
21. public static void main(String args[]) {
22. try {
23. new Configuration();
24. "mapreduce.map.speculative", false);
25. "mapreduce.reduce.speculative", false);
26. "es.nodes", "centos.host1:9200");
27. "es.resource", "job1/51");
28. //Hadoop上的資料格式為JSON,可以直接導入
29. "es.input.json", "yes");
30. new GenericOptionsParser(conf, args).getRemainingArgs();
31. if (oArgs.length != 1) {
32. "error");
33. 2);
34. }
35. "51JOBH2E");
36. class);
37. class);
38. class);
39. class);
40. class);
41. 
42. new Path(oArgs[0]));
43. 
44. true));
45. catch (Exception e) {
46. LOG.error(e.getMessage(), e);
47. }
48. }
49. 
50. }
51. 
52. class H2EMapper extends Mapper<LongWritable, Text, NullWritable, Text> {
53. 
54. @Override
55. protected void setup(Context context) throws IOException, InterruptedException {
56. super.setup(context);
57. }
58. 
59. @Override
60. public void run(Context context) throws IOException, InterruptedException {
61. super.run(context);
62. }
63. 
64. @Override
65. protected void map(LongWritable key, Text value, Context context)
66. throws IOException, InterruptedException {
67. context.write(NullWritable.get(), value);
68. }
69. 
70. @Override
71. protected void cleanup(Context context) throws IOException,InterruptedException {
72. super.cleanup(context);
73. }
74. 
75. }