1、循環建立多個線程,并通過循環啟動執行
import threading
from datetime import *
from time import sleep
# 單線程執行
def test():
print('hello world')
t = threading.Thread(target=test)
t.start()
# 多線程執行
def test_01():
sleep(1)
x = 0
while x == 0: # 設定一個死循環
print(datetime.now()) # 擷取目前系統時間
def looptest():
'''
循環20次執行 test_o1()函數
:return:
'''
for i in range(20):
test_01()
def thd():
'''
建立并執行多個線程
需求:并發執行50次 test_o1()函數
說明:把50的并發拆成25個線程組,每個線程再循環20次執行 test_o1()函數,這樣在啟動下一個線程的時候,
上一個線程已經在循環了,以此類推,當啟動第25個線程的時候,可能已經執行了200次的t est_o1()函數,
這樣就可以大大減少并發的時間差異
:return:
'''
Threads = []
for i in range(25):
th = threading.Thread(target=looptest)
Threads.append(th)
'''
守護線程:主線程執行完畢之後,會等待子線程全部執行完畢,才會關閉結束程式
必須加在start()之前,預設為 false
'''
th.setDaemon(True)
for th in Threads:
th.start()
for th in Threads:
'''
阻塞線程:等主線程執行完畢之後再關閉所有子線程
必須加在start()之後
可以通過join()的timeout參數來完美解決互相等待的問題,子線程告訴主線程讓其等待0.04秒,
0.04秒之内子線程完成,主線程就繼續往下執行,0.04秒之後如果子線程還未完成,主線程也會
繼續往下執行,執行完成之後關閉子線程
'''
th.join(0.04)
if __name__=="__main__":
print('start')
thd()
print('end')
2、并發測試架構
# 并發測試架構
THREAD_NUM = 1
ONE_WORKER_NUM = 1
def test():
pass # 測試代碼
def working():
global ONE_WORKER_NUM
for i in range(0, ONE_WORKER_NUM):
test()
def t():
global THREAD_NUM
Threads = []
for i in range(THREAD_NUM):
t = threading.Thread(target=working,name='T'+str(i))
t.setDaemon(True)
Threads.append(t)
for t in Threads:
t.start()
for t in Threads:
t.join()
if __name__=="__main__":
t()