一、同一個程序内的隊列(多線程)
import queue
queue.Queue() 先進先出
queue.LifoQueue() 後進先出
queue.PriorityQueue() 優先級隊列
優先級隊列 q = queue.PriorityQueue()
q.put((pri , data)) 接收的是一個元祖
元祖中第一個參數是:表示目前資料的優先級
元祖中第二個參數是:需要存放到隊列中的資料
優先級的比較(首先保證整個隊列中,所有表示優先級的東西類型必須一緻)
如果都是int,比數值的大小
如果都是str,比較字元串的大小(從第一個字元的ASCII碼開始比較),當ASCII碼相同時會按照先進先出的原則
from multiprocessing import Queue
import queue
q = queue.LifoQueue() # 後進先出的隊列
q.put(1)
q.put(2)
q.put(3)
for i in range(q.qsize()):
print(q.get())
q = queue.Queue() # 先進先出隊列
q.put(1)
q.put(2)
q.put(3)
for i in range(q.qsize()):
print(q.get())
q = queue.PriorityQueue() # 優先級隊列
q.put((1,3))
q.put((2,2))
q.put((3,1))
for i in range(q.qsize()):
print(q.get())
二、線程池
1、在一個池子裡放固定數量的線程,這些線程處于等待狀态,一旦有任務來,就有線程自發的去執行任務。
concurrent.futures 這個子產品是異步調用的機制(送出任務都是用submit)
for + submit 多個任務的送出
shutdown 是等效于Pool中的close + join ,是指不允許再繼續向池中增加任務,然後讓父程序(線程)等待池中所有程序(線程)執行完所有任務。
from concurrent.futures import ThreadPoolExecutor
import time
def func(num):
sum = 0
for i in range(num):
sum += i ** 2
return sum
t = ThreadPoolExecutor(20) # 執行個體化20個線程
start = time.time()
res = t.map(func,range(1000)) # 送出多個任務給池中,等效于for + submit
t.shutdown()
for i in range(1000):
print(res.__next__()) # res是一個生成器
print(time.time() - start)
2、如何把多個任務扔進池中?
要麼使用for + submit 的方法去送出多個任務
要麼直接使用map(func,iterable) 方式去送出多個任務
不同的方式送出多個任務(for + submit 或者map),擁有不同的得到結果的方式。
如果是for + submit 的方式送出任務,想得到結果用result方法
如果是map的方式送出任務,結果是一個生成器,采用__next__() 的方式得到結果。
關于回調函數,不管是Pool程序池的方式,還是ProcessPoolExecutor的方式開啟程序池,
回調函數都是由父程序調用
from concurrent.futures import ThreadPoolExecutor
import time
def func(num):
sum = 0
for i in range(num):
sum += i ** 2
return sum
def call_back(res):
num = res.result()
print(num)
t = ThreadPoolExecutor(20) # 執行個體化20個線程
start = time.time()
# t.map(func,range(1000)) # 送出多個任務給池中,等效于for + submit
for i in range(1000):
t.submit(func,i).add_done_callback(call_back)
t.shutdown()
print(time.time() - start)
3、鎖
第一種情況,在同一個線程内,遞歸鎖可以無止盡的acquire,但是互斥鎖不行。
第二種情況,在不同的線程内,遞歸鎖是保證隻能被一個線程拿到鑰匙,然後無止盡的acquire,其他線程等待。
from threading import RLock,Thread
from threading import Semaphore
import time
def func(i,l):
l.acquire()
l.acquire()
l.acquire()
l.acquire()
print(i)
l.release()
l.release()
l.release()
l.release()
l = RLock()
for i in range(10):
Thread(target=func,args=(i,l)).start()
轉載于:https://www.cnblogs.com/wjs521/p/9544051.html