天天看點

scrapy-redis分布式爬蟲案例(房天下)

運作效果動圖:

scrapy-redis分布式爬蟲案例(房天下)

簡述:本案例中有一台Linux系統運行Redis伺服器,兩台Windows系統運行分布式爬蟲。爬蟲從Redis隊列中取得要爬取的URL,同時Redis負責隊列中URL的去重。當爬蟲因某些原因暫停或終止後,下次開啟時會自動從上次未完成的URL繼續爬取,不會從頭開始爬。當爬蟲爬空Redis中的URL時,就會處於等待狀態;此時可以設定等待一段時間,如果隊列中還是沒有新增要爬取的URL,就自動關閉爬蟲,避免爬蟲一直處於等待狀態而占用資源。

sfw.py

import scrapy
import re
from fang.items import NewHouseItem, OldHouseItem
from scrapy_redis.spiders import RedisSpider


class SfwSpider(RedisSpider):
    """Distributed spider for new and second-hand housing listings on fang.com.

    There is no static ``start_urls`` list: the spider is a ``RedisSpider``
    and is seeded through the Redis key named by ``redis_key``.  The seed
    page is presumably the city index
    (https://www.fang.com/SoufunFamily.htm) -- confirm against deployment.
    """

    name = 'sfw'
    allowed_domains = ['fang.com']
    redis_key = 'fang:start_url'

    def parse(self, response):
        """Walk the province/city table and schedule both listings per city.

        Yields:
            scrapy.Request: one request for the city's new-house listing and
            one for its second-hand listing, each carrying
            ``meta['info'] = (province, city)``.
        """
        trs = response.xpath("//div[@class='outCont']//tr")
        # A province name is only present on the first row of its group, so
        # the last seen value is carried over to the following rows.
        province = None
        for tr in trs:
            tds = tr.xpath(".//td[not(@class)]")
            if len(tds) < 2:
                # Rows without both a province cell and a city cell carry no
                # city links; skip them instead of raising IndexError.
                continue
            province_text = tds[0].xpath(".//text()").get() or ''
            province_text = re.sub(r'\s', '', province_text)
            if province_text:
                province = province_text
            if province == '其它':
                continue
            for city_link in tds[1].xpath(".//a"):
                city = city_link.xpath(".//text()").get()
                city_url = city_link.xpath(".//@href").get()
                if not city_url or '//' not in city_url:
                    # No usable "<scheme>//<domain>" href to derive URLs from.
                    continue
                scheme, domain = city_url.split("//", 1)
                if 'bj.' in domain:
                    # Beijing is special-cased with fixed listing URLs.
                    newHouse_url = 'https://newhouse.fang.com/house/s/'
                    oldHouse_url = 'https://esf.fang.com'
                else:
                    # Build the new-house listing URL.
                    newHouse_url = scheme+'//'+'newHouse.'+domain+'/house/s/'
                    # Build the second-hand-house listing URL.
                    oldHouse_url = scheme+'//'+"esf."+domain
                yield scrapy.Request(url=newHouse_url,
                                     callback=self.parse_newhouse,
                                     meta={'info': (province, city)})
                yield scrapy.Request(url=oldHouse_url,
                                     callback=self.parse_oldhouse,
                                     meta={'info': (province, city)})

    def parse_newhouse(self, response):
        """Extract new-house items from a listing page and follow pagination.

        Yields:
            NewHouseItem: one item per ``<li>`` in the listing.
            scrapy.Request: the next listing page, when a "next" link exists.
        """
        province, city = response.meta.get('info')
        lis = response.xpath("//div[contains(@class,'nl_con')]/ul/li")
        for li in lis:
            name = li.xpath(".//div[@class='nlcd_name']/a/text()").get()
            if name is not None:
                name = name.strip()
            house_type = li.xpath(
                ".//div[contains(@class, 'house_type')]/a/text()").getall()
            house_type_list = [re.sub(r"\s", "", x) for x in house_type]
            # Keep only the room-count entries (those ending with '居').
            rooms = [x for x in house_type_list if x.endswith('居')]
            area = " ".join(li.xpath(
                ".//div[contains(@class, 'house_type')]/text()").getall())
            area = re.sub(r"\s|-|/", "", area)
            address = li.xpath(".//div[@class='address']/a/@title").get()
            district_text = "".join(
                li.xpath(".//div[@class='address']/a//text()").getall())
            # The district is the text enclosed in square brackets, if any.
            district = re.search(r".*\[(.+)\].*", district_text)
            if district is not None:
                district = district.group(1)
            sale = li.xpath(
                ".//div[contains(@class,'fangyuan')]/span/text()").get()
            price = "".join(li.xpath(
                ".//div[@class = 'nhouse_price']//text()").getall())
            price = re.sub(r"\s|廣告", "", price)
            url = li.xpath(".//div[@class = 'nlcd_name']/a/@href").get()
            if url is not None:
                # The href has no scheme, so "https:" is prepended.
                url = "https:"+url
            yield NewHouseItem(province=province, city=city, name=name,
                               rooms=rooms, area=area, district=district,
                               address=address, price=price, sale=sale,
                               url=url)
        next_url = response.xpath("//a[@class='next']/@href").get()
        if next_url:
            # The request must be yielded; merely constructing it schedules
            # nothing and the crawl would stop after the first page.
            yield scrapy.Request(url=response.urljoin(next_url),
                                 callback=self.parse_newhouse,
                                 meta={'info': (province, city)})

    def parse_oldhouse(self, response):
        """Extract second-hand items from a listing page and follow pagination.

        Yields:
            OldHouseItem: one item per matching ``<dl>`` entry.
            scrapy.Request: the following listing page, when a link exists.
        """
        province, city = response.meta.get('info')
        print("二手房市場")
        dls = response.xpath(
            "//div[contains(@class, 'shop_list')]/dl[contains(@dataflag,'bg')]")
        for dl in dls:
            item = OldHouseItem(province=province, city=city)
            name = dl.xpath(".//p[@class='add_shop']/a/text()").get()
            # A missing name must not abort the whole page with AttributeError.
            item['name'] = name.strip() if name is not None else None
            infos = dl.xpath(".//p[@class='tel_shop']/text()").getall()
            infos = [re.sub(r"\s", "", x) for x in infos]

            # Each fragment is classified by the marker character it contains.
            for info in infos:
                if '廳' in info:
                    item['rooms'] = info
                elif '㎡' in info:
                    item['area'] = info
                elif '層' in info:
                    item['floor'] = info
                elif '向' in info:
                    item['toward'] = info
                elif '年' in info:
                    item['year'] = info.replace("年建", "")
            item['address'] = dl.xpath(
                ".//p[@class='add_shop']/span/text()").get()
            item['price'] = "".join(dl.xpath(
                ".//dd[@class='price_right']/span[1]//text()").getall())
            item['unit'] = dl.xpath(
                ".//dd[@class='price_right']/span[2]//text()").get()
            detail_url = dl.xpath(".//h4/a/@href").get()
            item['url'] = response.urljoin(detail_url)
            yield item
        next_url = response.xpath(
            "//div[@class='page_al']/p[1]/a/@href").get()
        if next_url is not None:
            # Yield the request so the scheduler actually receives it.
            yield scrapy.Request(url=response.urljoin(next_url),
                                 callback=self.parse_oldhouse,
                                 meta={'info': (province, city)})
           

pipelines.py

from scrapy.exporters import JsonLinesItemExporter


class FangPipeline(object):
    """Write scraped items to JSON-lines files, one file per listing type.

    New-house items go to ``newHouse.json`` and second-hand items go to
    ``oldHouse.json``.
    """

    def __init__(self):
        # Both files are opened in binary mode and handed to the exporters.
        self.newHouse_fp = open('newHouse.json', 'wb')
        self.oldHouse_fp = open('oldHouse.json', 'wb')
        self.newHouse_exporter = JsonLinesItemExporter(self.newHouse_fp, ensure_ascii=False)
        self.oldHouse_exporter = JsonLinesItemExporter(self.oldHouse_fp, ensure_ascii=False)

    def process_item(self, item, spider):
        """Export ``item`` to the file matching its type and pass it on.

        Args:
            item: the scraped item; routed by its class name.
            spider: the spider that produced the item (unused).

        Returns:
            The same ``item``, so later pipelines still receive it.
        """
        # Route by class name so each file only receives its own item kind
        # (previously every item was written to both files).  Matching on the
        # name avoids importing the item classes into this module.
        kind = type(item).__name__
        if kind == 'NewHouseItem':
            self.newHouse_exporter.export_item(item)
        elif kind == 'OldHouseItem':
            self.oldHouse_exporter.export_item(item)
        return item

    def close_spider(self, spider=None):
        """Close both output files when the spider finishes.

        Args:
            spider: the spider being closed.  Scrapy passes it positionally;
                it defaults to ``None`` so argument-less calls keep working.
        """
        self.newHouse_fp.close()
        self.oldHouse_fp.close()
           

settings.py

# --- scrapy-redis integration -------------------------------------------

# Share one duplicate-request fingerprint filter across all spiders.
DUPEFILTER_CLASS = 'scrapy_redis.dupefilter.RFPDupeFilter'

# Store scheduled requests in Redis.
SCHEDULER = 'scrapy_redis.scheduler.Scheduler'

# Keep the scrapy-redis queues in Redis instead of clearing them, which is
# what makes pausing and resuming a crawl possible.
SCHEDULER_PERSIST = True

# Use Redis as the item pipeline.
ITEM_PIPELINES = {'scrapy_redis.pipelines.RedisPipeline': 300}

# Redis connection details (use '127.0.0.1' for a local Redis server).
REDIS_PORT = 8888
REDIS_HOST = '192.168.150.134'

# --- idle shutdown --------------------------------------------------------

# Keeps distributed spiders from idling forever: when the Redis queue has no
# URL left to crawl the spider would otherwise keep waiting.
MYEXT_ENABLED = True

# Number of idle time units to tolerate.  Per the original author's note one
# unit is 5 s, so 360 units would be half an hour.
IDLE_NUMBER = 60

EXTENSIONS = {'fang.extensions.RedisSpiderSmartIdleClosedExensions': 500}
           

繼續閱讀