前言:随着國内肺炎疫情的逐漸好轉,國外的疫情卻越來越嚴重,其中原因,相必大家都心領神會,想到這裡,我打算采用自身所學的技術采集下最新的國外資料,掌握最新的動态,希望能有一天也能看到不再增長的疫情傳播。
前期準備:作為爬蟲,我們首先就要選擇一個合适的目标網站,這裡我們選擇的是丁香園的資料,如下圖所示
找到目标網站後,我們需要對網站提供的資料來源進行分析,找到其真實的資料請求,我們打開浏覽器的F12,看下network裡的請求,從上到下依次分析,當我們宣召到如下圖所示的請求中,可以發現他的響應中似乎包含了我們想要的資料,當然這隻是不确定的猜測,我們可以繼續往下找,如果下面沒有更符合的,那麼說明這個就極有可能是我們想要的;
确定是這個請求後,我們需要分析這個響應的内容
通過分析我們發現他的資料是以腳本的方式存在于代碼裡的,那麼我們豈不是可以直接通過正則就可以得到這些資料了麼,是以我先進行了如下測試
res = requests.get(self.url)
res.encoding = 'utf - 8'
pat0 = re.compile('window.getListByCountryTypeService2true = ([\s\S]*?)</script>')
data_list = pat0.findall(res.text)
data = data_list[0].replace('}catch(e){}', '')
true = True
false = False
data = eval(data)
這個地方的self.url就是上面我們說到的請求,這段代碼的大緻意思是我們通過requests庫發起這個請求,解析響應的資料,通過正規表達式擷取
window.getListByCountryTypeService2true
這一段的代碼,其中就包含了我們需要的資料,然後過濾不需要的一些字元串。
擷取到資料後,我們可以觀察下,它的資料是JSON數組的格式,其實到這裡我們就已經拿到我們想要的了
這裡我們可以看到他這個裡面statisticsData字段,這個資料我們通過分析得知這是每個國家的一些詳情資料,這也是一個請求,直接用requests請求便可以擷取到其中的資料,類似下圖的代碼
def getStatisticsData(self,url,province_name):
reponse = requests.get(url)
json_data = reponse.json()
data = json_data['data']
for item in data:
dateId = item['dateId']
item['modify_time'] = time.strptime(str(dateId), "%Y%m%d")
item['province_name'] = province_name
self.save_incr_mysql(item)
# 儲存所有資料至json檔案
update_time = time.strftime('%Y-%m-%d',time.localtime(time.time()))
self.save_incr_json(data,province_name,update_time)
下面一步就是将資料儲存到資料庫,核心代碼如下
# 儲存資料增長趨勢到資料庫
def save_incr_mysql(self,item):
#
query_sql = 'select count(1) as count from feiyan_incr where modify_time = %s and province_name = %s'
values = [item['modify_time'],item['province_name']]
self.cursor.execute(query_sql,values)
data = self.cursor.fetchone()
if(data['count'] == 0):
sql = ("INSERT feiyan_incr(province_name,modify_time,confirmed_count,confirmed_incr\
,cured_count,cured_incr,dead_count,dead_incr,current_confirmed_count,current_confirmed_incr"
") VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)")
values =[item['province_name'],item['modify_time'],item['confirmedCount'],item['confirmedIncr'],item['curedCount']
,item['curedIncr'],item['deadCount'],item['deadIncr'],item['currentConfirmedCount'],item['currentConfirmedIncr']]
self.cursor.execute(sql,values)
self.conn.commit()
# 儲存最新的資料到資料庫
def save_last_mysql(self,item):
# 更新所有曆史資料的is_new字段
update_sql = 'update feiyan_data set is_new = 0 where is_new = 1 and province_name = %s'
values= [item['province_name']]
self.cursor.execute(update_sql,values)
self.conn.commit()
sql = ("INSERT feiyan_data(province_name,province_id,continents,current_confirmed_count\
,confirmed_count,cured_count,dead_count,suspected_count,country_type,modify_time,is_new"
") VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)")
values =[item['province_name'],item['province_id'],item['continents'],item['current_confirmed_count'],item['confirmed_count']
,item['cured_count'],item['dead_count'],item['suspected_count'],item['country_type'],item['modify_time'],item['is_new']]
self.cursor.execute(sql,values)
self.conn.commit()
無非就是隻儲存最新的資料到資料庫而已
完整的爬蟲代碼如下:
import re
import time
import json
import datetime
import requests
import pymysql
import pandas as pd
import os
"""
采集丁香園國外的疫情資料
"""
class VirusSupervise(object):
def __init__(self):
self.url = 'https://3g.dxy.cn/newh5/view/pneumonia?scene=2&clicktime=1579582238&enterid=1579582238&from=timeline&isappinstalled=0'
self.all_data = list()
host_ip = "127.0.0.1" # 你的mysql伺服器位址
host_user = "xxxxx" #你的資料庫使用者名
password = "xxxx" # 你的mysql密碼
db = 'feiyanyiqing'
port = 3306
charset= 'utf8'
self.conn = pymysql.connect(host=host_ip, port=port, user=host_user, passwd=password, db=db, charset=charset)
self.cursor = self.conn.cursor(cursor=pymysql.cursors.DictCursor)
def request_page(self):
"""
請求頁面資料
"""
res = requests.get(self.url)
res.encoding = 'utf - 8'
pat0 = re.compile('window.getListByCountryTypeService2true = ([\s\S]*?)</script>')
data_list = pat0.findall(res.text)
data = data_list[0].replace('}catch(e){}', '')
true = True
false = False
data = eval(data)
return data
def getStatisticsData(self,url,province_name):
reponse = requests.get(url)
json_data = reponse.json()
data = json_data['data']
for item in data:
dateId = item['dateId']
item['modify_time'] = time.strptime(str(dateId), "%Y%m%d")
item['province_name'] = province_name
self.save_incr_mysql(item)
# 儲存所有資料至json檔案
update_time = time.strftime('%Y-%m-%d',time.localtime(time.time()))
self.save_incr_json(data,province_name,update_time)
def filtration_data(self):
"""
過濾資料
"""
data = self.request_page()
print(data)
result = []
for item in data:
# 省份/國家
provinceName = item['provinceName']
provinceId = item['provinceId']
# 國家/州
continents = item['continents']
# 目前确診人數
currentConfirmedCount = item['currentConfirmedCount']
# 确診總人數
confirmedCount = item['confirmedCount']
# 治愈人數
curedCount = item['curedCount']
# 死亡人數
deadCount = item['deadCount']
# 疑似病例
suspectedCount = item['suspectedCount']
# 城市類型
countryType = item['countryType']
# 更新時間
if('中國' == provinceName):
modifyTime = datetime.datetime.now()
else:
modifyTime = item['modifyTime']
modifyTime = float(modifyTime/1000)
timeArray = time.localtime(modifyTime)
modifyTime = time.strftime("%Y-%m-%d %H:%M:%S", timeArray)
info = {'province_name':provinceName,'province_id':provinceId,'continents':continents
,'current_confirmed_count':currentConfirmedCount,'confirmed_count':confirmedCount,'is_new':1
,'cured_count':curedCount,'dead_count':deadCount,'suspected_count':suspectedCount,'country_type':countryType,'modify_time':modifyTime}
self.save_last_mysql(info)
result.append(info)
# 靜态資料 每日各項資料的變化
if(hasattr(item, 'statisticsData')):
self.getStatisticsData(item['statisticsData'],provinceName)
def save_incr_json(self,info,province_name,update_time):
file_dir = os.path.abspath(os.path.join(os.getcwd(), "jsonData"))
filename= file_dir+'/'+update_time
if(not os.path.exists(filename)):
os.makedirs(filename)
filename = filename+'/'+province_name+'.json'
with open(filename,'w',encoding = 'utf-8') as file_obj:
json.dump(info,file_obj,ensure_ascii=False)
# 儲存資料增長趨勢到資料庫
def save_incr_mysql(self,item):
#
query_sql = 'select count(1) as count from feiyan_incr where modify_time = %s and province_name = %s'
values = [item['modify_time'],item['province_name']]
self.cursor.execute(query_sql,values)
data = self.cursor.fetchone()
if(data['count'] == 0):
sql = ("INSERT feiyan_incr(province_name,modify_time,confirmed_count,confirmed_incr\
,cured_count,cured_incr,dead_count,dead_incr,current_confirmed_count,current_confirmed_incr"
") VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)")
values =[item['province_name'],item['modify_time'],item['confirmedCount'],item['confirmedIncr'],item['curedCount']
,item['curedIncr'],item['deadCount'],item['deadIncr'],item['currentConfirmedCount'],item['currentConfirmedIncr']]
self.cursor.execute(sql,values)
self.conn.commit()
# 儲存最新的資料到資料庫
def save_last_mysql(self,item):
# 更新所有曆史資料的is_new字段
update_sql = 'update feiyan_data set is_new = 0 where is_new = 1 and province_name = %s'
values= [item['province_name']]
self.cursor.execute(update_sql,values)
self.conn.commit()
sql = ("INSERT feiyan_data(province_name,province_id,continents,current_confirmed_count\
,confirmed_count,cured_count,dead_count,suspected_count,country_type,modify_time,is_new"
") VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)")
values =[item['province_name'],item['province_id'],item['continents'],item['current_confirmed_count'],item['confirmed_count']
,item['cured_count'],item['dead_count'],item['suspected_count'],item['country_type'],item['modify_time'],item['is_new']]
self.cursor.execute(sql,values)
self.conn.commit()
if __name__ == '__main__':
sup = VirusSupervise()
sup.filtration_data()
做好這一步我們的采集工作已經完成了,美中不足的是每次都要我主動去運作,很不友善,是以我編寫了一個定時器去每天定時采集資料,代碼如下
import time
import datetime
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.events import EVENT_JOB_EXECUTED, EVENT_JOB_ERROR
from feiyan_spider import VirusSupervise
# 錯誤監控
def my_listener(event):
if event.exception:
print ('任務出錯了!!!!!!')
else:
print ('任務照常運作...')
def feiyan_spider():
print("開始采集:{}".format(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))))
vs = VirusSupervise()
vs.filtration_data()
# 任務
def start():
print('建立任務')
#建立排程器:BlockingScheduler
scheduler = BlockingScheduler()
scheduler.add_listener(my_listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)
# 添加爬取的定時任務:每天的早上7點執行
scheduler.add_job(feiyan_spider, 'cron', hour=8, minute=10)
scheduler.start()
if __name__ == "__main__":
start()
至此,全部工作就算完成了,其實是很簡單的,無非就是要多花點心思去研究目标網站,下一篇文章将介紹我利用這些資料開發的基于python的flask架構的肺炎疫情分析系統。
本文首發于https://www.bizhibihui.com/blog/article/29