天天看點

python-網絡程式設計-02-程序一、程序二、生産者和消費者

基礎理論

#一 作業系統的作用:
    1:隐藏醜陋複雜的硬體接口,提供良好的抽象接口
    2:管理、排程程序,并且将多個程序對硬體的競争變得有序

#二 多道技術:
    1.産生背景:針對單核,實作并發
        現在的主機一般是多核,那麼每個核都會利用多道技術
        有4個cpu,運作于cpu1的某個程式遇到io阻塞,會等到io結束再重新排程,會被排程到4個
        cpu中的任意一個,具體由作業系統排程算法決定。

    2.空間上的複用:如記憶體中同時有多道程式
    3.時間上的複用:複用一個cpu的時間片
       強調:遇到io切換,占用cpu時間過長也會切換,核心在于切之前将程序的狀态儲存下來,這樣
            才能保證下次切換回來時,能基于上次切走的位置繼續運作
           

一、程序

# 什麼是程序
	程序是程式(軟體,應用)的一個執行執行個體,每個運作中的程式,可以同時建立多個程序,但至少要有一個。每個程序都提供執行程式所需的所有資源,都有一個虛拟的位址空間、可執行的代碼、作業系統的接口、安全的上下文(記錄啟動該程序的使用者和權限等等)、唯一的程序ID、環境變量、優先級類、最小和最大的工作空間(記憶體空間)。**程序可以包含線程**,并且**每個程序必須有至少一個線程**。每個程序啟動時都會最先産生一個線程,即主線程,然後主線程會再建立其他的子線程。

# 程序與程式的差別
   程式僅僅隻是一堆代碼而已,而程序指的是程式的運作過程

# 并發、并行、串行
    并發: 看起來是同時執行的程式,但實際隻要一個cpu在不同的切換執行不同的程序
    并行: 真正的多個任務同時執行
    串行: 如串串 隻有吃完第一個(執行完)然後才會繼續吃(進行) 下一個
           

1.1、程序-狀态

建立\終止

# 程序建立
	但凡是硬體,都需要有作業系統去管理,隻要有作業系統,就有程序的概念,就需要有建立程序的方式,一些作業系統隻為一個應用程式設計
	建立流程:
    	1、系統初始化 (檢視程序linux中用ps指令,windows中用任務管理器,前台程序負責與使用者互動,背景運作的程序與使用者無關,運作在背景并且隻在需要時才喚醒的程序,稱為守護程序,如電子郵件、web頁面、新聞、列印 )
        2、一個程序在運作過程中開啟了子程序(linux: fork, windows: createPorcess)
        3、使用者的互動式請求,而建立一個新程序
        4、一個批處理作業的初始化

    無論哪一種,新程序的建立都是由一個已經存在的程序執行了一個用于建立程序的系統調用而建立的:
	  1. 在UNIX中該系統調用是:fork,fork會建立一個與父程序一模一樣的副本,二者有相同的存儲映像、同樣的環境字元串和同樣的打開檔案(在shell解釋器程序中,執行一個指令就會建立一個子程序)
	  2. 在windows中該系統調用是:CreateProcess,CreateProcess既處理程序的建立,也負責把正确的程式裝入新程序。

# 程序的終止
  1. 正常退出(自願,如使用者點選互動式頁面的叉号,或程式執行完畢調用發起系統調用正常退出,在linux中用exit,在windows中用ExitProcess)
  2. 出錯退出(自願,python a.py中a.py不存在)
  3. 嚴重錯誤(非自願,執行非法指令,如引用不存在的記憶體,1/0等,可以捕捉異常,try...except...)
  4. 被其他程序殺死(非自願,如kill -9)
           

狀态切換

python-網絡程式設計-02-程式一、程式二、生産者和消費者
運作:正在被cpu執行的狀态
阻塞: 當程式執行io時會被阻塞,cpu同時也會切換到其它程序
就緒:當程式io執行完成,cpu會切換回來成為就緒态
           

中斷狀态資料

程序并發的實作在于,硬體中斷一個正在運作的程序,把此時程序運作的所有狀态儲存下來,為此,作業系統維護一張表格,即程序表(process table),每個程序占用一個程序表項(這些表項也稱為程序控制塊)

    該表存放了程序狀态的重要資訊:程式計數器、堆棧指針、記憶體配置設定狀況、所有打開檔案的狀态、帳号和排程資訊,以及其他在程序由運作态轉為就緒态或阻塞态時,必須儲存的資訊,進而保證該程序在再次啟動時,就像從未被中斷過一樣。
           
python-網絡程式設計-02-程式一、程式二、生産者和消費者

示例

開啟子程序
Python中的multiprocess提供了Process類,實作程序相關的功能。但是它基于fork機制,是以不被windows平台支援。想要在windows中運作,必須使用`if __name__ == '__main__:`的方式,顯然這隻能用于調試和學習,不能用于實際環境。

# 開啟方式一
import time
from multiprocessing import Process

def test(x):
    print("{} 開始".format(x))
    time.sleep(2)
    print("{} 結束".format(x))

if __name__ == '__main__':
    pro = Process(target=test, args=("子程序",))
    pro.start()     # 給作業系統發起一個建立子程式的信号
    print("主程序開始")

# 執行結果, 程式會先運作主程序中的代碼,然後在運作子程序
    主程序開始
    子程序 開始
    子程序 結束

# 開啟方式二
import time
from multiprocessing import Process

class Myprocess(Process):
    def __init__(self, x):
        super().__init__()
        self.x = x

    def run(self):		# 與直接調用不同, 如果用繼承需要重新定義run方法
        print("{} 開始".format(self.x))
        time.sleep(1)
        print("{} 結束".format(self.x))

if __name__ == '__main__':
    pro = Myprocess("子程序")
    pro.start()
    print("主程序")
           

參數說明

Process([group [, target [, name [, args [, kwargs]]]]]),由該類執行個體化得到的對象,表示一個子程序中的任務(尚未啟動)
# 強調:
    1. 需要使用關鍵字的方式來指定參數
    2. args指定的為傳給target函數的位置參數,是一個元組形式,必須有逗号

# 參數:
    group   參數未使用,值始終為None
    target  表示調用對象,即子程序要執行的任務
    args    表示調用對象的位置參數元組,args=(1,2,'egon',)
    kwargs  表示調用對象的字典,kwargs={'name':'egon','age':18}
    name    為子程序的名稱

# 方法:
	p.start():啟動程序,并調用該子程序中的p.run()
    p.run():程序啟動時運作的方法,正是它去調用target指定的函數,我們自定義類的類中一定要實作該方法  
    p.terminate():強制終止程序p,不會進行任何清理操作,如果p建立了子程序,該子程序就成了僵屍程序,使用該方法需要特别小心這種情況。如果p還儲存了一個鎖那麼也将不會被釋放,進而導緻死鎖
    p.is_alive():如果p仍然運作,傳回True
    p.join([timeout]):主線程等待p終止(強調:是主線程處于等的狀态,而p是處于運作的狀态)。timeout是可選的逾時時間,需要強調的是,p.join隻能join住start開啟的程序,而不能join住run開啟的程序

# 屬性:
    p.daemon:預設值為False,如果設為True,代表p為背景運作的守護程序,當p的父程序終止時,p也随之終止,并且設定為True後,p不能建立自己的新程序,必須在p.start()之前設定
    p.name:程序的名稱
    p.pid:程序的pid
    p.exitcode:程序在運作時為None、如果為–N,表示被信号N結束(了解即可)
    p.authkey:程序的身份驗證鍵,預設是由os.urandom()随機生成的32字元的字元串。這個鍵的用途是為涉及網絡連接配接的底層程序間通信提供安全性,這類連接配接隻有在具有相同的身份驗證鍵時才能成功(了解即可)
           

僵屍\孤兒程序

僵屍程序是一種獨特的資料結構,子程序運作完之後其所打開的檔案、記憶體空間都會被釋放,但會保留子程式的ID号, 僵屍程序的存在是為了父程序無法預知子程序的狀态,而父程序卻想擷取到子程序的id資訊
           

1.2、守護程序

當父程序需要将一個任務并發出去執行,需要将該任務放到子程序中,當該子程序内的代碼在父程序代碼運作完畢後就沒有存在的意義了,那就應該将子程序設定為守護程序,這樣就會在父程序執行完畢之後直接退出。

# 主程序建立守護程序
  1:守護程序會在主程序代碼執行結束後就終止
  2:守護程序内無法再開啟子程序,否則抛出異常:AssertionError: daemonic processes are not allowed to have children
注意:程序之間是互相獨立的,主程序代碼運作結束,守護程序随即終止
           
示例
import time
from multiprocessing import Process

def fp():
    print("123")
    time.sleep(1)
    print("fp 123 end")

def dp():
    print("456")
    time.sleep(2)
    print("dp 456 end")

if __name__ == '__main__':
    p1 = Process(target=fp)
    p2 = Process(target=dp)

    p1.daemon = True
    p1.start()
    p2.start()
    print("main----------- ")

"""
    main----------- 
    456
    dp 456 end
"""
           

1.3、互斥鎖

将部分代碼(涉及到修改共享資料的代碼)變成串行,将并發變成串行,犧牲效率提高資料安全
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
import os
import time
import json
from multiprocessing import Process, Lock


# 檢視票
def ticket_list():
    time.sleep(1)  # 模拟網絡延時
    with open("file.txt", "rt", encoding="utf-8") as read_f:
        dic = json.load(read_f)
    print("{} 目前還有{}票".format(os.getpid(), dic["count"]))


# 購買
def ticket_buy():
    # 先檢視票,如有票-1, 無票提示
    time.sleep(1)  # 模拟檢視延時
    with open("file.txt", "rt", encoding="utf-8") as read_f:
        dic = json.load(read_f)
    if dic["count"] > 0:
        dic["count"] -= 1
        time.sleep(1)  # 如果不添加延時,隻會依賴計算能力,在一定範圍内能直接處理
        with open("file.txt", "wt", encoding="utf-8") as wirte_f:
            json.dump(dic, wirte_f)
        print("{} 購票成功".format(os.getpid()))
    else:
        print("{} 無餘票,購票失敗".format(os.getpid()))


def task(mutex):
    ticket_list()

    mutex.acquire()		# 互斥鎖不能連續的acquire(), 必須是release以後才能acquire
    ticket_buy()
    mutex.release()


if __name__ == '__main__':
    mutex = Lock()
    for i in range(10):
        p = Process(target=task, args=(mutex,))
        p.start()
           

1.4、IPC

# 程序間通信,不應該使用硬碟空間,而是使用記憶體空間通信, 是以我們最好找尋一種解決方案能夠兼顧:1、效率高(多個程序共享一塊記憶體的資料)2、幫我們處理好鎖問題。這就是mutiprocessing子產品為我們提供的基于消息的IPC通信機制:隊列和管道。

# IPC 程序間通信, 兩種實作方式, 隊列和管道都是将資料存放于記憶體中
	pipe:   管道
    queue:  隊列基于(管道+鎖)實作的,可以讓我們從複雜的鎖問題中解脫出來,
我們應該盡量避免使用共享資料,盡可能使用消息傳遞和隊列,避免處理複雜的同步和鎖問題,而且在程序數目增多時,往往可以獲得更好的可獲展性。
           
queue
  • 類: Queue([maxsize]): 建立共享的程序隊列,Queue是多程序安全的隊列,可以使用Queue實作多程序之間的資料傳遞
  • 參數:maxsize 是隊列中允許最大項數,省略則無大小限制。
  • 方法
    • q.put 方法用以插入資料到隊列中,put方法還有兩個可選參數:blocked和timeout。如果blocked為True(預設值),并且timeout為正值,該方法會阻塞timeout指定的時間,直到該隊列有剩餘的空間。如果逾時,會抛出Queue.Full異常。如果blocked為False,但該Queue已滿,會立即抛出Queue.Full異常。
    • q.get 方法可以從隊列讀取并且删除一個元素。同樣,get方法有兩個可選參數:blocked和timeout。如果blocked為True(預設值),并且timeout為正值,那麼在等待時間内沒有取到任何元素,會抛出Queue.Empty異常。如果blocked為False,有兩種情況存在,如果Queue有一個值可用,則立即傳回該值,否則,如果隊列為空,則立即抛出Queue.Empty異常.
    • q.get_nowait() :同q.get(False)
    • q.put_nowait() :同q.put(False)
    • q.empty(): 調用此方法時q為空則傳回True,該結果不可靠,比如在傳回True的過程中,如果隊列中又加入了項目。
    • q.full():調用此方法時q已滿則傳回True,該結果不可靠,比如在傳回True的過程中,如果隊列中的項目被取走。
    • q.qsize(): 傳回隊列中目前項目的正确數量,結果也不可靠,理由同q.empty()和q.full()一樣
1、隊列占用的是記憶體空間
2、不應該往隊列中放大資料,應該隻存放資料量較小的消息

q = Queue(3)  # 設定隊列大小為3

# 使用-示
q.put("world3")  # 正常添加 
print(q.get())

# block=True時 timeout才會生效
    q.put("hello", block=True, timeout=3)
    print(q.get(block=True, timeout=3))   # 取出逾時,當隊列中沒有時,逾時會出現_queue.Empty

# block=False,  隊列滿了直接抛出異常,不阻塞
           

二、生産者和消費者

        利用多線程和隊列可以實作生産者消費者模式。該模式通過平衡生産線程和消費線程的工作能力來提高程式整體處理資料的速度。

# 什麼是生産者和消費者
   線上程世界裡,生産者就是生産資料(或者說釋出任務)的線程,消費者就是消費資料(或者說處理任務)的線程。在任務執行過程中,如果生産者處理速度很快,而消費者處理速度很慢,那麼生産者就必須等待消費者處理完,才能繼續生産資料。同樣的道理,如果消費者的處理能力大于生産者,那麼消費者就必須等待生産者提供更多的任務,本質上,這是一種供需不平衡的表現。為了解決這個問題,我們創造了生産者和消費者模式。

生産者 = 釋出任務線程,   消費者 = 處理任務線程

#  什麼時侯用?
	當程式中存在明顯的兩類任務,一類負責生産資料,一類負責處理資料,此時就應該考慮使用生産者消費者模型來提升 程式的效率
           

工作機制

​ 生産者消費者模式是通過一個容器來解決生産者和消費者的強耦合問題。生産者和消費者彼此之間不直接通訊,而是通過 阻塞隊列(消息隊列) 來進行通訊,是以生産者生産完資料之後不用等待消費者處理,直接扔給阻塞隊列,消費者不直接找生産者要資料,而是從阻塞隊列裡取,阻塞隊列 (消息隊列) 就相當于一個緩沖區,平衡了生産者和消費者的處理能力,解耦了生産者和消費者。

python-網絡程式設計-02-程式一、程式二、生産者和消費者

​ 生産者消費者模式的核心是‘阻塞隊列’也稱消息隊列。在生産環境中有很多大名鼎鼎的分布式消息隊列,例如RabbitMQ,RocketMq,Kafka等等。在學習過程中,我們沒必要使用這麼大型的隊列,直接使用Python内置的queue子產品中提供的隊列就可以了。

多程序-demo-1
q.task_done() # 消費者告知生産者取了一次

q.join()   # 等待隊列被取幹淨, 結束意味着主程序的代碼運作完畢 --> 生産運作完畢并且隊列中的資料也被取幹淨了--> 消費者也就沒有意義了

"""    # 正常寫法,但這樣的每添加一個程序都需要put一個None
import os
import random
import time
from multiprocessing import Process, Queue


def product(q):
    for i in range(2):
        time.sleep(random.randint(1, 3))
        print("\033[45m{} 生産了 包子 {}\033[0m".format(os.getpid(), i))
        q.put(i)
    # q.put(None)


def consume(q):
    while True:
        time.sleep(random.randint(1, 3))
        res = q.get()
        if res is None: break
        print("{} 吃掉了 包子 {}".format(os.getpid(), res))


if __name__ == '__main__':
    q = Queue()
    p1 = Process(target=product, args=(q,))
    p2 = Process(target=product, args=(q,))
    p3 = Process(target=product, args=(q,))
    c1 = Process(target=consume, args=(q,))
    c2 = Process(target=consume, args=(q,))
	
	# 啟動程序
    p1.start()
    p2.start()
    p3.start()

    c1.start()
    c2.start()

	# 讓子程序運作在父程序前,直到代碼結束
    p1.join()
    p2.join()
    p3.join()

    # 有多少個消費者就得傳多少個None
    q.put(None)
    q.put(None)
"""
           
多程序-demo-2
  • 類: JoinableQueue([maxsize]):這就像是一個Queue對象,但隊列允許項目的使用者通知生成者項目已經被成功處理。通知程序是使用共享的信号和條件變量來實作的。
  • 參數: maxsize是隊列中允許最大項數,省略則無大小限制。
    • 方法:JoinableQueue的執行個體p除了與Queue對象相同的方法之外還具有:
      • q.task_done():使用者使用此方法發出信号,表示q.get()的傳回項目已經被處理。如果調用此方法的次數大于從隊列中删除項目的數量,将引發ValueError異常
      • q.join():生産者調用此方法進行阻塞,直到隊列中所有的項目均被處理。阻塞将持續到隊列中的每個項目均調用q.task_done()方法為止
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
import os
import random
import time
from multiprocessing import Process, JoinableQueue


def product(q):
    for i in range(2):
        time.sleep(random.randint(1, 3))
        print("\033[45m{} 生産了 包子 {}\033[0m".format(os.getpid(), i))
        q.put(i)


def consume(q):
    while True:
        time.sleep(random.randint(1, 3))
        res = q.get()
        if res is None: break
        print("{} 吃掉了 包子 {}".format(os.getpid(), res))
        q.task_done()       # 1、當取走代碼時,通知隊列取走了一次


if __name__ == '__main__':
    q = JoinableQueue()
    p1 = Process(target=product, args=(q,))
    p2 = Process(target=product, args=(q,))
    p3 = Process(target=product, args=(q,))
    c1 = Process(target=consume, args=(q,))
    c2 = Process(target=consume, args=(q,))
    c1.daemon = True        # 守護程序需要運作在主程序之前,守護程序會在主程序代碼執行結束後就終止
    c2.daemon = True

    p1.start()
    p2.start()
    p3.start()
    c1.start()
    c2.start()

    p1.join()       # 等待子程序運作完之後在執行主程序
    p2.join()
    p3.join()

    q.join()        # # 等待隊列被取幹淨, 結束意味着主程序的代碼運作完畢 -->
                        # 生産運作完畢并且隊列中的資料也被取幹淨了-->
                        # 消費者也就沒有意義了
    print("master")

""" 
    # JoinableQueue()Queue對象,但隊列允許項目的使用者通知生成者項目已經被成功處理。通知程序是使用共享的信号和條件變量來實作
    1、開啟子程序     Process(target=product, args=(q,)) # 傳遞q
    2、啟動子程序     p1.start()  生産者 開始生産
    3、啟動子程序     c1.start()  消費者 開始消費
        3.1、  消費者q.task_done()       # 當取走代碼時,通知隊列取走了一次
        3.2、  q.join()  等待隊列被取幹淨
    4、待子程序運作完, 每個程序都需要join, 否則p1結束時會直接到 master程序
    5、當子程序都運作完了,列印master, 子程序 設定為: daemon=True
    6、當父程序都結束了,子程序也會跟着一并結束
"""
           
線程-demo
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
import time
import threading
import queue

q = queue.Queue(10)  # 生成10個隊列


# 生産者
def productor(i):
    while True:
        q.put("廚師做了{0}号包子".format(i))
        time.sleep(2)


# 消費者
def consumer(k):
    while True:
        print("消費者{} 吃了{}".format(k, q.get()))
        time.sleep(1)


for i in range(5):
    p = threading.Thread(target=productor, args=(i,))
    p.start()

for k in range(3):
    c = threading.Thread(target=consumer, args=(k,))
    c.start()

# 5個廚師生産, 3個生産者消費
消費者0 吃了廚師做了0号包子
消費者1 吃了廚師做了1号包子
消費者2 吃了廚師做了2号包子
消費者1 吃了廚師做了3号包子
消費者0 吃了廚師做了4号包子
...
# 最怕生産者太快,而消費者消費的速度太慢導緻資料存儲過慢