
Open-Source Python Comic Crawler for 66漫畫 (AJAX), with Database Connection and Image Download Handling

2020/2/15, original work by VictorGanro. For learning purposes only.

A beginner's crawler; experts, feel free to look away.

Software: MySQL 8, Python 3.8

Target site: http://6mh6.com/ (a comic site I rather like, full of paid comics; reading them for free made me laugh)

First, take a look at the structure of the comic site.

My original plan was to use the site's own category pages as the comic index.

Inspecting the category page's requests shows the AJAX uses POST, which is no big deal. But once an API request asks for around 100 comics' worth of data it comes back empty, so I abandoned this indexing approach.

Slightly deflated, I opened a number of comics and looked at them, e.g. http://6mh6.com/16041/

The comic URL structure turns out to be http://6mh6.com/(comic id)

Probing ids this way shows there are tens of thousands of comics.
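A quick probe of that pattern (a sanity check only, not part of the crawler; 16041 is the example page above):

    import requests

    comic_id = 16041
    resp = requests.get("http://6mh6.com/" + str(comic_id) + "/", timeout=5)
    print(resp.status_code)   # expect 200 when the id belongs to an existing comic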

Next comes scraping each comic's information. Requesting the HTML alone yields only a small part of the chapter data; the rest still has to come through an AJAX request.

Digging further into the JS, the relevant code is as follows:

    $(".gengduo_dt1").one('click', function () {
        $(".gengduo_dt1 a").text('...');
        var id = $(".gengduo_dt1").data("id");
        var vid = $(".gengduo_dt1").data("vid");
        $.ajax({
            type: "POST",
            url: "/bookchapter/",
            data: { "id": id, "id2": vid },
            dataType: 'json',
            success: function (res) {
                for (var a = 0; a < res.length; a++) {
                    var mh_id = res[a].chapterid;
                    var mh_name = res[a].chaptername;
                    // the HTML markup inside this string (an <a>-wrapped <li> chapter entry)
                    // did not survive publishing; '...' stands in for it
                    var b_sn = '...' + mh_name + '...';
                    $("#chapter-list1").append(b_sn);
                }
                $(".gengduo_dt1").css('display', 'none');
            }
        });
    });

    $(".gengduo_dt2").one('click', function () {
        $(".gengduo_dt2 a").text('正在加载...');   // "loading..."
        var id = $(".gengduo_dt2").data("id");
        var vid = $(".gengduo_dt2").data("vid");
        $.ajax({
            type: "POST",
            url: "/bookchapter/",
            data: { "id": id, "id2": vid },
            dataType: 'json',
            success: function (res) {
                for (var a = 0; a < res.length; a++) {
                    var mh_id = res[a].chapterid;
                    var mh_name = res[a].chaptername;
                    // same stripped markup as above
                    var b_sn = '...' + mh_name + '...';
                    $("#chapter-list2").append(b_sn);
                }
                $(".gengduo_dt2").css('display', 'none');
            }
        });
    });

gengduo_dt1 is the class name of the "load more" element under the first chapter list.

The POST payload can be read straight out of the code:

    data: { "id": id, "id2": vid },

that is, id -> id and id2 -> vid.

Where do these two values come from? It is quite simple:

    var id = $(".gengduo_dt2").data("id");
    var vid = $(".gengduo_dt2").data("vid");

They sit in the element's data- attributes, and you can also work them out directly: vid is the number of the chapter list on the page (currently 1 or 2), while id is the comic id.

You will notice, though, that the chapters come back from newest to oldest. For a Python user that is just a list reversal.
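Put together, a minimal check of the interface might look like this (16041 is the example comic from earlier; the chapterid / chaptername field names come from the JS above):

    import requests

    comic_id = 16041
    resp = requests.post(
        "http://6mh6.com/bookchapter/",
        data={"id": str(comic_id), "id2": "1"},    # id2 is the vid, i.e. which chapter list
        headers={"User-Agent": "Mozilla/5.0"},
        timeout=5,
    )
    chapters = resp.json()        # [{"chapterid": ..., "chaptername": ...}, ...]
    chapters.reverse()            # the API returns newest first; flip into reading order
    for ch in chapters:
        print(ch["chapterid"], ch["chaptername"])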

Now for the reading page. It is an infinite-scroll layout: the Network panel shows nothing useful, the raw HTML response carries no image data, and the JS is obfuscated beyond reading. The console, however, is a wonderful thing.

Poking around in the page's code, the image list turns out to live in a global variable named newImgs.

Typing newImgs into the Console confirms it holds the chapter's image URLs, so this approach is sound and selenium can be used to read the variable out.
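A minimal sketch of that idea (the chapter URL follows the /comic id/chapter id.html pattern seen in the JS; the concrete ids are placeholders, and chromedriver must be installed):

    from selenium import webdriver
    from selenium.webdriver.chrome.options import Options
    import time

    opts = Options()
    opts.add_argument("--headless")
    driver = webdriver.Chrome(options=opts)
    driver.get("http://6mh6.com/16041/123456.html")     # placeholder chapter URL
    time.sleep(3)                                        # let the obfuscated JS populate the global
    img_urls = driver.execute_script("return newImgs;")  # read the page-global image list
    print(img_urls)
    driver.quit()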

Analysis done; time to write code.

All of the code below is my own wrapping. If you like it, copy away!!!

First, I like to keep crawler parameters in an INI file and wrap the reading of it in a class. The code:

    import configparser
    import chardet
    class ini_reader:
        def __init__(self,ini_name): # open the file, detect its encoding and parse it
            def bianma(ini_de_name):
                f3 = open(file=ini_de_name,mode='rb')
                data = f3.read()
                f3.close()
                result = chardet.detect(data)
                bianmas = result['encoding']
                return bianmas
            self.ini_name = ini_name
            config = configparser.ConfigParser()
            self.ini_dictionary = config
            ini_bianma = bianma(ini_name)
            config.read(self.ini_name,encoding=ini_bianma)
    
        def ini_Classof_AllFatherPrintAndReturn(self): # print and return all top-level sections
            father_result = self.ini_dictionary.sections()
            print(father_result)
            return father_result
    
    
        def ini_ClassOf_AllSonPrintAndReturn(self,Father_class_name): # print and return the options under one section
            son_result = self.ini_dictionary.options(Father_class_name)
            print(son_result)
            return son_result
    
    
        def ini_get_text(self,yous_want_get_class_father,yous_want_class_son): # get the value of the option you want
            the_result = self.ini_dictionary.get(yous_want_get_class_father,yous_want_class_son)
            return the_result
    
        def ini_writer(self,Father_name,son_name,Set_Value): # write a new value to the specified option
            try:
                self.ini_dictionary.set(Father_name,son_name,Set_Value)
                self.ini_dictionary.write(open(self.ini_name,"w"))
                print("writer-------->>>OK!")
            except:
                print("writer-------->>>error!")
    
        def the_son_dictionary_return(self,Father_name): # return the (key, value) pairs of a section
            return self.ini_dictionary.items(Father_name)
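
For reference, the WEB_SETTINGS.ini read here needs the sections that the rest of the post refers to; a placeholder version might look like:

    [mysql]
    MYSQL_IP = 127.0.0.1
    connect_id = root
    password = your_password

    [spdier_static_path]
    static_path = D:\manhua_data

The spelling spdier_static_path matches the key actually used by the download code further down.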
            
    
               
Next up is the database controller class, also wrapped by me. The code:
    import mysql.connector
    from INI_READER import ini_reader
    DATA_MAP = ini_reader("WEB_SETTINGS.ini")
    class sql_conter:
        def __init__(self,cont_sql_name):
            self.mydb = None
            try:
                #print(DATA_MAP.ini_get_text("mysql","MYSQL_IP")+DATA_MAP.ini_get_text("mysql","connect_id")+DATA_MAP.ini_get_text("mysql","localhost")+cont_sql_name)
                self.mydb = mysql.connector.connect(
                    host=DATA_MAP.ini_get_text("mysql","MYSQL_IP"),       # database host address
                    user=DATA_MAP.ini_get_text("mysql","connect_id"),     # database user name
                    passwd=DATA_MAP.ini_get_text("mysql","password"),     # database password
                    database=cont_sql_name
                )
            except:
                print("CONNECT ERROR: check your config, username or password")
        def create_sql(self,sql_name):
            if self.mydb == None:
                print("please connect!")
            else:
                mycursor = self.mydb.cursor()
                mycursor.execute("CREATE DATABASE "+sql_name)
        def show_all_data_base(self):
            if self.mydb == None:
                print("please connect!")
            else:
                mycursor = self.mydb.cursor()
                mycursor.execute("SHOW DATABASES")
                data_list = []
                for x in mycursor:
                    data_list.append(x)
                return data_list
        def Create_table(self,table_name,table_drcitinaary): #table_dictionray -> {"name":"VARCHAR(255)","age":"VARCHAR(255)"} KEY -> {"id":"INT AUTO_INCREMENT PRIMARY KEY"}
            if self.mydb == None:
                print("please connect!")
            else:
                code_list = ""
                flage = False
                for data_name in table_drcitinaary:
                    if flage == False:
                        code_list = code_list + data_name +" "+table_drcitinaary[data_name]
                        flage = True
                    else:
                        code_list = code_list + ", " +data_name + " "+table_drcitinaary[data_name]
                mycursor = self.mydb.cursor()
                mycursor.execute("CREATE TABLE "+table_name+" ("+code_list+")")
        def ADD_PRIMARY_KEY_SET(self,table_name,key_name):
            if self.mydb == None:
                print("please connect!")
            else:
                mycursor = self.mydb.cursor()
                mycursor.execute("ALTER TABLE "+table_name+" ADD COLUMN "+key_name+" INT AUTO_INCREMENT PRIMARY KEY")
        def CIN_DATA(self,table_name,table_data_list,table_data): #table_data_list ->["name","url"] 
            #table_data - >[('Google', 'https://www.google.com'),('Github', 'https://www.github.com'),('Taobao', 'https://www.taobao.com'),('stackoverflow', 'https://www.stackoverflow.com/')]
            if self.mydb == None:
                print("please connect!")
            else:
                data_code = ""
                data_code_2 = ""
                flage = False
                for data_name in table_data_list:
                    if flage == False:
                        data_code = data_code+data_name
                        data_code_2 = data_code_2+"%s"
                        flage = True
                    else:
                        data_code = data_code +","+data_name
                        data_code_2 = data_code_2+", %s"
                sql_code = "INSERT INTO "+table_name+" ("+data_code+") VALUES ("+data_code_2+")"
                #print(sql_code)
                mycursor = self.mydb.cursor()
                mycursor.executemany(sql_code,table_data)
                self.mydb.commit()
        def sql_search(self,table_name,search_code):
            if self.mydb == None:
                print("please connect!")
            else:
                mycursor = self.mydb.cursor()
                mycursor.execute(search_code)
                myresult = mycursor.fetchall()#myresult - >[('Google', 'https://www.google.com'),('Github', 'https://www.github.com'),('Taobao', 'https://www.taobao.com'),('stackoverflow', 'https://www.stackoverflow.com/')]
                return myresult
        def delete_data(self,delete_code):
            if self.mydb == None:
                print("please connect!")
            else:
                mycursor = self.mydb.cursor()
                mycursor.execute(delete_code)
                self.mydb.commit()
        def delete_table(self,table_name):
            if self.mydb==None:
                print("please connect!")
            else:
                mycursor = self.mydb.cursor()
                sql_code = "DROP TABLE IF EXISTS "+table_name
                mycursor.execute(sql_code)
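
A quick sketch of how the class is used (the table and values here are only an illustration; it assumes the m_index database created in the initialization step below already exists):

    from SQL_CONTERE import sql_conter

    demo = sql_conter("m_index")
    demo.Create_table("demo_table", {"name": "VARCHAR(255)", "url": "VARCHAR(255)"})
    demo.CIN_DATA("demo_table", ["name", "url"], [("Google", "https://www.google.com")])
    print(demo.sql_search("demo_table", "SELECT * FROM demo_table"))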
    
               
The POST/GET request helper:
    import urllib.request
    import ssl
    import random
    import os 
    from fake_useragent import UserAgent
    import requests
    ssl._create_default_https_context = ssl._create_unverified_context
    def htmlget(url,bianma,POST_OR_NOT,POST_data):  
        #POST_DATA IS DICTIONARY TYPE {"NAME":"123"}
        try:
            location = os.getcwd() + '\\data.json'
            ua = UserAgent(path=location)
            head = ua.random
            #print(head)
            headersl = {"User-Agent":head}
            if POST_OR_NOT == "YES":
                get_url = requests.post(url = url,data=POST_data,headers=headersl,timeout=5)
            else:
                get_url = requests.get(url,headers=headersl,timeout=5)
            get_url.encoding = bianma
            print("[+]LOG: GET SUCCESS")
            return get_url.text
        except:
            print("[-]LOG: GET ERROR")
            #print("request to the target site timed out; retrying with a new random User-Agent")
            fla = 0   # retry counter: give up after four failed attempts
            while True:
                try:
                    head = ua.random
                    #print(head)
                    headersl = {"User-Agent":head}
                    if POST_OR_NOT == "YES":
                        get_url = requests.post(url = url,data=POST_data,headers=headersl,timeout=5)
                    else:
                        get_url = requests.get(url,headers=headersl,timeout=5)
                    get_url.encoding = bianma
                    print("[+]LOG: GET SUCCESS")
                    return get_url.text
                except:
                    fla = fla + 1
                    if fla == 4:
                        return None
                    #print("still timing out, switching to another random User-Agent")
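
It is called like this (the "YES"/"NO" flag selects POST or GET; the ids are the example values from earlier):

    # GET a comic's index page
    index_html = htmlget("http://6mh6.com/16041/", "utf-8", "NO", "")

    # POST to the chapter API
    chapters_json = htmlget("http://6mh6.com/bookchapter/", "utf-8", "YES", {"id": "16041", "id2": "1"})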
    
    
               

Then, for parsing the HTML I use XPath from lxml (my recommendation).
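The xpath_reader module imported by the main crawler below is not included in the post; judging by how it is called there, a minimal lxml-based stand-in could look like this:

    from lxml import etree

    class xpathReader:
        """Thin wrapper so the crawler can call .xpath('...') directly on raw HTML text."""
        def __init__(self, html_text):
            self.tree = etree.HTML(html_text)

        def xpath(self, expression):
            return self.tree.xpath(expression)   # returns a list; empty if nothing matches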

Next step: initialization, creating the databases:

    from SQL_CONTERE import sql_conter
    sql_obj  = sql_conter("mysql")
    sql_obj.create_sql("m_index")
    del sql_obj
    sql_con_2 = sql_conter("m_index")
    code = {"m_bianhao":"VARCHAR(255) not null","m_name":"VARCHAR(255) not null","m_writer":"VARCHAR(255) not null","m_leixin":"VARCHAR(255) not null","m_img":"VARCHAR(255) not null","m_jianjie":"text not null","m_update":"datetime not null","m_zhandian":"VARCHAR(255) not null"}
    sql_con_2.Create_table("m_index",code)
    del sql_con_2
    sql_con_3 = sql_conter("m_index")
    sql_con_3.create_sql("m_z_sql_data")
    del sql_con_3
    print("STATIC SUCCESS")
    
    
               
Next come chapter recording and image downloading. Since image downloads are what slows collection down the most, I went for concurrent downloads using a Python 3.8 staple: async IO with asyncio.
    import os
    from INI_READER import ini_reader
    import urllib.request
    import requests
    import asyncio
    rader = ini_reader("WEB_SETTINGS.ini")
    path = rader.ini_get_text("spdier_static_path","static_path")
    # NOTE: requests.get() below is a blocking call, so these coroutines effectively run
    # one after another; see the thread-pool sketch after this block for real overlap.
    async def base_download(path,url,img_name):
        img_path = path+"\\"+img_name.replace(" ","").replace(".","")+".jpg"
        try:
            html = requests.get(url)
            print("[+]LOG : DOWNLOAD SUCCESS "+url)
            with open(img_path,"wb")as f:
                f.write(html.content)
        except:
            print("[-]LOG : DOWNLOAD ERROR  NOW REPLAY"+url)
            a= 1
            while True:
                try:
                    html = requests.get(url)
                    print("[+]LOG : DOWNLOAD SUCCESS "+url)
                    with open(img_path,"wb")as f:
                        f.write(html.content)
                    break
                except:
                    a = a+1
                    print("[-]LOG : DOWNLOAD ERROR "+url)
                    if a == 4:
                        w = open("LOG.txt","a",encoding="utf-8")
                        w.write("[-]LOG : DOWNLOAD ERROR "+url+"\n")
                        w.close()
                        break
    def file_maker(manhua_naem):
        ALL_path = path + "\\"+manhua_naem.replace(" ","").replace(",","").replace(".","")
        os.system("mkdir "+ALL_path)
        print("[+]LOG : MAKE FILE FOR M MAIN SUCCESS")
    def img_for_cin(img_list,ZJ_name,manhua_name):
        ALL_path = path + "\\"+manhua_name.replace(" ","").replace(",","").replace(".","")+"\\"+ZJ_name.replace(" ","").replace(",","").replace(".","")
        os.system("mkdir "+ALL_path)
        json_code = '{"zhangjie_name":"'+ZJ_name+'"'
        for data in range(len(img_list)):
            json_code = json_code + ',"'+str(data)+'":"'+img_list[data]+'"'
        json_code = json_code + "}"
        json_path = ALL_path + "\\imgs_j.json"
        print("[+]LOG : MAKE FILE FOR M ZHNAGJIE "+ZJ_name+" SUCCESS")
        f = open(json_path,"w",encoding="utf-8")
        f.write(json_code)
        f.close()
        loop = asyncio.get_event_loop()
        all_data_get_map = []
        for url_num in range(len(img_list)):
            all_data_get_map.append(base_download(ALL_path,img_list[url_num],str(url_num)))
        loop.run_until_complete(asyncio.wait(all_data_get_map))
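
Because requests blocks, one way to actually overlap the downloads while keeping requests is to push them into a thread pool. This is a sketch of my own, not the original code; download_one and download_all are hypothetical helpers:

    import asyncio
    import requests

    def download_one(img_path, url):
        # plain blocking download, executed inside a worker thread
        resp = requests.get(url, timeout=10)
        with open(img_path, "wb") as f:
            f.write(resp.content)

    async def download_all(all_path, img_list):
        loop = asyncio.get_event_loop()
        tasks = [
            loop.run_in_executor(None, download_one, all_path + "\\" + str(i) + ".jpg", url)
            for i, url in enumerate(img_list)
        ]
        await asyncio.gather(*tasks)

    # usage inside img_for_cin:
    # asyncio.get_event_loop().run_until_complete(download_all(ALL_path, img_list))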
               
Writing the scraped records into the database:
    from SQL_CONTERE import sql_conter
    def sql_index_cin(xinxi_dictionary):
        try:
            m_naem = xinxi_dictionary["m_name"].replace("\r","").replace("\n","")
            m_bianhao = xinxi_dictionary["m_bianhao"].replace("\r","").replace("\n","")
            m_writer = xinxi_dictionary["m_writer"].replace("\r","").replace("\n","")
            m_leixin = xinxi_dictionary["m_leixin"].replace("\r","").replace("\n","")
            m_img = xinxi_dictionary["m_img"].replace("\r","").replace("\n","")
            m_jianjie = xinxi_dictionary["m_jianjie"].replace("\r","").replace("\n","")
            m_update = xinxi_dictionary["m_update"].replace("\r","").replace("\n","")
            m_zhandian = xinxi_dictionary["m_zhandian"].replace("\r","").replace("\n","")
            sql_map_con =  sql_conter("m_index")
            ZY_list =["m_bianhao","m_name","m_writer","m_leixin","m_img","m_jianjie","m_update","m_zhandian"]
            data_list =[(m_bianhao,m_naem,m_writer,m_leixin,m_img,m_jianjie,m_update,m_zhandian)]
            #print(str(data_list))
            sql_map_con.CIN_DATA("m_index",ZY_list,data_list)
            del sql_map_con
            print("[+]LOG: data cin success")
        except:
            print("[-]LOG: ERROR!")
    def sql_zhangjie_maker(zhangjie_name_list,bianhao):
        sql_obj = sql_conter("m_z_sql_data")
        code = {"id":"VARCHAR(255) not null","m_zhangjie_name":"VARCHAR(255) not null"}
        sql_obj.Create_table(bianhao,code)
        map = ["id","m_zhangjie_name"]
        data_list = []
        for i in range(len(zhangjie_name_list)):
            apa = (str(i+1),zhangjie_name_list[i].replace(" ",""))
            data_list.append(apa)
        #print(str(data_list))
        sql_obj.CIN_DATA(bianhao,map,data_list)
        del sql_obj
        print("[+]LOG: CIN ZHANGJIE SQL SUCCESS")
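
A quick hand-built record to exercise sql_index_cin (every value is a placeholder; m_update must be a datetime-formatted string because the column type is datetime):

    sample = {
        "m_bianhao": "6MH616041",
        "m_name": "Example comic",
        "m_writer": "Unknown",
        "m_leixin": "玄幻",
        "m_img": "http://example.com/cover.jpg",
        "m_jianjie": "placeholder synopsis",
        "m_update": "2020-02-15 00:00:00",
        "m_zhandian": "6MH6",
    }
    sql_index_cin(sample)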
               
And then the main body of the crawler:
    from htmlget_base import htmlget_baseip
    from xpath_reader import xpathReader
    from SQL_CONTERE import sql_conter
    from breome_proxy import get_ip_base
    import asyncio
    from selenium import webdriver
    from selenium.webdriver.chrome.options import Options
    import time
    from UPDATE_HAVE import have_map
    from json_reader import Json_Reader
    from HTML_GET import htmlget
    from M_SQL_CIN import sql_index_cin
    from M_SQL_CIN import sql_zhangjie_maker
    from json_for_img_maker import img_for_cin
    from concurrent.futures import ThreadPoolExecutor
    from selenium.webdriver.common.desired_capabilities import DesiredCapabilities
    from list_re import reverse_string
    chrome_options =Options()
    chrome_options.add_argument('--headless')
    def img_get(data_dic,m_name):
        desired_capabilities = DesiredCapabilities.CHROME
        desired_capabilities["pageLoadStrategy"] = "none"
        dr_console = webdriver.Chrome(options=chrome_options)
        for name in data_dic:
            dr_console.get(data_dic[name])
            time.sleep(3)
            try:
                img_list = dr_console.execute_script("return newImgs;")
            except:
                while  True:
                    try:
                        dr_console.close()
                        dr_console = webdriver.Chrome(options=chrome_options)
                        dr_console.get(data_dic[name])
                        time.sleep(5)
                        img_list = dr_console.execute_script("return newImgs;")
                        break
                    except:
                        pass
            img_for_cin(img_list,name,m_name) # asynchronous image download
        dr_console.close()
    def xinxi_make(m_z_index_url,id):
        ZHANGJEI_URL_GET_LIST = [] # URLs of the comic's chapters
        ZHANGJEI_NAME_GET_LIST = [] # the matching chapter names
        flage = False
        html_data = htmlget(m_z_index_url,"utf-8","NO","")
        #print(html_data)
        x_html = xpathReader(html_data)
        TisHave = x_html.xpath('''//*[@id="chapter-list1"]/a/li/span/text()''')
        if TisHave == []:
            flage = True
        del TisHave
        if flage == True:
            ZJ_M_url = x_html.xpath('''//*[@id="chapter-list1"]/a/@href''')
            for zj_code in range(len(ZJ_M_url)):
                ZJ_M_url[zj_code] = "http://6mh6.com"+ZJ_M_url[zj_code]
            ZJ_M_name = x_html.xpath('''//*[@id="chapter-list1"]/a/li/p/text()''')
            PP = {"id":str(id),"id2":"1"}
            JSON_DATA = htmlget("http://6mh6.com/bookchapter/","utf-8","YES",PP)
            J_data = Json_Reader(JSON_DATA)
            for i in range(len(J_data)):
                Url = "http://6mh6.com/"+id+"/"+J_data[i]["chapterid"]+".html"
                ZJ_M_url.append(Url)
                ZJ_M_name.append(J_data[i]["chaptername"])
            ZHANGJEI_NAME_GET_LIST = reverse_string(ZJ_M_name)
            del ZJ_M_name
            ZHANGJEI_URL_GET_LIST = reverse_string(ZJ_M_url)
            del ZJ_M_url
            def dictontry(zj_naem_list,zj_url_list):
                dic = {}
                for x in range(len(zj_naem_list)):
                    dic[zj_naem_list[x]] = zj_url_list[x]
                return dic
            ALL_dic = dictontry(ZHANGJEI_NAME_GET_LIST,ZHANGJEI_URL_GET_LIST)
            del ZHANGJEI_URL_GET_LIST
            # chapter dictionary -->>> ALL_dic
            def ALL_xinxi_data_maker(X_obj):
                x_name = X_obj.xpath('''//*[@class="cartoon-title"]/text()''')[0].replace("\r","").replace("\n","")
                x_img = X_obj.xpath('''//*[@class="cartoon-poster"]/@src''')[0].replace("\r","").replace("\n","")
                x_jianjie = X_obj.xpath('''//*[@class="introduction"]/text()''')[0].replace("\r","").replace("\n","")
                x_writer = X_obj.xpath('''//*[@itemprop="actor"]/@content''')[0].replace("\r","").replace("\n","")
                x_bianhao = "6MH6"+str(id)
                x_leixin = "玄幻||戀愛||穿越||熱血||古風(LOG:無法判别類型,采用全局)" # genre string kept in Chinese: fantasy||romance||time-travel||hot-blooded||ancient style (LOG: type could not be determined, storing the full set)
                x_update_time = X_obj.xpath('''//*[@itemprop="uploadDate"]/@content''')[0].replace("\r","").replace("\n","")
                ALL_INDEX_XINXI_MAP = {}
                ALL_INDEX_XINXI_MAP["m_bianhao"] = x_bianhao
                ALL_INDEX_XINXI_MAP["m_name"] = x_name
                ALL_INDEX_XINXI_MAP["m_writer"]= x_writer
                ALL_INDEX_XINXI_MAP["m_leixin"]= x_leixin
                ALL_INDEX_XINXI_MAP["m_img"] = x_img
                ALL_INDEX_XINXI_MAP["m_jianjie"] = x_jianjie
                ALL_INDEX_XINXI_MAP["m_update"] = x_update_time
                ALL_INDEX_XINXI_MAP["m_zhandian"] = "6MH6"
                return ALL_INDEX_XINXI_MAP
            X_data = ALL_xinxi_data_maker(x_html)
            zt = have_map(X_data["m_name"])
            if zt == True:
                del html_data
                del x_html
                sql_index_cin(X_data)
                sql_zhangjie_maker(ZHANGJEI_NAME_GET_LIST,X_data["m_bianhao"])
                img_get(ALL_dic,X_data["m_name"])
            else:
                del html_data
                del x_html
    def Main_Get(How_Many_carton):
        for num in range(How_Many_carton):
            xinxi_make("http://6mh6.com/"+str(14400+num)+"/",str(14400+num))
            print("----->>>>> progress: "+str((num+1)/How_Many_carton*100))
    if __name__ == '__main__':
        Main_Get(100)
    
               

If you want the crawl itself to run in parallel, Main_Get and the main block only need a small tweak, either with asyncio plus an executor or with the ThreadPoolExecutor that is already imported, for example:
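This is a sketch of one such tweak, not the author's original code; running several selenium sessions and MySQL connections at once may need extra care:

    from concurrent.futures import ThreadPoolExecutor

    def Main_Get_parallel(How_Many_carton, workers=4):
        ids = [14400 + num for num in range(How_Many_carton)]
        with ThreadPoolExecutor(max_workers=workers) as pool:
            for comic_id in ids:
                pool.submit(xinxi_make, "http://6mh6.com/" + str(comic_id) + "/", str(comic_id))

    if __name__ == '__main__':
        Main_Get_parallel(100)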

What it looks like when running:

Please leave a comment if you repost. Thank you!!!!