天天看點

python 多線程

線程介紹

什麼是線程

線程(Thread)也叫輕量級程序,是作業系統能夠進行運算排程的最小機關,它被包涵在程序之中,是程序中的實際運作機關。線程自己不擁有系統資源,隻擁有一點兒在運作中必不可少的資源,但它可與同屬一個程序的其它線程共享程序所擁有的全部資源。一個線程可以建立和撤消另一個線程,同一程序中的多個線程之間可以并發執行。

為什麼要使用多線程

線程在程式中是獨立的、并發的執行流。與分隔的程序相比,程序中線程之間的隔離程度要小,它們共享記憶體、檔案句柄和其他程序應有的狀态。

因為線程的劃分尺度小于程序,使得多線程程式的并發性高。程序在執行過程中擁有獨立的記憶體單元,而多個線程共享記憶體,進而極大地提高了程式的運作效率。

線程比程序具有更高的性能,這是由于同一個程序中的線程都有共性多個線程共享同一個程序的虛拟空間。線程共享的環境包括程序代碼段、程序的公有資料等,利用這些共享的資料,線程之間很容易實作通信。

作業系統在建立程序時,必須為該程序配置設定獨立的記憶體空間,并配置設定大量的相關資源,但建立線程則簡單得多。是以,使用多線程來實作并發比使用多程序的性能要高得多。

總結起來,使用多線程程式設計具有如下幾個優點:

  • 程序之間不能共享記憶體,但線程之間共享記憶體非常容易。
  • 作業系統在建立程序時,需要為該程序重新配置設定系統資源,但建立線程的代價則小得多。是以,使用多線程來實作多任務并發執行比使用多程序的效率高。
  • Python 語言内置了多線程功能支援,而不是單純地作為底層作業系統的排程方式,進而簡化了 Python 的多線程程式設計。

單線程

我想做菜,那麼其中的洗菜,炒菜等動作是先後進行的。

from time import ctime,sleep

def wash():
    for i in range(2):
        print "I was washing vegetables. %s" %ctime()
        sleep(1)

def cook():
    for i in range(2):
        print "I was cooking vegetables. %s" %ctime()
        sleep(5)

if __name__ == '__main__':
    wash()
    cook()
    print "all over %s" %ctime()      
$ I was washing vegetables. Tue Dec  1 17:52:37 2020
$ I was washing vegetables. Tue Dec  1 17:52:38 2020
$ I was cooking vegetables. Tue Dec  1 17:52:39 2020
$ I was cooking vegetables. Tue Dec  1 17:52:44 2020
$ all over Tue Dec  1 17:52:49 2020      

先洗菜,通過for循環來控制洗了兩次,每次需要1秒鐘,sleep()來控制洗菜的時長。接着我們開始炒菜,

每個菜需要5秒鐘,這裡炒了兩個。在整過程結束後,我通過

print "all over %s" %ctime()      

看了一下目前時間,執行結果如上。

其實上面的執行個體就是單線程,每件事必須先後的執行,不能同時執行。

多線程

在我們生活中,存在很多,我們可以吃着東西看着電影,騎着車,聽着音樂,這種并行的動作,

Python3 線程中常用的兩個子產品為:

  • _thread
  • threading(推薦使用)

thread 子產品已被廢棄。使用者可以使用 threading 子產品代替。是以,在 Python3 中不能再使用"thread" 子產品。為了相容性,Python3 将 thread 重命名為 "_thread"。

threading 子產品除了包含 _thread 子產品中的所有方法外,還提供的其他方法:

  • threading.currentThread(): 傳回目前的線程變量。
  • threading.enumerate(): 傳回一個包含正在運作的線程的list。正在運作指線程啟動後、結束前,不包括啟動前和終止後的線程。
  • threading.activeCount(): 傳回正在運作的線程數量,與len(threading.enumerate())有相同的結果。

除了使用方法外,線程子產品同樣提供了Thread類來處理線程,Thread類提供了以下方法:

  • run(): 用以表示線程活動的方法。
  • start():啟動線程活動。
  • join([time]): 等待至線程中止。這阻塞調用線程直至線程的join() 方法被調用中止-正常退出或者抛出未處理的異常-或者是可選的逾時發生。
  • isAlive(): 傳回線程是否活動的。
  • getName(): 傳回線程名。
  • setName(): 設定線程名。

是以這裡我們直接學習threading

threading子產品

普通建立方式

import threading
import time

def run(n):
    print("task", n)
    time.sleep(1)
    print('2s')
    time.sleep(1)
    print('1s')
    time.sleep(1)
    print('0s')
    time.sleep(1)

if __name__ == '__main__':
    t1 = threading.Thread(target=run, args=("t1",))
    t2 = threading.Thread(target=run, args=("t2",))
    t1.start()
    t2.start()

----------------------------------

>>> task t1
>>> task t2
>>> 2s
>>> 2s
>>> 1s
>>> 1s
>>> 0s
>>> 0s      

自定義線程

繼承threading.Thread來自定義線程類,其本質是重構Thread類中的run方法

import threading
import time

class MyThread(threading.Thread):
    def __init__(self, n):
        super(MyThread, self).__init__()  # 重構run函數必須要寫
        self.n = n

    def run(self):
        print("task", self.n)
        time.sleep(1)
        print('2s')
        time.sleep(1)
        print('1s')
        time.sleep(1)
        print('0s')
        time.sleep(1)

if __name__ == "__main__":
    t1 = MyThread("t1")
    t2 = MyThread("t2")
    t1.start()
    t2.start()
    
----------------------------------

>>> task t1
>>> task t2
>>> 2s
>>> 2s
>>> 1s
>>> 1s
>>> 0s
>>> 0s      

守護線程

我們看下面這個例子,這裡使用setDaemon(True)把所有的子線程都變成了主線程的守護線程,是以當主程序結束後,子線程也會随之結束。是以當主線程結束後,整個程式就退出了。

import threading
import time

def run(n):
    print("task", n)
    time.sleep(1)       #此時子線程停1s
    print('3')
    time.sleep(1)
    print('2')
    time.sleep(1)
    print('1')

if __name__ == '__main__':
    t = threading.Thread(target=run, args=("t1",))
    t.setDaemon(True)   #把子程序設定為守護線程,必須在start()之前設定
    t.start()
    print("end")
    
----------------------------------

>>> task t1
>>> end      

我們可以發現,設定守護線程之後,當主線程結束時,子線程也将立即結束,不再執行。

如果不設定守護線程

import threading
import time

def run(n):
    print("task", n)
    time.sleep(1)       #此時子線程停1s
    print('3')
    time.sleep(1)
    print('2')
    time.sleep(1)
    print('1')

if __name__ == '__main__':
    t = threading.Thread(target=run, args=("t1",))
    # t.setDaemon(True)   #把子程序設定為守護線程,必須在start()之前設定
    t.start()
    print("end")

>>>  taskend
>>>   t1
>>>  3
>>>  2
>>>  1      

主線程等待子線程結束

為了讓守護線程執行結束之後,主線程再結束,我們可以使用join方法,讓主線程等待子線程執行。

import threading
import time

def run(n):
    print("task", n)
    time.sleep(1)       #此時子線程停1s
    print('3')
    time.sleep(1)
    print('2')
    time.sleep(1)
    print('1')

if __name__ == '__main__':
    t = threading.Thread(target=run, args=("t1",))
    t.setDaemon(True)   #把子程序設定為守護線程,必須在start()之前設定
    t.start()
    t.join() # 設定主線程等待子線程結束
    print("end")

----------------------------------

>>> task t1
>>> 3
>>> 2
>>> 1
>>> end      

我們隻對上面的程式加了個join()方法,用于等待線程終止。join()的作用是,在子線程完成運作之前,這個子線程的父線程将一直被阻塞。

多線程共享全局變量

線程是程序的執行單元,程序是系統配置設定資源的最小機關,是以在同一個程序中的多線程是共享資源的。

  

import threading
import time

g_num = 100

def work1():
    global g_num
    for i in range(3):
        g_num += 1
    print("in work1 g_num is : %d" % g_num)

def work2():
    global g_num
    print("in work2 g_num is : %d" % g_num)

if __name__ == '__main__':
    t1 = threading.Thread(target=work1)
    t1.start()
    time.sleep(1)
    t2 = threading.Thread(target=work2)
    t2.start()

----------------------------------

>>> in work1 g_num is : 103
>>> in work2 g_num is : 103      

互斥鎖

由于線程之間是進行随機排程,并且每個線程可能隻執行n條執行之後,當多個線程同時修改同一條資料時可能會出現髒資料,是以,出現了線程鎖,即同一時刻允許一個線程執行操作。線程鎖用于鎖定資源,你可以定義多個鎖, 像下面的代碼, 當你需要獨占某一資源時,任何一個鎖都可以鎖這個資源,就好比你用不同的鎖都可以把相同的一個門鎖住是一個道理。

由于線程之間是進行随機排程,如果有多個線程同時操作一個對象,如果沒有很好地保護該對象,會造成程式結果的不可預期,我們也稱此為“線程不安全”。

為了方式上面情況的發生,就出現了互斥鎖(Lock)

import threading
import time

# 定義一個全局變量
g_num = 0
def test1(num):
    global g_num

    for i in range(num):

        mutex.acquire()  # 上鎖 注意了此時鎖的代碼越少越好
        g_num += 1
        mutex.release()  # 解鎖
        
    print("-----in test1 g_num=%d----" % g_num)


def test2(num):
    global g_num
    for i in range(num):
        mutex.acquire()  # 上鎖
        g_num += 1
        mutex.release()  # 解鎖
    print("-----in test2 g_num=%d=----" % g_num)


# 建立一個互斥鎖,預設是沒有上鎖的
mutex = threading.Lock()


def main():
    t1 = threading.Thread(target=test1, args=(1000000,))
    t2 = threading.Thread(target=test2, args=(1000000,))

    t1.start()
    t2.start()

    # 等待上面的2個線程執行完畢....
    time.sleep(2)

    print("-----in main Thread g_num = %d---" % g_num)

if __name__ == "__main__":
    main()      

死鎖: 線上程間共享多個資源的時候,如果兩個線程分别占有一部分資源并且同時等待對方的資源,就會造成死鎖

在使用鎖的時候要注意不要寫出死鎖代碼,附死鎖代碼參考,總結一句就是互相持有對方線程所需要的鎖,造成死鎖

import threading

a = 100


def func1():
    global a
    for i in range(1000000):
        meta_A.acquire()  # 上鎖
        meta_B.acquire()  # 上多把鎖 産生了死鎖 看下面代碼
        print('-------------1')
        a += 1
        meta_B.release()
        meta_A.release()  # 釋放鎖
    print(a)


def func2():
    global a
    for i in range(1000000):
        meta_B.acquire()
        meta_A.acquire()
        print('------------2')
        a += 1
        meta_A.release()
        meta_B.release()
    print(a)


# 建立鎖
meta_A = threading.Lock()
meta_B = threading.Lock()


t1 = threading.Thread(target=func1) 
t2 = threading.Thread(target=func2)
t1.start()
t2.start()      

RLcok類的用法和Lock類一模一樣,但它支援嵌套,在多個鎖沒有釋放的時候一般會使用RLcok類。

RLock内部維護着一個Lock和一個counter變量,counter記錄了acquire的次數,進而使得資源可以被多次acquire。直到一個線程所有的acquire都被release,其他的線程才能獲得資源。

import threading

lock = threading.RLock()
num = 0


def sum1():
    lock.acquire()
    global num
    num += 1
    print("from sum1")
    lock.release()
    return num


def sum2():
    lock.acquire()
    res = sum1()
    lock.release()
    print(res)


for i in range(10):
    t = threading.Thread(target=sum2)
    t.start()      

信号量(BoundedSemaphore類)

互斥鎖同時隻允許一個線程更改資料,而Semaphore是同時允許一定數量的線程更改資料 ,比如廁所有3個坑,那最多隻允許3個人上廁所,後面的人隻能等裡面有人出來了才能再進去。

import threading
import time

def run(n, semaphore):
    semaphore.acquire()   #加鎖
    time.sleep(1)
    print("run the thread:%s\n" % n)
    semaphore.release()     #釋放

if __name__ == '__main__':
    num = 0
    semaphore = threading.BoundedSemaphore(5)  # 最多允許5個線程同時運作
    for i in range(22):
        t = threading.Thread(target=run, args=("t-%s" % i, semaphore))
        t.start()
    while threading.active_count() != 1:
        pass  # print threading.active_count()
    else:
        print('-----all threads done-----')      

事件(Event類)

python線程的事件用于主線程控制其他線程的執行,事件是一個簡單的線程同步對象,其主要提供以下幾個方法:

  • clear 将flag設定為“False”
  • set 将flag設定為“True”
  • is_set 判斷是否設定了flag
  • wait 會一直監聽flag,如果沒有檢測到flag就一直處于阻塞狀态

事件處理的機制:全局定義了一個“Flag”,當flag值為“False”,那麼event.wait()就會阻塞,當flag值為“True”,那麼event.wait()便不再阻塞。

#利用Event類模拟紅綠燈
import threading
import time

event = threading.Event()


def lighter():
    count = 0
    event.set()     #初始值為綠燈
    while True:
        if 5 < count <=10 :
            event.clear()  # 紅燈,清除标志位
            print("\33[41;1mred light is on...\033[0m")
        elif count > 10:
            event.set()  # 綠燈,設定标志位
            count = 0
        else:
            print("\33[42;1mgreen light is on...\033[0m")

        time.sleep(1)
        count += 1

def car(name):
    while True:
        if event.is_set():      #判斷是否設定了标志位
            print("[%s] running..."%name)
            time.sleep(1)
        else:
            print("[%s] sees red light,waiting..."%name)
            event.wait()
            print("[%s] green light is on,start going..."%name)

light = threading.Thread(target=lighter,)
light.start()

car = threading.Thread(target=car,args=("MINI",))
car.start()      

GIL(Global Interpreter Lock)全局解釋器鎖

在非python環境中,單核情況下,同時隻能有一個任務執行。多核時可以支援多個線程同時執行。但是在python中,無論有多少核,同時隻能執行一個線程。究其原因,這就是由于GIL的存在導緻的。

GIL的全稱是Global Interpreter Lock(全局解釋器鎖),來源是python設計之初的考慮,為了資料安全所做的決定。某個線程想要執行,必須先拿到GIL,我們可以把GIL看作是“通行證”,并且在一個python程序中,GIL隻有一個。拿不到通行證的線程,就不允許進入CPU執行。GIL隻在cpython中才有,因為cpython調用的是c語言的原生線程,是以他不能直接操作cpu,隻能利用GIL保證同一時間隻能有一個線程拿到資料。而在pypy和jpython中是沒有GIL的。

Python多線程的工作過程:

python在使用多線程的時候,調用的是c語言的原生線程。

  • 拿到公共資料
  • 申請gil
  • python解釋器調用os原生線程
  • os操作cpu執行運算
  • 當該線程執行時間到後,無論運算是否已經執行完,gil都被要求釋放
  • 進而由其他程序重複上面的過程
  • 等其他程序執行完後,又會切換到之前的線程(從他記錄的上下文繼續執行),整個過程是每個線程執行自己的運算,當執行時間到就進行切換(context switch)。

python針對不同類型的代碼執行效率也是不同的:

1、CPU密集型代碼(各種循環處理、計算等等),在這種情況下,由于計算工作多,ticks計數很快就會達到門檻值,然後觸發GIL的釋放與再競争(多個線程來回切換當然是需要消耗資源的),是以python下的多線程對CPU密集型代碼并不友好。

2、IO密集型代碼(檔案處理、網絡爬蟲等涉及檔案讀寫的操作),多線程能夠有效提升效率(單線程下有IO操作會進行IO等待,造成不必要的時間浪費,而開啟多線程能線上程A等待時,自動切換到線程B,可以不浪費CPU的資源,進而能提升程式執行效率)。是以python的多線程對IO密集型代碼比較友好。

使用建議?

python下想要充分利用多核CPU,就用多程序。因為每個程序有各自獨立的GIL,互不幹擾,這樣就可以真正意義上的并行執行,在python中,多程序的執行效率優于多線程(僅僅針對多核CPU而言)。

GIL在python中的版本差異:

1、在python2.x裡,GIL的釋放邏輯是目前線程遇見IO操作或者ticks計數達到100時進行釋放。(ticks可以看作是python自身的一個計數器,專門做用于GIL,每次釋放後歸零,這個計數可以通過sys.setcheckinterval 來調整)。而每次釋放GIL鎖,線程進行鎖競争、切換線程,會消耗資源。并且由于GIL鎖存在,python裡一個程序永遠隻能同時執行一個線程(拿到GIL的線程才能執行),這就是為什麼在多核CPU上,python的多線程效率并不高。

2、在python3.x中,GIL不使用ticks計數,改為使用計時器(執行時間達到門檻值後,目前線程釋放GIL),這樣對CPU密集型程式更加友好,但依然沒有解決GIL導緻的同一時間隻能執行一個線程的問題,是以效率依然不盡如人意。

支付寶打賞!

python 多線程

您的資助是我最大的動力!

金額随意,歡迎來賞!

微信打賞!

python 多線程

如果,您認為閱讀這篇部落格讓您有些收獲,不妨點選一下右下角的【推薦】按鈕。

如果,您希望更容易地發現我的新部落格,不妨點選一下綠色通道的【關注我】。

如果,想給予我更多的鼓勵,求打

因為,我的寫作熱情也離不開您的肯定與支援,感謝您的閱讀,我是【Blue·Sky】!

【China-測試開發】技術交流群期待你的加入【 193056556 】

【歡迎掃碼關注:日益】微信訂閱号【 riyi18 】

python 多線程