天天看點

python多線程程式設計

Python多線程程式設計中常用方法:

1、join()方法:如果一個線程(或函數)在執行過程中調用了另一個線程,並且希望等該線程完成操作後再繼續執行,那麼在調用方就可以使用被調線程的join方法:join([timeout])。timeout:可選參數,等待被調線程結束的最長時間(秒);超過該時間後join()會傳回,但被調線程仍可能繼續執行

2、isAlive()方法:檢視線程是否還在運作

3、getName()方法:獲得線程名

4、setDaemon()方法:若希望主線程退出時子線程也隨之退出,則須在啟動子線程之前調用其setDaemon(True)

Python線程同步:

(1)Thread的Lock和RLock實作簡單的線程同步:

import threading
import time
class mythread(threading.Thread):
    """Worker thread that bumps the shared global counter ``x`` three times.

    Relies on two module-level names created by the launching script: the
    counter ``x`` and the lock ``lock`` that guards it.
    """

    def __init__(self, threadname):
        """Create the thread and give it the name ``threadname``."""
        threading.Thread.__init__(self, name=threadname)

    def run(self):
        """Add 3 to ``x``, pause for one second, then print the new value.

        The whole sequence runs while holding ``lock``, so the threads
        execute it one after another.
        """
        global x
        # ``with`` releases the lock even if the body raises, so one failing
        # thread cannot leave every other thread blocked forever.
        with lock:
            for _ in range(3):
                x = x + 1
            time.sleep(1)
            # Single-argument parenthesised form prints the same on
            # Python 2 and Python 3.
            print(x)

if __name__ == '__main__':
    # Shared state used by every mythread instance: the counter and the
    # re-entrant lock that guards it.
    x = 0
    lock = threading.RLock()
    # Build all ten threads (named "0" to "9") first, then launch them.
    t1 = []
    for index in range(10):
        t1.append(mythread(str(index)))
    for worker in t1:
        worker.start()

(2)使用條件變量保持線程同步:

# coding=utf-8
import threading

class Producer(threading.Thread):
    """Thread that adds 10000 to the shared global counter ``x``.

    Coordinates through the module-level condition variable ``con``; both
    ``con`` and ``x`` are created by the launching script.
    """

    def __init__(self, threadname):
        """Create the thread and give it the name ``threadname``."""
        threading.Thread.__init__(self, name=threadname)

    def run(self):
        """Wait while ``x`` is 10000, then add 10000 to it and print it."""
        global x
        # ``with`` releases the condition's lock even if the body raises.
        with con:
            # Re-check the predicate in a loop: wait() can return while the
            # counter is still full, and the production step must run after
            # the wake-up instead of being skipped.
            while x == 10000:
                con.wait()
            for _ in range(10000):
                x = x + 1
            # One notification after the whole batch is enough: a waiter
            # cannot proceed before this block releases the lock anyway.
            con.notify()
            print(x)

class Consumer(threading.Thread):
    """Thread that subtracts 10000 from the shared global counter ``x``.

    Coordinates through the module-level condition variable ``con``; both
    ``con`` and ``x`` are created by the launching script.
    """

    def __init__(self, threadname):
        """Create the thread and give it the name ``threadname``."""
        threading.Thread.__init__(self, name=threadname)

    def run(self):
        """Wait while ``x`` is 0, then subtract 10000 from it and print it."""
        global x
        # ``with`` releases the condition's lock even if the body raises.
        with con:
            # Re-check the predicate in a loop and consume after waking up.
            # With a plain ``if`` the thread would skip consumption whenever
            # it had to wait, so the final value of x depended on which
            # thread happened to run first.
            while x == 0:
                con.wait()
            for _ in range(10000):
                x = x - 1
            con.notify()
            print(x)

if __name__ == '__main__':
    # Shared state used by Producer.run and Consumer.run.
    x = 0
    con = threading.Condition()
    p = Producer('Producer')
    c = Consumer('Consumer')
    # Launch the producer before the consumer, then wait for both in the
    # same order.
    workers = (p, c)
    for worker in workers:
        worker.start()
    for worker in workers:
        worker.join()
    print(x)

(3)使用隊列保持線程同步:

# coding=utf-8
import threading
import Queue
import time
import random

class Producer(threading.Thread):
    """Thread that puts one random integer on the module-level ``queue``."""

    def __init__(self, threadname):
        """Create the thread and give it the name ``threadname``."""
        threading.Thread.__init__(self, name=threadname)

    def run(self):
        """Enqueue a random integer from 1 to 5, report it, sleep a second."""
        number = random.randint(1, 5)
        queue.put(number)
        # The report is the thread name, one separating space, then the
        # message (which itself starts with a space).
        report = ' put %d to queue' % number
        print(self.getName() + ' ' + report)
        time.sleep(1)

class Consumer(threading.Thread):
    """Thread that takes one item off the module-level ``queue``."""

    def __init__(self, threadname):
        """Create the thread and give it the name ``threadname``."""
        threading.Thread.__init__(self, name=threadname)

    def run(self):
        """Block until an item is available, report it, sleep a second."""
        received = queue.get()
        # The report is the thread name, one separating space, then the
        # message (which itself starts with a space).
        report = ' get %d from queue' % received
        print(self.getName() + ' ' + report)
        time.sleep(1)

if __name__ == '__main__':
    # The queue shared by all producer and consumer threads.
    queue = Queue.Queue()
    plist = [Producer('Producer' + str(i)) for i in range(3)]
    clist = [Consumer('Consumer' + str(j)) for j in range(3)]
    # Start every thread before joining any of them. Calling join() right
    # after each start() makes the threads run strictly one at a time, so
    # nothing actually executes concurrently.
    workers = plist + clist
    for worker in workers:
        worker.start()
    # Three items are produced and three are consumed, so every blocking
    # get() in a consumer is eventually satisfied and all joins return.
    for worker in workers:
        worker.join()

生産者消費者模式的另一種實作:

# coding=utf-8
import time
import threading
import Queue

class Consumer(threading.Thread):
    """Thread that prints every item taken from a queue until told to quit."""

    def __init__(self, queue):
        """Remember ``queue``, the object whose ``get()`` supplies the items."""
        threading.Thread.__init__(self)
        self._queue = queue

    def run(self):
        """Print items from the queue until the string ``'quit'`` arrives."""
        while True:
            # get() blocks the current thread until an item is retrieved.
            msg = self._queue.get()
            # The string 'quit' is the shutdown signal: leave the loop.
            if isinstance(msg, str) and msg == 'quit':
                break
            # "Process" (in our case, print) the item. Wrapping it in a
            # one-element tuple keeps the formatting correct when the item
            # is itself a tuple, which "%" would otherwise unpack as
            # several arguments.
            print("I'm a thread, and I received %s!!" % (msg,))
        # Always be friendly!
        print('Bye byes!')

class Producer(threading.Thread):
    """Thread that feeds timestamped messages into a queue for 5 seconds."""

    def __init__(self, queue):
        """Remember ``queue``, the object whose ``put()`` receives the items."""
        threading.Thread.__init__(self)
        self._queue = queue

    def run(self):
        """Put one message per second on the queue, then the 'quit' message."""
        # Producing stops once 5 seconds have passed since this moment.
        began = time.time()
        while True:
            elapsed = time.time() - began
            if elapsed >= 5:
                break
            # "Produce" a piece of work for the consumer to process.
            self._queue.put('something at %s' % time.time())
            # Pause so the queue is not flooded with messages.
            time.sleep(1)
        # The 'quit' message tells the consumer to stop.
        self._queue.put('quit')

if __name__ == '__main__':
    # One shared queue connects the two threads.
    queue = Queue.Queue()
    consumer = Consumer(queue)
    producer1 = Producer(queue)
    # The consumer is started first, then the producer.
    for thread in (consumer, producer1):
        thread.start()

使用線程池(Thread pool)+同步隊列(Queue)的實作方式:

# A more realistic thread pool example
# coding=utf-8
import time 
import threading 
import Queue 
import urllib2 

class Consumer(threading.Thread):
    """Pool worker that opens every URL taken from a queue until told to quit."""

    def __init__(self, queue):
        """Remember ``queue``, the object whose ``get()`` supplies the URLs."""
        threading.Thread.__init__(self)
        self._queue = queue

    def run(self):
        """Open URLs from the queue until the string ``'quit'`` arrives."""
        while True:
            content = self._queue.get()
            # The string 'quit' is the shutdown signal: leave the loop.
            if isinstance(content, str) and content == 'quit':
                break
            response = urllib2.urlopen(content)
            # The response is not used any further, so close it right away
            # instead of leaving the connection open until garbage
            # collection.
            response.close()
        print('Bye byes!')
 
def Producer():
    """Fetch a fixed list of URLs with a pool of 4 worker threads.

    Queues every URL, then one 'quit' message per worker, waits for all
    workers to finish and prints the elapsed time.
    """
    # Every entry ends with a comma: adjacent string literals without one
    # are silently concatenated into a single (invalid) URL.
    urls = [
        'http://www.python.org',
        'http://www.yahoo.com',
        'http://www.scala.org',
        'http://cn.bing.com',
        # etc..
    ]
    queue = Queue.Queue()
    worker_threads = build_worker_pool(queue, 4)
    start_time = time.time()
    # Add the urls to process
    for url in urls:
        queue.put(url)
    # Add one 'quit' message per worker so that each of them stops
    for _ in worker_threads:
        queue.put('quit')
    for worker in worker_threads:
        worker.join()

    print('Done! Time taken: {}'.format(time.time() - start_time))
 
def build_worker_pool(queue, size):
    """Create and start ``size`` Consumer threads reading from ``queue``.

    Returns the list of started threads, in creation order.
    """
    def _spawn():
        # Each worker is started as soon as it has been created.
        thread = Consumer(queue)
        thread.start()
        return thread

    return [_spawn() for _ in range(size)]
 
# Run Producer() only when this file is executed directly.
if __name__ == "__main__":
    Producer()

另一個使用線程池+Map的實作:

import urllib2 
from multiprocessing.dummy import Pool as ThreadPool 
 
# Pages to open.
urls = [
    'http://www.python.org',
    'http://www.python.org/about/',
    'http://www.python.org/doc/',
    'http://www.python.org/download/',
    'http://www.python.org/community/',
]

# Make the Pool of workers
pool = ThreadPool(4)
try:
    # Open the urls in their own threads
    # and return the results
    results = pool.map(urllib2.urlopen, urls)
finally:
    # Close the pool and wait for the work to finish, even when one of the
    # requests raised and map() did not return normally.
    pool.close()
    pool.join()

參考:

http://blog.jobbole.com/58700/

作者:阿凡盧

出處:http://www.cnblogs.com/luxiaoxun/

本文版權歸作者所有,歡迎轉載,但未經作者同意必須保留此段聲明,且在文章頁面明顯位置給出原文連結,否則保留追究法律責任的權利。