天天看點

利用scrapy_redis中间件增加代理,简单爬取新片场前20页视频数据,并存入mysql数据库

1、创建Scrapy项目

scrapy startproject XPC_Redis
           

2.进入项目目录,使用命令genspider创建Spider(注意:生成的allowed_domains里只有xinpianchang.com,还需要手动加上视频JSON接口的域名openapi-vtom.vmovier.com)

scrapy genspider xpc_redis xinpianchang.com
           

3、定义要抓取的数据(处理items.py文件)

# -*- coding: utf-8 -*-

import scrapy

class XpcRedisItem(scrapy.Item):
    """One xinpianchang.com video record, as written to the MySQL table ``xpc``."""

    v_id = scrapy.Field()              # video (article) id
    video_name = scrapy.Field()        # video title
    category = scrapy.Field()          # video category / categories
    up_time = scrapy.Field()           # upload time
    play_counts = scrapy.Field()       # play count
    like_counts = scrapy.Field()       # like count
    video_url = scrapy.Field()         # playable video URL
    video_info = scrapy.Field()        # video description
    json_url = scrapy.Field()          # JSON API URL; its response contains the play URL (video_url)
    video_detail_url = scrapy.Field()  # video detail page URL
    add_time = scrapy.Field()          # time this record was added
           

4、编写提取item数据的Spider(在spiders文件夹下:xpc_redis.py)

# -*- coding: utf-8 -*-
import re
import datetime
import scrapy
import json
from XPC_Redis.items import XpcRedisItem
from scrapy_redis.spiders import RedisSpider


class XpcRedisSpider(RedisSpider):
    """Distributed spider for the "sort by likes" channel listing of xinpianchang.com.

    Start URLs are read from the Redis list ``redis_key`` (push page 1 with the
    ``lpush`` command shown below). For every video the crawl goes
    listing page (``parse``) -> detail page (``video_detail``) ->
    vmovier JSON API (``video_address``); the finished item is yielded by the
    last step.
    """
    name = 'xpc_redis'
    allowed_domains = ['xinpianchang.com', 'openapi-vtom.vmovier.com']
    # start_urls = ['https://www.xinpianchang.com/channel/index/type-/sort-like/duration_type-0/resolution_type-/page-1']
    redis_key = 'xpc_redis:start_urls'
    # lpush xpc_redis:start_urls https://www.xinpianchang.com/channel/index/type-/sort-like/duration_type-0/resolution_type-/page-1

    # Listing URL prefix; listing page N is ``list_url + str(N)``.
    list_url = 'https://www.xinpianchang.com/channel/index/type-/sort-like/duration_type-0/resolution_type-/page-'
    # Without logging in, only the first 20 listing pages are available.
    total_page = 20
    # JS assignments embedded in the detail page, e.g. vid = "960VAm7OGE7DRnW8";
    vid_pattern = re.compile(r'vid = "(\w+)";')
    # modeServerAppKey (e.g. 61a2f329348b3bf77) may change, so it is read from every page.
    app_key_pattern = re.compile(r'modeServerAppKey = "(\w+)";')

    def parse(self, response):
        """Parse one listing page (40 videos) and request every video detail page.

        Only a listing page that this method did not schedule itself (i.e. the
        start URL popped from Redis) schedules pages 2..total_page. Previously
        every page re-yielded all 19 page requests and relied on the
        dupefilter to discard them.
        """
        video_ids = response.xpath('//div[@class="channel-con"]/ul[@class="video-list"]/li/@data-articleid').extract()
        for video_id in video_ids:
            video_detail_url = 'https://www.xinpianchang.com/a{}'.format(video_id)
            yield scrapy.Request(url=video_detail_url,
                                 meta={'meta_1': video_detail_url, 'v_id': video_id},
                                 callback=self.video_detail)
        if 'page' not in response.meta:
            for page in range(2, self.total_page + 1):
                yield scrapy.Request(url=self.list_url + str(page),
                                     meta={'page': page},
                                     callback=self.parse)

    def video_detail(self, response):
        """Parse a video detail page, then request the JSON API that holds the play URL.

        The partially filled item travels to ``video_address`` in ``meta['meta_2']``.
        If vid or modeServerAppKey cannot be found in the page, a warning is
        logged and the video is skipped (this used to raise IndexError).
        """
        # Debugging tip: `from scrapy.shell import inspect_response;
        # inspect_response(response, self)` pauses here to inspect the response.
        meta_1 = response.meta['meta_1']
        item = XpcRedisItem()
        item['video_detail_url'] = meta_1
        # parse() passes the id; fall back to the "/a<id>" tail of the URL.
        item['v_id'] = response.meta.get('v_id') or meta_1.rsplit('/a', 1)[-1]
        # extract_first() returns None when the node is missing.
        video_name = response.xpath('//div[@class="title-wrap"]/h3/text()').extract_first()
        item['video_name'] = (video_name or '').strip()
        item['category'] = self._parse_category(response)
        up_time = response.xpath('//div/span[contains(@class,"update-time")]/i/text()').get()
        item['up_time'] = self._absolute_up_time(up_time)
        item['play_counts'] = response.xpath('//div/i[contains(@class,"play-counts")]/@data-curplaycounts').get()
        item['like_counts'] = response.xpath('//span/span[contains(@class,"like-counts")]/@data-counts').get()
        video_info = response.xpath('//div[@class="filmplay-info"]/div/p[1]/text()').extract()
        item['video_info'] = ','.join(s.strip() for s in video_info)

        # The JSON API URL is built from the page's vid and app key, e.g.
        # https://openapi-vtom.vmovier.com/v3/video/960VAm7OGE7DRnW8?expand=resource&usage=xpc_web&appKey=61a2f329348b3bf77
        vid_match = self.vid_pattern.search(response.text)
        key_match = self.app_key_pattern.search(response.text)
        if not (vid_match and key_match):
            self.logger.warning('vid/modeServerAppKey not found on %s, video skipped', meta_1)
            return
        json_url = 'https://openapi-vtom.vmovier.com/v3/video/{}?expand=resource&usage=xpc_web&appKey={}'.format(
            vid_match.group(1), key_match.group(1))
        item['json_url'] = json_url
        yield scrapy.Request(url=json_url, meta={'meta_2': item}, callback=self.video_address)

    @staticmethod
    def _parse_category(response):
        """Return the category text of a detail page.

        With several categories, the spans inside ``cate-box`` alternate
        category / "|" separator, so only the odd spans (1, 3, 5, ...) are read.
        The link texts inside one span are joined with "-", and the categories
        are joined with ",".
        """
        box = "//span[contains(@class,'cate-box')]"
        category_count = len(response.xpath(box + "/span/a[1]"))
        if category_count > 1:
            categories = []
            for i in range(1, category_count + 1):
                parts = response.xpath(box + "/span[" + str(2 * i - 1) + "]/a/text()").extract()
                categories.append("-".join(s.strip() for s in parts))
            return ",".join(categories)
        category = response.xpath('//span/span[contains(@class,"cate")]//text()').extract()
        return "".join(s.strip() for s in category)

    @staticmethod
    def _absolute_up_time(up_time, today=None):
        """Replace the relative words 昨天 (yesterday) / 今天 (today) with a YYYY-MM-DD date.

        Returns *up_time* unchanged when it is empty/None or has no relative word.
        """
        if not up_time:
            return up_time
        today = today or datetime.datetime.today()
        if '昨天' in up_time:
            yesterday = today - datetime.timedelta(days=1)
            return up_time.replace('昨天', yesterday.strftime("%Y-%m-%d"))
        if '今天' in up_time:
            return up_time.replace('今天', today.strftime("%Y-%m-%d"))
        return up_time

    def video_address(self, respones):
        """Complete the item with the play URL from the vmovier JSON API and yield it.

        ``respones`` (sic) keeps the original parameter name. The resource
        entry holds several qualities: ``default`` is preferred, then the first
        ``progressive`` entry, then ``lowest``. Missing or null entries fall
        through to the next option, and video_url may end up None.
        """
        item = respones.meta['meta_2']
        # Time this record was added.
        item['add_time'] = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        data = json.loads(respones.text).get('data') or {}
        resource = data.get('resource') or {}
        if resource.get('default'):
            item['video_url'] = resource['default'].get('url')
        elif resource.get('progressive'):
            item['video_url'] = resource['progressive'][0].get('url')
        else:
            item['video_url'] = (resource.get('lowest') or {}).get('url')
        yield item
           

5.处理pipelines管道文件保存数据,将结果保存到数据库中(pipelines.py)

# -*- coding: utf-8 -*-

import pymysql

class XpcRedisPipeline(object):
    """Save items into the MySQL table ``xpc``, inserting or updating by ``v_id``.

    Connection parameters come from the MYSQL_* settings (see settings.py).
    """

    # Executed once per crawl in open_spider() (it used to run for every item).
    CREATE_TABLE_SQL = (
        'CREATE TABLE IF NOT EXISTS xpc(v_id BIGINT primary key not null COMMENT "视频页id",'
        'video_name varchar(200),category varchar(100),up_time VARCHAR(50),add_time DATETIME,play_counts INT(13),like_counts INT(13),'
        'video_detail_url varchar(100),video_url varchar(200),video_info LONGTEXT,'
        'json_url varchar(300))ENGINE =InnoDB DEFAULT CHARSET=utf8mb4;'
    )

    @classmethod
    def from_crawler(cls, crawler):
        """Read the MYSQL_* settings into class attributes and build the pipeline."""
        settings = crawler.settings
        cls.MYSQL_HOST = settings.get('MYSQL_HOST')
        cls.MYSQL_PORT = settings.get('MYSQL_PORT')
        cls.MYSQL_USER = settings.get('MYSQL_USER')
        cls.MYSQL_PASSWD = settings.get('MYSQL_PASSWD')
        cls.MYSQL_DBNAME = settings.get('MYSQL_DBNAME')
        cls.MYSQL_CHARSET = settings.get('MYSQL_CHARSET')
        return cls()

    def __init__(self):
        # Relies on from_crawler() having set the MYSQL_* class attributes.
        self.db = pymysql.connect(host=self.MYSQL_HOST, port=self.MYSQL_PORT, user=self.MYSQL_USER,
                                  passwd=self.MYSQL_PASSWD, db=self.MYSQL_DBNAME, charset=self.MYSQL_CHARSET)
        self.cursor = self.db.cursor()

    def open_spider(self, spider):
        """Create the ``xpc`` table if it does not exist yet (a failure is logged, not raised)."""
        try:
            self.cursor.execute(self.CREATE_TABLE_SQL)
        except pymysql.MySQLError as e:
            spider.logger.error("创建xpc表出错ERROR: %s", e)

    @staticmethod
    def _upsert_sql(keys):
        """Build an ``INSERT ... ON DUPLICATE KEY UPDATE`` statement for the ``xpc`` table.

        The statement has ``2 * len(keys)`` ``%s`` placeholders: one set for
        VALUES and one for the UPDATE clause, so it must be executed with
        ``values * 2``. *keys* are the item's field names, not scraped text.
        """
        return 'INSERT INTO xpc({}) VALUES ({}) ON DUPLICATE KEY UPDATE {};'.format(
            ','.join(keys),
            ','.join(['%s'] * len(keys)),
            ','.join('{}=%s'.format(k) for k in keys))

    def process_item(self, item, spider):
        """Insert *item*, or update the row with the same v_id, and return *item*.

        A database error is logged and rolled back, and the item is still
        returned, so later pipelines (scrapy_redis RedisPipeline) get the item
        rather than None. An empty item is passed through untouched.
        """
        if not item:
            return item
        keys, values = zip(*item.items())
        try:
            # ON DUPLICATE KEY UPDATE replaces the old SELECT-then-INSERT check:
            # one round trip, and no gap between the check and the write.
            self.cursor.execute(self._upsert_sql(keys), values * 2)
            self.db.commit()
        except pymysql.MySQLError as e:
            spider.logger.error("出错ERROR: %s", e)
            self.db.rollback()
        return item

    def close_spider(self, spider):
        """Close the cursor and the MySQL connection."""
        spider.logger.info("mysql数据库处理完毕")
        self.cursor.close()
        self.db.close()

           

6.配置settings文件(settings.py)

# Do not let robots.txt block the crawl.
ROBOTSTXT_OBEY = False
# Redis list that holds the proxy pool used by RandomProxyMiddleware.
PROXY_REDIS_KEY = 'xpc:proxies'

# scrapy_redis: keep the request queue and the duplicate filter in Redis, so
# every worker started with `runspider` shares one crawl. The Master pushes the
# start URL with lpush.
SCHEDULER = 'scrapy_redis.scheduler.Scheduler'
DUPEFILTER_CLASS = 'scrapy_redis.dupefilter.RFPDupeFilter'
# Redis server used by scrapy_redis (queue, dupefilter, RedisPipeline);
# on worker machines, point this at the Master's Redis.
REDIS_HOST = '127.0.0.1'
REDIS_PORT = 6379

# MySQL connection used by XpcRedisPipeline.
MYSQL_HOST = 'localhost'
MYSQL_PORT = 3306
MYSQL_USER = 'root'
MYSQL_PASSWD = '123456'
MYSQL_DBNAME = 'python4'
MYSQL_CHARSET = 'utf8mb4'

# Seconds to wait between requests.
DOWNLOAD_DELAY = 3

# The header name was previously misspelled ('User-Agesettingsnt'), so no
# User-Agent header was ever sent.
DEFAULT_REQUEST_HEADERS = {
    'User-Agent': 'Mozilla/5.0 (compatible; MSIE 9.0; Windows NT 6.1; Trident/5.0);',
    # 'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
    # 'Accept-Language': 'en',
}

DOWNLOADER_MIDDLEWARES = {
    # 'XPC_Redis.middlewares.XpcRedisDownloaderMiddleware': 543,
    # 749: runs just before Scrapy's built-in HttpProxyMiddleware (750).
    'XPC_Redis.middlewares.RandomProxyMiddleware': 749,
}
# Fail fast on dead proxies.
DOWNLOAD_TIMEOUT = 5

ITEM_PIPELINES = {
    'XPC_Redis.pipelines.XpcRedisPipeline': 300,
    'scrapy_redis.pipelines.RedisPipeline': 400,
}


# Optionally, write the log to a local file as well.
LOG_FILE = "xpc_redis.log"
LOG_LEVEL = "DEBUG"
# Also capture print() output in the log.
LOG_STDOUT = True
           

7-选用,增加随机代理中间件(middlewares.py)

# -*- coding: utf-8 -*-

import random
import redis,requests

from scrapy import signals
from scrapy.exceptions import NotConfigured
from twisted.internet.error import ConnectionRefusedError,TimeoutError
from scrapy.core.downloader.handlers.http11 import TunnelError

class RandomProxyMiddleware(object):
    """Downloader middleware that sends each request through a random proxy from Redis.

    Proxies are read from the Redis list named by the PROXY_REDIS_KEY setting.
    The hash ``<PROXY_REDIS_KEY>_stats`` counts 400/403/404 responses per proxy.
    A proxy that exceeds ``max_failed`` such responses, or that raises a
    connection-refused / timeout / tunnel error, is removed from the list. The
    request is then rescheduled without a proxy, so process_request picks a
    new one for it.
    """

    def __init__(self, settings):
        # REDIS_HOST / REDIS_PORT are optional; the defaults match the old hard-coded host.
        self.r = redis.StrictRedis(host=settings.get('REDIS_HOST', '127.0.0.1'),
                                   port=settings.getint('REDIS_PORT', 6379),
                                   decode_responses=True)
        self.proxy_key = settings.get('PROXY_REDIS_KEY')
        # Lower-case the pool entries (e.g. "HTTP://..." -> "http://...").
        # The list is fetched once, instead of once per element as before.
        for index, proxy in enumerate(self.r.lrange(self.proxy_key, 0, -1)):
            if isinstance(proxy, str) and proxy.lower() != proxy:
                self.r.lset(self.proxy_key, index, proxy.lower())
        # Hash that records how many times each proxy has failed.
        self.proxy_stats_key = self.proxy_key + '_stats'
        # A proxy is removed once its failure count exceeds this.
        self.max_failed = 3

    @property
    def proxies(self):
        """All proxies currently in the Redis pool (fetched on every access)."""
        return self.r.lrange(self.proxy_key, 0, -1)

    @classmethod
    def from_crawler(cls, crawler):
        """Build the middleware, or raise NotConfigured to disable it.

        Disabled when HTTPPROXY_ENABLED is false, PROXY_REDIS_KEY is unset, or
        the proxy pool is empty. The pool must be checked on an instance:
        ``cls.proxies`` is the property object itself and is always truthy.
        """
        settings = crawler.settings
        if not settings.getbool('HTTPPROXY_ENABLED') or not settings.get('PROXY_REDIS_KEY'):
            raise NotConfigured
        middleware = cls(settings)
        if not middleware.proxies:
            raise NotConfigured
        return middleware

    def process_request(self, request, spider):
        """Assign a random proxy unless the request already has one."""
        if request.meta.get('proxy'):
            return None
        # Read the pool once, so it cannot empty between the check and random.choice().
        proxies = self.proxies
        if proxies:
            request.meta['proxy'] = random.choice(proxies)
            spider.logger.debug("当前使用的代理是:%s", request.meta['proxy'])
        return None

    def process_response(self, request, response, spider):
        """Count 400/403/404 responses per proxy and drop a proxy that fails too often."""
        cur_proxy = request.meta.get('proxy')
        if not cur_proxy:
            # No proxy was used (e.g. the pool is empty): nothing to record.
            return response
        if response.status in (400, 403, 404):
            self.r.hincrby(self.proxy_stats_key, cur_proxy, 1)
        failed_times = int(self.r.hget(self.proxy_stats_key, cur_proxy) or 0)
        if failed_times > self.max_failed:
            spider.logger.warning("got error http code(%s) when use proxy:%s", response.status, cur_proxy)
            self.remove_proxy(cur_proxy)
            return self._retry_without_proxy(request)
        return response

    def process_exception(self, request, exception, spider):
        """Drop a proxy that fails to connect, and reschedule the request without it."""
        cur_proxy = request.meta.get('proxy')
        if cur_proxy and isinstance(exception, (ConnectionRefusedError, TimeoutError, TunnelError)):
            spider.logger.warning("出错ERROR(%s) when use proxy:%s", exception, cur_proxy)
            self.remove_proxy(cur_proxy)
            return self._retry_without_proxy(request)
        return None

    @staticmethod
    def _retry_without_proxy(request):
        """Return a copy of *request* without a proxy that bypasses the dupefilter.

        A request returned from a downloader middleware goes back through the
        scheduler. Without dont_filter=True, the dupefilter discards it as
        already seen, and the retry never happens.
        """
        retry = request.replace(dont_filter=True)
        retry.meta.pop('proxy', None)
        return retry

    def remove_proxy(self, cur_proxy):
        """Remove *cur_proxy* from the pool and delete its failure statistics."""
        removed = self.r.lrem(self.proxy_key, 0, cur_proxy)
        self.r.hdel(self.proxy_stats_key, cur_proxy)
        if removed:
            print("remove proxy:%s from proxy list" % cur_proxy)
           

8.参照以下链接打开redis数据库:

https://blog.csdn.net/z564359805/article/details/80808155
           

9.以上设置完毕,进行爬取:进入spiders目录,用runspider命令启动Spider(也可以在项目根目录执行 scrapy crawl xpc_redis):

scrapy runspider xpc_redis.py
           

10.在Master端(核心服务器)的redis-cli输入push指令,参考格式:

输入:lpush xpc_redis:start_urls https://www.xinpianchang.com/channel/index/type-/sort-like/duration_type-0/resolution_type-/page-1
           
