精通Python爬蟲架構Scrapy

精通Python爬蟲架構Scrapy
2018年2月的書,居然代碼用的是Python2
環境使用的是 Vagrant,但是由于國内網絡的問題,安裝得太慢了。
書裡内容比較高深,需要了解一些比較簡單的Scrapy内容可以看一下我github上的一些例子:https://github.com/zx490336534/spider-review
使用Xpath選擇HTML元素
選擇Html元素
$x('//h1')
Xpath表達式通過使用字首點号「.」轉為相對Xpath
XQuery 1.0、XPath 2.0 以及 XSLT 2.0 共享相同的函數庫。
Xpath的函數:https://www.w3school.com.cn/xsl/xsl_functions.asp
調試Scrapy
$ scrapy shell http://example.com
>>> response.xpath('//a/text()')
[<Selector xpath='//a/text()' data='More information...'>]
建立Scrapy項目
$ scrapy startproject xxx
Selectors對象
抽取資料的方式:https://docs.scrapy.org/en/latest/topics/selectors.html
檢視建立爬蟲模版
(venv) (base) 192:properties zhongxin$ scrapy genspider -l
Available templates:
basic
crawl
csvfeed
xmlfeed
使用 scrapy genspider -t 選擇模版進行建立
列印日志
def parse(self, response):
    """Log the texts extracted by the ``itemprop="name"`` XPath query."""
    # Extract first, then format, so the logged value has a name of its own.
    names = response.xpath('//*[@itemprop="name"][1]/text()').extract()
    self.log('title:%s' % names)
2021-03-06 09:15:30 [basic] DEBUG: title:['United Kingdom', 'England', 'London', 'All Categories', 'Property']
測試其他url
$ scrapy parse --spider=basic http://xxx
儲存檔案
from properties.items import PropertiesItem
def parse(self, response):
    """Return a PropertiesItem whose ``title`` holds the extracted name texts."""
    item = PropertiesItem()
    titles = response.xpath('//*[@itemprop="name"][1]/text()').extract()
    item['title'] = titles
    return item
2021-03-06 09:23:08 [scrapy.core.scraper] DEBUG: Scraped from <200 https://www.gumtree.com/flats-houses/london>
{'title': ['United Kingdom', 'England', 'London', 'All Categories', 'Property']}
2021-03-06 09:23:08 [scrapy.core.engine] INFO: Closing spider (finished)
使用 -o 將item内容存到指定檔案中
(venv) (base) 192:properties zhongxin$ scrapy crawl basic -o a.json
儲存檔案
item裝載器與管理字段
官方文檔:https://docs.scrapy.org/en/latest/topics/loaders.html
import scrapy
from scrapy.loader import ItemLoader
from scrapy.loader.processors import MapCompose
from properties.items import PropertiesItem
class BasicSpider(scrapy.Spider):
    """Spider that loads the listing title from the Gumtree London flats page."""

    name = 'basic'
    # Must cover the host used in ``start_urls``. The genspider placeholder
    # 'web' does not match gumtree.com, so followed links would be treated
    # as offsite.
    allowed_domains = ['gumtree.com']
    # Single effective definition: the template's ``['http://web/']`` value
    # was overwritten on the very next statement and never took effect.
    start_urls = (
        'https://www.gumtree.com/flats-houses/london',
    )

    def parse(self, response):
        """Build a PropertiesItem from ``response`` through an ItemLoader.

        The ``title`` field is filled from the ``itemprop="name"`` text
        nodes; every extracted value is stripped and then title-cased.

        Returns:
            The item produced by ``ItemLoader.load_item()``.
        """
        loader = ItemLoader(item=PropertiesItem(), response=response)
        loader.add_xpath(
            'title',
            '//*[@itemprop="name"][1]/text()',
            MapCompose(str.strip, str.title),
        )
        return loader.load_item()
建立contract
為爬蟲設計的單元測試
# The @-prefixed docstring lines below are the spider contract that
# `scrapy check` evaluates; they are data, not free-form documentation,
# so keep their wording exactly as is.
def parse(self, response):
    """ This function parses a property page.
    @url http://web:9312/properties/property_000000.html
    @returns items 1
    @scrapes title price description address image_urls
    @scrapes url project spider server date
    """
檢查該 url 并找到我列出的字段中有值的一個Item
$ scrapy check basic
使用CrawlSpider實作雙向爬取
CrawlSpider 提供了一個使用 rules 變量實作的 parse() 方法
# Two link-following rules:
#   1. links found inside elements whose class contains "next"
#      (no callback is given for these);
#   2. links found inside itemprop="url" elements, handled by parse_item.
rules = (
    Rule(
        LinkExtractor(restrict_xpaths='//*[contains(@class,"next")]'),
    ),
    Rule(
        LinkExtractor(restrict_xpaths='//*[@itemprop="url"]'),
        callback='parse_item',
    ),
)
送出登入表單
from scrapy.http import FormRequest
class LoginSpider(CrawlSpider):
    """Crawl spider whose only initial request submits the login form."""

    name = 'login'
    allowed_domains = ["web"]

    def start_requests(self):
        """Return a one-element list holding the login form request."""
        login_request = FormRequest(
            "http://web:9312/dynamic/login",
            formdata={"user": "user", "pass": "pass"},
        )
        return [login_request]
定制化登入
from scrapy.http import Request, FormRequest
class NonceLoginSpider(CrawlSpider):
    """Crawl spider that fetches the welcome page first, then submits its form."""

    name = 'noncelogin'
    allowed_domains = ["web"]

    def start_requests(self):
        """Return a one-element list with the request for the welcome page."""
        welcome_request = Request(
            "http://web:9312/dynamic/nonce",
            callback=self.parse_welcome,
        )
        return [welcome_request]

    def parse_welcome(self, response):
        """Build the login request from the form found in ``response``.

        ``FormRequest.from_response`` starts from the form in the page, and
        the user/pass values given here are supplied as ``formdata``.
        """
        credentials = {"user": "user", "pass": "pass"}
        return FormRequest.from_response(response, formdata=credentials)
在響應間傳參
def parse(self, response):
    # NOTE(review): `xxx` is a placeholder left in the article; presumably it
    # stands for the elided code that computes `url` and `title` used below.
    xxx
    # Attach the title to the request's meta so that parse_item can read it
    # back from response.meta['title'].
    yield Request(url, meta={"title": title}, callback=self.parse_item)
def parse_item(self, response):
    """Build a PropertiesItem whose title comes from the request's meta.

    Reads ``response.meta['title']`` (set by the request that scheduled this
    callback), strips and title-cases it, and stores it in the ``title``
    field.

    Returns:
        The item produced by ``ItemLoader.load_item()``.
    """
    loader = ItemLoader(item=PropertiesItem(), response=response)
    # ``str`` rather than the Python 2-only ``unicode`` type, matching the
    # ``MapCompose(str.strip, str.title)`` used by the file's other loader.
    loader.add_value('title', response.meta['title'],
                     MapCompose(str.strip, str.title))
    # Without this return the loaded values were discarded and the callback
    # produced nothing.
    return loader.load_item()
部署到Scrapinghub
http://scrapinghub.com/
編寫Item管道
# scrapybook/ch08/properties/properties/pipelines/tidyup.py
from datetime import datetime
class TidyUp(object):
    """A pipeline that does some basic post-processing"""

    def process_item(self, item, spider):
        """
        Pipeline's main method. Formats the date as a string.

        Args:
            item: mapping whose ``'date'`` entry is an iterable of
                date/datetime objects.
            spider: the spider that produced the item (unused).

        Returns:
            The same ``item``, with ``item['date']`` replaced by a list of
            ISO 8601 strings.
        """
        # Build a real list: on Python 3 ``map()`` returns a lazy, one-shot
        # iterator, which cannot be serialised by feed exporters and is
        # empty on a second read.
        item['date'] = [stamp.isoformat() for stamp in item['date']]
        return item
# scrapybook/ch08/properties/properties/settings.py
# Register the TidyUp pipeline by its dotted path.
# NOTE(review): per Scrapy's Item Pipeline docs the integer orders the
# enabled pipelines (lower values run first) - confirm when adding more.
ITEM_PIPELINES = {"properties.pipelines.tidyup.TidyUp": 100}
常用管道
class XXXPipeline(object):
    """Skeleton of a pipeline that opens a handle, writes items to it, and closes it.

    NOTE(review): ``xx`` is a placeholder in the article, not a real name;
    replace ``xx.open()`` with the actual resource to open.
    """

    def open_spider(self, spider):
        # Open the resource and keep the handle on the instance so the other
        # two methods can use it.
        self.f = xx.open()

    def process_item(self, item, spider):
        # Write the item to the handle, then return it unchanged.
        self.f.write(item)
        return item

    def close_spider(self, spider):
        # Close the handle stored by open_spider.
        self.f.close()
例如,寫入json
import json
class MyPipeline(object):
    """Pipeline that writes every item to a file as one JSON object per line."""

    # Output path. Kept as a class attribute so a subclass (or an instance)
    # can redirect the output without re-implementing open_spider.
    filename = 'Thanzhou.json'

    def open_spider(self, spider):
        """Open the output file for writing (truncating it) as UTF-8."""
        self.file = open(self.filename, 'w', encoding='utf8')

    def process_item(self, item, spider):
        """Write ``item`` as a single JSON line and return it unchanged.

        ``ensure_ascii=False`` keeps non-ASCII text readable in the file
        instead of escaping it as ``\\uXXXX`` sequences.
        """
        self.file.write(json.dumps(dict(item), ensure_ascii=False) + '\n')
        return item

    def close_spider(self, spider):
        """Close the output file opened by ``open_spider``."""
        self.file.close()
Scrapy是一個Twisted應用
在任何情況下,都不要編寫阻塞的代碼
實作插入Mysql
import traceback
import dj_database_url
import MySQLdb
from twisted.internet import defer
from twisted.enterprise import adbapi
from scrapy.exceptions import NotConfigured
class MysqlWriter(object):
    """
    A pipeline that writes items to a MySQL database
    """

    @classmethod
    def from_crawler(cls, crawler):
        """Retrieves scrapy crawler and accesses pipeline's settings

        Raises:
            NotConfigured: when the MYSQL_PIPELINE_URL setting is missing or
                empty.
        """
        # Get MySQL URL from settings
        mysql_url = crawler.settings.get('MYSQL_PIPELINE_URL', None)

        # If doesn't exist, disable the pipeline
        if not mysql_url:
            raise NotConfigured

        # Create the class
        return cls(mysql_url)

    def __init__(self, mysql_url):
        """Opens a MySQL connection pool"""
        # Store the url for future reference
        self.mysql_url = mysql_url
        # Report connection error only once
        self.report_connection_error = True

        # Parse MySQL URL and try to initialize a connection
        conn_kwargs = MysqlWriter.parse_mysql_url(mysql_url)
        self.dbpool = adbapi.ConnectionPool('MySQLdb',
                                            charset='utf8',
                                            use_unicode=True,
                                            connect_timeout=5,
                                            **conn_kwargs)

    def close_spider(self, spider):
        """Discard the database pool on spider close"""
        self.dbpool.close()

    @defer.inlineCallbacks
    def process_item(self, item, spider):
        """Processes the item. Does insert into MySQL

        Database failures are logged and never propagated: the item is
        always returned so that later pipeline stages still receive it.
        """
        logger = spider.logger

        try:
            yield self.dbpool.runInteraction(self.do_replace, item)
        except MySQLdb.OperationalError:
            if self.report_connection_error:
                # NOTE(review): the URL may embed the password (it is parsed
                # into 'passwd' below) - consider masking it before logging.
                logger.error("Can't connect to MySQL: %s", self.mysql_url)
                self.report_connection_error = False
        except Exception:
            # `except Exception` instead of a bare `except:` so GeneratorExit
            # and KeyboardInterrupt are not swallowed inside the generator.
            # The traceback goes to the spider's logger; the original
            # `print` statement is a syntax error on Python 3.
            logger.error(traceback.format_exc())

        # Return the item for the next stage
        defer.returnValue(item)

    @staticmethod
    def do_replace(tx, item):
        """Does the actual REPLACE INTO

        Args:
            tx: object whose ``execute(sql, args)`` runs the statement.
            item: mapping whose 'url', 'title', 'price' and 'description'
                entries are sequences; only the first element of each is
                stored.
        """
        sql = """REPLACE INTO properties (url, title, price, description)
        VALUES (%s,%s,%s,%s)"""

        # Values are passed separately from the SQL text (parameterised
        # query). url/title/description are cut to 100/30/30 characters.
        args = (
            item["url"][0][:100],
            item["title"][0][:30],
            item["price"][0],
            item["description"][0].replace("\r\n", " ")[:30]
        )

        tx.execute(sql, args)

    @staticmethod
    def parse_mysql_url(mysql_url):
        """
        Parses mysql url and prepares arguments for
        adbapi.ConnectionPool()

        Returns:
            A dict with whichever of 'host', 'user', 'passwd', 'db' and
            'port' have a non-empty value in the parsed URL.
        """
        params = dj_database_url.parse(mysql_url)

        conn_kwargs = {
            'host': params['HOST'],
            'user': params['USER'],
            'passwd': params['PASSWORD'],
            'db': params['NAME'],
            'port': params['PORT'],
        }

        # Remove items with empty values. `.items()` works on Python 2 and 3,
        # whereas `.iteritems()` exists only on Python 2.
        return {k: v for k, v in conn_kwargs.items() if v}