天天看點

python - 多程序

問題導讀:

  1. Process
  2. Process class
  3. Lock
  4. Semaphore
  5. Event
  6. Queue
  7. Pool

解決方案:

Process

#!/usr/bin/env python
# coding=utf8
import multiprocessing
import time


def sayHello(interval):
    """Print the current time five times, sleeping between prints.

    Args:
        interval: Number of seconds passed to ``time.sleep`` after each print.
    """
    for _ in range(5):
        # Single-argument print(...) is valid on Python 2 and Python 3 alike;
        # the original print statement is a SyntaxError on Python 3.
        print('The time is {0}'.format(time.ctime()))
        time.sleep(interval)

if __name__ == '__main__':
    # Start two child processes that both run sayHello with a 3 second
    # interval, then report their pid, name and liveness from the parent.
    p = multiprocessing.Process(target=sayHello, args=(3,))
    p.start()

    p1 = multiprocessing.Process(target=sayHello, args=(3,))
    p1.start()

    # Formatted single-argument prints keep the exact spacing the Python 2
    # print statement produced while also being valid Python 3.
    print('pid: {0}   {1}'.format(p.pid, p1.pid))
    print('name: {0}   {1}'.format(p.name, p1.name))
    print('is_alive: {0}   {1}'.format(p.is_alive(), p1.is_alive()))

Process Class

#!/usr/bin/env python
# coding=utf-8
import multiprocessing
import time

class SayHello(multiprocessing.Process):
    """Process subclass that prints the time and its pid four times."""

    def __init__(self, interval):
        """Store the sleep interval.

        Args:
            interval: Seconds to sleep after each print in ``run``.
        """
        super(SayHello, self).__init__()
        self.interval = interval

    def run(self):
        """Print the current time and this process's pid four times.

        Invoked in the child process by ``start()``.
        """
        for _ in range(4):
            # Single-argument print(...) works on both Python 2 and 3.
            print('time is {0} {1}'.format(time.ctime(), self.pid))
            time.sleep(self.interval)

if __name__ == '__main__':
    # Launch five SayHello workers, each printing once per second.
    for _ in range(5):
        worker = SayHello(1)
        worker.start()

Lock

#!/usr/bin/env python
# coding=utf-8
import multiprocessing
import time
def sayHello(lock, i, path='./data.txt', delay=1):
    """Append a timestamped line to a file while holding ``lock``.

    Prints a ``start:`` line, appends ``"<ctime> id:<i>"`` to ``path`` under
    the lock, releases the lock, sleeps, then prints an ``end:`` line.

    Args:
        lock: Lock object usable as a context manager
            (e.g. ``multiprocessing.Lock()``).
        i: Identifier written into the file line.
        path: File to append to. Defaults to ``'./data.txt'``.
        delay: Seconds to sleep after releasing the lock. Defaults to 1.
    """
    # ``with`` guarantees the release even if the print or the file write
    # raises; the original printed between acquire() and try, so a failure
    # there would have left the lock held forever.
    with lock:
        print('start:{0}'.format(time.ctime()))
        with open(path, 'a') as f:
            f.write(time.ctime() + ' id:' + str(i) + '\n')
    time.sleep(delay)
    print('end:{0}'.format(time.ctime()))

if __name__ == '__main__':
    # One lock shared by both children serialises their writes to the file.
    lock = multiprocessing.Lock()
    p1 = multiprocessing.Process(target=sayHello, args=(lock, 3))
    p2 = multiprocessing.Process(target=sayHello, args=(lock, 4))
    # Daemon children are terminated when the main process exits.
    p1.daemon = True
    p2.daemon = True
    p1.start()
    p2.start()
    # Wait for both child processes to finish before the parent continues.
    p1.join()
    p2.join()
    # Single-argument print(...) works on both Python 2 and 3.
    print('end!')

Semaphore

#!/usr/bin/env python
# coding=utf-8

import multiprocessing
import time

def sayHello(s, i):
    """Hold semaphore ``s`` for ``i`` seconds, announcing acquire and release.

    Args:
        s: Semaphore usable as a context manager
            (e.g. ``multiprocessing.Semaphore``).
        i: Seconds to sleep while holding the semaphore.
    """
    name = multiprocessing.current_process().name
    # ``with`` releases the semaphore even if the body raises; the original
    # acquire()/release() pair would leak a slot on any exception.
    with s:
        print(name + ' acquire')
        time.sleep(i)
        print(name + ' release')
if __name__ == '__main__':
    # The semaphore caps how many processes may use the shared resource at
    # once: at most two of the four workers hold it at any moment.
    s = multiprocessing.Semaphore(2)
    for hold_seconds in range(4):
        worker = multiprocessing.Process(target=sayHello, args=(s, hold_seconds))
        worker.start()

Event

#!/usr/bin/env python
# coding=utf-8
import multiprocessing
import time

def wait_for1(e):
    """Wait up to 1 second for event ``e``, then print whether it is set.

    Args:
        e: Event object providing ``wait(timeout)`` and ``is_set()``.
    """
    # Single-argument print(...) works on both Python 2 and 3.
    print('wait_for_1: starting')
    # Block for at most 1 second.
    e.wait(1)
    print('wait_for_1:{0}'.format(str(e.is_set())))

def wait_for2(e):
    """Wait up to 5 seconds for event ``e``, then print whether it is set.

    Args:
        e: Event object providing ``wait(timeout)`` and ``is_set()``.
    """
    # Single-argument print(...) works on both Python 2 and 3.
    print('wait_for_2: starting')
    # Block for at most 5 seconds.
    e.wait(5)
    print('wait_for_2:{0}'.format(str(e.is_set())))
if __name__ == '__main__':
    # An Event passes a simple set/unset state between processes.
    e = multiprocessing.Event()

    e1 = multiprocessing.Process(name='p1', target=wait_for1, args=(e,))
    e2 = multiprocessing.Process(name='p2', target=wait_for2, args=(e,))

    e1.start()
    e2.start()

    # Set the event after 3 seconds: wait_for1 (1 s timeout) has already
    # given up by then, while wait_for2 (5 s timeout) is still waiting.
    time.sleep(3)
    e.set()

    # Single-argument print(...) works on both Python 2 and 3.
    print('Event Set Ok!')

Queue

  1. Queue.qsize() 傳回隊列的大小
  2. Queue.empty() 如果隊列為空,傳回True,反之False
  3. Queue.full() 如果隊列滿了,傳回True,反之False
  4. Queue.get([block[, timeout]]) 從隊列中取出一個元素,block為True時最多等待timeout秒,逾時抛出Empty
  5. Queue.get_nowait() 相當於Queue.get(False)
  6. Queue.put(item[, block[, timeout]]) 將item寫入隊列,block為True時最多等待timeout秒,逾時抛出Full
  7. Queue.put_nowait(item) 相當於Queue.put(item, False)
#!/usr/bin/env python
# coding=utf-8
import multiprocessing
import time

def writer_proc(q, delay=1, timeout=2):
    """Put the integers 0..4 on ``q``, pausing before each put.

    Stops quietly if a put times out because the queue stayed full. Any
    other exception propagates instead of being silently discarded.

    Args:
        q: Queue providing ``put(item, timeout=...)``.
        delay: Seconds to sleep before each put. Defaults to 1.
        timeout: Seconds to wait for free space on each put. Defaults to 2.
    """
    # Local import so the block is self-contained; the module was renamed
    # from ``Queue`` (Python 2) to ``queue`` (Python 3).
    try:
        from queue import Full
    except ImportError:
        from Queue import Full

    try:
        for i in range(5):
            time.sleep(delay)
            q.put(i, timeout=timeout)
            print('put: {0}'.format(i))
    except Full:
        # Nobody drained the queue in time; give up producing. The original
        # bare ``except`` also hid real bugs and KeyboardInterrupt.
        pass
def reader_proc(q, timeout=2):
    """Read up to five items from ``q`` and print each one.

    Stops quietly as soon as a get times out on an empty queue. Any other
    exception propagates instead of being silently discarded.

    Args:
        q: Queue providing ``get(timeout=...)``.
        timeout: Seconds to wait for each item. Defaults to 2.
    """
    # Local import so the block is self-contained; the module was renamed
    # from ``Queue`` (Python 2) to ``queue`` (Python 3).
    try:
        from queue import Empty
    except ImportError:
        from Queue import Empty

    try:
        for _ in range(5):
            item = q.get(timeout=timeout)
            print('get: {0}'.format(item))
    except Empty:
        # No item arrived in time; stop reading. The original bare
        # ``except`` also hid real bugs and KeyboardInterrupt.
        pass
if __name__ == '__main__':
    # A single queue connects the producer process to the consumer process.
    q = multiprocessing.Queue()
    writer = multiprocessing.Process(target=writer_proc, args=(q,))
    reader = multiprocessing.Process(target=reader_proc, args=(q,))

    # Start the producer first, then the consumer.
    for proc in (writer, reader):
        proc.start()

Pool

# coding: utf-8
import multiprocessing
import os, time, random


def Lee(max_seconds=10):
    """Sleep for a random duration and report how long the task ran.

    Args:
        max_seconds: Upper bound, in seconds, of the random sleep.
            Defaults to 10.
    """
    # os.getpid() identifies the process that is running this task.
    print("\nRun task Lee-%s" % (os.getpid()))
    start = time.time()
    # random.random() yields a float in [0, 1), scaled to the bound.
    time.sleep(random.random() * max_seconds)
    end = time.time()
    print('Task Lee, runs %0.2f seconds.' % (end - start))


def Marlon(max_seconds=40):
    """Sleep for a random duration and report how long the task ran.

    Args:
        max_seconds: Upper bound, in seconds, of the random sleep.
            Defaults to 40.
    """
    # os.getpid() identifies the process that is running this task.
    print("\nRun task Marlon-%s" % (os.getpid()))
    start = time.time()
    # random.random() yields a float in [0, 1), scaled to the bound.
    time.sleep(random.random() * max_seconds)
    end = time.time()
    print('Task Marlon runs %0.2f seconds.' % (end - start))


def Allen(max_seconds=30):
    """Sleep for a random duration and report how long the task ran.

    Args:
        max_seconds: Upper bound, in seconds, of the random sleep.
            Defaults to 30.
    """
    # os.getpid() identifies the process that is running this task.
    print("\nRun task Allen-%s" % (os.getpid()))
    start = time.time()
    # random.random() yields a float in [0, 1), scaled to the bound.
    time.sleep(random.random() * max_seconds)
    end = time.time()
    print('Task Allen runs %0.2f seconds.' % (end - start))


def Frank(max_seconds=20):
    """Sleep for a random duration and report how long the task ran.

    Args:
        max_seconds: Upper bound, in seconds, of the random sleep.
            Defaults to 20.
    """
    # os.getpid() identifies the process that is running this task.
    print("\nRun task Frank-%s" % (os.getpid()))
    start = time.time()
    # random.random() yields a float in [0, 1), scaled to the bound.
    time.sleep(random.random() * max_seconds)
    end = time.time()
    print('Task Frank runs %0.2f seconds.' % (end - start))


if __name__ == '__main__':
    function_list = [Lee, Marlon, Allen, Frank]
    # Single-argument print(...) works on both Python 2 and 3.
    print("parent process %s" % (os.getpid()))

    # Pool of four worker processes.
    pool = multiprocessing.Pool(4)
    for func in function_list:
        # apply_async submits the task without blocking the main process
        # (plain apply would block until the task finished). The pool hands
        # queued tasks to whichever worker becomes free.
        pool.apply_async(func)

    print('Waiting for all subprocesses done...')
    # close() must be called before join(): it stops new tasks from being
    # submitted; join() then waits for all worker processes to finish.
    pool.close()
    pool.join()
    print('All subprocesses done.')