天天看點

python eventlet并發原理分析

最近在學習eventlet這個強悍的東東,看到我同僚的一些整理。故貼出來,大家一起分享~

motivation

114.113.199.11伺服器上nova服務中基于python eventlet實作的定時任務(periodic_task)和 心跳任務(report_state)都是eventlet的一個greenthread執行個體.

目前伺服器上出現了nova定時任務中某些任務執行時間過長而導緻心跳任務不能準時運作的問題.

如果eventlet是一個完全意義上的類似線程/程序的并發庫的話, 不應該出現這個問題, 需要研究 eventlet的并發實作, 了解它的并發實作原理, 避免以後出現類似的問題.

分析

經過閱讀eventlet源代碼, 可以知道eventlet主要依賴另外2個python package:

  • greenlet
  • python-epoll (或其他類似的異步IO庫, 如poll/select等)

主要做了3個工作:

  • 封裝greenlet
  • 封裝epoll
  • 改寫python标準庫中相關的module, 以便支援epoll

epoll

epoll是linux實作的一個基于事件的異步IO庫, 在之前類似的異步IO庫poll上改進而來.

下面兩個例子會示範如何用epoll将阻塞的IO操作用epoll改寫為異步非阻塞. (取自官方文檔)

blocking IO

import socket

    EOL1 = b'\n\n'
    EOL2 = b'\n\r\n'
    response  = b'HTTP/1.0 200 OK\r\nDate: Mon, 1 Jan 1996 01:01:01 GMT\r\n'
    response += b'Content-Type: text/plain\r\nContent-Length: 13\r\n\r\n'
    response += b'Hello, world!'

    serversocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    serversocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    serversocket.bind(('0.0.0.0', 8080))
    serversocket.listen(1)

    try:
        while True:
            connectiontoclient, address = serversocket.accept()
            request = b''
            while EOL1 not in request and EOL2 not in request:
                request += connectiontoclient.recv(1024)
            print('-'*40 + '\n' + request.decode()[:-2])
            connectiontoclient.send(response)
            connectiontoclient.close()
    finally:
        serversocket.close()
      

這個例子實作了一個簡單的監聽在8080端口的web伺服器. 通過一個死循環不停的接收來自8080端口 的連接配接, 并傳回結果.

需要注意的是程式會在

connectiontoclient, address = serversocket.accept()
           

這一行block住, 直到擷取到新的連接配接, 程式才會繼續往下運作.

同時, 這個程式同一個時間内隻能處理一個連接配接, 如果有很多使用者同時通路8080端口, 必須要按先後 順序依次處理這些連接配接, 前面一個連接配接成功傳回後, 才會處理後面的連接配接.

下面的例子将用epoll将這個簡單的web伺服器改寫為異步的方式

non-blocking IO by using epoll

import socket, select

EOL1 = b'\n\n'
EOL2 = b'\n\r\n'
response  = b'HTTP/1.0 200 OK\r\nDate: Mon, 1 Jan 1996 01:01:01 GMT\r\n'
response += b'Content-Type: text/plain\r\nContent-Length: 13\r\n\r\n'
response += b'Hello, world!'

serversocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
serversocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
serversocket.bind(('0.0.0.0', 8080))
serversocket.listen(1)
serversocket.setblocking(0)

epoll = select.epoll()
epoll.register(serversocket.fileno(), select.EPOLLIN)

try:
    connections = {}; requests = {}; responses = {}
    while True:
        events = epoll.poll(1)
        for fileno, event in events:
            if fileno == serversocket.fileno():
                connection, address = serversocket.accept()
                connection.setblocking(0)
                epoll.register(connection.fileno(), select.EPOLLIN)
                connections[connection.fileno()] = connection
                requests[connection.fileno()] = b''
                responses[connection.fileno()] = response
            elif event & select.EPOLLIN:
                requests[fileno] += connections[fileno].recv(1024)
                if EOL1 in requests[fileno] or EOL2 in requests[fileno]:
                    epoll.modify(fileno, select.EPOLLOUT)
                    print('-'*40 + '\n' + requests[fileno].decode()[:-2])
            elif event & select.EPOLLOUT:
                byteswritten = connections[fileno].send(responses[fileno])
                responses[fileno] = responses[fileno][byteswritten:]
                if len(responses[fileno]) == 0:
                    epoll.modify(fileno, 0)
                    connections[fileno].shutdown(socket.SHUT_RDWR)
            elif event & select.EPOLLHUP:
                epoll.unregister(fileno)
                connections[fileno].close()
                del connections[fileno]
finally:
    epoll.unregister(serversocket.fileno())
    epoll.close()
    serversocket.close()
      

可以看到, 例子中首先使用

serversocket.setblocking(0)

将socket設為異步的模式, 然後 用

select.epoll()

建立了一個epoll, 接着用

epoll.register(serversocket.fileno(), select.EPOLLIN)

 将該socket上的IO輸入事件(

select.EPOLLIN

)注冊到epoll裡. 這樣做了以後, 就可以将 上面例子中會在

socket.accept()

這步阻塞的Main Loop改寫為基于異步IO事件的epoll循環了.

events = epoll.poll(1)
           

簡單的說, 如果有很多使用者同時連接配接到8080端口, 這個程式會同時accept()所有的socket連接配接, 然後通過這行代碼将發生IO事件socket放到events中, 并在後面循環中處理. 沒有發生IO事件的 socket不會在loop中做處理. 這樣使用epoll就實作了一個簡單的并發web伺服器.

注意, 這裡提到的并發, 和我們通常所了解線程/程序的并發并不太一樣, 更準确的說, 是 IO多路複用 .

greenlet

greentlet是python中實作我們所謂的"Coroutine(協程)"的一個基礎庫.

看了下面的例子就明白了.

from greenlet import greenlet

    def test1():
        print 12
        gr2.switch()
        print 34

    def test2():
        print 56
        gr1.switch()
        print 78

    gr1 = greenlet(test1)
    gr2 = greenlet(test2)
    gr1.switch()
      

輸出

12
56
34
           

程式先分别為兩個函數定義了2個greenlet: gr1和gr2.

gr1.switch()

顯式切換到gr1上執行, gr1中輸出"12"後

gr2.switch()

顯式切換到gr2上執行 輸出56, 又

gr1.switch()

顯式切換到gr1上, 輸出34. test1()執行結束, gr1 die. 于是 test2()裡的78不會輸出.

可以發現greenlet僅僅是實作了一個最簡單的"coroutine", 而eventlet中的greenthread是在 greenlet的基礎上封裝了一些更high-level的功能, 比如greenlet的排程等.

eventlet.green

從epoll的運作機制可以看出, 要使用異步IO, 必須要将相關IO操作改寫成non-blocking的方式. 但是我們用

eventlet.spawn()

的函數, 并沒有針對epoll做任何改寫, 那eventlet是怎麼實作 異步IO的呢?

這也是eventlet這個package最兇殘的地方, 它自己重寫了python标準庫中IO相關的操作, 将它們 改寫成支援epoll的模式, 放在eventlet.green中.

比如說, 

socket.accept()

被改成了這樣

def accept(self):
    if self.act_non_blocking:
        return self.fd.accept()
    fd = self.fd
    while True:
        res = socket_accept(fd)
        if res is not None:
            client, addr = res
            set_nonblocking(client)
            return type(self)(client), addr
        trampoline(fd, read=True, timeout=self.gettimeout(),
                       timeout_exc=socket.timeout("timed out"))
           

然後在eventlet.spawn()的時候, 通過 一些高階魔法和"huge hack", 将這些改寫過得子產品"patch"到spawn出的greenthread上, 進而 實作epoll的IO多路複用, 相當兇殘.

eventlet并發機制分析

前面說了這麼多, 這裡可以分析一下eventlet的并發機制了.

eventlet的結構如下圖所示

_______________________________________
| python process                        |
|   _________________________________   |
|  | python thread                   |  |
|  |   _____   ___________________   |  |
|  |  | hub | | pool              |  |  |
|  |  |_____| |   _____________   |  |  |
|  |          |  | greenthread |  |  |  |
|  |          |  |_____________|  |  |  |
|  |          |   _____________   |  |  |
|  |          |  | greenthread |  |  |  |
|  |          |  |_____________|  |  |  |
|  |          |   _____________   |  |  |
|  |          |  | greenthread |  |  |  |
|  |          |  |_____________|  |  |  |
|  |          |                   |  |  |
|  |          |        ...        |  |  |
|  |          |___________________|  |  |
|  |                                 |  |
|  |_________________________________|  |
|                                       |
|   _________________________________   |
|  | python thread                   |  |
|  |_________________________________|  |
|   _________________________________   |
|  | python thread                   |  |
|  |_________________________________|  |
|                                       |
|                 ...                   |
|_______________________________________|
           
python eventlet并發原理分析

其中的hub和greenthread分别對應eventlet.hubs.hub和eventlet.greenthread, 本質都是 一個greenlet的執行個體.

hub中封裝前面提到的epoll, epoll的事件循環是由

hub.run()

這個方法裡實作. 每當使用者調用 eventlet.spawn(), 就會在目前python線程的pool裡産生一個新的greenthread. 由于greenthread 裡的IO相關的python标準庫被改寫成non-blocking的模式(參考上面的

socket.accept()

).

每當greenthread裡做IO相關的操作時, 最終都會傳回到hub中的epoll循環, 然後根據epoll中的 IO事件, 調用響應的函數. 具體如下面所示.

greenthread.sleep()

, 實際上也是将CPU控制權交給hub, 然後由hub排程下一個需要運作的 greenthread.

# in eventlet.hubs.poll.Hub

    def wait(self, seconds=None):
        readers = self.listeners[READ]
        writers = self.listeners[WRITE]

        if not readers and not writers:
            if seconds:
                sleep(seconds)
            return
        try:
            presult = self.poll.poll(int(seconds * self.WAIT_MULTIPLIER))
        except select.error, e:
            if get_errno(e) == errno.EINTR:
                return
            raise
        SYSTEM_EXCEPTIONS = self.SYSTEM_EXCEPTIONS

        for fileno, event in presult:
            try:
                if event & READ_MASK:
                    readers.get(fileno, noop).cb(fileno)
                if event & WRITE_MASK:
                    writers.get(fileno, noop).cb(fileno)
                if event & select.POLLNVAL:
                    self.remove_descriptor(fileno)
                    continue
                if event & EXC_MASK:
                    readers.get(fileno, noop).cb(fileno)
                    writers.get(fileno, noop).cb(fileno)
            except SYSTEM_EXCEPTIONS:
                raise
            except:
                self.squelch_exception(fileno, sys.exc_info())
                clear_sys_exc_info()
      

總結

eventlet實作的并發和我們了解的通常意義上類似線程/程序的并發是不同的, eventlet實作的"并發" 更準确的講, 是 IO多路複用 . 隻有在被

eventlet.spawn()

的函數中存在可以 支援異步IO 相關的操作, 比如說讀寫socket/named pipe等時, 才能不用對被調用的函數做任何修改而實作 所謂的"并發".

如果被

eventlet.spawn()

的函數中存在大量的CPU計算或者讀寫普通檔案, eventlet是無法對其 實作并發操作的. 如果想要在這樣的greenthread間實作類似"并發"運作的效果, 需要手動的在函數 中插入

greenthread.sleep()

.