Python多線程
Threading子產品用于提供線程相關的操作,線程是應用程式中工作的最小單元。
threading子產品提供的類:
Thread, Lock, Rlock, Condition, [Bounded]Semaphore, Event, Timer, local。
Thread類
Thread是線程類,有兩種使用方法,直接傳入要運作的方法或從Thread繼承并覆寫run():

# -*- coding: utf-8 -*-
# @Author : Jack.Ming
import threading
import time
def run(arg):
time.sleep(2)
print("Thread-%s" % arg, )
if __name__ == '__main__':
for i in range(10):
# 将要執行的方法作為參數傳遞給Thread方法
thread = threading.Thread(target=run, args=(i,))
thread.start()
方式1:建立線程

Thread-3
Thread-2
Thread-1
Thread-0
Thread-4
Thread-7
Thread-6
Thread-5
Thread-9
Thread-8
Process finished with exit code 0
執行結果

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Author : Jack.Ming
import threading
import time
# 從Thread繼承,并重寫run()
class myThread(threading.Thread):
def __init__(self, num):
# #注意:一定要顯式的調用父類的初始化函數。
threading.Thread.__init__(self)
self.num = num
# 定義每個線程要運作的函數
def run(self):
time.sleep(1)
print("Thread-%s" % self.num)
if __name__ == '__main__':
for i in range(10):
thread = myThread(i)
thread.start()
方式2:自定義線程類,繼承Thread類,并重寫run方法

Thread-4
Thread-2
Thread-1
Thread-3
Thread-0
Thread-6
Thread-5
Thread-8
Thread-9
Thread-7
Process finished with exit code 0
運作結果
Thread類的構造方法:
Thread(group=None, target=None, name=None,args=(), kwargs=None, *, daemon=None)
class Thread:
def __init__(self, group=None, target=None, name=None,args=(), kwargs=None, *, daemon=None):
• group: 線程組,目前還沒有實作,庫引用中提示必須是None;
• target: 要執行的方法run();
• name: 線程名;
• args/kwargs: 要傳入方法的參數args元組,kwargs字典。
• daemon=None:如果子類重寫構造函數,則必須確定在對線程執行其他操作之前調用基類構造函數
Thread執行個體方法:
• is_alive(): 傳回線程是否在運作[bool]。正在運作指啟動後、終止前。
• getName(): 擷取線程名。
• setName(str):設定線程名
• start(): 線程準備就緒,啟動線程,等待CPU排程
• isDaemon():傳回線程前台線程還是背景線程[bool]
• setDaemon(bool):設定線程前台線程還是背景線程[bool],必須在start前設定
如果是背景線程[True],主線程執行過程中,背景線程也在進行,主線程執行完畢後,背景線程不論成功與否,主線程和背景線程均停止
如果是前台線程[False]預設,主線程執行過程中,前台線程也在進行,主線程執行完畢後,等待前台線程也執行完成後,程式停止
• join([timeout]): 阻塞目前上下文環境的線程,直到調用此方法的線程終止或到達指定的timeout(可選參數),逐個執行每個線程,執行完畢後繼續往下執行,該方法使得多線程變得無意義

#!/usr/bin/env python
# -*- coding: utf-8 -*
# Created by YangYongming at 2018/11/24 14:07
# FileName: 5.py
import threading
def run(arg):
print(arg)
thread = threading.Thread(name="yangym", target=run(1), )
print(thread.getName()) # 輸出 yangym
thread.setDaemon(False)
thread.setName("YANGYM")
print(thread.getName()) # 輸出 YANGYM
thread.start()
print(thread.is_alive()) # 輸出 False
print(thread.isDaemon()) # 輸出 False
執行個體示範
setDaemon 使用介紹
setDaemon為True時,主線程不等待其他線程的執行;setDaemon為False時(預設),主線程等待所有其他線程執行完成後再繼續往下執行。

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Author : Jack.Ming
import threading
import time
# 從Thread繼承,并重寫run()
class myThread(threading.Thread):
def __init__(self, num):
# #注意:一定要顯式的調用父類的初始化函數。
threading.Thread.__init__(self)
self.num = num
# 定義每個線程要運作的函數
def run(self):
time.sleep(1)
print("Thread-%s" % self.num)
if __name__ == '__main__':
for i in range(10):
thread = myThread(i)
thread.setDaemon(True)
thread.start()
setDaemon為True

Process finished with exit code 0
運作結果:主程式執行完畢後立即退出

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Author : Jack.Ming
import threading
import time
# 從Thread繼承,并重寫run()
class myThread(threading.Thread):
def __init__(self, num):
# #注意:一定要顯式的調用父類的初始化函數。
threading.Thread.__init__(self)
self.num = num
# 定義每個線程要運作的函數
def run(self):
time.sleep(1)
print("Thread-%s" % self.num)
if __name__ == '__main__':
for i in range(10):
thread = myThread(i)
thread.setDaemon(False) #預設
thread.start()
setDaemon為False

Thread-2
Thread-3
Thread-1
Thread-0
Thread-9
Thread-6
Thread-4
Thread-7
Thread-8
Thread-5
Process finished with exit code 0
運作結果:主程式執行完畢後會等待其他所有線程全部結束後在退出程式
join 使用介紹
阻塞目前上下文環境的線程,直到調用此方法的線程終止或到達指定的timeout(可選參數),逐個執行每個線程,執行完畢後繼續往下執行,該方法使得多線程變得無意義

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Author : Jack.Ming
import threading
import time
def run(arg):
time.sleep(2)
print("Thread-%s Time is: %s" % (arg,time.ctime()) )
if __name__ == '__main__':
for i in range(10):
# 将要執行的方法作為參數傳遞給Thread方法
thread = threading.Thread(target=run, args=(i,))
thread.start()
thread.join()
使用join,使用join的線程會阻塞其他線程的運作,指導線程結束或者逾時,其他線程才可以繼續運作

Thread-0 Time is: Sat Mar 10 13:33:24 2018
Thread-1 Time is: Sat Mar 10 13:33:26 2018
Thread-2 Time is: Sat Mar 10 13:33:28 2018
Thread-3 Time is: Sat Mar 10 13:33:30 2018
Thread-4 Time is: Sat Mar 10 13:33:32 2018
Thread-5 Time is: Sat Mar 10 13:33:34 2018
Thread-6 Time is: Sat Mar 10 13:33:36 2018
Thread-7 Time is: Sat Mar 10 13:33:38 2018
Thread-8 Time is: Sat Mar 10 13:33:40 2018
Thread-9 Time is: Sat Mar 10 13:33:42 2018
Process finished with exit code 0
運作結果:每個線程都會等待使用join的線程執行完畢後才繼續運作,等待2秒,這使得多線程變得沒有意義

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Author : Jack.Ming
import threading
import time
def run(arg):
time.sleep(2)
print("Thread-%s Time is: %s" % (arg,time.ctime()) )
if __name__ == '__main__':
for i in range(10):
# 将要執行的方法作為參數傳遞給Thread方法
thread = threading.Thread(target=run, args=(i,))
thread.start()
不使用join,線程之前運作互不影響

Thread-4 Time is: Sat Mar 10 13:34:22 2018
Thread-2 Time is: Sat Mar 10 13:34:22 2018
Thread-1 Time is: Sat Mar 10 13:34:22 2018
Thread-0 Time is: Sat Mar 10 13:34:22 2018
Thread-3 Time is: Sat Mar 10 13:34:22 2018
Thread-6 Time is: Sat Mar 10 13:34:22 2018
Thread-5 Time is: Sat Mar 10 13:34:22 2018
Thread-9 Time is: Sat Mar 10 13:34:22 2018
Thread-8 Time is: Sat Mar 10 13:34:22 2018
Thread-7 Time is: Sat Mar 10 13:34:22 2018
Process finished with exit code 0
線程鎖(Lock、RLock)
由于線程之間是進行随機排程,并且每個線程可能隻執行n條執行之後,當多個線程同時修改同一條資料時可能會出現髒資料,是以,出現了線程鎖 - 同一時刻允許一個線程執行操作。
Lock屬于全局,Rlock屬于線程。Lock(),Rlock(),推薦使用Rlock()
一把鎖頭,一把鑰匙,鑰匙用完給下一個人用

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Author : Jack.Ming
import threading
import time
num = 0
def run(arg):
global num
time.sleep(1)
num = num + arg
print(num)
if __name__ == '__main__':
for i in range(10):
# 将要執行的方法作為參數傳遞給Thread方法
thread = threading.Thread(target=run, args=(i,))
thread.start()
未使用鎖,所有線程随機運作,資料混亂

5
8
9
11
15
15
24
32
39
45
Process finished with exit code 0
執行結果,未按照理想的順序運作

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Author : Jack.Ming
import threading
import time
lock = threading.RLock()
num = 0
def run(arg):
lock.acquire()
global num
time.sleep(1)
num = num + arg
print(num)
lock.release()
if __name__ == '__main__':
for i in range(10):
# 将要執行的方法作為參數傳遞給Thread方法
thread = threading.Thread(target=run, args=(i,))
thread.start()
使用鎖,設定一條資料在同一時間隻可以被一個線程修改

0
1
3
6
10
15
21
28
36
45
Process finished with exit code 0
信号量(Semaphore)
互斥鎖Rlock 同時隻允許一個線程更改資料,而Semaphore是同時允許一定數量的線程更改資料 ,比如廁所有3個坑,那最多隻允許3個人上廁所,後面的人隻能等裡面有人出來了才能再進去。
一把鎖頭,n把鑰匙,你把鑰匙全部用完,再給下n個人使用

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Author : Jack.Ming
import threading
import time
class myThread(threading.Thread):
def __init__(self, num):
threading.Thread.__init__(self)
self.num = num
def run(self):
semaphore.acquire()
time.sleep(5)
print("Thread-%s Time is: %s" % (self.num,time.ctime()))
semaphore.release()
if __name__ == '__main__':
semaphore = threading.BoundedSemaphore(5) # 最多允許5個線程被同僚執行
for i in range(10):
thread = myThread(i)
thread.start()
執行個體:某條語句設定最多可以5個線程同時執行

Thread-4 Time is: Sat Mar 10 14:07:00 2018
Thread-3 Time is: Sat Mar 10 14:07:00 2018
Thread-2 Time is: Sat Mar 10 14:07:00 2018
Thread-1 Time is: Sat Mar 10 14:07:00 2018
Thread-0 Time is: Sat Mar 10 14:07:00 2018
Thread-9 Time is: Sat Mar 10 14:07:05 2018
Thread-5 Time is: Sat Mar 10 14:07:05 2018
Thread-7 Time is: Sat Mar 10 14:07:05 2018
Thread-6 Time is: Sat Mar 10 14:07:05 2018
Thread-8 Time is: Sat Mar 10 14:07:05 2018
Process finished with exit code 0
# 每次執行5個線程,中間間隔5秒
運作結果:看時間
事件(event)
python線程的事件用于主線程控制其他線程的執行,事件主要提供了三個方法 set、wait、clear。
事件處理的機制:全局定義了一個“Flag”,如果“Flag”值為 False,那麼當程式執行 event.wait 方法時就會阻塞,如果“Flag”值為True,那麼event.wait 方法時便不再阻塞。
- clear:将“Flag”設定為False 預設的,當Flag為False,無論線程執行到哪裡,遇到wait後全部阻塞
- set:将“Flag”設定為True 無論在哪裡,遇到set後所有線程正常執行

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Author : Jack.Ming
import threading
import time
class myThread(threading.Thread):
def __init__(self, num, event):
threading.Thread.__init__(self)
self.num = num
self.event = event
def run(self):
print("Thread-%s" % self.num)
self.event.wait() # 所有線程到此将被阻塞
print(self.num)
if __name__ == '__main__':
event_obj = threading.Event()
for i in range(10):
thread = myThread(i, event_obj)
thread.start()
inp = input("please input:") # 輸入true時,所有被阻塞線程恢複執行
if inp == "true":
event_obj.set()
執行個體

Thread-0
Thread-1
Thread-2
Thread-3
Thread-4
Thread-5
Thread-6
Thread-7
Thread-8
Thread-9
please input:true
0
2
3
1
5
7
8
9
4
6
Process finished with exit code 0
Queue類
Python 的 Queue 子產品中提供了同步的、線程安全的隊列類,包括FIFO(先入先出)隊列Queue,LIFO(後入先出)隊列LifoQueue,和優先級隊列 PriorityQueue。
這些隊列都實作了鎖原語,能夠在多線程中直接使用,可以使用隊列來實作線程間的同步。
Queue 子產品中的常用方法:
- Queue.qsize() 傳回隊列的大小
- Queue.empty() 如果隊列為空,傳回True,反之False
- Queue.full() 如果隊列滿了,傳回True,反之False
- Queue.full 與 maxsize 大小對應
- Queue.get([block[, timeout]])擷取隊列,timeout等待時間
- Queue.get_nowait() 相當Queue.get(False)
- Queue.put(item) 寫入隊列,timeout等待時間
- Queue.put_nowait(item) 相當Queue.put(item, False)
- Queue.task_done() 在完成一項工作之後,Queue.task_done()函數向任務已經完成的隊列發送一個信号
- Queue.join() 實際上意味着等到隊列為空,再執行别的操作

#!/usr/bin/python3
import queue
import threading
import time
exitFlag = 0
class myThread (threading.Thread):
def __init__(self, threadID, name, q):
threading.Thread.__init__(self)
self.threadID = threadID
self.name = name
self.q = q
def run(self):
print ("開啟線程:" + self.name)
process_data(self.name, self.q)
print ("退出線程:" + self.name)
def process_data(threadName, q):
while not exitFlag:
queueLock.acquire()
if not workQueue.empty():
data = q.get()
queueLock.release()
print ("%s processing %s" % (threadName, data))
else:
queueLock.release()
time.sleep(1)
threadList = ["Thread-1", "Thread-2", "Thread-3"]
nameList = ["One", "Two", "Three", "Four", "Five"]
queueLock = threading.Lock()
workQueue = queue.Queue(10)
threads = []
threadID = 1
# 建立新線程
for tName in threadList:
thread = myThread(threadID, tName, workQueue)
thread.start()
threads.append(thread)
threadID += 1
# 填充隊列
queueLock.acquire()
for word in nameList:
workQueue.put(word)
queueLock.release()
# 等待隊列清空
while not workQueue.empty():
pass
# 通知線程是時候退出
exitFlag = 1
# 等待所有線程完成
for t in threads:
t.join()
print ("退出主線程")

開啟線程:Thread-1
開啟線程:Thread-2
開啟線程:Thread-3
Thread-3 processing One
Thread-3 processing Two
Thread-1 processing Three
Thread-2 processing Four
Thread-1 processing Five
退出線程:Thread-2
退出線程:Thread-1
退出線程:Thread-3
退出主線程
簡單應用
多線程ping操作,
ping.py:主程式
sfile:IP位址段清單檔案
dfile:ping通的位址儲存到該檔案中
sfile 檔案:存放IP網段清單,每行一個,内容如下

#!/usr/bin/env python
# -*- coding: utf-8 -*
# Created by YangYongming at 2018/11/25 12:26
# FileName: ping.py
import threading
import IPy
import os
import queue
class MyThread(threading.Thread):
def __init__(self, queue):
threading.Thread.__init__(self)
self.queue = queue
def run(self): # 定義每個線程要運作的函數
global aliveip
while True:
host = self.queue.get(timeout=2) # 從隊列中取Ip
host = str(host)
res = os.popen("ping -n 1 %s" % host)
os.popen("exit\n")
if "TTL" in res.read():
print("%s is Alive" % host)
lock.acquire()
with open("dfile", "a", encoding="utf-8") as f2:
f2.write(host + '\n')
aliveip.append(host)
lock.release()
self.queue.task_done()
if self.queue.empty(): # 當隊列為空時,終止該線程
break
if __name__ == '__main__':
threadlist = []
lock = threading.RLock()
queue = queue.Queue(50) # 初始化一個隊列, 容量50
aliveip = []
for i in range(40): # 建立40個線程
t = MyThread(queue)
t.setDaemon(False) # 主線程執行完成,等待所有的前台線程執行完畢,預設[等待]False
t.start() # 線程準備就緒,等待CPU排程
with open("sfile", "r", encoding="utf-8") as f1:
for line in f1.readlines():
ip = IPy.IP(line)
for x in ip:
queue.put(x) # 向隊列裡面放IP
queue.join()
主程式
輸出結果:
将可以ping通的位址儲存到dfile檔案中,内容如下:
作者:楊永明
出處:https://www.cnblogs.com/ming5218/
本文版權歸作者和部落格園共有,歡迎轉載,但未經作者同意必須保留此段聲明,且在文章頁面明顯位置給出原文連接配接。