天天看點

Python爬蟲11-多線程同步問題一、線程中的資源競争二、死鎖三、Queue線程(隊列)四、線程同步五、生産者和消費者

多線程同步問題

  • 一、線程中的資源競争
  • 二、死鎖
  • 三、Queue線程(隊列)
  • 四、線程同步
  • 五、生産者和消費者
    • 5.1 Lock版的生産者和消費者
    • 5.2 Condition版的生産者和消費者

一、線程中的資源競争

import threading
import time
           
num = 0
# 建立一把鎖 預設是沒有上鎖
suo=threading.Lock()
           
def demo1(nums):
    global num
    # 加鎖
    suo.acquire()
    for i in range(nums):
        num += 1
    # 解鎖
    suo.release()
    print('demo1-num %d'%num)
           
def demo2(nums):
    global num
    # 加鎖
    suo.acquire()
    for i in range(nums):
        num += 1
    # 解鎖
    suo.release()
    print('demo2-num %d'%num)
           
def main():
    t1=threading.Thread(target=demo1,args=(1000000,))
    t2=threading.Thread(target=demo2,args=(1000000,))
    t1.start()
    # t1.join()
    # time.sleep(1)
    t2.start()
    # t1.join()
    time.sleep(1)
    print('demo-num %d'%num)

if __name__ == '__main__':
    main()
           

threading.Lock() 隻能建立一次

threading.RLock() 可建立多次,需逐個解鎖

二、死鎖

import threading
import time

class MyThread1(threading.Thread):
    def run(self):
        # 對mutexA上鎖
        mutexA.acquire()

        # mutexA上鎖後,延時1秒,等待另外那個線程 把mutexB上鎖
        print(self.name+'----do1---up----')
        time.sleep(1)

        # 此時會堵塞,因為這個mutexB已經被另外的線程搶先上鎖了
        mutexB.acquire()
        print(self.name+'----do1---down----')
        mutexB.release()

        # 對mutexA解鎖
        mutexA.release()

class MyThread2(threading.Thread):
    def run(self):
        # 對mutexB上鎖
        mutexB.acquire()

        # mutexB上鎖後,延時1秒,等待另外那個線程 把mutexA上鎖
        print(self.name+'----do2---up----')
        time.sleep(1)

        # 此時會堵塞,因為這個mutexA已經被另外的線程搶先上鎖了
        mutexA.acquire()
        print(self.name+'----do2---down----')
        mutexA.release()

        # 對mutexB解鎖
        mutexB.release()

mutexA = threading.Lock()
mutexB = threading.Lock()

if __name__ == '__main__':
    t1 = MyThread1()
    t2 = MyThread2()
    t1.start()
    t2.start()
# Thread-1----do1---up----
# Thread-2----do2---up----
           

三、Queue線程(隊列)

隊列:先進先出

empty():判斷隊列是否為空

full():判斷隊列是否滿了

from queue import Queue
q=Queue(3)
           
print(q.empty())   # True 空的隊列 False 不是空的隊列
print(q.full())    # True 滿的隊列 False 不是滿的隊列
           

put():将一個資料放到隊列中

q.put(1)
q.put(2)
q.put(3)
q.put(4,timeout=3)  # 隊列已滿,溢出,報錯
# 或 q.put_nowait(4)
           

get():從隊列中取最後一個資料

print(q.get())   # 1
print(q.get())   # 2
print(q.get())   # 3
print(q.get(timeout=3))  # 隊列已空,報錯
# 或 print(q.get_nowait())
           

四、線程同步

張三:李四同學

李四:在

張三:現在幾點了?

李四:不知道!

import threading
           

建立2個線程

class ZhangSan(threading.Thread):
    def __init__(self,cond):
        super().__init__(name='張三')
        self.cond=cond

    def run(self):
        self.cond.acquire()   # 加鎖

        print('{}:李四同學'.format(self.name))
        self.cond.notify()   # 喚醒

        self.cond.wait()     # 等待
        print('{}:現在幾點了?'.format(self.name))
        self.cond.notify()   # 喚醒

        self.cond.release()   # 解鎖
           
class LiSi(threading.Thread):
    def __init__(self,cond):
        super().__init__(name='李四')
        self.cond=cond

    def run(self):
        self.cond.acquire()   # 加鎖

        self.cond.wait()      # 等待
        print('{}:在'.format(self.name))
        self.cond.notify()    # 喚醒

        self.cond.wait()      # 等待
        print('{}:不知道!'.format(self.name))

        self.cond.release()   # 解鎖
           
if __name__ == '__main__':
    cond=threading.Condition()
    lisi=LiSi(cond)
    zhangsan=ZhangSan(cond)

    lisi.start()
    zhangsan.start()
           

五、生産者和消費者

生産者和消費者模式是多線程開發中常見的一種模式。通過生産者和消費者模式,可以讓代碼達到高内聚低耦合的目标,線程管理更加友善,程式分工更加明确。

5.1 Lock版的生産者和消費者

import threading
import random
import time
           
Gmoney=0
# 定義一個變量,儲存生産的次數
Gtime=0
# 定義一把鎖
lock=threading.Lock()
           
# 定義生産者
class Producer(threading.Thread):
    def run(self) -> None:
        global Gmoney
        global Gtime

        while True:
            lock.acquire()      # 加鎖
            if Gtime >= 10:
                lock.release()  # 解鎖
                break

            money = random.randint(0, 100)
            Gmoney += money
            Gtime += 1
            print('%s生産了%d元錢' % (threading.current_thread().name, money))
            lock.release()
            time.sleep(1)
           
# 定義消費者
class Consumer(threading.Thread):
    def run(self) -> None:
        global Gmoney
        global Gtime

        while True:
            lock.acquire()   # 加鎖
            money = random.randint(0, 100)
            if Gmoney >= money:
                Gmoney -= money
                print('%s消費了%d元錢' % (threading.current_thread().name, money))
            else:
                if Gtime >= 10:
                    lock.release()  # 解鎖
                    break
                print('%s消費了%d元錢,餘額隻有%d' % (threading.current_thread().name, money, Gmoney))
            lock.release()
            time.sleep(1)
           
def main():
    # 開啟5個生産者
    for i in range(5):
        p=Producer(name='生産者%d号'%i)
        p.start()
    # 開啟5個消費者
    for j in range(5):
        c=Consumer(name='消費者%d号'%j)
        c.start()

if __name__ == '__main__':
    main()
           

5.2 Condition版的生産者和消費者

import threading
import random
import time
           
Gmoney=0
# 定義一個變量,儲存生産的次數
Gtime=0
cond=threading.Condition()
           
# 定義生産者
class Producer(threading.Thread):
    def run(self) -> None:
        global Gmoney
        global Gtime

        while True:
            cond.acquire()      # 加鎖
            if Gtime >= 10:
                cond.release()  # 解鎖
                break

            money = random.randint(0, 100)
            Gmoney += money
            Gtime += 1
            print('%s生産了%d元錢,剩餘%d元' % (threading.current_thread().name, money,Gmoney))
            cond.notify_all()   # 喚醒所有
            cond.release()
            time.sleep(1)
           
# 定義消費者
class Consumer(threading.Thread):
    def run(self) -> None:
        global Gmoney
        global Gtime

        while True:
            cond.acquire()   # 加鎖
            money = random.randint(0, 100)

            while Gmoney < money:
                if Gtime >= 10:
                    cond.release()
                    return   # 這裡如果用break退出了内層循環,但是外層循環沒有退出,直接用return
                print('%s消費者消費了%d元,餘額%d元,生産者不再生成了'%(threading.current_thread().name,money,Gmoney))
                cond.wait()

            # 開始消費
            Gmoney -= money
            print('%s消費了%d元錢,剩餘%d元' % (threading.current_thread().name, money,Gmoney))
            cond.release()
            time.sleep(1)
           
def main():
    # 開啟5個生産者
    for i in range(5):
        p=Producer(name='生産者%d号'%i)
        p.start()
    # 開啟5個消費者
    for j in range(5):
        c=Consumer(name='消費者%d号'%j)
        c.start()

if __name__ == '__main__':
    main()
           

繼續閱讀