天天看點

Python常用子產品 之 threading和Thread子產品 第二階段 線程通信及隊列基操

目錄:

  • ​​每篇前言:​​
  • ​​1. 線程通信​​
  • ​​1.1 互斥鎖:​​
  • ​​1.2 線程間全局變量的共享:​​
  • ​​1.3 共享記憶體間存在競争問題:​​
  • ​​1.4 使用鎖來控制共享資源的通路:​​
  • ​​分析此階段,我們會發現程序和線程的痛點!!!​​
  • ​​2. 隊列的基本概念​​

每篇前言:

作者介紹:【孤寒者】—全棧領域優質創作者、HDZ核心組成員、華為雲享專家Python全棧領域部落客、原力計劃作者
  • 本文已收錄于Python全棧系列專欄:​​《Python全棧基礎教程》​​
  • 熱門專欄推薦:​​《Django架構從入門到實戰》、​​​​《爬蟲從入門到精通系列教程》、​​​​《爬蟲進階》、​​​​《前端系列教程》、​​​​《tornado一條龍+一個完整版項目》。​​
  • 本專欄面向廣大程式猿,為的是大家都做到Python從入門到精通,同時穿插有很多很多習題,鞏固學習。
  • 加入我一起學習進步,一個人可以走的很快,一群人才能走的更遠!

1. 線程通信

1.1 互斥鎖:

在多線程中 , 所有變量對于所有線程都是共享的 , 是以 , 線程之間共享資料最大的危險在于多個線程同時修改一個變量 , 那就亂套了 , 是以我們需要互斥鎖 , 來鎖住資料。

1.2 線程間全局變量的共享:

注意:

  • 因為線程屬于同一個程序,是以它們之間共享記憶體區域。是以全局變量是公共的。
# -*- coding: utf-8 -*-
"""
__author__ = 孤寒者
"""
import threading

a = 1

def func():
    global  a
    a = 2

t = threading.Thread(target=func)
t.start()
t.join()

print(a)      
Python常用子產品 之 threading和Thread子產品 第二階段 線程通信及隊列基操

1.3 共享記憶體間存在競争問題:

先來個正常的例子,不用多線程:

# -*- coding: utf-8 -*-
"""
__author__ = 孤寒者
"""
x = 0
n =1000000
def a(n):
    global x
    for i in range(n):
        x += 1

def b(n):
    global x
    for i in range(n):
        x -= 1

a(n)
b(n)
print(x)      

輸出肯定和大家想的一樣,毫無疑問是0!

Python常用子產品 之 threading和Thread子產品 第二階段 線程通信及隊列基操
# -*- coding: utf-8 -*-
"""
__author__ = 孤寒者
"""
from threading import Thread

x = 0
n =1000000
def a(n):
    global x
    for i in range(n):
        x += 1

def b(n):
    global x
    for i in range(n):
        x -= 1

if __name__ == '__main__':
    a = Thread(target=a,args = (n,))
    b = Thread(target=b,args = (n,))
    a.start()
    b.start()
    # 一定要加阻塞,原因大家可以自己結合第一篇講的自己好好想想哦~
    a.join()
    b.join()
    print(x)      

提示:

  • 如果1000000不能出現效果可以繼續在後面加0

    你會發現這個結果千奇百怪!!!

1.4 使用鎖來控制共享資源的通路:

下面引入互斥鎖

  • 在多線程中 , 所有變量對于所有線程都是共享的 ,是以 ,線程之間共享資料最大的危險在于多個線程同時修改一個變量 , 那就亂套了, 是以我們需要互斥鎖 , 來鎖住資料。
  • 隻要我們操作全局變量的時候,就在操作之前加鎖,在操作完之後解鎖,就解決了這個資源競争的問題!!!

第一種實作:

# -*- coding: utf-8 -*-
"""
__author__ = 孤寒者
"""
from threading import Thread, Lock

a = 0
n = 100000   # 指定加減次數
# 線程鎖
lock = Lock()

def incr(n):
    global  a
    # 對全局變量a做n次加1
    for i in range(n):
        lock.acquire()
        a += 1
        lock.release()

def decr(n):
    global a
    # 對全局變量a做n次減一
    for i in range(n):
        lock.acquire()
        a -= 1
        lock.release()

t_incr = Thread(target=incr, args=(n, ))
t_decr = Thread(target=decr, args=(n, ))
t_incr.start(); t_decr.start()
t_incr.join();  t_decr.join()
print(a)      
Python常用子產品 之 threading和Thread子產品 第二階段 線程通信及隊列基操

第二種實作:

# -*- coding: utf-8 -*-
"""
__author__ = 孤寒者
"""
from threading import Thread, Lock

a = 0
n = 100000   # 指定加減次數
# 線程鎖
lock = Lock()

def incr(n):
    global  a
    # 對全局變量a做n次加1
    for i in range(n):
        with lock:
            a += 1

def decr(n):
    global a
    # 對全局變量a做n次減一
    for i in range(n):
        with lock:
            a -= 1

t_incr = Thread(target=incr, args=(n, ))
t_decr = Thread(target=decr, args=(n, ))
t_incr.start(); t_decr.start()
t_incr.join();  t_decr.join()
print(a)      
Python常用子產品 之 threading和Thread子產品 第二階段 線程通信及隊列基操

分析此階段,我們會發現程序和線程的痛點!!!

  • 下述參考本篇文章:​​《什麼是協程》​​
線程之間如何進行協作?
最典型的例子就是生産者/消費者模式:若幹個生産者線程向隊列中寫入資料,若幹個消費者線程從隊列中消費資料。
(功能!)
1.定義了一個生産者類,一個消費者類。
2.生産者類循環100次,向同步隊列當中插入資料。
3.消費者循環監聽同步隊列,當隊列有資料時拉取資料。
4.如果隊列滿了(達到5個元素),生産者阻塞。
5.如果隊列空了,消費者阻塞。

上面的代碼正确地實作了生産者/消費者模式,但是卻并不是一個高性能的實作。為什麼性能不高呢?原因如下:
    1.涉及到同步鎖。
    2.涉及到線程阻塞狀态和可運作狀态之間的切換。
    3.涉及到線程上下文的切換。
    以上涉及到的任何一點,都是非常耗費性能的操作。
    
這裡就引入了協程!是一種比線程更加輕量級的存在。正如一個程序可以擁有多個線程一樣,一個線程也可以擁有多個協程。
最重要的是,協程不是被作業系統核心所管理,而完全是由程式所控制(也就是在使用者态執行)。
這樣帶來的好處就是性能得到了很大的提升,不會像線程切換那樣消耗資源。
既然協程這麼好,它到底是怎麼來使用的呢?
代碼走起來(依舊是生産者/消費者模式的例子!):
def consumer():
    while True:
        # consumer協程等待接收資料
        number = yield
        print('開始消費', number)

consumer_result = consumer()
# 讓初始化狀态的consumer協程先執行起來,在yield處停止
next(consumer_result)

for num in range(100):
    print('開始生産', num)
    # 發送資料給consumer協程
    consumer_result.send(num)

代碼中建立了一個叫做consumer_result的協程,并且在主線程中生産資料,協程中消費資料。
其中 yield 是python當中的文法。當協程執行到yield關鍵字時,會暫停在那一行,等到主線程調用send方法發送了資料,協程才會接到資料繼續執行。
但是,yield讓協程暫停,和線程的阻塞是有本質差別的。協程的暫停完全由程式控制,線程的阻塞狀态是由作業系統核心來進行切換。

是以,協程的開銷遠遠小于線程的開銷!!!      

執行結果:

Python常用子產品 之 threading和Thread子產品 第二階段 線程通信及隊列基操

2. 隊列的基本概念

  • 一個入口,一個出口;
  • 先入先出(FIFO)。
  • Python常用子產品 之 threading和Thread子產品 第二階段 線程通信及隊列基操
import      

隊列操作一覽:

  • 入隊: put(item)
  • 出隊: get()
  • 測試空: empty()
  • 測試滿: full()
  • 隊列長度: qsize()
  • 任務結束: task_done()
  • 等待完成: join()

注意:

  1. get()等待任務完成,如果不加task_done()則不表示任務完成,隻要加這句才表明完成。才會結束執行。
  2. join就是阻塞,直到這個任務完成(完成的标準就是每次取出都task_done()了)
# -*- coding: utf-8 -*-
"""
__author__ = 孤寒者
"""
import queue

# 建立隊列
q = queue.Queue(4)

# 入隊
q.put(1)
q.put(2)
q.put(3)
print(q.full())
q.put(4)
print(q.full())

# 出隊
print(q.get())
print(q.get())
print(q.empty())
print(q.get())
print(q.get())
print(q.empty())