問題導讀:
- Process
- Process class
- Lock
- Semaphore
- Event
- Queue
- Pool
解決方案:
Process
#!/usr/bin/env python
# coding=utf8
import multiprocessing
import time
def sayHello(interval):
for i in range(5):
print 'The time is {0}'.format(time.ctime())
time.sleep(interval)
if __name__=='__main__':
p = multiprocessing.Process(target=sayHello, args=(3,))
p.start()
p1 = multiprocessing.Process(target=sayHello, args=(3,))
p1.start()
print 'pid:', p.pid, ' ', p1.pid
print 'name:' ,p.name, ' ', p1.name
print 'is_alive:', p.is_alive(), ' ', p1.is_alive()
Process Class
#!/usr/bin/env python
# coding=utf-8
import multiprocessing
import time
class SayHello(multiprocessing.Process):
def __init__(self, interval):
multiprocessing.Process.__init__(self)
self.interval = interval
def run(self):
for i in range(4):
print 'time is {0} {1}'.format(time.ctime(),self.pid)
time.sleep(self.interval)
if __name__=='__main__':
for i in range(5):
p = SayHello(1)
p.start()
Lock
#!/usr/bin/env python
# coding=utf-8
import multiprocessing
import time
def sayHello(lock, i):
# 擷取鎖
lock.acquire()
print 'start:{0}'.format(time.ctime())
try:
with open('./data.txt','a') as f:
f.write(time.ctime() + ' id:' + str(i) + '\n')
finally:
# 釋放鎖
lock.release()
time.sleep(1)
print 'end:{0}'.format(time.ctime())
if __name__=='__main__':
lock = multiprocessing.Lock()
p1 = multiprocessing.Process(target=sayHello, args = (lock,3))
p2 = multiprocessing.Process(target=sayHello, args = (lock,4))
# 主程序結束,子線程結束
p1.daemon = True
p2.daemon = True
p1.start()
p2.start()
# 主程序會在結束之前檢查是否有子線程未完成
p1.join()
p2.join()
print 'end!'
Semaphore
#!/usr/bin/env python
# coding=utf-8
import multiprocessing
import time
def sayHello(s, i):
# 擷取
s.acquire()
print multiprocessing.current_process().name + ' acquire'
time.sleep(i)
print multiprocessing.current_process().name + ' release'
# 釋放
s.release()
if __name__=='__main__':
# 控制共享資源的通路數量
s = multiprocessing.Semaphore(2)
for i in range(4):
p = multiprocessing.Process(target=sayHello, args=(s, i))
p.start()
Event
#!/usr/bin/env python
# coding=utf-8
import multiprocessing
import time
def wait_for1(e):
print 'wait_for_1: starting'
# 等待1s
e.wait(1)
print 'wait_for_1:{0}'.format(str(e.is_set()))
def wait_for2(e):
print 'wait_for_2: starting'
# 等待5s
e.wait(5)
print 'wait_for_2:{0}'.format(str(e.is_set()))
if __name__=='__main__':
# 在程序之間傳遞狀态
e = multiprocessing.Event()
e1 = multiprocessing.Process(name = 'p1', target = wait_for1, args = (e,))
e2 = multiprocessing.Process(name = 'p2', target = wait_for2, args=(e,))
e1.start()
e2.start()
# 等待3s 之後設定狀态
time.sleep(3)
e.set()
print 'Event Set Ok!'
Queue
- Queue.qsize() 傳回隊列的大小
- Queue.empty() 如果隊列為空,傳回True,反之False
- Queue.full() 如果隊列滿了,傳回True,反之False
- Queue.get([block[, timeout]]) 擷取隊列,timeout等待時間
- Queue.get_nowait() 相當Queue.get(False)
- 非阻塞 Queue.put(item) 寫入隊列,timeout等待時間
- Queue.put_nowait(item) 相當Queue.put(item, False)
#!/usr/bin/env python
# coding=utf-8
import multiprocessing
import time
def writer_proc(q):
try:
for i in range(5):
time.sleep(1)
q.put(i,timeout = 2)
print 'put:',i
except:
pass
def reader_proc(q):
try:
for i in range(5):
j = q.get(timeout = 2)
print 'get:',j
except:
pass
if __name__=='__main__':
q = multiprocessing.Queue()
writer = multiprocessing.Process(target = writer_proc, args=(q,))
writer.start()
reader = multiprocessing.Process(target = reader_proc, args=(q,))
reader.start()
Pool
# coding: utf-8
import multiprocessing
import os, time, random
def Lee():
print "\nRun task Lee-%s" % (os.getpid()) # os.getpid()擷取目前的程序的ID
start = time.time()
time.sleep(random.random() * 10) # random.random()随機生成0-1之間的小數
end = time.time()
print 'Task Lee, runs %0.2f seconds.' % (end - start)
def Marlon():
print "\nRun task Marlon-%s" % (os.getpid())
start = time.time()
time.sleep(random.random() * 40)
end = time.time()
print 'Task Marlon runs %0.2f seconds.' % (end - start)
def Allen():
print "\nRun task Allen-%s" % (os.getpid())
start = time.time()
time.sleep(random.random() * 30)
end = time.time()
print 'Task Allen runs %0.2f seconds.' % (end - start)
def Frank():
print "\nRun task Frank-%s" % (os.getpid())
start = time.time()
time.sleep(random.random() * 20)
end = time.time()
print 'Task Frank runs %0.2f seconds.' % (end - start)
if __name__ == '__main__':
function_list = [Lee, Marlon, Allen, Frank]
print "parent process %s" % (os.getpid())
pool = multiprocessing.Pool(4)
for func in function_list:
# apply 線程阻塞,執行線程代碼的時候,會阻塞主程序
pool.apply_async(func) # Pool執行函數,apply執行函數,當有一個程序執行完畢後,會添加一個新的程序到pool中
print 'Waiting for all subprocesses done...'
pool.close()
pool.join() # 調用join之前,一定要先調用close() 函數,否則會出錯, close()執行後不會有新的程序加入到pool,join函數等待素有子程序結束
print 'All subprocesses done.'