天天看點

利用爬蟲技術采集國外肺炎疫情資料

前言:随着國内肺炎疫情的逐漸好轉,國外的疫情卻越來越嚴重,其中原因,相必大家都心領神會,想到這裡,我打算采用自身所學的技術采集下最新的國外資料,掌握最新的動态,希望能有一天也能看到不再增長的疫情傳播。

前期準備:作為爬蟲,我們首先就要選擇一個合适的目标網站,這裡我們選擇的是丁香園的資料,如下圖所示

找到目标網站後,我們需要對網站提供的資料來源進行分析,找到其真實的資料請求,我們打開浏覽器的F12,看下network裡的請求,從上到下依次分析,當我們宣召到如下圖所示的請求中,可以發現他的響應中似乎包含了我們想要的資料,當然這隻是不确定的猜測,我們可以繼續往下找,如果下面沒有更符合的,那麼說明這個就極有可能是我們想要的;

确定是這個請求後,我們需要分析這個響應的内容

通過分析我們發現他的資料是以腳本的方式存在于代碼裡的,那麼我們豈不是可以直接通過正則就可以得到這些資料了麼,是以我先進行了如下測試

res = requests.get(self.url)
res.encoding = 'utf - 8'
pat0 = re.compile('window.getListByCountryTypeService2true = ([\s\S]*?)</script>')
data_list = pat0.findall(res.text)
data = data_list[0].replace('}catch(e){}', '')
true = True
false = False
data = eval(data)
           

這個地方的self.url就是上面我們說到的請求,這段代碼的大緻意思是我們通過requests庫發起這個請求,解析響應的資料,通過正規表達式擷取

window.getListByCountryTypeService2true

這一段的代碼,其中就包含了我們需要的資料,然後過濾不需要的一些字元串。

擷取到資料後,我們可以觀察下,它的資料是JSON數組的格式,其實到這裡我們就已經拿到我們想要的了

這裡我們可以看到他這個裡面statisticsData字段,這個資料我們通過分析得知這是每個國家的一些詳情資料,這也是一個請求,直接用requests請求便可以擷取到其中的資料,類似下圖的代碼

def getStatisticsData(self,url,province_name):
        reponse = requests.get(url)
        json_data = reponse.json()
        data = json_data['data']
        for item in data:
            dateId = item['dateId']
            item['modify_time'] = time.strptime(str(dateId), "%Y%m%d")
            item['province_name'] = province_name
            self.save_incr_mysql(item)
        # 儲存所有資料至json檔案
        update_time = time.strftime('%Y-%m-%d',time.localtime(time.time()))
        self.save_incr_json(data,province_name,update_time)
           

下面一步就是将資料儲存到資料庫,核心代碼如下

# 儲存資料增長趨勢到資料庫
    def save_incr_mysql(self,item):
        # 
        query_sql = 'select count(1) as count from feiyan_incr where modify_time = %s and province_name = %s'
        values = [item['modify_time'],item['province_name']]
        self.cursor.execute(query_sql,values)
        data = self.cursor.fetchone()
        if(data['count'] == 0):
            sql = ("INSERT feiyan_incr(province_name,modify_time,confirmed_count,confirmed_incr\
            ,cured_count,cured_incr,dead_count,dead_incr,current_confirmed_count,current_confirmed_incr"
            ") VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)")
            values =[item['province_name'],item['modify_time'],item['confirmedCount'],item['confirmedIncr'],item['curedCount']
            ,item['curedIncr'],item['deadCount'],item['deadIncr'],item['currentConfirmedCount'],item['currentConfirmedIncr']]      
            self.cursor.execute(sql,values)
            self.conn.commit()

    # 儲存最新的資料到資料庫
    def save_last_mysql(self,item):
        # 更新所有曆史資料的is_new字段
        update_sql = 'update feiyan_data set is_new = 0 where is_new = 1 and province_name = %s'
        values= [item['province_name']]
        self.cursor.execute(update_sql,values)
        self.conn.commit()
        sql = ("INSERT feiyan_data(province_name,province_id,continents,current_confirmed_count\
        ,confirmed_count,cured_count,dead_count,suspected_count,country_type,modify_time,is_new"
        ") VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)")
        values =[item['province_name'],item['province_id'],item['continents'],item['current_confirmed_count'],item['confirmed_count']
        ,item['cured_count'],item['dead_count'],item['suspected_count'],item['country_type'],item['modify_time'],item['is_new']]      
        self.cursor.execute(sql,values)
        self.conn.commit()
           

無非就是隻儲存最新的資料到資料庫而已

完整的爬蟲代碼如下:

import re
import time
import json
import datetime
import requests
import pymysql
import pandas as pd
import os

"""
采集丁香園國外的疫情資料
"""
class VirusSupervise(object):
    def __init__(self):
        self.url = 'https://3g.dxy.cn/newh5/view/pneumonia?scene=2&amp;clicktime=1579582238&amp;enterid=1579582238&amp;from=timeline&amp;isappinstalled=0'
        self.all_data = list()
        host_ip = "127.0.0.1"  # 你的mysql伺服器位址
        host_user = "xxxxx" #你的資料庫使用者名
        password = "xxxx"  # 你的mysql密碼
        db = 'feiyanyiqing'
        port = 3306
        charset= 'utf8'
        self.conn = pymysql.connect(host=host_ip, port=port, user=host_user, passwd=password, db=db, charset=charset)
        self.cursor = self.conn.cursor(cursor=pymysql.cursors.DictCursor)

    def request_page(self):
        """
        請求頁面資料
        """
        res = requests.get(self.url)
        res.encoding = 'utf - 8'
        pat0 = re.compile('window.getListByCountryTypeService2true = ([\s\S]*?)</script>')
        data_list = pat0.findall(res.text)
        data = data_list[0].replace('}catch(e){}', '')
        true = True
        false = False
        data = eval(data)
        return data



    def getStatisticsData(self,url,province_name):
        reponse = requests.get(url)
        json_data = reponse.json()
        data = json_data['data']
        for item in data:
            dateId = item['dateId']
            item['modify_time'] = time.strptime(str(dateId), "%Y%m%d")
            item['province_name'] = province_name
            self.save_incr_mysql(item)
        # 儲存所有資料至json檔案
        update_time = time.strftime('%Y-%m-%d',time.localtime(time.time()))
        self.save_incr_json(data,province_name,update_time)

    def filtration_data(self):
        """
        過濾資料
        """
        data = self.request_page()
        print(data)
        result = []
        for item in data:
            # 省份/國家
            provinceName = item['provinceName']
            provinceId = item['provinceId']
            # 國家/州
            continents = item['continents']
            # 目前确診人數
            currentConfirmedCount = item['currentConfirmedCount']
            # 确診總人數
            confirmedCount = item['confirmedCount']
            # 治愈人數
            curedCount = item['curedCount']
            # 死亡人數
            deadCount = item['deadCount']
            # 疑似病例
            suspectedCount = item['suspectedCount']
            # 城市類型
            countryType = item['countryType']
            # 更新時間
            if('中國' == provinceName):
                modifyTime = datetime.datetime.now()
            else:
                modifyTime = item['modifyTime']
                modifyTime = float(modifyTime/1000)
                timeArray = time.localtime(modifyTime)
                modifyTime = time.strftime("%Y-%m-%d %H:%M:%S", timeArray)
            info = {'province_name':provinceName,'province_id':provinceId,'continents':continents
            ,'current_confirmed_count':currentConfirmedCount,'confirmed_count':confirmedCount,'is_new':1
            ,'cured_count':curedCount,'dead_count':deadCount,'suspected_count':suspectedCount,'country_type':countryType,'modify_time':modifyTime}
            self.save_last_mysql(info)
            result.append(info)
            # 靜态資料 每日各項資料的變化
            if(hasattr(item, 'statisticsData')):
                self.getStatisticsData(item['statisticsData'],provinceName)

    def save_incr_json(self,info,province_name,update_time):
        file_dir = os.path.abspath(os.path.join(os.getcwd(), "jsonData"))
        filename= file_dir+'/'+update_time
        if(not os.path.exists(filename)):
            os.makedirs(filename)
        filename = filename+'/'+province_name+'.json'
        with open(filename,'w',encoding = 'utf-8') as file_obj:
            json.dump(info,file_obj,ensure_ascii=False)


    # 儲存資料增長趨勢到資料庫
    def save_incr_mysql(self,item):
        # 
        query_sql = 'select count(1) as count from feiyan_incr where modify_time = %s and province_name = %s'
        values = [item['modify_time'],item['province_name']]
        self.cursor.execute(query_sql,values)
        data = self.cursor.fetchone()
        if(data['count'] == 0):
            sql = ("INSERT feiyan_incr(province_name,modify_time,confirmed_count,confirmed_incr\
            ,cured_count,cured_incr,dead_count,dead_incr,current_confirmed_count,current_confirmed_incr"
            ") VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)")
            values =[item['province_name'],item['modify_time'],item['confirmedCount'],item['confirmedIncr'],item['curedCount']
            ,item['curedIncr'],item['deadCount'],item['deadIncr'],item['currentConfirmedCount'],item['currentConfirmedIncr']]      
            self.cursor.execute(sql,values)
            self.conn.commit()

    # 儲存最新的資料到資料庫
    def save_last_mysql(self,item):
        # 更新所有曆史資料的is_new字段
        update_sql = 'update feiyan_data set is_new = 0 where is_new = 1 and province_name = %s'
        values= [item['province_name']]
        self.cursor.execute(update_sql,values)
        self.conn.commit()
        sql = ("INSERT feiyan_data(province_name,province_id,continents,current_confirmed_count\
        ,confirmed_count,cured_count,dead_count,suspected_count,country_type,modify_time,is_new"
        ") VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)")
        values =[item['province_name'],item['province_id'],item['continents'],item['current_confirmed_count'],item['confirmed_count']
        ,item['cured_count'],item['dead_count'],item['suspected_count'],item['country_type'],item['modify_time'],item['is_new']]      
        self.cursor.execute(sql,values)
        self.conn.commit()


if __name__ == '__main__':
    sup = VirusSupervise()
    sup.filtration_data()
           

做好這一步我們的采集工作已經完成了,美中不足的是每次都要我主動去運作,很不友善,是以我編寫了一個定時器去每天定時采集資料,代碼如下

import time
import datetime
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.events import EVENT_JOB_EXECUTED, EVENT_JOB_ERROR
from feiyan_spider import VirusSupervise

# 錯誤監控
def my_listener(event):
    if event.exception:
        print ('任務出錯了!!!!!!')
    else:
        print ('任務照常運作...')

def feiyan_spider():
    print("開始采集:{}".format(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))))
    vs = VirusSupervise()
    vs.filtration_data()

# 任務
def start():
    print('建立任務')
    #建立排程器:BlockingScheduler
    scheduler = BlockingScheduler()
    scheduler.add_listener(my_listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)
    # 添加爬取的定時任務:每天的早上7點執行
    scheduler.add_job(feiyan_spider, 'cron', hour=8, minute=10)
    scheduler.start()


if __name__ == "__main__":
    start()
           

至此,全部工作就算完成了,其實是很簡單的,無非就是要多花點心思去研究目标網站,下一篇文章将介紹我利用這些資料開發的基于python的flask架構的肺炎疫情分析系統。

本文首發于https://www.bizhibihui.com/blog/article/29