天天看點

Python 多線程并發程式設計與分析

多線程并發程式設計與分析

by:授客 QQ:1033553122

1.技術難點分析與總結

難點1:線程運作時,運作順序不固定

難點2:同一段代碼,再不加鎖的情況下,可能被多個線程同時執行,這會造成很多麻煩,比如變量的指派不正确,方法的重複調用,而如果加鎖,或者通過join阻塞方式等來控制,那麼又如同運作單程序,效率低下,達不到,“并發”,“高速”的效果。

難點3:不通過join阻塞等方式,主線程可能會優先于子線程退出,這也會導緻問題,比如子線程還在用檔案句柄,主線程就把檔案關閉了。

解決方法:

1、考慮為線程類添加變量屬性,這樣一來,每個線程都擁有自己的變量,互不影響,比如下面例子中用到的run_times

2、線程公用的一些變量,也可以考慮通過線程類的變量屬性傳遞,比如下面例子中多線程用到的檔案句柄file_handler

3、必要時,關鍵代碼可以考慮枷鎖Lock、RLock,具體自己看官方文檔,比如下方的檔案寫入,不同線程可能會在同一行寫入資料,導緻資料統計時不準确,是以加鎖,如果出于速度考慮,可以考慮分别給每個線程傳遞屬于自己的檔案句柄,寫入不同的檔案,

4、清理工作,關于這個,需要知道2點:

1)main線程退出時,不會kill非守護線程,但是會kill守護線程

2)通常,子線程start()後會去調用run方法,運作完run方法,子線程停止執行,不會繼續運作之後的代碼。

是以,通常我們可以這麼做,擷取目前活動線程數,如果線程數為1,則說明子線程都運作完,可以繼續後面的代碼清理工作,否則繼續循環檢測,這裡還可以加代碼優化,比如每隔一段時間檢測一次,以免主線程浪費系統資源

# 利用主線程執行清理工作

    current_active_thread_num

=

len(threading.enumerate())

# 擷取目前活動線程數量

while

 current_active_thread_num

!=

1:

    time.sleep(10) #

每10秒檢測一次

2.代碼實踐

requestpy.py

#!/usr/bin/env python

#

-*- coding:utf-8 -*-

__author__

'shouke'

import

urllib.request

json

sys

threading

from

collections

import

Counter

time

datetime

class

SubThread(threading.Thread):

    mutex_lock

= threading.RLock()

    def

__init__(self,

file_handler):

        self.file_handler

= file_handler

        self.run_times

# 記錄每個線程的運作次數

        threading.Thread.__init__(self)

run(self):

        while

self.run_times

<</span>

int(sys.argv[2]):

            url

'http://xxxxxx/xxxxxcard/kq/codepool/test/'

            request

= urllib.request.Request(url,

method='POST')

            try:

                response

= urllib.request.urlopen(request)

                response_body

= response.read()

= response_body.decode('utf-8')

= json.loads(response_body)

                 #

寫入檔案

                SubThread.mutex_lock.acquire()

                self.file_handler.write(str(response_body['code']))

                self.file_handler.write('\n')

                SubThread.mutex_lock.release()

                self.run_times

+

1

                print('已經執行%s次請求'

%

str(self.run_times))

            except

Exception

as

e:

                print('請求錯誤%s'

% e)

def

analyze(test_result_data):

    list_data

= []

      #

存放目标資料

    total_line_count

0  #

讀取的文本行數

    abnormal_line

   #

存放異常資料行數

    digit_line

存放正确資料函數

    with

 open(test_result_data,

'r')

file:

        line

= file.readline()

line:

            line

= line.strip('\n')

            if

line.isdigit()

and

len(line)

==

12:

                list_data.append(int(line))

                digit_line

= digit_line +

            else:

                abnormal_line

= abnormal_line +

                print('伺服器傳回資料異常')

            total_line_count

= total_line_count +

    print('讀取的總行數:%s'

str(total_line_count))

    print('資料正确的行數:%s'

str(digit_line))

    print('資料異常的行數:%s'

str(abnormal_line))

    #

分析是否存在重複資料

    set_data

set(list_data)

    if

len(set_data)

len(list_data):

        print('不存在重複資料,

總數:%s

條'

len(list_data))

    else:

        print('有重複資料,重複資料:%s條'

% (len(list_data)

-

len(set_data)))

if

__name__ ==

'__main__':

    start_time

= datetime.datetime.now()

test_result_data =

'd:\\test_result_data.txt'

    file

=  open(test_result_data,

'w')

 #

存儲伺服器傳回資料

    threads_pool

= []  #

線程池,存放線程對象

    thread_num

記錄建立的線程數量

    while

thread_num <</span>

int(sys.argv[1]):

        thread_obj

= SubThread(file)

        threads_pool.append(thread_obj)

        thread_num

= thread_num +

    for

thread

in

threads_pool:

        thread.start()

利用主線程執行清理工作

        time.sleep(10)

        current_active_thread_num

清理工作

    try:

        file.close()

    except

        print('關閉檔案出錯%s'

end_time = datetime.datetime.now()

    print('運作耗時:',end_time

- start_time)

# 分析資料

    analyze(test_result_data)

運作(禁用time.sleep函數的情況下):

100個線程,每個線程運作50次,總的運作

5000次

python requestpy.py

100

50

Python 多線程并發程式設計與分析
Python 多線程并發程式設計與分析

修改程式如下

class SubThread(threading.Thread):

    def __init__(self, file_handler):

        self.file_handler = file_handler

        self.run_times = 0 # 記錄每個線程的運作次數

    def run(self):

        while self.run_times < int(sys.argv[2]):

            url = 'http://xxxxxx/xxxxxcard/kq/codepool/test/'

            request = urllib.request.Request(url, method='POST')

                response = urllib.request.urlopen(request)

                response_body = response.read()

                response_body = response_body.decode('utf-8')

                response_body = json.loads(response_body)

                # 寫入檔案

                self.run_times = self.run_times + 1 # 記錄每個線程的運作次數

                print('已經執行%s次請求' % str(self.run_times))

            except Exception as e:

                print('請求錯誤%s' % e)

def analyze(test_result_file_list):

    list_data = []       # 存放目标資料

    total_line_count = 0  # 讀取的文本行數

    abnormal_line = 0    # 存放異常資料行數

    digit_line = 0       # 存放正确資料函數

    for file in test_result_file_list:

        with  open(file, 'r') as file:

            line = file.readline()

            while line:

                line = line.strip('\n')

                if line.isdigit() and len(line) == 12:

                    list_data.append(int(line))

                    digit_line = digit_line + 1

                else:

                    abnormal_line = abnormal_line + 1

                    print('伺服器傳回資料異常')

                line = file.readline()

                total_line_count = total_line_count + 1

    print('讀取的總行數:%s' % str(total_line_count))

    print('資料正确的行數:%s' % str(digit_line))

    print('資料異常的行數:%s' % str(abnormal_line))

    # 分析是否存在重複資料

    set_data = set(list_data)

    if len(set_data) == len(list_data):

        print('不存在重複資料, 總數:%s 條' % len(list_data))

        print('有重複資料,重複資料:%s條' % (len(list_data) - len(set_data)))

    # 擷取重複資料

    filehaneder = open('d:\\repeat_data.txt', 'w')

    c = Counter(list_data)

    for item in c.items():

        if item[1] > 1:

            print('重複資料:%s' % item[0])

            filehaneder.write(str(item[0]))

            filehaneder.write('\n')

    filehaneder.close()

if __name__ == '__main__':

    start_time = datetime.datetime.now()

    base_filename = 'test_result_data'

    base_dirname = 'd:\\result\\'

    test_result_file_list = [] # 存儲結果資料檔案名

    sub_thread_file_list = [] # 每個線程的檔案句柄

    threads_pool = []  # 線程池,存放線程對象

    thread_num = 0  # 記錄建立的線程數量

    while thread_num < int(sys.argv[1]):

        filename = base_dirname + base_filename + str(thread_num + 1) + '.txt'

        test_result_file_list.append(filename)

        file =  open(filename, 'w')

        sub_thread_file_list.append(file)

        thread_obj = SubThread(file)

        thread_num = thread_num + 1

    for thread in threads_pool:

    # # 利用主線程執行清理工作

    current_active_thread_num = len(threading.enumerate()) # 擷取目前活動線程數量

    while  current_active_thread_num != 1:

        time.sleep(300)

        current_active_thread_num = len(threading.enumerate())

    # 清理工作

        for file in sub_thread_file_list:

            file.close()

    except Exception as e:

        print('關閉檔案出錯%s' % e)

    end_time = datetime.datetime.now()

    print('運作耗時:',end_time - start_time)

    # 分析資料

    analyze(test_result_file_list)

運作結果:

Python 多線程并發程式設計與分析

作者:授客

QQ:1033553122

全國軟體測試QQ交流群:7156436

Git位址:https://gitee.com/ishouke

友情提示:限于時間倉促,文中可能存在錯誤,歡迎指正、評論!

作者五行缺錢,如果覺得文章對您有幫助,請掃描下邊的二維碼打賞作者,金額随意,您的支援将是我繼續創作的源動力,打賞後如有任何疑問,請聯系我!!!

           微信打賞                       

支付寶打賞                  全國軟體測試交流QQ群  

Python 多線程并發程式設計與分析
Python 多線程并發程式設計與分析
Python 多線程并發程式設計與分析