小白爬蟲,大神繞道
軟體:mysql8 python3.8
首先觀察漫畫網站結構
2020/2/15 VictorGanro原創
僅學習使用
目标網站連結: http://6mh6.com/(我比較喜歡的漫畫網站,超多付費漫畫,白嫖的我笑了)
本來想以漫畫網站的分類來作為漫畫索引
觀察 點我檢視代碼
發現 Ajax 采用 Post 的方法,這沒什麼大不了的。但是其 API 在請求 100 個以上漫畫資料時會傳回空資料,是以放棄這種索引方案
抱着一絲沮喪,點開多本漫畫進行觀察 例如http://6mh6.com/16041/
可以發現漫畫網址結構為 http://6mh6.com/(漫畫id)
這樣測試和發現有上萬本漫畫
然後就是漫畫資訊爬取,光請求 HTML 可以發現僅僅只有小部分章節的資料,還是要 Ajax 請求
繼續觀察JS代碼,代碼如下:`
// Chapter-list "load more" handlers quoted from the target site.
// NOTE(review): the blog engine replaced straight quotes with curly ones and
// stripped the HTML inside the b_sn template strings; quotes are restored
// here and the templates reconstructed as plausible <a><li> items linking to
// /<book id>/<chapter id>.html — confirm against the live page source.
$(".gengduo_dt1").one('click', function () {
    $(".gengduo_dt1 a").text('...');
    var id = $(".gengduo_dt1").data("id");
    var vid = $(".gengduo_dt1").data("vid");
    $.ajax({
        type: "POST",
        url: "/bookchapter/",
        data: { "id": id, "id2": vid },
        dataType: 'json',
        success: function (res) {
            for (var a = 0; a < res.length; a++) {
                var mh_id = res[a].chapterid;
                var mh_name = res[a].chaptername;
                // Original markup was stripped by the blog engine;
                // reconstructed as a chapter link item.
                var b_sn = '<a href="/' + id + '/' + mh_id + '.html"><li>' + mh_name + '</li></a>';
                $("#chapter-list1").append(b_sn);
            }
            $(".gengduo_dt1").css('display', 'none');
        }
    });
});
$(".gengduo_dt2").one('click', function () {
    // NOTE(review): this literal is GBK-mojibake (likely of "正在加载…",
    // i.e. "loading…"); kept byte-for-byte as quoted from the site.
    $(".gengduo_dt2 a").text('姝e湪鍔犺澆…');
    var id = $(".gengduo_dt2").data("id");
    var vid = $(".gengduo_dt2").data("vid");
    $.ajax({
        type: "POST",
        url: "/bookchapter/",
        data: { "id": id, "id2": vid },
        dataType: 'json',
        success: function (res) {
            for (var a = 0; a < res.length; a++) {
                var mh_id = res[a].chapterid;
                var mh_name = res[a].chaptername;
                var b_sn = '<a href="/' + id + '/' + mh_id + '.html"><li>' + mh_name + '</li></a>';
                $("#chapter-list2").append(b_sn);
            }
            $(".gengduo_dt2").css('display', 'none');
        }
    });
});
`
gengduo_dt1 為第一個資料點的章節CLASS名
Post 結構可以從代碼中看出
data: { “id”: id, “id2”: vid},
id ->id
id2->vid
那麼這2個資料從哪裡可看出,其實挺簡單的
var id = $(".gengduo_dt2").data(“id”);
var vid = $(".gengduo_dt2").data(“vid”);
就在標籤中,或者直接分析:vid 為站點的編號(現為 1 或者 2),然後 id 就是漫畫編號
但是你會發現章節是從大到小
對于 Python 玩家只是一個數組反轉
觀察閱讀頁面,是下拉式,NETWORK沒反應,HTML請求沒代碼,JS混淆到看不懂,但是控制台是個好東西
觀察代碼如
點我看代碼
它是一個全局變量擷取的,名為 newImgs的變量
通過Console可以看出
這個方法是正确的,那麼可以使用 selenium來解決
分析完畢開始寫代碼
以下代碼均為原創封裝,喜歡就複制吧!!!
首先我喜歡讀取INI檔案來寫爬蟲參數,然後封裝,代碼如下
# (blog) Next comes the INI settings reader class, fully wrapped — copy if you like:

import configparser

try:
    import chardet  # third-party; only used to guess the file's encoding
except ImportError:
    # Degrade gracefully: without chardet, configparser uses the platform
    # default encoding (the original hard-required chardet).
    chardet = None


class ini_reader:
    """Thin wrapper over configparser that auto-detects the file encoding.

    Fixes over the original: the file handle opened in ini_writer is no
    longer leaked, the bare except is narrowed, and chardet is optional.
    """

    def __init__(self, ini_name):
        def detect_encoding(path):
            # Read the raw bytes once and let chardet guess the encoding;
            # return None (configparser default) when chardet is missing.
            if chardet is None:
                return None
            with open(path, mode='rb') as f:
                raw = f.read()
            return chardet.detect(raw)['encoding']

        self.ini_name = ini_name
        config = configparser.ConfigParser()
        self.ini_dictionary = config
        config.read(self.ini_name, encoding=detect_encoding(ini_name))

    def ini_Classof_AllFatherPrintAndReturn(self):
        # Print and return every top-level [section] name.
        father_result = self.ini_dictionary.sections()
        print(father_result)
        return father_result

    def ini_ClassOf_AllSonPrintAndReturn(self, Father_class_name):
        # Print and return the option names under one section.
        son_result = self.ini_dictionary.options(Father_class_name)
        print(son_result)
        return son_result

    def ini_get_text(self, yous_want_get_class_father, yous_want_class_son):
        # Fetch a single option value as a string.
        return self.ini_dictionary.get(yous_want_get_class_father,
                                       yous_want_class_son)

    def ini_writer(self, Father_name, son_name, Set_Value):
        # Update one option and persist the whole file back to disk.
        try:
            self.ini_dictionary.set(Father_name, son_name, Set_Value)
            with open(self.ini_name, "w") as f:  # was opened without close()
                self.ini_dictionary.write(f)
            print("writer-------->>>OK!")
        except (configparser.Error, OSError):
            print("writer-------->>>error!")

    def the_son_dictionary_return(self, Father_name):
        # Return (option, value) pairs for one section.
        return self.ini_dictionary.items(Father_name)
# (blog) POST data request interface:
# NOTE(review): despite the blog label above, this block is the MySQL wrapper.

import re

import mysql.connector

from INI_READER import ini_reader

# Credentials are read once from the [mysql] section of WEB_SETTINGS.ini.
DATA_MAP = ini_reader("WEB_SETTINGS.ini")


def _check_identifier(name):
    """Validate a SQL identifier before it is spliced into SQL text.

    Table/database/column names cannot be bound as query parameters, so
    every caller-supplied name is whitelisted here to block SQL injection.
    """
    if not re.fullmatch(r"[0-9A-Za-z_$\u4e00-\u9fff]+", name):
        raise ValueError("illegal SQL identifier: %r" % (name,))
    return name


class sql_conter:
    """Small convenience wrapper over mysql.connector.

    The database to open is chosen per instance via *cont_sql_name*; when
    the connection fails, self.mydb stays None and every method just logs.
    """

    def __init__(self, cont_sql_name):
        self.mydb = None  # stays None when the connection fails
        try:
            self.mydb = mysql.connector.connect(
                host=DATA_MAP.ini_get_text("mysql", "MYSQL_IP"),    # DB host
                user=DATA_MAP.ini_get_text("mysql", "connect_id"),  # DB user
                passwd=DATA_MAP.ini_get_text("mysql", "password"),  # DB password
                database=cont_sql_name,
            )
        except mysql.connector.Error:
            # was a bare except; keep the best-effort behaviour but only
            # swallow connector errors (message kept verbatim)
            print("CONTER ERROR Because your confige is error or you password error")

    def _cursor(self):
        """Return a new cursor, or None (with a log line) when not connected."""
        if self.mydb is None:
            print("please connect!")
            return None
        return self.mydb.cursor()

    def create_sql(self, sql_name):
        """CREATE DATABASE *sql_name*."""
        cur = self._cursor()
        if cur is None:
            return
        try:
            cur.execute("CREATE DATABASE " + _check_identifier(sql_name))
        finally:
            cur.close()  # cursors were never closed in the original

    def show_all_data_base(self):
        """Return the list of databases visible to this connection."""
        cur = self._cursor()
        if cur is None:
            return None
        try:
            cur.execute("SHOW DATABASES")
            return list(cur)
        finally:
            cur.close()

    def Create_table(self, table_name, table_drcitinaary):
        """CREATE TABLE from a {column_name: column_spec} dict.

        e.g. {"name": "VARCHAR(255)", "age": "VARCHAR(255)"};
        a key column would be {"id": "INT AUTO_INCREMENT PRIMARY KEY"}.
        """
        cur = self._cursor()
        if cur is None:
            return
        cols = ", ".join(
            _check_identifier(col) + " " + spec
            for col, spec in table_drcitinaary.items()
        )
        try:
            cur.execute("CREATE TABLE " + _check_identifier(table_name)
                        + " (" + cols + ")")
        finally:
            cur.close()

    def ADD_PRIMARY_KEY_SET(self, table_name, key_name):
        """Add an auto-increment integer primary key column to *table_name*."""
        cur = self._cursor()
        if cur is None:
            return
        try:
            cur.execute("ALTER TABLE " + _check_identifier(table_name)
                        + " ADD COLUMN " + _check_identifier(key_name)
                        + " INT AUTO_INCREMENT PRIMARY KEY")
        finally:
            cur.close()

    def CIN_DATA(self, table_name, table_data_list, table_data):
        """Bulk-insert rows and commit.

        table_data_list -> column names, e.g. ["name", "url"]
        table_data      -> list of value tuples matching those columns
        Values go through %s placeholders, so only identifiers are
        interpolated into the SQL text.
        """
        cur = self._cursor()
        if cur is None:
            return
        cols = ",".join(_check_identifier(c) for c in table_data_list)
        marks = ", ".join(["%s"] * len(table_data_list))
        sql_code = ("INSERT INTO " + _check_identifier(table_name)
                    + " (" + cols + ") VALUES (" + marks + ")")
        try:
            cur.executemany(sql_code, table_data)
            self.mydb.commit()
        finally:
            cur.close()

    def sql_search(self, table_name, search_code):
        """Run *search_code* (a caller-built SELECT) and return fetchall().

        NOTE(review): search_code is raw SQL; only pass programme-built,
        trusted statements.
        """
        cur = self._cursor()
        if cur is None:
            return None
        try:
            cur.execute(search_code)
            return cur.fetchall()
        finally:
            cur.close()

    def delete_data(self, delete_code):
        """Execute a caller-built DELETE statement and commit."""
        cur = self._cursor()
        if cur is None:
            return
        try:
            cur.execute(delete_code)
            self.mydb.commit()
        finally:
            cur.close()

    def delete_table(self, table_name):
        """DROP TABLE IF EXISTS *table_name*."""
        cur = self._cursor()
        if cur is None:
            return
        try:
            cur.execute("DROP TABLE IF EXISTS " + _check_identifier(table_name))
        finally:
            cur.close()
import urllib.request
import ssl
import random
import os

from fake_useragent import UserAgent
import requests

# The target site's certificate is rejected by default; disable verification
# process-wide.  NOTE(review): this weakens TLS for the whole process.
ssl._create_default_https_context = ssl._create_unverified_context


def htmlget(url, bianma, POST_OR_NOT, POST_data):
    """Fetch *url* and return the response body decoded with *bianma*.

    POST_OR_NOT == "YES" sends POST_data (a dict such as {"NAME": "123"})
    as a form POST; anything else issues a plain GET.  Each attempt uses a
    fresh random User-Agent; after 5 failed attempts, returns None.

    Fixes over the original: the retry counter was reset inside the
    `while True` loop, so a dead target retried forever; and `ua` could be
    unbound in the retry path if UserAgent() itself raised.
    """
    location = os.path.join(os.getcwd(), 'data.json')  # local UA cache file
    ua = UserAgent(path=location)

    def _attempt():
        # One request with a freshly randomised User-Agent header.
        headersl = {"User-Agent": ua.random}
        if POST_OR_NOT == "YES":
            resp = requests.post(url=url, data=POST_data,
                                 headers=headersl, timeout=5)
        else:
            resp = requests.get(url, headers=headersl, timeout=5)
        resp.encoding = bianma
        return resp.text

    # 1 initial try + 4 retries, matching the original's intent.
    for _ in range(5):
        try:
            text = _attempt()
            print("[+]LOG: GET SUCCESS")
            return text
        except requests.RequestException:
            print("[-]LOG: GET ERROR")
    return None
然後HTML資料分析我用LXML中的XPATH(我的推薦)
下一步,初始化建立資料庫:
# (blog) Next come chapter recording and image download; since images are the
# biggest drag on data collection, I went multi-threaded — async IO with
# asyncio, a Python 3.8 feature.

from SQL_CONTERE import sql_conter

# One-off initialisation: create the index database "m_index", its index
# table, and the per-book chapter database "m_z_sql_data".

sql_obj = sql_conter("mysql")
sql_obj.create_sql("m_index")
del sql_obj

sql_con_2 = sql_conter("m_index")
# Column layout of the comic index table (all metadata stored as text).
code = {
    "m_bianhao": "VARCHAR(255) not null",   # comic id on the source site
    "m_name": "VARCHAR(255) not null",      # comic title
    "m_writer": "VARCHAR(255) not null",    # author
    "m_leixin": "VARCHAR(255) not null",    # genre
    "m_img": "VARCHAR(255) not null",       # cover image URL
    "m_jianjie": "text not null",           # synopsis
    "m_update": "datetime not null",        # last-update timestamp
    "m_zhandian": "VARCHAR(255) not null",  # source site tag
}
sql_con_2.Create_table("m_index", code)
del sql_con_2

sql_con_3 = sql_conter("m_index")
sql_con_3.create_sql("m_z_sql_data")
del sql_con_3

print("STATIC SUCCESS")
# (blog) Database data entry:

import os
import json
import asyncio
import urllib.request

import requests

from INI_READER import ini_reader

rader = ini_reader("WEB_SETTINGS.ini")
# Root directory that every comic's images are stored under.
path = rader.ini_get_text("spdier_static_path", "static_path")


def _clean(name):
    """Strip characters that break directory names (spaces, commas, dots)."""
    return name.replace(" ", "").replace(",", "").replace(".", "")


async def base_download(path, url, img_name):
    """Download one image to <path>/<img_name>.jpg, retrying up to 4 times.

    NOTE(review): requests.get is blocking, so the coroutines do not truly
    overlap; aiohttp (or run_in_executor) would be needed for real
    concurrency.  Failures after the last retry are appended to LOG.txt so
    the crawl can continue.
    """
    img_path = os.path.join(
        path, img_name.replace(" ", "").replace(".", "") + ".jpg")
    for _ in range(4):
        try:
            html = requests.get(url)
            with open(img_path, "wb") as f:
                f.write(html.content)
            print("[+]LOG : DOWNLOAD SUCCESS " + url)
            return
        except requests.RequestException:
            print("[-]LOG : DOWNLOAD ERROR " + url)
    # Every retry failed: record the loss and move on.
    with open("LOG.txt", "a", encoding="utf-8") as w:
        w.write("[-]LOG : DOWNLOAD ERROR " + url + "\n")


def file_maker(manhua_naem):
    """Create the per-comic directory under the static root."""
    ALL_path = os.path.join(path, _clean(manhua_naem))
    os.makedirs(ALL_path, exist_ok=True)  # was os.system("mkdir ...")
    print("[+]LOG : MAKE FILE FOR M MAIN SUCCESS")


def img_for_cin(img_list, ZJ_name, manhua_name):
    """Store one chapter: write imgs_j.json, then download every page image.

    img_list is an ordered list of page URLs; ZJ_name (chapter) and
    manhua_name (comic) become directory components.
    """
    ALL_path = os.path.join(path, _clean(manhua_name), _clean(ZJ_name))
    os.makedirs(ALL_path, exist_ok=True)  # was os.system("mkdir ...")
    # json.dumps escapes quotes/backslashes — the old hand-built string
    # concatenation produced invalid JSON for names containing quotes.
    json_map = {"zhangjie_name": ZJ_name}
    for idx, img_url in enumerate(img_list):
        json_map[str(idx)] = img_url
    print("[+]LOG : MAKE FILE FOR M ZHNAGJIE " + ZJ_name + " SUCCESS")
    with open(os.path.join(ALL_path, "imgs_j.json"), "w",
              encoding="utf-8") as f:
        f.write(json.dumps(json_map, ensure_ascii=False))
    # Fan the page downloads out over the event loop.
    loop = asyncio.get_event_loop()
    tasks = [base_download(ALL_path, u, str(i))
             for i, u in enumerate(img_list)]
    loop.run_until_complete(asyncio.wait(tasks))
# (blog) Then the main body of the crawler:

from SQL_CONTERE import sql_conter

# Column order shared by the m_index table and sql_index_cin below.
_INDEX_COLUMNS = ["m_bianhao", "m_name", "m_writer", "m_leixin",
                  "m_img", "m_jianjie", "m_update", "m_zhandian"]


def _strip_newlines(text):
    """Remove CR/LF so multi-line scraped fields fit the VARCHAR columns."""
    return text.replace("\r", "").replace("\n", "")


def sql_index_cin(xinxi_dictionary):
    """Insert one comic's metadata row into the m_index table.

    xinxi_dictionary must carry every key in _INDEX_COLUMNS.  Errors are
    logged rather than raised (best-effort, as in the original), but the
    bare except is narrowed so e.g. KeyboardInterrupt still propagates.
    """
    try:
        row = tuple(_strip_newlines(xinxi_dictionary[key])
                    for key in _INDEX_COLUMNS)
        sql_map_con = sql_conter("m_index")
        sql_map_con.CIN_DATA("m_index", _INDEX_COLUMNS, [row])
        del sql_map_con
        print("[+]LOG: data cin success")
    except Exception:
        print("[-]LOG: ERROR!")


def sql_zhangjie_maker(zhangjie_name_list, bianhao):
    """Create table <bianhao> in m_z_sql_data and record the chapter names,
    numbered from 1 in list order (spaces stripped from each name)."""
    sql_obj = sql_conter("m_z_sql_data")
    code = {"id": "VARCHAR(255) not null",
            "m_zhangjie_name": "VARCHAR(255) not null"}
    sql_obj.Create_table(bianhao, code)
    columns = ["id", "m_zhangjie_name"]  # renamed from `map` (shadowed builtin)
    data_list = [(str(i + 1), name.replace(" ", ""))
                 for i, name in enumerate(zhangjie_name_list)]
    sql_obj.CIN_DATA(bianhao, columns, data_list)
    del sql_obj
    print("[+]LOG: CIN ZHANGJIE SQL SUCCESS")
from htmlget_base import htmlget_baseip
from xpath_reader import xpathReader
from SQL_CONTERE import sql_conter
from breome_proxy import get_ip_base
import asyncio
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
import time
from UPDATE_HAVE import have_map
from json_reader import Json_Reader
from HTML_GET import htmlget
from M_SQL_CIN import sql_index_cin
from M_SQL_CIN import sql_zhangjie_maker
from json_for_img_maker import img_for_cin
from concurrent.futures import ThreadPoolExecutor
from selenium.webdriver.common.desired_capabilities import DesiredCapabilities
from list_re import reverse_string

chrome_options = Options()
chrome_options.add_argument('--headless')  # scrape without opening a window


def img_get(data_dic, m_name):
    """Visit every chapter page and pull its image URLs from the JS global
    `newImgs`, then hand them to img_for_cin for download.

    data_dic maps chapter name -> chapter URL.
    """
    # NOTE(review): desired_capabilities is prepared but never passed to
    # webdriver.Chrome, so pageLoadStrategy "none" is not actually in
    # effect — kept exactly as in the original.
    desired_capabilities = DesiredCapabilities.CHROME
    desired_capabilities["pageLoadStrategy"] = "none"
    dr_console = webdriver.Chrome(options=chrome_options)
    for name in data_dic:
        dr_console.get(data_dic[name])
        time.sleep(3)  # give the page JS time to populate newImgs
        try:
            img_list = dr_console.execute_script("return newImgs;")
        except Exception:
            # The original restarted Chrome forever; cap the restarts so a
            # permanently broken chapter cannot hang the whole crawl.
            img_list = None
            for _ in range(3):
                try:
                    dr_console.close()
                    dr_console = webdriver.Chrome(options=chrome_options)
                    dr_console.get(data_dic[name])
                    time.sleep(5)
                    img_list = dr_console.execute_script("return newImgs;")
                    break
                except Exception:
                    pass
            if img_list is None:
                print("[-]LOG: SKIP ZHANGJIE " + name)
                continue
        img_for_cin(img_list, name, m_name)  # async image download
    dr_console.close()


def xinxi_make(m_z_index_url, id):
    """Scrape one comic: metadata, full chapter list, then every chapter.

    m_z_index_url is http://6mh6.com/<id>/ and *id* the comic's numeric id
    as a string.  Nothing is stored when have_map() reports the comic as
    already recorded.
    """
    ZHANGJEI_URL_GET_LIST = []   # chapter URLs (oldest first after reversal)
    ZHANGJEI_NAME_GET_LIST = []  # chapter names, same order
    flage = False
    html_data = htmlget(m_z_index_url, "utf-8", "NO", "")
    x_html = xpathReader(html_data)
    # Empty span list flags that the rest of the chapters must be fetched
    # through the site's Ajax endpoint.
    TisHave = x_html.xpath('''//*[@id="chapter-list1"]/a/li/span/text()''')
    if TisHave == []:
        flage = True
    del TisHave
    if flage == True:
        # Chapters embedded in the HTML ...
        ZJ_M_url = x_html.xpath('''//*[@id="chapter-list1"]/a/@href''')
        for zj_code in range(len(ZJ_M_url)):
            ZJ_M_url[zj_code] = "http://6mh6.com" + ZJ_M_url[zj_code]
        ZJ_M_name = x_html.xpath('''//*[@id="chapter-list1"]/a/li/p/text()''')
        # ... plus the rest via the POST /bookchapter/ API (id2 = site no.)
        PP = {"id": str(id), "id2": "1"}
        JSON_DATA = htmlget("http://6mh6.com/bookchapter/", "utf-8", "YES", PP)
        J_data = Json_Reader(JSON_DATA)
        for i in range(len(J_data)):
            Url = ("http://6mh6.com/" + id + "/"
                   + J_data[i]["chapterid"] + ".html")
            ZJ_M_url.append(Url)
            ZJ_M_name.append(J_data[i]["chaptername"])
        # The site lists newest first; reverse to chronological order.
        ZHANGJEI_NAME_GET_LIST = reverse_string(ZJ_M_name)
        del ZJ_M_name
        ZHANGJEI_URL_GET_LIST = reverse_string(ZJ_M_url)
        del ZJ_M_url
    # NOTE(review): when flage stays False both lists remain empty, so no
    # chapters get recorded for that comic — behaviour kept as original.

    def dictontry(zj_naem_list, zj_url_list):
        # Pair chapter names with their URLs.
        dic = {}
        for x in range(len(zj_naem_list)):
            dic[zj_naem_list[x]] = zj_url_list[x]
        return dic

    ALL_dic = dictontry(ZHANGJEI_NAME_GET_LIST, ZHANGJEI_URL_GET_LIST)
    del ZHANGJEI_URL_GET_LIST  # chapter dict -->>> ALL_dic

    def ALL_xinxi_data_maker(X_obj):
        # Pull the metadata fields out of the detail page's DOM.
        x_name = X_obj.xpath('''//*[@class="cartoon-title"]/text()''')[0].replace("\r", "").replace("\n", "")
        x_img = X_obj.xpath('''//*[@class="cartoon-poster"]/@src''')[0].replace("\r", "").replace("\n", "")
        x_jianjie = X_obj.xpath('''//*[@class="introduction"]/text()''')[0].replace("\r", "").replace("\n", "")
        x_writer = X_obj.xpath('''//*[@itemprop="actor"]/@content''')[0].replace("\r", "").replace("\n", "")
        x_bianhao = "6MH6" + str(id)
        # The page does not expose a usable genre, so all genres are stored.
        x_leixin = "玄幻||戀愛||穿越||熱血||古風(LOG:無法判别類型,采用全局)"
        x_update_time = X_obj.xpath('''//*[@itemprop="uploadDate"]/@content''')[0].replace("\r", "").replace("\n", "")
        ALL_INDEX_XINXI_MAP = {}
        ALL_INDEX_XINXI_MAP["m_bianhao"] = x_bianhao
        ALL_INDEX_XINXI_MAP["m_name"] = x_name
        ALL_INDEX_XINXI_MAP["m_writer"] = x_writer
        ALL_INDEX_XINXI_MAP["m_leixin"] = x_leixin
        ALL_INDEX_XINXI_MAP["m_img"] = x_img
        ALL_INDEX_XINXI_MAP["m_jianjie"] = x_jianjie
        ALL_INDEX_XINXI_MAP["m_update"] = x_update_time
        ALL_INDEX_XINXI_MAP["m_zhandian"] = "6MH6"
        return ALL_INDEX_XINXI_MAP

    X_data = ALL_xinxi_data_maker(x_html)
    # zt is presumably True when the comic is not yet recorded — confirm
    # against UPDATE_HAVE.have_map.
    zt = have_map(X_data["m_name"])
    if zt == True:
        del html_data
        del x_html
        sql_index_cin(X_data)
        sql_zhangjie_maker(ZHANGJEI_NAME_GET_LIST, X_data["m_bianhao"])
        img_get(ALL_dic, X_data["m_name"])
    else:
        del html_data
        del x_html


def Main_Get(How_Many_carton, start_id=14400):
    """Crawl *How_Many_carton* comics with consecutive ids from *start_id*.

    start_id defaults to 14400, the value hard-coded in the original, so
    existing callers are unaffected.
    """
    for num in range(How_Many_carton):
        xinxi_make("http://6mh6.com/" + str(start_id + num) + "/",
                   str(start_id + num))
        print("----->>>>>進度:" + str((num + 1) / How_Many_carton * 100))


if __name__ == '__main__':
    Main_Get(100)
如果想代碼多線程可以用asyncio對Main_Get函數和主函數稍作修改
運作圖如下:
轉載請留言,謝謝!!!!