天天看點

實戰 Python 網絡爬蟲:Python 開發個人爬蟲架構

作者:程式設計技術分享

目前爬虫程序的开发语言首选 Python,因为 Python 为我们提供丰富的第三方爬虫库。除了熟练掌握爬虫库之外,我们还可以自己动手开发个人的爬虫框架,本文将为大家讲解如何开发个人爬虫框架:

  1. 框架设计说明
  2. 异步爬取方式
  3. 数据清洗机制
  4. 数据存储机制
  5. 实战:爬取豆瓣电影
  6. 框架的功能扩展

1. 框架设计说明

爬虫开发不管是使用爬虫库还是爬虫框架,若按照功能划分,整个爬虫程序分为三部分:数据爬取、数据清洗和数据入库。本文开发的爬虫框架也是按照功能划分的逻辑来实现,目前尚处于雏形阶段,虽然能实现爬虫开发,但尚有很多功能有待完善。

本篇文章讲述的爬虫框架现由 4 个文件组成,分别是初始化文件 __init__.py、功能文件 pattern.py、spider.py、storage.py。文件说明如下:

  • 初始化文件 __init__.py 用于设置框架的版本信息和导入框架的功能文件;
  • 数据清洗文件 pattern.py 用于定义数据清洗类,清洗方式与 Scrapy 框架相似;
  • 数据爬取文件 spider.py 用于定义数据爬取类,爬取方式支持异步并发、URL 去重和分布式;
  • 数据存储文件 storage.py 用于定义数据存储类,目前支持关系型数据库、非关系型数据库、CSV 文件存储数据和文件下载功能。

我们将框架命名为 pyReptile,在 D 盘里创建文件夹 pyReptile,然后在文件夹里创建文件,框架的目录结构如图所示。

實戰 Python 網絡爬蟲:Python 開發個人爬蟲架構

由于初始化文件 __init__.py 只是设置框架的版本信息及导入框架的功能文件,因此初始化文件的代码如下:

# project: pyReptile
    # author:  Xy Huang
    __version__ = '1.0.0'
    # 导入功能文件
    from .storage import *
    from .spider import *
    from .pattern import *
           

初始化文件是整个框架的入口,它导入了整个框架的功能。在使用框架的时候,只需在初始化文件调用相关的功能模块即可。[功能文件 pattern.py、spider.py 和 storage.py 支撑整个框架的运行,其原理图如图所示。

實戰 Python 網絡爬蟲:Python 開發個人爬蟲架構

pyReptile 框架的设计原理是从 Scrapy 框架和 SQLAlchemy 框架受到启发的,具体的说明如下:

  • 数据爬取方式由 URL 地址的数据格式决定,如果 URL 地址的数据格式为列表,pyReptile 就会执行异步并发,并将所有请求的响应内容以列表格式返回;如果传入的 URL 地址是字符串格式(即单一的 URL 地址),pyReptile 就直接返回相应的响应内容,并且还支持 URL 去重和分布式爬虫功能。
  • 数据清洗采用 Scrapy 框架的清洗模式,使用方式与 Scrapy 框架有一定的相似之处,目前仅支持 CssSelector 和 Xpath 定位方式。
  • 数据入库支持关系型数据库、非关系型数据库和 CSV 文件存储,关系型数据库由 SQLAlchemy 框架实现;非关系型数据库目前仅支持 MongoDB 数据库。pyReptile 简化入库方式,只需将爬取的数据以字典格式传入即可实现入库操作。

2. 异步爬取方式

pyReptile 框架的数据爬取由 Aiohttp 模块实现,因此它具备了异步并发功能。我们将 Aiohttp 模块的数据爬取功能进行封装和延伸,简化了其使用方式,使用者只需调用相关的函数并传入参数即可发送 HTTP 请求。打开 spider.py 文件,在文件里定义爬虫类 Request,代码如下:

import asyncio
    import aiohttp
    import redis

    # 设置默认参数
    TIMEOUT = 40
    REQUEST_HEADERS = {
    'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
    'Accept-Language': 'en',
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/69.0.3497.100 Safari/537.36'}
    # 实例化对象,用于发送HTTP请求
    loop = asyncio.get_event_loop()

    # 定义装饰器,实现URL去重或分布式处理
    def distributes(func):
    def wrapper(self, url, **kwargs):
        redis_host = kwargs.get('redis_host', '')
        if redis_host:
            port = kwargs.get('port', 6379)
            db = kwargs.get('db', 1)
            redis_db = redis.Redis(host=redis_host, port=port, db=db)
            redis_data_dict = 'keys'
            if not redis_db.hexists(redis_data_dict, url):
                redis_db.hset(redis_data_dict, url, 0)
                return func(self, url, **kwargs)
            else:
                return {}
        else:
            return func(self, url, **kwargs)
    return wrapper

    # 定义爬虫类
    class Request(object):
    # 定义异步函数
    async def httpGet(self, url, **kwargs):
        cookies = kwargs.get('cookies', {})
        params = kwargs.get('params', {})
        proxy = kwargs.get('proxy', '')
        timeout = kwargs.get('timeout', TIMEOUT)
        headers = kwargs.get('headers', REQUEST_HEADERS)
        # 带代理IP
        if proxy:
            async with aiohttp.ClientSession(cookies=cookies) as session:
                async with session.get(url, params=params, proxy=proxy, timeout=timeout, headers=headers) as response:
                    result = dict(
                        content=await response.read(),
                        text=await response.text(),
                        status=response.status,
                        headers=response.headers,
                        url=response.url
                    )
                    return result
        # 不带代理IP
        else:
            async with aiohttp.ClientSession(cookies=cookies) as session:
                async with session.get(url, params=params, timeout=timeout, headers=headers) as response:
                    result = dict(
                        content=await response.read(),
                        text=await response.text(),
                        status=response.status,
                        headers=response.headers,
                        url=response.url
                    )
                    return result

    # 定义异步函数
    async def httpPost(self, url, **kwargs):
        cookies = kwargs.get('cookies', {})
        data = kwargs.get('data', {})
        proxy = kwargs.get('proxy', '')
        timeout = kwargs.get('timeout', TIMEOUT)
        headers = kwargs.get('headers', REQUEST_HEADERS)
        if proxy:
            async with aiohttp.ClientSession(cookies=cookies) as session:
                async with session.post(url, data=data, proxy=proxy, timeout=timeout, headers=headers) as response:
                    result = dict(
                        content=await response.read(),
                        text=await response.text(),
                        status=response.status,
                        headers=response.headers,
                        url=response.url
                    )
                    return result
        else:
            async with aiohttp.ClientSession(cookies=cookies) as session:
                async with session.post(url, data=data, timeout=timeout, headers=headers) as response:
                    result = dict(
                        content=await response.read(),
                        text=await response.text(),
                        status=response.status,
                        headers=response.headers,
                        url=response.url
                    )
                    return result

    # 定义GET请求方式
    @distributes
    def get(self, url, **kwargs):
        tasks = []
        if isinstance(url, list):
            for u in url:
                task = asyncio.ensure_future(self.httpGet(u, **kwargs))
                tasks.append(task)
            result = loop.run_until_complete(asyncio.gather(*tasks))
        else:
            result = loop.run_until_complete(self.httpGet(url, **kwargs))
        return result

    # 定义POST请求方式
    @distributes
    def post(self, url, **kwargs):
        tasks = []
        if isinstance(url, list):
            for u in url:
                task = asyncio.ensure_future(self.httpPost(u, **kwargs))
                tasks.append(task)
            result = loop.run_until_complete(asyncio.gather(*tasks))
        else:
            result = loop.run_until_complete(self.httpPost(url, **kwargs))
        return result

    # 实例化Request对象
    request = Request()
           

上述代码主要分为:初始化变量、定义装饰器与对象及定义爬虫类 Request。初始化变量与对象是设置爬虫的超时时间、请求头以及实例化对象 loop,该对象用于发送 HTTP 请求;定义装饰器用于爬虫类 Request,实现 URL 去重功能或分布式功能。爬虫类 Request 一共定义 4 个函数,函数的功能说明如下:

  • 函数 httpGet 是定义 Aiohttp 的异步 GET 请求函数,函数参数 url 以字符串格式表示,代表请求地址 URL,可选参数 kwargs 代表自定义的请求设置,如请求头、代理 IP、Cookies 信息、超时和请求参数等。
  • 函数 httpGet 会对参数 proxy 进行判断,如果参数 proxy 非空,Aiohttp 在发送 GET 请求的时候,则在请求里添加参数 proxy,由于参数 proxy 的特殊性,如果参数 proxy 为空并且在请求里添加参数 proxy,Aiohttp 会提示异常信息,因此函数需要对参数 proxy 进行判断处理。最后,函数会将响应内容以字典格式返回。
  • 函数 httpPost 是定义 Aiohttp 的异步 POST 请求函数,函数参数 url 和 kwargs 与函数 httpGet 的参数功能一致;函数的功能实现过程与函数 httpGet 的相似,区别在于两者的 HTTP 请求方式各有不同。
  • 函数 get 是定义爬虫类 Request 的 GET 请求方式,函数参数 url 的数据格式可为字符串或列表格式,可选参数 kwargs 代表自定义的请求设置,如请求头、代理 IP、Cookies 信息、超时和请求参数等,参数 kwargs 也是函数 httpGet 的参数 kwargs。
  • 函数 get 经过装饰器 distributes 过滤,装饰器从函数 get 获取 Redis 数据库连接参数,如果没有数据库连接参数,则往下执行函数 get;如果存在数据库连接参数,则连接 Redis 数据库并判断参数 url 是否记录在 Redis 数据库,若已记录,不再执行函数 get,反之执行函数 get。
  • 函数 get 对参数 url 进行判断,如果 url 是列表,则对列表进行遍历,每次遍历调用函数 httpGet,传入当前的 URL 地址并添加到任务列表,然后将任务列表交给对象 loop 处理,对所有任务发送异步并发的 HTTP 请求,最后将所有请求的响应内容以列表格式返回。如果 url 是字符串,则由对象 loop 调用函数 httpGet,发送 HTTP 请求并返回响应内容。
  • 函数 post 是定义爬虫类 Request 的 POST 请求方式,函数参数 url 和 kwargs 与函数 get 的参数功能一致;函数的功能实现过程与函数 get 的相似,区别在于两者调用的 Aiohttp 异步函数各有不同。

从爬虫类 Request 的代码可以看到,函数之间的代码存在重复使用的情况,因为 Aiohttp 在使用过程中需要以 with 模块化表示,从而导致代码出现重复。

为了测试爬虫类 Request 的功能是否正确,我们在 spider.py 文件目录下创建 spiderTest.py 文件,并在文件里编写功能测试代码,如下所示:

from spider import request
    # GET请求
    from spider import request

    # GET请求
    url = 'http://httpbin.org/get'
    # url = ['http://httpbin.org/get']
    params = {
        'pyReptile': 'spiderGet'
    }
    cookies = {
        'pyReptile': 'spiderCookies'
    }
    # URL去重或分布式,设置Redis数据库连接参数
    redis_host = '127.0.0.1'

    r = request.get(url, params=params, cookies=cookies, 
    redis_host=redis_host)
    print(r.get('text', ''))
    # print(r[0]['text'])

    # POST请求
    url = 'http://httpbin.org/post'
    # url = ['http://httpbin.org/post']
    data = {
        'pyReptile': 'spiderPost'
    }
    cookies = {
        'pyReptile': 'spiderCookies'
    }
    r = request.post(url, data=data, cookies=cookies)
    print(r.get('text', ''))
    # print(r[0]['text'])
           

上述代码简单演示了 pyReptile 框架的 GET 和 POST 请求,使用方法与 Requests 模块相似,但在发送 HTTP 请求的时候,pyReptile 框架会根据参数 url 的数据格式而执行相应的请求处理,这一优势是 Requests 模块无法比拟的。运行上述代码就会分别输出 GET 和 POST 请求的响应内容,如图所示。

實戰 Python 網絡爬蟲:Python 開發個人爬蟲架構

上述测试代码中,GET 请求设置数据库连接参数 redis_host,当再次运行上述代码时就不再执行 GET 请求,打开 RedisDesktopManager 查看 Redis 数据库,查看数据库所记录的 URL 地址,如图所示。

實戰 Python 網絡爬蟲:Python 開發個人爬蟲架構

3. 数据清洗机制

pyReptile 框架的数据清洗由 BeautifulSoup4 和 lxml 模块实现,使用者只需调用相关的函数并传入相应的参数即可清洗数据。打开 pattern.py 文件,在文件里定义数据清洗类 DataPattern,代码如下:

from bs4 import BeautifulSoup
    import lxml
    from lxml.html.soupparser import fromstring as soup_parse

    class DataPattern(object):
        def cssSelector(self, response, selector, **kwargs):
            parser = kwargs.get('parser', 'html.parser')
            tempList = []
            soup = BeautifulSoup(response, parser)
            temp = soup.select(selector=selector)
            for i in temp:
                tempList.append(i.getText())
            return tempList

        def xpath(self,response,selector, **kwargs):
            parser = kwargs.get('parser', 'html.parser')
            try:
                soup = soup_parse(response, features=parser)
            except:
                soup = lxml.html.fromstring(response)
            temp = soup.xpath(selector)
            tempList = []
            for i in temp:
                tempList.append(i.text)
            return tempList

    dataPattern = DataPattern()
           

数据清洗类 DataPattern 定义了函数 cssSelector() 和 xpath(),两个函数的参数说明如下:

  • 参数 response 代表 HTTP 请求的响应内容;
  • 参数 selector 代表目标数据的定位方法,定位方法采用 5CssSelector 或 Xpath 语法;
  • 可选参数 kwargs 是自定义设置,如参数 parser 可自定义选择 HTML 解析器,若无对参数 parser 进行设置,则默认使用 Python 标准库的 HTML 解析器——html.parser。

函数 cssSelector() 和 xpath() 实现数据清洗处理,具体的实现过程如下:

  • 从可选参数 kwargs 获取参数 parser,如果 parser 的参数值为空,则默认使用 html.parser 作为解析器,将参数 response 的参数值进行 HTML 解析并生成 soup 对象。
  • 由参数 selector 对 soup 对象进行定位和查找,从中找出符合条件的数据对象 temp。
  • 遍历循环对象 temp,获取对象 temp 的数据内容并写入列表 tempList,再将列表作为函数返回值。
  • 将数据清洗类 DataPattern 进行实例化,生成对象 dataPattern,用于开发者的调用。

为了测试数据清洗类 DataPattern 的功能是否正确,在 pattern.py 文件目录下创建 patternTest.py 文件,并在文件里编写功能测试代码,如下所示:

from pattern import dataPattern
    from spider import request
    url = 'https://movie.douban.com/subject/3168101/comments'
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64)
        AppleWebKit/537.36 (KHTML, like Gecko)
        Chrome/70.0.3538.67 Safari/537.36'
    }
    r = request.get(url, headers=headers)

    # cssSelector   
    title = dataPattern.cssSelector(r['text'], '#content > h1')
    print(title)
    selector = 'div.comment> p > span'
    comment=dataPattern.cssSelector(r['text'],selector,parser='html5lib')
    print(len(comment))

    # xpath
    title = dataPattern.xpath(r['text'], '//*[@id="content"]/h1')
    print(title)
    selector = '//*[@id="comments"]//p//span'
    comment = dataPattern.xpath(r['text'], selector, parser='html5lib')
    print(len(comment))
           

上述代码使用爬虫类 Request 向豆瓣电影评论页发送 HTTP 请求,并将响应内容交给数据清洗对象 dataPattern 进行清洗处理,从响应内容中分别提取电影标题和评论内容。由于评论内容较多,我们只输出电影标题和评论总数,如图所示。

實戰 Python 網絡爬蟲:Python 開發個人爬蟲架構

4. 数据存储机制

pyReptile 框架的数据存储是采用 SQLAlchemy 框架、pymongo 和 csv 模块实现的,分别提供了三种不同的数据存储方式,在使用过程中只需设置数据存储方式及调用相关方法即可实现数据存储处理。打开 storage.py 文件,在文件里定义数据存储类 DataStorage,代码如下:

from sqlalchemy import *
    from sqlalchemy.orm import sessionmaker
    from sqlalchemy.ext.declarative import declarative_base
    from pymongo import MongoClient
    import csv
    import os
    Base = declarative_base()

    # 定义数据存储类DataStorage
    class DataStorage(object):
        def __init__(self, CONNECTION, **kwargs):
            self.databaseType = kwargs.get('databaseType', 'CSV')
            # 根据参数databaseType选择存储方式,默认CSV文件存储
            if self.databaseType == 'SQL':
                # 根据字段创建映射类和数据表
                self.field()
                tablename = kwargs.get('tablename', self.__class__.__name__)
                self.table = self.table(tablename)
                self.DBSession = self.connect(CONNECTION)
            elif self.databaseType == 'NoSQL':
                self.DBSession = self.connect(CONNECTION)
            else:
                self.path = CONNECTION

        # 定义数据表字段
        def field(self):
            # self.name = Column(String(50))
            pass

        # 连接数据库,生成DBSession对象
        def connect(self, CONNECTION):
            # 连接关系型数据库
            if self.databaseType == 'SQL':
                engine = create_engine(CONNECTION)
                DBSession = sessionmaker(bind=engine)()
                Base.metadata.create_all(engine)
            # 连接非关系型数据库
            else:
                info = CONNECTION.split('/')
                # 连接Mongo数据库
                connection = MongoClient(
                    info[0],
                    int(info[1])
                )
                db = connection[info[2]]
                DBSession = db[info[3]]
            return DBSession

        # 定义映射类
        def table(self, tablename):
            class TempTable(Base):
                __tablename__ = tablename
                id = Column(Integer, primary_key=True)
            # 将类属些进行判断,符合sqlalchemy的字段则定义到数据映射类
            for k, v in self.__dict__.items():
                if isinstance(v, Column):
                    setattr(TempTable, k, v)
            return TempTable

        # 插入数据
        def insert(self, value):
            # 关系型数据库的数据插入
            if self.databaseType == 'SQL':
                self.DBSession.execute(self.table.__table__.insert(), value)
                self.DBSession.commit()
            # 非关系型数据库的数据插入
            elif self.databaseType == 'NoSQL':
                # 判断参数value的数据类型,选择单条数据还是多条数据插入
                if isinstance(value, list):
                    self.DBSession.insert_many(value)
                else:
                    self.DBSession.insert(value)

        # 更新数据
        def update(self, value, condition={}):
            # 关系型数据库的数据更新
            if self.databaseType == 'SQL':
                # 更新条件只设置了单个条件
                if condition:
                    c = self.table.__dict__[list(condition.keys())[0]].in_(list(condition.values()))
                    self.DBSession.execute(self.table.__table__.update().where(c).values(), value)
                # 全表更新
                else:
                    self.DBSession.execute(self.table.__table__.update().values(), value)
                self.DBSession.commit()
            # 非关系型数据库的数据更新
            elif self.databaseType == 'NoSQL':
                self.DBSession.update_many(condition, {'$set': value})

        # 文件下载
        def getfile(self, content, filepath):
            with open(filepath, 'wb') as code:
                code.write(content)

        # 数据写入CSV文件
        def writeCSV(self, value, title=[]):
            # 参数title为空列表,则将字典的keys进行排序并作为CSV的标题
            if not title:
                title = sorted(value[0].keys())
            # 判断文件是否存在,
            pathExists = os.path.exists(self.path)
            with open(self.path, 'a', newline='') as csv_file:
                csv_writer = csv.writer(csv_file)
                # 文件不存在,则写入标题
                if not pathExists:
                    csv_writer.writerow(title)
                # 将数据写入CSV文件
                for v in value:
                    valueList = []
                    for t in title:
                        valueList.append(v[t])
                    csv_writer.writerow(valueList)
           

数据存储类 DataStorage 定义 8 个方法,分别是初始化方法 __init__()、类方法 field()、connect()、table()、insert()、update()、getfile() 和 writeCSV(),每个方法所实现的功能说明如下。

(1) 初始化方法__init__() 根据参数 databaseType 来执行相应的数据存储方式,每种数据存储方式说明如下:

  • 如果参数 databaseType 设为 SQL,则说明数据存储方式为关系型数据库。初始化方法会从可选参数 kwargs 里获取参数 tablename,如果参数 tablename 不存在,则由子类的名字作为数据表的表名;然后调用类方法 field(),从类方法 field() 里获取自定义的字段属性,用于定义数据表映射类;再调用类方法 table() 来创建数据表映射类,并以类属性 table 表示;最后调用类方法 connect() 进行数据库连接,将数据库连接对象返回并以类属性 DBSession 表示。
  • 如果参数 databaseType 设为 NoSQL,则说明数据存储方式为非关系型数据库。初始化方法只调用类方法 connect() 并把参数 CONNECTION 传入,实现数据库连接,将数据库连接对象返回并以类属性 DBSession 表示。
  • 如果参数 databaseType 设为 CSV 或没有设置参数 databaseType,则说明数据存储方式为 CSV 文件存储。初始化方法将参数 CONNECTION 赋值给类属性 path,类属性 path 代表 CSV 文件路径信息。

(2) 类方法 field() 让开发者自定义数据表字段,主要用于关系型数据库的存储方式。在使用过程中,通过子类继承数据存储类 DataStorage,在子类里重写类方法 field() 即可实现自定义表字段。

(3) 类方法 connect() 根据参数 databaseType 来选择相应的数据库连接方式。如果使用关系型数据库,则使用 SQLAlchemy 框架实现数据库连接,反之则使用 pymongo 模块连接 MongoDB。

(4) 类方法 table() 定义数据表映射类 TempTable,映射类会默认创建主键 ID,然后遍历数据存储类 DataStorage 的类属性,并对每个类属性的数据类型进行判断,如果类属性是 Column 对象(即 SQLAlchemy 的表字段对象),则使用 Python 内置方法 setattr() 将类属些写入数据表映射类 TempTable。

(5) 类方法 insert() 实现数据入库功能,支持关系型和非关系型数据库的数据入库操作。插入的数据必须是字典格式,并且字典的 key 必须为表字段。参数 value 可以是列表或字典形式,若是以字典表示,则插入单条数据,若是以列表表示,则插入多条数据。

(6) 类方法 update() 实现数据更新功能,支持关系型和非关系型数据库的数据更新操作。参数 value 必须是字典格式,并且字典的 key 必须为表字段;参数 condition 是更新条件,它的默认值为 None,如果参数值为 None,则对全表数据进行更新处理,反之对符合条件的数据进行更新处理。

(7) 类方法 getfile() 实现文件下载功能,参数 content 代表文件内容;参数 filepath 代表文件所保存的绝对路径。

(8) 类方法 writeCSV() 实现 CSV 文件存储数据功能,参数 title 代表文件表头内容,如果参数值为空,则以参数 value 首个元素的 keys 作为表头内容,参数 title 以列表表示,列表元素决定了数据写入顺序;参数 value 是待存储的数据内容,也是以列表表示,每个列表元素是以字典表示。

综合上述,类方法 field()、connect() 和 table() 主要用于初始化方法__init__(),为初始化方法__init__() 分别提供数据表字段、数据库连接对象 DBSession 和数据表映射类 TempTable;类方法 insert() 和 update() 是实现数据库的数据操作(如数据的新增或修改);getfile() 和 writeCSV() 分别实现文件下载功能和 CSV 文件存储数据功能。

为了验证数据存储类 DataStorage 的功能是否正确,在 storage.py 文件目录下创建三个测试文件 storageTest-CSV.py、storageTest-NoSQL.py 和 storageTest-SQL.py,分别验证三种数据存储方式。

首先创建并打开 storageTest-CSV.py,在文件里编写功能测试代码,验证 CSV 文件存储数据功能,如下所示:

from storage import *
    if __name__ == '__main__':
        CONNECTION = 'data.csv'
        # 待存储数据personInfo
        personInfo = [{'name': 'Lucy', 'age': '21', 'address': '北京市'},
                      {'name': 'Lily', 'age': '18', 'address': '上海市'}]
        # 实例化数据存储类DataStorage
        database = DataStorage(CONNECTION)
        # 调用writeCSV()实现CSV文件存储
        # database.writeCSV(personInfo)
        database.writeCSV(personInfo, title=['name', 'age', 'address'])
           

变量 CONNECTION 是 CSV 文件路径信息,在实例化数据存储类 DataStorage 的时候传入变量 CONNECTION 即可将数据存储方式选为 CSV 文件存储,无须设置参数 databaseType。实例化对象 database 调用 writeCSV() 方法即可实现 CSV 文件存储数据功能。

运行上述代码,并控制参数 title 的传入方式,分别查看参数 title 的传入是否对文件存储的造成影响,如图所示。

實戰 Python 網絡爬蟲:Python 開發個人爬蟲架構

接着创建并打开 storageTest-NoSQL.py,在文件里编写功能测试代码,验证非关系型数据库的数据存储功能,如下所示:

from storage import *
    if __name__ == '__main__':
        CONNECTION = 'localhost/27017/test/storage_db'
        # 实例化数据存储类DataStorage
        database = DataStorage(CONNECTION, databaseType='NoSQL')
        # 插入多条数据
        personInfo = [{'name': 'Lucy', 'age': '21', 'address': '北京市'},
                      {'name': 'Lily', 'age': '18', 'address': '上海市'}]
        database.insert(personInfo)
        # 插入单条数据
        value = {'name': 'Tom', 'age': '21', 'address': '北京市'}
        database.insert(value)
        # 更新数据
        condition = {'name': 'Lucy'}
        updateInfo = {'name': 'Lucy', 'age': '22', 'address': '广州市'}
        database.update(updateInfo, condition)
           

变量 CONNECTION 是 MongoDB 的连接方式,在实例化数据存储类 DataStorage 的时候,传入变量 CONNECTION 并设置参数 databaseType 为 NoSQL 即可选择非关系型数据库的数据存储功能。实例化对象 database 调用 insert() 和 update() 方法,分别实现多条数据插入、单条数据插入和数据更新功能。

运行上述代码之前,在 MongoDB 的可视化工具里操作 MongoDB,创建数据库 test。代码运行成功后,在可视化工具里查看数据库 test 的 storage_db 集合,该集合的数据信息如图所示。

實戰 Python 網絡爬蟲:Python 開發個人爬蟲架構

最后创建并打开 storageTest-SQL.py,在文件里编写功能测试代码,验证关系型数据库的数据存储功能,如下所示:

from storage import *
    # 定义数据表personinfo
    class PersonInfo(DataStorage):
        def field(self):
            # 定义数据表字段
            # self.name = Column(String(50))
            self.name = Column(String(50), comment='姓名')
            self.age = Column(String(50), comment='年龄')
            self.address = Column(String(50), comment='地址')

    # 定义数据表schoolinfo
    class SchoolInfo(DataStorage):
        def field(self):
            # 定义数据表字段
            # self.name = Column(String(50))
            self.school = Column(String(50), comment='学校')
            self.name = Column(String(50), comment='姓名')

    if __name__=='__main__':
        CONNECTION = 'mysql+pymysql://root:1234@localhost/storage_db?charset=utf8mb4'
        person = PersonInfo(CONNECTION, databaseType='SQL')
        school = SchoolInfo(CONNECTION, databaseType='SQL')
        # 对personInfo表插入多条数据
        personInfo = [{'name': 'Lucy', 'age': '21', 'address': '北京市'},
                      {'name': 'Lily', 'age': '18', 'address': '上海市'}]
        person.insert(personInfo)
        # 对schoolInfo表插入单条数据
        schoolInfo = {'name': 'Lucy', 'school': '清华大学'}
        school.insert(schoolInfo)

        # 对personInfo表更新数据
        condition = {'id': 1}
        personInfo = {'name': 'Lucy', 'age': '22', 'address': '广州市'}
        person.update(personInfo, condition)
        # 对schoolInfo表更新数据
        schoolInfo = {'name': 'Lucy', 'school': '北京大学'}
        school.update(schoolInfo, condition)
           

上述代码分别定义了数据存储类 PersonInfo 和 SchoolInfo,两者通过重写类方法 field() 来实现表字段的定义。在文件中的运行函数 __main__ 分别对类 PersonInfo 和 SchoolInfo 进行实例化,由于子类继承了父类 DataStorage 的初始化方法,因此数据存储类 PersonInfo 和 SchoolInfo 在实例化的时候会定义数据表映射类和创建数据表连接对象,最后实例化对象 person 和 school 分别调用 insert() 和 update() 方法,实现数据的入库和更新处理。

从使用方式发现,关系型数据库的使用方式不同于非关系型数据库和 CSV 文件,前者是通过定义子类并继承数据存储类 DataStorage,再实例化子类并调用相关的方法,从而实现数据存储功能;而非关系型数据库和 CSV 文件是直接实例化数据存储类 DataStorage 并调用相关的方法。

运行上述代码,并打开数据库 storage_db 查看数据表 schoolinfo 和 personinfo 的数据信息,如图所示。

實戰 Python 網絡爬蟲:Python 開發個人爬蟲架構

5. 实战:爬取豆瓣电影

相信读者对 pyReptile 框架设计已有一定的了解,我们通过一个实战项目来讲述如何使用 pyReptile 框架实现爬虫开发。以豆瓣电影为例,选取某一部电影作为爬取对象,分别爬取电影信息和电影评论。在电影信息页里分别爬取电影名称和剧情简介,如图所示。

實戰 Python 網絡爬蟲:Python 開發個人爬蟲架構

然后在浏览器中打开电影评论页,分别爬取用户名和评论内容,如图所示。

實戰 Python 網絡爬蟲:Python 開發個人爬蟲架構

爬取的数据皆可从开发者工具 Network 选项卡的 Doc 分类标签里找到数据位置,本节不再讲述网页结构的分析过程。我们将 pyReptile 框架放置在 Python 安装目录的 site-packages 文件夹,这是将 pyReptile 框架以第三方库的形式安装在 Python 里,如图所示。

實戰 Python 網絡爬蟲:Python 開發個人爬蟲架構

完成 pyReptile 框架安装后,在 D 盘下创建文件夹 doubanSpider,并在文件夹里分别创建 fields.py 和 spider.py 文件。文件夹 doubanSpider 是项目的文件目录,如图所示。

實戰 Python 網絡爬蟲:Python 開發個人爬蟲架構

打开 fields.py 文件,分别定义数据存储类 MovieComment 和 MovieInfo,两者皆继承 pyReptile 框架的数据存储类 DataStorage。在自定义的数据存储类中,重写类方法 field() 并在类方法里自定义类属性,每个自定义的类属性代表数据表的表字段,代码如下:

from pyReptile.storage import *
    # 定义电影信息表的字段
    class MovieComment(DataStorage):
        def field(self):
            # 定义数据表字段
            self.movieId = Column(String(50), comment='电影ID')
            self.user = Column(String(50), comment='用户名')
            self.comment = Column(String(3000), comment='评论内容')

    # 定义电影评论表的字段
    class MovieInfo(DataStorage):
        def field(self):
            # 定义数据表字段
            self.movieId = Column(String(50), comment='电影ID')
            self.name = Column(String(50), comment='电影名称')
            self.summary = Column(String(3000), comment='剧情简介')
           

最后在 spider.py 文件里编写具体的爬虫规则,数据存储介质选择 MySQL 数据库,爬取数据是某部电影的基本信息和前十页的评论内容。实现代码如下:

from pyReptile import request, dataPattern
    from fields import MovieComment, MovieInfo
    import time

    # 基本设置
    CONNECTION = 'mysql+pymysql://root:1234@localhost/
    spiderdb?charset=utf8mb4'
    # 实例化数据存储类,定义映射类以及创建数据表
    movieComment = MovieComment(CONNECTION)
    movieInfo = MovieInfo(CONNECTION)

    # 爬取电影信息
    def get_movie(movieId):
        # URL以字符串格式传入
        r = request.get(movieUrl % (movieId))
        name = dataPattern.cssSelector(r['text'], 'h1 > span')[0]
        summary = dataPattern.cssSelector(r['text'],'#link-report')[0].strip()
        movieDic = dict(movieId=movieId, name=name, summary=summary)
        # 查询数据表是否已存在数据
        queryMovie = movieInfo.DBSession.query(movieInfo.table).filter_by(movieId=movieId).all()
        # 存在数据则作更新处理
        if queryMovie:
            condition = {'movieId': movieId}
            movieInfo.update(movieDic, condition)
        # 不存在就插入新的数据
        else:
            movieInfo.insert(movieDic)

    # 爬取电影评论
    def get_comment(movieId):
        # URL以列表格式传入
        urlList = []
        for page in range(10):
            urlList.append(commentUrl % (movieId, str(page * 20)))
        valueList = []
        responseList = request.get(urlList)
        for response in responseList:
            commentList = dataPattern.cssSelector(response['text'], 'div.comment > p > span')
            userList = dataPattern.cssSelector(response['text'], 'span.comment-info > a')
            for comment, user in zip(commentList, userList):
                valueList.append(dict(movieId=movieId, user=user, comment=comment))
                # 数据入库
                movieComment.insert(valueList)
    if __name__ == '__main__':
        # 开始时间
        localTime = time.localtime(time.time())
        beginTime = time.strftime("%H:%M:%S", localTime)
        print('程序开始时间:' + beginTime)
        # 爬虫程序
        movieUrl = 'https://movie.douban.com/subject/%s/?from=showing'
        commentUrl = 'https://movie.douban.com/subject/%s/comments?start=%s&limit=20&sort=new_score&status=P'
        movieId = '3168101'
        get_movie(movieId)
        get_comment(movieId)
        # 结束时间
        localTime = time.localtime(time.time())
        endTime = time.strftime("%H:%M:%S", localTime)
        print('程序结束时间:' + endTime)
           

上述代码可划分为 4 部分,分别是 pyReptile 框架功能的初始化、电影信息的爬虫函数 get_movie()、电影评论的爬虫函数 get_comment() 和文件运行入口,说明如下:

(1) pyReptile 框架功能的初始化是设置 SQLAlchemy 连接 MySQL 的连接内容,由 pymysql 模块实现连接,数据存储在数据库 spiderdb;将数据库的连接内容以参数的形式传入数据存储类 MovieComment 和 MovieInfo,生成实例化对象 movieComment 和 movieInfo。

(2) 电影信息的爬虫函数 get_movie() 是对电影信息页进行数据爬取、清洗和入库处理,说明如下:

  • 首先对电影信息页的 URL 地址发送 HTTP 请求,因为只爬取某一部电影,所以 URL 地址是以字符串格式表示;
  • 从响应内容里提取电影名称和剧情简介,将提取的数据转换成字典格式,字典的 key 是数据表的表字段,即数据存储类 MovieInfo 定义的类属些,字典的 value 是提取的数据内容;
  • 最后由对象 movieInfo 判断电影 ID 是否已存在,若存在,则对数据表的数据进行更新处理,反之则对数据表新增数据。

(3) 电影评论的爬虫函数 get_comment() 是对前十页的电影评论页进行数据爬取、清洗和入库处理,说明如下:

  • 前十页的电影评论页共有 10 条不同的 URL 地址,因此 URL 地址是以列表的形式传入请求函数 get(),pyReptile 框架对其执行异步并发的 HTTP 请求;
  • 将前十页的响应内容进行遍历,每次遍历会提取当前页面的用户名和评论内容,再将用户名和评论内容转换成字典格式,并且写入列表 valueList,该列表保存了前十页所有的用户名和评论内容;
  • 最后由对象 movieComment 对列表 valueList 执行数据入库处理。

(4) 文件运行入口是设置电影 ID、信息页和评论页的 URL 地址、调用爬虫函数 get_movie() 和 get_comment() 以及设置程序运行的开始时间和结束时间。通过程序运行前后的时间对比,可以得知 pyReptile 框架的爬取效率。运行 spider.py 文件,若不考虑网速或硬件等因素,项目的爬取效率约为 3 秒。

最后打开数据库 spiderdb,分别查看数据表 movieinfo 和 moviecomment 的数据信息,如图所示。

實戰 Python 網絡爬蟲:Python 開發個人爬蟲架構

6. 框架的功能扩展

从框架设计来看,爬虫框架目前还有很多功能尚未完善,优化功能说明如下。

爬虫类 Request 需要添加 Selenium 或 Splash 等功能。

如果遇到反爬虫机制,爬虫类 Request 需要有待优化,并且现在爬虫类 Request 的代码量相对冗余。