
MapReduce WordCount with a Whitelist: A Python Implementation


1. Mapper code, map.py:

The input file The_Man_of_Property.txt must first be uploaded to HDFS: hadoop fs -put The_Man_of_Property.txt /
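
The whitelist (white_list) is a local plain-text file with one word per line; only words that appear in it are counted. A hypothetical example:

the
of
property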

# coding=utf-8
import sys

def read_local_file_func(f):
    # Load the whitelist: one word per line
    word_set = set()
    file_in = open(f, 'r')
    for line in file_in:
        word = line.strip()
        word_set.add(word)
    file_in.close()
    return word_set

def mapper_func(white_list_fd):
    # white_list_fd is the whitelist file name, passed in by run.sh
    word_set = read_local_file_func(white_list_fd)
    # Input text arrives on stdin via Hadoop Streaming
    for line in sys.stdin:
        ss = line.strip().split(' ')
        for s in ss:
            word = s.strip()
            if word != "" and (word in word_set):
                # Emit <word, 1> for every whitelisted word
                print "%s\t%s" % (word, 1)

if __name__ == "__main__":
    module = sys.modules[__name__]
    # argv[1] names the function to call; the remaining arguments are passed through
    func = getattr(module, sys.argv[1])
    args = sys.argv[2:]
    func(*args)
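
To sanity-check the mapper locally, pipe a line of text into it (the output below assumes the hypothetical white_list shown above):

echo "the man of property" | python map.py mapper_func white_list
the	1
of	1
property	1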


2. Reducer code, red.py:

# coding=utf-8
import sys


def reducer_func():
    current_word = None
    count_pool = []

    # The streaming framework delivers input sorted by key,
    # so all counts for one word arrive contiguously
    for line in sys.stdin:
        word, val = line.strip().split('\t')
        if current_word is None:
            current_word = word
        if current_word != word:
            # Key changed: emit the total for the previous word
            print "%s\t%s" % (current_word, sum(count_pool))
            current_word = word
            count_pool = []
        count_pool.append(int(val))

    # Emit the total for the final word
    if current_word is not None:
        print "%s\t%s" % (current_word, sum(count_pool))

if __name__ == "__main__":
    module = sys.modules[__name__]
    # argv[1] names the function to call; the remaining arguments are passed through
    func = getattr(module, sys.argv[1])
    args = sys.argv[2:]
    func(*args)
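
The reducer can be tested the same way by feeding it key-sorted lines (hypothetical mapper output):

printf "of\t1\nof\t1\nthe\t1\n" | python red.py reducer_func
of	2
the	1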

3. Create a shell script, run.sh, that submits the job through the Hadoop Streaming interface:

HADOOP_CMD="/usr/local/src/hadoop-1.2.1/bin/hadoop"  # path to the hadoop installation
STREAM_JAR_PATH="/usr/local/src/hadoop-1.2.1/contrib/streaming/hadoop-streaming-1.2.1.jar"

INPUT_FILE_PATH_1="/The_Man_of_Property.txt"  # input file on HDFS
OUTPUT_PATH="/output_file_broadcast"          # output directory on HDFS

# Remove the output directory if it already exists on HDFS
$HADOOP_CMD fs -rmr -skipTrash $OUTPUT_PATH

# Step 1.
# -mapper/-reducer give the command, the function to run, and its arguments;
# mapred.reduce.tasks=3 requests 3 reducers;
# -file ships each local file to the task nodes.
$HADOOP_CMD jar $STREAM_JAR_PATH \
    -input $INPUT_FILE_PATH_1 \
    -output $OUTPUT_PATH \
    -mapper "python map.py mapper_func white_list" \
    -reducer "python red.py reducer_func" \
    -jobconf "mapred.reduce.tasks=3" \
    -file ./map.py \
    -file ./red.py \
    -file ./white_list

4. Local test. Piping the mapper output through sort simulates the shuffle/sort phase between map and reduce:

cat The_Man_of_Property.txt | python map.py mapper_func white_list | sort -k1,1 | python red.py reducer_func

5. Hadoop cluster test

Run the job with bash run.sh, then list the output directory:

hadoop fs -ls /output_file_broadcast
-rw-r--r--   3 root supergroup          0 2019-10-30 03:26 /output_file_broadcast/_SUCCESS
drwxr-xr-x   - root supergroup          0 2019-10-30 03:24 /output_file_broadcast/_logs
-rw-r--r--   3 root supergroup         22 2019-10-30 03:26 /output_file_broadcast/part-00000
-rw-r--r--   3 root supergroup          9 2019-10-30 03:25 /output_file_broadcast/part-00001
-rw-r--r--   3 root supergroup          0 2019-10-30 03:26 /output_file_broadcast/part-00002
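
To view the aggregated counts, concatenate the part files:

hadoop fs -cat /output_file_broadcast/part-*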
