天天看點

Go netpoll I/O 多路複用建構原生網絡模型之源碼深度解析

導言

Go 基于 I/O multiplexing 和 goroutine 建構了一個簡潔而高性能的原生網絡模型(基于 Go 的I/O 多路複用

netpoll

),提供了

goroutine-per-connection

這樣簡單的網絡程式設計模式。在這種模式下,開發者使用的是同步的模式去編寫異步的邏輯,極大地降低了開發者編寫網絡應用時的心智負擔,且借助于 Go runtime scheduler 對 goroutines 的高效排程,這個原生網絡模型不論從适用性還是性能上都足以滿足絕大部分的應用場景。

然而,在工程性上能做到如此高的普适性和相容性,最終暴露給開發者提供接口/模式如此簡潔,其底層必然是基于非常複雜的封裝,做了很多取舍,也有可能放棄了一些『極緻』的設計和理念。事實上

netpoll

底層就是基于 epoll/kqueue/iocp 這些系統調用來做封裝的,最終暴露出

goroutine-per-connection

這樣的極簡的開發模式給使用者。

Go netpoll 在不同的作業系統,其底層使用的 I/O 多路複用技術也不一樣,可以從 Go 源碼目錄結構和對應代碼檔案了解 Go 在不同平台下的網絡 I/O 模式的實作。比如,在 Linux 系統下基于 epoll,freeBSD 系統下基于 kqueue,以及 Windows 系統下基于 iocp。

本文将基于 linux 平台來解析 Go netpoll 之 I/O 多路複用的底層是如何基于 epoll 封裝實作的,從源碼層層推進,全面而深度地解析 Go netpoll 的設計理念和實作原理,以及 Go 是如何利用

netpoll

來建構它的原生網絡模型的。主要涉及到的一些概念:I/O 模式、使用者/核心空間、epoll、linux 源碼、goroutine scheduler 等等,我會盡量簡單地講解,如果有對相關概念不熟悉的同學,還是希望能提前熟悉一下。

使用者空間與核心空間

現在作業系統都是采用虛拟存儲器,那麼對 32 位作業系統而言,它的尋址空間(虛拟存儲空間)為 4G(2 的 32 次方)。作業系統的核心是核心,獨立于普通的應用程式,可以通路受保護的記憶體空間,也有通路底層硬體裝置的所有權限。為了保證使用者程序不能直接操作核心(kernel),保證核心的安全,操心系統将虛拟空間劃分為兩部分,一部分為核心空間,一部分為使用者空間。針對 linux 作業系統而言,将最高的 1G 位元組(從虛拟位址 0xC0000000 到 0xFFFFFFFF),供核心使用,稱為核心空間,而将較低的 3G 位元組(從虛拟位址 0x00000000 到0xBFFFFFFF),供各個程序使用,稱為使用者空間。

Go netpoll I/O 多路複用建構原生網絡模型之源碼深度解析

I/O 多路複用

在神作《UNIX 網絡程式設計》裡,總結歸納了 5 種 I/O 模型,包括同步和異步 I/O:

  • 阻塞 I/O (Blocking I/O)
  • 非阻塞 I/O (Nonblocking I/O)
  • I/O 多路複用 (I/O multiplexing)
  • 信号驅動 I/O (Signal driven I/O)
  • 異步 I/O (Asynchronous I/O)

作業系統上的 I/O 是使用者空間和核心空間的資料互動,是以 I/O 操作通常包含以下兩個步驟:

  1. 等待網絡資料到達網卡(讀就緒)/等待網卡可寫(寫就緒) –> 讀取/寫入到核心緩沖區
  2. 從核心緩沖區複制資料 –> 使用者空間(讀)/從使用者空間複制資料 -> 核心緩沖區(寫)

而判定一個 I/O 模型是同步還是異步,主要看第二步:資料在使用者和核心空間之間複制的時候是不是會阻塞目前程序,如果會,則是同步 I/O,否則,就是異步 I/O。基于這個原則,這 5 種 I/O 模型中隻有一種異步 I/O 模型:Asynchronous I/O,其餘都是同步 I/O 模型。

這 5 種 I/O 模型的對比如下:

Go netpoll I/O 多路複用建構原生網絡模型之源碼深度解析

所謂 I/O 多路複用指的就是 select/poll/epoll 這一系列的多路選擇器:支援單一線程同時監聽多個檔案描述符(I/O事件),阻塞等待,并在其中某個檔案描述符可讀寫時收到通知。 I/O 複用其實複用的不是 I/O 連接配接,而是複用線程,讓一個 thread of control 能夠處理多個連接配接(I/O 事件)。

select & poll

#include <sys/select.h>

/* According to earlier standards */
#include <sys/time.h>
#include <sys/types.h>
#include <unistd.h>

int select(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout);

// 和 select 緊密結合的四個宏:
void FD_CLR(int fd, fd_set *set);
int FD_ISSET(int fd, fd_set *set);
void FD_SET(int fd, fd_set *set);
void FD_ZERO(fd_set *set);           

select 是 epoll 之前 linux 使用的 I/O 事件驅動技術。

了解 select 的關鍵在于了解 fd_set,為說明友善,取 fd_set 長度為 1 位元組,fd_set 中的每一 bit 可以對應一個檔案描述符 fd,則 1 位元組長的 fd_set 最大可以對應 8 個 fd。select 的調用過程如下:

  1. 執行 FD_ZERO(&set), 則 set 用位表示是

    0000,0000

  2. 若 fd=5, 執行 FD_SET(fd, &set); 後 set 變為 0001,0000(第5位置為1)
  3. 再加入 fd=2, fd=1,則 set 變為

    0001,0011

  4. 執行 select(6, &set, 0, 0, 0) 阻塞等待
  5. 若 fd=1, fd=2 上都發生可讀事件,則 select 傳回,此時 set 變為

    0000,0011

    (注意:沒有事件發生的 fd=5 被清空)

基于上面的調用過程,可以得出 select 的特點:

  • 可監控的檔案描述符個數取決于 sizeof(fd_set) 的值。假設伺服器上 sizeof(fd_set)=512,每 bit 表示一個檔案描述符,則伺服器上支援的最大檔案描述符是 512*8=4096。fd_set的大小調整可參考 【原創】技術系列之 網絡模型(二) 中的模型 2,可以有效突破 select 可監控的檔案描述符上限
  • 将 fd 加入 select 監控集的同時,還要再使用一個資料結構 array 儲存放到 select 監控集中的 fd,一是用于在 select 傳回後,array 作為源資料和 fd_set 進行 FD_ISSET 判斷。二是 select 傳回後會把以前加入的但并無事件發生的 fd 清空,則每次開始 select 前都要重新從 array 取得 fd 逐一加入(FD_ZERO最先),掃描 array 的同時取得 fd 最大值 maxfd,用于 select 的第一個參數
  • 可見 select 模型必須在 select 前循環 array(加 fd,取 maxfd),select 傳回後循環 array(FD_ISSET判斷是否有事件發生)

是以,select 有如下的缺點:

  1. 最大并發數限制:使用 32 個整數的 32 位,即 32*32=1024 來辨別 fd,雖然可修改,但是有以下第 2, 3 點的瓶頸
  2. 每次調用 select,都需要把 fd 集合從使用者态拷貝到核心态,這個開銷在 fd 很多時會很大
  3. 性能衰減嚴重:每次 kernel 都需要線性掃描整個 fd_set,是以随着監控的描述符 fd 數量增長,其 I/O 性能會線性下降

poll 的實作和 select 非常相似,隻是描述 fd 集合的方式不同,poll 使用 pollfd 結構而不是 select 的 fd_set 結構,poll 解決了最大檔案描述符數量限制的問題,但是同樣需要從使用者态拷貝所有的 fd 到核心态,也需要線性周遊所有的 fd 集合,是以它和 select 隻是實作細節上的區分,并沒有本質上的差別。

epoll

epoll 是 linux kernel 2.6 之後引入的新 I/O 事件驅動技術,I/O 多路複用的核心設計是 1 個線程處理所有連接配接的

等待消息準備好

I/O 事件,這一點上 epoll 和 select&poll 是大同小異的。但 select&poll 預估錯誤了一件事,當數十萬并發連接配接存在時,可能每一毫秒隻有數百個活躍的連接配接,同時其餘數十萬連接配接在這一毫秒是非活躍的。select&poll 的使用方法是這樣的:

傳回的活躍連接配接 == select(全部待監控的連接配接)

什麼時候會調用 select&poll 呢?在你認為需要找出有封包到達的活躍連接配接時,就應該調用。是以,select&poll 在高并發時是會被頻繁調用的。這樣,這個頻繁調用的方法就很有必要看看它是否有效率,因為,它的輕微效率損失都會被

高頻

二字所放大。它有效率損失嗎?顯而易見,全部待監控連接配接是數以十萬計的,傳回的隻是數百個活躍連接配接,這本身就是無效率的表現。被放大後就會發現,處理并發上萬個連接配接時,select&poll 就完全力不從心了。這個時候就該 epoll 上場了,epoll 通過一些新的設計和優化,基本上解決了 select&poll 的問題。

epoll 的 API 非常簡潔,涉及到的隻有 3 個系統調用:

#include <sys/epoll.h>  
int epoll_create(int size); // int epoll_create1(int flags);
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);           

其中,epoll_create 建立一個 epoll 執行個體并傳回 epollfd;epoll_ctl 注冊 file descriptor 等待的 I/O 事件(比如 EPOLLIN、EPOLLOUT 等) 到 epoll 執行個體上;epoll_wait 則是阻塞監聽 epoll 執行個體上所有的 file descriptor 的 I/O 事件,它接收一個使用者空間上的一塊記憶體位址 (events 數組),kernel 會在有 I/O 事件發生的時候把檔案描述符清單複制到這塊記憶體位址上,然後 epoll_wait 解除阻塞并傳回,最後使用者空間上的程式就可以對相應的 fd 進行讀寫了:

#include <unistd.h>
ssize_t read(int fd, void *buf, size_t count);
ssize_t write(int fd, const void *buf, size_t count);           

epoll 的工作原理如下:

Go netpoll I/O 多路複用建構原生網絡模型之源碼深度解析

與 select&poll 相比,epoll 厘清了高頻調用和低頻調用。例如,epoll_ctl 相對來說就是不太頻繁被調用的,而 epoll_wait 則是非常頻繁被調用的。是以 epoll 利用 epoll_ctl 來插入或者删除一個 fd,實作使用者态到核心态的資料拷貝,這確定了每一個 fd 在其生命周期隻需要被拷貝一次,而不是每次調用 epoll_wait 的時候都拷貝一次。 epoll_wait 則被設計成幾乎沒有入參的調用,相比 select&poll 需要把全部監聽的 fd 集合從使用者态拷貝至核心态的做法,epoll 的效率就高出了一大截。

在實作上 epoll 采用紅黑樹來存儲所有監聽的 fd,而紅黑樹本身插入和删除性能比較穩定,時間複雜度 O(logN)。通過 epoll_ctl 函數添加進來的 fd 都會被放在紅黑樹的某個節點内,是以,重複添加是沒有用的。當把 fd 添加進來的時候時候會完成關鍵的一步:該 fd 都會與相應的裝置(網卡)驅動程式建立回調關系,也就是在核心中斷處理程式為它注冊一個回調函數,在 fd 相應的事件觸發(中斷)之後(裝置就緒了),核心就會調用這個回調函數,該回調函數在核心中被稱為:

ep_poll_callback

,這個回調函數其實就是把這個 fd 添加到 rdllist 這個雙向連結清單(就緒連結清單)中。epoll_wait 實際上就是去檢查 rdlist 雙向連結清單中是否有就緒的 fd,當 rdlist 為空(無就緒fd)時挂起目前程序,直到 rdlist 非空時程序才被喚醒并傳回。

相比于 select&poll 調用時會将全部監聽的 fd 從使用者态空間拷貝至核心态空間并線性掃描一遍找出就緒的 fd 再傳回到使用者态,epoll_wait 則是直接傳回已就緒 fd,是以 epoll 的 I/O 性能不會像 select&poll 那樣随着監聽的 fd 數量增加而出現線性衰減,是一個非常高效的 I/O 事件驅動技術。

由于使用 epoll 的 I/O 多路複用需要使用者程序自己負責 I/O 讀寫,從使用者程序的角度看,讀寫過程是阻塞的,是以 select&poll&epoll 本質上都是同步 I/O 模型,而像 Windows 的 IOCP 這一類的異步 I/O,隻需要在調用 WSARecv 或 WSASend 方法讀寫資料的時候把使用者空間的記憶體 buffer 送出給 kernel,kernel 負責資料在使用者空間和核心空間拷貝,完成之後就會通知使用者程序,整個過程不需要使用者程序參與,是以是真正的異步 I/O。

延伸

另外,我看到有些文章說 epoll 之是以性能高是因為利用了 linux 的 mmap 記憶體映射讓核心和使用者程序共享了一片實體記憶體,用來存放就緒 fd 清單和它們的資料 buffer,是以使用者程序在

epoll_wait

傳回之後使用者程序就可以直接從共享記憶體那裡讀取/寫入資料了,這讓我很疑惑,因為首先看

epoll_wait

的函數聲明:

int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);           

第二個參數:就緒事件清單,是需要在使用者空間配置設定記憶體然後再傳給

epoll_wait

的,如果核心會用 mmap 設定共享記憶體,直接傳遞一個指針進去就行了,根本不需要在使用者态配置設定記憶體,多此一舉。其次,核心和使用者程序通過 mmap 共享記憶體是一件極度危險的事情,核心無法确定這塊共享記憶體什麼時候會被回收,而且這樣也會賦予使用者程序直接操作核心資料的權限和入口,非常容易出現大的系統漏洞,是以一般極少會這麼做。是以我很懷疑 epoll 是不是真的在 linux kernel 裡用了 mmap,我就去看了下最新版本(5.3.9)的 linux kernel 源碼:

/*
 * Implement the event wait interface for the eventpoll file. It is the kernel
 * part of the user space epoll_wait(2).
 */
static int do_epoll_wait(int epfd, struct epoll_event __user *events,
             int maxevents, int timeout)
{
    // ...
  
    /* Time to fish for events ... */
    error = ep_poll(ep, events, maxevents, timeout);
}

// 如果 epoll_wait 入參時設定 timeout == 0, 那麼直接通過 ep_events_available 判斷目前是否有使用者感興趣的事件發生,如果有則通過 ep_send_events 進行處理
// 如果設定 timeout > 0,并且目前沒有使用者關注的事件發生,則進行休眠,并添加到 ep->wq 等待隊列的頭部;對等待事件描述符設定 WQ_FLAG_EXCLUSIVE 标志
// ep_poll 被事件喚醒後會重新檢查是否有關注事件,如果對應的事件已經被搶走,那麼 ep_poll 會繼續休眠等待
static int ep_poll(struct eventpoll *ep, struct epoll_event __user *events, int maxevents, long timeout)
{
    // ...
  
    send_events:
    /*
     * Try to transfer events to user space. In case we get 0 events and
     * there's still timeout left over, we go trying again in search of
     * more luck.
     */
  
    // 如果一切正常, 有 event 發生, 就開始準備資料 copy 給使用者空間了
    // 如果有就緒的事件發生,那麼就調用 ep_send_events 将就緒的事件 copy 到使用者态記憶體中,
    // 然後傳回到使用者态,否則判斷是否逾時,如果沒有逾時就繼續等待就緒事件發生,如果逾時就傳回使用者态。
    // 從 ep_poll 函數的實作可以看到,如果有就緒事件發生,則調用 ep_send_events 函數做進一步處理
    if (!res && eavail &&
            !(res = ep_send_events(ep, events, maxevents)) && !timed_out)
        goto fetch_events;
  
    // ...
}

// ep_send_events 函數是用來向使用者空間拷貝就緒 fd 清單的,它将使用者傳入的就緒 fd 清單記憶體簡單封裝到
// ep_send_events_data 結構中,然後調用 ep_scan_ready_list 将就緒隊列中的事件寫入使用者空間的記憶體;
// 使用者程序就可以通路到這些資料進行處理
static int ep_send_events(struct eventpoll *ep,
                struct epoll_event __user *events, int maxevents)
{
    struct ep_send_events_data esed;

    esed.maxevents = maxevents;
    esed.events = events;
    // 調用 ep_scan_ready_list 函數檢查 epoll 執行個體 eventpoll 中的 rdllist 就緒連結清單,
    // 并注冊一個回調函數 ep_send_events_proc,如果有就緒 fd,則調用 ep_send_events_proc 進行處理
    ep_scan_ready_list(ep, ep_send_events_proc, &esed, 0, false);
    return esed.res;
}

// 調用 ep_scan_ready_list 的時候會傳遞指向 ep_send_events_proc 函數的函數指針作為回調函數,
// 一旦有就緒 fd,就會調用 ep_send_events_proc 函數
static __poll_t ep_send_events_proc(struct eventpoll *ep, struct list_head *head, void *priv)
{
    // ...
  
    /*
     * If the event mask intersect the caller-requested one,
     * deliver the event to userspace. Again, ep_scan_ready_list()
     * is holding ep->mtx, so no operations coming from userspace
     * can change the item.
     */
    revents = ep_item_poll(epi, &pt, 1);
    // 如果 revents 為 0,說明沒有就緒的事件,跳過,否則就将就緒事件拷貝到使用者态記憶體中
    if (!revents)
        continue;
    // 将目前就緒的事件和使用者程序傳入的資料都通過 __put_user 拷貝回使用者空間,
  // 也就是調用 epoll_wait 之時使用者程序傳入的 fd 清單的記憶體
    if (__put_user(revents, &uevent->events) || __put_user(epi->event.data, &uevent->data)) {
        list_add(&epi->rdllink, head);
        ep_pm_stay_awake(epi);
        if (!esed->res)
            esed->res = -EFAULT;
        return 0;
    }
  
    // ...
}           

do_epoll_wait

開始層層跳轉,我們可以很清楚地看到最後核心是通過

__put_user

函數把就緒 fd 清單和事件傳回到使用者空間,而

__put_user

正是核心用來拷貝資料到使用者空間的标準函數。此外,我并沒有在 linux kernel 的源碼中和 epoll 相關的代碼裡找到 mmap 系統調用做記憶體映射的邏輯,是以基本可以得出結論:epoll 在 linux kernel 裡并沒有使用 mmap 來做使用者空間和核心空間的記憶體共享,是以那些說 epoll 使用了 mmap 的文章都是誤解。

Non-blocking I/O

什麼叫非阻塞 I/O,顧名思義就是:所有 I/O 操作都是立刻傳回而不會阻塞目前使用者程序。I/O 多路複用通常情況下需要和非阻塞 I/O 搭配使用,否則可能會産生意想不到的問題。比如,epoll 的 ET(邊緣觸發) 模式下,如果不使用非阻塞 I/O,有極大的機率會導緻阻塞 event-loop 線程,進而降低吞吐量,甚至導緻 bug。

Linux 下,我們可以通過

fcntl

系統調用來設定

O_NONBLOCK

标志位,進而把 socket 設定成 non-blocking。當對一個 non-blocking socket 執行讀操作時,流程是這個樣子:

Go netpoll I/O 多路複用建構原生網絡模型之源碼深度解析

當使用者程序發出 read 操作時,如果 kernel 中的資料還沒有準備好,那麼它并不會 block 使用者程序,而是立刻傳回一個 EAGAIN error。從使用者程序角度講 ,它發起一個 read 操作後,并不需要等待,而是馬上就得到了一個結果。使用者程序判斷結果是一個 error 時,它就知道資料還沒有準備好,于是它可以再次發送 read 操作。一旦 kernel 中的資料準備好了,并且又再次收到了使用者程序的 system call,那麼它馬上就将資料拷貝到了使用者記憶體,然後傳回。

是以,non-blocking I/O 的特點是使用者程序需要不斷的主動詢問 kernel 資料好了沒有。

Go netpoll

一個典型的 Go TCP server:

package main

import (
    "fmt"
    "net"
)

func main() {
    listen, err := net.Listen("tcp", ":8888")
    if err != nil {
        fmt.Println("listen error: ", err)
        return
    }

    for {
        conn, err := listen.Accept()
        if err != nil {
            fmt.Println("accept error: ", err)
            break
        }

        // start a new goroutine to handle the new connection
        go HandleConn(conn)
    }
}
func HandleConn(conn net.Conn) {
    defer conn.Close()
    packet := make([]byte, 1024)
    for {
        // 如果沒有可讀資料,也就是讀 buffer 為空,則阻塞
        _, _ = conn.Read(packet)
        // 同理,不可寫則阻塞
        _, _ = conn.Write(packet)
    }
}           

上面是一個基于 Go 原生網絡模型(基于 netpoll)編寫的一個 TCP server,模式是

goroutine-per-connection

,在這種模式下,開發者使用的是同步的模式去編寫異步的邏輯而且對于開發者來說 I/O 是否阻塞是無感覺的,也就是說開發者無需考慮 goroutines 甚至更底層的線程、程序的排程和上下文切換。而 Go netpoll 最底層的事件驅動技術肯定是基于 epoll/kqueue/iocp 這一類的 I/O 事件驅動技術,隻不過是把這些排程和上下文切換的工作轉移到了 runtime 的 Go scheduler,讓它來負責排程 goroutines,進而極大地降低了程式員的心智負擔!

Go netpoll 核心

Go netpoll 通過在底層對 epoll/kqueue/iocp 的封裝,進而實作了使用同步程式設計模式達到異步執行的效果。總結來說,所有的網絡操作都以網絡描述符 netFD 為中心實作。netFD 與底層 PollDesc 結構綁定,當在一個 netFD 上讀寫遇到 EAGAIN 錯誤時,就将目前 goroutine 存儲到這個 netFD 對應的 PollDesc 中,同時調用 gopark 把目前 goroutine 給 park 住,直到這個 netFD 上再次發生讀寫事件,才将此 goroutine 給 ready 激活重新運作。顯然,在底層通知 goroutine 再次發生讀寫等事件的方式就是 epoll/kqueue/iocp 等事件驅動機制。

接下來我們通過分析最新的 Go 源碼(v1.13.4),解讀一下整個 netpoll 的運作流程。

上面的示例代碼中相關的在源碼裡的幾個資料結構和方法:

// TCPListener is a TCP network listener. Clients should typically
// use variables of type Listener instead of assuming TCP.
type TCPListener struct {
    fd *netFD
    lc ListenConfig
}

// Accept implements the Accept method in the Listener interface; it
// waits for the next call and returns a generic Conn.
func (l *TCPListener) Accept() (Conn, error) {
    if !l.ok() {
        return nil, syscall.EINVAL
    }
    c, err := l.accept()
    if err != nil {
        return nil, &OpError{Op: "accept", Net: l.fd.net, Source: nil, Addr: l.fd.laddr, Err: err}
    }
    return c, nil
}

func (ln *TCPListener) accept() (*TCPConn, error) {
    fd, err := ln.fd.accept()
    if err != nil {
        return nil, err
    }
    tc := newTCPConn(fd)
    if ln.lc.KeepAlive >= 0 {
        setKeepAlive(fd, true)
        ka := ln.lc.KeepAlive
        if ln.lc.KeepAlive == 0 {
            ka = defaultTCPKeepAlive
        }
        setKeepAlivePeriod(fd, ka)
    }
    return tc, nil
}

// TCPConn is an implementation of the Conn interface for TCP network
// connections.
type TCPConn struct {
    conn
}

// Conn
type conn struct {
    fd *netFD
}

type conn struct {
    fd *netFD
}

func (c *conn) ok() bool { return c != nil && c.fd != nil }

// Implementation of the Conn interface.

// Read implements the Conn Read method.
func (c *conn) Read(b []byte) (int, error) {
    if !c.ok() {
        return 0, syscall.EINVAL
    }
    n, err := c.fd.Read(b)
    if err != nil && err != io.EOF {
        err = &OpError{Op: "read", Net: c.fd.net, Source: c.fd.laddr, Addr: c.fd.raddr, Err: err}
    }
    return n, err
}

// Write implements the Conn Write method.
func (c *conn) Write(b []byte) (int, error) {
    if !c.ok() {
        return 0, syscall.EINVAL
    }
    n, err := c.fd.Write(b)
    if err != nil {
        err = &OpError{Op: "write", Net: c.fd.net, Source: c.fd.laddr, Addr: c.fd.raddr, Err: err}
    }
    return n, err
}           

netFD

net.Listen("tcp", ":8888")

方法傳回了一個 TCPListener,它是一個實作了

net.Listener

接口的 struct,而通過

listen.Accept()

接收的新連接配接 TCPConn 則是一個實作了

net.Conn

接口的 struct,它内嵌了

net.conn

struct。仔細閱讀上面的源碼可以發現,不管是 Listener 的 Accept 還是 Conn 的 Read/Write 方法,都是基于一個

netFD

的資料結構的操作,

netFD

是一個網絡描述符,類似于 Linux 的檔案描述符的概念,netFD 中包含一個 poll.FD 資料結構,而 poll.FD 中包含兩個重要的資料結構 Sysfd 和 pollDesc,前者是真正的系統檔案描述符,後者對是底層事件驅動的封裝,所有的讀寫逾時等操作都是通過調用後者的對應方法實作的。

netpoll

的工作流程如下:

  1. 服務端的 netFD 在

    listen

    時會建立 epoll 的執行個體,并将 listenerFD 加入 epoll 的事件隊列
  2. netFD 在

    accept

    時将傳回的 connFD 也加入 epoll 的事件隊列
  3. netFD 在讀寫時出現

    syscall.EAGAIN

    錯誤,通過 pollDesc 的

    waitRead

    方法将目前的 goroutine park 住,直到 ready,從 pollDesc 的

    waitRead

    中傳回

netFD

poll.FD

的源碼:

// Network file descriptor.
type netFD struct {
    pfd poll.FD

    // immutable until Close
    family      int
    sotype      int
    isConnected bool // handshake completed or use of association with peer
    net         string
    laddr       Addr
    raddr       Addr
}

// FD is a file descriptor. The net and os packages use this type as a
// field of a larger type representing a network connection or OS file.
type FD struct {
    // Lock sysfd and serialize access to Read and Write methods.
    fdmu fdMutex

    // System file descriptor. Immutable until Close.
    Sysfd int

    // I/O poller.
    pd pollDesc

    // Writev cache.
    iovecs *[]syscall.Iovec

    // Semaphore signaled when file is closed.
    csema uint32

    // Non-zero if this file has been set to blocking mode.
    isBlocking uint32

    // Whether this is a streaming descriptor, as opposed to a
    // packet-based descriptor like a UDP socket. Immutable.
    IsStream bool

    // Whether a zero byte read indicates EOF. This is false for a
    // message based socket connection.
    ZeroReadIsEOF bool

    // Whether this is a file rather than a network socket.
    isFile bool
}           

pollDesc

前面提到了 pollDesc 是底層事件驅動的封裝,netFD 通過它來完成各種 I/O 相關的操作,它的定義如下:

type pollDesc struct {
    runtimeCtx uintptr
}           

這裡的 struct 隻包含了一個指針,而通過 pollDesc 的 init 方法,我們可以找到它具體的定義是在

runtime.pollDesc

這裡:

func (pd *pollDesc) init(fd *FD) error {
    serverInit.Do(runtime_pollServerInit)
    ctx, errno := runtime_pollOpen(uintptr(fd.Sysfd))
    if errno != 0 {
        if ctx != 0 {
            runtime_pollUnblock(ctx)
            runtime_pollClose(ctx)
        }
        return syscall.Errno(errno)
    }
    pd.runtimeCtx = ctx
    return nil
}

// Network poller descriptor.
//
// No heap pointers.
//
//go:notinheap
type pollDesc struct {
    link *pollDesc // in pollcache, protected by pollcache.lock

    // The lock protects pollOpen, pollSetDeadline, pollUnblock and deadlineimpl operations.
    // This fully covers seq, rt and wt variables. fd is constant throughout the PollDesc lifetime.
    // pollReset, pollWait, pollWaitCanceled and runtime·netpollready (IO readiness notification)
    // proceed w/o taking the lock. So closing, everr, rg, rd, wg and wd are manipulated
    // in a lock-free way by all operations.
    // NOTE(dvyukov): the following code uses uintptr to store *g (rg/wg),
    // that will blow up when GC starts moving objects.
    lock    mutex // protects the following fields
    fd      uintptr
    closing bool
    everr   bool    // marks event scanning error happened
    user    uint32  // user settable cookie
    rseq    uintptr // protects from stale read timers
    rg      uintptr // pdReady, pdWait, G waiting for read or nil
    rt      timer   // read deadline timer (set if rt.f != nil)
    rd      int64   // read deadline
    wseq    uintptr // protects from stale write timers
    wg      uintptr // pdReady, pdWait, G waiting for write or nil
    wt      timer   // write deadline timer
    wd      int64   // write deadline
}           

runtime.pollDesc

包含自身類型的一個指針,用來儲存下一個

runtime.pollDesc

的位址,以此來實作連結清單,可以減少資料結構的大小,所有的

runtime.pollDesc

儲存在

runtime.pollCache

結構中,定義如下:

type pollCache struct {
   lock  mutex
   first *pollDesc
   // PollDesc objects must be type-stable,
   // because we can get ready notification from epoll/kqueue
   // after the descriptor is closed/reused.
   // Stale notifications are detected using seq variable,
   // seq is incremented when deadlines are changed or descriptor is reused.
}           

net.Listen

調用

net.Listen

之後,底層會通過 Linux 的系統調用

socket

方法建立一個 fd 配置設定給 listener,并用以來初始化 listener 的

netFD

,接着調用 netFD 的

listenStream

方法完成對 socket 的 bind&listen 操作以及對

netFD

的初始化(主要是對 netFD 裡的 pollDesc 的初始化),相關源碼如下:

// 調用 linux 系統調用 socket 建立 listener fd 并設定為為阻塞 I/O    
s, err := socketFunc(family, sotype|syscall.SOCK_NONBLOCK|syscall.SOCK_CLOEXEC, proto)
// On Linux the SOCK_NONBLOCK and SOCK_CLOEXEC flags were
// introduced in 2.6.27 kernel and on FreeBSD both flags were
// introduced in 10 kernel. If we get an EINVAL error on Linux
// or EPROTONOSUPPORT error on FreeBSD, fall back to using
// socket without them.

socketFunc        func(int, int, int) (int, error)  = syscall.Socket

// 用上面建立的 listener fd 初始化 listener netFD
if fd, err = newFD(s, family, sotype, net); err != nil {
    poll.CloseFunc(s)
    return nil, err
}

// 對 listener fd 進行 bind&listen 操作,并且調用 init 方法完成初始化
func (fd *netFD) listenStream(laddr sockaddr, backlog int, ctrlFn func(string, string, syscall.RawConn) error) error {
    // ...
  
    // 完成綁定操作
    if err = syscall.Bind(fd.pfd.Sysfd, lsa); err != nil {
        return os.NewSyscallError("bind", err)
    }
  
    // 完成監聽操作
    if err = listenFunc(fd.pfd.Sysfd, backlog); err != nil {
        return os.NewSyscallError("listen", err)
    }
  
    // 調用 init,内部會調用 poll.FD.Init,最後調用 pollDesc.init
    if err = fd.init(); err != nil {
        return err
    }
    lsa, _ = syscall.Getsockname(fd.pfd.Sysfd)
    fd.setAddr(fd.addrFunc()(lsa), nil)
    return nil
}

// 使用 sync.Once 來確定一個 listener 隻持有一個 epoll 執行個體
var serverInit sync.Once

// netFD.init 會調用 poll.FD.Init 并最終調用到 pollDesc.init,
// 它會建立 epoll 執行個體并把 listener fd 加入監聽隊列
func (pd *pollDesc) init(fd *FD) error {
  // runtime_pollServerInit 内部調用了 netpollinit 來建立 epoll 執行個體
    serverInit.Do(runtime_pollServerInit)
  
    // runtime_pollOpen 内部調用了 netpollopen 來将 listener fd 注冊到 
    // epoll 執行個體中,另外,它會初始化一個 pollDesc 并傳回
    ctx, errno := runtime_pollOpen(uintptr(fd.Sysfd))
    if errno != 0 {
        if ctx != 0 {
            runtime_pollUnblock(ctx)
            runtime_pollClose(ctx)
        }
        return syscall.Errno(errno)
    }
    // 把真正初始化完成的 pollDesc 執行個體指派給目前的 pollDesc 代表自身的指針,
    // 後續使用直接通過該指針操作
    pd.runtimeCtx = ctx
    return nil
}

// netpollopen 會被 runtime_pollOpen,注冊 fd 到 epoll 執行個體,
// 同時會利用萬能指針把 pollDesc 儲存到 epollevent 的一個 8 位的位元組數組 data 裡
func netpollopen(fd uintptr, pd *pollDesc) int32 {
    var ev epollevent
    ev.events = _EPOLLIN | _EPOLLOUT | _EPOLLRDHUP | _EPOLLET
    *(**pollDesc)(unsafe.Pointer(&ev.data)) = pd
    return -epollctl(epfd, _EPOLL_CTL_ADD, int32(fd), &ev)
}           

我們前面提到的 epoll 的三個基本調用,Go 在源碼裡實作了對那三個調用的封裝:

#include <sys/epoll.h>  
int epoll_create(int size);  
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);  
int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);

// Go 對上面三個調用的封裝
func netpollinit()
func netpollopen(fd uintptr, pd *pollDesc) int32
func netpoll(block bool) gList           

netFD 就是通過這三個封裝來對 epoll 進行建立執行個體、注冊 fd 和等待事件操作的。

Listener.Accept()

Listener.Accept()

接收來自用戶端的新連接配接,具體還是調用

netFD.accept

方法來完成這個功能:

// Accept implements the Accept method in the Listener interface; it
// waits for the next call and returns a generic Conn.
func (l *TCPListener) Accept() (Conn, error) {
    if !l.ok() {
        return nil, syscall.EINVAL
    }
    c, err := l.accept()
    if err != nil {
        return nil, &OpError{Op: "accept", Net: l.fd.net, Source: nil, Addr: l.fd.laddr, Err: err}
    }
    return c, nil
}

func (ln *TCPListener) accept() (*TCPConn, error) {
    fd, err := ln.fd.accept()
    if err != nil {
        return nil, err
    }
    tc := newTCPConn(fd)
    if ln.lc.KeepAlive >= 0 {
        setKeepAlive(fd, true)
        ka := ln.lc.KeepAlive
        if ln.lc.KeepAlive == 0 {
            ka = defaultTCPKeepAlive
        }
        setKeepAlivePeriod(fd, ka)
    }
    return tc, nil
}           

netFD.accept

方法裡再調用

poll.FD.Accept

,最後會使用 linux 的系統調用

accept

來完成新連接配接的接收,并且會把 accept 的 socket 設定成非阻塞 I/O 模式:

// Accept wraps the accept network call.
func (fd *FD) Accept() (int, syscall.Sockaddr, string, error) {
    if err := fd.readLock(); err != nil {
        return -1, nil, "", err
    }
    defer fd.readUnlock()

    if err := fd.pd.prepareRead(fd.isFile); err != nil {
        return -1, nil, "", err
    }
    for {
        // 使用 linux 系統調用 accept 接收新連接配接,建立對應的 socket
        s, rsa, errcall, err := accept(fd.Sysfd)
        // 因為 listener fd 在建立的時候已經設定成非阻塞的了,
        // 是以 accept 方法會直接傳回,不管有沒有新連接配接到來;如果 err == nil 則表示正常建立新連接配接,直接傳回
        if err == nil {
            return s, rsa, "", err
        }
        // 如果 err != nil,則判斷 err == syscall.EAGAIN,符合條件則進入 pollDesc.waitRead 方法
        switch err {
        case syscall.EAGAIN:
            if fd.pd.pollable() {
                // 如果目前沒有發生期待的 I/O 事件,那麼 waitRead 會通過 park goroutine 讓邏輯 block 在這裡
                if err = fd.pd.waitRead(fd.isFile); err == nil {
                    continue
                }
            }
        case syscall.ECONNABORTED:
            // This means that a socket on the listen
            // queue was closed before we Accept()ed it;
            // it's a silly error, so try again.
            continue
        }
        return -1, nil, errcall, err
    }
}

// 使用 linux 的 accept 系統調用接收新連接配接并把這個 socket fd 設定成非阻塞 I/O
ns, sa, err := Accept4Func(s, syscall.SOCK_NONBLOCK|syscall.SOCK_CLOEXEC)
// On Linux the accept4 system call was introduced in 2.6.28
// kernel and on FreeBSD it was introduced in 10 kernel. If we
// get an ENOSYS error on both Linux and FreeBSD, or EINVAL
// error on Linux, fall back to using accept.

// Accept4Func is used to hook the accept4 call.
var Accept4Func func(int, int) (int, syscall.Sockaddr, error) = syscall.Accept4           

pollDesc.waitRead

方法主要負責檢測目前這個 pollDesc 的上層 netFD 對應的 fd 是否有『期待的』I/O 事件發生,如果有就直接傳回,否則就 park 住目前的 goroutine 并持續等待直至對應的 fd 上發生可讀/可寫或者其他『期待的』I/O 事件為止,然後它就會傳回到外層的 for 循環,讓 goroutine 繼續執行邏輯。

Conn.Read/Conn.Write

我們先來看看

Conn.Read

方法是如何實作的,原理其實和

Listener.Accept

是一樣的,具體調用鍊還是首先調用 conn 的

netFD.Read

,然後内部再調用

poll.FD.Read

,最後使用 linux 的系統調用 read:

syscall.Read

完成資料讀取:

// Implementation of the Conn interface.

// Read implements the Conn Read method.
func (c *conn) Read(b []byte) (int, error) {
    if !c.ok() {
        return 0, syscall.EINVAL
    }
    n, err := c.fd.Read(b)
    if err != nil && err != io.EOF {
        err = &OpError{Op: "read", Net: c.fd.net, Source: c.fd.laddr, Addr: c.fd.raddr, Err: err}
    }
    return n, err
}

func (fd *netFD) Read(p []byte) (n int, err error) {
    n, err = fd.pfd.Read(p)
    runtime.KeepAlive(fd)
    return n, wrapSyscallError("read", err)
}

// Read implements io.Reader.
func (fd *FD) Read(p []byte) (int, error) {
    if err := fd.readLock(); err != nil {
        return 0, err
    }
    defer fd.readUnlock()
    if len(p) == 0 {
        // If the caller wanted a zero byte read, return immediately
        // without trying (but after acquiring the readLock).
        // Otherwise syscall.Read returns 0, nil which looks like
        // io.EOF.
        // TODO(bradfitz): make it wait for readability? (Issue 15735)
        return 0, nil
    }
    if err := fd.pd.prepareRead(fd.isFile); err != nil {
        return 0, err
    }
    if fd.IsStream && len(p) > maxRW {
        p = p[:maxRW]
    }
    for {
        // 嘗試從該 socket 讀取資料,因為 socket 在被 listener accept 的時候設定成
        // 了非阻塞 I/O,是以這裡同樣也是直接傳回,不管有沒有可讀的資料
        n, err := syscall.Read(fd.Sysfd, p)
        if err != nil {
            n = 0
            // err == syscall.EAGAIN 表示目前沒有期待的 I/O 事件發生,也就是 socket 不可讀
            if err == syscall.EAGAIN && fd.pd.pollable() {
                // 如果目前沒有發生期待的 I/O 事件,那麼 waitRead 
                // 會通過 park goroutine 讓邏輯 block 在這裡
                if err = fd.pd.waitRead(fd.isFile); err == nil {
                    continue
                }
            }

            // On MacOS we can see EINTR here if the user
            // pressed ^Z.  See issue #22838.
            if runtime.GOOS == "darwin" && err == syscall.EINTR {
                continue
            }
        }
        err = fd.eofError(n, err)
        return n, err
    }
}           

conn.Write

conn.Read

的原理是一緻的,它也是通過類似

pollDesc.waitRead

pollDesc.waitWrite

來 park 住 goroutine 直至期待的 I/O 事件發生才傳回,而

pollDesc.waitWrite

的内部實作原理和

pollDesc.waitRead

是一樣的,都是基于

runtime_pollWait

,這裡就不再贅述。

pollDesc.waitRead

pollDesc.waitRead

内部調用了

runtime_pollWait

來達成無 I/O 事件時 park 住 goroutine 的目的:

//go:linkname poll_runtime_pollWait internal/poll.runtime_pollWait
func poll_runtime_pollWait(pd *pollDesc, mode int) int {
    err := netpollcheckerr(pd, int32(mode))
    if err != 0 {
        return err
    }
    // As for now only Solaris, illumos, and AIX use level-triggered IO.
    if GOOS == "solaris" || GOOS == "illumos" || GOOS == "aix" {
        netpollarm(pd, mode)
    }
    // 進入 netpollblock 并且判斷是否有期待的 I/O 事件發生,
    // 這裡的 for 循環是為了一直等到 io ready
    for !netpollblock(pd, int32(mode), false) {
        err = netpollcheckerr(pd, int32(mode))
        if err != 0 {
            return err
        }
        // Can happen if timeout has fired and unblocked us,
        // but before we had a chance to run, timeout has been reset.
        // Pretend it has not happened and retry.
    }
    return 0
}

// returns true if IO is ready, or false if timedout or closed
// waitio - wait only for completed IO, ignore errors
func netpollblock(pd *pollDesc, mode int32, waitio bool) bool {
    // gpp 儲存的是 goroutine 的資料結構 g,這裡會根據 mode 的值決定是 rg 還是 wg
    // 後面調用 gopark 之後,會把目前的 goroutine 的抽象資料結構 g 存入 gpp 這個指針
    gpp := &pd.rg
    if mode == 'w' {
        gpp = &pd.wg
    }

    // set the gpp semaphore to WAIT
    // 這個 for 循環是為了等待 io ready 或者 io wait
    for {
        old := *gpp
        // gpp == pdReady 表示此時已有期待的 I/O 事件發生,
        // 可以直接傳回 unblock 目前 goroutine 并執行響應的 I/O 操作
        if old == pdReady {
            *gpp = 0
            return true
        }
        if old != 0 {
            throw("runtime: double wait")
        }
        // 如果沒有期待的 I/O 事件發生,則通過原子操作把 gpp 的值置為 pdWait 并退出 for 循環
        if atomic.Casuintptr(gpp, 0, pdWait) {
            break
        }
    }

    // need to recheck error states after setting gpp to WAIT
    // this is necessary because runtime_pollUnblock/runtime_pollSetDeadline/deadlineimpl
    // do the opposite: store to closing/rd/wd, membarrier, load of rg/wg
  
    // waitio 此時是 false,netpollcheckerr 方法目前 pollDesc 對應的 fd 是否是正常的,
    // 通常來說  netpollcheckerr(pd, mode) == 0 是成立的,是以這裡會執行 gopark 
    // 把目前 goroutine 給 park 住,直至對應的 fd 上發生可讀/可寫或者其他『期待的』I/O 事件為止,
    // 然後 unpark 傳回,在 gopark 内部會把目前 goroutine 的抽象資料結構 g 存入
    // gpp(pollDesc.rg/pollDesc.wg) 指針裡,以便在後面的 netpoll 函數取出 pollDesc 之後,
    // 把 g 添加到連結清單裡傳回,接着重新排程 goroutine
    if waitio || netpollcheckerr(pd, mode) == 0 {
        // 注冊 netpollblockcommit 回調給 gopark,在 gopark 内部會執行它,儲存目前 goroutine 到 gpp
        gopark(netpollblockcommit, unsafe.Pointer(gpp), waitReasonIOWait, traceEvGoBlockNet, 5)
    }
    // be careful to not lose concurrent READY notification
    old := atomic.Xchguintptr(gpp, 0)
    if old > pdWait {
        throw("runtime: corrupted polldesc")
    }
    return old == pdReady
}

// gopark 會停住目前的 goroutine 并且調用傳遞進來的回調函數 unlockf,從上面的源碼我們可以知道這個函數是
// netpollblockcommit
func gopark(unlockf func(*g, unsafe.Pointer) bool, lock unsafe.Pointer, reason waitReason, traceEv byte, traceskip int) {
    if reason != waitReasonSleep {
        checkTimeouts() // timeouts may expire while two goroutines keep the scheduler busy
    }
    mp := acquirem()
    gp := mp.curg
    status := readgstatus(gp)
    if status != _Grunning && status != _Gscanrunning {
        throw("gopark: bad g status")
    }
    mp.waitlock = lock
    mp.waitunlockf = unlockf
    gp.waitreason = reason
    mp.waittraceev = traceEv
    mp.waittraceskip = traceskip
    releasem(mp)
    // can't do anything that might move the G between Ms here.
  // gopark 最終會調用 park_m,在這個函數内部會調用 unlockf,也就是 netpollblockcommit,
    // 然後會把目前的 goroutine,也就是 g 資料結構儲存到 pollDesc 的 rg 或者 wg 指針裡
    mcall(park_m)
}

// park continuation on g0.
func park_m(gp *g) {
    _g_ := getg()

    if trace.enabled {
        traceGoPark(_g_.m.waittraceev, _g_.m.waittraceskip)
    }

    casgstatus(gp, _Grunning, _Gwaiting)
    dropg()

    if fn := _g_.m.waitunlockf; fn != nil {
        // 調用 netpollblockcommit,把目前的 goroutine,
        // 也就是 g 資料結構儲存到 pollDesc 的 rg 或者 wg 指針裡
        ok := fn(gp, _g_.m.waitlock)
        _g_.m.waitunlockf = nil
        _g_.m.waitlock = nil
        if !ok {
            if trace.enabled {
                traceGoUnpark(gp, 2)
            }
            casgstatus(gp, _Gwaiting, _Grunnable)
            execute(gp, true) // Schedule it back, never returns.
        }
    }
    schedule()
}

// netpollblockcommit 在 gopark 函數裡被調用
func netpollblockcommit(gp *g, gpp unsafe.Pointer) bool {
    // 通過原子操作把目前 goroutine 抽象的資料結構 g,也就是這裡的參數 gp 存入 gpp 指針,
    // 此時 gpp 的值是 pollDesc 的 rg 或者 wg 指針
    r := atomic.Casuintptr((*uintptr)(gpp), pdWait, uintptr(unsafe.Pointer(gp)))
    if r {
        // Bump the count of goroutines waiting for the poller.
        // The scheduler uses this to decide whether to block
        // waiting for the poller if there is nothing else to do.
        atomic.Xadd(&netpollWaiters, 1)
    }
    return r
}           

netpoll

前面已經從源碼的角度分析完了 netpoll 是如何通過 park goroutine 進而達到阻塞 Accept/Read/Write 的效果,而通過調用 gopark,goroutine 會被放置在某個等待隊列中(如 channel 的 waitq ,此時 G 的狀态由

_Grunning

_Gwaitting

),是以G必須被手動喚醒(通過 goready ),否則會丢失任務,應用層阻塞通常使用這種方式。

是以,最後還有一個非常關鍵的問題是:當 I/O 事件發生之後,netpoll 是通過什麼方式喚醒那些在 I/O wait 的 goroutine 的?答案是通過

epoll_wait

,在 Go 源碼中的

src/runtime/netpoll_epoll.go

檔案中有一個

func netpoll(block bool) gList

方法,它會内部調用

epoll_wait

擷取就緒的 fd 清單,并将每個 fd 對應的 goroutine 添加到連結清單傳回:

// polls for ready network connections
// returns list of goroutines that become runnable
func netpoll(block bool) gList {
    if epfd == -1 {
        return gList{}
    }
    waitms := int32(-1)
    // 是否以阻塞模式調用 epoll_wait
    if !block {
        waitms = 0
    }
    var events [128]epollevent
retry:
    // 擷取就緒的 fd 清單
    n := epollwait(epfd, &events[0], int32(len(events)), waitms)
    if n < 0 {
        if n != -_EINTR {
            println("runtime: epollwait on fd", epfd, "failed with", -n)
            throw("runtime: netpoll failed")
        }
        goto retry
    }
    // toRun 是一個 g 的連結清單,存儲要恢複的 goroutines,最後傳回給調用方
    var toRun gList
    for i := int32(0); i < n; i++ {
        ev := &events[i]
        if ev.events == 0 {
            continue
        }
        var mode int32
        // 判斷發生的事件類型,讀類型或者寫類型
        if ev.events&(_EPOLLIN|_EPOLLRDHUP|_EPOLLHUP|_EPOLLERR) != 0 {
            mode += 'r'
        }
        if ev.events&(_EPOLLOUT|_EPOLLHUP|_EPOLLERR) != 0 {
            mode += 'w'
        }
        if mode != 0 {
            // 取出儲存在 epollevent 裡的 pollDesc
            pd := *(**pollDesc)(unsafe.Pointer(&ev.data))
            pd.everr = false
            if ev.events == _EPOLLERR {
                pd.everr = true
            }
            // 調用 netpollready,傳入就緒 fd 的 pollDesc,把 fd 對應的 goroutine 添加到連結清單 toRun 中
            netpollready(&toRun, pd, mode)
        }
    }
    if block && toRun.empty() {
        goto retry
    }
    return toRun
}

// netpollready 調用 netpollunblock 傳回就緒 fd 對應的 goroutine 的抽象資料結構 g
func netpollready(toRun *gList, pd *pollDesc, mode int32) {
    var rg, wg *g
    if mode == 'r' || mode == 'r'+'w' {
        rg = netpollunblock(pd, 'r', true)
    }
    if mode == 'w' || mode == 'r'+'w' {
        wg = netpollunblock(pd, 'w', true)
    }
    if rg != nil {
        toRun.push(rg)
    }
    if wg != nil {
        toRun.push(wg)
    }
}

// netpollunblock 會依據傳入的 mode 決定從 pollDesc 的 rg 或者 wg 取出當時 gopark 之時存入的
// goroutine 抽象資料結構 g 并傳回
func netpollunblock(pd *pollDesc, mode int32, ioready bool) *g {
    // mode == 'r' 代表當時 gopark 是為了等待讀事件,而 mode == 'w' 則代表是等待寫事件
    gpp := &pd.rg
    if mode == 'w' {
        gpp = &pd.wg
    }

    for {
        // 取出 gpp 存儲的 g
        old := *gpp
        if old == pdReady {
            return nil
        }
        if old == 0 && !ioready {
            // Only set READY for ioready. runtime_pollWait
            // will check for timeout/cancel before waiting.
            return nil
        }
        var new uintptr
        if ioready {
            new = pdReady
        }
        // 重置 pollDesc 的 rg 或者 wg
        if atomic.Casuintptr(gpp, old, new) {
            if old == pdReady || old == pdWait {
                old = 0
            }
            // 通過萬能指針還原成 g 并傳回
            return (*g)(unsafe.Pointer(old))
        }
    }
}           

而 Go 在多種場景下都可能會調用

netpoll

檢查檔案描述符狀态。尋找到 I/O 就緒的 socket fd,并找到這些 socket fd 對應的輪詢器中附帶的資訊,根據這些資訊将之前等待這些 socket fd 就緒的 goroutine 狀态修改為

_Grunnable

。執行完

netpoll

之後,會傳回一個就緒 fd 清單對應的 goroutine 清單,接下來将就緒的 goroutine 加入到排程隊列中,等待排程運作。

首先,在 Go runtime scheduler 正常排程 goroutine 之時就有可能會調用

netpoll

擷取到已就緒的 fd 對應的 goroutine 來排程執行:

// One round of scheduler: find a runnable goroutine and execute it.
// Never returns.
func schedule() {
    // ...
  
  if gp == nil {
        gp, inheritTime = findrunnable() // blocks until work is available
    }
  
    // ...
}

// Finds a runnable goroutine to execute.
// Tries to steal from other P's, get g from global queue, poll network.
func findrunnable() (gp *g, inheritTime bool) {
  // ...
  
  // Poll network.
    // This netpoll is only an optimization before we resort to stealing.
    // We can safely skip it if there are no waiters or a thread is blocked
    // in netpoll already. If there is any kind of logical race with that
    // blocked thread (e.g. it has already returned from netpoll, but does
    // not set lastpoll yet), this thread will do blocking netpoll below
    // anyway.
    if netpollinited() && atomic.Load(&netpollWaiters) > 0 && atomic.Load64(&sched.lastpoll) != 0 {
        if list := netpoll(false); !list.empty() { // non-blocking
            gp := list.pop()
            injectglist(&list)
            casgstatus(gp, _Gwaiting, _Grunnable)
            if trace.enabled {
                traceGoUnpark(gp, 0)
            }
            return gp, false
        }
    }
  
  // ...
}           

Go scheduler 的核心方法

schedule

裡會調用一個叫

findrunable()

的方法擷取可運作的 goroutine 來執行,而在

findrunable()

方法裡就調用了

netpoll

擷取已就緒的 fd 清單對應的 goroutine 清單。

另外,

sysmon

監控線程也可能會調用到

netpoll

// Always runs without a P, so write barriers are not allowed.
//
//go:nowritebarrierrec
func sysmon() {
        // ...
        now := nanotime()
        if netpollinited() && lastpoll != 0 && lastpoll+10*1000*1000 < now {
            atomic.Cas64(&sched.lastpoll, uint64(lastpoll), uint64(now))
            // 以非阻塞的方式調用 netpoll 擷取就緒 fd 清單
            list := netpoll(false) // non-blocking - returns list of goroutines
            if !list.empty() {
                // Need to decrement number of idle locked M's
                // (pretending that one more is running) before injectglist.
                // Otherwise it can lead to the following situation:
                // injectglist grabs all P's but before it starts M's to run the P's,
                // another M returns from syscall, finishes running its G,
                // observes that there is no work to do and no other running M's
                // and reports deadlock.
                incidlelocked(-1)
                // 将其插入排程器的runnable清單中(全局),等待被排程執行
                injectglist(&list)
                incidlelocked(1)
            }
        }
        // retake P's blocked in syscalls
        // and preempt long running G's
        if retake(now) != 0 {
            idle = 0
        } else {
            idle++
        }
        // check if we need to force a GC
        if t := (gcTrigger{kind: gcTriggerTime, now: now}); t.test() && atomic.Load(&forcegc.idle) != 0 {
            lock(&forcegc.lock)
            forcegc.idle = 0
            var list gList
            list.push(forcegc.g)
            injectglist(&list)
            unlock(&forcegc.lock)
        }
        if debug.schedtrace > 0 && lasttrace+int64(debug.schedtrace)*1000000 <= now {
            lasttrace = now
            schedtrace(debug.scheddetail > 0)
        }
    }
}           

Go runtime 在程式啟動的時候會建立一個獨立的 M 作為監控線程,叫

sysmon

,這個線程為系統級的 daemon 線程,無需 P 即可運作,

sysmon

每 20us~10ms 運作一次。

sysmon

中以輪詢的方式執行以下操作(如上面的代碼所示):

  1. 以非阻塞的方式調用

    runtime.netpoll

    ,從中找出能從網絡 I/O 中喚醒的 G,并調用

    injectglist

    ,将其插入排程器的 runnable 清單中(全局),排程觸發時,有可能從這個全局 runnable 清單擷取 G。然後再循環調用

    startm

    ,直到所有 P 都不處于

    _Pidle

    狀态。
  2. retake

    ,搶占長時間處于

    _Psyscall

    狀态的 P。

綜上,Go 借助于 epoll/kqueue/iocp 和 runtime scheduler 等的幫助,設計出了自己的 I/O 多路複用 netpoll,成功地讓

Listener.Accept

/

conn.Read

conn.Write

等方法從開發者的角度看來是同步模式。

Go netpoll 的價值

通過前面對源碼的分析,我們現在知道 Go netpoll 依托于 runtime scheduler,為開發者提供了一種強大的同步網絡程式設計模式;然而,Go netpoll 存在的意義卻遠不止于此,Go netpoll I/O 多路複用搭配 Non-blocking I/O 而打造出來的這個原生網絡模型,它最大的價值是把網絡 I/O 的控制權牢牢掌握在 Go 自己的 runtime 裡,關于這一點我們需要從 Go 的 runtime scheduler 說起,Go 的 G-P-M 排程模型如下:

Go netpoll I/O 多路複用建構原生網絡模型之源碼深度解析

G 在運作過程中如果被阻塞在某個 system call 操作上,那麼不光 G 會阻塞,執行該 G 的 M 也會解綁 P(實質是被 sysmon 搶走了),與 G 一起進入 sleep 狀态。如果此時有 idle 的 M,則 P 與其綁定繼續執行其他 G;如果沒有 idle M,但仍然有其他 G 要去執行,那麼就會建立一個新的 M。當阻塞在 system call 上的 G 完成 syscall 調用後,G 會去嘗試擷取一個可用的 P,如果沒有可用的 P,那麼 G 會被标記為

_Grunnable

并把它放入全局的 runqueue 中等待排程,之前的那個 sleep 的 M 将再次進入 sleep。

現在清楚為什麼 netpoll 為什麼一定要使用非阻塞 I/O 了吧?就是為了避免讓操作網絡 I/O 的 goroutine 陷入到系統調用進而進入核心态,因為一旦進入核心态,整個程式的控制權就會發生轉移(到核心),不再屬于使用者程序了,那麼也就無法借助于 Go 強大的 runtime scheduler 來排程業務程式的并發了;而有了 netpoll 之後,借助于非阻塞 I/O ,G 就再也不會因為系統調用的讀寫而陷入核心态,當 G 被阻塞在某個 network I/O 操作上時,實際上它不是因為陷入核心态被阻塞住了,而是被 Go runtime 調用 gopark 給 park 住了,此時 G 會被放置到某個 wait queue 中,而 M 會嘗試運作下一個

_Grunnable

的 G,如果此時沒有

_Grunnable

的 G 供 M 運作,那麼 M 将解綁P,并進入 sleep 狀态。當 I/O available,在 wait queue 中的 G 會被喚醒,标記為

_Grunnable

,放入某個可用的 P 的 local 隊列中,綁定一個 M 恢複執行。

Go netpoll 的問題

Go netpoll 的設計不可謂不精巧、性能也不可謂不高效,配合 goroutine 寫網絡程式是真的爽:簡潔高效。然而,沒有任何一種設計和架構是完美的,

goroutine-per-connection

這種模式雖然簡單高效,但是在某些極端的場景下也會暴露出問題:goroutine 雖然非常輕量,它的自定義棧記憶體初始值僅為 2KB,後面按需擴容;海量連接配接的業務場景下,

goroutine-per-connection

,此時 goroutine 數量以及消耗的資源就會呈線性趨勢暴漲,首先給 Go runtime scheduler 造成極大的壓力和侵占系統資源,然後資源被侵占又反過來影響 runtime 的排程,導緻性能大幅下降。

Reactor 模式

目前在 Linux 平台下建構的高性能網絡程式中,大都使用 Reactor 模式,比如 netty、libevent、libev、ACE,POE(Perl)、Twisted(Python)等。

Reactor 模式本質上指的是使用

I/O 多路複用(I/O multiplexing) + 非阻塞 I/O(non-blocking I/O)

的模式。

通常設定一個主線程負責做 event-loop 事件循環和 I/O 讀寫,通過 select/poll/epoll_wait 等系統調用監聽 I/O 事件,業務邏輯送出給其他工作線程去做。而所謂『非阻塞 I/O』的核心思想是指避免阻塞在 read() 或者 write() 或者其他的 I/O 系統調用上,這樣可以最大限度的複用 event-loop 線程,讓一個線程能服務于多個 sockets。在 Reactor 模式中,I/O 線程隻能阻塞在 I/O multiplexing 函數上(select/poll/epoll_wait)。

Reactor 模式通常的工作流程如下:

  • Server 端完成在

    bind&listen

    之後,将 listenfd 注冊到epollfd中,最後進入 event-loop事件循環。循環過程中會調用

    select/poll/epoll_wait

    阻塞等待,若有在 listenfd 上的新連接配接事件則解除阻塞傳回,并調用

    socket.accept

    接收新連接配接 connfd,并将 connfd 加入到 epollfd 的 I/O 複用(監聽)隊列。
  • 當 connfd 上發生可讀/可寫事件也會解除

    select/poll/epoll_wait

    的阻塞等待,然後進行 I/O 讀寫操作,這裡讀寫 I/O 都是非阻塞 I/O,這樣才不會阻塞 event-loop 的下一個循環。然而,這樣容易割裂業務邏輯,不易了解和維護。
  • read

    讀取資料之後進行解碼并放入隊列中,等待工作線程處理。
  • 工作線程處理完資料之後,傳回到 event-loop 線程,由這個線程負責調用

    write

    把資料寫回 client。

accept 連接配接以及 conn 上的讀寫操作若是在主線程完成,則要求是非阻塞 I/O,因為 Reactor 模式一條最重要的原則就是:I/O 操作不能阻塞 event-loop 事件循環。實際上 event loop 可能也可以是多線程的,隻是一個線程裡隻有一個 select/poll/epoll_wait。

上面提到了 Go netpoll 在某些場景下可能因為建立太多的 goroutine 而過多地消耗系統資源,而在現實世界的網絡業務中,伺服器持有的海量連接配接中在極短的時間視窗内隻有極少數是 active 而大多數則是 idle,就像這樣(非真實資料,僅僅是為了比喻):

Go netpoll I/O 多路複用建構原生網絡模型之源碼深度解析

那麼為每一個連接配接指派一個 goroutine 就顯得太過奢侈了,而 Reactor 模式這種利用 I/O 多路複用進而隻需要使用少量線程即可管理海量連接配接的設計就可以在這樣網絡業務中大顯身手了:

Go netpoll I/O 多路複用建構原生網絡模型之源碼深度解析

在絕大部分應用場景下,我推薦大家還是遵循 Go 的 best practices,以這種 netpoll 模式來建構自己的網絡應用。然而,在某些極度追求性能、壓榨系統資源以及技術棧必須是原生 Go (不考慮 C/C++ 寫中間層而 Go 寫業務層)的業務場景下,我們可以考慮自己建構 Reactor 網絡模型。

gnet

是一個基于事件驅動的高性能和輕量級網絡架構,支援多種協定:TCP/UDP/Unix-Socket。它直接使用

kqueue

系統調用而非标準 Golang 網絡包:

net

來建構網絡應用,它的工作原理類似兩個開源的網絡庫:

netty libuv

的亮點在于它是一個高性能、輕量級、非阻塞的純 Go 實作的傳輸層(TCP/UDP/Unix-Socket)網絡架構,開發者可以使用

來實作自己的應用層網絡協定,進而建構出自己的應用層網絡應用:比如在

上實作 HTTP 協定就可以建立出一個 HTTP 伺服器 或者 Web 開發架構,實作 Redis 協定就可以建立出自己的 Redis 伺服器等等。

,在某些極端的網絡業務場景,比如海量連接配接、高頻建立銷毀連接配接等等場景,

在性能和資源占用上都遠超 Go 原生的

包(基于 netpoll)。

gnet

已經實作了

Multi-Reactors

Multi-Reactors + Goroutine Pool

兩種網絡模型,也得益于這些網絡模型,使得

gnet

成為一個高性能和低損耗的 Go 網絡架構:

Go netpoll I/O 多路複用建構原生網絡模型之源碼深度解析
Go netpoll I/O 多路複用建構原生網絡模型之源碼深度解析

功能

  • [x] 高性能 的基于多線程/Go程模型的 event-loop 事件驅動
  • [x] 内置 Round-Robin 輪詢負載均衡算法
  • [x] 内置 goroutine 池,由開源庫 ants 提供支援
  • [x] 内置 bytes 記憶體池,由開源庫 pool
  • [x] 簡潔的 APIs
  • [x] 基于 Ring-Buffer 的高效記憶體利用
  • [x] 支援多種網絡協定:TCP、UDP、Unix Sockets
  • [x] 支援兩種事件驅動機制:Linux 裡的 epoll 以及 FreeBSD 裡的 kqueue
  • [x] 支援異步寫操作
  • [x] 靈活的事件定時器
  • [x] SO_REUSEPORT 端口重用
  • [x] 内置多種編解碼器,支援對 TCP 資料流分包:LineBasedFrameCodec, DelimiterBasedFrameCodec, FixedLengthFrameCodec 和 LengthFieldBasedFrameCodec,參考自 netty codec ,而且支援自定制編解碼器
  • [ ] 加入更多的負載均衡算法:随機、最少連接配接、一緻性哈希等等
  • [ ] 支援 Windows 平台的 IOCP 事件驅動機制
  • [ ] 支援 TLS
  • [ ] 實作

    gnet

    用戶端

參考

繼續閱讀