MapReduce Programs on Linux with Eclipse: Local Mode and Cluster Mode

Table of Contents

  • Local mode
    • Local mode (input/output data on the local filesystem)
    • Local mode (input/output data on HDFS)
  • Cluster mode
    • 1 Copy the Hadoop cluster configuration files into the Eclipse project's resources directory
    • 2 Package the project with SBT
    • 3 Submit the generated jar to the cluster

Local mode

  1. In local mode, the MapReduce program is submitted to LocalJobRunner and runs as a single local process. Because it runs locally you can step through the code with the debugger, which makes it easy to find mistakes; the main purpose of local mode is to verify that the MapReduce business logic is correct.
  2. The input data and output results can live either on the local filesystem or on HDFS.
  3. Local mode is very convenient for debugging the business logic: just set breakpoints in Eclipse.
  4. How to tell whether a job runs in local mode: check whether the job's conf sets mapreduce.framework.name=local and mapreduce.cluster.local.dir=×××.

Local mode (input/output data on the local filesystem)

Configuration conf = new Configuration();
// Whether the job runs in local mode depends on this value being "local" (which is also the default)
conf.set("mapreduce.framework.name", "local");
conf.set("mapreduce.cluster.local.dir", "/home/workspace/mapred/local");

// In local mode the input/output data can live on the local filesystem or on HDFS;
// which one is used depends on which of the two lines below is active (the default is file:///)
// conf.set("fs.defaultFS", "hdfs://hadoopSvr1:8020/");
conf.set("fs.defaultFS", "file:///");
           

Local mode (input/output data on HDFS)

System.setProperty("HADOOP_USER_NAME", "root");
Configuration conf = new Configuration();
// Whether the job runs in local mode depends on this value being "local" (which is also the default)
conf.set("mapreduce.framework.name", "local");
conf.set("mapreduce.cluster.local.dir", "/home/workspace/mapred/local");

// In local mode the input/output data can live on the local filesystem or on HDFS;
// which one is used depends on which of the two lines below is active (the default is file:///)
conf.set("fs.defaultFS", "hdfs://hadoopSvr1:8020/");
// conf.set("fs.defaultFS", "file:///");
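Here fs.defaultFS points at the cluster's namenode, so unqualified paths resolve on HDFS even though the job itself still runs in a single LocalJobRunner process; setting HADOOP_USER_NAME to root lets that process read and write HDFS as root. For illustration, the same directories used later for the cluster run could be passed unchanged (the rest of the driver matches the full listing at the end of this post):

// With hdfs://hadoopSvr1:8020/ as the default filesystem, these paths resolve on HDFS
FileInputFormat.setInputPaths(job, new Path("/test/kaidy/input"));
FileOutputFormat.setOutputPath(job, new Path("/test/kaidy/output"));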
           

Cluster mode

1 Copy the Hadoop cluster configuration files into the Eclipse project's resources directory

The main files are the following (typically copied from the cluster's $HADOOP_HOME/etc/hadoop directory):

core-site.xml  
hdfs-site.xml  
log4j.properties  
mapred-site.xml 
yarn-site.xml
           

The code that loads this configuration is as follows:

Configuration conf = new YarnConfiguration();
// conf.addResource("log4j.properties");
conf.addResource("core-site.xml");
conf.addResource("hdfs-site.xml");
conf.addResource("mapred-site.xml");
conf.addResource("yarn-site.xml");

Job job = Job.getInstance(conf);

// Set the job's properties
job.setJobName("WCApp");           // job name
job.setJarByClass(WCApp.class);    // locate the jar containing this class
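A quick sanity check, not part of the original code: printing a couple of the loaded values before submitting the job confirms that the cluster configuration was really picked up from the classpath rather than falling back to the defaults.

// Should print the cluster's values, not the local-mode defaults
System.out.println(conf.get("fs.defaultFS"));             // expected: hdfs://..., not file:///
System.out.println(conf.get("mapreduce.framework.name")); // expected: yarn, not local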
           

2 Package the project with SBT

For details, see: https://blog.csdn.net/wangkai_123456/article/details/88933417

The build definition file build.sbt of the Scala project is as follows:

ThisBuild / scalaVersion := "2.11.12"
ThisBuild / organization := "org.kaidy"

// Dependencies for HDFS and MapReduce.
// JARs that are already available on the cluster (the Hadoop runtime) are scoped to the "provided" configuration so they are not bundled into the assembly jar.
val hadoopHdfs = "org.apache.hadoop" % "hadoop-hdfs" % "3.1.0"
val hadoopCommon = "org.apache.hadoop" % "hadoop-common" % "3.1.0"
// val hadoopClient = "org.apache.hadoop" % "hadoop-client" % "3.1.0"
val hadoopMrClientJobClient = "org.apache.hadoop" % "hadoop-mapreduce-client-jobclient" % "3.1.0"
val hadoopMrClientCore = "org.apache.hadoop" % "hadoop-mapreduce-client-core" % "3.1.0"

// https://mvnrepository.com/artifact/org.scalaj/scalaj-http
// val scalaJson = "org.scalaj" %% "scalaj-http" % "2.4.1"

lazy val root = (project in file("."))
  .settings(
    name := "mrWordCount",
    version := "1.0",

    libraryDependencies ++= Seq(
      hadoopHdfs % "provided",
      hadoopCommon % "provided",
      hadoopMrClientJobClient % "provided",
      hadoopMrClientCore % "provided"
    ),

    // libraryDependencies += scalaJson
  )
           

3 Submit the generated jar to the cluster

Assuming the packaging step produces a jar named mrWordCount-assembly-1.0.jar (the -assembly suffix is the default naming of the sbt-assembly plugin), the submission command looks roughly like this:

hadoop jar ../runableJars/mrWordCount-assembly-1.0.jar  /test/kaidy/input /test/kaidy/output
           

The input data is as follows:

[root@hadoopSvr1 hadoop]# hadoop fs -ls /test/kaidy/input
Found 2 items
-rw-r--r--   2 root supergroup         24 2019-04-09 18:33 /test/kaidy/input/file01
-rw-r--r--   2 root supergroup         33 2019-04-09 18:33 /test/kaidy/input/file02
[root@hadoopSvr1 hadoop]# hadoop fs -cat /test/kaidy/input/file01
Hello World, Bye World!
[root@hadoopSvr1 hadoop]# hadoop fs -cat /test/kaidy/input/file02
Hello Hadoop, Goodbye to hadoop.
[root@hadoopSvr1 hadoop]#
           

The result of the run:

[root@hadoopSvr1 hadoop]# hadoop fs -ls /test/kaidy/output
Found 2 items
-rw-r--r--   2 root supergroup          0 2019-04-10 17:32 /test/kaidy/output/_SUCCESS
-rw-r--r--   2 root supergroup         67 2019-04-10 17:32 /test/kaidy/output/part-r-00000
[root@hadoopSvr1 hadoop]# hadoop fs -cat /test/kaidy/output/part-r-00000
Bye	1
Goodbye	1
Hadoop,	1
Hello	2
World!	1
World,	1
hadoop.	1
to	1
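Note that FileOutputFormat refuses to write to an output directory that already exists, so rerunning the same command fails until /test/kaidy/output is removed (for example with hadoop fs -rm -r /test/kaidy/output). If you prefer to handle this in the driver, here is a sketch of how it could look (not part of the original code; it needs an extra import of org.apache.hadoop.fs.FileSystem):

// Delete a pre-existing output directory before submitting the job
Path out = new Path(args[1]);
FileSystem fs = FileSystem.get(conf);
if (fs.exists(out)) {
    fs.delete(out, true);  // true = recursive delete
}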
           

The complete code is attached below:

package kaidy.mr;

import org.apache.hadoop.yarn.conf.YarnConfiguration;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
//import org.apache.hadoop.mapreduce.Mapper.Context;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WCApp {
    public static void main(String[] args) throws Exception {
        System.setProperty("HADOOP_USER_NAME", "root");
//        Configuration conf = new Configuration();
        // Whether the job runs in local mode depends on this value being "local" (the default)
//        conf.set("mapreduce.framework.name", "local");
//        conf.set("mapreduce.cluster.local.dir", "/home/***/workspace/mapred/local");

        // In local mode the input/output data can live on the local filesystem or on HDFS;
        // which one is used depends on which of the two lines below is active (the default is file:///)
//        conf.set("fs.defaultFS", "hdfs://hadoopSvr1:8020/");
//        conf.set("fs.defaultFS", "file:///");

        Configuration conf = new YarnConfiguration();
//        conf.addResource("log4j.properties");
        conf.addResource("core-site.xml");
        conf.addResource("hdfs-site.xml");
        conf.addResource("mapred-site.xml");
        conf.addResource("yarn-site.xml");

        Job job = Job.getInstance(conf);

        // Set the job's properties
        job.setJobName("WCApp");           // job name
        job.setJarByClass(WCApp.class);    // locate the jar containing this class
//        job.setJar("mrWordCount.jar");
        job.setInputFormatClass(TextInputFormat.class);  // input format

        // Input path
//        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        // Output path
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.setMapperClass(WCMapperTmp.class);   // mapper class
        job.setReducerClass(WCReducerTmp.class); // reducer class

        job.setNumReduceTasks(1);                // number of reducers

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        job.setOutputKeyClass(Text.class);          // output key type
        job.setOutputValueClass(IntWritable.class); // output value type

        job.waitForCompletion(true);
    }
}

// Mapper: splits each line on spaces and emits (word, 1) for every token
class WCMapperTmp extends Mapper<LongWritable, Text, Text, IntWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

        Text keyOut = new Text();
        IntWritable valueOut = new IntWritable();
        String[] arr = value.toString().split(" ");
        for (String s : arr) {
            keyOut.set(s);
            valueOut.set(1);
            context.write(keyOut, valueOut);
        }
    }
}

// Reducer: sums the counts for each word
class WCReducerTmp extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {

        int count = 0;
        for (IntWritable iw : values) {
            count = count + iw.get();
        }
        // Emit (word, total count)
        context.write(key, new IntWritable(count));
    }
}
           