天天看點

同一個程序内的隊列(多線程) 線程池

一、同一個程序内的隊列(多線程)

  import queue

  queue.Queue()   先進先出

  queue.LifoQueue()  後進先出

  queue.PriorityQueue()   優先級隊列

    優先級隊列   q = queue.PriorityQueue() 

      q.put((pri , data))   接收的是一個元祖

      元祖中第一個參數是:表示目前資料的優先級

      元祖中第二個參數是:需要存放到隊列中的資料

    優先級的比較(首先保證整個隊列中,所有表示優先級的東西類型必須一緻)

      如果都是int,比數值的大小

      如果都是str,比較字元串的大小(從第一個字元的ASCII碼開始比較),當ASCII碼相同時會按照先進先出的原則

from multiprocessing import Queue
import queue

q = queue.LifoQueue() # 後進先出的隊列
q.put(1)
q.put(2)
q.put(3)
for i in range(q.qsize()):
    print(q.get())

q = queue.Queue() # 先進先出隊列
q.put(1)
q.put(2)
q.put(3)
for i in range(q.qsize()):
    print(q.get())

q = queue.PriorityQueue() # 優先級隊列
q.put((1,3))
q.put((2,2))
q.put((3,1))
for i in range(q.qsize()):
    print(q.get())      

二、線程池

  1、在一個池子裡放固定數量的線程,這些線程處于等待狀态,一旦有任務來,就有線程自發的去執行任務。

  concurrent.futures   這個子產品是異步調用的機制(送出任務都是用submit)

  for + submit 多個任務的送出

  shutdown  是等效于Pool中的close  +  join ,是指不允許再繼續向池中增加任務,然後讓父程序(線程)等待池中所有程序(線程)執行完所有任務。

from concurrent.futures import ThreadPoolExecutor
import time


def func(num):
    sum = 0
    for i in range(num):
        sum += i ** 2
    return sum


t = ThreadPoolExecutor(20) # 執行個體化20個線程
start = time.time()
res = t.map(func,range(1000)) # 送出多個任務給池中,等效于for + submit
t.shutdown()
for i in range(1000):
    print(res.__next__()) # res是一個生成器
print(time.time() - start)      

  2、如何把多個任務扔進池中?

    要麼使用for + submit  的方法去送出多個任務

    要麼直接使用map(func,iterable) 方式去送出多個任務

  不同的方式送出多個任務(for + submit  或者map),擁有不同的得到結果的方式。

    如果是for + submit 的方式送出任務,想得到結果用result方法

    如果是map的方式送出任務,結果是一個生成器,采用__next__()  的方式得到結果。

  關于回調函數,不管是Pool程序池的方式,還是ProcessPoolExecutor的方式開啟程序池,

          回調函數都是由父程序調用

from concurrent.futures import ThreadPoolExecutor
import time


def func(num):
    sum = 0
    for i in range(num):
        sum += i ** 2
    return sum


def call_back(res):
    num = res.result()
    print(num)


t = ThreadPoolExecutor(20) # 執行個體化20個線程
start = time.time()
# t.map(func,range(1000)) # 送出多個任務給池中,等效于for + submit
for i in range(1000):
    t.submit(func,i).add_done_callback(call_back)
t.shutdown()
print(time.time() - start)      

  3、鎖

    第一種情況,在同一個線程内,遞歸鎖可以無止盡的acquire,但是互斥鎖不行。

    第二種情況,在不同的線程内,遞歸鎖是保證隻能被一個線程拿到鑰匙,然後無止盡的acquire,其他線程等待。

from threading import RLock,Thread
from threading import Semaphore
import time


def func(i,l):
    l.acquire()
    l.acquire()
    l.acquire()
    l.acquire()
    print(i)
    l.release()
    l.release()
    l.release()
    l.release()

l = RLock()
for i in range(10):
    Thread(target=func,args=(i,l)).start()      

轉載于:https://www.cnblogs.com/wjs521/p/9544051.html