天天看點

python——多任務多線程用法

1. 線程的介紹

在Python中,想要實作多任務除了使用程序,還可以使用線程來完成,線程是實作多任務的另外一種方式。

2. 線程的概念

線程是程序中執行代碼的一個分支,每個執行分支(線程)要想工作執行代碼需要cpu進行排程 ,也就是說線程是cpu排程的基本機關,每個程序至少都有一個線程,而這個線程就是我們通常說的主線程。 線程就是在程式運作過程中,執行程式代碼的一個分支,每個運作的程式至少都有一個線程 (主線程)

python的thread子產品是比較底層的子產品,python的threading子產品是對thread做了一些包裝的,可以更加友善的被使用

[外鍊圖檔轉存失敗,源站可能有防盜鍊機制,建議将圖檔儲存下來直接上傳(img-rItXkI4B-1577763245505)(assets/線程.png)]

  1. python中threading子產品的使用
'''
導入線程子產品
import threading
建立子線程并指定執行的任務
sub_thread = threading.Thread(target=任務名)
啟動線程執行任務
sub_thread.start()
'''
import threading
import time

# 唱歌任務
def sing():
    # 擴充: 擷取目前線程
    # print("sing目前執行的線程為:", threading.current_thread())
    for i in range(3):
        print("正在唱歌...%d" % i)
        time.sleep(1)

# 跳舞任務
def dance():
    # 擴充: 擷取目前線程
    # print("dance目前執行的線程為:", threading.current_thread())
    for i in range(3):
        print("正在跳舞...%d" % i)
        time.sleep(1)


if __name__ == '__main__':
    # 擴充: 擷取目前線程
    # print("目前執行的線程為:", threading.current_thread())
    # 建立唱歌的線程
    # target: 線程執行的函數名
    sing_thread = threading.Thread(target=sing)

    # 建立跳舞的線程
    dance_thread = threading.Thread(target=dance)

    # 開啟線程
    sing_thread.start()
    dance_thread.start()      

4.線程類Thread參數說明

Thread([group [, target [, name [, args [, kwargs]]]]])

-   group: 線程組,目前隻能使用None
-   target: 執行的目标任務名
-   args: 以元組的方式給執行任務傳參
-   kwargs: 以字典方式給執行任務傳參
-   name: 線程名,一般不用設定      

5.多線程執行帶有參數的任務

import threading
import time

# 唱歌任務
def sing(num):
    # 擴充: 擷取目前線程
    # print("sing目前執行的線程為:", threading.current_thread())
    for i in range(num):
        print("正在唱歌...%d" % i)
        time.sleep(1)

# 跳舞任務
def dance(num):
    # 擴充: 擷取目前線程
    # print("dance目前執行的線程為:", threading.current_thread())
    for i in range(num):
        print("正在跳舞...%d" % i)
        time.sleep(1)


if __name__ == '__main__':
    # 擴充: 擷取目前線程
    # print("目前執行的線程為:", threading.current_thread())
    # target: 線程執行的函數名
    # args: 表示以元組的方式給函數傳參
    # kwargs: 表示以字典的方式給函數傳參
    sing_thread = threading.Thread(target=sing, args=(3, ))

    # 建立跳舞的線程
    dance_thread = threading.Thread(target=dance, kwargs={"num": 3})

    # 開啟線程
    sing_thread.start()
    dance_thread.start()      

6.檢視擷取線程清單

import threading
import time

# 唱歌
def sing():
    # 擴充:-擷取目前執行代碼的線程
    print("sing:", threading.current_thread())
    for i in range(5):
        print("唱歌")
        time.sleep(0.2)

# 跳舞
def dance():
    # 擴充:-擷取目前執行代碼的線程
    print("dance:", threading.current_thread())
    for i in range(5):
        print("跳舞")
        time.sleep(0.2)


if __name__ == '__main__':

    # 擴充:-擷取目前執行代碼的線程
    print("main:", threading.current_thread())

    # 擷取目前程式活動線程的清單
    thread_list = threading.enumerate()
    print("111:", thread_list, len(thread_list))

    # 建立唱歌線程, 表示建立的子線程執行唱歌任務
    sing_thread = threading.Thread(target=sing)
    # 建立跳舞的線程, 表示建立的子線程執行跳舞任務
    dance_thread = threading.Thread(target=dance)

    thread_list = threading.enumerate()
    print("222:", thread_list, len(thread_list))

    # 啟動線程,執行對應的任務
    sing_thread.start()
    # 啟動線程,執行對應的任務 
    dance_thread.start()
    提示:隻有線程啟動了,才能加入到活動線程清單中
    thread_list = threading.enumerate()
    print("333:", thread_list, len(thread_list))      
  1. 通過繼承threading.Thread類,重寫run方法
  1. 線程執行代碼的封裝

    通過使用threading子產品能完成多任務的程式開發,為了讓每個線程的封裝性更完美,是以使用threading子產品時,往往會定義一個新的子類class,隻要繼承threading.Thread就可以了,然後重寫run方法

  2. python的threading.Thread類有一個run方法,用于定義線程的功能函數,可以在自己的線程類中覆寫該方法。而建立自己的線程執行個體後,通過Thread類的start方法,可以啟動該線程,交給python解釋器進行排程,當該線程獲得執行的機會時,就會調用run方法執行線程。
  3. 當線程的run()方法結束時該線程完成。

示例如下:

#coding=utf-8
import threading
import time

class MyThread(threading.Thread):
    def run(self):
        for i in range(3):
            time.sleep(1)
            msg = "I'm "+self.name+' @ '+str(i) #name屬性中儲存的是目前線程的名字
            print(msg)


if __name__ == '__main__':
    t = MyThread()
    t.start()      

8.多線程的執行順序

  • 線程之間執行是無序的,它是由cpu排程決定的 ,cpu排程哪個線程,哪個線程就先執行,沒有排程的線程不能執行。
  • 程序之間執行也是無序的,它是由作業系統排程決定的,作業系統排程哪個程序,哪個程序就先執行,沒有排程的程序不能執行。
  • 每個線程預設有一個名字,盡管上面的例子中沒有指定線程對象的name,但是python會自動為線程指定一個名字。
  • 無法控制線程排程程式,但可以通過别的方式來影響線程排程的方式。
#coding=utf-8
import threading
import time

class MyThread(threading.Thread):
    def run(self):
        for i in range(3):
            time.sleep(1)
            msg = "I'm "+self.name+' @ '+str(i)
            print(msg)
def test():
    for i in range(5):
        t = MyThread()
        t.start()
if __name__ == '__main__':
    test()      

從代碼和執行結果我們可以看出,多線程程式的執行順序是不确定的。當執行到sleep語句時,線程将被阻塞(Blocked),到sleep結束後,線程進入就緒(Runnable)狀态,等待排程。而線程排程将自行選擇一個線程執行。上面的代碼中隻能保證每個線程都運作完整個run函數,但是線程的啟動順序、run函數中每次循環的執行順序都不能确定。

9.主線程會等待所有的子線程執行結束再結束

import threading
import time


# 測試主線程是否會等待子線程執行完成以後程式再退出
def show_info():
    for i in range(5):
        print("test:", i)
        time.sleep(0.5)


if __name__ == '__main__':
    sub_thread = threading.Thread(target=show_info)
    sub_thread.start()

    # 主線程延時1秒
    time.sleep(1)
    print("over")      
  1. 通過上面代碼的執行結果,我們可以得知: 主線程會等待所有的子線程執行結束再結束

    假如我們就讓主線程執行1秒鐘,子線程就銷毀不再執行,那怎麼辦呢?

  • 我們可以設定守護主線程

守護主線程:

  • 守護主線程就是主線程退出子線程銷毀不再執行

設定守護主線程有兩種方式:

  1. threading.Thread(target=show_info, daemon=True)
  2. 線程對象.setDaemon(True)

設定守護主線程的示例代碼:

import threading
import time


# 測試主線程是否會等待子線程執行完成以後程式再退出
def show_info():
    for i in range(5):
        print("test:", i)
        time.sleep(0.5)


if __name__ == '__main__':
    # 建立子線程守護主線程 
    # daemon=True 守護主線程
    # 守護主線程方式1
    sub_thread = threading.Thread(target=show_info, daemon=True)
    # 設定成為守護主線程,主線程退出後子線程直接銷毀不再執行子線程的代碼
    # 守護主線程方式2
    # sub_thread.setDaemon(True)
    sub_thread.start()

    # 主線程延時1秒
    time.sleep(1)
    print("over")      

線程之間共享全局變量

import threading
import time


# 定義全局變量
my_list = list()

# 寫入資料任務
def write_data():
    for i in range(5):
        my_list.append(i)
        time.sleep(0.1)
    print("write_data:", my_list)


# 讀取資料任務
def read_data():
    print("read_data:", my_list)


if __name__ == '__main__':
    # 建立寫入資料的線程
    write_thread = threading.Thread(target=write_data)
    # 建立讀取資料的線程
    read_thread = threading.Thread(target=read_data)

    write_thread.start()
    # 延時
    # time.sleep(1)
    # 主線程等待寫入線程執行完成以後代碼在繼續往下執行
    write_thread.join()
    print("開始讀取資料啦")
    read_thread.start()      

線程之間共享全局變量資料出現錯誤問題

import threading

# 定義全局變量
g_num = 0


# 循環一次給全局變量加1
def sum_num1():
    for i in range(1000000):
        global g_num
        g_num += 1

    print("sum1:", g_num)


# 循環一次給全局變量加1
def sum_num2():
    for i in range(1000000):
        global g_num
        g_num += 1
    print("sum2:", g_num)


if __name__ == '__main__':
    # 建立兩個線程
    first_thread = threading.Thread(target=sum_num1)
    second_thread = threading.Thread(target=sum_num2)

    # 啟動線程
    first_thread.start()
    # 啟動線程
    second_thread.start()      

兩個線程first_thread和second_thread都要對全局變量g_num(預設是0)進行加1運算,但是由于是多線程同時操作,有可能出現下面情況:

  1. 在g_num=0時,first_thread取得g_num=0。此時系統把first_thread排程為”sleeping”狀态,把second_thread轉換為”running”狀态,t2也獲得g_num=0
  2. 然後second_thread對得到的值進行加1并賦給g_num,使得g_num=1
  3. 然後系統又把second_thread排程為”sleeping”,把first_thread轉為”running”。線程t1又把它之前得到的0加1後指派給g_num。
  4. 這樣導緻雖然first_thread和first_thread都對g_num加1,但結果仍然是g_num=1

全局變量資料錯誤的解決辦法:

線程同步: 保證同一時刻隻能有一個線程去操作全局變量 同步: 就是協同步調,按預定的先後次序進行運作。如:你說完,我再說, 好比現實生活中的對講機

線程同步的方式:

  1. 線程等待(join)
  2. 互斥鎖

線程等待的示例代碼:

import threading

# 定義全局變量
g_num = 0


# 循環1000000次每次給全局變量加1
def sum_num1():
    for i in range(1000000):
        global g_num
        g_num += 1

    print("sum1:", g_num)


# 循環1000000次每次給全局變量加1
def sum_num2():
    for i in range(1000000):
        global g_num
        g_num += 1
    print("sum2:", g_num)


if __name__ == '__main__':
    # 建立兩個線程
    first_thread = threading.Thread(target=sum_num1)
    second_thread = threading.Thread(target=sum_num2)

    # 啟動線程
    first_thread.start()
    # 主線程等待第一個線程執行完成以後代碼再繼續執行,讓其執行第二個線程
    # 線程同步: 一個任務執行完成以後另外一個任務才能執行,同一個時刻隻有一個任務在執行
    first_thread.join()
    # 啟動線程
    second_thread.start()      

互斥鎖:

互斥鎖: 對共享資料進行鎖定,保證同一時刻隻能有一個線程去操作。

注意:

  • 互斥鎖是多個線程一起去搶,搶到鎖的線程先執行,沒有搶到鎖的線程需要等待,等互斥鎖使用完釋放後,其它等待的線程再去搶這個鎖。
  • threading子產品中定義了Lock變量,這個變量本質上是一個函數,通過調用這個函數可以擷取一把互斥鎖。

    互斥鎖使用步驟:

# 建立鎖
mutex = threading.Lock()

# 上鎖
mutex.acquire()

...這裡編寫代碼能保證同一時刻隻能有一個線程去操作, 對共享資料進行鎖定...

# 釋放鎖
mutex.release()      
  • acquire和release方法之間的代碼同一時刻隻能有一個線程去操作
  • 如果在調用acquire方法的時候 其他線程已經使用了這個互斥鎖,那麼此時acquire方法會堵塞,直到這個互斥鎖釋放後才能再次上鎖。
  • 上鎖,解鎖的過程:
-   當一個線程調用鎖的acquire()方法獲得鎖時,鎖就進入“locked”狀态。每次隻有一個線程可以獲得鎖。如果此時另一個線程試圖獲得這個鎖,該線程就會變為“blocked”狀态,稱為“阻塞”,直到擁有鎖的線程調用鎖的release()方法釋放鎖之後,鎖進入“unlocked”狀态。線程排程程式從處于同步阻塞狀态的線程中選擇一個來獲得鎖,并使得該線程進入運作(running)狀态。      

示範代碼:

import threading


# 定義全局變量
g_num = 0

# 建立全局互斥鎖
lock = threading.Lock()


# 循環一次給全局變量加1
def sum_num1():
    # 上鎖
    lock.acquire()
    for i in range(1000000):
        global g_num
        g_num += 1

    print("sum1:", g_num)
    # 釋放鎖
    lock.release()


# 循環一次給全局變量加1
def sum_num2():
    # 上鎖
    lock.acquire()
    for i in range(1000000):
        global g_num
        g_num += 1
    print("sum2:", g_num)
    # 釋放鎖
    lock.release()


if __name__ == '__main__':
    # 建立兩個線程
    first_thread = threading.Thread(target=sum_num1)
    second_thread = threading.Thread(target=sum_num2)
    # 啟動線程
    first_thread.start()
    second_thread.start()

    # 提示:加上互斥鎖,那個線程搶到這個鎖我們決定不了,那線程搶到鎖那個線程先執行,沒有搶到的線程需要等待
    # 加上互斥鎖多任務瞬間變成單任務,性能會下降,也就是說同一時刻隻能有一個線程去執行互斥鎖
    # 通過執行結果可以位址互斥鎖能夠保證多個線程通路共享資料不會出現資料錯誤問題      

互斥鎖作用以及缺點:

  • 互斥鎖的作用就是保證同一時刻隻能有一個線程去操作共享資料,保證共享資料不會出現錯誤問題
  • 使用互斥鎖的好處確定某段關鍵代碼隻能由一個線程從頭到尾完整地去執行
  • 使用互斥鎖會影響代碼的執行效率,多任務改成了單任務執行
  • 互斥鎖如果沒有使用好容易出現死鎖的情況

死鎖:

死鎖: 一直等待對方釋放鎖的情景就是死鎖出現

出現死鎖代碼示範:

import threading
import time

# 建立互斥鎖
lock = threading.Lock()


# 根據下标去取值, 保證同一時刻隻能有一個線程去取值
def get_value(index):

    # 上鎖
    lock.acquire()
    print(threading.current_thread())
    my_list = [3,6,8,1]
    # 判斷下标釋放越界
    if index >= len(my_list):
        print("下标越界:", index)
        return
    value = my_list[index]
    print(value)
    time.sleep(0.2)
    # 釋放鎖
    lock.release()


if __name__ == '__main__':
    # 模拟大量線程去執行取值操作
    for i in range(30):
        sub_thread = threading.Thread(target=get_value, args=(i,))
        sub_thread.start()      
# 在合适的地方釋放鎖
import threading
import time

# 建立互斥鎖
lock = threading.Lock()


# 根據下标去取值, 保證同一時刻隻能有一個線程去取值
def get_value(index):

    # 上鎖
    lock.acquire()
    print(threading.current_thread())
    my_list = [3,6,8,1]
    if index >= len(my_list):
        print("下标越界:", index)
        # 當下标越界需要釋放鎖,讓後面的線程還可以取值
        lock.release()
        return
    value = my_list[index]
    print(value)
    time.sleep(0.2)
    # 釋放鎖
    lock.release()


if __name__ == '__main__':
    # 模拟大量線程去執行取值操作
    for i in range(30):
        sub_thread = threading.Thread(target=get_value, args=(i,))
        sub_thread.start()      

ading.Lock()

根據下标去取值, 保證同一時刻隻能有一個線程去取值

# 上鎖
lock.acquire()
print(threading.current_thread())
my_list = [3,6,8,1]
if index >= len(my_list):
    print("下标越界:", index)
    # 當下标越界需要釋放鎖,讓後面的線程還可以取值
    lock.release()
    return
value = my_list[index]
print(value)
time.sleep(0.2)
# 釋放鎖
lock.release()