天天看点

hadoop编程入门学习笔记-4 ChainMapper、DistributedCache和Contextufo sighting文件流方式统计记录数和字段数按形状分组统计持续时间ChainMapperDistributedCacheContext编译和运行程序的辅助脚本

这是《Hadoop Beginner's Guide》 第四章的学习笔记。本章通过分析UFO sighting dataset的讲解了相关编程技巧。所需的ufo.tsv文件在书中给出的链接已经下不到了,在网上搜索文件名可以找到。书中的脚本用的是ruby,我把它改成了python。书中用的旧的api,因为装的是hadoop2.6.0,用的新api,我对程序也做相应改动。事后来看这种方式还挺有用的,迫使自己先看懂程序,对api不懂的地方就得度娘帮忙了。api说明在http://hadoop.apache.org/docs/r2.6.0/api/index.html。

ufo sighting文件

序号 字段 说明
1 Sighting date 看到UFO 的时间 
2 Recorded date 记录时间
3 Location 看到UFO 的地点
4 Shape 形状,如diamond等
5 Duration  持续时间
6 Description 描述

流方式统计记录数和字段数

这个很简单,相当于WordCount。 

wcmapper.py

#!/usr/bin/python
# -*- coding: utf-8 -*- 
"""a python script for hadoop streaming map """

import sys

def map(input):
    for line in input:
        line = line.strip()
        words = line.split()
        for word in words:
            print '%s\t%s' % (word, 1)

def main():
    map(sys.stdin)

if __name__ == "__main__":
    main()
 
           

wcreducer.py

#!/usr/bin/python
# -*- coding: utf-8 -*- 
"""a python script for hadoop streaming map """

import sys

def reduce(input):
    current_word = None
    current_count = 0
    word = None
    for line in input:
        line = line.strip()
        
        word, count = line.split('\t', 1)
        try:
            count = int(count)
        except ValueError:
            continue

        if current_word == word:
            current_count += count
        else:
            if current_word:
                print '%s\t%s' %(current_word, current_count)
            current_count = count
            current_word = word

    print '%s\t%s' % (current_word, current_count)

def main():
    reduce(sys.stdin)


if __name__ == "__main__":
    main()
           

summarymapper.py

#!/usr/bin/python
# -*- coding: utf-8 -*- 
"""a python script for hadoop streaming map """

import sys

def map(input):
    for line in input:
        print "total\t1"
        line = line.strip()
        words = line.split("\t")
        if len(words) != 6:
            print "badline\t1"
        else:
            if words[0] != None:
                print "sighted\t1"
            if words[1] != None:
                print "recorded\t1"
            if words[2] != None:
                print "location\t1"
            if words[3] != None:
                print "shape\t1"
            if words[4] != None:
                print "duration\t1"
            if words[5] != None:
                print "description\t1"


def main():
    map(sys.stdin)

if __name__ == "__main__":
    main()
 
           

用到的命令

cat ufo.tsv | ./summarymapper.py | sort | ./wcreducer.py > output.txt
hadoop dfs -copyFromLocal ufo.tsv /user/hadoop/ufo/ufo.tsv
hadoop jar /home/hadoop/cloud/hadoop/share/hadoop/tools/lib/hadoop-streaming-2.6.0.jar -file summarymapper.py -mapper summarymapper.py -file wcreducer.py -reducer wcreducer.py -input /user/hadoop/ufo/ufo.tsv -output /user/hadoop/ufo/out
           

按形状分组统计持续时间

第一个脚本shpemapper.py只是简单的统计形状,相当于WordCount。脚本shapetimemapper.py和shapetimemapper.py实现分组统计。相比于shpemapper.py,shapetimemapper.py多做了一件事情,第一件与shpemapper.py一样,识别形状(words[3]);第二件是用用到正则表达式提取文本(words[4]),转换成整数后累加,输出时将 print shape + "\t1" 改为 print shape + "\t" + str(time)。shapetimereducer.py中实现min 、max、total、mean等计算。 

shpemapper.py

#!/usr/bin/python
# -*- coding: utf-8 -*- 
"""a python script for hadoop streaming map """

import sys

def map(input):
    for line in input:
        line = line.strip()
        words = line.split("\t")
        if len(words) == 6:
            shape = words[3].strip()
            if len(shape) > 0:
                print shape + "\t1"
            
def main():
    map(sys.stdin)

if __name__ == "__main__":
    main()
           

shapetimemapper.py

#!/usr/bin/python
# -*- coding: utf-8 -*- 
"""a python script for hadoop streaming map """

import sys
import re

def map(input):
    pattern1 = re.compile(r'\d* ?((min)|(sec))')
    pattern2 = re.compile(r'\d*') 
    for line in input:
        line = line.strip()
        words = line.split("\t")
        if len(words) == 6:
            shape = words[3].strip()
            duration = words[4].strip()
            if shape != None and duration != None:
                match = pattern1.match(duration)
                if match != None:
                    time = pattern2.match(match.group())
                    unit = match.group(1)
                    try:
                        time = int(time.group())
                    except:
                        #print '??? : ' + duration
                        time = 0
                    if unit == 'min':
                        time = time * 60
                    if len(shape) > 0:
                        print shape + '\t' + str(time) 

def main():
    map(sys.stdin)

if __name__ == "__main__":
    main()
 
           

shapetimereducer.py

#!/usr/bin/python
# -*- coding: utf-8 -*- 
"""a python script for hadoop streaming map """

import sys
import re

def reduce(input):
    current = None
    minv = 0
    maxv = 0
    mean = 0
    total = 0
    count = 0
    
    for line in input:
        line = line.strip()
        word, time = line.split('\t')
        time = int(time)
        
        if word == current:
            count += 1
            total += time
            if time < minv:
                minv = time
            if time > maxv:
                maxv = time
        else:
            if current != None:
                print current + '\t' + str(minv) +' ' + str(maxv) + ' ' + str((total/count))
            current = word
            count = 1
            total = time
            minv = time
            maxv = time
    print current + '\t' + str(minv) +' ' + str(maxv) + ' ' + str((total/count))

def main():
    reduce(sys.stdin)

if __name__ == "__main__":
    main()
 
           

ChainMapper

用链的方式把两个Mapper串起来,有点像是servlet里的Filter。示例中用了两个Mapper,第一个验证记录的有效性,第二个对Location进行计数。使用ChainMapper.addMapper 方法添加Mapper。注意job.setJarByClass(ufo.UFORecordValidationMapper.class);否则会报类找不到, 要注意的是新旧api的使用有差异,我在程序中进行了注释。

UFORecordValidationMapper.java

package ufo;
import java.io.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;

/*
旧api,继承类org.apache.hadoop.mapred.MapReduceBase,然后实现接口org.apache.hadoop.mapred.Mapper<K1, V1, K2, V2>。
新api,继承类org.apache.hadoop.mapreduce.Mapper<KEYIN,VALUEIN,KEYOUT,VALUEOUT>。
旧api,map方法的第三、四个形参分别是OutputCollector和Reporter类。
新api,map方法的第三个参数是Context类,新api的Context把两个类的功能合并到一起。
*/

public class UFORecordValidationMapper extends Mapper<LongWritable, Text, LongWritable, Text>
{
    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException
    {
        String line = value.toString();
        if(validate(line))
            context.write(key, value); 
    }

    private boolean validate(String str)
    {
        String words[] = str.split("\t");
        if(words.length != 6)
            return false;
        else
            return true;
    }
}
           

UFOLocation.java

package ufo;

import java.io.*;
import java.util.Iterator;
import java.util.regex.*;

import org.apache.hadoop.conf.*;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.chain.*;
import org.apache.hadoop.mapreduce.lib.reduce.*;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class UFOLocation
{
     public static class MapClass extends Mapper<Object, Text, Text, LongWritable>
    {
        private final static LongWritable one = new LongWritable(1);
        private static Pattern locationPattern = Pattern.compile("[a-zA-z]{2}[^a-zA-z]*$");
        
        public void map(Object key, Text value, Context context) throws IOException, InterruptedException
        {
            String line = value.toString();
            String[] fields = line.split("\t");
            String location = fields[2].trim();
            if(location.length() >= 2)
            {
                Matcher matcher = locationPattern.matcher(location);
                if(matcher.find())
                {
                    int start = matcher.start();
                    String state = location.substring(start, start + 2);
                    context.write(new Text(state.toUpperCase()), one);
                }
            }
        }
    }

    /*
    新api中,驱动代码通过org.apache.hadoop.mapreduce.Job类实现,通过该类管理各种配置,然后调用waitForCompletion(boolean)方法把代码提交给JobTracker执行。
    旧api中,驱动代码通过org.apache.hadoop.mapred.JobConf(Configuration, Class)类实现,通过该类管理各种配置;通过org.apache.hadoop.mapred.JobClient类的runJob(JobConf)方法实现job的提交。
    */
    public static void main(String[] args) throws Exception
    {
        Configuration conf = new Configuration();
        Job job = new Job(conf, "UFOLocation");

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);

        Configuration mapAConf = new Configuration(false);
        ChainMapper.addMapper(job, UFORecordValidationMapper.class, LongWritable.class, Text.class,
             LongWritable.class, Text.class, mapAConf);

        Configuration mapBConf = new Configuration(false);
        ChainMapper.addMapper(job, MapClass.class, LongWritable.class, Text.class,
             Text.class, LongWritable.class, mapBConf);
        
        job.setJarByClass(ufo.UFORecordValidationMapper.class);
        job.setMapperClass(ChainMapper.class);
        job.setCombinerClass(LongSumReducer.class);
        job.setReducerClass(LongSumReducer.class);
        
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        System.exit(job.waitForCompletion(true)?0:1);
    }    
}
           

DistributedCache

通过DistributedCache可以共享文件,共享文件分两步实现,第一步是在main中用DistributedCache.addCacheFile添加文件,第二步是在Mapper的获得这个文件,比如Mapper的setup方法将文件读入到一个HashMap。在实例中创建了一个states.txt文件作为州名的简称到全称的转换。

states.txt

AL	Alabama
AK	Alaska
AZ	Arizona
AR	Arkansas
CA	California
           

UFOLocation2.java

package ufo;

import java.io.*;
import java.net.*;
import java.util.*;
import java.util.regex.*;

import org.apache.hadoop.conf.*;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.chain.*;
import org.apache.hadoop.mapreduce.lib.reduce.*;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.filecache.DistributedCache;

public class UFOLocation2
{
    public static class MapClass extends Mapper<Object, Text, Text, LongWritable>
    {
        public final static String LINK_STATES_TXT = "__Link_statestxt__"; 
        private final static LongWritable one = new LongWritable(1);
        private static Pattern locationPattern = Pattern.compile("[a-zA-z]{2}[^a-zA-z]*$");
        private Map<String, String> stateNames;

        @Override
        public void setup(Context text) throws IOException, InterruptedException
        {
            try
            {
                setupStateMap();
            }catch (IOException e){
                System.err.println("Error reading state file.");
                System.exit(1);
            }
        }

        private void setupStateMap() throws IOException
        {
            Map<String, String> states = new HashMap<String, String>();
            BufferedReader reader = new BufferedReader(new InputStreamReader(new FileInputStream(MapClass.LINK_STATES_TXT)));
            String line = reader.readLine();
            while(line != null)
            {
                String[] split = line.split("\t");
                states.put(split[0], split[1]);
                line = reader.readLine();
            }
            stateNames = states;
        }
        
        private String lookupState(String state)
        {
            String fullName = stateNames.get(state);
            return fullName == null ? "Other" : fullName;
        }

        public void map(Object key, Text value, Context context) throws IOException, InterruptedException
        {   
            String line = value.toString();
            String[] fields = line.split("\t");
            String location = fields[2].trim();
            if(location.length() >= 2)
            {
                Matcher matcher = locationPattern.matcher(location);
                if(matcher.find())
                {
                    int start = matcher.start();
                    String state = location.substring(start, start + 2);
                    String fullName = lookupState(state.toUpperCase());
                    context.write(new Text(fullName), one);
                }
            }
        }
     }


    /*
    新api中DistributedCadhe需要通过创建符号链接的方式使用。
    */
    public static void main(String[] args) throws Exception
    {
         Configuration conf = new Configuration();
         Job job = new Job(conf, "UFOLocation");
         String cacheFilePath = "/user/hadoop/ufo/states.txt";
         Path inPath = new Path(cacheFilePath);
         //#后的为符号链接
         String inPathLink = inPath.toUri().toString()+"#"+MapClass.LINK_STATES_TXT;      
         DistributedCache.addCacheFile(new URI(inPathLink), job.getConfiguration());

         job.setOutputKeyClass(Text.class);
         job.setOutputValueClass(LongWritable.class);

         Configuration mapAConf = new Configuration(false);
         ChainMapper.addMapper(job, UFORecordValidationMapper.class, LongWritable.class, Text.class,
             LongWritable.class, Text.class, mapAConf);

         Configuration mapBConf = new Configuration(false);
         ChainMapper.addMapper(job, MapClass.class, LongWritable.class, Text.class,
             Text.class, LongWritable.class, mapBConf);
        
         job.setJarByClass(ufo.UFORecordValidationMapper.class);
         job.setMapperClass(ChainMapper.class);
         job.setCombinerClass(LongSumReducer.class);
         job.setReducerClass(LongSumReducer.class);
        
         FileInputFormat.addInputPath(job, new Path(args[0]));
         FileOutputFormat.setOutputPath(job, new Path(args[1]));

         System.exit(job.waitForCompletion(true)?0:1);
    }    
}
           

Context

在旧api中使用Reporter来实现计数和状态的输出,在新api中通过Context实现。

UFOCountingRecordValidationMapper.java 

定义 enum LineCunters类型,通过context.getCounter得到计数器,使用计数器increment增加计数。通过context.setStatus设置状态。

package ufo;
import java.io.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;

public class UFOCountingRecordValidationMapper extends Mapper<LongWritable, Text, LongWritable, Text>
{
    public enum LineCounters
    {
        BAD_LINES,
        TOO_MANY_TABS,
        TOO_FEW_TABS
    };
    
    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException
    {
        String line = value.toString();
        if(validate(line, context))
            context.write(key, value); 
    }

    private boolean validate(String str, Context context)
    {
        String words[] = str.split("\t");
        if(words.length != 6)
        {
            if(words.length <6)
            {
                Counter ct = context.getCounter(LineCounters.TOO_FEW_TABS);
                ct.increment(1);
            }else{
                Counter ct = context.getCounter(LineCounters.TOO_MANY_TABS);
                ct.increment(1);
            }
            Counter ct = context.getCounter(LineCounters.BAD_LINES);
            ct.increment(1);
           
            if(ct.getValue() % 10 == 0){
                context.setStatus("Got 10 bad lines.");
                System.err.println("Read another 10 bad lines.");    
            }
            
            return false;
        }else
            return true;
    }
}
           

UFOLocation3.java

复制UFOLocation2.java改名,然后修改两个地方。

ChainMapper.addMapper(job, UFOCountingRecordValidationMapper.class, LongWritable.class, Text.class,
             LongWritable.class, Text.class, mapAConf);
           
job.setJarByClass(ufo.UFOCountingRecordValidationMapper.class);
           

编译和运行程序的辅助脚本

build.sh

因为使用了包ufo,所以build.sh放在当前目录,java源文件放在当前目录的下一级目录ufo。

#/bin/sh
HADOOP_LIB_DIR=/home/hadoop/cloud/hadoop/share/hadoop

rm -f ./*.class
rm -f ./ufo.jar

javac -classpath $HADOOP_LIB_DIR/common/hadoop-common-2.6.0.jar:$HADOOP_LIB_DIR/common/lib/commons-cli-1.2.jar:$HADOOP_LIB_DIR/common/lib/hadoop-annotations-2.6.0.jar:$HADOOP_LIB_DIR/mapreduce/hadoop-mapreduce-client-core-2.6.0.jar -d . ufo/UFORecordValidationMapper.java ufo/UFOLocation.java ufo/UFOLocation2.java ufo/UFOCountingRecordValidationMapper.java  ufo/UFOLocation3.java

#package

jar -cvf ufo.jar ./ufo/*.class
           

run.sh

运行不同例子只要改ufo.UFOLocation 为ufo.UFOLocation2 、ufo.UFOLocation3就行了

#/bin/sh
hdfs dfs -rm -r -f /user/hadoop/ufo/out001
hadoop jar ufo.jar ufo.UFOLocation /user/hadoop/ufo/ufo.tsv /user/hadoop/ufo/out001
hdfs dfs -cat /user/hadoop/ufo/out001/part-r-00000
           

继续阅读