從軟體角度看,資料包最先肯定是由網絡裝置驅動程式收到,那麼之後協定棧又是如何接收的,這篇筆記就來看看linux核心和驅動程式是如何配合完整這個接收過程的。
資料包接收模式
目前核心提供了兩種資料包接收模式:非NAPI方式(老方法)和NAPI(新方法,即New API)。新老方法的接收過程分别如下圖所示:
非NAPI模式
如上圖b所示,當網卡收到資料包,産生一個硬中斷,驅動程式在硬中斷處理過程中,從網卡取資料,構造出SKB,然後直接将該SKB放入目前CPU的接收隊列input_pkt_queue中,該隊列随後會由網絡接收軟中斷程式處理,軟中斷處理程式會将資料包遞交給上層協定棧。
圖中的“将資料包添加到輸入隊列”操作是驅動程式通過netif_rx()接口實作的,該接口是核心架構提供給驅動程式使用的。
關于接收隊列和網絡接收軟中斷的介紹見下文。
NAPI模式
如上圖a所示,這種方式的第一步并不是将資料從裝置中讀取出來,而是将網絡裝置對象(即struct net_device)添加到poll_list(一個輪詢隊列)中,然後激活網絡接收軟中斷,在軟中斷處理函數中會周遊該poll_list,依次處理該輪詢隊列上的裝置,回調裝置提供的資料接收函數完成資料包的接收。
注:實際上該隊列中挂接的是下文的struct napi_struct,但是由于網絡裝置和該結構是一一對應的,這麼描述也沒什麼毛病。
新老模式對比
老方法完全依賴于中斷讀取資料包,在高負載場景下,可能會頻繁的中斷CPU,造成資源浪費。新方法背後的思想也簡單:就是中斷+輪詢。首次資料包到達時,中斷CPU,然後驅動程式關掉裝置中斷,将裝置放入poll_list中,隻要裝置中一直有資料,那麼就讓該裝置一直在poll_list中,網絡接收軟中斷會不斷的處理該poll_list,這樣可以不斷的從裝置中讀取資料,直到該裝置中的資料讀取完畢為止。這種模式可以盡可能的減少中斷的次數,但是有不會引入太大的時延,是以目前的驅動基本上都采用這種新方法,當然了,作為核心架構是完全相容這兩種方式的,下文會減少這是怎麼實作的。
接收隊列
從上面可以看出,非NAPI模式,接收隊列就是input_pkt_queue隊列;NAPI模式的接收隊列就是poll_list隊列。
這兩個隊列都是由協定棧統一維護的,因為對于協定棧來講,它要能夠處理來自所有網卡的資料包。為了接收效率,接收隊列都是每個CPU一份,這樣就可以避免加鎖導緻效率變低,核心将接收隊列定義為struct softnet_data:
/*
* Incoming packets are placed on per-cpu queues so that
* no locking is needed.
*/
// 由于struct softnet_data是PER-CPU的,是以對它的通路無需持鎖
struct softnet_data
{
// 對于非NAPI方式的接收,驅動通過輪詢或者硬中斷(或二者結合)的方式将資料包放入該隊列,然後激活
// 輸入軟中斷程式,軟中斷程式會處理該隊列中資料包,基于流量控制的排隊規則将資料包遞交給上層
struct sk_buff_head input_pkt_queue;
// 網絡裝置輪詢隊列。驅動應該将需要輪詢的網絡裝置的struct napi_struct連結到該隊列并激活輸入
// 軟中斷程式。輸入軟中斷程式會周遊該隊列,調用驅動提供的netpoll()接收完成接收
struct list_head poll_list;
// 為了将軟中斷接收處理程式對非NAPI方式和NAPIF方式的處理統一,對于非NAPI接收,在硬中斷處理
// 後,将backlog結構加入到poll_list,然後觸發軟中斷接收程式,具體見下面非NAPI方式的接收
struct napi_struct backlog;
};
PS:實際上softnet_data還包括發送隊列,隻不過這裡我們先将其忽略,聚焦接收過程
struct napi_struct
核心開始支援NAPI模式後,将NAPI和非NAPI兩種情況下的接收過程進行了整合,實際上就是把非NAPI模式下的資料包接收過程抽象成了一個特殊的NAPI模式,這就是靠接收隊列中的backlog成員實作的,這點見後面非NAPI模式接收部分的詳細介紹。這裡先看一個新的資料結構,它用來支援NAPI模式。
/*
* Structure for NAPI scheduling similar to tasklet but with weighting
*/
struct napi_struct {
/* The poll_list must only be managed by the entity which
* changes the state of the NAPI_STATE_SCHED bit. This means
* whoever atomically sets that bit can add this napi_struct
* to the per-cpu poll_list, and whoever clears that bit
* can remove from the list right before clearing the bit.
*/
// 用于将該裝置接入CPU的輪詢隊列poll_list中
struct list_head poll_list;
unsigned long state;
// 該裝置的配額,一次輪詢可以接收的最大資料包數不能大于等于該值,如果poll()的傳回值等于weight
// 有特殊含義,見下面net_rx_action()
int weight;
//驅動提供的輪詢接口,網絡接收軟中斷會回調該接口進行資料的讀取
int (*poll)(struct napi_struct *, int);
};
state目前版本有兩個bit可以設定:
enum
{
// 設定該标記說明該網絡裝置已經被挂在了poll_list中等待被輪詢
NAPI_STATE_SCHED,
// 設定該标記說明該網絡裝置此時不能被排程接收資料包
NAPI_STATE_DISABLE,
};
接收隊列初始化
接收隊列的初始化是在裝置接口層初始化過程中完成的,代碼片段如下:
int weight_p __read_mostly = 64; /* old backlog weight */
static int __init net_dev_init(void)
{
...
// 為了高效,每個CPU都有獨立收發隊列,這樣可以減少并發需要的持鎖
for_each_possible_cpu(i) {
struct softnet_data *queue;
queue = &per_cpu(softnet_data, i);
// 初始化非NAPI模式的接收隊列
skb_queue_head_init(&queue->input_pkt_queue);
// 初始化NAPI模式的輪詢隊列
INIT_LIST_HEAD(&queue->poll_list);
// 初始化非NAPI模式對應的backlog,其poll()函數為process_backlog
queue->backlog.poll = process_backlog;
queue->backlog.weight = weight_p;
}
// 注冊網絡接收和發送軟中斷處理函數
open_softirq(NET_TX_SOFTIRQ, net_tx_action, NULL);
open_softirq(NET_RX_SOFTIRQ, net_rx_action, NULL);
}
如上,整個struct softnet_data對象是每個CPU一份,這意味其中的接收隊列也是每個CPU一份,是以接收過程中對這些隊列的操作時無需持鎖的。
系統參數:weight_p是一個系統級參數,和/proc/sys/net/core/dev_weight檔案對應,預設為64。
接收軟中斷
從上面的介紹可以發現,無論是哪種接收模式,都需要接收軟中斷參與,下面先看看接收軟中斷的邏輯實作:
int netdev_budget __read_mostly = 300;
static void net_rx_action(struct softirq_action *h)
{
// 擷取目前CPU上的待輪詢裝置隊列
struct list_head *list = &__get_cpu_var(softnet_data).poll_list;
// 記錄本次輪詢的開始時間
unsigned long start_time = jiffies;
// 本次軟中斷接收可以接收的最大資料包數目,即配額,這是一個總配額,
// 即所有網卡的接收總數不能超過該值
int budget = netdev_budget;
void *have;
// 由于網卡中斷處理函數也會操作poll_list(添加napi_struct),是以雖然接收隊列
// 是每個CPU一份,但是還是需要關閉本地CPU的中斷
local_irq_disable();
while (!list_empty(list)) {
struct napi_struct *n;
int work, weight;
// cond1:本次接收軟中斷的可接收配額已經用完了,是以停止接收,等待下次排程
// cond2: 本次接收軟中斷的執行時間已經超過一個時鐘嘀嗒,是以也停止接收,等待下次排程
// 顯然這種設計從接收包數和處理時長兩個次元來控制軟中斷的執行時長,避免其長時間執行(因為關閉本地CPU中斷了)
// 進而影響整個系統的響應速度
if (unlikely(budget <= 0 || jiffies != start_time))
goto softnet_break;
// 下面的注釋解釋了為什麼這裡可以開啟中斷
local_irq_enable();
/* Even though interrupts have been re-enabled, this
* access is safe because interrupts can only add new
* entries to the tail of this list, and only ->poll()
* calls can remove this head entry from the list.
*/
n = list_entry(list->next, struct napi_struct, poll_list);
// netpoll相關
have = netpoll_poll_lock(n);
// 擷取該網絡裝置自己的一次輪詢的接收配額
weight = n->weight;
/* This NAPI_STATE_SCHED test is for avoiding a race
* with netpoll's poll_napi(). Only the entity which
* obtains the lock and sees NAPI_STATE_SCHED set will
* actually make the ->poll() call. Therefore we avoid
* accidently calling ->poll() when NAPI is not scheduled.
*/
// 調用驅動程式提供的poll接口進行資料的接收,傳回值work代表實際讀取到的資料包數
// 如果資料已經全部讀取完畢,poll的實作應該将該裝置從輪詢隊列中移除
work = 0;
// 隻有網絡裝置設定了NAPI_STATE_SCHED比特位才能被真正排程
if (test_bit(NAPI_STATE_SCHED, &n->state))
work = n->poll(n, weight);
// bug提示: 驅動程式承諾一次輪詢的接收的資料包數不會超過weight,但實際卻超過了
WARN_ON_ONCE(work > weight);
// 從總的配額中減去該網絡裝置消耗的配額
budget -= work;
// 下面可能會修改poll_list,是以需要重新關閉本地CPU中斷
local_irq_disable();
/* Drivers must not modify the NAPI state if they
* consume the entire weight. In such cases this code
* still "owns" the NAPI instance and therefore can
* move the instance around on the list at-will.
*/
// 這段代碼的邏輯在下面解釋
if (unlikely(work == weight)) {
if (unlikely(napi_disable_pending(n)))
__napi_complete(n);
else
list_move_tail(&n->poll_list, list);
}
netpoll_poll_unlock(have);
}
out:
local_irq_enable();
return;
softnet_break:
// 增加time_squeeze,如果該值過大,表示每次軟中斷都沒有處理完資料包,
// 說明網卡接收是非常忙碌的,是以這裡可能是性能瓶頸
__get_cpu_var(netdev_rx_stat).time_squeeze++;
// 因為poll_list中還有裝置需要接收資料,是以需要再次激活軟中斷
__raise_softirq_irqoff(NET_RX_SOFTIRQ);
goto out;
}
系統參數 :netdev_budget是一個系統級參數,和/proc/sys/net/core/netdev_budget檔案對應,預設為300。
NAPI模式接收資料包
首先需要說明的是,驅動程式可以選擇不支援這種方式,但是對于高速網絡裝置,實作它是必要的,如前面所講,這種方式可以減少中斷CPU的次數,對接收效率有很大的提升。NAPI方式的資料包處理方式如下圖所示:
實作要點有兩個:
- 中斷處理程式的實作;
- 輪詢接口poll()的實作;
下面以e100網卡為例,看這兩個過程的核心代碼實作。
中斷處理程式
static irqreturn_t e100_intr(int irq, void *dev_id)
{
struct net_device *netdev = dev_id;
struct nic *nic = netdev_priv(netdev);
...
// 需要接收資料包,檢查是否可以激活,如果需要激活則關閉硬中斷,然後開始排程
if(likely(netif_rx_schedule_prep(netdev, &nic->napi))) {
e100_disable_irq(nic);
__netif_rx_schedule(netdev, &nic->napi);
}
return IRQ_HANDLED;
}
激活接收軟中斷
激活接收軟中斷之前,先得判斷下是否真的需要激活,netif_rx_schedule_prep()用于檢查這種必要性:
/* Test if receive needs to be scheduled but only if up */
static inline int netif_rx_schedule_prep(struct net_device *dev, struct napi_struct *napi)
{
return napi_schedule_prep(napi);
}
static inline int napi_disable_pending(struct napi_struct *n)
{
return test_bit(NAPI_STATE_DISABLE, &n->state);
}
/**
* napi_schedule_prep - check if napi can be scheduled
* @n: napi context
*
* Test if NAPI routine is already running, and if not mark
* it as running. This is used as a condition variable
* insure only one NAPI poll instance runs. We also make
* sure there is no pending NAPI disable.
*/
static inline int napi_schedule_prep(struct napi_struct *n)
{
// 裝置的排程沒有被DISABLE,裝置也還沒有處于排程狀态(NAPI_STATE_SCHED标志位置位)
return !napi_disable_pending(n) &&
!test_and_set_bit(NAPI_STATE_SCHED, &n->state);
}
實際上就是state中的兩個标記都沒有設定,這種情況下說明滿足激活條件(沒有NAPI_STATE_DISABLE置位說明裝置還可以工作;沒有NAPI_STATE_SCHED置位說明目前裝置還沒有被排程),那麼設定NAPI_STATE_SCHED标記。
__netif_rx_schedule()進行真正的排程處理,代碼如下:
/* Add interface to tail of rx poll list. This assumes that _prep has
* already been called and returned 1.
*/
static inline void __netif_rx_schedule(struct net_device *dev, struct napi_struct *napi)
{
__napi_schedule(napi);
}
/**
* __napi_schedule - schedule for receive
* @n: entry to schedule
*
* The entry's receive function will be scheduled to run
*/
void __napi_schedule(struct napi_struct *n)
{
unsigned long flags;
local_irq_save(flags);
// 将裝置加入到接收輪詢隊列中
list_add_tail(&n->poll_list, &__get_cpu_var(softnet_data).poll_list);
// 激活接收軟中斷
__raise_softirq_irqoff(NET_RX_SOFTIRQ);
local_irq_restore(flags);
}
EXPORT_SYMBOL(__napi_schedule);
還提供了一個函數來同時完成上面的兩步:
/* Try to reschedule poll. Called by irq handler. */
static inline void netif_rx_schedule(struct net_device *dev, struct napi_struct *napi)
{
if (netif_rx_schedule_prep(dev, napi))
__netif_rx_schedule(dev, napi);
}
poll()接口的實作
static int e100_poll(struct napi_struct *napi, int budget)
{
struct nic *nic = container_of(napi, struct nic, napi);
struct net_device *netdev = nic->netdev;
unsigned int work_done = 0;
// 進行資料包的接收處理,work_done儲存接收了幾個資料包
e100_rx_clean(nic, &work_done, budget);
e100_tx_clean(nic);
// 接收的資料包量小于配額,說明硬體中的資料已經收完了,那麼結束排程過程,并且重新使能硬體中斷
if (work_done < budget) {
netif_rx_complete(netdev, napi);
e100_enable_irq(nic);
}
// 傳回實際接收的資料包個數
return work_done;
}
回頭再來了解下前面接收軟中斷對poll()回調傳回值的處理。實際上這裡要實作的目标很簡單:如果網卡已經接收完了所有的資料,那麼停止排程它,否則将其放入poll_list的末尾等待再次輪詢到。針對傳回值work和一次輪詢可接收的資料包配額weight的大小關系,有如下三種情況:
- work小于weight,這種情況網卡肯定已經接收完了資料,由驅動程式負責結束排程即可;
- work等于weight,這種情況網卡是否是剛好接收完了全部資料,隻有驅動程式知道,但是架構不能做這樣的假設,因為它和沒有接收完的情況的傳回值相同,是以架構是按照沒有接收完處理,有架構負責将裝置加入poll_list的末尾繼續排程;
- work大于weight,這種情況是bug,不允許出現的,見前面的poll()傳回後的BUG_ON()檢查,這也是第二種情況的原因。
static void net_rx_action(struct softirq_action *h)
{
...
/* Drivers must not modify the NAPI state if they
* consume the entire weight. In such cases this code
* still "owns" the NAPI instance and therefore can
* move the instance around on the list at-will.
*/
if (unlikely(work == weight)) {
// 這是一種特殊情況處理,如果裝置已經被Disable了,也結束排程
if (unlikely(napi_disable_pending(n)))
__napi_complete(n);
else
list_move_tail(&n->poll_list, list);
}
...
}
結束排程
/* Remove interface from poll list: it must be in the poll list
* on current cpu. This primitive is called by dev->poll(), when
* it completes the work. The device cannot be out of poll list at this
* moment, it is BUG().
*/
// 驅動程式的poll()要調用該接口,因為調用poll()之前本地CPU中斷已經被關掉了
static inline void netif_rx_complete(struct net_device *dev, struct napi_struct *napi)
{
unsigned long flags;
local_irq_save(flags);
__netif_rx_complete(dev, napi);
local_irq_restore(flags);
}
/* same as netif_rx_complete, except that local_irq_save(flags)
* has already been issued
*/
static inline void __netif_rx_complete(struct net_device *dev, struct napi_struct *napi)
{
__napi_complete(napi);
}
/**
* napi_complete - NAPI processing complete
* @n: napi context
*
* Mark NAPI processing as complete.
*/
static inline void __napi_complete(struct napi_struct *n)
{
// 當裡面的條件成立時觸發BUG_ON宏
BUG_ON(!test_bit(NAPI_STATE_SCHED, &n->state));
list_del(&n->poll_list);
smp_mb__before_clear_bit();
clear_bit(NAPI_STATE_SCHED, &n->state);
}
非NAPI模式接收資料
這種模式下,驅動程式在收到資料後會調用架構提供的netif_rx()将資料包放入input_pkt_queue中待網絡接收軟中斷程式進一步處理。
netif_rx()
int netdev_max_backlog __read_mostly = 1000;
/**
* netif_rx - post buffer to the network code
* @skb: buffer to post
*
* This function receives a packet from a device driver and queues it for
* the upper (protocol) levels to process. It always succeeds. The buffer
* may be dropped during processing for congestion control or by the
* protocol layers.
*
* return values:
* NET_RX_SUCCESS (no congestion)
* NET_RX_DROP (packet was dropped)
*
*/
int netif_rx(struct sk_buff *skb)
{
struct softnet_data *queue;
unsigned long flags;
// 如果被netpoll處理了,直接傳回DROP,協定棧不處理。
// 這裡忽略netpoll機制,認為其沒有處理
if (netpoll_rx(skb))
return NET_RX_DROP;
// 如果驅動程式沒有為資料包設定接收時間戳,在這裡設定它
if (!skb->tstamp.tv64)
net_timestamp(skb);
/*
* The code is rearranged so that the path is the most
* short when CPU is congested, but is still operating.
*/
// 下面要把SKB放入input_pkt_queue隊列,是以要先關閉本地CPU的硬中斷,理由同上面poll_list
local_irq_save(flags);
// 擷取本地CPU的收發隊列
queue = &__get_cpu_var(softnet_data);
// 網卡裝置接收資料包個數統計值+1
__get_cpu_var(netdev_rx_stat).total++;
// 判斷input_pkt_queue隊列中資料包個數是否超過了系統限制netdev_max_backlog
if (queue->input_pkt_queue.qlen <= netdev_max_backlog) {
// 如果隊列不為空,說明軟中斷處理程式已經在處理該隊列了,這時隻需将資料包放入input_pkt_queue中即可
if (queue->input_pkt_queue.qlen) {
enqueue:
dev_hold(skb->dev);
__skb_queue_tail(&queue->input_pkt_queue, skb);
local_irq_restore(flags);
return NET_RX_SUCCESS;
}
// 如果隊列為空,說明第一個資料包,那麼需要将backlog排程到poll_list中,如果需要還要激活軟中斷處理程式,
// 這裡核心巧妙的将NAPI和非NAPI模式的處理流程進行了相容
napi_schedule(&queue->backlog);
// 激活後把資料包加入到input_pkt_queue中
goto enqueue;
}
// 到這裡說明input_pkt_queue隊列中的資料包超過了系統限制,這時會統計後丢棄該資料包
__get_cpu_var(netdev_rx_stat).dropped++;
local_irq_restore(flags);
kfree_skb(skb);
return NET_RX_DROP;
}
從代碼中可以看出,netif_rx()所做的工作就是将資料包放入input_pkt_queue中,如果沒有激活接收軟中斷,那麼激活它。
系統參數:netdev_max_backlog是一個系統級參數,和/proc/sys/net/core/netdev_max_backlog檔案對應,預設為1000。
非NAPI模式的poll()接口
看到這裡就能看得出來,核心将非NAPI方式的處理方式也抽象成一種特殊的NAPI,并為其實作了特殊的poll()函數,該poll()函數需要做的事情就是周遊input_pkt_queue隊列,将其中的資料包遞交給上層協定,這就是struct softnet_data中backlog的設計思想,代碼如下:
static int process_backlog(struct napi_struct *napi, int quota)
{
int work = 0;
// 擷取本地CPU的接收隊列
struct softnet_data *queue = &__get_cpu_var(softnet_data);
unsigned long start_time = jiffies;
// 讀取資料量超過配額或者到達1個jiffies後結束
napi->weight = weight_p;
do {
struct sk_buff *skb;
struct net_device *dev;
local_irq_disable();
// 取出一個資料包
skb = __skb_dequeue(&queue->input_pkt_queue);
// 如果隊列已空,則将這個特殊的napi_struct從輪詢隊列中移除,結束排程
if (!skb) {
__napi_complete(napi);
local_irq_enable();
break;
}
local_irq_enable();
dev = skb->dev;
// 直接将資料包遞交給高層協定
netif_receive_skb(skb);
// 裝置引用計數-1,可以看到,一旦SKB離開層二,它與裝置的綁定關系就被解除了,是以在上層協定中如過
// 想要通過skb->dev通路其網絡裝置是有風險的,因為底層的net_device可能已經被删除了
dev_put(dev);
} while (++work < quota && jiffies == start_time);
// 傳回本次實際接收的資料包數
return work;
}