天天看點

程序池的使用 | 手把手教你入門Python之一百零九

上一篇: 隊列的使用 | 手把手教你入門Python之一百零八 下一篇: 簡單的HTTP伺服器搭建| 手把手教你入門Python之一百一十 本文來自于千鋒教育在阿裡雲開發者社群學習中心上線課程 《Python入門2020最新大課》 ,主講人姜偉。

程序池的使用

當需要建立的子程序數量不多時,可以直接利用multiprocessing中的Process動态成生多個程序,但如果是上百甚至上千個目标,手動的去建立程序的工作量巨大,此時就可以用到multiprocessing子產品提供的Pool方法。

Pool

開啟過多的程序并不能提高你的效率,反而會降低你的效率,假設有500個任務,同時開啟500個程序,這500個程序除了不能一起執行之外(cpu沒有那麼多核),作業系統排程這500個程序,讓他們平均在4個或8個cpu上執行,這會占用很大的空間。

如果要啟動大量的子程序,可以用程序池的方式批量建立子程序:

def task(n):
    print('{}----->start'.format(n))
    time.sleep(1)
    print('{}------>end'.format(n))


if __name__ == '__main__':
    p = Pool(8)  # 建立程序池,并指定線程池的個數,預設是CPU的核數
    for i in range(1, 11):
        # p.apply(task, args=(i,)) # 同步執行任務,一個一個的執行任務,沒有并發效果
        p.apply_async(task, args=(i,)) # 異步執行任務,可以達到并發效果
    p.close()
    p.join()           

程序池擷取任務的執行結果:

def task(n):
    print('{}----->start'.format(n))
    time.sleep(1)
    print('{}------>end'.format(n))
    return n ** 2


if __name__ == '__main__':
    p = Pool(4)
    for i in range(1, 11):
        res = p.apply_async(task, args=(i,))  # res 是任務的執行結果
        print(res.get())  # 直接擷取結果的弊端是,多任務又變成同步的了
       p.close()
    # p.join()  不需要再join了,因為 res.get()本身就是一個阻塞方法           

異步擷取線程的執行結果:

import time
from multiprocessing.pool import Pool


def task(n):
    print('{}----->start'.format(n))
    time.sleep(1)
    print('{}------>end'.format(n))
    return n ** 2


if __name__ == '__main__':
    p = Pool(4)
    res_list = []
    for i in range(1, 11):
        res = p.apply_async(task, args=(i,))
        res_list.append(res)  # 使用清單來儲存程序執行結果
    for re in res_list: 
        print(re.get())
    p.close()           

初始化Pool時,可以指定一個最大程序數,當有新的請求送出到Pool中時,如果池還沒有滿,那麼就會建立一個新的程序用來執行該請求;但如果池中的程序數已經達到指定的最大值,那麼該請求就會等待,直到池中有程序結束,才會用之前的程序來執行新的任務,請看下面的執行個體:

from multiprocessing import Pool
import os, time, random


def worker(msg):
    t_start = time.time()
    print("%s開始執行,程序号為%d" % (msg, os.getpid()))
    # random.random()随機生成0~1之間的浮點數
    time.sleep(random.random() * 2)
    t_stop = time.time()
    print(msg, "執行完畢,耗時%0.2f" % (t_stop - t_start))


if __name__ == '__main__':
    po = Pool(3)  # 定義一個程序池,最大程序數3
    for i in range(0, 10):
        # Pool().apply_async(要調用的目标,(傳遞給目标的參數元祖,))
        # 每次循環将會用空閑出來的子程序去調用目标
        po.apply_async(worker, (i,))

    print("----start----")
    po.close()  # 關閉程序池,關閉後po不再接收新的請求
    po.join()  # 等待po中所有子程序執行完成,必須放在close語句之後
    print("-----end-----")           

運作效果:

----start----
0開始執行,程序号為21466
1開始執行,程序号為21468
2開始執行,程序号為21467
0 執行完畢,耗時1.01
3開始執行,程序号為21466
2 執行完畢,耗時1.24
4開始執行,程序号為21467
3 執行完畢,耗時0.56
5開始執行,程序号為21466
1 執行完畢,耗時1.68
6開始執行,程序号為21468
4 執行完畢,耗時0.67
7開始執行,程序号為21467
5 執行完畢,耗時0.83
8開始執行,程序号為21466
6 執行完畢,耗時0.75
9開始執行,程序号為21468
7 執行完畢,耗時1.03
8 執行完畢,耗時1.05
9 執行完畢,耗時1.69
-----end-----           

multiprocessing.Pool常用函數解析:

  • apply_async(func[, args[, kwds]]) :使用非阻塞方式調用func(并行執行,堵塞方式必須等待上一個程序退出才能執行下一個程序),args為傳遞給func的參數清單,kwds為傳遞給func的關鍵字參數清單;
  • close():關閉Pool,使其不再接受新的任務;
  • terminate():不管任務是否完成,立即終止;
  • join():主程序阻塞,等待子程序的退出, 必須在close或terminate之後使用;

程序池中的Queue

如果要使用Pool建立程序,就需要使用multiprocessing.Manager()中的Queue(),而不是multiprocessing.Queue(),否則會得到一條如下的錯誤資訊:

RuntimeError: Queue objects should only be shared between processes through inheritance.           

下面的執行個體示範了程序池中的程序如何通信:

# 修改import中的Queue為Manager
from multiprocessing import Manager, Pool
import os, time, random


def reader(q):
    print("reader啟動(%s),父程序為(%s)" % (os.getpid(), os.getppid()))
    for i in range(q.qsize()):
        print("reader從Queue擷取到消息:%s" % q.get(True))


def writer(q):
    print("writer啟動(%s),父程序為(%s)" % (os.getpid(), os.getppid()))
    for i in "helloworld":
        q.put(i)


if __name__ == "__main__":
    print("(%s) start" % os.getpid())
    q = Manager().Queue()  # 使用Manager中的Queue
    po = Pool()
    po.apply_async(writer, (q,))

    time.sleep(1)  # 先讓上面的任務向Queue存入資料,然後再讓下面的任務開始從中取資料

    po.apply_async(reader, (q,))
    po.close()
    po.join()
    print("(%s) End" % os.getpid())           

運作結果:

(4171) start
writer啟動(4173),父程序為(4171)
reader啟動(4174),父程序為(4171)
reader從Queue擷取到消息:h
reader從Queue擷取到消息:e
reader從Queue擷取到消息:l
reader從Queue擷取到消息:l
reader從Queue擷取到消息:o
reader從Queue擷取到消息:w
reader從Queue擷取到消息:o
reader從Queue擷取到消息:r
reader從Queue擷取到消息:l
reader從Queue擷取到消息:d
(4171) End           

join方法的使用

# join 線程和程序都有join方法
import threading
import time

x = 10


def test(a, b):
    time.sleep(1)
    global x
    x = a + b


# test(1, 1)
# print(x)  # 2

t = threading.Thread(target=test, args=(1, 1))
t.start()
t.join()  # 讓主線程等待

print(x)  # 10           

配套視訊