天天看點

IO同步、異步與多路複用1. 重要概念

1. 重要概念

1.1 同步、異步

函數或方法被調用的時候,調用者是否能得到最終結果。直接得到最終結果的,就是同步調用,不直接得到最終結果的,就是異步調用。

1.2 阻塞、非阻塞

函數或方法調用的時候,是否like傳回,立即傳回就是非阻塞調用,不立即傳回就是阻塞調用。

同步、異步,與阻塞、給阻塞不相關,同步、異步強調的是,是否得到最終的結果,阻塞、非阻塞強調是時間,是否等待。

同步與異步差別在于:調用者是否得到了想要的最終結果。 同步就是一直要執行到傳回最終結果; 異步就是直接傳回了,但是傳回的不是最終結果。調用者不能通過這種調用得到結果,需要通過被調用者的其它方 式通知調用者,來取回最終結果。 阻塞與非阻塞的差別在于,調用者是否還能幹其他事。 阻塞,調用者就隻能幹等; 非阻塞,調用者可以先去忙會别的,不用一直等。

1.3 作業系統知識

X86 CPU有4種工作級别:

Ring0,可以執行特權指令,可以通路所有級别資料,可以通路IO裝置等。Ring3級,級别最低,隻能通路本級别資料。核心代碼運作在Ring0,使用者代碼運作在Ring3。

作業系統中,核心程式獨立且運作在 較高的特權級别上,它們駐留在被保護的記憶體空間上,擁有通路硬體的所有權限,這部分記憶體稱為核心空間(核心态)

普通應用程式運作在使用者空間(使用者态)。應用程式想通路某些硬體資源就需要通過作業系統提供的系統調用,系統調用可以使用特權指令運作在核心空間,此時程序陷入核心态運作。系統調用完成,程序将傳回到使用者态執行使用者空間代碼。

IO同步、異步與多路複用1. 重要概念

1.4 同步IO、異步IO、IO多路複用

1.4.1 IO兩個階段

  1. 資料準備階段。核心從裝置讀取資料到核心空間的緩沖區
  2. 核心空間複制回使用者空間程序緩沖區階段

發生IO的時候:

  1. 核心從IO裝置讀資料
  2. 程序從核心複制資料

1.5 IO模型

1.5.1 同步IO

同步IO模型包括阻塞IO、非阻塞IO、IO多路複用

阻塞IO:

IO同步、異步與多路複用1. 重要概念

程序等待(阻塞),直到讀寫完成。(全程等待)

非阻塞IO:

IO同步、異步與多路複用1. 重要概念

程序調用recvfrom操作,如果IO裝置沒有準備好,立即傳回ERROR,程序不阻塞。使用者可以再次發起系統調用(可以輪詢)。如果核心已經準備好,就阻塞,然後複制資料到使用者空間。

第一階段資料沒有準備好,可以先忙别的,等會再來看看,檢查資料是否準備好了的過程是非阻塞的。第二階段是阻塞的,即核心空間和使用者空間之間複制資料是阻塞的。

IO多路複用:

所謂IO多路複用,就是同時監控多個IO,有一個準備好了,就不需要等了開始處理,提高了同時處理IO的能力。select幾乎所有作業系統平台都支援,poll是對select的更新。epoll,Linux系統核心2.5+開始支援,對select和poll的更新,在監視的基礎上,增加了回調機制。BSD、Mac平台有kqueue,Windows有iocp。

IO同步、異步與多路複用1. 重要概念

以select為例,将關注的IO操作告訴select函數并調用,程序阻塞,核心“監視”關注的檔案描述符fd,被關注的任意一個fd對應的IO準備好了資料,select傳回。再使用read将資料複制到使用者程序。

一般情況下,select最多能監聽1024個fd,但是由于select采用輪詢的方式,當管理的IO多了,每次都要 周遊全部fd,效率低下。epoll沒有管理的fd上限,且是回調機制,不需周遊,效率很高。

信号驅動IO:

程序在IO通路時,先通過sigaction系統調用,送出一個信号處理函數,立即傳回,程序不阻塞。當核心準備好資料後,産生一個SIGIO信号并投遞給信号處理函數,可以在此函數中調用recvfrom函數操作資料從核心空間複制到使用者空間,這段過程阻塞。

IO同步、異步與多路複用1. 重要概念

異步IO:  (注意:回調是被調用者做得,不是調用者)

程序發起異步IO請求,立即傳回。核心完成IO的兩個階段,核心給程序發一個信号。在整個過程中,程序都可以忙别的,等好了再過來。

IO同步、異步與多路複用1. 重要概念

Linux的aio的系統調用,核心從版本二2.6開始支援:

IO同步、異步與多路複用1. 重要概念

1.6 python中的IO多路複用

IO多路複用:

  • 大多數作業系統都支援select和poll
  • Linux2.5+支援epoll
  • BSD、Mac支援kqueue
  • Solaris實作了/dev/poll
  • WindowsDE IOCP

python的select庫實作了select、poll系統調用,這個基本上作業系統都支援。部分實作了epoll,它是底層的額IO多路複用子產品。

開發中的選擇:

  1. 完全跨平台,使用select、poll。但是性能較差。
  2. 針對不同作業系統自行選擇支援的技術,這樣做會提高IO處理的性能。

select維護一個檔案描述符資料結構,單個程序使用有上限,通常是1024,線性掃面這個資料結構,效率低。poll和select的差別是内部資料結構使用連結清單,沒有這個最大限制,但是依然要周遊才能知道哪個裝置就緒了。epoll、使用事件通知機制,使用回調機制提高效率。select、poll還要從核心空間複制資料到使用者空間,而epoll通過核心空間和使用者空間共享一塊記憶體來減少複制。

1.6.1 selectors庫

poython3.4提供了selectors庫,進階的IO複用庫。

類層次結構:

IO同步、異步與多路複用1. 重要概念

selectors.DefaultSelector傳回目前平台最有效、性能最高的實作。但是沒有實作Windows下的IOCP,是以,Windows下隻能退化為select。

# 在selects子產品源碼最下面有如下代碼
# Choose the best implementation, roughly:
# epoll|kqueue|devpoll > poll > select.
# select() also can't accept a FD > FD_SETSIZE (usually around 1024)
if 'KqueueSelector' in globals():
    DefaultSelector = KqueueSelector
elif 'EpollSelector' in globals():
    DefaultSelector = EpollSelector
elif 'DevpollSelector' in globals():
    DefaultSelector = DevpollSelector
elif 'PollSelector' in globals():
    DefaultSelector = PollSelector
else:
    DefaultSelector = SelectSelector
           

事件注冊:

class SelectSelector(BaseselctorImpol):
    """Select-based selector."""
    def register(fileobj, events, data=None) -> SelectorKey: 
        pass
           
  • 為selector注冊一個檔案對象,監視它的IO事件,傳回SelectorKey對象。
  • fileobj 被監視檔案對象,例如socket對象
  • events 事件,該檔案對象必須等待的事件
  • data 可選的與此檔案對象相關聯的不透明資料,例如,關聯用來存儲每個用戶端的會話ID,關聯方法。通過這個 參數在關注的事件産生後讓selector幹什麼事。
IO同步、異步與多路複用1. 重要概念

EVENT_READ =  (1 << 0)

EVENT_WRITE =  (1 << 1)

這樣定義常量的好處是便于合并

selectors.SelectorKey有4個屬性:

  1. fileobj注冊的檔案對象
  2. fd檔案描述符
  3. events等待上面的檔案描述符的檔案對象的事件
  4. data注冊時關聯的資料

IO多路複用實作TCP Server:

import selectors
import socket


s = selectors.DefaultSelector()  # 1拿到selector

# 準備類檔案對象
server = socket.socket()
server.bind(('127.0.0.1', 9997))
server.listen()

# 官方建議采用非阻塞IO
server.setblocking(False)


def accept(sock: socket.socket, mas: int):
    conn, r_address = sock.accept()
    # print(conn)
    # print(r_address)
    print(mas)
    # pass
    conn.setblocking(False)
    key1 = s.register(conn, selectors.EVENT_READ, rec)
    print(key1)


def rec(conn: socket.socket, mas: int):
    print(mas)
    data = conn.recv(1024)
    print(data)

    msg = 'Your msg = {} form {}'.format(data.decode(), conn.getpeername())
    conn.send(msg.encode())


# 2注冊關注的類檔案對象和其事件們
key = s.register(server, selectors.EVENT_READ, accept)  # socket fileobject
print(key)

while True:
    events = s.select()  # epoll select,預設是阻塞的
    # 當你注冊時的檔案對象們,這其中的至少一個對象關注的事件就緒了,就不阻塞了
    print(events)  # 獲得了就緒的對象們,包括就緒的事件,還會傳回data

    for key, mask in events:  # event =>key, mask
        # 每一個event都是某一個被觀察的就緒的對象
        print(type(key), type(mask))   # key, mask
        # <class 'selectors.SelectorKey'> <class 'int'>
        print(key.data)
        # <function accept at 0x0000000001EA3A60>
        key.data(key.fileobj, mask)  # mask為掩碼

server.close()
s.close()
           

IO多路複用實作群聊:

# IO多路複用,實作TCP版本的群聊
import socket
import threading
import selectors
import logging


FORMAT = "%(threadName)s %(thread)d %(message)s"
logging.basicConfig(format=FORMAT, level=logging.INFO)


class ChatServer:

    def __init__(self, ip='127.0.0.1', port=9992):
        self.sock = socket.socket()
        self.address = ip, port
        self.event = threading.Event()

        self.selector = selectors.DefaultSelector()

    def start(self):
        self.sock.bind(self.address)
        self.sock.listen()
        self.sock.setblocking(False)

        key = self.selector.register(self.sock, selectors.EVENT_READ, self.accept)  # 隻有一個
        logging.info(key)  # 隻有一個
        # self.accept_key = key
        # self.accept_fd = key.fd

        threading.Thread(target=self.select, name='select', daemon=True).start()

    def select(self):

        while not self.event.is_set():
            events = self.selector.select()  # 阻塞
            for key, _ in events:
                key.data(key.fileobj)  # select線程

    def accept(self, sock: socket.socket):  # 在select線程中運作的
        new_sock, r_address = sock.accept()
        new_sock.setblocking(False)
        print('~' * 30)

        key = self.selector.register(new_sock, selectors.EVENT_READ, self.rec)  # 有n個
        logging.info(key)

    def rec(self, conn: socket.socket):  # 在select線程中運作的
        data = conn.recv(1024)
        logging.info(data.decode(encoding='cp936'))

        if data.strip() == b'quit' or data.strip() == b'':
            self.selector.unregister(conn)  # 關閉之前,登出,了解為之前的從字典中移除socket對象
            conn.close()
            return

        for key in self.selector.get_map().values():
            s = key.fileobj
            # if key.fileobj is self.sock:  # 方法一
            #     continue
            # if key == self.accept_key:  # 方法二
            #     continue
            # if key.fd == self.accept_fd:  # 方法三
            #     continue
            # msg = 'Your msg = {} form {}'.format(data.decode(encoding='cp936'), conn.getpeername())
            # s.send(msg.encode(encoding='cp936'))
            # print(key.data)
            # print(self.rec)
            # print(1, key.data is self.rec)  # False
            # print(2, key.data == self.rec)  # True
            if key.data == self.rec:  # 方法四
                msg = 'Your msg = {} form {}'.format(data.decode(encoding='cp936'), conn.getpeername())
                s.send(msg.encode(encoding='cp936'))

    def stop(self):  # 在主線程中運作的
        self.event.set()
        fs = set()
        for k in self.selector.get_map().values():
            fs.add(k.fileobj)
        for f in fs:
            self.selector.unregister(f)  # 相當于以前的釋放資源
            f.close()
        self.selector.close()


if __name__ == "__main__":
    cs = ChatServer()
    cs.start()

    while True:
        cmd = input(">>>").strip()
        if cmd == 'quit':
            cs.stop()
            break
        logging.info(threading.enumerate())
        logging.info(list(cs.selector.get_map().keys()))
        # for fd, ke in cs.selector.get_map().items():
        #     logging.info(fd)
        #     print(ke)
        #     print()
           

總結:

使用IO多路複用 + (select、epoll)并不一定比多線程+ 同步阻塞性能好,其最大的優勢是可以處理更多的連接配接。多線程+同步阻塞IO模式,開辟太多的線程,線程開辟、銷毀開銷還是較大,倒是可以使用線程池;線程多,線程自己使用的記憶體也很可觀,多線程切換時,要保護現場和恢複現場,線程過多,切換回占用大量的時間 。

連接配接較少,多線程+同步阻塞IO模式比較合适,效率也不低。如果連接配接非常多,對服務端來說,IO并發還是比較高的,這時候開辟很多線程其實也不是很劃算,此時IO多路複用或許是更好的選擇。

内容源碼均在github倉庫