Table of Contents
- Local Mode
  - Local Mode (input/output data on the local filesystem)
  - Local Mode (input/output data on HDFS)
- Cluster Mode
  - 1 Copy the Hadoop cluster configuration files into the Eclipse project's resources directory
  - 2 Package and publish with SBT
  - 3 Submit the generated jar to the cluster
Local Mode
- The MapReduce program is submitted to LocalJobRunner and runs locally as a single process. Running locally lets you step through the code with a debugger, which makes troubleshooting easy; the main purpose of a local run is to check whether the MapReduce business logic is correct.
- The input data and output results can live on the local filesystem or on HDFS.
- Local mode is very convenient for debugging business logic: just set breakpoints in Eclipse.
- How to recognize local mode: check whether the job's conf contains the parameters mapreduce.framework.name=local and mapreduce.cluster.local.dir=×××.
Local Mode (input/output data on the local filesystem)
Configuration conf = new Configuration();
// Whether the job runs in local mode depends on whether this value is "local"; the default is local
conf.set("mapreduce.framework.name", "local");
conf.set("mapreduce.cluster.local.dir", "/home/workspace/mapred/local");
// In local mode the input/output data may be on the local filesystem or on HDFS;
// which one is used depends on which of the two lines below you enable (the default is file:///)
// conf.set("fs.defaultFS", "hdfs://hadoopSvr1:8020/");
conf.set("fs.defaultFS", "file:///");
Local Mode (input/output data on HDFS)
System.setProperty("HADOOP_USER_NAME", "root");
Configuration conf = new Configuration();
// Whether the job runs in local mode depends on whether this value is "local"; the default is local
conf.set("mapreduce.framework.name", "local");
conf.set("mapreduce.cluster.local.dir", "/home/workspace/mapred/local");
// In local mode the input/output data may be on the local filesystem or on HDFS;
// which one is used depends on which of the two lines below you enable (the default is file:///)
conf.set("fs.defaultFS", "hdfs://hadoopSvr1:8020/");
// conf.set("fs.defaultFS", "file:///");
Cluster Mode
1 Copy the Hadoop cluster configuration files into the Eclipse project's resources directory
The main configuration files are:
core-site.xml
hdfs-site.xml
log4j.properties
mapred-site.xml
yarn-site.xml
The code that reads these configuration files is as follows (addResource loads them from the classpath, which is why they go under resources):
Configuration conf = new YarnConfiguration();
// conf.addResource("log4j.properties");
conf.addResource("core-site.xml");
conf.addResource("hdfs-site.xml");
conf.addResource("mapred-site.xml");
conf.addResource("yarn-site.xml");
Job job = Job.getInstance(conf);
// Set the job's properties
job.setJobName("WCApp");        // job name
job.setJarByClass(WCApp.class); // class used to locate the job's jar
2 Package and publish with SBT
For details, see: https://blog.csdn.net/wangkai_123456/article/details/88933417
The build definition file build.sbt of the Scala project is as follows:
ThisBuild / scalaVersion := "2.11.12"
ThisBuild / organization := "org.kaidy"
// Dependencies for hdfs
// JARs already provided on the cluster (e.g. by Hadoop or Spark itself) should not be bundled; scope such dependencies as "provided"
val hadoopHdfs = "org.apache.hadoop" % "hadoop-hdfs" % "3.1.0"
val hadoopCommon = "org.apache.hadoop" % "hadoop-common" % "3.1.0"
// val hadoopClient = "org.apache.hadoop" % "hadoop-client" % "3.1.0"
val hadoopMrClientJobClient = "org.apache.hadoop" % "hadoop-mapreduce-client-jobclient" % "3.1.0"
val hadoopMrClientCore = "org.apache.hadoop" % "hadoop-mapreduce-client-core" % "3.1.0"
// https://mvnrepository.com/artifact/org.scalaj/scalaj-http
// val scalaJson = "org.scalaj" %% "scalaj-http" % "2.4.1"
lazy val root = (project in file("."))
  .settings(
    name := "mrWordCount",
    version := "1.0",
    libraryDependencies ++= Seq(
      hadoopHdfs % "provided",
      hadoopCommon % "provided",
      hadoopMrClientJobClient % "provided",
      hadoopMrClientCore % "provided"
    )
    // libraryDependencies += scalaJson
  )
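The jar name used in the next step, mrWordCount-assembly-1.0.jar, follows sbt-assembly's default `<name>-assembly-<version>.jar` pattern, so the fat jar is presumably built with the sbt-assembly plugin. A minimal plugin declaration (the plugin version below is an assumption; use one compatible with your sbt release):

```scala
// project/plugins.sbt
// Assumption: an sbt-assembly 0.14.x release matching your sbt version
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.9")
```

Running `sbt assembly` then writes the jar to target/scala-2.11/ (scala-2.11 because scalaVersion is 2.11.12).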
3 Submit the generated jar to the cluster
Assuming the packaged jar is mrWordCount-assembly-1.0.jar, the submit command looks roughly like this (note that the output directory must not already exist, or the job fails; remove a stale one first with hadoop fs -rm -r /test/kaidy/output):
hadoop jar ../runableJars/mrWordCount-assembly-1.0.jar /test/kaidy/input /test/kaidy/output
The input data is as follows:
[root@hadoopSvr1 hadoop]# hadoop fs -ls /test/kaidy/input
Found 2 items
-rw-r--r-- 2 root supergroup 24 2019-04-09 18:33 /test/kaidy/input/file01
-rw-r--r-- 2 root supergroup 33 2019-04-09 18:33 /test/kaidy/input/file02
[root@hadoopSvr1 hadoop]# hadoop fs -cat /test/kaidy/input/file01
Hello World, Bye World!
[root@hadoopSvr1 hadoop]# hadoop fs -cat /test/kaidy/input/file02
Hello Hadoop, Goodbye to hadoop.
[root@hadoopSvr1 hadoop]#
The job's output:
[root@hadoopSvr1 hadoop]# hadoop fs -ls /test/kaidy/output
Found 2 items
-rw-r--r-- 2 root supergroup 0 2019-04-10 17:32 /test/kaidy/output/_SUCCESS
-rw-r--r-- 2 root supergroup 67 2019-04-10 17:32 /test/kaidy/output/part-r-00000
[root@hadoopSvr1 hadoop]# hadoop fs -cat /test/kaidy/output/part-r-00000
Bye 1
Goodbye 1
Hadoop, 1
Hello 2
World! 1
World, 1
hadoop. 1
to 1
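Note the shape of this output: "Hadoop," and "hadoop." are counted separately, because the mapper splits each line on a single space and keeps case and punctuation; the keys also come out lexicographically sorted by the shuffle (uppercase before lowercase in ASCII). A plain-Java sketch that reproduces these counts off-cluster, using a TreeMap to mimic the sorted reducer input (an illustration only, not part of the job):

```java
import java.util.Map;
import java.util.TreeMap;

public class LocalWordCount {
    // Same tokenization as the mapper: split on a single space, keep punctuation and case.
    static Map<String, Integer> count(String[] lines) {
        Map<String, Integer> counts = new TreeMap<>(); // sorted keys, like the shuffle output
        for (String line : lines)
            for (String w : line.split(" "))
                counts.merge(w, 1, Integer::sum);
        return counts;
    }

    public static void main(String[] args) {
        String[] input = { "Hello World, Bye World!", "Hello Hadoop, Goodbye to hadoop." };
        // Prints the same eight key/count pairs, in the same order, as part-r-00000 above
        count(input).forEach((w, c) -> System.out.println(w + "\t" + c));
    }
}
```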
Appendix: the complete code:
package kaidy.mr;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
//import org.apache.hadoop.mapreduce.Mapper.Context;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class WCApp {
    public static void main(String[] args) throws Exception {
        System.setProperty("HADOOP_USER_NAME", "root");
        // Configuration conf = new Configuration();
        // Whether the job runs in local mode depends on whether this value is "local"; the default is local
        // conf.set("mapreduce.framework.name", "local");
        // conf.set("mapreduce.cluster.local.dir", "/home/***/workspace/mapred/local");
        // In local mode the input/output data may be local or on HDFS;
        // which one depends on which of the two lines below is enabled (default: file:///)
        // conf.set("fs.defaultFS", "hdfs://hadoopSvr1:8020/");
        // conf.set("fs.defaultFS", "file:///");
        Configuration conf = new YarnConfiguration();
        // conf.addResource("log4j.properties");
        conf.addResource("core-site.xml");
        conf.addResource("hdfs-site.xml");
        conf.addResource("mapred-site.xml");
        conf.addResource("yarn-site.xml");
        Job job = Job.getInstance(conf);
        // Set the job's properties
        job.setJobName("WCApp");        // job name
        job.setJarByClass(WCApp.class); // class used to locate the job's jar
        // job.setJar("mrWordCount.jar");
        job.setInputFormatClass(TextInputFormat.class); // input format
        // Input path
        // FileInputFormat.addInputPath(job, new Path(args[0]));
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        // Output path
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.setMapperClass(WCMapperTmp.class);   // mapper class
        job.setReducerClass(WCReducerTmp.class); // reducer class
        job.setNumReduceTasks(1);                // number of reduce tasks
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(Text.class);       // output key type
        job.setOutputValueClass(IntWritable.class); // output value type
        job.waitForCompletion(true);
    }
}
class WCMapperTmp extends Mapper<LongWritable, Text, Text, IntWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        Text keyOut = new Text();
        IntWritable valueOut = new IntWritable();
        String[] arr = value.toString().split(" ");
        for (String s : arr) {
            keyOut.set(s);
            valueOut.set(1);
            context.write(keyOut, valueOut);
        }
    }
}
class WCReducerTmp extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int count = 0;
        for (IntWritable iw : values) {
            count = count + iw.get();
        }
        // Emit the word and its total count
        context.write(key, new IntWritable(count));
    }
}
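If counting "Hadoop," and "hadoop." as distinct words is not what you want, the mapper's split(" ") can be replaced by a normalizing tokenizer. A sketch of such a helper (a hypothetical addition, not part of the original job):

```java
import java.util.Arrays;

public class TokenizeDemo {
    // Lower-case the line, split on any run of whitespace, and strip punctuation from token edges.
    static String[] tokenize(String line) {
        return Arrays.stream(line.toLowerCase().split("\\s+"))
                .map(t -> t.replaceAll("^\\p{Punct}+|\\p{Punct}+$", ""))
                .filter(t -> !t.isEmpty())
                .toArray(String[]::new);
    }

    public static void main(String[] args) {
        // "Hello Hadoop, Goodbye to hadoop." becomes: hello hadoop goodbye to hadoop
        System.out.println(String.join(" ", tokenize("Hello Hadoop, Goodbye to hadoop.")));
    }
}
```

With this tokenizer the two "hadoop" variants collapse into a single key, so the sample job would emit hadoop 2 instead of separate entries.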