線程
一、前言
1、概述
-
線程
在一個程序的内部 要同時幹很多件事 就需要同時執行多個子任務 我們把程序内的這些子任務稱為 線程
線程通常叫輕型的程序 線程是共享記憶體空間的并發執行的多任務 每一個線程都共享一個程序的資源
線程是最小的執行單元 而程序最少由一個線程組成的
線程的執行時間 由程序決定的
-
主線程
任何程序都會有一個預設的主線程 如果主線程死掉 子線程也死掉 是以子線程依賴于主線程
- 子產品
- _thread 低級子產品
- threading 進階子產品 對_thread進行了封裝(建議)
二、線程使用
1、_thread子產品
-
導入
import _thread
-
開啟線程
_thread.start_new_thread(函數,參數)
- 注意
- 參數為元組類型
- 如果主線程死掉了 則子線程死掉
- 如果不傳參 可以傳遞一個空元祖占位
-
執行個體
import _thread
import time
線程_thread下方需要有阻塞代碼 否則程序執行完畢 線程就死掉了def fun(): for i in range(10): print(i,'---------------') time.sleep(1) for i in range(5): _thread.start_new_thread(fun, ()) for x in range(10): time.sleep(1) print('over')
2、threading(建議)
-
導入
import threading
-
建立線程
thr = threading.Thread(target=函數名, args=(參數,) ,name='名字')
- target 函數名 要執行的任務
- args 參數 傳遞給任務的參數
- name 給線程起名稱
- 設定線程名稱
- 執行個體化的時候傳遞name參數
- thr.setName(名稱)
-
開啟線程
thr.start()
-
線程等待
thr.join()
- 傳回目前線程對象
- threading.current_thread()
- threading.currentThread()
- 擷取線程名稱
- threading.current_thread().name
- threading.current_thread().getName()
-
擷取所有線程的名稱
threading.enumerate() [ɪˈnjuːməreɪt]
-
設定線程守護
t.setDaemon(True) # 線程守護 是否随着主線程的結束而結束
-
執行個體
import threading
import time
def fun(i): print('線程開始', threading.current_thread().name) time.sleep(1) if __name__ == '__main__': print(threading.current_thread().name) t = threading.Thread(target=fun, args=(1,)) t.setName('name'+str(i)) # 通過setname方法進行設定線程名稱 t.start() # 開啟線程 t_list.append(t) # 循環等待 i.join() print('over')
3、開啟多線程
import threading
import time
def fun(i):
print('線程開始', threading.current_thread().name)
time.sleep(1)
if __name__ == '__main__':
print(threading.current_thread().name)
t_list = []
for i in range(5):
# 執行個體化并設定線程名稱
# t = threading.Thread(target=fun, args=(1,), name='name-'+str(i))
t = threading.Thread(target=fun, args=(1,))
t.setName('name'+str(i)) # 通過setname方法進行設定線程名稱
t.start() # 開啟線程
t_list.append(t)
# 循環等待
for i in t_list:
i.join()
print('over')
4、類的繼承 建立線程
import threading
class MyThread(threading.Thread):
def __init__(self, arg, name):
threading.Thread.__init__(self)
self.i = arg
self.name = name
def run(self):
print('我是類的繼承實作的線程', self.i, self.name)
t = MyThread(1, 2)
t.start()
t.join()
5、線程共享資料
-
概述
多線程和多程序最主要的差別在于 多程序 中 同一個變量 會被拷貝到每一個程序中 互不影響
而多線程中 所有的變量都是線程間共享的 是以 任意一個變量都可以被線程進行修改 原因就是在于線程中變量共享的
-
資料共享
import threading
import time
name = 'lucky'
def fun1(): global name name = '張三' print(threading.current_thread().name, name) time.sleep(2) def fun2(): print(threading.current_thread().name, name) if __name__ == '__main__': t1 = threading.Thread(target=fun1) t2 = threading.Thread(target=fun2) t1.start() t2.start() t1.join() t2.join() print(name) print('over')
-
線程沖突
import threading
兩個線程同時對同一資料進行讀寫就會産生資料的錯亂 解決辦法 在進行寫的時候保證隻有一個線程i = 1 def change_num(num): global i i += num i -= num def fun1(): for i in range(10000000): change_num(6) def fun2(): for i in range(10000000): change_num(8) if __name__ == '__main__': t1 = threading.Thread(target=fun1) t2 = threading.Thread(target=fun2) t1.start() t2.start() t1.join() t2.join() print(i) print('over')
6、Lock線程鎖(資料錯亂的問題)
-
概述
lock鎖是線程子產品中的一個類 有倆個主要的方法 acquire() 和release() 當調用acquire()時 它進行鎖定 并阻塞執行 直到調用release() 以防止資料損壞 因為一次隻有一個線程進行通路資源
-
作用
避免線程沖突
- 注意
- 目前的線程鎖定以後 後面的線程會等待 (線程等待/線程阻塞)
- 需要release以後才能解鎖恢複正常
- 不能重複鎖定
-
記憶體錯亂
import threading, time
倆個線程對統一資源進行讀寫 可能會造成資料值的錯亂我麼必須保證一個線程在修改的同時其他線程不能進行操作i = 1 def fun1(): global i for x in range(1000000): i += x i -= x print('fun1----------', i) def fun2(): global i for x in range(10000000): i += x i -= x print('fun2-------', i) if __name__ == '__main__': t1 = threading.Thread(target=fun1) t2 = threading.Thread(target=fun2) t1.start() t2.start() t1.join() t2.join()
-
線程鎖的簡單使用
import threading
# 建立線程鎖
lock = threading.Lock()
# 進行鎖定
lock.acquire()
# 進行解鎖
lock.release()
-
死鎖
import threading
lock = threading.Lock() a = 0 def sumOne(): global a lock.acquire() a += 1 # lock.release() print(a) def sumTwo(): global a print('two', a) lock.acquire() a += 1 lock.release() sumOne() sumTwo() print(a)
-
解決上面沖突問題
import threading, time
lock = threading.Lock()
i = 1
def fun1():
global i
if lock.acquire():
for x in range(1000000):
i += x
i -= x
lock.release()
print('fun1----------', i)
def fun2(): global i if lock.acquire(): for x in range(10000000): i += x i -= x lock.release() print('fun2-------', i) if __name__ == '__main__': t1 = threading.Thread(target=fun1) t2 = threading.Thread(target=fun2) t1.start() t2.start() t1.join() t2.join() print(i)
-
Lock的簡寫
with lock:
pass
-
上面進行更改
import threading, time
lock = threading.Lock()
i = 1
def fun1():
global i
with lock:
for x in range(1000000):
i += x
i -= x
print('fun1----------', i)
def fun2(): global i with lock: for x in range(10000000): i += x i -= x print('fun2-------', i) if __name__ == '__main__': t1 = threading.Thread(target=fun1) t2 = threading.Thread(target=fun2) t1.start() t2.start() t1.join() t2.join() print(i)
7、RLock 避免單線程死鎖
-
說明
上面的Lock不能識别Lock是被哪個線程所持有的 如果任何線程正在通路共享資源 那麼試圖通路共享資源的其他線程将被阻塞 即使鎖定共享資源的線程也是如此 如果使用RLock 就不會出現這種問題 RLock可以識别目前線程 如果是目前線程鎖定 則不會出現死鎖也就是阻塞的問題
-
作用
在同一線程内 進行多次acquire 不會被阻塞
-
線程鎖的簡單使用
import threading
# 建立線程鎖
rlock = threading.RLock()
# 進行鎖定
rlock.acquire()
# 進行解鎖
rlock.release()
-
同一線程多次鎖定
import threading
rlock = threading.RLock()
a = 0
def sumOne(): global a rlock.acquire() a += 1 rlock.acquire() rlock.release() print(a) sumOne() print(a)
-
解決沖突問題
import threading, time
rlock = threading.RLock()
i = 1
def fun1():
global i
if rlock.acquire():
for x in range(1000000):
i += x
i -= x
rlock.release()
print('fun1----------', i)
def fun2(): global i if rlock.acquire(): for x in range(10000000): i += x i -= x rlock.release() print('fun2-------', i) if __name__ == '__main__': t1 = threading.Thread(target=fun1) t2 = threading.Thread(target=fun2) t1.start() t2.start() t1.join() t2.join() print(i)
-
rlock的簡寫
rlock = threading.RLock()
with rlock:
...
8、Local
-
概述
ThreadLocal 有的稱呼為線程本地變量 也有稱呼為線程本地存儲
ThreadLocal 在每一個變量中都會建立副本 每個線程都可以通路自己内部副本的變量
-
作用
它本身是一個全局變量 但是每個線程可以利用它來保護自己的私有資料 這些私有資料對其它的線程是不可見的
-
導入
import threading
local = threading.local()
-
使用
import threading
local = threading.local() def run(n): local.x += n local.x -= n def fun(): # 設定local.x的初始值 local.x = 0 for i in range(10000000): run(i) print(threading.current_thread().name, local.x) if __name__ == '__main__': t1 = threading.Thread(target=fun) t2 = threading.Thread(target=fun) t1.start() t2.start() t1.join() t2.join()
9、Timer定時執行
-
概述
Timer是Thread的子類 可以指定某段時間間隔後在執行某個操作
-
使用
import threading
def go(): print('小老弟 你啥時候能看見我?') # 開啟定時任務 t = threading.Timer(5, go) t.start() print('over')
10、線程通信(觸發動作)
-
作用
線程間傳遞信号
-
使用
import threading
e = threading.Event()
e.wait() # 線程等待 等待一次
e.clear() # 将wait重置 再次等待
e.set() # 觸發wait
-
執行個體
import threading
import time
def run(): # 建立了一個event對象 e = threading.Event() def go(): e.wait() # 阻塞等待 print('我的心在等待 永遠在等待') threading.Thread(target=go).start() return e e = run() time.sleep(3) e.set() # 觸發
-
觸發一次執行一次
import threading
def run(e): for i in range(5): e.wait() print('---------------', i) e.clear() if __name__ == '__main__': e = threading.Event() t = threading.Thread(target=run, args=(e, )) t.start() for i in range(5): v = input() if v == 'y': e.set()
11、信号量
-
作用
用于控制線程的數量
-
概述
信号量semaphore [ˈseməfɔːr]
管理一個内置的計數器 每次調用acquire的時候計數器減一
當調用release的時候 内置計數器加1
計數器不能小于0 當計數器為0時 acquire将進行阻塞 直到調用release
-
使用
import threading
import time
def go(): if s.acquire(): print('go go go 哦來哦哦來哦來') time.sleep(1) s.release() if __name__ == '__main__': s = threading.Semaphore(2) # 限制并發數為2 for i in range(5): threading.Thread(target=go).start()
-
簡寫
s = threading.Semaphore(2) # 限制并發數為2
with s:
...
-
import threading import time def go(): # if s.acquire(): with s: print('go go go 哦來哦哦來哦來') time.sleep(1) # s.release() if __name__ == '__main__': s = threading.Semaphore(2) # 限制并發數為2 for i in range(5): threading.Thread(target=go).start()
12、ThreadPool 線程池(老版本)
-
安裝
pip install threadpool
-
概述
啟動大量線程
-
導入
import threadpool
- 使用
-
建立線程池
threadpool.ThreadPool()
-
建立線程池任務
threadpool.makeRequests(go, name_list)
-
将線程池任務放到線程池中
for i in req:
pool.putRequest(i)
[pool.putRequest(i) for i in req]
-
阻塞等待
pool.wait()
-
-
執行個體
import threadpool
import time
def go(name): print('人生苦短 {}:快用python 一行代碼就見效'.format(name)) time.sleep(2) name_list = ['大偉', '老曹', '什麼雨', '卻什麼黑'] # 建立線程池 并發3 pool = threadpool.ThreadPool(3) # 建立線程池任務 req = threadpool.makeRequests(go, name_list) # for i in req: # pool.putRequest(i) [pool.putRequest(i) for i in req] # 循環放到線程池中 pool.wait() # 阻塞等待
13、線程池新子產品
-
導入
from concurrent.futures import ThreadPoolExecutor[ɪɡˈzekjətər]
- 方法
- submit() 傳入函數及參數
- map() 統一傳參管理
-
使用
from concurrent.futures import ThreadPoolExecutor
import time
8
def go(name): print('人生苦短 {}:快用python 一行代碼就見效'.format(name)) time.sleep(2) if __name__ == '__main__': name_list = ['大偉', '老曹', '什麼雨', '卻什麼黑'] pool = ThreadPoolExecutor(3) # for n in name_list: # pool.submit(go, n) pool.map(go, name_list) print('over')
14、隊列子產品
-
導入
import queue
-
概述
隊列子產品是python标準庫中線程安全隊列 (FIFO)first in first out 的縮寫 先進先出 用來存儲生産者和消費者之間的資訊傳遞
-
格式
queue.Queue(maxsize=0)
當參數為0 或者小于0 隊列大小沒有限制
如果為其它整數 則代表目前隊列的限制數 如果超出限定的數 則阻塞
-
簡單使用
import queue
# 執行個體化隊列
q = queue.Queue()
# q = queue.Queue(3)
# 循環将資料放入隊列
for i in range(5):
q.put(i)
# 循環取出隊列資料
while not q.empty():
print(q.get())
15、生産者消費者
import threading
import queue
import time, random
class createThread(threading.Thread):
def __init__(self, que, name):
threading.Thread.__init__(self)
self.que = que
self.name = name
def run(self):
while True:
time.sleep(3) # 3秒鐘生産一個
num = random.randint(1, 1000000)
self.que.put('生産者:'+str(self.name)+'商品:'+str(num))
print('生産者:'+str(self.name)+'商品:'+str(num))
class consumerThread(threading.Thread):
def __init__(self, que, name):
threading.Thread.__init__(self)
self.que = que
self.name = name
def run(self):
while True:
time.sleep(1) # 1秒鐘消費一個
data = self.que.get()
print('客戶:'+str(self.name)+'買到了:'+data)
if __name__ == '__main__':
que = queue.Queue(10)
for i in range(3):
createThread(que, i).start()
for i in range(7):
consumerThread(que, i).start()
三、線程VS程序
1、多任務原理
首先 要實作多任務 通常我們會設計 Master - Worker模式 Master負責配置設定任務 worker負責執行任務 是以 多任務環境下 通常是一個master 多個 worker
多程序實作Master worker 主程序則為Master 其它程序則為worker
多線程實作Master worker 主線程為Master 其它線程為worker
2、多程序
-
優點
穩定性高 :多程序的最大優點為 穩定性高 如果多個程序中有一個子程序崩潰了 則不會影響到其它的程序
Apache 就是使用的多程序
-
缺點
建立程序的代價比較大 針對于Windows Unix/Linux下影響不大的 是以說在Windows下開啟多程序開銷比較大
作業系統運作的程序數是有限的
3、多線程
-
優點
多線程的執行效率高于多程序
-
缺點
任何一個線程崩潰(挂掉) 會影響整個程序的崩潰 所有線程共享存儲空間的
四、協程
1、概念
-
協程
又稱微線程 是一種輕量級的線程
-
子程式
在所有語言中 都是層級調用的 比如 在A中調用B B在執行過程中調用了C C執行完進行傳回 B執行完傳回 最後是A執行完畢 一個函數就是執行的子程式 子程式的調用 總會有一個入口 一次傳回 調用的順序是明确的
-
了解協程
之前的線程 是由作業系統進行排程的 協程按照程式員按照需求自己排程的
-
實作
python對協程的實作是通過生成器 generator來實作的
在生成器中 不但可以作用于for循環, 還可以作用于next方法
-
優點
協程的效率極高 因為子程式 不需要線程或者程序的切換 也沒有額外的開銷
不需要多線程的鎖機制
-
缺點
無法利用多核cpu 協程的本質是單線程
-
send方法
将值發送給生成器中的yield 進行接收 并且代碼向下執行(send實作了next的方法)
-
注意
協程的啟動 需要send(None)
-
資料的傳遞
# send()
def run(): data = '123' r = yield data print('代碼1', r) r = yield data print('代碼2', r) r = yield data r = run() # print(next(r)) # print(next(r)) # print(next(r)) r.send(None) # 協程啟動 None print(r.send('a')) print(r.send('b'))
-
生産者和消費者
傳統的生産者和消費者 是一個線程寫消息 另一個讀取消息
如果改用協程的話 就由生成器來進行實作
def product(c):
c.send(None)
for i in range(10):
c.send(i)
print('生産了資料:', i)
# res = c.send(i)
# print('生産了資料:', i, res)
def con(): data = '' while True: r = yield data print('消費了:{}資料'.format(r)) if __name__ == '__main__': c = con() product(c) # print(c)