一.
二.程序
使用multiprocessing庫:
from multiprocessing import Pool, Manager
代碼模闆如下:
def mycallback(task):
with open('./tmp','a+',encoding='utf-8') as f:
f.write(task)
def method(task):
return task
def get_queue():
manager=Manager()
q=manager.Queue()
for i in range(1,100):
q.put('task{}'.format(i))
return q
def main():
q=get_queue()
pool = Pool(processes=4)
'''
for task in tasks:
result=pool.apply_async(method, args=(task, ), callback=mycallback)
res=result.get() #1.沒有get方法,有可能沒有子程序報錯資訊!!2.在for内部添加此行代碼,将變成順序執行各個子程序,而不是并行執行。
建議改為以下兩行代碼
'''
results=[pool.apply_async(method, args=(task, ), callback=mycallback) for task in tasks]
final_results=[result.get() for result in results] #result.get()得到子程序method函數的傳回值,而不是回調函數的傳回值
pool.close() #關閉pool,程序池不再接受主程序新的任務
pool.join() #阻塞主程序,等子程序運作完成後,再把主程序關掉
需要注意的細節:
1.
Pool可以提供指定數量的程序供使用者調用。當有新的請求送出到pool中時,如果池還沒有滿,那麼就會建立一個新的程序用來執>行該請求;但如果池中的程序數已經達到規定最大值,那麼該請求就會等待,直到池中有程序結束,才會建立新的程序。
2.
Manager實作程序間通信,資料共享。
3.
apply_async是異步非阻塞式。非阻塞即子程序不用等待目前程序執行完畢,而是随時由作業系統排程來進行程序切換,多個子程序并行執行,提高程式的執行效率。(args參數要多一個',')
4.
擷取子程序傳回值。
res=pool.apply_async(method, args=(task, ), callback=mycallback).get()
5.
多程序讀寫同一檔案。
使用回調函數callback,method的傳回值傳遞給callback。(method方法中讀寫會造成子程序阻塞等問題)
6.
pool.close() 關閉pool,使其不再接受新的任務。
7.
pool.join() 阻塞主程序,使主程序在此處等所有待子程序完成。
8. tmp
Pool:多程序和小IO
Process:程序數量很少且IO操作很長
9.報錯:
import queue
q=queue.Queue()
TypeError: can't pickle _thread.lock objects
原因:queue.Queue() was built for threading, using in-memory locks.
In a Multiprocess environment, each subprocess would get it's own copy of a queue.Queue() instance in their own memory space, since subprocesses don't share memory.
import multiprocessing
q=multiprocessing.Queue()
RuntimeError: Queue objects should only be shared between processes through inheritance
解決:用Manager來建立共享變量
eg:
lock=multiprocessing.Manager().Lock()
index=multiprocessing.Manager().Value('i',0)
q=multiprocessing.Manager().Queue()
import multiprocessing
m=multiprocessing.Manager()
q=m.Queue()
三.線程
使用threading庫
import threading
代碼模闆:
class MyThread(threading.Thread):
def __init__(self,tname,q):
threading.Thread.__init__(self)
self.tname=tname
self.q=q
def run(self):
print(self.tname,' start')
crawl(self.tname,self.q)
def crawl(tname,q):
global index
while q.qsize()!=0:
threadLock.acquire()
index+=1
tmp_index=index
num=q.get()
threadLock.release()
print(tmp_index, tname, num)
def main():
q=queue.Queue()
for i in range(1,100):
q.put(i)
threads=[]
thread_list=['crawler_1','crawler_2','crawler_3']
for tname in thread_list:
thread=MyThread(tname,q)
thread.start()
threads.append(thread)
for t in threads:
t.join()
index=0
threadLock = threading.Lock()
proxy_queue=queue.Queue()
main()
1.
threading.Lock() 控制不同程序間讀寫同一變量。