天天看點

網絡收包流程-軟中斷中process_backlog和poll方式處理流程(二)

      在硬中斷中觸發了軟中斷後,最終會調用軟中斷處理函數 net_rx_action,注意:硬中斷流程觸發軟中斷後退出中斷上下文,但是并不會立刻進入軟中斷,具體的實作需要了解軟中斷處理流程。

1.軟中斷處理函數net_rx_action

網絡收包流程-軟中斷中process_backlog和poll方式處理流程(二)

具體實作詳解:

static void net_rx_action(struct softirq_action *h)
{
    struct softnet_data *sd = this_cpu_ptr(&softnet_data);//擷取目前cpu的sd變量
    unsigned long time_limit = jiffies + 2;
    int budget = netdev_budget;這個值就是 net.core.netdev_max_backlog,通過sysctl來修改,表示一次軟中斷處理skb的數目,系統預設定義為300
    LIST_HEAD(list);
    LIST_HEAD(repoll);

    local_irq_disable();//禁止中斷(中斷響應的時候會把特定于裝置的poll_list放入到sd中)會把擷取sd的poll_list連結清單 
    list_splice_init(&sd->poll_list, &list);//将sd->poll_list接到list的開頭
    local_irq_enable();//打開中斷,正常處理poll_list

    for (;;) {
        struct napi_struct *n;

        if (list_empty(&list)) {
            if (!sd_has_rps_ipi_waiting(sd) && list_empty(&repoll))//檢查POLL隊列(poll_list)上是否有裝置在準備等待輪詢
                return;
            break;
        }

        n = list_first_entry(&list, struct napi_struct, poll_list);//輪詢sd->poll_list上的所有裝置
        budget -= napi_poll(n, &repoll);//調用poll函數從網卡驅動中讀取一定數量的skb

        /* If softirq window is exhausted then punt.
         * Allow this to run for 2 jiffies since which will allow
         * an average latency of 1.5/HZ.
         */
        if (unlikely(budget <= 0 ||
                 time_after_eq(jiffies, time_limit))) {//如果讀取的數量超過300或者時間超過一個jiffies,則終止中斷處理
            sd->time_squeeze++;
            break;
        }
    }

    __kfree_skb_flush();
    local_irq_disable();//同上

    list_splice_tail_init(&sd->poll_list, &list);
    list_splice_tail(&repoll, &list);
    list_splice(&list, &sd->poll_list);//将未處理完的list裝置連結清單接到sd->poll_list開頭
    if (!list_empty(&sd->poll_list))  //如果poll list中不為空,表示還有skb沒有讀取完成,則繼續讀取,觸發下一次軟中斷
        __raise_softirq_irqoff(NET_RX_SOFTIRQ);

    net_rps_action_and_irq_enable(sd);//本地中斷開啟,根據條件發送IPI給其他CPU
}
           

2.napi_poll

static int napi_poll(struct napi_struct *n, struct list_head *repoll)
{
    void *have;
    int work, weight;

    list_del_init(&n->poll_list);//從連結清單中拿掉n

    have = netpoll_poll_lock(n);

    weight = n->weight;//讀取配額,表示裝置能讀取的分組數,此權重可由裝置驅動指定,但都不能超過該裝置可以在Rx緩沖區中存儲的分組的數目

    /* This NAPI_STATE_SCHED test is for avoiding a race
     * with netpoll's poll_napi().  Only the entity which
     * obtains the lock and sees NAPI_STATE_SCHED set will
     * actually make the ->poll() call.  Therefore we avoid
     * accidentally calling ->poll() when NAPI is not scheduled.
     */
    work = 0;
    if (test_bit(NAPI_STATE_SCHED, &n->state)) {//如果napi poll被排程狀态
        work = n->poll(n, weight);//執行目前裝置n的poll回調,非NAPI調用process_backlog,NAPI則調用特定裝置的poll函數
        trace_napi_poll(n);
    }

    WARN_ON_ONCE(work > weight);

    if (likely(work < weight))//讀取小于配額,全部讀出,退出
        goto out_unlock;

    //讀取數等于配額表示尚未讀完
    /* Drivers must not modify the NAPI state if they
     * consume the entire weight.  In such cases this code
     * still "owns" the NAPI instance and therefore can
     * move the instance around on the list at-will.
     */
    if (unlikely(napi_disable_pending(n))) {//如果napi狀态為disable,則執行完成項
        napi_complete(n);
        goto out_unlock;
    }

    if (n->gro_list) {//如果等待合并的skb連結清單存在,清理過時的節點
        /* flush too old packets
         * If HZ < 1000, flush all packets.
         */
        napi_gro_flush(n, HZ >= 1000);
    }

    /* Some drivers may have called napi_schedule
     * prior to exhausting their budget.
     */
    if (unlikely(!list_empty(&n->poll_list))) {
        pr_warn_once("%s: Budget exhausted after napi rescheduled\n",
                 n->dev ? n->dev->name : "backlog");
        goto out_unlock;
    }

    list_add_tail(&n->poll_list, repoll);//未處理完,核心接下來将該裝置移動到輪詢表末尾,在連結清單中所有其他裝置都處理過之後,繼續輪詢該裝置。

out_unlock:
    netpoll_poll_unlock(have);

    return work;
}
           

3.對于非NAPI方式的網卡收報最終會調用process_backlog來處理網路分組。

網絡收包流程-軟中斷中process_backlog和poll方式處理流程(二)

process_backlog主要完成二項工作:

1) __skb_dequeue從等待隊列移除一個套接字緩沖區,該緩沖區管理着一個接收到的分組。

2) 調用netif_receive_skb函數分析分組類型,以便根據分組類型将分組傳遞給網絡層的接收函數(即傳遞到網絡系統的更高一層)。為此,該函數周遊所有可能負責目前分組類型的所有網絡層函數,一一調用deliver_skb函數( deliver_skb函數使用一個特定于分組類型的處理程式func,承擔對分組的更高層(例如網際網路絡層)的處理)。

具體實作詳解:

static int process_backlog(struct napi_struct *napi, int quota)
{
    int work = 0;
    struct softnet_data *sd = container_of(napi, struct softnet_data, backlog);//擷取目前backlog所在的sd

    /* Check if we have pending ipi, its better to send them now,
     * not waiting net_rx_action() end.
     */
    if (sd_has_rps_ipi_waiting(sd)) {//是否有rps ipi等待,如果是需要發送ipi中斷給其他CPU
        local_irq_disable();
        net_rps_action_and_irq_enable(sd);
    }

    napi->weight = weight_p;//設定每次處理的最大資料包數,預設為6
    local_irq_disable();//關閉中斷
    while (work < quota) {//如果處理的分組小于配額則一直接收分組
        struct sk_buff *skb;
        unsigned int qlen;

        while ((skb = __skb_dequeue(&sd->process_queue))) {//從緩存隊列中取skb向上層輸入,直到process隊列處理完或者裝置配額用完。
            rcu_read_lock();
            local_irq_enable();//開中斷
            __netif_receive_skb(skb);//處理封包
            rcu_read_unlock();
            local_irq_disable();
            input_queue_head_incr(sd);//将隊列頭部往後偏移一個機關
            if (++work >= quota) {//如果處理封包數超過裝置配額,則退出
                local_irq_enable();
                return work;//傳回處理封包分則數
            }
        }
        //如果process隊列被處理完,則需要繼續合并input隊列到process隊列。
        rps_lock(sd);
        qlen = skb_queue_len(&sd->input_pkt_queue);//擷取input隊列長度
        if (qlen)  //input隊列不為空
            skb_queue_splice_tail_init(&sd->input_pkt_queue,
                           &sd->process_queue); //把input隊列合并到process隊列中,繼續處理

        if (qlen < quota - work) {//如果剩餘配合還是大于待處理分組隊列的長度,則調整配合大小(減小)
            /*
             * Inline a custom version of __napi_complete().
             * only current cpu owns and manipulates this napi,
             * and NAPI_STATE_SCHED is the only possible flag set on backlog.
             * we can use a plain write instead of clear_bit(),
             * and we dont need an smp_mb() memory barrier.
             */
            napi->state = 0;

            quota = work + qlen;
        }
        rps_unlock(sd);
    }
    local_irq_enable();

    return work;
}
           

3.NAPI的poll函數處理,以gro_cell_poll為例

/* called under BH context */
static inline int gro_cell_poll(struct napi_struct *napi, int budget)
{
    struct gro_cell *cell = container_of(napi, struct gro_cell, napi);
    struct sk_buff *skb;
    int work_done = 0;

    while (work_done < budget) {//小于配額就不斷接收
        skb = __skb_dequeue(&cell->napi_skbs);//從隊列中取出一個分組
        if (!skb)
            break;//接收完,退出
        napi_gro_receive(napi, skb);//接收
        work_done++;//接收分組計數
    }

    if (work_done < budget)//如果分組處理完成,則退出poll
        napi_complete_done(napi, work_done);
    return work_done;//傳回接收分組數目
}
           

總結:

 poll函數最多允許處理budget個分組。該函數傳回實際上處理的分組的數目。他的處理存在以下兩種情況:

1)如果處理分組的數目小于預算,那麼沒有更多的分組, Rx緩沖區為空,否則,肯定還需要處理剩餘的分組(亦即,傳回值不可能小于預算)。是以, netif_rx_complete将該情況通知核心,核心将從輪詢表移除該裝置。接下來,驅動程式必須通過特定于硬體的适當方法來重新啟用IRQ。

2)已經完全用掉了預算,但仍然有更多的分組需要處理。裝置仍然留在輪詢表上,不啟用中斷。

繼續閱讀