<b>15.1 multiprocessing</b>
multiprocessing是多程序子產品,多程序提供了任務并發性,能充分利用多核處理器。避免了gil(全局解釋鎖)對資源的影響。
有以下常用類:
類
<b>描述</b>
process(group=none, target=none, name=none, args=(), kwargs={})
派生一個程序對象,然後調用start()方法啟動
pool(processes=none, initializer=none, initargs=())
傳回一個程序池對象,processes程序池程序數量
pipe(duplex=true)
傳回兩個連接配接對象由管道連接配接
queue(maxsize=0)
傳回隊列對象,操作方法跟queue.queue一樣
multiprocessing.dummy
這個庫是用于實作多線程
process()類有以下些方法:
run()
start()
啟動程序對象
join([timeout])
等待子程序終止,才傳回結果。可選逾時。
name
程序名字
is_alive()
傳回程序是否存活
daemon
程序的守護标記,一個布爾值
pid
傳回程序id
exitcode
子程序退出狀态碼
terminate()
終止程序。在unix上使用sigterm信号,在windows上使用terminateprocess()。
pool()類有以下些方法:
apply(func, args=(), kwds={})
等效内建函數apply()
apply_async(func, args=(), kwds={}, callback=none)
異步,等效内建函數apply()
map(func, iterable, chunksize=none)
等效内建函數map()
map_async(func, iterable, chunksize=none, callback=none)
異步,等效内建函數map()
imap(func, iterable, chunksize=1)
等效内建函數itertools.imap()
imap_unordered(func, iterable, chunksize=1)
像imap()方法,但結果順序是任意的
close()
關閉程序池
終止工作程序,垃圾收集連接配接池對象
join()
等待工作程序退出。必須先調用close()或terminate()
pool.apply_async()和pool.map_aysnc()又提供了以下幾個方法:
get([timeout])
擷取結果對象裡的結果。如果逾時沒有,則抛出timeouterror異常
wait([timeout])
等待可用的結果或逾時
ready()
傳回調用是否已經完成
successful()
<b>部落格位址:http://lizhenliang.blog.51cto.com and https://yq.aliyun.com/u/lizhenliang</b>
qq群:323779636(shell/python運維開發群)
舉例:
1)簡單的例子,用子程序處理函數
from multiprocessing import process
import os
def worker(name):
print name
print 'parent process id:', os.getppid()
print 'process id:', os.getpid()
if __name__ == '__main__':
p = process(target=worker, args=('function worker.',))
p.start()
p.join()
print p.name
# python test.py
function worker.
parent process id: 9079
process id: 9080
process-1
process執行個體傳入worker函數作為派生程序執行的任務,用start()方法啟動這個執行個體。
2)加以說明join()方法
def worker(n):
print 'hello world', n
for n in range(5):
p = process(target=worker, args=(n,))
p.start()
p.join()
print 'child process id:', p.pid
print 'child process name:', p.name
parent process id: 9041
hello world 0
child process id: 9132
child process name: process-1
hello world 1
child process id: 9133
child process name: process-2
hello world 2
child process id: 9134
child process name: process-3
hello world 3
child process id: 9135
child process name: process-4
hello world 4
child process id: 9136
child process name: process-5
# 把p.join()注釋掉再執行
child process id: 9125
child process id: 9126
child process id: 9127
child process id: 9128
child process id: 9129
可以看出,在使用join()方法時,輸出的結果都是順序排列的。相反是亂序的。是以join()方法是堵塞父程序,要等待目前子程序執行完後才會繼續執行下一個子程序。否則會一直生成子程序去執行任務。
在要求輸出的情況下使用join()可保證每個結果是完整的。
3)給子程序命名,友善管理
import os, time
def worker1(n):
def worker2():
print 'worker2...'
for n in range(3):
p1 = process(name='worker1', target=worker1, args=(n,))
p1.start()
p1.join()
print 'child process id:', p1.pid
print 'child process name:', p1.name
p2 = process(name='worker2', target=worker2)
p2.start()
p2.join()
print 'child process id:', p2.pid
print 'child process name:', p2.name
child process id: 9248
child process name: worker1
child process id: 9249
child process id: 9250
worker2...
child process id: 9251
child process name: worker2
4)設定守護程序,父程序退出也不影響子程序運作
p1.daemon = true
p2 = process(target=worker2)
p2.daemon = false
5)使用程序池
#!/usr/bin/python
# -*- coding: utf-8 -*-
from multiprocessing import pool, current_process
import os, time, sys
print 'process name:', current_process().name # 擷取目前程序名字
time.sleep(1) # 休眠用于執行時有時間檢視目前執行的程序
p = pool(processes=3)
for i in range(8):
r = p.apply_async(worker, args=(i,))
r.get(timeout=5) # 擷取結果中的資料
p.close()
process name: poolworker-1
process name: poolworker-2
process name: poolworker-3
hello world 5
hello world 6
hello world 7
程序池生成了3個子程序,通過循環執行8次worker函數,程序池會從子程序1開始去處理任務,當到達最大程序時,會繼續從子程序1開始。
在運作此程式同時,再打開一個終端視窗會看到生成的子程序:
# ps -ef |grep python
root 40244 9041 4 16:43 pts/3 00:00:00 python test.py
root 40245 40244 0 16:43 pts/3 00:00:00 python test.py
root 40246 40244 0 16:43 pts/3 00:00:00 python test.py
root 40247 40244 0 16:43 pts/3 00:00:00 python test.py
6)程序池map()方法
map()方法是将序列中的元素通過函數處理傳回新清單。
from multiprocessing import pool
def worker(url):
return 'http://%s' % url
urls = ['www.baidu.com', 'www.jd.com']
p = pool(processes=2)
r = p.map(worker, urls)
p.close()
print r
['http://www.baidu.com', 'http://www.jd.com']
7)queue程序間通信
multiprocessing支援兩種類型程序間通信:queue和pipe。
queue庫已經封裝到multiprocessing庫中,在第十章 python常用标準庫已經講解到queue庫使用,有需要請檢視以前博文。
例如:一個子程序向隊列寫資料,一個子程序讀取隊列資料
from multiprocessing import process, queue
# 寫資料到隊列
def write(q):
q.put(n)
print 'put %s to queue.' % n
# 從隊列讀資料
def read(q):
while true:
if not q.empty():
value = q.get()
print 'get %s from queue.' % value
else:
break
q = queue()
pw = process(target=write, args=(q,))
pr = process(target=read, args=(q,))
pw.start()
pw.join()
pr.start()
pr.join()
put 0 to queue.
put 1 to queue.
put 2 to queue.
put 3 to queue.
put 4 to queue.
get 0 from queue.
get 1 from queue.
get 2 from queue.
get 3 from queue.
get 4 from queue.
8)pipe程序間通信
from multiprocessing import process, pipe
def f(conn):
conn.send([42, none, 'hello'])
conn.close()
parent_conn, child_conn = pipe()
p = process(target=f, args=(child_conn,))
print parent_conn.recv()
[42, none, 'hello']
pipe()建立兩個連接配接對象,每個連結對象都有send()和recv()方法,
9)程序間對象共享
manager類傳回一個管理對象,它控制服務端程序。提供一些共享方式:value()、array()、list()、dict()、event()等
建立manger對象存放資源,其他程序通過通路manager擷取。
from multiprocessing import process, manager
def f(v, a, l, d):
v.value = 100
a[0] = 123
l.append('hello')
d['a'] = 1
mgr = manager()
v = mgr.value('v', 0)
a = mgr.array('d', range(5))
l = mgr.list()
d = mgr.dict()
p = process(target=f, args=(v, a, l, d))
p.start()
p.join()
print(v)
print(a)
print(l)
print(d)
value('v', 100)
array('d', [123.0, 1.0, 2.0, 3.0, 4.0])
['hello']
{'a': 1}
10)寫一個多程序的例子
比如:多程序監控url是否正常
import urllib2
urls = [
'http://www.baidu.com',
'http://www.jd.com',
'http://www.sina.com',
'http://www.163.com',
]
def status_code(url):
print 'process name:', current_process().name
try:
req = urllib2.urlopen(url, timeout=5)
return req.getcode()
except urllib2.urlerror:
return
p = pool(processes=4)
for url in urls:
r = p.apply_async(status_code, args=(url,))
if r.get(timeout=5) == 200:
print "%s ok" %url
else:
print "%s no" %url
http://www.baidu.com ok
http://www.jd.com ok
http://www.sina.com ok
process name: poolworker-4
http://www.163.com ok
<b>15.2 threading</b>
threading子產品類似于multiprocessing多程序子產品,使用方法也基本一樣。threading庫是對thread庫進行二次封裝,我們主要用到thread類,用thread類派生線程對象。
1)使用thread類實作多線程
from threading import thread, current_thread
print 'thread name:', current_thread().name
for n in range(5):
t = thread(target=worker, args=(n, ))
t.start()
t.join() # 等待主程序結束
thread name: thread-1
thread name: thread-2
thread name: thread-3
thread name: thread-4
thread name: thread-5
2)還有一種方式繼承thread類實作多線程,子類可以重寫__init__和run()方法實作功能邏輯。
class test(thread):
# 重寫父類構造函數,那麼父類構造函數将不會執行
def __init__(self, n):
thread.__init__(self)
self.n = n
def run(self):
print 'thread name:', current_thread().name
print 'hello world', self.n
t = test(n)
t.start()
t.join()
3)lock
from threading import thread, lock, current_thread
lock = lock()
lock.acquire() # 擷取鎖
lock.release() # 釋放鎖
衆所周知,python多線程有gil全局鎖,意思是把每個線程執行代碼時都上了鎖,執行完成後會自動釋放gil鎖,意味着同一時間隻有一個線程在運作代碼。由于所有線程共享父程序記憶體、變量、資源,很容易多個線程對其操作,導緻内容混亂。
當你在寫多線程程式的時候如果輸出結果是混亂的,這時你應該考慮到在不使用鎖的情況下,多個線程運作時可能會修改原有的變量,導緻輸出不一樣。
由此看來python多線程是不能利用多核cpu提高處理性能,但在io密集情況下,還是能提高一定的并發性能。也不必擔心,多核cpu情況可以使用多程序實作多核任務。python多程序是複制父程序資源,互不影響,有各自獨立的gil鎖,保證資料不會混亂。能用多程序就用吧!