Mapreduce Wordcount白名單 Python實作
1.Mapper部分的map.py代碼:
其中讀入檔案The_Man_of_Property.txt需要上傳到HDFS檔案系統上:hadoop fs -put The_Man_of_Property.txt /
# coding=utf-8
import sys
def read_local_file_func(f):
    """Read the local white-list file *f* and return its words as a set.

    Each line is stripped of surrounding whitespace and treated as one
    word. Used by mapper_func to build the membership filter.

    f: path to the white-list file (shipped to the task via -file).
    Returns: set of stripped line strings.
    """
    # Use a context manager so the handle is closed (the original opened
    # the file and never closed it).
    with open(f, 'r') as file_in:
        return set(line.strip() for line in file_in)
def mapper_func(white_list_fd):
    """Emit one "word\t1" record per white-listed word read from stdin.

    white_list_fd: path to the white-list file (passed in by run.sh).
    Reads lines from sys.stdin (the streaming framework feeds the input
    split there), splits each line on single spaces, and prints a
    tab-separated "word\t1" line for every token found in the white list.
    """
    word_set = read_local_file_func(white_list_fd)
    # NOTE: the original also opened 'The_Man_of_Property.txt' here but
    # never read from or closed it -- the job's data arrives on stdin,
    # so that dead, leaked handle has been removed.
    for line in sys.stdin:
        for token in line.strip().split(' '):
            word = token.strip()
            if word != "" and word in word_set:
                # Emit the stripped word: the original printed the raw
                # token, which could still carry tab/whitespace fragments
                # even though membership was tested on the stripped form.
                print("%s\t%s" % (word, 1))
if __name__ == "__main__":
    # Dispatch: argv[1] names a function in this module and the remaining
    # argv entries become its positional arguments, e.g.
    #   python map.py mapper_func white_list
    module = sys.modules[__name__]
    func = getattr(module, sys.argv[1])
    # The original initialised args = None and only assigned the slice
    # when len(sys.argv) > 1 -- but reaching this point already requires
    # argv[1], so the guard was always true and the None branch (which
    # would have crashed on func(*None)) was dead. Slice unconditionally.
    args = sys.argv[2:]
    func(*args)
2.Reducer部分的red.py代碼:
# coding = utf-8
import sys
def reducer_func():
    """Sum per-word counts from key-sorted mapper output on stdin.

    Input: tab-separated "word\tcount" lines grouped by word (the
    streaming shuffle delivers each reducer's keys in sorted order).
    For every distinct word, prints one "word\ttotal" line to stdout.
    """
    current_word = None
    total = 0  # renamed from 'sum', which shadowed the builtin
    for line in sys.stdin:
        word, val = line.strip().split('\t')
        if current_word is not None and word != current_word:
            # Key changed: flush the finished word BEFORE counting the
            # new record. (The original appended val to its pool before
            # flushing, so each word's first count leaked into the
            # previous word's total and was then discarded.)
            print("%s\t%s" % (current_word, total))
            total = 0
        current_word = word
        total += int(val)
    # Flush the final word; guard against completely empty input, where
    # the original printed a bogus "None\t0" line.
    if current_word is not None:
        print("%s\t%s" % (current_word, total))
if __name__ == "__main__":
    # Dispatch: argv[1] names a function in this module and the remaining
    # argv entries become its positional arguments, e.g.
    #   python red.py reducer_func
    module = sys.modules[__name__]
    func = getattr(module, sys.argv[1])
    # The original's "args = None; if len(sys.argv) > 1: ..." guard was
    # dead code: reaching this line already requires argv[1], so the
    # condition always held. Slice unconditionally.
    args = sys.argv[2:]
    func(*args)
3.建立一個shell腳本檔案:streaming接口運作的腳本run.sh
# Paths to the Hadoop install and the streaming jar.
HADOOP_CMD="/usr/local/src/hadoop-1.2.1/bin/hadoop"
STREAM_JAR_PATH="/usr/local/src/hadoop-1.2.1/contrib/streaming/hadoop-streaming-1.2.1.jar"

INPUT_FILE_PATH_1="/The_Man_of_Property.txt"   # wordcount input on HDFS
OUTPUT_PATH="/output_file_broadcast"           # job output directory

# Delete any previous output directory so the job can rerun. (The
# original left $OUTPUT_PATH on its own line, so -rmr had no target and
# nothing was deleted.)
$HADOOP_CMD fs -rmr -skipTrash $OUTPUT_PATH

# Step 1. Run the streaming job.
# Fixes vs. the original:
#  * -input referenced the undefined $INPUT_FILE_PATH_2; use _1.
#  * inline comments after the line-continuation backslashes broke the
#    command; they have been moved above it.
#  * the reducer entry point is reducer_func (was misspelled reduer_func,
#    which would make getattr fail in red.py).
# -mapper : map.py's mapper_func, with white_list as its argument.
# -reducer: red.py's reducer_func.
# -jobconf: run 3 reduce tasks.
# -file   : ship the scripts and the white list to every task.
$HADOOP_CMD jar $STREAM_JAR_PATH \
    -input $INPUT_FILE_PATH_1 \
    -output $OUTPUT_PATH \
    -mapper "python map.py mapper_func white_list" \
    -reducer "python red.py reducer_func" \
    -jobconf "mapred.reduce.tasks=3" \
    -file ./map.py \
    -file ./red.py \
    -file ./white_list
4.本地測試
cat The_Man_of_Property.txt | python map.py mapper_func white_list | sort -k1 | python red.py reducer_func
5.hadoop叢集測試
運作 bash run.sh
hadoop fs -ls /output_file_broadcast
-rw-r--r-- 3 root supergroup 0 2019-10-30 03:26 /output_file_broadcast/_SUCCESS
drwxr-xr-x - root supergroup 0 2019-10-30 03:24 /output_file_broadcast/_logs
-rw-r--r-- 3 root supergroup 22 2019-10-30 03:26 /output_file_broadcast/part-00000
-rw-r--r-- 3 root supergroup 9 2019-10-30 03:25 /output_file_broadcast/part-00001
-rw-r--r-- 3 root supergroup 0 2019-10-30 03:26 /output_file_broadcast/part-00002