天天看點

python多線程資料庫操作_python多線程 + 批量插入 資料庫 健壯你的小爬蟲

參考連結:

python多線程資料庫操作_python多線程 + 批量插入 資料庫 健壯你的小爬蟲

擷取 url,标題以及時間。。。。(很簡單的奧,xpath一下子就可以提取到了)

(這次主要是練習多線程和批處理存入資料庫的,是以隻是簡單的解析,擷取頁面元素)

重要思路: 開啟多線程時

1)首先将爬取的url 放入到資料結構的隊列裡,保證資料安全。

2)将爬取到的結果,全部存入到一個結果集隊列裡,進行下一步的操作。

3)隊列裡的get和put方法不要混淆,put是向隊列裡添加元素,get是取出或者是踢出并傳回這個元素!!!!

第一步: 建立線程以及存儲隊列:

def main():

start_url = Queue.Queue() # 存放url的隊列

result_queue = Queue.Queue() # 結果集隊列

for i in range(1, 3):# 網站分頁

page_url = 'http://data.stcn.com/list/djsj_%s.shtml' % i

start_url.put(page_url) # 将值添加到start_url隊列中

# 建構線程

thread_list = [] # 存放線程的容器

for n in range(4): # 一次運作4 個線程

# 建立線程,target調用get_news_url方法,args傳入參數

t_t = threading.Thread(target=get_news_url, args=(start_url, result_queue))

thread_list.append(t_t)

for t in thread_list:

t.start() # 啟動線程

第二步:解析網頁,擷取目标元素(不多介紹了哈)

def get_news_url(start_url, result_queue): # 在main方法裡傳入參數

result = []

while start_url.qsize():

page_url = start_url.get() # 從隊列中取出并傳回這個資料

try:

response = requests.get(page_url)

except Exception as e:

print "抓取網頁錯誤,錯誤為:%s" % e

return None

if response.status_code == 200:

selector = etree.HTML(response.text)

web_content = selector.xpath('//p[@class="tit"]')

for news in web_content:

item_result = {}

item_result['href'] = news.xpath('a/@href')[0]

item_result['title'] = news.xpath('a/text()')[0]

item_result['date_news'] = news.xpath('span/text()')[0]

result.append(item_result)

if len(result) > 0:#如果result裡有資料的話,

result_queue.put(result) # put是向結果集 隊列裡添加元素result

start_url.task_done() #是指這個任務結束

else:

time.sleep(5)

第三步: 存,批量插入Mysql資料庫

到這一步我們擷取得到的資料結構是形如這樣的[{},{},{},{}]:

而插入資料庫的關鍵,就是擷取插入的值!!!這裡周遊出是個字典格式的資料,是以需要用dict的方法擷取元素!!!

核心代碼就兩行!!!!

data = [item.values() for item in result] #周遊得到每個{}裡的values值

cur.executemany(sql2, tuple(data)) #記得一定要轉化成元組

print 'insert sucessful'

多條記錄的插入,需要用executemany(速度哦,真的比之前快好多好多~~~)

不要忘記大前提

連結資料庫 和 建立資料庫 和表(這部分 可以手動建立 也可以代碼建立)

代碼連結資料庫 和建立 資料表

def save_news_mysql(result):

con = MySQLdb.connect(host= '127.0.0.1', user= 'root', passwd= '123456' ,charset='utf8',port = 3306)

cur = con.cursor()

sql = 'create database if not exists cstn_database default charset utf8'

cur.execute(sql)

con.select_db('cstn_database') # 以上是連接配接和建立 資料庫

sql = 'create table if not exists news_cstn'+"(id int auto_increment, href varchar(255), title varchar(255),\date_news varchar(255), primary key(ID))"

cur.execute(sql) # 建立表結構 這部分代碼以後直接在mysql裡建立就可以了

sql2 = 'insert into news_cstn (href,title,date_news) VALUES (%s,%s,%s)'

data = [item.values() for item in result]

cur.executemany(sql2, tuple(data))

print 'insert sucessful'

資料庫名:cstn_database

表名:cstn

字段:id,href,title,data_news

看似簡單,其實虐我好久丫!!!

敬上完整代碼:

# -*- coding: UTF-8 -*-

import requests

from lxml import etree

import csv

import MySQLdb

import xlwt

import Queue

import threading

import time

import sys

reload(sys)

sys.setdefaultencoding('utf-8')

# from util.crawler import Header, Proxy 代理 請求頭 我放在另個檔案夾

# from database.db import Database

#

# con = Database.getConnection() # 連接配接資料庫

# cur = con.cursor() # 遊标對象

def get_news_url(start_url, result_queue):

result = []

while start_url.qsize():

page_url = start_url.get() # 從隊列中移除并傳回這個資料

try:

response = requests.get(page_url)

except Exception as e:

print "抓取網頁錯誤,錯誤為:%s" % e

return None

if response.status_code == 200:

selector = etree.HTML(response.text)

web_content = selector.xpath('//p[@class="tit"]')

for news in web_content:

item_result = {}

item_result['href'] = news.xpath('a/@href')[0]

item_result['title'] = news.xpath('a/text()')[0]

item_result['date_news'] = news.xpath('span/text()')[0]

result.append(item_result)

if len(result) > 0:

result_queue.put(result) # put是向結果集 隊列裡添加元素

start_url.task_done()

else:

time.sleep(5)

def save_to_excel(result):

workbook = xlwt.Workbook()

sheet = workbook.add_sheet('result3')

title = ['href','title','date']

for i ,item in enumerate(title):

sheet.write(0, i, item)

data = [item.values() for item in result]

print data

for row, item in enumerate(data):

for i, info in enumerate(item):

print row+1, i ,info

sheet.write(row+1 , i , info)

workbook.save('Myresult.xls')

def save_news_mysql(result):

con = MySQLdb.connect(host= '127.0.0.1', user= 'root', passwd= '123456' ,charset='utf8',port = 3306)

cur = con.cursor()

sql = 'create database if not exists cstn_database default charset utf8'

cur.execute(sql)

con.select_db('cstn_database')

sql = 'create table if not exists news_cstn'+"(id int auto_increment, href varchar(255), title varchar(255), \

date_news varchar(255), primary key(ID))"

cur.execute(sql) # 建立表結構 這部分代碼以後直接在mysql裡建立就可以了

sql2 = 'insert into news_cstn (href,title,date_news) VALUES (%s,%s,%s)'

data = [item.values() for item in result]

cur.executemany(sql2, tuple(data))

print 'insert sucessful'

# for elem in result: # 單條插入

# sql = 'insert into news_cstn (href,title,date_news) VALUES (\'%s\',\'%s\',\'%s\')' % (elem['href'],elem['title'],elem['date_news'])

# cur.execute(sql)

con.commit()

cur.close()

con.close()

def main():

start_url = Queue.Queue() # 存放url的隊列

result_queue = Queue.Queue() # 結果集隊列

for i in range(1, 3):

page_url = 'http://data.stcn.com/list/djsj_%s.shtml' % i

start_url.put(page_url) # 将值插入隊列中

# 建構線程

thread_list = []

for n in range(4): # 建立4 個線程

t_t = threading.Thread(target=get_news_url, args=(start_url, result_queue)) # 建立線程,調用get_news_url方法,args傳入參數

thread_list.append(t_t)

for t in thread_list:

t.start()

start_url.join() # 就是當所有的url全部擷取完,放入到結果集裡才開始存入資料庫,防止出現 插入資料庫報錯的情況

while result_queue.qsize(): # 傳回隊列的大小

save_news_mysql(result_queue.get()) # 将結果存入資料庫中

if __name__ == "__main__":

main()