并發程式設計
并發程式設計理論居多,實際應用代碼簡單
必備知識回顧
- 計算機又叫電腦,即通電的大腦,發明計算機是為了讓它通電之後能夠像人一樣去工作,并且比人的工作效率更高,因為可以24小時不間斷
-
計算機五大組成部分
控制器
運算器
存儲器
輸入裝置
輸出裝置
計算機的核心:
(控制器 + 運算器 = 中央處理器)CPU
- 程式要想被計算機運作,它的代碼必須要先由硬碟讀到記憶體,之後CPU取出指令去執行
多道技術
單核實作并發的效果
必備知識點
-
并發
看起來像同時運作的就可以稱之為并發
-
并行
真正意義上的同時執行
空間與時間上的複用
空間上:多個程式公用一套計算機硬體
時間上:切換+儲存狀态
ps:
- 并行肯定算并發
- 單核的計算機肯定不能實作并行,但是可以實作并發
補充:假設單核就是一個核,不考慮CPU的核心數
多道技術圖解

多道技術重點知識
"""
切換(cpu)分為兩種情況
1.當一個程式遇到IO操作的時候,作業系統會剝奪該程式的CPU執行權限
作用:提高了CPU的使用率,并且也不影響程式的執行效率
2.當一個程式長時間占用CPU的時候,作業系統也會剝奪該程式的CPU執行權限
作用:降低了程式的執行效率(程式運作時間+切換時間)
"""
程序理論
程序與程式的差別
"""
程式就是一堆躺在硬碟上的代碼,是“死”的
程序則表示程式正在執行的過程,是“活”的
"""
程序排程
-
先來先服務排程算法
對長作業有利,對短作業無益
-
短作業優先排程算法
對短作業有利,對長作業無益
- 時間片輪轉法+多級回報隊列
程序運作的三狀态圖
示例:
兩對重要概念
-
同步和異步
描述的是任務的送出方式
同步:任務送出之後,原地等待任務的傳回結果,等待的過程中不做任何事(幹等)
程式層面上表現的感覺就是卡住了
異步:任務送出之後,不原地等待任務的傳回結果,直接去做其他事
我送出的任務結果如何擷取?
任務的傳回結果會有一個異步回調機制自動處理
-
阻塞非阻塞
描述的程式的運作狀态
阻塞:阻塞态
非阻塞:就緒态、運作态
上述概念的組合:最高效的一種組合就是 異步非阻塞
理想狀态:我們寫的代碼永遠處于就緒态和運作态之間切換(但基本不可能)
開啟程序的兩種方式
# 第一種 直接定義函數 常用
from multiprocessing import Process
import time
def task(name):
print('%s is running' % name)
time.sleep(3)
print('%s is over' % name)
if __name__ == '__main__':
# 1、建立一個程序對象
p = Process(target=task, args=('jason',))
"""
target=要運作的函數
args=(函數需要的參數)
"""
# 2、開啟程序
p.start() # 告訴作業系統幫你建立一個程序 異步
print('主')
"""
windows作業系統下 建立程序一定要在main内建立
因為windows下建立程序類似于子產品導入的方式
會從上往下依次執行代碼
Linux中則是直接将代碼完整拷貝一份
"""
# 第二種方式 類的繼承 不常用
from multiprocessing import Process
import time
class MyProcess(Process): # 定義一個類繼承自Process,類名可以自定義
def run(self): # 将要運作的功能寫入函數,函數名必須叫run!!
print('hello beautiful girl!')
time.sleep(1)
print('get out!')
if __name__ == '__main__':
p = MyProcess()
p.start()
print('主')
總結
建立程序就是在記憶體中申請一塊記憶體空間将需要運作的代碼丢進去
一個程序對應在記憶體中就是一塊獨立的記憶體空間
多個程序對應在記憶體中就是多塊獨立的記憶體空間
程序與程序之間資料預設情況下是無法直接互動,想互動可以借助第三方工具、子產品
join方法
join是讓主程序的代碼等待子程序代碼運作結束之後,再繼續運作。不影響其他子程序的執行
from multiprocessing import Process
import time
def task(name, n):
print('%s is running' % name)
time.sleep(n)
print('%s is over' % name)
if __name__ == '__main__':
# p1 = Process(target=task, args=('jason', 1))
# p2 = Process(target=task, args=('egon', 2))
# p3 = Process(target=task, args=('tank', 3))
# start_time = time.time()
# p1.start()
# p2.start()
# p3.start() # 僅僅是告訴作業系統要建立程序
# p1.join() # 主程序等待子程序p運作結束之後再繼續往後執行
# p2.join()
# p3.join()
start_time = time.time()
p_list = []
for i in range(1, 4):
p = Process(target=task, args=('子程序%s' % i, i))
p.start()
p_list.append(p) # 将起起來的所有程序對象放入一個清單中
for p in p_list:
p.join() # 主程序等待每一個子程序p運作結束之後再繼續往後執行
print('主', time.time() - start_time)
>>>:子程序1 is running
子程序3 is running
子程序2 is running
子程序1 is over
子程序2 is over
子程序3 is over
主 3.059741973876953
程序間資料隔離(預設情況下)
from multiprocessing import Process
money = 100
def task():
global money # 局部修改全局
money = 666
print('子', money)
if __name__ == '__main__':
p = Process(target=task)
p.start()
p.join()
print(money)
程序對象及其他方法
一台計算機上面運作着很多程序,那麼計算機是如何區分并管理這些程序服務端的呢?
計算機會給每一個運作的程序配置設定一個PID号
如何檢視
Windows 進入 cmd 輸入
tasklist
檢視全部程序,
tasklist |findstr PID
檢視具體的程序(PID是具體的程序号)
Mac 進入 終端 輸入
ps aux
ps aux|grep PID
from multiprocessing import Process, current_process
import os
current_process().pid # 檢視目前程序号
os.getpid() # 檢視目前程序号
os.getppid() # 檢視目前程序的父程序号
from multiprocessing import Process, current_process
import time
def task():
print('%s is running' % current_process().pid) # current_process().pid 檢視目前程序的程序号
time.sleep(3)
if __name__ == '__main__':
p = Process(target=task)
p.start()
print('主', current_process().pid)
from multiprocessing import Process
import time
import os
def task():
print('%s is running' % os.getpid()) # 檢視目前程序的程序号
print('子程序的主程序号:%s' % os.getppid()) # 檢視目前程序的父程序的程序号
time.sleep(3)
if __name__ == '__main__':
p = Process(target=task)
p.start()
print('主', os.getpid())
print('主主', os.getppid())
p.terminate() # 殺死目前程序
# 告訴作業系統幫你去殺死目前程序 但是需要一定的時間 而代碼的運作速度極快
time.sleep(0.1) # 加入一個 time.sleep() 就可以得到正确結果
print(p.is_alive()) # 判斷目前程序是否存活
僵屍程序與孤兒程序
-
僵屍程序
死了但是沒有死透
當你開設了子程序之後 該程序死後不會立刻釋放占用的程序号
因為我要讓父程序能夠檢視到它開設的子程序的一些基本資訊(占用的pid号、運作時間等)
所有的程序都會步入僵屍程序
有害的情況:父程序不死并且在無限制的建立子程序并且子程序也不結束
回收子程序占用的pid号兩種情況:
1、父程序等待子程序運作結束
2、父程序調用 join 方法(等同于等待子程序運作結束)
-
孤兒程序
子程序存活,父程序意外死亡
作業系統會開設一個“兒童福利院”專門管理孤兒程序回收相關資源
守護程序
from multiprocessing import Process
import time
def task(name):
print('%s 總管正在活着' % name)
time.sleep(3)
print('%s 總管正在死亡' % name)
if __name__ == '__main__':
p = Process(target=task, args=('縱觀',))
p.daemon = True # 将程序 p 設定成守護程序 這一句代碼要放在 p.start 上面才有效
p.start()
print('皇帝壽終正寝')
互斥鎖
多個程序操作同一份資料的時候,會出現資料錯亂的問題
針對上述問題,解決方式就是加鎖處理:将并發變成串行,犧牲效率但是保證了資料的安全
from multiprocessing import Process, Lock
import json
import time
import random
# 查票
def search(i):
# 檔案操作讀取票數
with open('data', 'r', encoding='utf8') as f:
dic = json.load(f)
print('使用者 %s 查詢餘票:%s' % (i, dic.get('ticket_num'))) # 字典取值不要用[],要用.get()
# 買票 1、先查 2、再買
def buy(i):
# 先查票
with open('data', 'r', encoding='utf8') as f:
dic = json.load(f)
# 模拟網絡延遲
time.sleep(random.randint(1, 3))
# 判斷目前是否有票
if dic.get('ticket_num') > 0:
# 修改資料庫買票
dic['ticket_num'] -= 1
# 寫入資料庫
with open('data', 'w', encoding='utf8') as f:
json.dump(dic, f)
print('使用者 %s 買票成功!' % i)
else:
print('使用者 %s 買票失敗' % i)
# 整合上面兩個函數
def run(i, mutex):
search(i)
# 給買票環節加鎖處理
# 搶鎖
mutex.acquire() # 所有人随機搶鎖,一個人搶到,程式繼續運作
buy(i)
# 釋放鎖
mutex.release() # 買完票,将鎖釋放,供剩下的人搶
if __name__ == '__main__':
# 在主程序中生成一把鎖 讓所有的子程序搶 誰先搶到誰先買票
mutex = Lock()
for i in range(1, 11):
p = Process(target=run, args=(i, mutex))
p.start()
注意:
1、鎖不要輕易使用,容易造成死鎖現象(一般不會用到,都是内部封裝好的)
2、鎖隻在處理資料的部分來保證資料安全(隻在争搶資料的環節加鎖處理即可)
程序間通信
程序之間是無法直接進行資料互動的,但是可以通過隊列或管道實作資料互動
隊列Queue子產品
管道:subprocess(stdin stdout stderr)
隊列:管道 + 鎖
隊列:先進先出
堆棧:先進後出
from multiprocessing import Queue
# import queue 也可以
# 建立一個隊列
q = Queue(5) # 括号内可以傳數字 表示生成的隊列最大可以同時存放的資料量
# 往隊列中存資料
q.put(111)
q.put(222)
q.put(333)
print(q.full()) # 判斷目前隊列是否滿了
print(q.empty()) # 判斷目前隊列是否空了
q.put(444)
q.put(555)
print(q.full()) # 判斷目前隊列是否滿了
# q.put(666) # 當隊列資料放滿了之後 如果還有資料要放 程式會阻塞 直到有位置讓出來
# 去隊列中取資料
v1 = q.get()
v2 = q.get()
v3 = q.get()
v4 = q.get()
v5 = q.get()
print(q.empty()) # 判斷目前隊列是否空了
# v6 = q.get_nowait() # 沒有資料直接報錯 queue.Empty
# v6 = q.get(timeout=3) # 沒有資料之後原地等待三秒 沒有再報錯
# v6 = q.get() # 隊列中如果已經沒有資料的話 get方法會原地阻塞
# print(v1, v2, v3, v4, v5, v6)
"""
q.full()
q.empty()
q.get_nowait()
在多程序的情況下是不精确的
本地測試的時候才可能會用到Queue,實際生産用的都是别人封裝好的功能非常強大的工具
"""
IPC機制
研究思路
1、主程序跟子程序借助于隊列通信:
from multiprocessing import Queue, Process
def producer(q):
q.put('我是23号技師 很高興為您服務') # 在子程序中向主程序的隊列中添加值
print('hello big baby')
if __name__ == '__main__':
q = Queue() # 在主程序中建立一個隊列
p = Process(target=producer, args=(q,)) # 将producer函數當作子程序建立
p.start()
print(q.get()) # 在主程序中擷取并列印隊列裡的值
2、子程序跟子程序借助于隊列通信:
from multiprocessing import Queue, Process
def producer(q):
q.put('我是23号技師 很高興為您服務') # 往隊列裡放資料
def consumer(q):
print(q.get()) # 在子程序中擷取隊列中的資料
if __name__ == '__main__':
q = Queue() # 在主程序中建立一個隊列
p = Process(target=producer, args=(q,)) # 将producer函數當作子程序建立
p1 = Process(target=consumer, args=(q,)) # 将consumer函數當作子程序建立
p.start()
p1.start()
生産者消費者模型
生産者:生産/制造東西的
消費者:消費/處理東西的
該模型除了上述兩個之外還需要一個媒介
生活中的例子:做包子的将包子做好後放在蒸籠裡,買包子的去蒸籠裡拿
廚師做好的菜用盤子裝着,給消費者端過去
生産者和消費者之間不是直接做互動的,而是借助于媒介做互動
生産者(做包子的)+ 消息隊列(蒸籠)+ 消費者(吃包子的)
消息隊列的存在是為了解決供需不平衡的問題
from multiprocessing import Process, Queue, JoinableQueue
import time
import random
# 生産者
def producer(name, food, q):
for i in range(5):
data = '%s 生産了 %s%s' % (name, food, i)
# 模拟延遲
time.sleep(random.randint(1, 3))
print(data)
# 将資料放入隊列中
q.put(data)
# 消費者
def consumer(name, q):
# 消費者胃口很大 CD光牒行動
while True:
food = q.get() # 沒有資料就會卡住
# 判斷目前是否有結束的辨別
# if food is None: break
time.sleep(random.randint(1, 3))
print('%s吃了%s' % (name, food))
q.task_done() # 告訴隊列你已經從裡面取出了一個資料并且處理完畢了
if __name__ == '__main__':
# q = Queue()
q = JoinableQueue()
p1 = Process(target=producer, args=('大廚1号', '包子', q))
p2 = Process(target=producer, args=('大廚2号', '泔水', q))
c1 = Process(target=consumer, args=('春哥', q))
c2 = Process(target=consumer, args=('新哥', q))
p1.start()
p2.start()
# 将消費者設定成守護程序
c1.daemon = True
c2.daemon = True
c1.start()
c2.start()
p1.join()
p2.join()
# 等待生産者生産完畢之後 往隊列中添加特定的結束符号
# q.put(None) # 肯定在所有生産者生産的資料的末尾
# q.put(None) # 有兩個消費者 是以要放兩個 None
q.join() # 等待隊列中所有的資料被取完了再往下執行代碼
"""
JoinableQueue 每當你往該隊列中存入資料的時候 内部會有一個計數器+1
每當你調用 task_done 的時候 計數器-1
q.join() 當計數器為0的時候 才往後運作
"""
# 隻要 q.join() 執行完畢 說明消費者已經處理完資料了 消費者就沒有存在的必要了
# 是以可以在主程式中将消費者設定為守護程序
線程理論
緻命三問
-
什麼是線程
程序:資源機關
線程:執行機關
- 将作業系統比喻成一個大的工廠,程序就相當于工廠裡面的工廠中的房間,線程就是工廠中的房間裡面的流水線
- 每一個程序肯定自帶一個線程
總結:起一個程序隻是在記憶體空間中開辟一塊獨立的空間,真正被CPU執行的其實是程序裡面的線程,線程指的就是代碼的執行過程,執行代碼中所需要使用到的資源都找所在的程序要。
程序和線程都是虛拟機關,隻是為了我們更加友善的描述問題
-
為何要有線程
開設程序
1、申請記憶體空間 耗資源
2、“拷貝代碼” 耗資源
開線程
一個程序内可以開設多個線程,在用一個程序内開多個線程無需再次申請記憶體空間及拷貝代碼操作
總結:開線程比開程序省資源
同一個程序下的線程,資料是共享的
-
如何使用
開啟線程的兩種方式:
TCP服務端實作并發的效果:# # 第一種方式 from multiprocessing import Process from threading import Thread import time def task(name): print('%s is running' % name) time.sleep(1) print('%s is over' % name) # 開啟線程不需要在main下面執行代碼 直接書寫就可以 t = Thread(target=task, args=('egon',)) t.start() # 建立線程的開銷非常小 幾乎是代碼一執行線程就已經建立了 print('主') ############################################################ # 第二種方式 from threading import Thread import time class MyThead(Thread): def __init__(self, name): super().__init__() self.name = name def run(self): print('%s is running' % self.name) time.sleep(1) print('egon DSB') if __name__ == '__main__': t = MyThead('egon') # 開一個線程 t.start() print('主')
""" 服務端 1、要有固定的IP和端口 2、24小時不間斷提供服務 3、能夠支援并發 """ import socket from threading import Thread from multiprocessing import Process server = socket.socket() # 括号内不加參數預設就是tcp協定 server.bind(('127.0.0.1', 8080)) server.listen(5) # 将服務的代碼單獨封裝成一個函數 def talk(conn): # 通信循環 while True: try: data = conn.recv(1024) if len(data) == 0: break print(data.decode('utf-8')) conn.send(data.upper()) except ConnectionResetError as e: print(e) break conn.close() # 連結循環 while True: conn, addr = server.accept() # 接客 # 叫其他人來服務客戶 t = Thread(target=talk, args=(conn,)) # t = Process(target=talk, args=(conn,)) # 開程序也是一樣的效果 t.start() ########################################################### """ 用戶端 """ import socket client = socket.socket() client.connect(('127.0.0.1', 8080)) while True: client.send(b'hello world') data = client.recv(1024) print(data.decode('utf-8'))
線程對象的join方法
from threading import Thread
import time
def task(name):
print('%s is running' % name)
time.sleep(3)
print('%s is over' % name)
if __name__ == '__main__':
t = Thread(target=task, args=('egon',))
t.start()
t.join() # 主線程等待子線程運作結束再執行
print('主')
同一個程序下的多個線程 資料是共享的
from threading import Thread
import time
money = 100
def task():
global money
money = 666
print(money)
if __name__ == '__main__':
t = Thread(target=task)
t.start()
t.join()
print(money)
線程對象屬性及其他方法
from threading import Thread, active_count, current_thread
import time
def task(n):
# print('hello world', os.getpid())
print('hello world', current_thread().name) # 檢視目前所線上程的名字
time.sleep(n)
if __name__ == '__main__':
t = Thread(target=task, args=(1,))
t1 = Thread(target=task, args=(2,))
t.start()
t1.start()
t1.join() # 等待 t1 運作完畢後再運作後面的代碼
# print('主', os.getpid())
# print('主', current_thread().name)
print('主', active_count()) # 統計目前正在活躍的線程數
守護線程
from threading import Thread
import time
def task(name):
print('%s is running' % name)
time.sleep(1)
print('%s is over' % name)
if __name__ == '__main__':
t = Thread(target=task, args=('xxxx',))
t.daemon = True # 将 t 線程變為守護線程 隻要主線程結束了 t線程也結束
t.start()
print('主')
"""
主線程運作結束之後不會立刻結束 會等待所有其他非守護線程結束才會結束
因為主線程的結束意味着所在的程序的結束
"""
稍微有點迷惑性的例子:
from threading import Thread
import time
def foo():
print('123')
time.sleep(1)
print('end123')
def func():
print('456')
time.sleep(3)
print('end456')
if __name__ == '__main__':
t1 = Thread(target=foo)
t2 = Thread(target=func)
t1.daemon = True
t1.start()
t2.start()
print('主.......')
線程互斥鎖
from threading import Thread, Lock
import time
money = 100
mutex = Lock() # 先生成一把鎖
def task():
global money
mutex.acquire() # 在搶資料的代碼前加鎖
tmp = money
time.sleep(0.1)
money = tmp - 1
mutex.release() # 搶完以後釋放鎖
if __name__ == '__main__':
t_list = []
for i in range(100): # 起100個線程
t = Thread(target=task)
t.start()
t_list.append(t) # 先将線程逐一添加進清單中以備後續統一操作
for t in t_list:
t.join() # 後續統一操作:保證每一個線程運作完畢後再結束主線程
print(money)
GIL全局解釋器鎖
Ps:小猿取經 - 部落格園 密碼:xiaoyuanqujing@666
定義:
In CPython, the global interpreter lock, or GIL, is a mutex that prevents multiple native threads from executing Python bytecodes at once. This lock is necessary mainly because CPython’s memory management is not thread-safe. (However, since the GIL exists, other features have grown to depend on the guarantees that it enforces.)
python解釋器有多個版本
Cpython
Jpython
Pypypython
但是普遍使用的都是Cpython解釋器
在Cpython解釋器中GIL是一把互斥鎖,用來阻止同一個程序下的多個線程的同時執行
同一個程序下的多個線程無法利用多核優勢!!!
疑問:python的多線程是不是一點用都沒有???
因為python中的記憶體管理不是線程安全的
記憶體管理(垃圾回收機制)
1、引用計數
2、标記清除
3、分代回收
重點:
1、GIL不是python的特點,而是Cpython解釋器的特點
2、GIL是保證解釋器級别的資料的安全(即線程與線程之間的資料的安全)
3、GIL會導緻同一個程序下的多個線程無法同時執行即無法利用多核優勢
4、針對不同的資料還是需要加不同的鎖處理
5、解釋性語言的通病:同一個程序下多個線程無法利用多核優勢
GIL與普通互斥鎖的差別
from threading import Thread, Lock
import time
money = 100
mutex = Lock() # 先生成一把鎖
def task():
global money
# with mutex: # 等同于下面的寫法
# tmp = money
# time.sleep(0.1)
# money = tmp - 1
mutex.acquire() # 在搶資料的代碼前加鎖
tmp = money
time.sleep(0.1)
money = tmp - 1
mutex.release() # 搶完以後釋放鎖
if __name__ == '__main__':
t_list = []
for i in range(100): # 起100個線程
t = Thread(target=task)
t.start()
t_list.append(t) # 先将線程逐一添加進清單中以備後續統一操作
for t in t_list:
t.join() # 後續統一操作:保證每一個線程運作完畢後再結束主線程
print(money)
"""
100個線程起來之後要先去搶GIL
我進入IO GIL自動釋放 但是我手上還有一把互斥鎖
其他線程雖然搶到了GIL但是搶不到互斥鎖
最終GIL還是回到我的手上 我去操作資料
"""
同一個程序下的多線程無法利用多核優勢,是不是就沒有用了
多線程是否有用要看具體情況
單核:四個任務(IO密集型\計算密集型)
多核:四個任務(IO密集型\計算密集型)
計算密集型:每個任務都需要10s
單核:(不用考慮了,時代已過)
多程序:額外的消耗資源
多線程:節省資源
多核:
多程序:總耗時 10s+
多線程:總耗時 40s+
IO密集型:
多程序:相對浪費資源
多線程:更加節省資源
代碼驗證
# 計算密集型
from multiprocessing import Process
from threading import Thread
import os, time
def work():
res = 0
for i in range(10000000):
res *= i
if __name__ == '__main__':
l = []
print(os.cpu_count()) # 擷取目前計算機CPU個數
start_time = time.time()
for i in range(8):
# p = Process(target=work) # 0.4691619873046875 多程序更快
t = Thread(target=work) # 1.6921021938323975
# p.start()
t.start()
# l.append(p)
l.append(t)
for p in l:
p.join()
print(time.time() - start_time)
# IO密集型
from multiprocessing import Process
from threading import Thread
import os, time
def work():
time.sleep(2)
if __name__ == '__main__':
l = []
print(os.cpu_count()) # 擷取目前計算機CPU個數
start_time = time.time()
for i in range(2000):
# p = Process(target=work) # 14.634779930114746
t = Thread(target=work) # 2.142601251602173
# p.start()
t.start()
# l.append(p)
l.append(t)
for p in l:
p.join()
print(time.time() - start_time)
總結:
多程序和多線程各有自己的優勢
通常可以 多程序下面開設多線程
這樣的話既可以利用多核也可以節省資源消耗
死鎖(了解)
當你知道鎖的使用(搶鎖必須要釋放鎖),其實你在操作鎖的時候也極其容易産生死鎖現象(整個程式卡死、阻塞)
from threading import Thread, Lock
import time
mutexA = Lock()
mutexB = Lock()
# 類隻要加括号多次 産生的肯定是不同的對象
# 如果你想要實作多次加括号等到的是相同的對象 ---> 單例模式
class MyThread(Thread):
def run(self):
self.func1()
self.func2()
def func1(self):
mutexA.acquire()
print('%s 搶到A鎖' % self.name) # 擷取目前線程名
mutexB.acquire()
print('%s 搶到B鎖' % self.name)
mutexB.release()
mutexA.release()
def func2(self):
mutexB.acquire()
print('%s 搶到B鎖' % self.name)
time.sleep(2)
mutexA.acquire()
print('%s 搶到A鎖' % self.name)
mutexA.release()
mutexB.release()
if __name__ == '__main__':
for i in range(10):
t = MyThread()
t.start()
遞歸鎖(了解)
特點:
可以被連續的acquire和release
但是隻能被第一個搶到這把鎖執行上述操作
它的内部有一個計數器 每acquire一次計數加一 每release一次計數減一
隻要計數不為0 其他人都無法搶到該鎖
# 遞歸鎖
from threading import Thread, Lock, RLock
import time
mutexA = mutexB = RLock()
# 類隻要加括号多次 産生的肯定是不同的對象
# 如果你想要實作多次加括号等到的是相同的對象 ---> 單例模式
class MyThread(Thread):
def run(self):
self.func1()
self.func2()
def func1(self):
mutexA.acquire()
print('%s 搶到A鎖' % self.name) # 擷取目前線程名
mutexB.acquire()
print('%s 搶到B鎖' % self.name)
mutexB.release()
mutexA.release()
def func2(self):
mutexB.acquire()
print('%s 搶到B鎖' % self.name)
time.sleep(2)
mutexA.acquire()
print('%s 搶到A鎖' % self.name)
mutexA.release()
mutexB.release()
if __name__ == '__main__':
for i in range(10):
t = MyThread()
t.start()
信号量(了解)
信号量在不同的階段可能對應不同的技術點
在并發程式設計中信号量指的是鎖!
如果我們将互斥鎖比喻成一個廁所的話
那麼信号量就相當于多個廁所
from threading import Thread, Semaphore
import time
import random
sm = Semaphore(5) # 括号内寫數字 寫幾就表示開設幾個坑位
def task(name):
sm.acquire()
print('%s 正在蹲坑' % name)
time.sleep(random.randint(1, 5))
sm.release()
if __name__ == '__main__':
for i in range(20):
t = Thread(target=task, args=('傘兵%s号' % i,))
t.start()
Event事件(了解)
一些程序/線程需要等待另外一些程序/線程運作完畢之後才能運作,類似于發射信号一樣
from threading import Thread, Event
import time
event = Event() # 造了一個紅綠燈
def light():
print('紅燈亮着的')
time.sleep(3)
print('綠燈亮了')
event.set() # 告訴等紅燈的人可以走了
def car(name):
print('%s 号車正在等紅燈' % name)
event.wait() # 等待 event.set() 被觸發
print('%s 号車加油門開走了..' % name)
if __name__ == '__main__':
t = Thread(target=light) # 設定一個紅綠燈
t.start()
for i in range(1, 21):
t = Thread(target=car, args=('%s' % i,))
t.start()
線程q(了解)
"""
同一個程序下多個線程 資料是共享的
為什麼同一個程序下還會使用隊列來共享資料呢
因為同一程序下的多個線程共享資料時,資料是不安全的
隊列:
管道 + 鎖
是以用隊列是為了保證資料的安全
"""
import queue
# 我們現在使用的隊列(queue)都是隻能在本地測試使用
# 1 隊列q 先進先出
q = queue.Queue(3)
q.put(1)
q.get()
q.get_nowait()
q.get(timeout=3)
q.full()
q.empty()
##############################################
# 2 後進先出q
q = queue.LifoQueue(3) # last in first out
q.put(1)
q.put(2)
q.put(3)
print(q.get()) # 3
##############################################
# 3 優先級q (你可以給放入隊列中的資料設定進出的優先級)
q = queue.PriorityQueue(4)
q.put((10, '111'))
q.put((100, '222'))
q.put((0, '333'))
q.put((-5, '444'))
print(q.get()) # (-5, '444')
# put括号内放一個元組 第一個放數字表示優先級 第二個放資料
# 需要注意的是 數字越小優先級越高!!!
程序池與線程池(掌握)
先回顧tcp服務端實作并發的效果是怎麼玩的?
之前是每來一個人就開設一個程序或者線程去處理
"""
無論是開程序還是線程 都要消耗資源
隻不過線程的資源稍微小一點而已
我們是不可能做到無限制的開設程序和線程 因為計算機硬體的資源跟不上!
硬體的開發速度遠遠趕不上軟體
我們的宗旨應該是在保證計算機硬體能夠正常工作的情況下最大限度的利用它
"""
池的概念
什麼是池?
池是用來保證計算機硬體安全的情況下最大限度的利用計算機
它降低了程式的運作效率,但是保證了計算機硬體的安全,進而讓程式能夠正常運作
基本使用:
# 線程池
from concurrent.futures import ThreadPoolExecutor
import time
pool = ThreadPoolExecutor(5) # 變量名随便起 參數 5 表示池子裡面固定隻有五個線程
# 括号内可以傳數字 不傳的話預設會開設目前計算機CPU個數五倍的線程
"""
池子造出來之後 裡面會固定存在五個線程
這五個線程不會出現重複建立和銷毀的過程
這樣也節省了重複建立線程的過程 ---> 節省資源
池子的使用非常的簡單
你隻需要将需要做的任務往池子中送出即可
"""
def task(n):
print(n)
time.sleep(2)
return n*n
"""
任務的送出方式分兩種
同步:
異步:
"""
# pool.submit(task, 1) # 朝池子中送出任務 異步送出
# print('主')
t_list = []
for i in range(20): # 朝池子中送出20個任務
res = pool.submit(task, i)
# print(res.result()) # submit類中的 result 方法 這行代碼會導緻程式同步送出
t_list.append(res)
# 等待線程池中所有的任務執行完畢之後再列印清單中的結果
pool.shutdown() # 關閉線程池 等待線程池中所有的任務運作完畢
for t in t_list:
print('>>>: ', t.result()) # 有序輸出
但是上述線程池的代碼用清單的方式有點别扭,可以用下面的方法
最終實作一有結果立刻擷取并列印
# 程序池
from concurrent.futures import ProcessPoolExecutor
import time
pool = ProcessPoolExecutor()
# 括号内可以傳數字 不傳的話預設會開設目前計算機CPU個數的程序
"""
池子造出來之後 裡面會固定存在幾個程序
這幾個程序不會出現重複建立和銷毀的過程
池子的使用非常的簡單
你隻需要将需要做的任務往池子中送出即可
"""
def task(n):
print(n)
time.sleep(2)
return n*n
def call_back(n): # n ==> pool.submit(task, i) 傳回的對象
print('call_back: ', n.result()) # 對象的值用 .result() 檢視
"""
任務的送出方式分兩種
同步:
異步:送出後的傳回結果應該通過回調來擷取
回調機制:就相當于給每個異步任務綁定了一個定時炸彈,一旦任務有結果立刻觸發
"""
if __name__ == '__main__':
for i in range(20): # 朝池子中送出20個任務
# res = pool.submit(task, i)
pool.submit(task, i).add_done_callback(call_back)
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
pool = ProcessPoolExecutor() # 程序池
pool = ThreadPoolExecutor(5) # 線程池
res = pool.submit(task, i).add_done_callback(call_back)
協程
協程:這個概念完全是程式員自己意淫出來的 根本不存在
為的是“單線程下實作并發”
我們程式員自己在代碼層面上檢測我們所有的IO操作
一旦遇到IO 在代碼級别完成切換
這樣給CPU的感覺是你這個程式一直在運作 沒有IO
進而提升程式的運作效率
切換+儲存狀态
CPU兩種情況下切換
1、程式遇到IO
2、程式長時間占用
适合的例子:
tcp服務端
accept
recv
代碼如何做到 切換+儲存狀态?
切換:切換不一定提升效率 也有可能降低效率
IO切 提升
沒有IO切 降低
儲存狀态:儲存上一次執行的狀态 下一次接着上一次的操作繼續往後執行
yield
驗證切換是否就一定提升效率
# 串行執行計算密集型的任務
import time
def func1():
for i in range(100000000):
i + 1
def func2():
for i in range(100000000):
i + 1
start_time = time.time()
func1()
func2()
print(time.time() - start_time)
# 切換 + yield
import time
def func1():
while True:
100000000 + 1
yield
def func2():
g = func1() # 先初始化出生成器
for i in range(100000000):
i + 1
next(g)
start_time = time.time()
func2()
print(time.time() - start_time)
gevent子產品(了解)
可以檢測程式的IO操作
from gevent import monkey; monkey.patch_all()
import time
from gevent import spawn
"""
gevent子產品本身無法檢測常見的一些IO操作
在使用的時候需要你額外的導入一句話
from gevent import monkey
monkey.patch_all()
又由于上面兩句話在使用gevent子產品時是肯定要導入的
是以還支援簡寫
from gevent import monkey; monkey.patch_all()
"""
def heng():
print('哼')
time.sleep(2)
print('哼')
def ha():
print('哈')
time.sleep(3)
print('哈')
def heiheihei():
print('嘿嘿嘿')
time.sleep(5)
print('嘿嘿嘿')
start_time = time.time()
g1 = spawn(heng)
g2 = spawn(ha)
g3 = spawn(heiheihei)
g1.join() # 等待被檢測的任務執行完畢 再往後繼續執行
g2.join()
g3.join()
print(time.time() - start_time) # 5.001976728439331
協程實作tcp服務端并發
# 服務端
from gevent import monkey; monkey.patch_all()
import socket
from gevent import spawn
def communication(conn):
while True:
try:
data = conn.recv(1024)
if len(data) == 0:break
conn.send(data.upper())
except ConnectionResetError as e:
print(e)
break
conn.close()
def server(ip, port):
server = socket.socket()
server.bind((ip, port))
server.listen(5)
while True:
conn, addr = server.accept()
spawn(communication, conn)
if __name__ == '__main__':
g1 = spawn(server, '127.0.0.1', 8080)
g1.join()
# 用戶端
from threading import Thread, current_thread
import socket
def x_client():
client = socket.socket()
client.connect(('127.0.0.1', 8080))
n = 0
while True:
msg = '%s say hello %s' % (current_thread().name, n)
n += 1
client.send(msg.encode('utf-8'))
data = client.recv(1024)
print(data.decode('utf-8'))
if __name__ == '__main__':
for i in range(500):
t = Thread(target=x_client)
t.start()
理想狀态:
我們可以通過
多程序下面開設多線程
多線程下面開設協程
進而使我們的程式執行效率提升
IO模型簡介
我們這裡研究的IO模型都是針對網絡IO的
Stevens在文章中一共比較了五種IO Model:
* blocking IO 阻塞IO
* nonblocking IO 非阻塞IO
* IO multiplexing IO多路複用
* signal driven IO 信号驅動IO
* asynchronous IO 異步IO
由signal driven IO(信号驅動IO)在實際中并不常用,是以主要介紹其餘四種IO Model
1)等待資料準備(Waiting for the data to be ready)
2)将資料從核心拷貝到程序中(Copying the data from the kernel to the process)
同步異步
常見的網絡狀态:
recvftom
send雖然它也有IO行為 但是不在我們的考慮範圍
阻塞IO
我們之前寫的都是阻塞IO模型 協程除外
import socket
server = socket.socket()
server.bind(('127.0.0.1', 8080))
server.listen(5)
while True:
conn, addr = server.accept()
while True:
try:
data = conn.recv(1024)
if len(data) == 0: break
print(data)
conn.send(data.upper())
except ConnectionResetError as e:
break
conn.close()
# 在服務端無論是開設多程序、多線程,還是程序池、線程池,其實還是沒有解決IO問題
"""
該等的地方還是得等 沒有規避
隻不過多個人等待的彼此互不幹擾
"""
非阻塞IO
"""
要自己實作一個非阻塞IO模型
"""
# 服務端
import socket
server = socket.socket()
server.bind(('127.0.0.1', 8080))
server.listen(5)
server.setblocking(False) # 參數預設是True 改為False會将所有的網絡阻塞變為非阻塞
r_list = []
del_list = []
while True:
try:
conn, addr = server.accept()
r_list.append(conn)
except BlockingIOError:
for conn in r_list:
try:
data = conn.recv(1024) # 沒有消息 報錯
if len(data) == 0: # 用戶端斷開連結
conn.close() # 關閉conn
# 将無用的conn從r_list删除
del_list.append(conn)
continue
conn.send(data.upper()) # 給用戶端傳回大寫
except BlockingIOError:
continue
except ConnectionResetError:
conn.close()
del_list.append(conn)
# 回收無用的連結
for conn in del_list:
r_list.remove(conn)
del_list.clear()
############################################################
# 用戶端
import socket
client = socket.socket()
client.connect(('127.0.0.1', 8080))
while True:
client.send(b'hello world')
data = client.recv(1024)
print(data)
雖然非阻塞IO給你的感覺非常的牛逼,但是該模型會長時間占用着CPU并且不幹活(讓CPU不停的空轉)
我們實際應用中也不會考慮使用非阻塞IO模型
但是任何的技術點都有它存在的意義
實際應用或者是思想借鑒
IO多路複用
當監管的對象隻有一個的時候 其實IO多路複用連阻塞IO都比不上!
但是IO多路複用可以一次性監管很多個對象:
server = socket.socket()
conn, add = server.accept()
監管機制是作業系統本身就有的 如果你想用該監管機制(select)
需要你導入對應的select子產品
# 服務端
import socket
import select
server = socket.socket()
server.bind(('127.0.0.1', 8080))
server.listen(5)
server.setblocking(False)
read_list = [server]
while True:
r_list, w_list, x_list = select.select(read_list, [], [])
for i in r_list:
"""針對不同的對象做不同的處理"""
if i is server:
conn, addr = i.accept()
read_list.append(conn) # conn 對象也應該添加到監管的隊列中
else:
res = i.recv(1024)
if len(res) == 0:
i.close()
# 将無效的監管對象 移除
read_list.remove(i)
continue
print(res)
i.send(b'heiheihei')
# 用戶端
import socket
client = socket.socket()
client.connect(('127.0.0.1', 8080))
while True:
client.send(b'hello world')
data = client.recv(1024)
print(data)
監管機制其實有很多:
select機制 ==> Windows Linux 都有
poll機制 ===> 隻在 Linux 有,poll和select都可以監管多個對象 但是poll監管的數量更多
上述select和poll機制都不是很完美 當監管的對象特别多的時候 可能會出現極其大的延遲響應
epoll機制 ===> 隻在 Linux 有
它給每一個監管對象都綁定了一個回調機制,一旦有響應 回調機制立刻發起提醒
針對不同的作業系統還需要考慮不同的檢測機制 書寫代碼太過繁瑣
有一個子產品能夠根據你跑的平台的不同自動幫你選擇對應的監管機制 => selectors子產品
異步IO
異步IO模型是所有模型中效率最高的,也是使用最廣泛的
相關的子產品和架構
子產品:asyncio子產品
import threading
import asyncio
async def hello():
print('hello world %s' % threading.current_thread())
await asyncio.sleep(1) # 此處模拟真正的IO操作的耗時
print('hello world %s' % threading.current_thread())
loop = asyncio.get_event_loop()
tasks = [hello(), hello()]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()