天天看點

第十五章 Python多程序與多線程

<b>15.1 multiprocessing</b>

multiprocessing是多程序子產品,多程序提供了任務并發性,能充分利用多核處理器。避免了gil(全局解釋鎖)對資源的影響。

有以下常用類:

<b>描述</b>

process(group=none, target=none, name=none, args=(), kwargs={})

派生一個程序對象,然後調用start()方法啟動

pool(processes=none, initializer=none, initargs=())

傳回一個程序池對象,processes程序池程序數量

pipe(duplex=true)

傳回兩個連接配接對象由管道連接配接

queue(maxsize=0)

傳回隊列對象,操作方法跟queue.queue一樣

multiprocessing.dummy

這個庫是用于實作多線程

process()類有以下些方法:

run()

start()

啟動程序對象

join([timeout])

等待子程序終止,才傳回結果。可選逾時。

name

程序名字

is_alive()

傳回程序是否存活

daemon

程序的守護标記,一個布爾值

pid

傳回程序id

exitcode

子程序退出狀态碼

terminate()

終止程序。在unix上使用sigterm信号,在windows上使用terminateprocess()。

pool()類有以下些方法:

apply(func, args=(), kwds={})

等效内建函數apply()

apply_async(func, args=(), kwds={}, callback=none)

異步,等效内建函數apply()

map(func, iterable, chunksize=none)

等效内建函數map()

map_async(func, iterable, chunksize=none, callback=none)

異步,等效内建函數map()

imap(func, iterable, chunksize=1)

等效内建函數itertools.imap()

imap_unordered(func, iterable, chunksize=1)

像imap()方法,但結果順序是任意的

close()

關閉程序池

終止工作程序,垃圾收集連接配接池對象

join()

等待工作程序退出。必須先調用close()或terminate()

pool.apply_async()和pool.map_aysnc()又提供了以下幾個方法:

get([timeout])

擷取結果對象裡的結果。如果逾時沒有,則抛出timeouterror異常

wait([timeout])

等待可用的結果或逾時

ready()

傳回調用是否已經完成

successful()

<b>部落格位址:http://lizhenliang.blog.51cto.com and https://yq.aliyun.com/u/lizhenliang</b>

qq群:323779636(shell/python運維開發群)

舉例:

1)簡單的例子,用子程序處理函數

from multiprocessing import process

import os

def worker(name):

    print name

    print 'parent process id:', os.getppid()

    print 'process id:', os.getpid()

if __name__ == '__main__':

    p = process(target=worker, args=('function worker.',))

    p.start()

    p.join()

    print p.name

# python test.py

function worker.

parent process id: 9079

process id: 9080

process-1

process執行個體傳入worker函數作為派生程序執行的任務,用start()方法啟動這個執行個體。

2)加以說明join()方法

def worker(n):

    print 'hello world', n

    for n in range(5):

        p = process(target=worker, args=(n,))

        p.start()

        p.join()

        print 'child process id:', p.pid

        print 'child process name:', p.name

parent process id: 9041

hello world 0

child process id: 9132

child process name: process-1

hello world 1

child process id: 9133

child process name: process-2

hello world 2

child process id: 9134

child process name: process-3

hello world 3

child process id: 9135

child process name: process-4

hello world 4

child process id: 9136

child process name: process-5

# 把p.join()注釋掉再執行

child process id: 9125

child process id: 9126

child process id: 9127

child process id: 9128

child process id: 9129

可以看出,在使用join()方法時,輸出的結果都是順序排列的。相反是亂序的。是以join()方法是堵塞父程序,要等待目前子程序執行完後才會繼續執行下一個子程序。否則會一直生成子程序去執行任務。

在要求輸出的情況下使用join()可保證每個結果是完整的。

3)給子程序命名,友善管理

import os, time

def worker1(n):

def worker2():

    print 'worker2...'

    for n in range(3):

        p1 = process(name='worker1', target=worker1, args=(n,))

        p1.start()

        p1.join()

        print 'child process id:', p1.pid

        print 'child process name:', p1.name

    p2 = process(name='worker2', target=worker2)

    p2.start()

    p2.join()

    print 'child process id:', p2.pid

    print 'child process name:', p2.name

child process id: 9248

child process name: worker1

child process id: 9249

child process id: 9250

worker2...

child process id: 9251

child process name: worker2

4)設定守護程序,父程序退出也不影響子程序運作

        p1.daemon = true

    p2 = process(target=worker2)

    p2.daemon = false

5)使用程序池

#!/usr/bin/python

# -*- coding: utf-8 -*-

from multiprocessing import pool, current_process

import os, time, sys

    print 'process name:', current_process().name  # 擷取目前程序名字

    time.sleep(1)    # 休眠用于執行時有時間檢視目前執行的程序

    p = pool(processes=3)

    for i in range(8):

        r = p.apply_async(worker, args=(i,))

        r.get(timeout=5)  # 擷取結果中的資料

    p.close()

process name: poolworker-1

process name: poolworker-2

process name: poolworker-3

hello world 5

hello world 6

hello world 7

程序池生成了3個子程序,通過循環執行8次worker函數,程序池會從子程序1開始去處理任務,當到達最大程序時,會繼續從子程序1開始。

在運作此程式同時,再打開一個終端視窗會看到生成的子程序:

# ps -ef |grep python

root      40244   9041  4 16:43 pts/3    00:00:00 python test.py

root      40245  40244  0 16:43 pts/3    00:00:00 python test.py

root      40246  40244  0 16:43 pts/3    00:00:00 python test.py

root      40247  40244  0 16:43 pts/3    00:00:00 python test.py

6)程序池map()方法

map()方法是将序列中的元素通過函數處理傳回新清單。

from multiprocessing import pool

def worker(url):

    return 'http://%s' % url

urls = ['www.baidu.com', 'www.jd.com']

p = pool(processes=2)

r = p.map(worker, urls)

p.close()

print r

['http://www.baidu.com', 'http://www.jd.com']

7)queue程序間通信

multiprocessing支援兩種類型程序間通信:queue和pipe。

queue庫已經封裝到multiprocessing庫中,在第十章 python常用标準庫已經講解到queue庫使用,有需要請檢視以前博文。

例如:一個子程序向隊列寫資料,一個子程序讀取隊列資料

from multiprocessing import process, queue

# 寫資料到隊列

def write(q):

        q.put(n)

        print 'put %s to queue.' % n

# 從隊列讀資料

def read(q):

    while true:

        if not q.empty():

            value = q.get()

            print 'get %s from queue.' % value

        else:

            break

    q = queue()

    pw = process(target=write, args=(q,))

    pr = process(target=read, args=(q,))   

    pw.start()

    pw.join()

    pr.start()

    pr.join()

put 0 to queue.

put 1 to queue.

put 2 to queue.

put 3 to queue.

put 4 to queue.

get 0 from queue.

get 1 from queue.

get 2 from queue.

get 3 from queue.

get 4 from queue.

8)pipe程序間通信

from multiprocessing import process, pipe

def f(conn):

    conn.send([42, none, 'hello'])

    conn.close()

    parent_conn, child_conn = pipe()

    p = process(target=f, args=(child_conn,))

    print parent_conn.recv() 

[42, none, 'hello']

pipe()建立兩個連接配接對象,每個連結對象都有send()和recv()方法,

9)程序間對象共享

manager類傳回一個管理對象,它控制服務端程序。提供一些共享方式:value()、array()、list()、dict()、event()等

建立manger對象存放資源,其他程序通過通路manager擷取。

from multiprocessing import process, manager

def f(v, a, l, d):

    v.value = 100

    a[0] = 123

    l.append('hello')

    d['a'] = 1

mgr = manager()

v = mgr.value('v', 0)

a = mgr.array('d', range(5))

l = mgr.list()

d = mgr.dict()

p = process(target=f, args=(v, a, l, d))

p.start()

p.join()

print(v)

print(a)

print(l)

print(d)

value('v', 100)

array('d', [123.0, 1.0, 2.0, 3.0, 4.0])

['hello']

{'a': 1}

10)寫一個多程序的例子

比如:多程序監控url是否正常

import urllib2

urls = [

    'http://www.baidu.com',

    'http://www.jd.com',

    'http://www.sina.com',

    'http://www.163.com',

]

def status_code(url):

    print 'process name:', current_process().name

    try:

        req = urllib2.urlopen(url, timeout=5)

        return req.getcode()

    except urllib2.urlerror:

        return

p = pool(processes=4)

for url in urls:

    r = p.apply_async(status_code, args=(url,))

    if r.get(timeout=5) == 200:

        print "%s ok" %url

    else:

        print "%s no" %url

http://www.baidu.com ok

http://www.jd.com ok

http://www.sina.com ok

process name: poolworker-4

http://www.163.com ok

<b>15.2 threading</b>

threading子產品類似于multiprocessing多程序子產品,使用方法也基本一樣。threading庫是對thread庫進行二次封裝,我們主要用到thread類,用thread類派生線程對象。

1)使用thread類實作多線程

from threading import thread, current_thread

    print 'thread name:', current_thread().name

for n in range(5):

    t = thread(target=worker, args=(n, ))

    t.start()

    t.join()  # 等待主程序結束

thread name: thread-1

thread name: thread-2

thread name: thread-3

thread name: thread-4

thread name: thread-5

2)還有一種方式繼承thread類實作多線程,子類可以重寫__init__和run()方法實作功能邏輯。

class test(thread):

    # 重寫父類構造函數,那麼父類構造函數将不會執行

    def __init__(self, n):

        thread.__init__(self)

        self.n = n

    def run(self):

        print 'thread name:', current_thread().name

        print 'hello world', self.n

        t = test(n)

        t.start()

        t.join()

3)lock

from threading import thread, lock, current_thread

lock = lock()

        lock.acquire()  # 擷取鎖

        lock.release()  # 釋放鎖

衆所周知,python多線程有gil全局鎖,意思是把每個線程執行代碼時都上了鎖,執行完成後會自動釋放gil鎖,意味着同一時間隻有一個線程在運作代碼。由于所有線程共享父程序記憶體、變量、資源,很容易多個線程對其操作,導緻内容混亂。

當你在寫多線程程式的時候如果輸出結果是混亂的,這時你應該考慮到在不使用鎖的情況下,多個線程運作時可能會修改原有的變量,導緻輸出不一樣。

由此看來python多線程是不能利用多核cpu提高處理性能,但在io密集情況下,還是能提高一定的并發性能。也不必擔心,多核cpu情況可以使用多程序實作多核任務。python多程序是複制父程序資源,互不影響,有各自獨立的gil鎖,保證資料不會混亂。能用多程序就用吧!