天天看點

并發程式設計

程序理論

程序是什麼?

程序是正在運作的一個過程或者說一個任務,而負責執行任務則是cpu。

舉例:

一個平平無奇的戀愛小天才climber和他的女朋友去烘焙店做蛋糕。店家給了他蛋糕的食譜,一堆原料(面粉、雞蛋、等等等)

climber就是處理器(cpu);做蛋糕的食譜就好比是計算機程式;而哪些原料就是輸入資料;程序就是climber閱讀食譜,取來各種原料以及拱北蛋糕的整個過程。

并行與并發是什麼?

并發:僞并行,單個cpu利用多道技術,使我們看起來cpu在“同時”執行多個任務。

并行:隻有具備多個cpu才能實作并行。

程序與線程的差別:

一般來說,每個程序都有一個位址空間。多線程就是一個程序記憶體在多個線程,多個線程共享該程序的位址空間。相當于一個工廠中的房間内有多個流水線,都共用這個工廠中的房間的資源。

關鍵點:

  1. 同一個程序内的多個線程共享該程序内的位址資源
  2. 建立線程的開銷要遠小于建立程序的開銷(建立一個程序,就是建立一個工廠中的房間,涉及到申請空間,而且在該空間内建至少一條流水線,但建立線程,就隻是在一個工廠中的房間内造一條流水線,無需申請空間,是以建立開銷小)

開啟程序的兩種方式:

import time
import random
from multiprocessing import Process

def piao(name):
    print('%s piaoing' %name)
    time.sleep(random.randrange(1,5))
    print('%s piao end' %name)

if __name__ == '__main__':
    #執行個體化得到四個對象
    p1=Process(target=piao,args=('egon',)) #必須加,号
    p2=Process(target=piao,args=('alex',))
    p3=Process(target=piao,args=('wupeqi',))
    p4=Process(target=piao,args=('yuanhao',))

    #調用對象下的方法,開啟四個程序
    p1.start()
    p2.start()
    p3.start()
    p4.start()
    print('主')           
import time
import random
from multiprocessing import Process

class Piao(Process):
    def __init__(self,name):
        super().__init__()
        self.name=name
    def run(self):
        print('%s 正在吃瓜' %self.name)

        time.sleep(random.randrange(1,5))
        print('%s 吃完瓜了' %self.name)

if __name__ == '__main__':
    #執行個體化得到四個對象
    p1=Piao('egon')
    p2=Piao('alex')
    p3=Piao('wupeiqi')
    p4=Piao('yuanhao')

    #調用對象下的方法,開啟四個程序
    p1.start() #start會自動調用run
    p2.start()
    p3.start()
    p4.start()
    print('主')           

join方法

在主程序運作過程中如果想并發地執行其他的任務,我們可以開啟子程序,此時主程序的任務與子程序的任務分兩種情況

情況一:在主程序的任務與子程序的任務彼此獨立的情況下,主程序的任務先執行完畢後,主程序還需要等待子程序執行完畢,然後統一回收資源。

情況二:如果主程序的任務在執行到某一個階段時,需要等待子程序執行完畢後才能繼續執行,就需要有一種機制能夠讓主程序檢測子程序是否運作完畢,在子程序執行完畢後才繼續執行,否則主程序就一直在原地阻塞,這就是join方法的作用

from multiprocessing import Process
import time
import random

def task(name):
    print('%s is piaoing' %name)
    time.sleep(random.randint(1,3))
    print('%s is piao end' %name)

if __name__ == '__main__':
    p1=Process(target=task,args=('egon',))
    p2=Process(target=task,args=('alex',))
    p3=Process(target=task,args=('yuanhao',))
    p4=Process(target=task,args=('wupeiqi',))

    p1.start()
    p2.start()
    p3.start()
    p4.start()
    # 很明顯p.join()是讓主線程等待p的結束,卡住的是主程序而絕非子程序p,
    p1.join()
    p2.join()
    p3.join()
    p4.join()

    print('主程序結束')           

守護程序

from multiprocessing import Process

import time
def foo():
    print(123)
    time.sleep(1)
    print("end123")

def bar():
    print(456)
    time.sleep(3)
    print("end456")

if __name__ == '__main__':
    p1=Process(target=foo)
    p2=Process(target=bar)

    p1.daemon=True #一定要在p.start()前設定,設定p為守護程序,禁止p建立子程序,并且父程序代碼執行結束,p即終止運作
    p1.start()
    p2.start()
    print("main-------") #隻要終端列印出這一行内容,那麼守護程序p也就跟着結束掉了           

互斥鎖

程序之間資料不共享,但是共享同一套檔案系統,是以通路同一個檔案,或同一個列印終端,是沒有問題的,而共享帶來的是競争,競争帶來的結果就是錯亂。

互斥鎖的工作原理就是多個人都要去争搶同一個資源:衛生間,一個人搶到衛生間後上一把鎖,其他人都要等着,等到這個完成任務後釋放鎖,其他人才有可能有一個搶到......是以互斥鎖的原理,就是把并發改成穿行,降低了效率,但保證了資料安全不錯亂。

from multiprocessing import Process,Lock
import os,time
def work(lock):
    lock.acquire() #加鎖
    print('%s is running' %os.getpid())
    time.sleep(2)
    print('%s is done' %os.getpid())
    lock.release() #釋放鎖
if __name__ == '__main__':
    lock=Lock()
    for i in range(3):
        p=Process(target=work,args=(lock,))
        p.start()           

 生産者消費者模型

生産者消費者模式是通過一個容器來解決生産者和消費者的強耦合問題。生産者和消費者彼此之間不直接通訊,而通過阻塞隊列來進行通訊,是以生産者生産完資料之後不用等待消費者處理,直接扔給阻塞隊列,消費者不找生産者要資料,而是直接從阻塞隊列裡取,阻塞隊列就相當于一個緩沖區,平衡了生産者和消費者的處理能力。

這個阻塞隊列就是用來給生産者和消費者解耦的

from multiprocessing import Process,JoinableQueue
import time,random,os
def consumer(q,name):
    while True:
        res=q.get()
        time.sleep(random.randint(1,3))
        print('\033[43m%s 吃 %s\033[0m' %(name,res))
        q.task_done() #發送信号給q.join(),說明已經從隊列中取走一個資料并處理完畢了

def producer(q,name,food):
    for i in range(3):
        time.sleep(random.randint(1,3))
        res='%s%s' %(food,i)
        q.put(res)
        print('\033[45m%s 生産了 %s\033[0m' %(name,res))
    q.join() #等到消費者把自己放入隊列中的所有的資料都取走之後,生産者才結束

if __name__ == '__main__':
    q=JoinableQueue() #使用JoinableQueue()

    #生産者們:即廚師們
    p1=Process(target=producer,args=(q,'egon1','包子'))
    p2=Process(target=producer,args=(q,'egon2','骨頭'))
    p3=Process(target=producer,args=(q,'egon3','泔水'))

    #消費者們:即吃貨們
    c1=Process(target=consumer,args=(q,'alex1'))
    c2=Process(target=consumer,args=(q,'alex2'))
    c1.daemon=True
    c2.daemon=True

    #開始
    p1.start()
    p2.start()
    p3.start()
    c1.start()
    c2.start()

    p1.join()
    p2.join()
    p3.join()
    #1、主程序等生産者p1、p2、p3結束
    #2、而p1、p2、p3是在消費者把所有資料都取幹淨之後才會結束
    #3、是以一旦p1、p2、p3結束了,證明消費者也沒必要存在了,應該随着主程序一塊死掉,因而需要将生産者們設定成守護程序
    print('主')           

多線程

開啟線程的兩種方式

from threading import Thread
import time

def sayhi(name):
    time.sleep(2)
    print('%s say hello' %name)

if __name__ == '__main__':
    t=Thread(target=sayhi,args=('egon',))
    t.start()
    print('主程序')           
#方式二
from threading import Thread
import time

class Sayhi(Thread):
    def __init__(self,name):
        super().__init__()
        self.name=name
    def run(self):
        time.sleep(2)
        print('%s say hello' % self.name)

if __name__ == '__main__':
    t = Sayhi('egon')
    t.start()
    print('主程序')           

多線程性能測試

 如果并發的多個任務是計算密集型:多程序效率高

from multiprocessing import Process
from threading import Thread
import os,time
def work():
    res=0
    for i in range(100000000):
        res*=i


if __name__ == '__main__':
    l=[]
    print(os.cpu_count()) #本機為4核
    start=time.time()
    for i in range(4):
        p=Process(target=work) #耗時5s多
        p=Thread(target=work) #耗時18s多
        l.append(p)
        p.start()
    for p in l:
        p.join()
    stop=time.time()
    print('run time is %s' %(stop-start))           

如果并發的多個任務是I/O密集型:多線程效率高

from multiprocessing import Process
from threading import Thread
import threading
import os,time
def work():
    time.sleep(2)
    print('===>')

if __name__ == '__main__':
    l=[]
    print(os.cpu_count()) #本機為4核
    start=time.time()
    for i in range(400):
        # p=Process(target=work) #耗時12s多,大部分時間耗費在建立程序上
        p=Thread(target=work) #耗時2s多
        l.append(p)
        p.start()
    for p in l:
        p.join()
    stop=time.time()
    print('run time is %s' %(stop-start))           

多線程用于IO密集型,如socket,爬蟲,web

多程序用于計算密集型,如金融分析

信号量

相當于一群人搶公共廁所,坑位為信号量參數

from threading import Thread,Semaphore
import threading
import time

def func():
    sm.acquire()
    print('%s get sm' %threading.current_thread().getName())
    time.sleep(3)
    sm.release()

if __name__ == '__main__':
    sm=Semaphore(5)
    for i in range(23):
        t=Thread(target=func)
        t.start()           

程序池、線程池

如果無限制的開啟程序或線程,這會對服務端主機帶來巨大的壓力,甚至于不堪重負而癱瘓,于是我們必須對服務端開啟的程序數或線程數加以控制,讓機器在一個自己可以承受的範圍内運作,這就是程序池或線程池的用途。

from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor

import os,time,random
def task(n):
    print('%s is runing' %os.getpid())
    time.sleep(random.randint(1,3))
    return n**2

if __name__ == '__main__':

    executor=ProcessPoolExecutor(max_workers=3)

    futures=[]
    for i in range(11):
        future=executor.submit(task,i)
        futures.append(future)
    executor.shutdown(True)
    print('+++>')
    for future in futures:
        print(future.result())           

協程

1. python的線程屬于核心級别的,即由作業系統控制排程(如單線程遇到io或執行時間過長就會被迫交出cpu執行權限,切換其他線程運作)
2. 單線程内開啟協程,一旦遇到io,就會從應用程式級别(而非作業系統)控制切換,以此來提升效率(!!!非io操作的切換與效率無關)           
import gevent
def eat(name):
    print('%s eat 1' %name)
    gevent.sleep(2)
    print('%s eat 2' %name)

def play(name):
    print('%s play 1' %name)
    gevent.sleep(1)
    print('%s play 2' %name)


g1=gevent.spawn(eat,'dzw')
g2=gevent.spawn(play,name='dzw')
g1.join()
g2.join()
#或者gevent.joinall([g1,g2])
print('主')           

繼續閱讀