天天看點

Python學習記錄-協程、異步IOPython學習記錄-協程、異步IO

Python學習記錄-協程、異步IO

  • Python學習記錄-協程、異步IO
    • 1. 協程
      • 1.1 greenlet
      • 1.2 gevent
    • 2. 事件驅動與異步IO
      • 2.1 事件驅動模型
      • 2.2 Select、Poll、Epoll異步IO
      • 2.3 Python select
      • 2.4 Python selectors

1. 協程

線程和程序的操作是由程式觸發系統接口,最後的執行者是系統;協程的操作則是程式員。

協程存在的意義:對于多線程應用,CPU通過切片的方式來切換線程間的執行,線程切換時需要耗時(儲存狀态,下次繼續)。協程,則隻使用一個線程,在一個線程中規定某個代碼塊執行順序。

協程的适用場景:當程式中存在大量不需要CPU的操作時(IO),适用于協程;

協程的好處:

  • 無需線程上下文切換的開銷
  • 無需原子操作鎖定及同步的開銷

    “原子操作(atomic operation)是不需要synchronized”,所謂原子操作是指不會被線程排程機制打斷的操作;這種操作一旦開始,就一直運作到結束,中間不會有任何 context switch (切換到另一個線程)。原子操作可以是一個步驟,也可以是多個操作步驟,但是其順序是不可以被打亂,或者切割掉隻執行部分。視作整體是原子性的核心。

  • 友善切換控制流,簡化程式設計模型
  • 高并發+高擴充性+低成本:一個CPU支援上萬的協程都不是問題。是以很适合用于高并發處理。

協程的缺點:

  • 無法利用多核資源:協程的本質是個單線程,它不能同時将 單個CPU 的多個核用上,協程需要和程序配合才能運作在多CPU上.當然我們日常所編寫的絕大部分應用都沒有這個必要,除非是cpu密集型應用。
  • 進行阻塞(Blocking)操作(如IO時)會阻塞掉整個程式

使用

yield

實作協程操作例子:

import time
import queue

def consumer(name):
    print("--->starting eating baozi...")
    while True:
        new_baozi = yield
        print("[%s] is eating baozi %s" % (name,new_baozi))
        # time.sleep(1)

def producer():

    r = con.__next__()
    r = con2.__next__()
    n = 
    while n < :
        n +=
        con.send(n)
        con2.send(n)
        print("\033[32;1m[producer]\033[0m is making baozi %s" %n )


if __name__ == '__main__':
    con = consumer("c1")
    con2 = consumer("c2")
    p = producer()
           

1.1 greenlet

from greenlet import greenlet


def test1():
    print()
    gr2.switch()
    print()
    gr2.switch()


def test2():
    print()
    gr1.switch()
    print()

gr1 = greenlet(test1)
gr2 = greenlet(test2)
gr1.switch()
           

運作結果:

12
56
34
78
           

1.2 gevent

Python通過yield提供了對協程的基本支援,但是不完全。而第三方的gevent為Python提供了比較完善的協程支援。

gevent是第三方庫,通過greenlet實作協程,其基本思想是:

當一個greenlet遇到IO操作時,比如通路網絡,就自動切換到其他的greenlet,等到IO操作完成,再在适當的時候切換回來繼續執行。由于IO操作非常耗時,經常使程式處于等待狀态,有了gevent為我們自動切換協程,就保證總有greenlet在運作,而不是等待IO。

import gevent

def foo():
    print('Running in foo')
    gevent.sleep()
    print('Explicit context switch to foo again')

def bar():
    print('Explicit context to bar')
    gevent.sleep()
    print('Implicit context switch back to bar')

gevent.joinall([
    gevent.spawn(foo),
    gevent.spawn(bar),
])
           

運作結果:

Running in foo
Explicit context to bar
Explicit context switch to foo again
Implicit context switch back to bar
           

遇到IO操作自動切換:

from gevent import monkey
import gevent
import time
from urllib import request

monkey.patch_all()

def f(url):
    print('GET: %s' % url)
    resp = request.urlopen(url)
    data = resp.read()
    print('%d bytes received from %s.' % (len(data), url))

urls = ['https://www.python.org/','https://www.baidu.com/','https://www.so.com/']
time_start = time.time()
for url in urls:
    f(url)

print("同步cost", time.time() - time_start)

async_time_start =  time.time()
gevent.joinall([
        gevent.spawn(f, 'https://www.python.org/'),
        gevent.spawn(f, 'https://www.baidu.com/'),
        gevent.spawn(f, 'https://www.so.com/'),
])
print("異步cost", time.time() - async_time_start )
           

通過gevent實作單線程下的多socket并發:

server.py

import socket
import gevent

from gevent import socket,monkey
monkey.patch_all()


def server(port):
    s = socket.socket()
    s.bind(('0.0.0.0', port))
    s.listen()
    while True:
        cli, addr = s.accept()
        gevent.spawn(handle_request, cli)



def handle_request(conn):
    try:
        while True:
            data = conn.recv()
            print("recv:", data)
            conn.send(data)
            if not data:
                conn.shutdown(socket.SHUT_WR)

    except Exception as  ex:
        print(ex)
    finally:
        conn.close()
if __name__ == '__main__':
    server()
           

client.py

import socket

HOST = 'localhost'    # The remote host
PORT =            # The same port as used by the server
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.connect((HOST, PORT))
while True:
    msg = bytes(input(">>:"),encoding="utf8")
    s.sendall(msg)
    data = s.recv()
    print('Received', repr(data))

s.close()
           

多線程自發自收:

import socket
import threading

def sock_conn():

    client = socket.socket()

    client.connect(("localhost",))
    count = 
    while True:
        #msg = input(">>:").strip()
        #if len(msg) == 0:continue
        client.send( ("hello %s" %count).encode("utf-8"))

        data = client.recv()

        print("[%s]recv from server:" % threading.get_ident(),data.decode()) #結果
        count +=
    client.close()


for i in range():
    t = threading.Thread(target=sock_conn)
    t.start()
           

2. 事件驅動與異步IO

2.1 事件驅動模型

通常,我們寫伺服器處理模型的程式時,有以下幾種模型:

(1)每收到一個請求,建立一個新的程序,來處理該請求;

(2)每收到一個請求,建立一個新的線程,來處理該請求;

(3)每收到一個請求,放入一個事件清單,讓主程序通過非阻塞I/O方式來處理請求

上面的幾種方式,各有千秋,

第(1)種方法,由于建立新的程序的開銷比較大,是以,會導緻伺服器性能比較差,但實作比較簡單。

第(2)種方式,由于要涉及到線程的同步,有可能會面臨死鎖等問題。

第(3)種方式,在寫應用程式代碼時,邏輯比前面兩種都複雜。

綜合考慮各方面因素,一般普遍認為第(3)種方式是大多數網絡伺服器采用的方式。

事件驅動程式設計是一種程式設計範式,這裡程式的執行流由外部事件來決定。它的特點是包含一個事件循環,當外部事件發生時使用回調機制來觸發相應的處理。另外兩種常見的程式設計範式是(單線程)同步以及多線程程式設計。

讓我們用例子來比較和對比一下單線程、多線程以及事件驅動程式設計模型。下圖展示了随着時間的推移,這三種模式下程式所做的工作。這個程式有3個任務需要完成,每個任務都在等待I/O操作時阻塞自身。阻塞在I/O操作上所花費的時間已經用灰色框标示出來了。

Python學習記錄-協程、異步IOPython學習記錄-協程、異步IO

在單線程同步模型中,任務按照順序執行。如果某個任務因為I/O而阻塞,其他所有的任務都必須等待,直到它完成之後它們才能依次執行。這種明确的執行順序和串行化處理的行為是很容易推斷得出的。如果任務之間并沒有互相依賴的關系,但仍然需要互相等待的話這就使得程式不必要的降低了運作速度。

在多線程版本中,這3個任務分别在獨立的線程中執行。這些線程由作業系統來管理,在多處理器系統上可以并行處理,或者在單處理器系統上交錯執行。這使得當某個線程阻塞在某個資源的同時其他線程得以繼續執行。與完成類似功能的同步程式相比,這種方式更有效率,但程式員必須寫代碼來保護共享資源,防止其被多個線程同時通路。多線程程式更加難以推斷,因為這類程式不得不通過線程同步機制如鎖、可重入函數、線程局部存儲或者其他機制來處理線程安全問題,如果實作不當就會導緻出現微妙且令人痛不欲生的bug。

在事件驅動版本的程式中,3個任務交錯執行,但仍然在一個單獨的線程控制中。當處理I/O或者其他昂貴的操作時,注冊一個回調到事件循環中,然後當I/O操作完成時繼續執行。回調描述了該如何處理某個事件。事件循環輪詢所有的事件,當事件到來時将它們配置設定給等待處理事件的回調函數。這種方式讓程式盡可能的得以執行而不需要用到額外的線程。事件驅動型程式比多線程程式更容易推斷出行為,因為程式員不需要關心線程安全問題。

當我們面對如下的環境時,事件驅動模型通常是一個好的選擇:

程式中有許多任務,而且…

任務之間高度獨立(是以它們不需要互相通信,或者等待彼此)而且…

在等待事件到來時,某些任務會阻塞。

當應用程式需要在任務間共享可變的資料時,這也是一個不錯的選擇,因為這裡不需要采用同步處理。

網絡應用程式通常都有上述這些特點,這使得它們能夠很好的契合事件驅動程式設計模型。

2.2 Select、Poll、Epoll異步IO

首先列一下,sellect、poll、epoll三者的差別

select

select最早于1983年出現在4.2BSD中,它通過一個select()系統調用來監視多個檔案描述符的數組,當select()傳回後,該數組中就緒的檔案描述符便會被核心修改标志位,使得程序可以獲得這些檔案描述符進而進行後續的讀寫操作。

select目前幾乎在所有的平台上支援,其良好跨平台支援也是它的一個優點,事實上從現在看來,這也是它所剩不多的優點之一。

select的一個缺點在于單個程序能夠監視的檔案描述符的數量存在最大限制,在Linux上一般為1024,不過可以通過修改宏定義甚至重新編譯核心的方式提升這一限制。

另外,select()所維護的存儲大量檔案描述符的資料結構,随着檔案描述符數量的增大,其複制的開銷也線性增長。同時,由于網絡響應時間的延遲使得大量TCP連接配接處于非活躍狀态,但調用select()會對所有socket進行一次線性掃描,是以這也浪費了一定的開銷。

poll

poll在1986年誕生于System V Release 3,它和select在本質上沒有多大差别,但是poll沒有最大檔案描述符數量的限制。

poll和select同樣存在一個缺點就是,包含大量檔案描述符的數組被整體複制于使用者态和核心的位址空間之間,而不論這些檔案描述符是否就緒,它的開銷随着檔案描述符數量的增加而線性增大。

另外,select()和poll()将就緒的檔案描述符告訴程序後,如果程序沒有對其進行IO操作,那麼下次調用

select()

poll()

的時候将再次報告這些檔案描述符,是以它們一般不會丢失就緒的消息,這種方式稱為水準觸發(Level Triggered)。

epoll

直到Linux2.6才出現了由核心直接支援的實作方法,那就是epoll,它幾乎具備了之前所說的一切優點,被公認為Linux2.6下性能最好的多路I/O就緒通知方法。

epoll可以同時支援水準觸發和邊緣觸發(Edge Triggered,隻告訴程序哪些檔案描述符剛剛變為就緒狀态,它隻說一遍,如果我們沒有采取行動,那麼它将不會再次告知,這種方式稱為邊緣觸發),理論上邊緣觸發的性能要更高一些,但是代碼實作相當複雜。

epoll同樣隻告知那些就緒的檔案描述符,而且當我們調用epoll_wait()獲得就緒檔案描述符時,傳回的不是實際的描述符,而是一個代表就緒描述符數量的值,你隻需要去epoll指定的一個數組中依次取得相應數量的檔案描述符即可,這裡也使用了記憶體映射(mmap)技術,這樣便徹底省掉了這些檔案描述符在系統調用時複制的開銷。

另一個本質的改進在于epoll采用基于事件的就緒通知方式。在select/poll中,程序隻有在調用一定的方法後,核心才對所有監視的檔案描述符進行掃描,而epoll事先通過

epoll_ctl()

來注冊一個檔案描述符,一旦基于某個檔案描述符就緒時,核心會采用類似callback的回調機制,迅速激活這個檔案描述符,當程序調用epoll_wait()時便得到通知。

2.3 Python select

下面是Python select的服務端和用戶端通信示例:

select_server.py

import select
import socket
import sys
import queue

# Create a TCP/IP socket
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.setblocking(False)

# Bind the socket to the port
server_address = ('localhost', )
print(sys.stderr, 'starting up on %s port %s' % server_address)
server.bind(server_address)

# Listen for incoming connections
server.listen()

# Sockets from which we expect to read
inputs = [ server ]

# Sockets to which we expect to write
outputs = [ ]

message_queues = {}
while inputs:

    # Wait for at least one of the sockets to be ready for processing
    print( '\nwaiting for the next event')
    readable, writable, exceptional = select.select(inputs, outputs, inputs)
    # Handle inputs
    for s in readable:

        if s is server:
            # A "readable" server socket is ready to accept a connection
            connection, client_address = s.accept()
            print('new connection from', client_address)
            connection.setblocking(False)
            inputs.append(connection)

            # Give the connection a queue for data we want to send
            message_queues[connection] = queue.Queue()
        else:
            data = s.recv()
            if data:
                # A readable client socket has data
                print(sys.stderr, 'received "%s" from %s' % (data, s.getpeername()) )
                message_queues[s].put(data)
                # Add output channel for response
                if s not in outputs:
                    outputs.append(s)
            else:
                # Interpret empty result as closed connection
                print('closing', client_address, 'after reading no data')
                # Stop listening for input on the connection
                if s in outputs:
                    outputs.remove(s)  #既然用戶端都斷開了,我就不用再給它傳回資料了,是以這時候如果這個用戶端的連接配接對象還在outputs清單中,就把它删掉
                inputs.remove(s)    #inputs中也删除掉
                s.close()           #把這個連接配接關閉掉

                # Remove message queue
                del message_queues[s]
    # Handle outputs
    for s in writable:
        try:
            next_msg = message_queues[s].get_nowait()
        except queue.Empty:
            # No messages waiting so stop checking for writability.
            print('output queue for', s.getpeername(), 'is empty')
            outputs.remove(s)
        else:
            print( 'sending "%s" to %s' % (next_msg, s.getpeername()))
            s.send(next_msg)
    # Handle "exceptional conditions"
    for s in exceptional:
        print('handling exceptional condition for', s.getpeername() )
        # Stop listening for input on the connection
        inputs.remove(s)
        if s in outputs:
            outputs.remove(s)
        s.close()

        # Remove message queue
        del message_queues[s]
           

select_client.py

import socket
import sys

messages = [ 'This is the message. ',
             'It will be sent ',
             'in parts.',
             ]
server_address = ('localhost', )

# Create a TCP/IP socket
socks = [ socket.socket(socket.AF_INET, socket.SOCK_STREAM),
          socket.socket(socket.AF_INET, socket.SOCK_STREAM),
          ]

# Connect the socket to the port where the server is listening
print(sys.stderr, 'connecting to %s port %s' % server_address)
for s in socks:
    s.connect(server_address)

for message in messages:

    # Send messages on both sockets
    for s in socks:
        print(sys.stderr, '%s: sending "%s"' % (s.getsockname(), message))
        s.send(bytes(message, encoding='utf-8'))

    # Read responses on both sockets
    for s in socks:
        data = s.recv()
        print(sys.stderr, '%s: received "%s"' % (s.getsockname(), data))
        if not data:
            print(sys.stderr, 'closing socket', s.getsockname())
            s.close()
           

運作結果:

python3 select_server.py

<_io.TextIOWrapper name='<stderr>' mode='w' encoding='UTF-8'> starting up on localhost port 10000

waiting for the next event
new connection from ('127.0.0.1', )

waiting for the next event
new connection from ('127.0.0.1', )

waiting for the next event
<_io.TextIOWrapper name='<stderr>' mode='w' encoding='UTF-8'> received "b'This is the message. '" from ('127.0.0.1', )

waiting for the next event
sending "b'This is the message. '" to ('127.0.0.1', )

waiting for the next event
output queue for ('127.0.0.1', ) is empty

waiting for the next event
<_io.TextIOWrapper name='<stderr>' mode='w' encoding='UTF-8'> received "b'This is the message. '" from ('127.0.0.1', )

waiting for the next event
sending "b'This is the message. '" to ('127.0.0.1', )

waiting for the next event
output queue for ('127.0.0.1', ) is empty

waiting for the next event
<_io.TextIOWrapper name='<stderr>' mode='w' encoding='UTF-8'> received "b'It will be sent '" from ('127.0.0.1', )

waiting for the next event
sending "b'It will be sent '" to ('127.0.0.1', )

waiting for the next event
output queue for ('127.0.0.1', ) is empty

waiting for the next event
<_io.TextIOWrapper name='<stderr>' mode='w' encoding='UTF-8'> received "b'It will be sent '" from ('127.0.0.1', )

waiting for the next event
sending "b'It will be sent '" to ('127.0.0.1', )

waiting for the next event
output queue for ('127.0.0.1', ) is empty

waiting for the next event
<_io.TextIOWrapper name='<stderr>' mode='w' encoding='UTF-8'> received "b'in parts.'" from ('127.0.0.1', )

waiting for the next event
sending "b'in parts.'" to ('127.0.0.1', )

waiting for the next event
output queue for ('127.0.0.1', ) is empty

waiting for the next event
<_io.TextIOWrapper name='<stderr>' mode='w' encoding='UTF-8'> received "b'in parts.'" from ('127.0.0.1', )

waiting for the next event
sending "b'in parts.'" to ('127.0.0.1', )

waiting for the next event
output queue for ('127.0.0.1', ) is empty

waiting for the next event
closing ('127.0.0.1', ) after reading no data

waiting for the next event
closing ('127.0.0.1', ) after reading no data

waiting for the next event
           

python3 select_client.py

<_io.TextIOWrapper name='<stderr>' mode='w' encoding='UTF-8'> connecting to localhost port 
<_io.TextIOWrapper name='<stderr>' mode='w' encoding='UTF-8'> ('127.0.0.1', ): sending "This is the message. "
<_io.TextIOWrapper name='<stderr>' mode='w' encoding='UTF-8'> ('127.0.0.1', ): sending "This is the message. "
<_io.TextIOWrapper name='<stderr>' mode='w' encoding='UTF-8'> ('127.0.0.1', ): received "b'This is the message. '"
<_io.TextIOWrapper name='<stderr>' mode='w' encoding='UTF-8'> ('127.0.0.1', ): received "b'This is the message. '"
<_io.TextIOWrapper name='<stderr>' mode='w' encoding='UTF-8'> ('127.0.0.1', ): sending "It will be sent "
<_io.TextIOWrapper name='<stderr>' mode='w' encoding='UTF-8'> ('127.0.0.1', ): sending "It will be sent "
<_io.TextIOWrapper name='<stderr>' mode='w' encoding='UTF-8'> ('127.0.0.1', ): received "b'It will be sent '"
<_io.TextIOWrapper name='<stderr>' mode='w' encoding='UTF-8'> ('127.0.0.1', ): received "b'It will be sent '"
<_io.TextIOWrapper name='<stderr>' mode='w' encoding='UTF-8'> ('127.0.0.1', ): sending "in parts."
<_io.TextIOWrapper name='<stderr>' mode='w' encoding='UTF-8'> ('127.0.0.1', ): sending "in parts."
<_io.TextIOWrapper name='<stderr>' mode='w' encoding='UTF-8'> ('127.0.0.1', ): received "b'in parts.'"
<_io.TextIOWrapper name='<stderr>' mode='w' encoding='UTF-8'> ('127.0.0.1', ): received "b'in parts.'"
           

2.4 Python selectors

This module allows high-level and efficient I/O multiplexing, built upon the select module primitives. Users are encouraged to use this module instead, unless they want precise control over the OS-level primitives used.

示例:

import selectors
import socket

sel = selectors.DefaultSelector()

def accept(sock, mask):
    conn, addr = sock.accept()  # Should be ready
    print('accepted', conn, 'from', addr)
    conn.setblocking(False)
    sel.register(conn, selectors.EVENT_READ, read)

def read(conn, mask):
    data = conn.recv()  # Should be ready
    if data:
        print('echoing', repr(data), 'to', conn)
        conn.send(data)  # Hope it won't block
    else:
        print('closing', conn)
        sel.unregister(conn)
        conn.close()

sock = socket.socket()
sock.bind(('localhost', ))
sock.listen()
sock.setblocking(False)
sel.register(sock, selectors.EVENT_READ, accept)

while True:
    events = sel.select()
    for key, mask in events:
        callback = key.data
        callback(key.fileobj, mask)
           

參考資料:

[1] http://www.aosabook.org/en/twisted.html

[2] https://docs.python.org/3/library/select.html

[3] https://docs.python.org/3/library/selectors.html