天天看點

Python的GIL機制與多線程程式設計線程

GIL

全稱global interpreter lock 全局解釋鎖

gil使得python同一個時刻隻有一個線程在一個cpu上執行位元組碼,并且無法将多個線程映射到多個cpu上,即不能發揮多個cpu的優勢。

gil會根據執行的位元組碼行數以及時間片釋放gil,也會在遇到IO操作時候主動釋放。

線程

  作業系統能夠調動的最小單元就是線程。最開始是程序,因為程序對資源的消耗大,是以演變成了線程。

對于IO操作來說,多線程和多程序性能差别不大。

  • 方式一:通過thread類執行個體化
import threading
import time
def get_html(url):
    print('get html started')
    time.sleep(2)
    print('get html ended')
def get_url(url):
    print('get url started')
    time.sleep(2)
    print('get url ended')

get_html = threading.Thread(target=get_html, args=('url1',))
get_url = threading.Thread(target=get_url, args=('url2',))

if __name__ =='__main__':
    start_time = time.time()
    get_html.start()
    get_url.start()
    print(time.time() - start_time)
輸出結果:
get html started
get url started
0.0009999275207519531
get html ended
get url ended      

此處因為自定義了兩個線程,但是實際有三個線程,(還有一個主線程)因為直接線程.start()是非阻塞的,是以先會運作列印時間,然後再結束上面兩個線程。如果想要等上面兩個線程結束之後再執行主線程列印出時間話(即阻塞)可以有兩種方法

①線上程開始前加入語言:(隻要主線程結束之後就結束整個程式,Kill所有的子線程)

 get_html.setDaemon(True)

 get_url.setDaemon(True)

②線上程開始之後加入語言(将等待線程運作結束之後再往下繼續執行代碼):

get_html.join()

get_url.join()

  • 方式二:繼承threading.Thread類
import threading
import time
class GetHtml(threading.Thread):
    def __init__(self, name):
        super().__init__(name=name)
    def run(self):
        print('get html started')
        time.sleep(2)
        print('get html ended')

class GetUrl(threading.Thread):
    def __init__(self, name):
        super().__init__(name=name)
    def run(self):
        print('get url started')
        time.sleep(2)
        print('get url ended')

get_html = GetHtml('HTML')
get_url = GetUrl('URL')

if __name__ =='__main__':
    start_time =time.time()
    get_html.start()
    get_url.start()
    get_html.join()
    get_url.join()
    print(time.time() - start_time)
輸出結果:
get html started
get url started
get html ended
get url ended
2.0011143684387207      

 線程間的通信

  • 1 通過全局變量進行通信
import time
import threading
url_list = []
def get_html():
    global url_list
    url = url_list.pop()
    print('get html form {} started'.format(url))
    time.sleep(2)
    print('get html from {} ended'.format(url))

def get_url():
    global url_list
    print('get url started')
    time.sleep(2)
    for i in range(20):
        url_list.append('http://www.baidu.com/{id}'.format(id=i))
    print('get url ended')
if __name__ == '__main__':
    thread_url = threading.Thread(target=get_url)
    for i in range(10):
        thread_html = threading.Thread(target=get_html)
        thread_html.start()      

 上述代碼比較原始,不靈活,可以将全局變量url_list通過參數傳入函數調用

import time
import threading
url_list = []
def get_html(url_list):
    url = url_list.pop()
    print('get html form {} started'.format(url))
    time.sleep(1)
    print('get html from {} ended'.format(url))
def add_url(url_list):
    print('add url started')
    time.sleep(1)
    for i in range(20):
        url_list.append('http://www.baidu.com/{id}'.format(id=i))
    print('add url ended')
if __name__ == '__main__':
    thread_url = threading.Thread(target=add_url, args=(url_list,))
    thread_url.start()
    thread_url.join()
    for i in range(20):
        thread_html = threading.Thread(target=get_html, args=(url_list,))
        thread_html.start()      

  還有一種方式為建立一個py檔案,然後在檔案中定義一個變量,url_list = [] 然後開頭的時候用import導入這個變量即可。這種方式對于變量很多的情況下為避免混亂統一将變量進行管理。但是此方式一定要注意import的時候隻要import到檔案,而不要import到變量。(比如說檔案名為variables.python内定義一個變量名url_list=[],  需要import variables,然後代碼中用variables.url_list 而不是 from variables import url_list 因為後一種方式導入的話,在其他線程修改此變量的時候,我們是看不到的。但是第一種方式可以看到。

  總結:不管以何種形式共享全局變量,都不是線程安全的操作,是以為了達到線程安全,就需要用到線程鎖,lock的機制,代碼就會比較複雜,所有引入了一種安全的線程通信,from queue import Queue

  • 2用消息隊列Queue(推薦使用,Queue是線程安全的,不會沖突的)
import time
import threading
from queue import Queue
def get_html(queue):
    url = queue.get()
    print('get html form {} started'.format(url))
    time.sleep(1)
    print('get html from {} ended'.format(url))

def add_url(queue):
    print('add url started')
    time.sleep(1)
    for i in range(20):
        queue.put('http://www.baidu.com/{id}'.format(id=i))
    print('add url ended')
if __name__ == '__main__':
    url_queue = Queue(maxsize=1000) # 設定隊列中元素的max個數。
    thread_url = threading.Thread(target=add_url, args=(url_queue,))
    thread_url.start()
    thread_url.join()
    list1=[]
    for i in range(20):
        thread_html = threading.Thread(target=get_html, args=(url_queue,))
        list1.append(thread_html)
    for i in list1:
        i.start()      

 線程同步的問題:

 概念:

  線程的同步(即當有一個線程在對記憶體進行操作時,其他線程都不可以對這個記憶體位址進行操作,直到該線程完成操作, 其他線程才能對該記憶體位址進行操作,而其他線程又處于等待狀态)

  • 線程為什麼要同步?

問題:既然python有GIL機制,那麼線程就是安全的,那麼為什麼還有線程同步問題?

  回到上面GIL的介紹(gil會根據執行的位元組碼行數以及時間片釋放gil,也會在遇到IO操作時候主動釋放)

  再看一個經典的案列:如果GIL使線程絕對安全的話,那麼最後結果恒為0,事實卻不是這樣。

from threading import Thread
total = 0
def add():
    global total
    for i in range(1000000):
        total += 1
def desc():
    global total
    for i in range(1000000):
        total -= 1
thread1 = Thread(target=add)
thread2 = Thread(target=desc)
thread1.start()
thread2.start()
thread1.join()
thread2.join()
print(total)

312064      

結果列印不穩定,都不會0,

線程同步的方法:

1:線程鎖機制 Lock

注意,鎖的擷取和釋放也需要時間,于是會對程式的運作性能産生一定的影響。而且極易造成死鎖,于是對應的可以将Lock改為Rlock,就可以支援同時多個acquire進入鎖,但是一定注意,Rlock隻在單線程内起作用,并且acquire次數要和release次數想等。

import threading
from threading import Lock
l = Lock()
a = 0
def add():
    global a
    global l
    l.acquire()
    for i in range(1000000):
        a += i
    l.release() # 記得線程段結束運作之後一定需要解鎖。不然其他程式就阻塞了。
def desc():
    global a
    global l
    l.acquire()
    for i in range(1000000):
        a -= i
    l.release()
thread1 = threading.Thread(target=add)
thread2 = threading.Thread(target=desc)
thread1.start()
thread2.start()
thread1.join() # 再次注意如果線程隻是start()沒有join()的話,那麼任意線程執行完了就會往下執行print語句,但是如果加了join的話,就會等thread1和thread2運作完之後在運作下面的語句。
thread2.join()
print(a)

輸出結果恒為0      

2:條件變量:condition

複雜的線程通訊的話lock機制已經不再适用,例如:

from threading import Condition, Thread, Lock
# 條件變量,用複雜的線程間的同步
lock = Lock()


class Tom(Thread):
    def __init__(self, lock):
        self.lock = lock
        super().__init__(name='Tom')

    def run(self):
        self.lock.acquire()
        print('{}: hello, Bob.'.format(self.name))
        self.lock.release()
        self.lock.acquire()
        print("{}: Let's have a chat.".format(self.name))
        self.lock.release()


class Bob(Thread):
    def __init__(self, lock):
        self.lock = lock
        super().__init__(name='Bob')

    def run(self):
        self.lock.acquire()
        print('{}: Hi, Tom.'.format(self.name))
        self.lock.release()
        self.lock.acquire()
        print("{}:Well, I like to talk to you.".format(self.name))
        self.lock.release()


tom = Tom(lock)
bob = Bob(lock)
tom.start()
bob.start()

Tom: hello, Bob.
Tom: Let's have a chat.
Bob: Hi, Tom.
Bob:Well, I like to talk to you.      

  為什麼會這樣?原因很簡單,Tom在start()的時候,還沒有來得及Bob start()之前就将所有的邏輯執行完了,其次,GIL切換的時候是根據時間片或者位元組碼行數來的,即也可能因為在時間片内将Tom執行完畢之後才切換到Bob。于是引入了條件變量機制,condition,

  看condition原代碼可以了解到,其內建了魔法方法__enter__ 和 __exit__于是可以用with語句調用,在__enter__方法中,調用了

def __enter__(self):
        return self._lock.__enter__()      

而__enter__() 方法則直接調用了acquire方法, 同時acquire其實就是調用了Rlock.acquire()方法。是以condition内部其實還是使用了Rlock方法來實作。同理__exit__則調用了Rlock.release()

重要方法 wait()和notify()

wait()允許我們等待某個條件變量的通知,而notify()方法則是發送一個通知。于是就可以修改上述代碼:

from threading import Condition, Thread, Lock
# 條件變量,用複雜的線程間的同步


class Tom(Thread):
    def __init__(self, condition):
        self.condition = condition
        super().__init__(name='Tom')

    def run(self):
        with self.condition:
            print('{}: hello, Bob.'.format(self.name))
            self.condition.notify()
            self.condition.wait()
            print("{}: Let's have a chat.".format(self.name))
            self.condition.notify()


class Bob(Thread):
    def __init__(self, condition):
        self.condition = condition
        super().__init__(name='Bob')

    def run(self):
        with self.condition:
            self.condition.wait()
            print('{}: Hi, Tom.'.format(self.name))
            self.condition.notify()
            self.condition.wait()
            print("{}:Well, I like to talk to you.".format(self.name))

if __name__ == '__main__':
    condition = Condition()
    tom = Tom(condition)
    bob = Bob(condition)

    bob.start()
    tom.start()      

  上述代碼注意:

  1. 開始順序很重要,因為wait()方法必須要notify()方法才能喚醒,如果先調用tom.start()的話,那麼當tom中的self.condition.notify()調用完畢之後,bob開沒有開始啟動,是以根本接受不到tom的信号,于是要先調用bob的wait()使其處于一個類似監聽狀态。
  2. 必須要使用with self.condition, 或者是self.condition.acquire()之後才能使用後面的wait()和notify()方法。
  3. 如果上面不是用with方法打開的self.condition那麼在代碼結束之後一定要記得self.condition.release()釋放鎖。

3:semaphore

用于控制進入某段代碼線程的數量,比如說做爬蟲的時候,在請求頁面的時候防止線程數量過多,短時間内請求頻繁被發現,可以使用semaphore來控制進入請求的線程數量。

from threading import Thread, Semaphore, Condition, Lock, RLock
import time
class GetHtml(Thread):
    def __init__(self, url, sem):
        super().__init__()
        self.url = url
        self.sem = sem
    def run(self):
        time.sleep(2)
        print('get html successful.')
        self.sem.release() # 開啟之後記得要釋放。
class GetUrl(Thread):
    def __init__(self, sem):
        super().__init__()
        self.sem = sem
    def run(self):
        for i in range(20):
            self.sem.acquire() # 開啟semaphore
            get_html = GetHtml('www.baidu.com/{}'.format(i), self.sem)
            get_html.start()
if __name__ == '__main__':
    sem = Semaphore(3) # 接受一個參數,設定最大進入的線程數為3
    get_url = GetUrl(sem)
    get_url.start()      

線程池(比semaphore更加容易實作線程數量的控制)

from concurrent import futures

出了控制線程數量的其它功能:

  1. 主線程可以擷取某一個線程的狀态,以及傳回值。
  2. 當一個線程我完成的時候,我們可以立即知道。
  3. futures可以讓多線程可多程序的編碼接口一緻。多程序改多線程或者多線程改多程序代碼的時候,切換會非常平滑。 
  • 注意下代碼中的task1,task2都是線程池建立的一個Future對象,此對象的設計非常重要, Future可以看做是一個未來對象,或者說是一個線程的狀态收集容器,可以通過它的.done()檢視線程是否運作結束,也可以通過.result()檢視線程的傳回結果。
import time
from concurrent.futures import ThreadPoolExecutor
def get_html(times):
    time.sleep(times)
    print('get page{} success'.format(times))
    return times
excutor = ThreadPoolExecutor(max_workers=2)
task1 = excutor.submit(get_html, 3) #task1為一個Tuture類對象, submit方法是非阻塞的,立即傳回的。第二個參數為函數參數
tesk2 = excutor.submit(get_html, 2)

print(task1.done()) # 判斷函數是否執行成功

輸出結果:
False
get page2 success
get page3 success      

分析:因為submit方法是非阻塞的,立即傳回的。後面的print代碼不會等待task1運作結束。如果加入等待時間等待task1完成則将傳回True:

import time
from concurrent.futures import ThreadPoolExecutor
def get_html(times):
    time.sleep(times)
    print('get page{} success'.format(times))
    return times
excutor = ThreadPoolExecutor(max_workers=2)
task1 = excutor.submit(get_html, 3) #task1為一個futures類對象, submit方法是非阻塞的,立即傳回的。第二個參數為函數參數
tesk2 = excutor.submit(get_html, 2)

print(task1.done()) # 判斷函數是否執行成功
time.sleep(4)
print(task1.done())
輸出結果:
False
get page2 success
get page3 success
True      

代碼後面加入

print(task1.result()) # 用result()方法可以擷取到線程函數傳回的結果。      

可以用result()方法可以擷取到線程函數傳回的結果。

用代碼:print(task1.cancel())可以将task1在運作之前取消掉,如果取消成功則傳回True,反之False

import time
from concurrent.futures import ThreadPoolExecutor
def get_html(times):
    time.sleep(times)
    print('get page{} success'.format(times))
    return times
excutor = ThreadPoolExecutor(max_workers=1) # 将線程池數量改為1,讓tesk2先等待不執行,友善取消。

task1 = excutor.submit(get_html, 3) #task1為一個futures類對象, submit方法是非阻塞的,立即傳回的。第二個參數為函數參數
tesk2 = excutor.submit(get_html, 2)

print(task1.done()) # 判斷函數是否執行成功
print(tesk2.cancel())
time.sleep(4)
print(task1.done())
print(task1.result()) # 用result()方法可以擷取到線程函數傳回的結果。

輸出結果:(結果無get page 2 sucess)
False
True
get page3 success
True
3      

在某些情況下,要擷取已經成功的task的傳回值。

  • 方法一:需要用到as_complete
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
def get_html(times):
    time.sleep(times)
    print('get page{} success'.format(times))
    return times
excutor = ThreadPoolExecutor(max_workers=2)
urls = [3, 2, 4]
all_task = [excutor.submit(get_html, url) for url in urls]
for futures in as_completed(all_task):
    data = futures.result()
    print('get {} page'.format(data))
輸出結果:
get page2 success
get 2 page
get page3 success
get 3 page
get page4 success
get 4 page      

代碼分析:可以看到因為excutor.submit()是非阻塞的,由列印結果可以看出,沒一個線程執行成功之後,as_complete()就會拿到其結果。

  • 方法二:用executor.map
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
def get_html(times):
    time.sleep(times)
    print('get page{} success'.format(times))
    return times
excutor = ThreadPoolExecutor(max_workers=2)
urls = [3, 2, 4]
for data in excutor.map(get_html, urls):
    print('get {} page'.format(data))

結果:
get page2 success
get page3 success
get 3 page
get 2 page
get page4 success
get 4 page      

可以看到用excutor.map方法不是完成一個列印一個,而是按照參數清單中的順序,先get第一個參數結果,然後依次get,推薦可以使用第一種as_complete()方式。

wait方法使主線程阻塞

等待所有線程完成之後再往下走,wait()裡面也可以選擇參數return_when,預設是ALL_COMPLETE,如果為FIRST_COMPLETE(注意該參數需要在前面的import先導入)則第一個執行完成之後就會往下執行。

import time
from concurrent.futures import ThreadPoolExecutor, as_completed, wait
def get_html(times):
    time.sleep(times)
    print('get page{} success'.format(times))
    return times
excutor = ThreadPoolExecutor(max_workers=2)
urls = [3, 2, 4]
all_task = [excutor.submit(get_html, url) for url in urls]
wait(all_task)
print('主線程結束')

列印結果:

get page2 success
get page3 success
get page4 success
主線程結束      

轉載于:https://www.cnblogs.com/yc3110/p/10459425.html