天天看點

python系列之 - select

I/O多路複用是在單線程模式下實作多線程的效果,實作一個多I/O并發的效果。

看一個簡單socket例子:

import socket

SOCKET_FAMILY = socket.AF_INET
SOCKET_TYPE = socket.SOCK_STREAM

sockServer = socket.socket()
sockServer.bind(('0.0.0.0', 8888))
sockServer.listen(5)

while True:
    cliobj, addr = sockServer.accept()
    while True:
        recvdata = cliobj.recv(1024)
        if recvdata:
            print(recvdata.decode())
        else:
            cliobj.close()
            break
           

用戶端:

import socket

socCli = socket.socket()
socCli.connect(('127.0.0.1', 8888))
while True:
    data = input("input str:")
    socCli.send(data.encode())
<br />
           

以上為一個簡單的用戶端發送一個輸入資訊給服務端的socket通信的執行個體,在以上的例子中,服務端是一個單線程、阻塞模式的。如何實作多用戶端連接配接呢,我們可以使用多線程模式,這個當然沒有問題。 使用多線程、阻塞socket來處理的話,代碼會很直覺,但是也會有不少缺陷。它很難確定線程共享資源沒有問題。而且這種程式設計風格的程式在隻有一個CPU的電腦上面效率更低。但如果一個使用者開啟的線程有限的情況下,比如1024個。當第1025個用戶端連接配接是仍然會阻塞。

有沒有一種比較好的方式呢,當然有,其一是使用異步socket。 這種socket隻有在一些event觸發時才會阻塞。相反,程式在異步socket上面執行一個動作,會立即被告知這個動作是否成功。程式會根據這個信 息決定怎麼繼續下面的操作由于異步socket是非阻塞的,就沒有必要再來使用多線程。所有的工作都可以在一個線程中完成。這種單線程模式有它自己的挑 戰,但可以成為很多方案不錯的選擇。它也可以結合多線程一起使用:單線程使用異步socket用于處理伺服器的網絡部分,多線程可以用來通路其他阻塞資 源,比如資料庫。Linux的2.6核心有一系列機制來管理異 步socket,其中3個有對應的Python的API:select、poll和epoll。epoll和pool比select更好,因為 Python程式不需要檢查每一個socket感興趣的event。相反,它可以依賴作業系統來告訴它哪些socket可能有這些event。epoll 比pool更好,因為它不要求作業系統每次都去檢查python程式需要的所有socket感興趣的event。而是Linux在event發生的時候會 跟蹤到,并在Python需要的時候傳回一個清單。是以epoll對于大量(成千上萬)并發socket連接配接,是更有效率和可擴充的機制

異步I/O處理模型

python系列之 - select

select最早于1983年出現在4.2BSD中,它通過一個select()系統調用來監視多個檔案描述符的數組,當select()傳回後,該數組中就緒的檔案描述符便會被核心修改标志位,使得程序可以獲得這些檔案描述符進而進行後續的讀寫操作。

select目前幾乎在所有的平台上支援,其良好跨平台支援也是它的一個優點,事實上從現在看來,這也是它所剩不多的優點之一。

select的一個缺點在于單個程序能夠監視的檔案描述符的數量存在最大限制,在Linux上一般為1024,不過可以通過修改宏定義甚至重新編譯核心的方式提升這一限制。

另外,select()所維護的存儲大量檔案描述符的資料結構,随着檔案描述符數量的增大,其複制的開銷也線性增長。同時,由于網絡響應時間的延遲使得大量TCP連接配接處于非活躍狀态,但調用select()會對所有socket進行一次線性掃描,是以這也浪費了一定的開銷

select  poll   epoll比較

1 特點
select  

select本質上是通過設定或者檢查存放fd标志位的資料結構來進行下一步處理。這樣所帶來的缺點是:

1 單個程序可監視的fd數量被限制

2 需要維護一個用來存放大量fd的資料結構,這樣會使得使用者空間和核心空間在傳遞該結構時複制開銷大

3 對socket進行掃描時是線性掃描

poll  

poll本質上和select沒有差別,它将使用者傳入的數組拷貝到核心空間,然後查詢每個fd對應的裝置狀态,如果裝置就緒則在裝置等待隊列中加入一項并繼續周遊,如果周遊完所有fd後沒有發現就緒裝置,則挂起目前程序,直到裝置就緒或者主動逾時,被喚醒後它又要再次周遊fd。這個過程經曆了多次無謂的周遊。

它沒有最大連接配接數的限制,原因是它是基于連結清單來存儲的,但是同樣有一個缺點:

大量的fd的數組被整體複制于使用者态和核心位址空間之間,而不管這樣的複制是不是有意義。

poll還有一個特點是“水準觸發”,如果報告了fd後,沒有被處理,那麼下次poll時會再次報告該fd。

epoll  

epoll支援水準觸發和邊緣觸發,最大的特點在于邊緣觸發,它隻告訴程序哪些fd剛剛變為就需态,并且隻會通知一次。

在前面說到的複制問題上,epoll使用mmap減少複制開銷。

還有一個特點是,epoll使用“事件”的就緒通知方式,通過epoll_ctl注冊fd,一旦該fd就緒,核心就會采用類似callback的回調機制來激活該

fd,epoll_wait便可以收到通知

2 支援一個程序所能打開的最大連接配接數
select   單個程序所能打開的最大連接配接數有FD_SETSIZE宏定義,其大小是32個整數的大小(在32位的機器上,大小就是32*32,同理64位機器上FD_SETSIZE為32*64),當然我們可以對進行修改,然後重新編譯核心,但是性能可能會受到影響,這需要進一步的測試。
poll   poll本質上和select沒有差別,但是它沒有最大連接配接數的限制,原因是它是基于連結清單來存儲的
epoll   雖然連接配接數有上限,但是很大,1G記憶體的機器上可以打開10萬左右的連接配接,2G記憶體的機器可以打開20萬左右的連接配接
3 FD劇增後帶來的IO效率問題
select   因為每次調用時都會對連接配接進行線性周遊,是以随着FD的增加會造成周遊速度慢的“線性下降性能問題”。
poll   同上
epoll   因為epoll核心中實作是根據每個fd上的callback函數來實作的,隻有活躍的socket才會主動調用callback,是以在活躍socket較少的情況下,使用epoll沒有前面兩者的線性下降的性能問題,但是所有socket都很活躍的情況下,可能會有性能問題。
4 消息傳遞方式
select   核心需要将消息傳遞到使用者空間,都需要核心拷貝動作。
poll   同上
epoll   epoll通過核心和使用者空間共享一塊記憶體來實作的。

下面我們對上面的socket例子進行改造,看一下select的例子:

elect 詳細解釋,用線程的IO多路複用實作一個讀寫分離的、支援多用戶端的連接配接請求
"""
import socket
import queue
from select import select

SERVER_IP = ('127.0.0.1', 9999)

# 儲存用戶端發送過來的消息,将消息放入隊列中
message_queue = {}
input_list = []
output_list = []

if __name__ == "__main__":
    server = socket.socket()
    server.bind(SERVER_IP)
    server.listen(10)
    # 設定為非阻塞
    server.setblocking(False)

    # 初始化将服務端加入監聽清單
    input_list.append(server)

    while True:
        # 開始 select 監聽,對input_list中的服務端server進行監聽
        stdinput, stdoutput, stderr = select(input_list, output_list, input_list)

        # 循環判斷是否有用戶端連接配接進來,當有用戶端連接配接進來時select将觸發
        for obj in stdinput:
            # 判斷目前觸發的是不是服務端對象, 當觸發的對象是服務端對象時,說明有新用戶端連接配接進來了
            if obj == server:
                # 接收用戶端的連接配接, 擷取用戶端對象和用戶端位址資訊
                conn, addr = server.accept()
                print("Client {0} connected! ".format(addr))
                # 将用戶端對象也加入到監聽的清單中, 當用戶端發送消息時 select 将觸發
                input_list.append(conn)
                # 為連接配接的用戶端單獨建立一個消息隊列,用來儲存用戶端發送的消息
                message_queue[conn] = queue.Queue()

            else:
                # 由于用戶端連接配接進來時服務端接收用戶端連接配接請求,将用戶端加入到了監聽清單中(input_list),用戶端發送消息将觸發
                # 是以判斷是否是用戶端對象觸發
                try:
                    recv_data = obj.recv(1024)
                    # 用戶端未斷開
                    if recv_data:
                        print("received {0} from client {1}".format(recv_data.decode(), addr))
                        # 将收到的消息放入到各用戶端的消息隊列中
                        message_queue[obj].put(recv_data)

                        # 将回複操作放到output清單中,讓select監聽
                        if obj not in output_list:
                            output_list.append(obj)

                except ConnectionResetError:
                    # 用戶端斷開連接配接了,将用戶端的監聽從input清單中移除
                    input_list.remove(obj)
                    # 移除用戶端對象的消息隊列
                    del message_queue[obj]
                    print("\n[input] Client  {0} disconnected".format(addr))

        # 如果現在沒有用戶端請求,也沒有用戶端發送消息時,開始對發送消息清單進行處理,是否需要發送消息
        for sendobj in output_list:
            try:
                # 如果消息隊列中有消息,從消息隊列中擷取要發送的消息
                if not message_queue[sendobj].empty():
                    # 從該用戶端對象的消息隊列中擷取要發送的消息
                    send_data = message_queue[sendobj].get()
                    sendobj.sendall(send_data)
                else:
                    # 将監聽移除等待下一次用戶端發送消息
                    output_list.remove(sendobj)

            except ConnectionResetError:
                # 用戶端連接配接斷開了
                del message_queue[sendobj]
                output_list.remove(sendobj)
                print("\n[output] Client  {0} disconnected".format(addr))

           

epoll實作執行個體

#!/usr/bin/env python
import select
import socket

response = b''

serversocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
serversocket.bind(('0.0.0.0', 8080))
serversocket.listen(1)
# 因為socket預設是阻塞的,是以需要使用非阻塞(異步)模式。
serversocket.setblocking(0)

# 建立一個epoll對象
epoll = select.epoll()
# 在服務端socket上面注冊對讀event的關注。一個讀event随時會觸發服務端socket去接收一個socket連接配接
epoll.register(serversocket.fileno(), select.EPOLLIN)

try:
    # 字典connections映射檔案描述符(整數)到其相應的網絡連接配接對象
    connections = {}
    requests = {}
    responses = {}
    while True:
        # 查詢epoll對象,看是否有任何關注的event被觸發。參數“1”表示,我們會等待1秒來看是否有event發生。
        # 如果有任何我們感興趣的event發生在這次查詢之前,這個查詢就會帶着這些event的清單立即傳回
        events = epoll.poll(1)
        # event作為一個序列(fileno,event code)的元組傳回。fileno是檔案描述符的代名詞,始終是一個整數。
        for fileno, event in events:
            # 如果是服務端産生event,表示有一個新的連接配接進來
            if fileno == serversocket.fileno():
                connection, address = serversocket.accept()
                print('client connected:', address)
                # 設定新的socket為非阻塞模式
                connection.setblocking(0)
                # 為新的socket注冊對讀(EPOLLIN)event的關注
                epoll.register(connection.fileno(), select.EPOLLIN)
                connections[connection.fileno()] = connection
                # 初始化接收的資料
                requests[connection.fileno()] = b''

            # 如果發生一個讀event,就讀取從用戶端發送過來的新資料
            elif event & select.EPOLLIN:
                print("------recvdata---------")
                # 接收用戶端發送過來的資料
                requests[fileno] += connections[fileno].recv(1024)
                # 如果用戶端退出,關閉用戶端連接配接,取消所有的讀和寫監聽
                if not requests[fileno]:
                    connections[fileno].close()
                    # 删除connections字典中的監聽對象
                    del connections[fileno]
                    # 删除接收資料字典對應的句柄對象
                    del requests[connections[fileno]]
                    print(connections, requests)
                    epoll.modify(fileno, 0)
                else:
                    # 一旦完成請求已收到,就登出對讀event的關注,注冊對寫(EPOLLOUT)event的關注。寫event發生的時候,會回複資料給用戶端
                    epoll.modify(fileno, select.EPOLLOUT)
                    # 列印完整的請求,證明雖然與用戶端的通信是交錯進行的,但資料可以作為一個整體來組裝和處理
                    print('-' * 40 + '\n' + requests[fileno].decode())

            # 如果一個寫event在一個用戶端socket上面發生,它會接受新的資料以便發送到用戶端
            elif event & select.EPOLLOUT:
                print("-------send data---------")
                # 每次發送一部分響應資料,直到完整的響應資料都已經發送給作業系統等待傳輸給用戶端
                byteswritten = connections[fileno].send(requests[fileno])
                requests[fileno] = requests[fileno][byteswritten:]
                if len(requests[fileno]) == 0:
                    # 一旦完整的響應資料發送完成,就不再關注寫event
                    epoll.modify(fileno, select.EPOLLIN)

            # HUP(挂起)event表明用戶端socket已經斷開(即關閉),是以服務端也需要關閉。
            # 沒有必要注冊對HUP event的關注。在socket上面,它們總是會被epoll對象注冊
            elif event & select.EPOLLHUP:
                print("end hup------")
                # 登出對此socket連接配接的關注
                epoll.unregister(fileno)
                # 關閉socket連接配接
                connections[fileno].close()
                del connections[fileno]
finally:
    # 打開的socket連接配接不需要關閉,因為Python會在程式結束的時候關閉。這裡顯式關閉是一個好的代碼習慣
    epoll.unregister(serversocket.fileno())
    epoll.close()
    serversocket.close()