Daily Summary

MapReduce Example: Single-table Join

Dependencies (pom.xml):

<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-common</artifactId>
    <version>3.2.0</version>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-hdfs</artifactId>
    <version>3.2.0</version>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-mapreduce-client-app</artifactId>
    <version>3.2.0</version>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>3.2.0</version>
</dependency>
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-log4j12</artifactId>
    <version>1.7.30</version>
</dependency>
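
A single-table join joins a table with itself: each input row is a tab-separated pair, and the job links two rows that share a middle value. As a hypothetical example (the actual buyer1 file is not shown here), given the rows

10001	10002
10002	10003

the job links them through the shared value 10002 and outputs

10003	10001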

Experiment code:

package mapreduce;

import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class DanJoin {

    public static class Map extends Mapper<Object, Text, Text, Text> {

        @Override
        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            String[] arr = line.split("\t");
            String mapkey = arr[0];
            String mapvalue = arr[1];
            // Emit the row twice: once keyed by the left column (tagged "1")
            // and once keyed by the right column (tagged "2"), so the reducer
            // can join rows that meet in the middle.
            String relationtype = "1";
            context.write(new Text(mapkey), new Text(relationtype + "+" + mapvalue));
            relationtype = "2";
            context.write(new Text(mapvalue), new Text(relationtype + "+" + mapkey));
        }
    }
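
    // For an input line "A\tB" the map() above emits two records:
    //   (A, "1+B") and (B, "2+A")
    // so each reduce key sees both the values it points to ("1+...")
    // and the keys that point to it ("2+...").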

    public static class Reduce extends Reducer<Text, Text, Text, Text> {

        @Override
        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            int buyernum = 0;
            String[] buyer = new String[20];
            int friendsnum = 0;
            String[] friends = new String[20];
            Iterator<Text> ite = values.iterator();
            while (ite.hasNext()) {
                String record = ite.next().toString();
                int len = record.length();
                int i = 2;
                if (0 == len) {
                    continue;
                }
                char relationtype = record.charAt(0);
                if ('1' == relationtype) {
                    buyer[buyernum] = record.substring(i);
                    buyernum++;
                }
                if ('2' == relationtype) {
                    friends[friendsnum] = record.substring(i);
                    friendsnum++;
                }
            }
            // Cross the two sides: every "1"-tagged value is paired with
            // every "2"-tagged value for this key, skipping self-joins.
            if (0 != buyernum && 0 != friendsnum) {
                for (int m = 0; m < buyernum; m++) {
                    for (int n = 0; n < friendsnum; n++) {
                        if (!buyer[m].equals(friends[n])) {
                            context.write(new Text(buyer[m]), new Text(friends[n]));
                        }
                    }
                }
            }
        }
    }
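
    // Trace for reduce key 10002, given the hypothetical sample rows above:
    //   values  = { "2+10001", "1+10003" }
    //   buyer   = { "10003" }   (10002 appeared on the left of a row)
    //   friends = { "10001" }   (10002 appeared on the right of a row)
    //   emitted : (10003, 10001), the two ends joined through 10002.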

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new String[2];
        otherArgs[0] = "hdfs://hadoop102:8020/mymapreduce2/in/buyer1";
        otherArgs[1] = "hdfs://hadoop102:8020/mymapreduce2/out3";
        Job job = Job.getInstance(conf, "Table join");
        job.setJarByClass(DanJoin.class);
        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
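
To run the job, package the class into a jar and submit it with the hadoop client; a minimal sketch, assuming the jar is named danjoin.jar (the input and output paths are the ones hard-coded in main):

hadoop jar danjoin.jar mapreduce.DanJoin

Note that MapReduce refuses to start if the output directory already exists, so remove hdfs://hadoop102:8020/mymapreduce2/out3 (e.g. with hdfs dfs -rm -r) before re-running.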
