天天看点

Python数据挖掘-Python with hadoop一、SnakeBite二、使用原生python编写MapReduce三、使用MRJOB编写Mapreduce程序四、MRJOB-实战Web日志分析

一、SnakeBite

1.1 Snakebite介绍

介绍:Snakebite由Spotify创建,需要python2 (python3版本目前并不支持) and python-protobuf 2.4.1或更高版本。Snakebite提供了一个Python客户端库,允许客户从Python应用程序中以编程方式访问HDFS。客户端库使用protobuf与NameNode直接通信的消息。snakebite还包括一个基于客户端库,可用于HDFS的命令行界面。

  下载地址:https://pypi.python.org/pypi/snakebite/   (目前最新版本号为snakebite-2.11.0);

   主页地址为:Home Page: http://github.com/spotify/snakebite

1.2 Snakebite安装:

Snakebite requires Python 2 and python-protobuf 2.4.1 or higher.Python 3 is currently not supported. Snakebite is distributed through PyPI and can be installed using pip :

$ pip install snakebite

1.3 操作例程:

Example 1-1. python/HDFS/list_directory.py

from snakebite.client import Client
client = Client('localhost', 9000)
for x in client.ls(['/']):
    print x
           

Example 1-2. python/HDFS/mkdir.py

from snakebite.client import Client
client = Client('localhost', 9000)
for p in client.mkdir(['/foo/bar', '/input'], create_parent=True):
   print p
           

Example 1-3. python/HDFS/delete.py

from snakebite.client import Client
client = Client('localhost', 9000)
for p in client.delete(['/foo', '/input'], recurse=True):
    print p
           

Example 1-4. python/HDFS/copy_to_local.py

from snakebite.client import Client
client = Client('localhost', 9000)
for f in client.copyToLocal(['/input/input.txt'], '/tmp'):
    print f
           

Example 1-5. python/HDFS/text.py

from snakebite.client import Client
client = Client('localhost', 9000)
for l in client.text(['/input/input.txt']):
print l
           

此外,snakebite还提供了snakebite CLI客户端,snakebite CLI客户端是一个Python命令行,配置查看http://snakebite.readthedocs.io/en/latest/,以下为《Python with hadoop》书中介绍使用例程:

$ snakebite ls /
Found 2 items
drwx------ - hadoop supergroup 0 2015-09-20 14:36 /tmp
drwxr-xr-x - hadoop supergroup 0 2015-09-20 11:40 /user
           

二、使用原生python编写MapReduce

简介:由于hadoop有java和Streaming两种方式来编写MapReduce任务:

1、java的优点是计算效率高,并且部署方便,直接打包成一个jar文件就可以了;

2、Hadoop Streaming是hadoop提供的一个编程工具,它允许用户使用任何可执行文件或脚本作为mapper和Reducer。

于是我们试着编写程序:

Python数据挖掘-Python with hadoop一、SnakeBite二、使用原生python编写MapReduce三、使用MRJOB编写Mapreduce程序四、MRJOB-实战Web日志分析
第一段代码:mapper.py
import sys  

for line in sys.stdin:  
    line = line.strip()   
    words = line.split()  
    for word in words:  
        print('%s\t%s' % (word, 1)) 
第二段代码reducer.py:
from operator import itemgetter  
import sys    
current_word = None  
current_count = 0  
word = None  

for line in sys.stdin:    
    line = line.strip()   
    word, count = line.split('\t', 1)  
    try:  
        count = int(count)  
    except ValueError:   
        continue  

    if current_word == word:  
        current_count += count  
    else:  
        if current_word:  
            # write result to STDOUT  
            print('%s\t%s' % (current_word, current_count))  
        current_count = count  
        current_word = word   
if current_word == word:  
    print('%s\t%s' % (current_word, current_count))
           

本地调试:

[[email protected] ~]# cd /root/hadooptest/
[[email protected] hadooptest]# cat input.txt | ./mapper.py 
foo 1
foo 1
quux 1
labs 1
foo 1
bar 1
quux 1
abc 1
bar 1
see 1
you 1
by 1
test 1
welcome 1
test 1
abc 1
labs 1
foo 1
me 1
python 1
hadoop 1
ab 1
ac 1
bc 1
bec 1
python 1


[[email protected] hadooptest]# cat input.txt | ./mapper.py | sort -k1,1 | ./reducer.py 
ab 1
abc 2
ac 1
bar 2
bc 1
bec 1
by 1
foo 4
hadoop 1
labs 2
me 1
python 2
quux 2
see 1
test 2
welcome 1
you 1
           

Hadoop平台下运行:

先cd到mapper.py与reducer.py的目录下,再进行运行。
           
Python数据挖掘-Python with hadoop一、SnakeBite二、使用原生python编写MapReduce三、使用MRJOB编写Mapreduce程序四、MRJOB-实战Web日志分析
hadoop –jar /home/hadoop/hadoop-2.7.0/share/hadoop/tools/lib/hadoop-streaming-2.7.0.jar 
-jobconf mapred.reduce.tasks=2
-mapper “python mapper.py”
-file ./mapper.py
-reducer “python reducer.py”
-file ./reducer.py
-input /testpy/
-output /hehe

注意:这里的jobconf mapred.reduce.tasks=2表示有两个Reduce
           

参数说明:

/usr/local/hadoop-2.6.4/bin/hadoop jar /usr/local/hadoop-2.6.4/share/hadoop/tools/lib/hadoop-streaming-2.6.4.jar \
-input <输入目录> \ # 可以指定多个输入路径,例如:-input '/user/foo/dir1' -input '/user/foo/dir2'
-inputformat <输入格式 JavaClassName> \
-output <输出目录> \
-outputformat <输出格式 JavaClassName> \
-mapper <mapper executable or JavaClassName> \
-reducer <reducer executable or JavaClassName> \
-combiner <combiner executable or JavaClassName> \
-partitioner <JavaClassName> \
-cmdenv <name=value> \ # 可以传递环境变量,可以当作参数传入到任务中,可以配置多个
-file <依赖的文件> \ # 配置文件,字典等依赖
-D <name=value> \ # 作业的属性配置
           

三、使用MRJOB编写Mapreduce程序

3.1 Mrjob介绍:

Mrjob是一个编写MapReduce任务的开源Python框架,它实际上对Hadoop Streaming的命令行进行了封装,因此接粗不到Hadoop的数据流命令行,使我们可以更轻松、快速的编写MapReduce任务。

Mrjob具有如下特点:

代码简洁,map及reduce函数通过一个Python文件就可以搞定;
      
支持多步骤的MapReduce任务工作流;      
支持多种运行方式,包括内嵌方式、本地环境、Hadoop、远程亚马逊;      
支持亚马逊网络数据分析服务Elastic MapReduce(EMR);      
调试方便,无需任何支持环境。      

3.2 安装Mrjob:

1、联网下运行:pip install mrjob

2、也可下载http://pythonhosted.org/mrjob/guides/quickstart.html后解压,再进入mrjob安装包解压后的目录,安装python setup.py install

测试代码:WordCount代码

3.3 代码测试:

Mrjob通过Python的yield机制将函数变成一个生成器,通过不断调用next()去实现key:value的初始化或运算操作。

#!/usr/bin/env python
# -*- coding:utf-8 -*-

from mrjob.job import MRJob

class MRWordCounter(MRJob):
    def mapper(self,key,line): #接收每一行的输入数据,处理后返回一堆key:value,初始化value值为1
        for word in line.split():
            yield word,1

    def reducer(self,word,occurrences): #接收mapper输出的key:value对进行整合,把相同key的value做累加(sum)操作后输出
        yield word,sum(occurrences)

if __name__ == '__main__':
MRWordCounter.run()
           

运行:

Mrjob支持4种运行方式:内嵌(-r inline)、本地(-r local)、Hadoop(-r hadoop)、Amazon EMR(-r emr)

内嵌

特点是调试方便,启动单一进程模拟任务执行状态及结果,Mrjob默认以内嵌方式运行,选项可以不写。输出可以用‘>’或‘-o’。下面两条命令是等价的

#python word_count.py -r inline > output.txt
python word_count.py -r inline -o output.txt      

  结果

本地

用于本地模拟Hadoop调试,与内嵌方式的区别是启动了多进程执行每一个任务

python word_count.py -r local -o output.txt      

  结果

Hadoop

用于Hadoop环境,支持Hadoop运行调度控制参数。

python word_count.py -r hadoop --jobconf mapreduce.job.priority=VREY_HIGH --jobconf mapreduce.job.maps=2 --jobconf mapreduce.job.reduces=1 -o hdfs:///output/hadoop hdfs:///user/hadoop/input

#--jobconf mapreduce.job.priority=VREY_HIGH 指定任务调度优先级(VREY_HIGH|HIGH)
#--jobconf mapreduce.job.maps=2 Map任务个数限制
#--jobconf mapreduce.job.reduces=1 Reduce任务个数限制      

  结果

四、MRJOB-实战Web日志分析

示例场景

日志说明

有两台Web服务器,日志文件存放在/usr/local/nginx/logs/目录,日志默认为nginx定义格式。如:

123.13.17.13 - - [25/Aug/2016:00:00:01 +0800] "GET /AppFiles/apk/studynet/icon_v120/apk_80111_1.jpg HTTP/1.1" 206 51934 "http://img.xxx.com:8080/AppFiles/apk/studynet/icon_v120/apk_80111_1.jpg" "Dalvik/1.6.0 (Linux; U; Android 4.4.2; S100 Build/KOT49H)"
120.210.166.150 - - [25/Aug/2016:00:00:01 +0800] "GET /AppFiles/apk/studynet/products/product_lc01.zip HTTP/1.1" 206 16631 "http://img.xxx.com:8080/AppFiles/apk/studynet/products/product_lc01.zip" "Dalvik/1.6.0 (Linux; U; Android 4.4.2; S908 Build/KVT49L)"
123.13.17.13 - - [25/Aug/2016:00:00:01 +0800] "GET /AppFiles/apk/studynet/icon_v120/apk_80111_0.jpg HTTP/1.1" 206 53119 "http://img.xxx.com:8080/AppFiles/apk/studynet/icon_v120/apk_80111_0.jpg" "Dalvik/1.6.0 (Linux; U; Android 4.4.2; S100 Build/KOT49H)"
219.137.119.16 - - [25/Aug/2016:00:00:01 +0800] "GET /AppFiles/apk/gamenet/icon/icon_0_506_0.jpg HTTP/1.1" 404 1035 "-" "Dalvik/v3.3.110_update3 (Linux; U; Android 2.2.1-R-20151127.1131; ET_35 Build/KTU84Q)"
120.210.166.150 - - [25/Aug/2016:00:00:01 +0800] "GET /AppFiles/apk/studynet/products/product_lc01.zip HTTP/1.1" 206 40719 "http://img.xxx.com:8080/AppFiles/apk/studynet/products/product_lc01.zip" "Dalvik/1.6.0 (Linux; U; Android 4.4.2; S908 Build/KVT49L)"      

以空格分隔,共有12列数据:

1、客户端IP
2、空白(远程登录名称)
3、空白(认证的远程用户)
4、请求时间
5、时区(UTC)
6、请求方法
7、请求资源
8、http协议
9、状态码
10、发送字节数
11、访问来源
12、客户浏览信息(不具体拆分)      

场景部署

在两台Web服务器上部署HDFS客户端,以便定期上传Web日志到HDFS存储平台,最终实现分布式计算。

上传日志到HDFS存储的脚本

【/root/hadooptest/hdfsput.py】

#!/usr/bin/env python
# -*- encoding: utf-8 -*-

import subprocess
import sys
import datetime

webid = 'test1' #HDFS存储日志标志,另一台Web服务器为:test2
currdate = datetime.datetime.now().strftime('%Y%m%d')

logspath = '/usr/local/nginx/logs/access.log' #日志路径
logname = 'access.log.'+webid

try:
    #创建HDFS目录,目录格式:nginx/20160825,加wait()是为了让父进程等待子进程完成后再继续往下执行(subporcess默认启动子进程后不等待其执行结果就继续往下执行)
    subprocess.Popen(['/usr/local/hadoop-2.6.4/bin/hadoop','fs','-mkdir','-p','hdfs:///user/root/nginx'+currdate],stdout=subprocess.PIPE).wait() 
except Exception as e:
    pass

putinfo = subprocess.Popen(['/usr/local/hadoop-2.6.4/bin/hadoop','fs','-put',logspath,'hdfs:///user/root/nginx/' +currdate +'/'+logname],stdout=subprocess.PIPE) #上传本地日志到HDFS

for line in putinfo.stdout:
    print line      

添加定时功能到crontab

0 0 * * * /usr/bin/python /root/hadooptest/hdfsput.py >> /dev/null 2>&1      

两台Web服务器都上传日志后,HDFS上信息如下:

[[email protected] ~]# hadoop fs -ls /user/root/nginx/20160825
Found 2 items
-rw-r--r-- 1 root supergroup 15 2016-08-25 15:58 /user/root/nginx/20160825/access.log.test1
-rw-r--r-- 1 root supergroup 28 2016-08-25 15:58 /user/root/nginx/20160825/access.log.test2
      

网站访问流量统计

网站访问流量作为衡量一个站点的价值、热度的重要指标,另外CDN服务中流量会涉及计费,如何快速准确分析当前站点的流量数据至关重要。下面实现精确到分钟统计网站访问流量,原理是在mapper操作时将Web日志中小时的每分钟作为key,将对应的行发送字节数作为value,在reducer操作时对相同key做累加(sum统计)。

【/root/hadooptest/httpflow.py】

#/usr/bin/env python
# -*- coding:utf-8 -*-

from mrjob.job import MRJob
import re

class MRCounter(MRJob):
    def mapper(self, key, line):
        i = 0
        for flow in line.split(): #获取时间段,为域日志的第4列,内容如:“[24/Aug/2016:00:00:02”
            if i==3:
                timerow = flow.split(':')
                hm = timerow[1] + ':' + timerow[2] #获取'小时:分钟',作为key
            if i==9 and re.match(r'\d{1,}',flow): #获取日志第10列:发送的字节数,作为value
                yield hm,int(flow) #初始化key:value
            i+=1

    def reducer(self, key, occurences):
        yield key,sum(occurences) #相同key“小时:分钟”的value做累加操作

if __name__ == '__main__':
    MRCounter.run()      

生成Hadoop任务,运行:

python /root/hadoop/httpflow.py -r hadoop -o hdfs://output/httpflow hdfs:///user/root/nginx      

建议将分析的数据定期入库MySQL,利用MySQL灵活丰富的SQL支持,可以很方便的对数据进行加工,轻松输出比较美观的数据报表。

网站HTTP状态码统计

统计一个网站的HTTP状态码比例数据,可以帮助我们了解网站的可用度及健康状态,比如我们关注的200、404/5xx状态等。在此示例中我们利用Mrjob的多步调用的形式来实现,除了基本的mapper、reducer方法外,还可以自定义处理方法,在steps中添加调用即可。

【/root/hadooptest/httpstatus.py】

#!/usr/bin/env python
# -*- encoding: utf-8 -*-

from mrjob.job import MRJob
import re

class MRCounter(MRJob):
    def mapper(self, key, line):
        i = 0
        for httpcode in line.split():
            if i == 8 and re.match(r'\d{1,3}',httpcode): #获取日志中HTTP状态码段,作为key
                yield httpcode,1 #初始化key:value,value计数为1,方便reducer做累加
            i+=1

    def reducer(self, httpcode,occurrences):
        yield httpcode,sum(occurrences) #对排序后的key对应的value作sum累加

    def steps(self):
        return [self.mr(mapper=self.mapper),self.mr(reducer=self.reducer)] #在steps方法中添加调用队列

if __name__ == '__main__':
    MRCounter.run()      

生成Hadoop任务,运行:

python httpstatus.py -r hadoop -o hdfs:///output/httpstatus hdfs:///user/nginx      

分析结果:

[[email protected] hadooptest]# hadoop fs -cat /output/httpstatus/part-00000
"200" 608997
"206" 2802574
"302" 1
"304" 34600
"400" 30
"401" 1
"404" 1653791
"416" 180358
"499" 2689      

网站分钟级请求数统计

一个网站请求量大小,直接关系到网站的访问质量,非常有必要对改数据进行分析且关注。本示例以分钟为单位对网站的访问数进行统计。

【/root/hadooptest/http_minute_conn.py】

#!/usr/bin/env python
# -*- encoding: utf-8 -*-

from mrjob.job import MRJob
import re

class MRCounter(MRJob):
    def mapper(self, key, line):
        i = 0
            for dt in line.split():
                if i == 3: #获取时间段,位于日志的第4列,内容如“[24/Aug/2016:00:00:02”
                    timerow = dt.split(':')
                    hm = timerow[1] + ':' + timerow[2] #获取'小时:分钟',作为key
                    yield hm,1 #初始化key:value
                i+=1

    def reducer(self, key,occurrences):
        yield key,sum(occurrences) #对排序后的key对应的value作sum累加

if __name__ == '__main__':
    MRCounter.run()      

生成Hadoop任务,运行:

python http_minute_conn.py -r hadoop -o hdfs:///output/http_minute_conn hdfs:///user/nginx      

网站访问来源IP统计

统计用户的访问来源IP可以更好地了解网站的用户分布,同时也可以帮助安全人员捕捉攻击来源。实现原理是定义匹配IP正则字符串作为key,将value初始化为1,执行reducer操作时做累加(sum)统计

【/root/hadooptest/ipstat.py】

#!/usr/bin/env python
# -*- encoding: utf-8 -*-

from mrjob.job import MRJob
import re

IP_RE = re.compile(r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}') #定义IP正则匹配

class MRCounter(MRJob):
    def mapper(self, key, line):
        for ip in IP_RE.findall(line): #匹配IP正则后生成key:value,其中key为IP地址,value初始值为1
            yield ip,1

    def reducer(self, ip,occurrences):
        yield ip,sum(occurrences) #对排序后的key对应的value作sum累加

if __name__ == '__main__':
    MRCounter.run()      

生成Hadoop任务,运行:

python ipstat.py -r hadoop -o hdfs:///output/ipstat hdfs:///user/nginx      

网站文件访问统计

通过统计网站文件的访问次数可以帮助运维人员了解访问最集中的文件,以便进行有针对性的优化,比如调整静态文件过期策略、优化动态cgi的执行速度、拆分业务逻辑等。实现原理是讲访问文件作为key,初始化value为1,执行reducer是做累加(sum)统计。

【/root/hadooptest/httpfile.py】

#!/usr/bin/env python
# -*- encoding: utf-8 -*-

from mrjob.job import MRJob
import re

IP_RE = re.compile(r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}') #定义IP正则匹配

class MRCounter(MRJob):
    def mapper(self, key, line):
        i = 0
        for url in line.split(): 
            if i==6: #获取日志中URL文件资源字段,作为key
                yield url,1
            i+=1

    def reducer(self, url,occurrences):
        yield url,sum(occurrences) #对排序后的key对应的value作sum累加

if __name__ == '__main__':
    MRCounter.run()      

生成Hadoop任务,运行:

python httpfile.py -r hadoop -o hdfs:///output/httpfile hdfs:///user/nginx      

同理,我们可以使用以上方法对User-Agent域进行分析,包括浏览器类型及版本、操作系统及版本、浏览器内核等信息,为更好地提升用户体验提供数据支持。

参考资料:

1、刘天斯《Python自动化运维技术与最佳实践》

2、博客园文章:http://www.cnblogs.com/MacoLee/p/5805656.html

3、《Python with Hadoop》Zachary Radtka & Donald Miner

继续阅读