天天看點

linux網絡實作分析(3)——資料包的發送(IP層到鍊路層)

        當上層準備好一個包之後,交給鍊路層,鍊路層資料包發送主要通過dev_queue_xmit函數處理。資料包的發送可分為兩種,一種是正常的傳輸流程,即通過網卡驅動,另一種是通過軟中斷(見注3)。為了了解友善,首先看一下dev_queue_xmi函數的整體調用關系圖。

linux網絡實作分析(3)——資料包的發送(IP層到鍊路層)

    <b>dev_queue_xmit</b>

    本函數用來将帶發送的skb加入一個dev的隊列(Queue),調用這個函數前必須設定好skb的device和priority,本函數可以在中斷上下文中被調用。

傳回值:

      傳回非0(正數或負數)表示函數出錯,<b>傳回</b><b>0</b><b>表示成功,但是并不表示資料包被成功發送出去,因為資料包可能因為限速等原因被丢掉</b>。

    <b>函數執行後傳入的skb</b><b>将被釋放,是以如果想控制資料包,實作對skb</b><b>的重傳時需要增加skb</b><b>的引用計數。</b>

<b> </b>

  <b>當調用此函數時中斷必須是打開的,因為BH enable</b><b>必須要求IRQ enable</b><b>,否則會造成死鎖</b>。

int dev_queue_xmit(struct sk_buff *skb)

{

struct net_device *dev = skb-&gt;dev;

struct netdev_queue *txq;

struct Qdisc *q;

int rc = -ENOMEM;

/* GSO will handle the following emulations directly. */

if (netif_needs_gso(dev, skb))

goto gso;

if (skb_has_frags(skb) &amp;&amp;

!(dev-&gt;features &amp; NETIF_F_FRAGLIST) &amp;&amp;

__skb_linearize(skb))

goto out_kfree_skb;

//如果skb有分片但是發送裝置不支援分片,或分片中有分片在高端記憶體但發送裝置不支援DMA,需要将所有段重新組合成一個段 ,這裡__skb_linearize其實就是__pskb_pull_tail(skb, skb-&gt;data_len),這個函數基本上等同于pskb_may_pull  ,pskb_may_pull的作用就是檢測skb對應的主buf中是否有足夠的空間來pull出len長度,如果不夠就重新配置設定skb并将frags中的資料拷貝入新配置設定的主buff中,而這裡将參數len設定為skb-&gt;datalen, 也就是會将所有的資料全部拷貝到主buff中,以這種方式完成skb的線性化。

if (skb_shinfo(skb)-&gt;nr_frags &amp;&amp;

(!(dev-&gt;features &amp; NETIF_F_SG) || illegal_highdma(dev, skb))

&amp;&amp;

//如果資料包沒有被計算校驗和并且發送裝置不支援這個協定的校驗,則在此進行校驗和的計算(注1)。如果上面已經線性化了一次,這裡的__skb_linearize就會直接傳回,注意差別frags和frag_list,前者是将多的資料放到單獨配置設定的頁面中,sk_buff隻有一個。而後者則是連接配接多個sk_buff 

    if

(skb-&gt;ip_summed == CHECKSUM_PARTIAL) {

skb_set_transport_header(skb, skb-&gt;csum_start -

                          skb_headroom(skb));

if (!dev_can_checksum(dev, skb) &amp;&amp; skb_checksum_help(skb))

    }

gso:

//關閉軟中斷,禁止cpu搶占

rcu_read_lock_bh();

//選擇一個發送隊列,如果裝置提供了select_queue回調函數就使用它,否則由核心選擇一個隊列,這裡隻是Linux核心多隊列的實作,但是要真正的使用都隊列,需要網卡支援多隊列才可以,一般的網卡都隻有一個隊列。在調用alloc_etherdev配置設定net_device是,設定隊列的個數

txq =

dev_pick_tx(dev, skb);

// 從netdev_queue結構上擷取裝置的qdisc 

    q

= rcu_dereference(txq-&gt;qdisc);

//如果該裝置有隊列可用,就調用__dev_xmit_skb

if (q-&gt;enqueue) {

<b>        rc = __dev_xmit_skb(skb, q, dev, txq);</b>

goto out;

//下面的處理是在沒有發送隊列的情況,軟裝置一般沒有發送隊列:如lo、tunnle;我們所要做的就是直接調用驅動的hard_start_xmit将它發送出去  如果發送失敗就直接丢棄,因為沒有隊列可以儲存它  

if (dev-&gt;flags &amp; IFF_UP) { //确定裝置是否開啟

int cpu = smp_processor_id(); /* ok because BHs are off */

if (txq-&gt;xmit_lock_owner != cpu) {//是否在同一個cpu上

HARD_TX_LOCK(dev, txq, cpu);

if (!netif_tx_queue_stopped(txq)) {//确定隊列是運作狀态

                rc = NET_XMIT_SUCCESS;

                if (!<b>dev_hard_start_xmit(skb, dev, txq)</b>) {

                    HARD_TX_UNLOCK(dev, txq);

                    goto out;

                }

}

HARD_TX_UNLOCK(dev, txq);

            if (net_ratelimit())

                printk(KERN_CRIT "Virtual

device %s asks to "

                       "queue

packet!\n", dev-&gt;name);

} else {// txq-&gt;xmit_lock_owner == cpu的情況,說明發生遞歸

if (net_ratelimit())

                printk(KERN_CRIT "Dead

loop on virtual device "

                       "%s, fix it

urgently!\n", dev-&gt;name);

rc = -ENETDOWN;

rcu_read_unlock_bh();

out_kfree_skb:

kfree_skb(skb);

return rc;

out:

 <b>__dev_xmit_skb</b>

_dev_xmit_skb函數主要做兩件事情:

(1)如果流控對象為空的,試圖直接發送資料包。

(2)如果流控對象不空,将資料包加入流控對象,并運作流控對象。

static inline int __dev_xmit_skb(struct

sk_buff *skb, struct Qdisc *q,

                 struct net_device *dev,

                 struct netdev_queue *txq)

spinlock_t *root_lock = qdisc_lock(q);//見注2

int rc;

spin_lock(root_lock);   //鎖qdisc

if (unlikely(test_bit(__QDISC_STATE_DEACTIVATED, &amp;q-&gt;state))) {//判斷隊列是否失效

rc = NET_XMIT_DROP;

else if ((q-&gt;flags &amp; TCQ_F_CAN_BYPASS) &amp;&amp; !qdisc_qlen(q)

!test_and_set_bit(__QDISC_STATE_RUNNING, &amp;q-&gt;state)) {

/*

* This is a work-conserving queue; there are no old skbs

* waiting to be sent out; and the qdisc is not running -

         * xmit the skb directly.

*/

__qdisc_update_bstats(q, skb-&gt;len);

if (sch_direct_xmit(skb, q, dev, txq, root_lock))

<b>__qdisc_run(q);</b>

else

clear_bit(__QDISC_STATE_RUNNING, &amp;q-&gt;state);

rc = NET_XMIT_SUCCESS;

else {

rc = qdisc_enqueue_root(skb, q);

<b>qdisc_run(q);</b>

spin_unlock(root_lock);

   <b>qdisc_run</b>

有兩個時機将會調用qdisc_run():

1.__dev_xmit_skb()

2. 軟中斷服務線程NET_TX_SOFTIRQ

static inline void qdisc_run(struct Qdisc

*q)

if (!test_and_set_bit(__QDISC_STATE_RUNNING, &amp;q-&gt;state))//将隊列設定為運作狀态

__qdisc_run(q);

   <b>__qdisc_run</b>

void __qdisc_run(struct Qdisc *q)

unsigned long start_time = jiffies;

while (<b>qdisc_restart(q)</b>) { //傳回值大于0,說明流控對象非空

        /*如果發現本隊列運作的時間太長了,将會停止隊列的運作,并将隊列加入output_queue連結清單頭

         * Postpone processing if  (延遲處理)

         * 1. another process needs the CPU;

         * 2. we've been doing it for too long.

         */

if (need_resched() || jiffies != start_time) {

//已經不允許繼續運作本流控對象

__netif_schedule(q); //将本qdisc加入每cpu變量softnet_data的output_queue連結清單中

break;

  //清除隊列的運作辨別

循環調用qdisc_restart發送資料,下面這個函數qdisc_restart是真正發送資料包的函數,它從隊列上取下一個幀,然後嘗試将它發送出去,若發送失敗則一般是重新入隊。

此函數傳回值為:發送成功時傳回剩餘隊列長度,發送失敗時傳回0(若發送成功且剩餘隊列長度為0也傳回0)

   <b>qdisc_restart</b>

__QDISC_STATE_RUNNING狀态保證同一時刻隻有一個cpu在處理這個qdisc,qdisc_lock(q)用來保證對這個隊列的順序通路。

通常netif_tx_lock使用來保本<b>裝置驅動</b>的順序(獨占)通路的,qdisc_lock(q)用來保證qdisc的順序通路,這兩個是互斥的,獲得其中一個必須釋放另一個。

static inline int qdisc_restart(struct

Qdisc *q)

struct net_device *dev;

spinlock_t *root_lock;

struct sk_buff *skb;

    /* Dequeue packet */

skb = dequeue_skb(q); //一開始就調用dequeue函數

if (unlikely(!skb))

return 0;  //傳回0說明隊列是空的或者被限制

root_lock = qdisc_lock(q);

dev = qdisc_dev(q);

txq = netdev_get_tx_queue(dev, skb_get_queue_mapping(skb));

return <b>sch_direct_xmit(skb, q,</b>

dev, txq, root_lock); //用于發送資料包

   <b>sch_direct_xmit</b>

   發送一個skb,将隊列置為__QDISC_STATE_RUNNING狀态,保證隻有一個cpu運作這個函數,傳回0表示隊列為空或者發送受限,大于0表示隊列非空。

int sch_direct_xmit(struct sk_buff *skb,

struct Qdisc *q,

struct net_device *dev, struct netdev_queue *txq,

spinlock_t *root_lock)

int ret = NETDEV_TX_BUSY;

spin_unlock(root_lock);// release qdisc,因為後面要擷取裝置鎖

   // 調用__netif_tx_lockà spin_lock(&amp;txq-&gt;_xmit_lock,,保證裝置驅動的獨占通路

HARD_TX_LOCK(dev, txq, smp_processor_id());

if (!netif_tx_queue_stopped(txq) &amp;&amp; //裝置沒有被停止,且發送隊列沒有被當機

!netif_tx_queue_frozen(txq))

ret = <b>dev_hard_start_xmit(skb,</b>

dev, txq); //發送資料包

HARD_TX_UNLOCK(dev, txq);  // 調用__netif_tx_unlock

spin_lock(root_lock);

switch (ret) {

case NETDEV_TX_OK:  //如果裝置成功将資料包發送出去

ret = qdisc_qlen(q); //傳回剩餘的隊列長度

case NETDEV_TX_LOCKED: //擷取裝置鎖失敗

ret = handle_dev_cpu_collision(skb, txq, q);

default: //裝置繁忙,重新入隊發送(利用softirq)

if (unlikely (ret != NETDEV_TX_BUSY &amp;&amp; net_ratelimit()))

printk(KERN_WARNING "BUG %s code %d qlen %d\n",

                   dev-&gt;name, ret,

q-&gt;q.qlen);

ret = dev_requeue_skb(skb, q);

if (ret &amp;&amp; (netif_tx_queue_stopped(txq) ||

netif_tx_queue_frozen(txq)))

ret = 0;

return ret;

   <b>dev_hard_start_xmit</b>

int dev_hard_start_xmit(struct sk_buff

*skb, struct net_device *dev,

struct netdev_queue *txq)

const struct net_device_ops *ops = dev-&gt;netdev_ops;

if

(likely(!skb-&gt;next)) {

//從這裡可以看出,對于每一個發送的包也會發給ptype_all一份,  而packet套接字建立時對于proto為ETH_P_ALL的會在ptype_all中注冊一個成員,是以對于協定号為ETH_P_ALL的packet套接字來說,發送和接受的資料都能收到

if (!list_empty(&amp;ptype_all))

<b> dev_queue_xmit_nit</b>(skb, dev);

if (netif_needs_gso(dev, skb)) {

if (unlikely(dev_gso_segment(skb)))

                goto out_kfree_skb;

if (skb-&gt;next)

                goto gso;

       //如果發送裝置不需要skb-&gt;dst,則在此将其釋放

if (dev-&gt;priv_flags &amp; IFF_XMIT_DST_RELEASE)

skb_dst_drop(skb);

       //調用裝置注冊的發送函數,即dev-&gt;netdev_ops-&gt;<b> ndo_start_xmit(skb, dev)</b>

rc = ops-&gt;<b>ndo_start_xmit(skb,</b>

dev);

if (rc == NETDEV_TX_OK)

txq_trans_update(txq);

……

   <b>dev_queue_xmit_nit</b>

static void dev_queue_xmit_nit(struct

sk_buff *skb, struct net_device *dev)

struct packet_type *ptype;

#ifdef CONFIG_NET_CLS_ACT

if (!(skb-&gt;tstamp.tv64 &amp;&amp; (G_TC_FROM(skb-&gt;tc_verd) &amp;

AT_INGRESS)))

net_timestamp(skb); //記錄該資料包輸入的時間戳

#else

net_timestamp(skb);

#endif

rcu_read_lock();

list_for_each_entry_rcu(ptype, &amp;ptype_all, list) {

 /*

Never send packets back to the socket they originated from */

       //周遊ptype_all連結清單,查找所有符合輸入條件的原始套接口,并循環将資料包輸入到滿足條件的套接口

if ((ptype-&gt;dev == dev || !ptype-&gt;dev) &amp;&amp;

(ptype-&gt;af_packet_priv == NULL ||

(struct sock

*)ptype-&gt;af_packet_priv != skb-&gt;sk)) {

       //由于該資料包是額外輸入到這個原始套接口的,是以需要克隆一個資料包

struct sk_buff *skb2 = skb_clone(skb, GFP_ATOMIC);

if (!skb2)

                break;

      /*

skb-&gt;nh should be correctly(確定頭部偏移正确)

               set by sender, so that the

second statement is

               just protection against buggy

protocols.

             */

skb_reset_mac_header(skb2);

if (skb_network_header(skb2) &lt; skb2-&gt;data ||

                skb2-&gt;network_header &gt;

skb2-&gt;tail) {

                if (net_ratelimit())//net_ratelimit用來保證網絡代碼中printk的頻率

                    printk(KERN_CRIT

"protocol %04x is "

                           "buggy, dev

%s\n",

                           skb2-&gt;protocol,

dev-&gt;name);

                skb_reset_network_header(skb2);

//重新設定L3頭部偏移

skb2-&gt;transport_header = skb2-&gt;network_header;

skb2-&gt;pkt_type = PACKET_OUTGOING;

ptype-&gt;func(skb2, skb-&gt;dev, ptype, skb-&gt;dev);//調用協定(ptype_all)接受函數

rcu_read_unlock();

<b>Ø  環回裝置</b>

對于環回裝置loopback,裝置的ops-&gt;ndo_start_xmit被初始化為loopback_xmit函數。

static const struct net_device_ops

loopback_ops = {

.ndo_init      =

loopback_dev_init,

.ndo_start_xmit= loopback_xmit,

.ndo_get_stats = loopback_get_stats,

};

drivers/net/loopback.c

static netdev_tx_t loopback_xmit(struct

sk_buff *skb,

                 struct net_device *dev)

struct pcpu_lstats *pcpu_lstats, *lb_stats;

int len;

skb_orphan(skb);

    skb-&gt;protocol = eth_type_trans(skb, dev);

/* it's OK to use per_cpu_ptr() because BHs are off */

pcpu_lstats = dev-&gt;ml_priv;

lb_stats = per_cpu_ptr(pcpu_lstats, smp_processor_id());

len = skb-&gt;len;

if (likely(<b>netif_rx(skb) </b>==

NET_RX_SUCCESS)) { //直接調用了netif_rx進行了接收處理

lb_stats-&gt;bytes += len;

lb_stats-&gt;packets++;

lb_stats-&gt;drops++;

return NETDEV_TX_OK;

<b>注:</b>

1. CHECKSUM_PARTIAL表示使用硬體checksum ,L4層的僞頭的校驗已經完畢,并且已經加入uh-&gt;check字段中,此時隻需要裝置計算整個頭4層頭的校驗值。

2. 整個資料包發送邏輯中會涉及到三個用于互斥通路的代碼:

(1)spinlock_t *root_lock = qdisc_lock(q);

(2)test_and_set_bit(__QDISC_STATE_RUNNING, &amp;q-&gt;state)

(3)__netif_tx_lockà spin_lock(&amp;txq-&gt;_xmit_lock)

     其中(1)(3)分别對應一個spinlock,(2)對應一個隊列狀态。在了解代碼中如何使用這三個同步方法時,首先看一下相關資料結構的關系,如下。

linux網絡實作分析(3)——資料包的發送(IP層到鍊路層)

    圖中綠色部分表示(1)(3)兩處spinlock。首先看(1)處對應的代碼:

static inline spinlock_t *qdisc_lock(struct

Qdisc *qdisc)

return &amp;qdisc-&gt;q.lock;

是以root_lock是用于控制<b>qdisc</b><b>中skb</b><b>隊列</b>通路的鎖,當需要對skb隊列進行enqueue、dequeue、requeue時,就需要加鎖。

__QDISC_STATE_RUNNING标志用于保證一個流控對象(qdisc)不會同時被多個cpu通路。

而(3)處的spinlock,即struct netdev_queue中的_xmit_lock,則用于保證dev的注冊函數的互斥通路,即deriver的同步。

另外,核心代碼注釋中寫到,(1)和(3)是互斥的,獲得(1)處的鎖時必須先保證釋放(3)處的鎖,反之亦然,為什麼要這樣還沒有想明白。。。。

3. 已經有了dev_queue_xmit函數,為什麼還需要軟中斷來發送呢?

我們可以看到在dev_queue_xmit中将skb進行了一些處理(比如合并成一個包,計算校驗和等),處理完的skb是可以直接發送的了,這時dev_queue_xmit也會先将skb入隊(skb一般都是在這個函數中入隊的),并且調用qdisc_run嘗試發送,但是有可能發送失敗,這時就将skb重新入隊,排程軟中斷,并且自己直接傳回。

軟中斷隻是發送隊列中的skb以及釋放已經發送的skb,它無需再對skb進行線性化或者校驗和處理。另外在隊列被停止的情況下,dev_queue_xmit仍然可以把包加入隊列,但是不能發送,這樣在隊列被喚醒的時候就需要通過軟中斷來發送停止期間積壓的包。簡而言之,dev_queue_xmit是對skb做些最後的處理并且第一次嘗試發送,軟中斷是将前者發送失敗或者沒發完的包發送出去。(其實發送軟中斷還有一個作用,就是釋放已經發送的包,因為某些情況下發送是在硬體中斷中完成的,為了提高硬體中斷處理效率,核心提供一種方式将釋放skb放到軟中斷中進行,這時隻要調用dev_kfree_skb_irq,它将skb加入softnet_data的completion_queue中,然後開啟發送軟中斷,net_tx_action會在軟中斷中将completion_queue中的skb全部釋放掉)

繼續閱讀