天天看點

Python多程序和多線程

一.

二.程序

使用multiprocessing庫:

from multiprocessing import Pool, Manager
           

代碼模闆如下:

def mycallback(task):
    with open('./tmp','a+',encoding='utf-8') as f:
        f.write(task)
   

def method(task):
    return task


def get_queue():
    manager=Manager()
    q=manager.Queue()

    for i in range(1,100):
        q.put('task{}'.format(i))

    return q


def main():
    q=get_queue()

    pool = Pool(processes=4)
    '''
    for task in tasks:
        result=pool.apply_async(method, args=(task, ), callback=mycallback)
        res=result.get() #1.沒有get方法,有可能沒有子程序報錯資訊!!2.在for内部添加此行代碼,将變成順序執行各個子程序,而不是并行執行。
    建議改為以下兩行代碼
    '''
    results=[pool.apply_async(method, args=(task, ), callback=mycallback) for task in tasks]
    final_results=[result.get() for result in results] #result.get()得到子程序method函數的傳回值,而不是回調函數的傳回值 

    pool.close() #關閉pool,程序池不再接受主程序新的任務
    pool.join() #阻塞主程序,等子程序運作完成後,再把主程序關掉
           

需要注意的細節:

1.

Pool可以提供指定數量的程序供使用者調用。當有新的請求送出到pool中時,如果池還沒有滿,那麼就會建立一個新的程序用來執>行該請求;但如果池中的程序數已經達到規定最大值,那麼該請求就會等待,直到池中有程序結束,才會建立新的程序。

2.

Manager實作程序間通信,資料共享。

3.

apply_async是異步非阻塞式。非阻塞即子程序不用等待目前程序執行完畢,而是随時由作業系統排程來進行程序切換,多個子程序并行執行,提高程式的執行效率。(args參數要多一個',')

4.

擷取子程序傳回值。

res=pool.apply_async(method, args=(task, ), callback=mycallback).get()
           

5.

多程序讀寫同一檔案。

使用回調函數callback,method的傳回值傳遞給callback。(method方法中讀寫會造成子程序阻塞等問題)

6.

pool.close() 關閉pool,使其不再接受新的任務。

7.

pool.join() 阻塞主程序,使主程序在此處等所有待子程序完成。

8. tmp

Pool:多程序和小IO

Process:程序數量很少且IO操作很長

9.報錯:

import queue
q=queue.Queue()
TypeError: can't pickle _thread.lock objects

原因:queue.Queue() was built for threading, using in-memory locks. 
In a Multiprocess environment, each subprocess would get it's own copy of a queue.Queue() instance in their own memory space, since subprocesses don't share memory.
           
import multiprocessing
q=multiprocessing.Queue()
RuntimeError: Queue objects should only be shared between processes through inheritance
解決:用Manager來建立共享變量
eg:
lock=multiprocessing.Manager().Lock()
index=multiprocessing.Manager().Value('i',0)
q=multiprocessing.Manager().Queue()
           
import multiprocessing
m=multiprocessing.Manager()
q=m.Queue()
           

三.線程

使用threading庫

import threading
           

代碼模闆:

class MyThread(threading.Thread):
        def __init__(self,tname,q):
                threading.Thread.__init__(self)
                self.tname=tname
                self.q=q
        def run(self):
                print(self.tname,' start')
                crawl(self.tname,self.q)


def crawl(tname,q):
    global index
    while q.qsize()!=0:

        threadLock.acquire()
        index+=1
        tmp_index=index
        num=q.get()
        threadLock.release()

        print(tmp_index, tname, num)

        
         
def main():
    q=queue.Queue()
    for i in range(1,100):
        q.put(i)
        
    threads=[]
    thread_list=['crawler_1','crawler_2','crawler_3']
    for tname in thread_list:
        thread=MyThread(tname,q)
        thread.start()
        threads.append(thread)

    for t in threads:
        t.join()

index=0
threadLock = threading.Lock()
proxy_queue=queue.Queue()
main()
           

1.

threading.Lock() 控制不同程序間讀寫同一變量。