上一篇-> 爬蟲練習之資料整理——基于Pandas 上上篇-> 爬蟲練習之資料清洗——基于Pandas
配置MySql
關于MySQL在Ubuntu的Pycharm上的配置,可以參考這篇文章中的第三部分
Mac安裝mysql及終端操作mysql與pycharm的資料庫可視化
如果上面的步驟處理完畢後找不到你建立的資料庫, 可以參照下圖配置
勾選要顯示的Schemas(資料庫集合)
資料存儲需要用到
pymysql
子產品, 在File->Settings中找到如圖的設定頁面,點選加号搜尋
pymysql
并安裝
如何存儲
在開始考慮如何存儲之前, 我們需要考慮一個問題, 資料存儲應該是什麼時候要做的事.
假設你已經了解過Scrapy架構, 下面是來自官網對item pipeline的典型應用
- 清理資料
- 驗證爬取的資料(檢查item包含某些字段)
- 查重(并丢棄)
- 将爬取結果儲存到資料庫中
另請參閱官方文檔>
Item Pipeline 我們要實作的資料存儲, 先來試一試能否成功吧
# 你可以參考以下代碼編寫自己的pipeline
import pymysql
class jobCrawlerPipeline(object):
def process_item(self, item, spider):
'''
将爬取的資訊儲存到mysql
:param item:
:param spider:
:return: item
'''
# Get data from item
job_name = item['job_name']
company = item['company']
address = item['address']
salary = item['salary']
time = item['time']
# Connecting with local database, change the value if not the same
db = pymysql.connect(
host='localhost',
user='root',
passwd='1320',
db='scrapyDB',
charset='utf8',
cursorclass=pymysql.cursors.DictCursor)
try:
# open the cursor
cursor = db.cursor()
sql = 'INSERT INTO tb_job(job_name,company,address,salary,time)' \
'VALUES ("%s", "%s", "%s", "%s", "%s")' % (job_name,company,address,salary,time)
# execute the sql
cursor.execute(sql)
db.commit()
finally:
# close the connection
db.close()
return item
爬蟲尚未結束, 但是通過終端, 我們知道該停下爬蟲了.
爬取中...
存儲在MySQL的資訊
重新回到爬蟲項目的思路
思考整個爬蟲項目的流程, 應該是這樣
抓取資訊->清理資訊->整理資訊->存儲資訊->分析資訊
資料整理
而上面的存儲資訊雖然已經成功了一部分,但是薪資資訊仍需要整理,更重要的是爬取的資訊中沒有明确的id, 如何在後續中加入topSalary, bottomSalary 等整理後才有的資訊與之對應呢?
重新審視Item Pipeline的典型應用, 我們能不能在Pipeline上實作整理,清理, 驗證或是丢棄呢?
分析item中的項目, 整理和驗證可能是最容易實作的部分
我們先把整理功能實作并驗證是否成功, 在
class jobCrawlerPipeline(object):
中添加下面這個方法.用于把爬取下來的工資資料進行整理,關于這個方法的實作,請參考前一篇
class jobCrawlerPipeline(object):
def cut_word(self, word, method):
if method == 'bottom':
length = len(word)
if (word.find('萬') == -1):
if (word.find('以下') != -1):
# XX千以下
postion = word.find('以下')
bottomSalary = str(word[:(postion - 5)])
elif (word.find('以上') != -1):
postion = word.find('以上')
bottomSalary = str(float(word[:postion - 5]))
else:
# XX千/月
postion = word.find('-')
bottomSalary = str(float(word[:(postion)]))
else:
if (word.find('年') == -1):
if (word.find('以下') != -1):
# XX萬以下
postion = word.find('以下')
bottomSalary = str(float(word[:(postion - 5)]) * 10)
elif (word.find('以上') != -1):
# XX萬以上
postion = word.find('以上')
bottomSalary = str(float(word[:postion - 5]) * 10)
elif (word.find('+') != -1):
# XX萬+
postion = word.find('+')
bottomSalary = str(float(word[:(postion)]) * 10)
else:
# XX萬/月
postion = word.find('-')
bottomSalary = str(float(word[:(postion)]) * 10)
else:
if (word.find('以下') != -1):
# XX萬以下/年
postion = word.find('以下')
bottomSalary = str(float(word[:(postion - 5)]) / 1.2)
elif (word.find('以上') != -1):
postion = word.find('以上')
bottomSalary = str(float(word[:postion - 5]) / 1.2)
elif (word.find('+') != -1):
# XX萬+
postion = word.find('+')
bottomSalary = str(float(word[:(postion)]) / 1.2)
else:
# XX萬/年
postion = word.find('-')
bottomSalary = word[:(postion)]
bottomSalary = str(float(bottomSalary) / 1.2)
return bottomSalary
if method == 'top':
length = len(word)
if (word.find('萬') == -1):
if (word.find('以下') != -1):
# XX千以下
postion = word.find('以下')
topSalary = str(float(word[:(postion - 5)]))
elif (word.find('以上') != -1):
postion = word.find('以上')
topSalary = str(float(word[:postion - 5]))
else:
# XX千/月
postion = word.find('-')
topSalary = str(float(word[(postion + 1):(length - 11)]))
else:
if (word.find('年') == -1):
if (word.find('以下') != -1):
# XX萬以下
postion = word.find('以下')
topSalary = str(float(word[:(postion - 5)]) * 10)
elif (word.find('以上') != -1):
# XX萬以上
postion = word.find('以上')
topSalary = str(float(word[:postion - 5]) * 10)
else:
# XX萬/月
postion = word.find('-')
topSalary = str(float(word[(postion + 1):(length - 11)]) * 10)
else:
if (word.find('以下') != -1):
# XX萬以下/年
postion = word.find('以下')
topSalary = str(float(word[:(postion - 5)]) / 1.2)
elif (word.find('以上') != -1):
# XX萬以上一年
postion = word.find('以上')
topSalary = str(float(word[:postion - 5]) / 1.2)
elif (word.find('+') != -1):
# XX萬+
postion = word.find('+')
topSalary = str(float(word[:(postion)]) / 1.2)
else:
# XX萬/年
postion = word.find('-')
topSalary = word[(postion + 1):(length - 11)]
topSalary = str(int(topSalary) / 1.2)
return topSalary
如果你看了上面的代碼, 你可能發現與前一篇有些許不同, 最主要的差别就是字元串數組切片的位置發生了改變.
為什麼要改呢?
因為這是Python的編碼坑啊
通過觀察終端的輸出,可以看到爬下來尚未存儲的資料是以unicode的形式存在,這個時候是5個位元組一個中文
是以看到下面截圖中的salary,可以判斷要得到薪資的底薪和頂薪,需要剔除掉11個位元組
爬取資料中
資料清洗
至此,資料的基本處理已經合并到Pipeline中,鑒于可能還有髒資料在item中,我們在Pipeline的
process_item
方法中加入相應的代碼
這段代碼應當加在處理資料之前,減少一些系統開銷
# Get data from item
job_name = item['job_name']
salary = item['salary']
dirty_job_name = re.compile(r'(\*|在家|試用|體驗|無需|無須|試玩|紅包)+')
dirty_salary = re.compile(r'(小時|天)+')
# clean dirty data
if(dirty_job_name.search(str(job_name))):
raise DropItem("Dirty data %s" % item)
if(dirty_salary.search(str(salary))):
raise DropItem("Dirty data %s" % item)
if(salary == None):
raise DropItem("Dirty data %s" % item)
資料存儲
把清洗并整理完畢的資料進行資料存儲
建立資料庫的相關MySql語句是
CREATE DATABASE IF NOT EXISTS scrapyDB DEFAULT CHARACTER SET utf8;
CREATE TABLE IF NOT EXISTS `tb_job`(
`job_id` bigint NOT NULL AUTO_INCREMENT,
`job_name` varchar(50) NOT NULL,
`company` varchar(50) NOT NULL,
`address` varchar(50) NOT NULL,
`bottom_salary` varchar(10) NOT NULL,
`top_salary` varchar(10) NOT NULL,
`salary` varchar(15) NOT NULL,
`time` varchar(10) NOT NULL,
PRIMARY KEY (`job_id`),
UNIQUE KEY `unique_info`(`job_name`, `company`, `address`)
);
這裡實作的思路不止一種
Solution 1 在 process_item
中直接将處理完的item儲存到資料庫中
process_item
實際測試的時候發現儲存下來的資料除了
job_name
字段外, 其他中文字段全部變成Unicode碼, 原因不明. 大家如果成功用這種方法實作了, 不妨在留言區告知一下, 畢竟第二種方法多了檔案IO的開銷, 耗時會比較大
Solution 2 在爬取結束之後再進行資料庫寫入操作
爬取結束後, 用pandas子產品的csv讀取函數打開爬取完畢的csv檔案, 寫入資料庫
Attention!
以上兩種方法的
commit()
建議在全部插入後一次commit完成
必須在
close_spider
方法中關閉資料庫
若使用第一種方法, 建議在
open_spider
中實作資料庫初始化工作, 而不是每執行一次
process_item
進行一次打開關閉資料庫
寫入資料庫
參考代碼
# Function1
def open_spider(self, spider):
self.conn = pymysql.connect(
host='localhost',
user='root',
passwd='mysql',
db='scrapyDB',
charset='utf8',
cursorclass=pymysql.cursors.DictCursor)
def close_spider(self, spider):
try:
# open the cursor
self.cursor = self.conn.cursor()
# get data from csv file
# reload data
f = open(r'job.csv', 'r')
f.close()
job_info = pandas.read_csv(r'job.csv', iterator=True,chunksize=1,
header=None,names=
['job_name','company','address','bottom_salary','top_salary','salary','time'])
# store data
for i, job in enumerate (job_info):
# use -1 or ' ' to fill NAN
job = job.fillna({'job_name':'','company':'','address':'','time':''})
job = job.fillna(-1)
# transform series to list type
job = job.values[0]
sql = 'INSERT INTO tb_job(job_name,company,address,bottom_salary,top_salary,salary,time)' \
'VALUES ("%s", "%s", "%s", "%s", "%s", "%s", "%s")' % (
job[6], job[2], job[3], job[1], job[5], job[0], job[6])
self.cursor.execute(sql)
self.conn.commit()
finally:
# close the connection
self.conn.close()
# Function2
# 未将打開關閉資料庫拆分出來, 請自行修改
db = pymysql.connect(
host='localhost',
user='root',
passwd='mysql',
db='scrapyDB',
charset='utf8',
cursorclass=pymysql.cursors.DictCursor)
try:
# open the cursor
cursor = db.cursor()
sql = 'INSERT INTO tb_job(job_name,company,address,bottom_salary,top_salary,salary,time)' \
'VALUES ("%s", "%s", "%s", "%s", "%s", "%s", "%s")' % (job_name,item['company'],item['address'],item['bottomSalary'],item['topSalary'],item['salary'],item['time'])
# execute the sql
cursor.execute(sql)
db.commit()
finally:
# close the connection
db.close()
最終的資料庫代碼中, 暫時删除了
unique_info
索引, 原因是目前隻需要尚不需要進行增量爬取. 使用
unique_info
索引後, 如果遇到重複的資料将直接
RollBack
, 而我們是在最後才一次性
commit
的, 這樣肯定不行
就需要增加開銷去每插入一條資料送出一次
後續将對這個問題進行處理, 敬請期待