多線程并發程式設計與分析
by:授客 QQ:1033553122
1.技術難點分析與總結
難點1:線程運作時,運作順序不固定
難點2:同一段代碼,再不加鎖的情況下,可能被多個線程同時執行,這會造成很多麻煩,比如變量的指派不正确,方法的重複調用,而如果加鎖,或者通過join阻塞方式等來控制,那麼又如同運作單程序,效率低下,達不到,“并發”,“高速”的效果。
難點3:不通過join阻塞等方式,主線程可能會優先于子線程退出,這也會導緻問題,比如子線程還在用檔案句柄,主線程就把檔案關閉了。
解決方法:
1、考慮為線程類添加變量屬性,這樣一來,每個線程都擁有自己的變量,互不影響,比如下面例子中用到的run_times
2、線程公用的一些變量,也可以考慮通過線程類的變量屬性傳遞,比如下面例子中多線程用到的檔案句柄file_handler
3、必要時,關鍵代碼可以考慮枷鎖Lock、RLock,具體自己看官方文檔,比如下方的檔案寫入,不同線程可能會在同一行寫入資料,導緻資料統計時不準确,是以加鎖,如果出于速度考慮,可以考慮分别給每個線程傳遞屬于自己的檔案句柄,寫入不同的檔案,
4、清理工作,關于這個,需要知道2點:
1)main線程退出時,不會kill非守護線程,但是會kill守護線程
2)通常,子線程start()後會去調用run方法,運作完run方法,子線程停止執行,不會繼續運作之後的代碼。
是以,通常我們可以這麼做,擷取目前活動線程數,如果線程數為1,則說明子線程都運作完,可以繼續後面的代碼清理工作,否則繼續循環檢測,這裡還可以加代碼優化,比如每隔一段時間檢測一次,以免主線程浪費系統資源
# 利用主線程執行清理工作
current_active_thread_num
=
len(threading.enumerate())
# 擷取目前活動線程數量
while
current_active_thread_num
!=
1:
time.sleep(10) #
每10秒檢測一次
2.代碼實踐
requestpy.py
#!/usr/bin/env python
#
-*- coding:utf-8 -*-
__author__
'shouke'
import
urllib.request
json
sys
threading
from
collections
import
Counter
time
datetime
class
SubThread(threading.Thread):
mutex_lock
= threading.RLock()
def
__init__(self,
file_handler):
self.file_handler
= file_handler
self.run_times
# 記錄每個線程的運作次數
threading.Thread.__init__(self)
run(self):
while
self.run_times
<</span>
int(sys.argv[2]):
url
'http://xxxxxx/xxxxxcard/kq/codepool/test/'
request
= urllib.request.Request(url,
method='POST')
try:
response
= urllib.request.urlopen(request)
response_body
= response.read()
= response_body.decode('utf-8')
= json.loads(response_body)
#
寫入檔案
SubThread.mutex_lock.acquire()
self.file_handler.write(str(response_body['code']))
self.file_handler.write('\n')
SubThread.mutex_lock.release()
self.run_times
+
1
print('已經執行%s次請求'
%
str(self.run_times))
except
Exception
as
e:
print('請求錯誤%s'
% e)
def
analyze(test_result_data):
list_data
= []
#
存放目标資料
total_line_count
0 #
讀取的文本行數
abnormal_line
#
存放異常資料行數
digit_line
存放正确資料函數
with
open(test_result_data,
'r')
file:
line
= file.readline()
line:
line
= line.strip('\n')
if
line.isdigit()
and
len(line)
==
12:
list_data.append(int(line))
digit_line
= digit_line +
else:
abnormal_line
= abnormal_line +
print('伺服器傳回資料異常')
total_line_count
= total_line_count +
print('讀取的總行數:%s'
str(total_line_count))
print('資料正确的行數:%s'
str(digit_line))
print('資料異常的行數:%s'
str(abnormal_line))
#
分析是否存在重複資料
set_data
set(list_data)
if
len(set_data)
len(list_data):
print('不存在重複資料,
總數:%s
條'
len(list_data))
else:
print('有重複資料,重複資料:%s條'
% (len(list_data)
-
len(set_data)))
if
__name__ ==
'__main__':
start_time
= datetime.datetime.now()
test_result_data =
'd:\\test_result_data.txt'
file
= open(test_result_data,
'w')
#
存儲伺服器傳回資料
threads_pool
= [] #
線程池,存放線程對象
thread_num
記錄建立的線程數量
while
thread_num <</span>
int(sys.argv[1]):
thread_obj
= SubThread(file)
threads_pool.append(thread_obj)
thread_num
= thread_num +
for
thread
in
threads_pool:
thread.start()
利用主線程執行清理工作
time.sleep(10)
current_active_thread_num
清理工作
try:
file.close()
except
print('關閉檔案出錯%s'
end_time = datetime.datetime.now()
print('運作耗時:',end_time
- start_time)
# 分析資料
analyze(test_result_data)
運作(禁用time.sleep函數的情況下):
100個線程,每個線程運作50次,總的運作
5000次
python requestpy.py
100
50
修改程式如下
class SubThread(threading.Thread):
def __init__(self, file_handler):
self.file_handler = file_handler
self.run_times = 0 # 記錄每個線程的運作次數
def run(self):
while self.run_times < int(sys.argv[2]):
url = 'http://xxxxxx/xxxxxcard/kq/codepool/test/'
request = urllib.request.Request(url, method='POST')
response = urllib.request.urlopen(request)
response_body = response.read()
response_body = response_body.decode('utf-8')
response_body = json.loads(response_body)
# 寫入檔案
self.run_times = self.run_times + 1 # 記錄每個線程的運作次數
print('已經執行%s次請求' % str(self.run_times))
except Exception as e:
print('請求錯誤%s' % e)
def analyze(test_result_file_list):
list_data = [] # 存放目标資料
total_line_count = 0 # 讀取的文本行數
abnormal_line = 0 # 存放異常資料行數
digit_line = 0 # 存放正确資料函數
for file in test_result_file_list:
with open(file, 'r') as file:
line = file.readline()
while line:
line = line.strip('\n')
if line.isdigit() and len(line) == 12:
list_data.append(int(line))
digit_line = digit_line + 1
else:
abnormal_line = abnormal_line + 1
print('伺服器傳回資料異常')
line = file.readline()
total_line_count = total_line_count + 1
print('讀取的總行數:%s' % str(total_line_count))
print('資料正确的行數:%s' % str(digit_line))
print('資料異常的行數:%s' % str(abnormal_line))
# 分析是否存在重複資料
set_data = set(list_data)
if len(set_data) == len(list_data):
print('不存在重複資料, 總數:%s 條' % len(list_data))
print('有重複資料,重複資料:%s條' % (len(list_data) - len(set_data)))
# 擷取重複資料
filehaneder = open('d:\\repeat_data.txt', 'w')
c = Counter(list_data)
for item in c.items():
if item[1] > 1:
print('重複資料:%s' % item[0])
filehaneder.write(str(item[0]))
filehaneder.write('\n')
filehaneder.close()
if __name__ == '__main__':
start_time = datetime.datetime.now()
base_filename = 'test_result_data'
base_dirname = 'd:\\result\\'
test_result_file_list = [] # 存儲結果資料檔案名
sub_thread_file_list = [] # 每個線程的檔案句柄
threads_pool = [] # 線程池,存放線程對象
thread_num = 0 # 記錄建立的線程數量
while thread_num < int(sys.argv[1]):
filename = base_dirname + base_filename + str(thread_num + 1) + '.txt'
test_result_file_list.append(filename)
file = open(filename, 'w')
sub_thread_file_list.append(file)
thread_obj = SubThread(file)
thread_num = thread_num + 1
for thread in threads_pool:
# # 利用主線程執行清理工作
current_active_thread_num = len(threading.enumerate()) # 擷取目前活動線程數量
while current_active_thread_num != 1:
time.sleep(300)
current_active_thread_num = len(threading.enumerate())
# 清理工作
for file in sub_thread_file_list:
file.close()
except Exception as e:
print('關閉檔案出錯%s' % e)
end_time = datetime.datetime.now()
print('運作耗時:',end_time - start_time)
# 分析資料
analyze(test_result_file_list)
運作結果:
作者:授客
QQ:1033553122
全國軟體測試QQ交流群:7156436
Git位址:https://gitee.com/ishouke
友情提示:限于時間倉促,文中可能存在錯誤,歡迎指正、評論!
作者五行缺錢,如果覺得文章對您有幫助,請掃描下邊的二維碼打賞作者,金額随意,您的支援将是我繼續創作的源動力,打賞後如有任何疑問,請聯系我!!!
微信打賞
支付寶打賞 全國軟體測試交流QQ群