
Hadoop programming beginner's study notes 4: ChainMapper, DistributedCache and Context

Contents: the ufo sighting file; counting records and fields with streaming; grouping duration statistics by shape; ChainMapper; DistributedCache; Context; helper scripts for building and running the programs.

These are my study notes for Chapter 4 of Hadoop Beginner's Guide. The chapter teaches the relevant programming techniques by analysing a UFO sighting dataset. The ufo.tsv file can no longer be downloaded from the link given in the book, but searching for the file name online turns it up. The book's scripts are written in Ruby; I rewrote them in Python. The book also uses the old API, and since I have Hadoop 2.6.0 installed I used the new API and changed the programs accordingly. In hindsight this was quite useful: it forced me to understand the programs first and to look up whatever I did not know about the API. The API documentation is at http://hadoop.apache.org/docs/r2.6.0/api/index.html.

The ufo sighting file

No.  Field           Description
1    Sighting date   Date the UFO was seen
2    Recorded date   Date the sighting was recorded
3    Location        Where the UFO was seen
4    Shape           Shape of the object, e.g. diamond
5    Duration        How long the sighting lasted
6    Description     Free-text description of the sighting

Counting records and fields with streaming

This one is simple; it is essentially WordCount.

wcmapper.py

#!/usr/bin/python
# -*- coding: utf-8 -*- 
"""a python script for hadoop streaming map """

import sys

def map(input):
    for line in input:
        line = line.strip()
        words = line.split()
        for word in words:
            print '%s\t%s' % (word, 1)

def main():
    map(sys.stdin)

if __name__ == "__main__":
    main()
 
           

wcreducer.py

#!/usr/bin/python
# -*- coding: utf-8 -*- 
"""a python script for hadoop streaming map """

import sys

def reduce(input):
    current_word = None
    current_count = 0
    word = None
    for line in input:
        line = line.strip()
        
        word, count = line.split('\t', 1)
        try:
            count = int(count)
        except ValueError:
            continue

        if current_word == word:
            current_count += count
        else:
            if current_word:
                print '%s\t%s' %(current_word, current_count)
            current_count = count
            current_word = word

    # guard against empty input, where current_word is still None
    if current_word is not None:
        print '%s\t%s' % (current_word, current_count)

def main():
    reduce(sys.stdin)


if __name__ == "__main__":
    main()
           

summarymapper.py

#!/usr/bin/python
# -*- coding: utf-8 -*- 
"""a python script for hadoop streaming map """

import sys

def map(input):
    for line in input:
        print "total\t1"
        line = line.strip()
        words = line.split("\t")
        if len(words) != 6:
            print "badline\t1"
        else:
            # split() never yields None, so compare against the empty string:
            # count a column only when its field is non-empty
            if words[0]:
                print "sighted\t1"
            if words[1]:
                print "recorded\t1"
            if words[2]:
                print "location\t1"
            if words[3]:
                print "shape\t1"
            if words[4]:
                print "duration\t1"
            if words[5]:
                print "description\t1"


def main():
    map(sys.stdin)

if __name__ == "__main__":
    main()
 
           

Commands used

cat ufo.tsv | ./summarymapper.py | sort | ./wcreducer.py > output.txt
hadoop dfs -copyFromLocal ufo.tsv /user/hadoop/ufo/ufo.tsv
hadoop jar /home/hadoop/cloud/hadoop/share/hadoop/tools/lib/hadoop-streaming-2.6.0.jar -file summarymapper.py -mapper summarymapper.py -file wcreducer.py -reducer wcreducer.py -input /user/hadoop/ufo/ufo.tsv -output /user/hadoop/ufo/out
           

Grouping duration statistics by shape

The first script, shpemapper.py, simply counts the shapes and is again essentially WordCount. The scripts shapetimemapper.py and shapetimereducer.py implement the grouped statistics. Compared with shpemapper.py, shapetimemapper.py does one more thing: it extracts the shape (words[3]) as before, but it also uses regular expressions to pull the duration out of the text (words[4]) and converts it to an integer number of seconds, so the output line changes from print shape + "\t1" to print shape + "\t" + str(time). shapetimereducer.py then implements the min, max, total and mean calculations for each shape.

shpemapper.py

#!/usr/bin/python
# -*- coding: utf-8 -*- 
"""a python script for hadoop streaming map """

import sys

def map(input):
    for line in input:
        line = line.strip()
        words = line.split("\t")
        if len(words) == 6:
            shape = words[3].strip()
            if len(shape) > 0:
                print shape + "\t1"
            
def main():
    map(sys.stdin)

if __name__ == "__main__":
    main()
           

shapetimemapper.py

#!/usr/bin/python
# -*- coding: utf-8 -*- 
"""a python script for hadoop streaming map """

import sys
import re

def map(input):
    pattern1 = re.compile(r'\d* ?((min)|(sec))')
    pattern2 = re.compile(r'\d*') 
    for line in input:
        line = line.strip()
        words = line.split("\t")
        if len(words) == 6:
            shape = words[3].strip()
            duration = words[4].strip()
            if shape and duration:  # fields are never None after split(); check for non-empty instead
                match = pattern1.match(duration)
                if match != None:
                    time = pattern2.match(match.group())
                    unit = match.group(1)
                    try:
                        time = int(time.group())
                    except ValueError:
                        #print '??? : ' + duration
                        time = 0
                    if unit == 'min':
                        time = time * 60
                    if len(shape) > 0:
                        print shape + '\t' + str(time) 

def main():
    map(sys.stdin)

if __name__ == "__main__":
    main()
 
           

shapetimereducer.py

#!/usr/bin/python
# -*- coding: utf-8 -*- 
"""a python script for hadoop streaming map """

import sys
import re

def reduce(input):
    current = None
    minv = 0
    maxv = 0
    mean = 0
    total = 0
    count = 0
    
    for line in input:
        line = line.strip()
        word, time = line.split('\t')
        time = int(time)
        
        if word == current:
            count += 1
            total += time
            if time < minv:
                minv = time
            if time > maxv:
                maxv = time
        else:
            if current != None:
                print current + '\t' + str(minv) +' ' + str(maxv) + ' ' + str((total/count))
            current = word
            count = 1
            total = time
            minv = time
            maxv = time
    # guard against empty input (current is None and count is 0)
    if current is not None:
        print current + '\t' + str(minv) +' ' + str(maxv) + ' ' + str((total/count))

def main():
    reduce(sys.stdin)

if __name__ == "__main__":
    main()
 
           

ChainMapper

ChainMapper chains Mappers together, somewhat like a Filter in the servlet world. The example uses two Mappers: the first validates each record, the second counts occurrences of each Location. Mappers are added with the ChainMapper.addMapper method. Note the job.setJarByClass(ufo.UFORecordValidationMapper.class); without it you get a class-not-found error. Also be aware that the old and new APIs differ here; I have added comments about this in the code.

UFORecordValidationMapper.java

package ufo;
import java.io.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;

/*
Old API: extend org.apache.hadoop.mapred.MapReduceBase and implement the
interface org.apache.hadoop.mapred.Mapper<K1, V1, K2, V2>.
New API: extend org.apache.hadoop.mapreduce.Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.
Old API: the third and fourth parameters of map() are an OutputCollector and a Reporter.
New API: the third parameter of map() is a Context, which merges the roles of those two classes.
*/

public class UFORecordValidationMapper extends Mapper<LongWritable, Text, LongWritable, Text>
{
    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException
    {
        String line = value.toString();
        if(validate(line))
            context.write(key, value); 
    }

    private boolean validate(String str)
    {
        String words[] = str.split("\t");
        if(words.length != 6)
            return false;
        else
            return true;
    }
}
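
For comparison with the old-API notes in the comment above, here is a minimal sketch (my own, not from the book) of what the same validation mapper could look like against the old org.apache.hadoop.mapred API; the class name OldApiUFORecordValidationMapper is just illustrative.

package ufo;

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

//old-API style: extend MapReduceBase, implement the mapred.Mapper interface,
//and emit results through an OutputCollector instead of a Context
public class OldApiUFORecordValidationMapper extends MapReduceBase
    implements Mapper<LongWritable, Text, LongWritable, Text>
{
    public void map(LongWritable key, Text value,
                    OutputCollector<LongWritable, Text> output, Reporter reporter)
        throws IOException
    {
        String line = value.toString();
        if(line.split("\t").length == 6)
            output.collect(key, value);
    }
}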
           

UFOLocation.java

package ufo;

import java.io.*;
import java.util.Iterator;
import java.util.regex.*;

import org.apache.hadoop.conf.*;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.chain.*;
import org.apache.hadoop.mapreduce.lib.reduce.*;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class UFOLocation
{
     public static class MapClass extends Mapper<Object, Text, Text, LongWritable>
    {
        private final static LongWritable one = new LongWritable(1);
        private static Pattern locationPattern = Pattern.compile("[a-zA-z]{2}[^a-zA-z]*$");
        
        public void map(Object key, Text value, Context context) throws IOException, InterruptedException
        {
            String line = value.toString();
            String[] fields = line.split("\t");
            String location = fields[2].trim();
            if(location.length() >= 2)
            {
                Matcher matcher = locationPattern.matcher(location);
                if(matcher.find())
                {
                    int start = matcher.start();
                    String state = location.substring(start, start + 2);
                    context.write(new Text(state.toUpperCase()), one);
                }
            }
        }
    }

    /*
    New API: the driver code is written against org.apache.hadoop.mapreduce.Job, which holds
    the configuration; calling waitForCompletion(boolean) submits the job for execution.
    Old API: the driver used org.apache.hadoop.mapred.JobConf(Configuration, Class) to hold the
    configuration and org.apache.hadoop.mapred.JobClient.runJob(JobConf) to submit the job.
    */
    public static void main(String[] args) throws Exception
    {
        Configuration conf = new Configuration();
        Job job = new Job(conf, "UFOLocation");

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);

        Configuration mapAConf = new Configuration(false);
        ChainMapper.addMapper(job, UFORecordValidationMapper.class, LongWritable.class, Text.class,
             LongWritable.class, Text.class, mapAConf);

        Configuration mapBConf = new Configuration(false);
        ChainMapper.addMapper(job, MapClass.class, LongWritable.class, Text.class,
             Text.class, LongWritable.class, mapBConf);
        
        job.setJarByClass(ufo.UFORecordValidationMapper.class);
        job.setMapperClass(ChainMapper.class);
        job.setCombinerClass(LongSumReducer.class);
        job.setReducerClass(LongSumReducer.class);
        
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        System.exit(job.waitForCompletion(true)?0:1);
    }    
}
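
To make the driver comparison in the comment above concrete, the following is a minimal old-API driver sketch of my own (not part of this chapter's build): configuration through JobConf and submission through the blocking JobClient.runJob call. The class name and the omitted mapper/reducer settings are placeholders.

package ufo;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;

public class OldApiDriverSketch
{
    public static void main(String[] args) throws Exception
    {
        //old API: JobConf holds the configuration instead of mapreduce.Job
        JobConf conf = new JobConf(OldApiDriverSketch.class);
        conf.setJobName("UFOLocationOldApi");

        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(LongWritable.class);
        //conf.setMapperClass(...) and conf.setReducerClass(...) would take
        //old-API (org.apache.hadoop.mapred) classes here

        FileInputFormat.setInputPaths(conf, new Path(args[0]));
        FileOutputFormat.setOutputPath(conf, new Path(args[1]));

        //old API: JobClient.runJob submits the job and blocks until it finishes
        JobClient.runJob(conf);
    }
}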
           

DistributedCache

DistributedCache provides a way to share files with the tasks. Sharing a file takes two steps: first, add the file in main() with DistributedCache.addCacheFile; second, read the file inside the Mapper, for example loading it into a HashMap in the Mapper's setup method. The example creates a states.txt file that maps state abbreviations to full names.

states.txt

AL	Alabama
AK	Alaska
AZ	Arizona
AR	Arkansas
CA	California
           

UFOLocation2.java

package ufo;

import java.io.*;
import java.net.*;
import java.util.*;
import java.util.regex.*;

import org.apache.hadoop.conf.*;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.chain.*;
import org.apache.hadoop.mapreduce.lib.reduce.*;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.filecache.DistributedCache;

public class UFOLocation2
{
    public static class MapClass extends Mapper<Object, Text, Text, LongWritable>
    {
        public final static String LINK_STATES_TXT = "__Link_statestxt__"; 
        private final static LongWritable one = new LongWritable(1);
        private static Pattern locationPattern = Pattern.compile("[a-zA-z]{2}[^a-zA-z]*$");
        private Map<String, String> stateNames;

        @Override
        public void setup(Context text) throws IOException, InterruptedException
        {
            try
            {
                setupStateMap();
            }catch (IOException e){
                System.err.println("Error reading state file.");
                System.exit(1);
            }
        }

        private void setupStateMap() throws IOException
        {
            Map<String, String> states = new HashMap<String, String>();
            BufferedReader reader = new BufferedReader(new InputStreamReader(new FileInputStream(MapClass.LINK_STATES_TXT)));
            String line = reader.readLine();
            while(line != null)
            {
                String[] split = line.split("\t");
                states.put(split[0], split[1]);
                line = reader.readLine();
            }
            reader.close();
            stateNames = states;
        }
        
        private String lookupState(String state)
        {
            String fullName = stateNames.get(state);
            return fullName == null ? "Other" : fullName;
        }

        public void map(Object key, Text value, Context context) throws IOException, InterruptedException
        {   
            String line = value.toString();
            String[] fields = line.split("\t");
            String location = fields[2].trim();
            if(location.length() >= 2)
            {
                Matcher matcher = locationPattern.matcher(location);
                if(matcher.find())
                {
                    int start = matcher.start();
                    String state = location.substring(start, start + 2);
                    String fullName = lookupState(state.toUpperCase());
                    context.write(new Text(fullName), one);
                }
            }
        }
     }


    /*
    In the new API the DistributedCache file is accessed through a symlink that has to be created explicitly.
    */
    public static void main(String[] args) throws Exception
    {
         Configuration conf = new Configuration();
         Job job = new Job(conf, "UFOLocation");
         String cacheFilePath = "/user/hadoop/ufo/states.txt";
         Path inPath = new Path(cacheFilePath);
         //the name after '#' becomes the symlink
         String inPathLink = inPath.toUri().toString()+"#"+MapClass.LINK_STATES_TXT;      
         DistributedCache.addCacheFile(new URI(inPathLink), job.getConfiguration());

         job.setOutputKeyClass(Text.class);
         job.setOutputValueClass(LongWritable.class);

         Configuration mapAConf = new Configuration(false);
         ChainMapper.addMapper(job, UFORecordValidationMapper.class, LongWritable.class, Text.class,
             LongWritable.class, Text.class, mapAConf);

         Configuration mapBConf = new Configuration(false);
         ChainMapper.addMapper(job, MapClass.class, LongWritable.class, Text.class,
             Text.class, LongWritable.class, mapBConf);
        
         job.setJarByClass(ufo.UFORecordValidationMapper.class);
         job.setMapperClass(ChainMapper.class);
         job.setCombinerClass(LongSumReducer.class);
         job.setReducerClass(LongSumReducer.class);
        
         FileInputFormat.addInputPath(job, new Path(args[0]));
         FileOutputFormat.setOutputPath(job, new Path(args[1]));

         System.exit(job.waitForCompletion(true)?0:1);
    }    
}
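
A side note that is not in the book: in Hadoop 2 the DistributedCache class is deprecated, and as far as I know the same effect can be had with Job.addCacheFile, keeping the '#' fragment for the symlink. The sketch below would replace the DistributedCache.addCacheFile line in main():

         //assumed alternative, untested here: add the cache file directly on the Job
         job.addCacheFile(new URI(cacheFilePath + "#" + MapClass.LINK_STATES_TXT));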
           

Context

The old API used Reporter to report counters and status; in the new API this is done through Context.

UFOCountingRecordValidationMapper.java 

Define an enum type LineCounters, obtain a counter with context.getCounter, add to it with the counter's increment method, and set the task status with context.setStatus.

package ufo;
import java.io.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;

public class UFOCountingRecordValidationMapper extends Mapper<LongWritable, Text, LongWritable, Text>
{
    public enum LineCounters
    {
        BAD_LINES,
        TOO_MANY_TABS,
        TOO_FEW_TABS
    };
    
    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException
    {
        String line = value.toString();
        if(validate(line, context))
            context.write(key, value); 
    }

    private boolean validate(String str, Context context)
    {
        String words[] = str.split("\t");
        if(words.length != 6)
        {
            if(words.length <6)
            {
                Counter ct = context.getCounter(LineCounters.TOO_FEW_TABS);
                ct.increment(1);
            }else{
                Counter ct = context.getCounter(LineCounters.TOO_MANY_TABS);
                ct.increment(1);
            }
            Counter ct = context.getCounter(LineCounters.BAD_LINES);
            ct.increment(1);
           
            if(ct.getValue() % 10 == 0){
                context.setStatus("Got 10 bad lines.");
                System.err.println("Read another 10 bad lines.");    
            }
            
            return false;
        }else
            return true;
    }
}
           

UFOLocation3.java

Copy UFOLocation2.java, rename it, then change two places:

ChainMapper.addMapper(job, UFOCountingRecordValidationMapper.class, LongWritable.class, Text.class,
             LongWritable.class, Text.class, mapAConf);
           
job.setJarByClass(ufo.UFOCountingRecordValidationMapper.class);
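
The counter totals appear in the job's console output, but they can also be read back in the driver. A small sketch of my own (not from the book), assuming it replaces the final System.exit line in the UFOLocation3 main():

        //wait for the job, then read the custom counters defined in the mapper
        boolean ok = job.waitForCompletion(true);
        Counters counters = job.getCounters();
        long badLines = counters.findCounter(
                UFOCountingRecordValidationMapper.LineCounters.BAD_LINES).getValue();
        System.out.println("BAD_LINES = " + badLines);
        System.exit(ok ? 0 : 1);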
           

Helper scripts for building and running the programs

build.sh

Because the code is in the ufo package, build.sh lives in the current directory and the Java source files sit in the ufo subdirectory below it.

#!/bin/sh
HADOOP_LIB_DIR=/home/hadoop/cloud/hadoop/share/hadoop

rm -f ./*.class
rm -f ./ufo.jar

javac -classpath $HADOOP_LIB_DIR/common/hadoop-common-2.6.0.jar:$HADOOP_LIB_DIR/common/lib/commons-cli-1.2.jar:$HADOOP_LIB_DIR/common/lib/hadoop-annotations-2.6.0.jar:$HADOOP_LIB_DIR/mapreduce/hadoop-mapreduce-client-core-2.6.0.jar -d . ufo/UFORecordValidationMapper.java ufo/UFOLocation.java ufo/UFOLocation2.java ufo/UFOCountingRecordValidationMapper.java  ufo/UFOLocation3.java

#package

jar -cvf ufo.jar ./ufo/*.class
           

run.sh

To run the other examples, just change ufo.UFOLocation to ufo.UFOLocation2 or ufo.UFOLocation3.

#!/bin/sh
hdfs dfs -rm -r -f /user/hadoop/ufo/out001
hadoop jar ufo.jar ufo.UFOLocation /user/hadoop/ufo/ufo.tsv /user/hadoop/ufo/out001
hdfs dfs -cat /user/hadoop/ufo/out001/part-r-00000
           
