天天看點

網絡IO-阻塞、非阻塞、IO複用、異步

  網絡socket輸入操作分為兩個階段:等待網絡資料到達和将到達核心的資料複制到應用程序緩沖區。對這兩個階段不同的處理方式将網絡IO分為不同的模型:IO阻塞模型、非阻塞模型、多路複用和異步IO。

一 阻塞模型

  阻塞模型原理如下圖1.1,當進行系統調用recvfrom時,應用程序進入核心态,核心判斷是否已收到資料報,若沒有則阻塞直到資料報準備好,接着複制資料到應用程序緩沖區,然後函數傳回。

網絡IO-阻塞、非阻塞、IO複用、異步

圖1.1 阻塞IO模型

  阻塞模型缺點:若資料報未準備好,則線程阻塞,不能進行其它操作和網絡連接配接請求。

  利用多程序多線程方案,為每個連接配接建立一個程序或線程,這樣一個線程的阻塞不會影響到其它連接配接,但當遇到連接配接請求比較多時,會建立較多的程序或線程,嚴重浪費系統資源,影響程序的響應效率,程序和線程也更容易進入假死狀态。

  利用線程池或連接配接池,可以減少資源消耗。線程池利用已有線程,減少線程頻繁建立和銷毀,線程維持在一定數量,當有新的連接配接請求時,重用已有線程。連接配接池盡量重用已有連接配接,減少連接配接的建立和關閉。線程池和連接配接池一定程度上緩解頻繁IO的資源消耗,但線程池和連接配接池都有一定規模,當連接配接請求數遠超過池上線,池系統構成的響應并不比多線程方案好多少。[1]

  阻塞模型python執行個體demo如下:

  阻塞模型server端

def start_blocking(self):
        """同步阻塞server"""
        self.ssock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.ssock.bind(('', 8080))
        self.ssock.listen(5)
        count = 0
        while True:
            conn, addr = self.ssock.accept()
            count += 1
            print 'Connected by', addr
            print 'Accepted clinet count:%d' % count
            data = conn.recv(1024) #若無資料則阻塞
            if data:
                conn.sendall(data)
            conn.close()      

  阻塞模型client

def start_blocking(self):
        self.host = '123.207.123.108'
        self.port = 8080
        self.csock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.csock.connect((self.host, self.port))
        data = self.csock.recv(1024)
        print data      

  運作server端,并運作兩個client執行個體去連接配接服務端,運作結果如下圖1.2,可以看到雖然有兩個用戶端去連接配接,但卻隻有一個連接配接上,服務端的socket conn為阻塞套接字,conn.recv(1024)未收到用戶端發送的資料,處于阻塞狀态,服務端無法再響應另一個用戶端的連接配接。

網絡IO-阻塞、非阻塞、IO複用、異步

圖1.2 阻塞IO服務端運作結果

二 非阻塞模型

  由于阻塞IO無法滿足大規模請求的缺點,是以出現了非阻塞模型。非阻塞IO模型如下圖1.3所示,當資料報未準備好,recvfrom立即傳回一個EWOULDBLOCK錯誤,可以利用輪詢不停調用recvfrom,當資料報準備好,核心則将資料複制到應用程序緩沖區。

網絡IO-阻塞、非阻塞、IO複用、異步

圖1.3 非阻塞IO模型

  非阻塞IO模型需要利用輪詢不斷調用recvfrom,浪費大量CPU時間,且當核心接收到資料時,需要等到下一次輪詢才能複制到應用程序緩沖區,資料得不到立刻處理。

  非阻塞模型python demo如下:

  非阻塞服務端

def start_noblocking(self):
        """
        同步非阻塞
        """
        self.ssock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.ssock.bind(('', 8080))
        self.ssock.listen(5)
        count = 0
        while True:
            conn, addr = self.ssock.accept()
            conn.setblocking(0) #設定為非阻塞socket
            count += 1
            print 'Connected by', addr
            print 'Accepted clinet count:%d' % count
            try:
                data = conn.recv(1024) #非阻塞,沒有資料會立刻傳回
                if data:
                    conn.sendall(data)
            except Exception as e:
               pass
            finally:
                conn.close()      

  運作非阻塞服務端和兩個用戶端執行個體,結果如下圖1.4所示,服務端接收兩個連接配接請求。由于conn被設定為非阻塞socket,即使用戶端并沒有向服務端發送資料,conn.recv(1024)也會立即傳回,不會阻塞,進而程序可以接收新的連接配接請求。

網絡IO-阻塞、非阻塞、IO複用、異步

圖1.4 非阻塞服務端運作結果

三 IO複用

  IO複用在linux中包括select、poll、epoll模型三種,這三個IO複用模型有各自的API實作,以select模型為例,調用select函數,程序進入阻塞, 同時監控多個套接字描述符的狀态 ,當有資料報變為可讀或阻塞逾時才傳回,接着程序可調用recvfrom接收資料報到應用程序緩沖區。

網絡IO-阻塞、非阻塞、IO複用、異步

圖3.1 IO複用模型

  使用IO複用的優點是可以等待多個描述符就緒。[2]

3.1 select模型

  select模型api如下:

  int select(int maxfdp1, fd_set *readset, fd_set *writeset, fd_set *exceptset,struct timeval *timeout);

timout表示核心等待任一描述符就緒可等待的時間,有三種情況:

  1) 空指針,表示可以一直等下去,直到有描述符就緒。

  2) timeout時間為0,不等待檢查描述符狀态立即傳回。

  3) 時間不為0,表示等待一定時間,在有描述符準備好但不超過timeval結構所指定的秒數和微秒數。

readset、writeset、exceptset指定需要核心測試讀、寫和異常條件的描述符。fd_set表示描述符集,在select中用整數數組表示,整數的每一位表示一個描述符, readset、writeset、exceptset這三個參數是值-結果類型。

  可以用以下幾個宏設定和測試fd_set。在調用select函數前,用1、2、3設定需要監控的描述符,循環調用4測試調用select函數後的描述符,看是否準備好。

  1) int FD_ZERO(int fd, fd_set *fdset);

  2) int FD_CLR(int fd, fd_set *fdset);

  3) int FD_SET(int fd, fd_set *fd_set);

  4) int FD_ISSET(int fd, fd_set *fdset);

  導緻select傳回某個套接字就緒的條件如下:

網絡IO-阻塞、非阻塞、IO複用、異步

圖3.2 就緒條件

  maxfd1表示指定待測試描述符個數,值為待測試描述符最大值加1,用這個參數可告訴核心最大隻周遊到maxfd1-1的描述符。maxfd1最大不能超過常量FD_SETSIZE(值預設為1024,更改該值需重新編譯核心)。

  select函數的傳回值為整數,表示跨所有描述符集已就緒的總位數。如果逾時則傳回0。傳回-1表示出錯,比如被中斷[3]。

  select實作原理:從使用者空間拷貝fd_set到核心空間,周遊所有fd,将目前程序挂到各個裝置的等待隊列中,挂到隊列的同時會傳回是否就緒的掩碼,當所有fd傳回的掩碼均未就緒,則目前程序睡眠。當fd對應裝置驅動發現可讀寫時,則會喚醒處于睡眠态的程序。如果超過一定時間還未喚醒, 則調用select的程序會重新被喚醒獲得CPU,進而重新周遊fd,判斷有沒有就緒的fd,将fd_set從核心空間拷到使用者空間[4]。

  select實作的缺點:

  1) 每次都需要将fd_set拷貝到核心空間,當fd_set較大時開銷很大

  2) 每次都需要在核心中周遊fd加入到等待隊列,fd較多開銷較大

  3) Select支援的檔案描述符太小,預設為1024。

  select模型python demo如下:

  select模型服務端

def start(self):
        # create a socket
        server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        server.setblocking(False)
        # set option reused
        server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        server_address = ('', 8080)
        server.bind(server_address)

        server.listen(10)

        # sockets from which we except to read
        inputs = [server]

        # sockets from which we expect to write
        outputs = []

        # Outgoing message queues (socket:Queue)
        message_queues = {}

        # A optional parameter for select is TIMEOUT
        timeout = 20

        while inputs:
            print "waiting for next event"
            # 每次調用select函數,需要将所有socket重新傳一次
            readable, writable, exceptional = select.select(
                inputs, outputs, inputs, timeout)

            # When timeout reached , select return three empty lists
            if not (readable or writable or exceptional):
                print "Time out ! "
                break
            for s in readable:
                if s is server:  # 監聽套接字
                    # A "readable" socket is ready to accept a connection
                    connection, client_address = s.accept()
                    print "    connection from ", client_address
                    connection.setblocking(0)
                    inputs.append(connection)
                    message_queues[connection] = Queue.Queue()
                else:
                    data = s.recv(1024)  # 接收到資料
                    if data:
                        print " received ", data, "from ", s.getpeername()
                        message_queues[s].put(data)
                        # Add output channel for response
                        if s not in outputs:
                            outputs.append(s)
                    else:  # 讀這端的連接配接關閉
                        # Interpret empty result as closed connection
                        print "  closing", client_address
                        if s in outputs:
                            outputs.remove(s)
                        inputs.remove(s)
                        s.close()
                        # remove message queue
                        del message_queues[s]
            for s in writable:
                try:
                    next_msg = message_queues[s].get_nowait()
                except Queue.Empty:
                    print " ", s.getpeername(), 'queue empty'
                    outputs.remove(s)
                else:
                    print " sending ", next_msg, " to ", s.getpeername()
                    s.send(next_msg)

            for s in exceptional:
                print " exception condition on ", s.getpeername()
                # stop listening for input on the connection
                inputs.remove(s)
                if s in outputs:
                    outputs.remove(s)
                s.close()
                # Remove message queue
                del message_queues[s]      

  select模型用戶端

def start(self):
        messages = ["hello world"]
        print "Connect to the server"
         
        server_address = ("123.207.123.108",8080)
         
        #Create a TCP/IP sock
         
        socks = []
         
        for i in range(3):
            socks.append(socket.socket(socket.AF_INET,socket.SOCK_STREAM))
         
        for s in socks:
            s.connect(server_address)
         
        counter = 0
        for message in messages :
            #Sending message from different sockets
            for s in socks:
                counter+=1
                print "  %s sending %s" % (s.getpeername(),message+" version "+str(counter))
                s.send(message+" version "+str(counter))
            #Read responses on both sockets
            for s in socks:
                data = s.recv(1024)
                print " %s received %s" % (s.getpeername(),data)
                if not data:
                    print "%s closing socket "%s.getpeername()
                    s.close()      

  分别運作服務端和用戶端,結果如下:

網絡IO-阻塞、非阻塞、IO複用、異步

圖3.3 select模型服務端運作結果

網絡IO-阻塞、非阻塞、IO複用、異步

圖3.4 select模型用戶端運作結果

3.2 poll模型

  poll模型api如下[8]:

#include <poll.h>
int poll(struct pollfd fds[], nfds_t nfds, int timeout);

typedef struct pollfd {
        int fd;      // 需要被檢測或選擇的檔案描述符
        short events;  // 對檔案描述符fd上感興趣的事件
        short revents;   // 檔案描述符fd上目前實際發生的事件*/
} pollfd_t;      

  1) poll()函數傳回fds集合中就緒的讀、寫,或出錯的描述符數量,傳回0表示逾時,傳回-1表示出錯;

  2) fds是一個struct pollfd類型的數組,用于存放需要檢測其狀态的socket描述符,并且調用poll函數之後fds數組不會被清空;

  3) nfds記錄數組fds中描述符的總數量;

  4) timeout是調用poll函數阻塞的逾時時間,機關毫秒;

  5) 一個pollfd結構體表示一個被監視的檔案描述符,通過傳遞fds[]訓示 poll() 監視多個檔案描述符。其中,結構體的events域是監視該檔案描述符的事件掩碼,由使用者來設定這個域,結構體的revents域是檔案描述符的操作結果事件掩碼,核心在調用傳回時設定這個域。events域中請求的任何事件都可能在revents域中傳回。

  合法的事件如下:

  1) POLLIN 有資料可讀

  2) POLLRDNORM 有普通資料可讀

  3) POLLRDBAND 有優先資料可讀

  4) POLLPRI 有緊迫資料可讀

  5) POLLOUT 寫資料不會導緻阻塞

  6) POLLWRNORM 寫普通資料不會導緻阻塞

  7) POLLWRBAND 寫優先資料不會導緻阻塞

  8) POLLERR 發生錯誤

  9) POLLHUP 發生挂起

  當需要監聽多個事件時,使用POLLIN | POLLPRI設定 events 域;當poll調用之後檢測某事件是否發生時,fds[i].revents & POLLIN進行判斷

poll模型和select模型相似,poll模型同樣需要将所有監控的描述符重新拷貝到核心,并在核心中對所有描述符進行周遊,沒有解決select模型的性能問題,但是poll模型沒有最大檔案描述符數量的限制。

  select()和poll()将就緒的檔案描述符告訴程序後,如果程序沒有對其進行IO操作,那麼下次調用select()和poll()的時候将再次報告這些檔案描述符,是以它們一般不會丢失就緒的消息,這種方式稱為水準觸發[5]。

  poll模型python demo如下:

def start(self)://poll模型服務端
        # Create a TCP/IP socket, and then bind and listen
        server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        server.setblocking(False)
        server_address = ('', 8080)
        print "Starting up on %s port %s" % server_address
        server.bind(server_address)
        server.listen(5)
        message_queues = {}
        # The timeout value is represented in milliseconds, instead of seconds.
        timeout = 5000
        # Create a limit for the event,POLLIN = POLLRDNORM | POLLRDBAND
        READ_ONLY = (select.POLLIN | select.POLLPRI)
        READ_WRITE = (READ_ONLY | select.POLLOUT) #POLLOUT=POLLWRNORM | POLLWRBAND
        # Set up the poller
        poller = select.poll()
        poller.register(server, READ_ONLY)
        # Map file descriptors to socket objects
        fd_to_socket = {server.fileno(): server, }
        while True:
            print "Waiting for the next event"
            events = poller.poll(timeout)
            if len(events) == 0:
                print 'Time out'
                break
            print "*" * 20
            print len(events)
            print events
            print "*" * 20
            for fd, flag in events:
                s = fd_to_socket[fd]
                if flag & (select.POLLIN | select.POLLPRI):
                    if s is server:
                        # A readable socket is ready to accept a connection
                        connection, client_address = s.accept()
                        print " Connection ", client_address
                        connection.setblocking(False)

                        fd_to_socket[connection.fileno()] = connection
                        poller.register(connection, READ_ONLY)

                        # Give the connection a queue to send data
                        message_queues[connection] = Queue.Queue()
                    else:
                        data = s.recv(1024)
                        if data:
                            # A readable client socket has data
                            print "  received %s from %s " % (data, s.getpeername())
                            message_queues[s].put(data)
                            poller.modify(s, READ_WRITE)
                        else:
                            # Close the connection
                            print "  closing", s.getpeername()
                            # Stop listening for input on the connection
                            poller.unregister(s)
                            s.close()
                            del message_queues[s]
                elif flag & select.POLLHUP:
                    # A client that "hang up" , to be closed.
                    print " Closing ", s.getpeername(), "(HUP)"
                    poller.unregister(s)
                    s.close()
                elif flag & select.POLLOUT:
                    # Socket is ready to send data , if there is any to send
                    try:
                        next_msg = message_queues[s].get_nowait()
                    except Queue.Empty:
                        # No messages waiting so stop checking
                        print s.getpeername(), " queue empty"
                        poller.modify(s, READ_ONLY)
                    else:
                        print " sending %s to %s" % (next_msg, s.getpeername())
                        s.send(next_msg)
                elif flag & select.POLLERR:
                    # Any events with POLLERR cause the server to close the
                    # socket
                    print "  exception on", s.getpeername()
                    poller.unregister(s)
                    s.close()
                    del message_queues[s]      

3.3 epoll模型

  epoll模型api包含三個系統調用[7]:

#include <sys/epoll.h>
int epoll_create(int size);
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);      

  1. epoll_create建立epoll句柄epfd。size表示在這個epoll fd上能關注的最大fd數,失敗時傳回-1。

  2. epoll_ctl注冊要監聽的事件。

    1) epfd表示epoll句柄;

    2) op表示fd操作類型:EPOLL_CTL_ADD(注冊新的fd到epfd中),EPOLL_CTL_MOD(修改已注冊的fd的監聽事件),EPOLL_CTL_DEL(從epfd中删除一個fd)

    3) fd是要監聽的描述符;

    4) event表示要監聽的事件; EPOLLIN表示對應的檔案描述符可以讀(包括對端SOCKET正常關閉);EPOLLOUT表示對應的檔案描述符可以寫;EPOLLPRI表示對應的檔案描述符有緊急的資料可讀(這裡應該表示有帶外資料到來);EPOLLERR表示對應的檔案描述符發生錯誤;EPOLLHUP表示對應的檔案描述符被挂斷;EPOLLET将EPOLL設為邊緣觸發(Edge Triggered)模式,這是相對于水準觸發(Level Triggered)來說的。EPOLLONESHOT隻監聽一次事件,當監聽完這次事件之後,如果還需要繼續監聽這個socket的話,需要再次把這個socket加入到EPOLL隊列裡[8]。

  3. epoll_wait函數等待事件就緒,成功時傳回就緒的事件數目,調用失敗時傳回 -1,等待逾時傳回 0。

    1) epfd是epoll句柄

    2) events表示從核心得到的就緒事件集合

    3) maxevents告訴核心events的大小

    4) timeout表示等待的逾時事件

  epoll_event結構體定義如下:

  

struct epoll_event {
__uint32_t events; /* Epoll events */
epoll_data_t data; /* User data variable */
};
typedef union epoll_data {
void *ptr;
int fd;
__uint32_t u32;
__uint64_t u64;
} epoll_data_t;      

  epoll模型利用三個函數代替select和poll模型的三個函數,可以避免select模型的三個缺點。

  1) 不需要每次都将相同的fd監聽事件重新拷貝到核心。epoll的解決方案在epoll_ctl函數中。每次注冊新的事件到epoll句柄中時(在epoll_ctl中指定EPOLL_CTL_ADD),會把所有的fd拷貝進核心,而不是在epoll_wait的時候重複拷貝。epoll保證了每個fd在整個過程中隻會拷貝一次。

  2) 不需要再核心中周遊所有fd來看事件是否就緒。epoll的解決方案不像select或poll一樣每次都把current程序輪流加入fd對應的裝置等待隊列中,而隻在epoll_ctl時把current程序挂一遍(這一遍必不可少)并為每個fd指定一個回調函數,當裝置就緒,喚醒等待隊列上的等待者時,就會調用這個回調函數,而這個回調函數會把就緒的fd加入一個就緒連結清單)。epoll_wait的工作實際上就是在這個就緒連結清單中檢視有沒有就緒的fd。

  3) 所監聽的檔案描述符的數目不像select有上限限制, 所支援的FD上限是最大可以打開檔案的數目。

  epoll對檔案描述符的操作有兩種模式:LT(level trigger,水準觸發)和ET(edge trigger)。

  1) 水準觸發:預設工作模式,即當epoll_wait檢測到某描述符事件就緒并通知應用程式時,應用程式可以不立即處理該事件;下次調用epoll_wait時,會再次通知此事件。

  2) 邊緣觸發:當epoll_wait檢測到某描述符事件就緒并通知應用程式時,應用程式必須立即處理該事件。如果不處理,下次調用epoll_wait時,不會再次通知此事件。(直到你做了某些操作導緻該描述符變成未就緒狀态了,也就是說邊緣觸發隻在狀态由未就緒變為就緒時通知一次)。

  ET模式很大程度上減少了epoll事件的觸發次數,是以效率比LT模式高。

  epoll模型python demo如下:

def start(self):
        # Create a TCP/IP socket, and then bind and listen
        server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        server.setblocking(False)
        server_address = ('', 8080)
        print "Starting up on %s port %s" % server_address
        server.bind(server_address)
        server.listen(5)
        message_queues = {}
        # The timeout value is represented in milliseconds, instead of seconds.
        timeout = 5000
        # Create a limit for the event
        READ_ONLY = (select.EPOLLIN)
        READ_WRITE = (READ_ONLY | select.EPOLLOUT)
        # Set up the epoll
        epoll = select.epoll()
        epoll.register(server.fileno(), READ_ONLY)
        # Map file descriptors to socket objects
        fd_to_socket = {server.fileno(): server, }
        while True:
            print "Waiting for the next event"
            events = epoll.poll(timeout)
            if len(events) == 0:
                print 'Time out'
                break
            print "*" * 20
            print len(events)
            print events
            print "*" * 20
            for fd, flag in events:
                s = fd_to_socket[fd]
                if flag & (select.EPOLLIN):
                    if s is server:
                        # A readable socket is ready to accept a connection
                        connection, client_address = s.accept()
                        print " Connection ", client_address
                        connection.setblocking(False)

                        fd_to_socket[connection.fileno()] = connection
                        epoll.register(connection, READ_ONLY)

                        # Give the connection a queue to send data
                        message_queues[connection] = Queue.Queue()
                    else:
                        data = s.recv(1024)
                        if data:
                            # A readable client socket has data
                            print "  received %s from %s " % (data, s.getpeername())
                            message_queues[s].put(data)
                            epoll.modify(s, READ_WRITE)
                        else:
                            # Close the connection
                            print "  closing", s.getpeername()
                            # Stop listening for input on the connection
                            epoll.unregister(s)
                            s.close()
                            del message_queues[s]
                elif flag & select.EPOLLHUP:
                    # A client that "hang up" , to be closed.
                    print " Closing ", s.getpeername(), "(HUP)"
                    epoll.unregister(s)
                    s.close()
                elif flag & select.EPOLLOUT:
                    # Socket is ready to send data , if there is any to send
                    try:
                        next_msg = message_queues[s].get_nowait()
                    except Queue.Empty:
                        # No messages waiting so stop checking
                        print s.getpeername(), " queue empty"
                        epoll.modify(s, READ_ONLY)
                    else:
                        print " sending %s to %s" % (next_msg, s.getpeername())
                        s.send(next_msg)
                elif flag & select.epollERR:
                    # Any events with epollR cause the server to close the
                    # socket
                    print "  exception on", s.getpeername()
                    epoll.unregister(s)
                    s.close()
                    del message_queues[s]      

3.4 IO複用小結

  沒有IO複用之前,用阻塞型IO,必須為每個建立的連接配接建立線程或線程,當面對大量連接配接時, 嚴重浪費系統資源,影響程序的響應效率,用非阻塞IO,需要輪詢測試socket集合是否已經讀寫就緒,在已經就緒和測試到就緒有一定的時延,資料得不到及時處理。利用IO複用, 同時可監控多個套接字描述符的狀态,而不用像阻塞型IO,每個套接字需要一個線程或程序處理,也不像非阻塞IO,存在處理時延,IO複用函數是阻塞函數,不用輪詢測試,有socket就緒或逾時才會傳回。

IO複用分為select、poll、epoll模型三種,select模型存在如下三個缺點:

  1) 每次都需要将fd_set拷貝到核心空間,當fd_set較大時開銷很大

  2) 每次都需要在核心中周遊fd加入到等待隊列,fd較多開銷較大

  3) select支援的檔案描述符太小,預設為1024

  poll模型不存在同時監聽的描述符大小限制,但是仍然存在缺點1和2。epoll模型克服了這三個缺點,epoll模型對于加入監聽的socket描述符,會将描述符和監聽的事件記在核心,無需像select和poll每次都需要将檔案描述符集拷貝到核心。在判斷是否有讀寫就緒時。當有讀寫事件就緒時,核心會調用函數将就緒的fd加入就緒連結清單,是以epoll模型隻需讀就緒連結清單,而不需要将所有fd周遊一遍,性能會比select和poll模型高。

四 信号驅動和異步IO

4.1 信号驅動IO

  信号驅動式IO模型原理如下圖4.1:

網絡IO-阻塞、非阻塞、IO複用、異步

圖4.1 信号驅動IO

  Signal Driven I/O 的工作原理就是使用者程序首先和 kernel 之間建立信号的通知機制,即使用者程序告訴 kernel,如果 kernel 中資料準備好了,就通過 SIGIO 信号通知程序。然後使用者空間的程序就會調用 read 系統調用将準備好的資料從 kernel 拷貝到使用者空間。

  但是這種 I/O 模型存在一個非常重大的缺陷問題:SIGIO 這種信号對于每個程序來說隻有一個!如果使該信号對程序中的兩個描述符(這兩個檔案描述符都等待着 I/O 操作)都起作用,那麼程序在接到此信号後就無法判别是哪一個檔案描述符準備好了。是以 Signal Driven I/O 模型在現實中用的非常少。

4.2 異步IO

  異步IO模型原理如下圖:

網絡IO-阻塞、非阻塞、IO複用、異步

圖4.2 異步IO

  在異步IO中,使用者程序調用aio_read立即傳回,直到核心将資料拷貝到程序緩沖區,然後通知程序完成,整個過程完全沒阻塞,連recvfrom都不用使用者程序調用。其它的IO模型都屬于同步IO。

在異步非阻塞 I/O 中,可以同時發起多個傳輸操作。這需要每個傳輸操作都有惟一的上下文,這樣才能在它們完成時區分到底是哪個傳輸操作完成了。在 AIO 中,這是一個 aiocb(AIO I/O Control Block)結構。這個結構包含了有關傳輸的所有資訊,包括為資料準備的使用者緩沖區。在産生 I/O (稱為完成)通知時,aiocb 結構就被用來惟一辨別所完成的 I/O 操作。這個 API 的展示顯示了如何使用它[10]。

  aiocb結構如下:

  

struct aiocb {
int aio_fildes; // File Descriptor
int aio_lio_opcode; // Valid only for lio_listio (r/w/nop)
volatile void *aio_buf; // Data Buffer
size_t aio_nbytes; // Number of Bytes in Data Buffer
struct sigevent aio_sigevent; // Notification Structure

/* Internal fields */
...

};      

  sigevent 結構告訴 AIO 在 I/O 操作完成時應該執行什麼操作。Aio api如下:

  1) int aio_read( struct aiocb *aiocbp ) 請求異步讀操作

  2) aio_error 檢查異步請求的狀态

  3) aio_return 獲得完成的異步請求的傳回狀态

  4) aio_write 請求異步寫操作

  5) aio_suspend 挂起調用程序,直到一個或多個異步請求已經完成(或失敗)

  6) aio_cancel 取消異步 I/O 請求

  7) lio_listio 發起一系列 I/O 操作

  為了便于了解,這裡使用c語言,使用 aio_read 進行異步讀操作c執行個體如下:

//使用aio api讀執行個體
#include <aio.h>

...

int fd, ret;
struct aiocb my_aiocb;

fd = open( "file.txt", O_RDONLY );
if (fd < 0) perror("open");

/* Zero out the aiocb structure (recommended) */
bzero( (char *)&my_aiocb, sizeof(struct aiocb) );

/* Allocate a data buffer for the aiocb request */
my_aiocb.aio_buf = malloc(BUFSIZE+1);// 清空了 aiocb 結構,配置設定一個資料緩沖區
if (!my_aiocb.aio_buf) perror("malloc");

/* Initialize the necessary fields in the aiocb */
my_aiocb.aio_fildes = fd; //檔案描述符
my_aiocb.aio_nbytes = BUFSIZE;//緩沖區大小
my_aiocb.aio_offset = 0;// // 将 aio_offset 設定成 0(該檔案中的第一個偏移量)

ret = aio_read( &my_aiocb );//發起異步讀請求
if (ret < 0) perror("aio_read");

while ( aio_error( &my_aiocb ) == EINPROGRESS ) ;//檢查異步請求是否完成

if ((ret = aio_return( &my_iocb )) > 0) {//所傳輸的位元組數,如果發生錯誤,傳回值就為 -1
/* got ret bytes on the read */
} else {
/* read failed, consult errno */
}      

  當異步請求完成時,核心有兩種方式通知程序,一種是通過信号,另一種是調用回調函數。

  使用信号作為AIO通知demo如下,應用程式對指定信号注冊信号處理函數, 在産生指定的信号時就會調用這個處理程式。并指定AIO操作完成時,由核心發出指定信号,将aiocb作為信号的上下文,用來分辨多個IO請求。

  

AIO完成通知-信号
void setup_io( ... )
{
int fd;
struct sigaction sig_act;
struct aiocb my_aiocb;

...

/* Set up the signal handler */
sigemptyset(&sig_act.sa_mask);
sig_act.sa_flags = SA_SIGINFO;
sig_act.sa_sigaction = aio_completion_handler;


/* Set up the AIO request */
bzero( (char *)&my_aiocb, sizeof(struct aiocb) );
my_aiocb.aio_fildes = fd;
my_aiocb.aio_buf = malloc(BUF_SIZE+1);
my_aiocb.aio_nbytes = BUF_SIZE;
my_aiocb.aio_offset = next_offset;

/* Link the AIO request with the Signal Handler */
my_aiocb.aio_sigevent.sigev_notify = SIGEV_SIGNAL;//指定信号作為通知方法
my_aiocb.aio_sigevent.sigev_signo = SIGIO;
my_aiocb.aio_sigevent.sigev_value.sival_ptr = &my_aiocb;

/* Map the Signal to the Signal Handler */
ret = sigaction( SIGIO, &sig_act, NULL );

...

ret = aio_read( &my_aiocb );

}


void aio_completion_handler( int signo, siginfo_t *info, void *context )
{
struct aiocb *req;


/* Ensure it's our signal */
if (info->si_signo == SIGIO) {

req = (struct aiocb *)info->si_value.sival_ptr;

/* Did the request complete? */
if (aio_error( req ) == 0) {

/* Request completed successfully, get the return status */
ret = aio_return( req );

}

}

return;
}      

  使用回調函數作為異步請求通知demo如下, 這種機制不會為通知而産生一個信号,而是會調用使用者空間的一個函數來實作通知功能.

  

//AIO完成通知-回調函數
void setup_io( ... )
{
int fd;
struct aiocb my_aiocb;

...

/* Set up the AIO request */
bzero( (char *)&my_aiocb, sizeof(struct aiocb) );
my_aiocb.aio_fildes = fd;
my_aiocb.aio_buf = malloc(BUF_SIZE+1);
my_aiocb.aio_nbytes = BUF_SIZE;
my_aiocb.aio_offset = next_offset;

/* Link the AIO request with a thread callback */
my_aiocb.aio_sigevent.sigev_notify = SIGEV_THREAD;// SIGEV_THREAD 指定線程回調函數來作為通知方法
my_aiocb.aio_sigevent.notify_function = aio_completion_handler;
my_aiocb.aio_sigevent.notify_attributes = NULL;
my_aiocb.aio_sigevent.sigev_value.sival_ptr = &my_aiocb;

...

ret = aio_read( &my_aiocb );

}


void aio_completion_handler( sigval_t sigval )
{
struct aiocb *req;

req = (struct aiocb *)sigval.sival_ptr;

/* Did the request complete? */
if (aio_error( req ) == 0) {

/* Request completed successfully, get the return status */
ret = aio_return( req );

}

return;
}      

總結

  網絡IO模型包括阻塞、非阻塞、IO複用、信号驅動IO和異步IO五種類型。阻塞IO無法應對多個連接配接的情形,單個socket操作阻塞會導緻服務端無法接受其他連接配接,雖然可以用多線程、多程序的方式,将不同的連接配接放在不同的線程中和用戶端互動,并利用線程池和連接配接池進行優化。但建立程序和線程會占用系統資源,當面對大規模連接配接時,系統資源浪費嚴重,系統響應效率不高。

非阻塞模型當socket讀寫操作未就緒時會立即傳回,而不會阻塞等待,可以利用輪詢的方式來進行讀寫操作,但當核心收到資料報到應用程序感覺并處理會有時延。

利用IO複用,将監控socket讀寫操作是否就緒和進行讀寫操作分開,且IO複用可監控socket集合,IO複用包含select、poll、epoll三種模型。

  select模型存在如下三種缺點:

  1) 每次都需要将fd_set拷貝到核心空間,當fd_set較大時開銷很大

  2) 每次都需要在核心中周遊fd加入到等待隊列,fd較多開銷較大

  3) select支援的檔案描述符太小,預設為1024。

  poll模型可同時監控的socket沒有上線限制,取決于系統資源,但poll模型不能避免缺點1和2。epoll模型可以避免select和poll模型的缺點。select,poll每次調用都要把fd集合從使用者态往核心态拷貝一次,并且要把current程序往裝置等待隊列中挂一次,而epoll隻要一次拷貝,而且把current程序往等待隊列上挂也隻挂一次。這也能節省不少的開銷。select,poll内部實作需要自己不斷輪詢所有fd集合,直到裝置就緒,期間可能要睡眠和喚醒多次交替。而epoll其實也需要調用epoll_wait不斷輪詢就緒連結清單,期間也可能多次睡眠和喚醒交替,但是它是裝置就緒時,調用回調函數,把就緒fd放入就緒連結清單中,并喚醒在epoll_wait中進入睡眠的程序。雖然都要睡眠和交替,但是select和poll在“醒着”的時候要周遊整個fd集合,而epoll在“醒着”的時候隻要判斷一下就緒連結清單是否為空就行了,這節省了大量的CPU時間。這就是回調機制帶來的性能提升。

  信号驅動式IO,當核心資料準備好時,發出信号,調用程序提前注冊好的信号處理函數,但當存在多個socket操作時,無法厘清是哪個socket準備好,是以實際應用中較少。

無論是阻塞IO、非阻塞IO、IO複用還是信号驅動IO模型,都是同步IO模型。其要麼是監控socket就緒,要麼是從核心拷貝資料到程序緩沖區,至少其中一個是阻塞的,不會立即傳回。異步IO模型發起讀寫操作後,立即傳回,可以接着進行其它操作,核心完成将資料拷貝到應用程序後,通過信号或者回調函數通知程序。

參考文獻

[1]. 阻塞IO(blocking IO). https://www.chenxie.net/archives/1956.html

[2]. Unix網絡程式設計卷1.124~125.

[3]. linux select函數詳解. https://blog.csdn.net/lingfengtengfei/article/details/12392449

[4]. select,poll,epoll實作分析—結合核心源代碼. https://www.linuxidc.com/Linux/2012-05/59873.htm

[5]. Python網絡程式設計中的select 和 poll I/O複用的簡單使用. https://www.cnblogs.com/coser/archive/2012/01/06/2315216.html

[6]. socket選項總結(setsocketopt). https://blog.csdn.net/c1520006273/article/details/50420408

[7]. Linux下I/O多路複用系統調用(select, poll, epoll)介紹. https://zhuanlan.zhihu.com/p/22834126

[8]. IO多路複用:select、poll、epoll示例. https://blog.csdn.net/lisonglisonglisong/article/details/51328062

[9]. Linux I/O 模型. https://woshijpf.github.io/linux/2017/07/10/Linux-IO%E6%A8%A1%E5%9E%8B.html.

[10]. 使用異步 I/O 大大提高應用程式的性能. https://www.ibm.com/developerworks/cn/linux/l-async/

繼續閱讀