天天看點

爬蟲分析之資料存儲——基于MySQL,Scrapy

上一篇-> 爬蟲練習之資料整理——基于Pandas 上上篇-> 爬蟲練習之資料清洗——基于Pandas

配置MySql

關于MySQL在Ubuntu的Pycharm上的配置,可以參考這篇文章中的第三部分

Mac安裝mysql及終端操作mysql與pycharm的資料庫可視化

如果上面的步驟處理完畢後找不到你建立的資料庫, 可以參照下圖配置

勾選要顯示的Schemas(資料庫集合)

資料存儲需要用到

pymysql

子產品, 在File->Settings中找到如圖的設定頁面,點選加号搜尋

pymysql

并安裝

如何存儲

在開始考慮如何存儲之前, 我們需要考慮一個問題, 資料存儲應該是什麼時候要做的事.

假設你已經了解過Scrapy架構, 下面是來自官網對item pipeline的典型應用

  • 清理資料
  • 驗證爬取的資料(檢查item包含某些字段)
  • 查重(并丢棄)
  • 将爬取結果儲存到資料庫中

另請參閱官方文檔>

Item Pipeline

我們要實作的資料存儲, 先來試一試能否成功吧

# 你可以參考以下代碼編寫自己的pipeline
import pymysql

class jobCrawlerPipeline(object):
    def process_item(self, item, spider):
        '''
        将爬取的資訊儲存到mysql
        :param item:
        :param spider:
        :return: item
        '''
        # Get data from item
        job_name = item['job_name']
        company = item['company']
        address = item['address']
        salary = item['salary']
        time = item['time']

        # Connecting with local database, change the value if not the same
        db = pymysql.connect(
            host='localhost',
            user='root',
            passwd='1320',
            db='scrapyDB',
            charset='utf8',
            cursorclass=pymysql.cursors.DictCursor)
        try:
            # open the cursor
            cursor = db.cursor()
            sql = 'INSERT INTO tb_job(job_name,company,address,salary,time)' \
                  'VALUES ("%s", "%s", "%s", "%s", "%s")' % (job_name,company,address,salary,time)
            # execute the sql
            cursor.execute(sql)
            db.commit()
        finally:
            # close the connection
            db.close()
        return item
           

爬蟲尚未結束, 但是通過終端, 我們知道該停下爬蟲了.

爬取中...

存儲在MySQL的資訊

重新回到爬蟲項目的思路

思考整個爬蟲項目的流程, 應該是這樣

抓取資訊->清理資訊->整理資訊->存儲資訊->分析資訊

資料整理

而上面的存儲資訊雖然已經成功了一部分,但是薪資資訊仍需要整理,更重要的是爬取的資訊中沒有明确的id, 如何在後續中加入topSalary, bottomSalary 等整理後才有的資訊與之對應呢?

重新審視Item Pipeline的典型應用, 我們能不能在Pipeline上實作整理,清理, 驗證或是丢棄呢?

分析item中的項目, 整理和驗證可能是最容易實作的部分

我們先把整理功能實作并驗證是否成功, 在

class jobCrawlerPipeline(object):

中添加下面這個方法.用于把爬取下來的工資資料進行整理,關于這個方法的實作,請參考前一篇

class jobCrawlerPipeline(object):

    def cut_word(self, word, method):
        if method == 'bottom':
            length = len(word)
            if (word.find('萬') == -1):
                if (word.find('以下') != -1):
                    # XX千以下
                    postion = word.find('以下')
                    bottomSalary = str(word[:(postion - 5)])
                elif (word.find('以上') != -1):
                    postion = word.find('以上')
                    bottomSalary = str(float(word[:postion - 5]))
                else:
                    # XX千/月
                    postion = word.find('-')
                    bottomSalary = str(float(word[:(postion)]))
            else:
                if (word.find('年') == -1):
                    if (word.find('以下') != -1):
                        # XX萬以下
                        postion = word.find('以下')
                        bottomSalary = str(float(word[:(postion - 5)]) * 10)
                    elif (word.find('以上') != -1):
                        # XX萬以上
                        postion = word.find('以上')
                        bottomSalary = str(float(word[:postion - 5]) * 10)
                    elif (word.find('+') != -1):
                        # XX萬+
                        postion = word.find('+')
                        bottomSalary = str(float(word[:(postion)]) * 10)
                    else:
                        # XX萬/月
                        postion = word.find('-')
                        bottomSalary = str(float(word[:(postion)]) * 10)

                else:
                    if (word.find('以下') != -1):
                        # XX萬以下/年
                        postion = word.find('以下')
                        bottomSalary = str(float(word[:(postion - 5)]) / 1.2)
                    elif (word.find('以上') != -1):
                        postion = word.find('以上')
                        bottomSalary = str(float(word[:postion - 5]) / 1.2)
                    elif (word.find('+') != -1):
                        # XX萬+
                        postion = word.find('+')
                        bottomSalary = str(float(word[:(postion)]) / 1.2)
                    else:
                        # XX萬/年
                        postion = word.find('-')
                        bottomSalary = word[:(postion)]
                        bottomSalary = str(float(bottomSalary) / 1.2)
            return bottomSalary

        if method == 'top':
            length = len(word)
            if (word.find('萬') == -1):
                if (word.find('以下') != -1):
                    # XX千以下
                    postion = word.find('以下')
                    topSalary = str(float(word[:(postion - 5)]))
                elif (word.find('以上') != -1):
                    postion = word.find('以上')
                    topSalary = str(float(word[:postion - 5]))
                else:
                    # XX千/月
                    postion = word.find('-')
                    topSalary = str(float(word[(postion + 1):(length - 11)]))
            else:
                if (word.find('年') == -1):
                    if (word.find('以下') != -1):
                        # XX萬以下
                        postion = word.find('以下')
                        topSalary = str(float(word[:(postion - 5)]) * 10)
                    elif (word.find('以上') != -1):
                        # XX萬以上
                        postion = word.find('以上')
                        topSalary = str(float(word[:postion - 5]) * 10)
                    else:
                        # XX萬/月
                        postion = word.find('-')
                        topSalary = str(float(word[(postion + 1):(length - 11)]) * 10)

                else:
                    if (word.find('以下') != -1):
                        # XX萬以下/年
                        postion = word.find('以下')
                        topSalary = str(float(word[:(postion - 5)]) / 1.2)
                    elif (word.find('以上') != -1):
                        # XX萬以上一年
                        postion = word.find('以上')
                        topSalary = str(float(word[:postion - 5]) / 1.2)
                    elif (word.find('+') != -1):
                        # XX萬+
                        postion = word.find('+')
                        topSalary = str(float(word[:(postion)]) / 1.2)
                    else:
                        # XX萬/年
                        postion = word.find('-')
                        topSalary = word[(postion + 1):(length - 11)]
                        topSalary = str(int(topSalary) / 1.2)
            return topSalary
           

如果你看了上面的代碼, 你可能發現與前一篇有些許不同, 最主要的差别就是字元串數組切片的位置發生了改變.

為什麼要改呢?

因為這是Python的編碼坑啊

通過觀察終端的輸出,可以看到爬下來尚未存儲的資料是以unicode的形式存在,這個時候是5個位元組一個中文

是以看到下面截圖中的salary,可以判斷要得到薪資的底薪和頂薪,需要剔除掉11個位元組

爬取資料中

資料清洗

至此,資料的基本處理已經合并到Pipeline中,鑒于可能還有髒資料在item中,我們在Pipeline的

process_item

方法中加入相應的代碼

這段代碼應當加在處理資料之前,減少一些系統開銷

# Get data from item
        job_name = item['job_name']
        salary = item['salary']

        dirty_job_name = re.compile(r'(\*|在家|試用|體驗|無需|無須|試玩|紅包)+')
        dirty_salary = re.compile(r'(小時|天)+')

        # clean dirty data
        if(dirty_job_name.search(str(job_name))):
            raise DropItem("Dirty data %s" % item)
        if(dirty_salary.search(str(salary))):
            raise DropItem("Dirty data %s" % item)
        if(salary == None):
            raise DropItem("Dirty data %s" % item)
           

資料存儲

把清洗并整理完畢的資料進行資料存儲

建立資料庫的相關MySql語句是

CREATE DATABASE IF NOT EXISTS scrapyDB DEFAULT CHARACTER SET utf8;

CREATE TABLE IF NOT EXISTS `tb_job`(
  `job_id` bigint NOT NULL AUTO_INCREMENT,
  `job_name` varchar(50) NOT NULL,
  `company` varchar(50) NOT NULL,
  `address` varchar(50) NOT NULL,
  `bottom_salary` varchar(10) NOT NULL,
  `top_salary` varchar(10) NOT NULL,
  `salary` varchar(15) NOT NULL,
  `time` varchar(10) NOT NULL,
  PRIMARY KEY (`job_id`),
  UNIQUE KEY `unique_info`(`job_name`, `company`, `address`)
  );
           

這裡實作的思路不止一種

Solution 1 在

process_item

中直接将處理完的item儲存到資料庫中

實際測試的時候發現儲存下來的資料除了

job_name

字段外, 其他中文字段全部變成Unicode碼, 原因不明. 大家如果成功用這種方法實作了, 不妨在留言區告知一下, 畢竟第二種方法多了檔案IO的開銷, 耗時會比較大

Solution 2 在爬取結束之後再進行資料庫寫入操作

爬取結束後, 用pandas子產品的csv讀取函數打開爬取完畢的csv檔案, 寫入資料庫

Attention!

以上兩種方法的

commit()

建議在全部插入後一次commit完成

必須在

close_spider

方法中關閉資料庫

若使用第一種方法, 建議在

open_spider

中實作資料庫初始化工作, 而不是每執行一次

process_item

進行一次打開關閉資料庫

寫入資料庫

參考代碼

# Function1
def open_spider(self, spider):
        self.conn = pymysql.connect(
            host='localhost',
            user='root',
            passwd='mysql',
            db='scrapyDB',
            charset='utf8',
            cursorclass=pymysql.cursors.DictCursor)

def close_spider(self, spider):
        try:
            # open the cursor
            self.cursor = self.conn.cursor()

            # get data from csv file
            # reload data
            f = open(r'job.csv', 'r')
            f.close()
            job_info = pandas.read_csv(r'job.csv', iterator=True,chunksize=1,
                                       header=None,names=
                                       ['job_name','company','address','bottom_salary','top_salary','salary','time'])

            # store data
            for i, job in enumerate (job_info):
                # use -1 or ' ' to fill NAN
                job = job.fillna({'job_name':'','company':'','address':'','time':''})
                job = job.fillna(-1)
                # transform series to list type
                job = job.values[0]

                sql = 'INSERT INTO tb_job(job_name,company,address,bottom_salary,top_salary,salary,time)' \
                      'VALUES ("%s", "%s", "%s", "%s", "%s", "%s", "%s")' % (
                      job[6], job[2], job[3], job[1], job[5], job[0], job[6])
                self.cursor.execute(sql)
            self.conn.commit()

        finally:
            # close the connection
            self.conn.close()

# Function2 
# 未将打開關閉資料庫拆分出來, 請自行修改
db = pymysql.connect(
            host='localhost',
            user='root',
            passwd='mysql',
            db='scrapyDB',
            charset='utf8',
            cursorclass=pymysql.cursors.DictCursor)
        try:
            # open the cursor
            cursor = db.cursor()
            sql = 'INSERT INTO tb_job(job_name,company,address,bottom_salary,top_salary,salary,time)' \
                  'VALUES ("%s", "%s", "%s", "%s", "%s", "%s", "%s")' % (job_name,item['company'],item['address'],item['bottomSalary'],item['topSalary'],item['salary'],item['time'])
            # execute the sql
            cursor.execute(sql)
            db.commit()
        finally:
            # close the connection
            db.close()
           

最終的資料庫代碼中, 暫時删除了

unique_info

索引, 原因是目前隻需要尚不需要進行增量爬取. 使用

unique_info

索引後, 如果遇到重複的資料将直接

RollBack

, 而我們是在最後才一次性

commit

的, 這樣肯定不行

就需要增加開銷去每插入一條資料送出一次

後續将對這個問題進行處理, 敬請期待