天天看點

python多線程并發

1、循環建立多個線程,并通過循環啟動執行

import threading
from datetime import *
from time import sleep

# 單線程執行
def test():
    print('hello world')

t = threading.Thread(target=test)
t.start()

# 多線程執行
def test_01():
    sleep(1)
    x = 0
    while x == 0:        # 設定一個死循環
        print(datetime.now())             # 擷取目前系統時間

def looptest():
    '''
    循環20次執行 test_o1()函數
    :return:
    '''
    for i in range(20):
        test_01()

def thd():
    '''
    建立并執行多個線程
    需求:并發執行50次 test_o1()函數
    說明:把50的并發拆成25個線程組,每個線程再循環20次執行 test_o1()函數,這樣在啟動下一個線程的時候,
    上一個線程已經在循環了,以此類推,當啟動第25個線程的時候,可能已經執行了200次的t est_o1()函數,
    這樣就可以大大減少并發的時間差異
    :return:
    '''
    Threads = []
    for i in range(25):
        th = threading.Thread(target=looptest)
        Threads.append(th)
        '''
        守護線程:主線程執行完畢之後,會等待子線程全部執行完畢,才會關閉結束程式
        必須加在start()之前,預設為 false
        '''
        th.setDaemon(True)
    for th in Threads:
        th.start()
    for th in Threads:
        '''
        阻塞線程:等主線程執行完畢之後再關閉所有子線程
        必須加在start()之後
        可以通過join()的timeout參數來完美解決互相等待的問題,子線程告訴主線程讓其等待0.04秒,
        0.04秒之内子線程完成,主線程就繼續往下執行,0.04秒之後如果子線程還未完成,主線程也會
        繼續往下執行,執行完成之後關閉子線程
        '''
        th.join(0.04)

if __name__=="__main__":
    print('start')
    thd()
    print('end')      

2、并發測試架構

# 并發測試架構
THREAD_NUM = 1
ONE_WORKER_NUM = 1
def test():
    pass            # 測試代碼

def working():
    global ONE_WORKER_NUM
    for i in range(0, ONE_WORKER_NUM):
        test()
        
def t():
    global THREAD_NUM
    Threads = []
    for i in range(THREAD_NUM):
        t = threading.Thread(target=working,name='T'+str(i))
        t.setDaemon(True)
        Threads.append(t)
    for t in Threads:
        t.start()
    for t in Threads:
        t.join()
        
if __name__=="__main__":
    t()