天天看點

第三百五十八節,Python分布式爬蟲打造搜尋引擎Scrapy精講—将bloomfilter(布隆過濾器)內建到scrapy-redis中

第三百五十八節,Python分布式爬蟲打造搜尋引擎Scrapy精講—将bloomfilter(布隆過濾器)內建到scrapy-redis中,判斷URL是否重複

布隆過濾器(Bloom Filter)詳解

第三百五十八節,Python分布式爬蟲打造搜尋引擎Scrapy精講—将bloomfilter(布隆過濾器)內建到scrapy-redis中

基本概念

如果想判斷一個元素是不是在一個集合裡,一般想到的是将所有元素儲存起來,然後通過比較确定。連結清單,樹等等資料結構都是這種思路. 但是随着集合中元素的增加,我們需要的存儲空間越來越大,檢索速度也越來越慢。不過世界上還有一種叫作散清單(又叫哈希表,Hash table)的資料結構。它可以通過一個Hash函數将一個元素映射成一個位陣列(Bit Array)中的一個點。這樣一來,我們隻要看看這個點是不是 1 就知道可以集合中有沒有它了。這就是布隆過濾器的基本思想。

Hash面臨的問題就是沖突。假設 Hash 函數是良好的,如果我們的位陣列長度為 m 個點,那麼如果我們想将沖突率降低到例如 1%, 這個散清單就隻能容納 m/100 個元素。顯然這就不叫空間有效了(Space-efficient)。解決方法也簡單,就是使用多個 Hash,如果它們有一個說元素不在集合中,那肯定就不在。如果它們都說在,雖然也有一定可能性它們在說謊,不過直覺上判斷這種事情的機率是比較低的。

優點

相比于其它的資料結構,布隆過濾器在空間和時間方面都有巨大的優勢。布隆過濾器存儲空間和插入/查詢時間都是常數。另外, Hash 函數互相之間沒有關系,友善由硬體并行實作。布隆過濾器不需要存儲元素本身,在某些對保密要求非常嚴格的場合有優勢。

布隆過濾器可以表示全集,其它任何資料結構都不能;

k 和 m 相同,使用同一組 Hash 函數的兩個布隆過濾器的交并差運算可以使用位操作進行。

缺點

但是布隆過濾器的缺點和優點一樣明顯。誤算率(False Positive)是其中之一。随着存入的元素數量增加,誤算率随之增加。但是如果元素數量太少,則使用散清單足矣。

另外,一般情況下不能從布隆過濾器中删除元素. 我們很容易想到把位列陣變成整數數組,每插入一個元素相應的計數器加1, 這樣删除元素時将計數器減掉就可以了。然而要保證安全的删除元素并非如此簡單。首先我們必須保證删除的元素的确在布隆過濾器裡面. 這一點單憑這個過濾器是無法保證的。另外計數器回繞也會造成問題。

python 基于redis實作的bloomfilter(布隆過濾器),BloomFilter_imooc

BloomFilter_imooc下載下傳

下載下傳位址:https://github.com/liyaopinner/BloomFilter_imooc

依賴關系: 

  python 基于redis實作的bloomfilter

  依賴mmh3

  安裝依賴包:

  pip install mmh3

1、安裝好BloomFilter_imooc所需要的依賴

2、将下載下傳的BloomFilter_imooc包解壓後,将裡面的py_bloomfilter.py檔案複制到scrapy工程目錄

第三百五十八節,Python分布式爬蟲打造搜尋引擎Scrapy精講—将bloomfilter(布隆過濾器)內建到scrapy-redis中

py_bloomfilter.py(布隆過濾器)源碼

import mmh3
import redis
import math
import time


class PyBloomFilter():
    #内置100個随機種子
    SEEDS = [543, 460, 171, 876, 796, 607, 650, 81, 837, 545, 591, 946, 846, 521, 913, 636, 878, 735, 414, 372,
             344, 324, 223, 180, 327, 891, 798, 933, 493, 293, 836, 10, 6, 544, 924, 849, 438, 41, 862, 648, 338,
             465, 562, 693, 979, 52, 763, 103, 387, 374, 349, 94, 384, 680, 574, 480, 307, 580, 71, 535, 300, 53,
             481, 519, 644, 219, 686, 236, 424, 326, 244, 212, 909, 202, 951, 56, 812, 901, 926, 250, 507, 739, 371,
             63, 584, 154, 7, 284, 617, 332, 472, 140, 605, 262, 355, 526, 647, 923, 199, 518]

    #capacity是預先估計要去重的數量
    #error_rate表示錯誤率
    #conn表示redis的連接配接用戶端
    #key表示在redis中的鍵的名字字首
    def __init__(self, capacity=1000000000, error_rate=0.00000001, conn=None, key='BloomFilter'):
        self.m = math.ceil(capacity*math.log2(math.e)*math.log2(1/error_rate))      #需要的總bit位數
        self.k = math.ceil(math.log1p(2)*self.m/capacity)                           #需要最少的hash次數
        self.mem = math.ceil(self.m/8/1024/1024)                                    #需要的多少M記憶體
        self.blocknum = math.ceil(self.mem/512)                                     #需要多少個512M的記憶體塊,value的第一個字元必須是ascii碼,所有最多有256個記憶體塊
        self.seeds = self.SEEDS[0:self.k]
        self.key = key
        self.N = 2**31-1
        self.redis = conn
        # print(self.mem)
        # print(self.k)

    def add(self, value):
        name = self.key + "_" + str(ord(value[0])%self.blocknum)
        hashs = self.get_hashs(value)
        for hash in hashs:
            self.redis.setbit(name, hash, 1)

    def is_exist(self, value):
        name = self.key + "_" + str(ord(value[0])%self.blocknum)
        hashs = self.get_hashs(value)
        exist = True
        for hash in hashs:
            exist = exist & self.redis.getbit(name, hash)
        return exist

    def get_hashs(self, value):
        hashs = list()
        for seed in self.seeds:
            hash = mmh3.hash(value, seed)
            if hash >= 0:
                hashs.append(hash)
            else:
                hashs.append(self.N - hash)
        return hashs


pool = redis.ConnectionPool(host='127.0.0.1', port=6379, db=0)
conn = redis.StrictRedis(connection_pool=pool)

# 使用方法
# if __name__ == "__main__":
#     bf = PyBloomFilter(conn=conn)           # 利用連接配接池連接配接Redis
#     bf.add('www.jobbole.com')               # 向Redis預設的通道添加一個域名
#     bf.add('www.luyin.org')                 # 向Redis預設的通道添加一個域名
#     print(bf.is_exist('www.zhihu.com'))     # 列印此域名在通道裡是否存在,存在傳回1,不存在傳回0
#     print(bf.is_exist('www.luyin.org'))     # 列印此域名在通道裡是否存在,存在傳回1,不存在傳回0      

将py_bloomfilter.py(布隆過濾器)內建到scrapy-redis中的dupefilter.py去重器中,使其抓取過的URL不添加到下載下傳器,沒抓取過的URL添加到下載下傳器

第三百五十八節,Python分布式爬蟲打造搜尋引擎Scrapy精講—将bloomfilter(布隆過濾器)內建到scrapy-redis中

scrapy-redis中的dupefilter.py去重器修改

import logging
import time

from scrapy.dupefilters import BaseDupeFilter
from scrapy.utils.request import request_fingerprint

from . import defaults
from .connection import get_redis_from_settings
from bloomfilter.py_bloomfilter import conn,PyBloomFilter   #導入布隆過濾器

logger = logging.getLogger(__name__)


# TODO: Rename class to RedisDupeFilter.
class RFPDupeFilter(BaseDupeFilter):
    """Redis-based request duplicates filter.

    This class can also be used with default Scrapy's scheduler.

    """

    logger = logger

    def __init__(self, server, key, debug=False):
        """Initialize the duplicates filter.

        Parameters
        ----------
        server : redis.StrictRedis
            The redis server instance.
        key : str
            Redis key Where to store fingerprints.
        debug : bool, optional
            Whether to log filtered requests.

        """
        self.server = server
        self.key = key
        self.debug = debug
        self.logdupes = True

        # 內建布隆過濾器
        self.bf = PyBloomFilter(conn=conn, key=key)     # 利用連接配接池連接配接Redis

    @classmethod
    def from_settings(cls, settings):
        """Returns an instance from given settings.

        This uses by default the key ``dupefilter:<timestamp>``. When using the
        ``scrapy_redis.scheduler.Scheduler`` class, this method is not used as
        it needs to pass the spider name in the key.

        Parameters
        ----------
        settings : scrapy.settings.Settings

        Returns
        -------
        RFPDupeFilter
            A RFPDupeFilter instance.


        """
        server = get_redis_from_settings(settings)
        # XXX: This creates one-time key. needed to support to use this
        # class as standalone dupefilter with scrapy's default scheduler
        # if scrapy passes spider on open() method this wouldn't be needed
        # TODO: Use SCRAPY_JOB env as default and fallback to timestamp.
        key = defaults.DUPEFILTER_KEY % {'timestamp': int(time.time())}
        debug = settings.getbool('DUPEFILTER_DEBUG')
        return cls(server, key=key, debug=debug)

    @classmethod
    def from_crawler(cls, crawler):
        """Returns instance from crawler.

        Parameters
        ----------
        crawler : scrapy.crawler.Crawler

        Returns
        -------
        RFPDupeFilter
            Instance of RFPDupeFilter.

        """
        return cls.from_settings(crawler.settings)

    def request_seen(self, request):
        """Returns True if request was already seen.

        Parameters
        ----------
        request : scrapy.http.Request

        Returns
        -------
        bool

        """
        fp = self.request_fingerprint(request)

        # 內建布隆過濾器
        if self.bf.is_exist(fp):    # 判斷如果域名在Redis裡存在
            return True
        else:
            self.bf.add(fp)         # 如果不存在,将域名添加到Redis
            return False

        # This returns the number of values added, zero if already exists.
        # added = self.server.sadd(self.key, fp)
        # return added == 0

    def request_fingerprint(self, request):
        """Returns a fingerprint for a given request.

        Parameters
        ----------
        request : scrapy.http.Request

        Returns
        -------
        str

        """
        return request_fingerprint(request)

    def close(self, reason=''):
        """Delete data on close. Called by Scrapy's scheduler.

        Parameters
        ----------
        reason : str, optional

        """
        self.clear()

    def clear(self):
        """Clears fingerprints data."""
        self.server.delete(self.key)

    def log(self, request, spider):
        """Logs given request.

        Parameters
        ----------
        request : scrapy.http.Request
        spider : scrapy.spiders.Spider

        """
        if self.debug:
            msg = "Filtered duplicate request: %(request)s"
            self.logger.debug(msg, {'request': request}, extra={'spider': spider})
        elif self.logdupes:
            msg = ("Filtered duplicate request %(request)s"
                   " - no more duplicates will be shown"
                   " (see DUPEFILTER_DEBUG to show all duplicates)")
            self.logger.debug(msg, {'request': request}, extra={'spider': spider})
            self.logdupes = False      

爬蟲檔案

#!/usr/bin/env python
# -*- coding:utf8 -*-

from scrapy_redis.spiders import RedisCrawlSpider    # 導入scrapy_redis裡的RedisCrawlSpider類
import scrapy
from scrapy.linkextractors import LinkExtractor
from scrapy.spiders import Rule


class jobboleSpider(RedisCrawlSpider):               # 自定義爬蟲類,繼承RedisSpider類
    name = 'jobbole'                                 # 設定爬蟲名稱
    allowed_domains = ['www.luyin.org']              # 爬取域名
    redis_key = 'jobbole:start_urls'                 # 向redis設定一個名稱儲存url

    rules = (
        # 配置抓取清單頁規則
        # Rule(LinkExtractor(allow=('ggwa/.*')), follow=True),

        # 配置抓取内容頁規則
        Rule(LinkExtractor(allow=('.*')), callback='parse_job', follow=True),
    )


    def parse_job(self, response):  # 回調函數,注意:因為CrawlS模闆的源碼建立了parse回調函數,是以切記我們不能建立parse名稱的函數
        # 利用ItemLoader類,加載items容器類填充資料
        neir = response.css('title::text').extract()
        print(neir)      

啟動爬蟲 scrapy crawl jobbole

cd 到redis安裝目錄執行指令:redis-cli -h 127.0.0.1 -p 6379  連接配接redis用戶端

連接配接redis用戶端後執行指令:lpush jobbole:start_urls http://www.luyin.org  向redis添加一個爬蟲起始url

開始爬取

第三百五十八節,Python分布式爬蟲打造搜尋引擎Scrapy精講—将bloomfilter(布隆過濾器)內建到scrapy-redis中

redis狀态說明:

第三百五十八節,Python分布式爬蟲打造搜尋引擎Scrapy精講—将bloomfilter(布隆過濾器)內建到scrapy-redis中