天天看點

Linux TCP資料包接收流程

   TCP接收方存在3種隊列:

1 Backlog Queue (sk->backlog)

2 Prequeue Queue (tp->ucopy.prequeue)

3 Receive Queue (sk->receive_queue)

然後來看3個隊列的差別。

首先sk_backlog隊列是當目前的sock在程序上下文中被使用時,如果這個時候有資料到來,則将資料拷貝到sk_backlog.

prequeue則是資料buffer第一站,一般都是這裡,如果prequeue已滿,則會拷貝資料到receive_queue隊列種。

最後一個receive_queue也就是程序上下文第一個取buffer的隊列

這裡為什麼要有prequeue呢,直接放到receive_queue不就好了.因為receive_queue的處理比較繁瑣

(看tcp_rcv_established的實作就知道了,分為slow path和fast path),而軟中斷每次隻能處理一個資料包

(在一個cpu上),是以為了軟中斷能盡快完成,我們就可以先将資料放到prequeue中(tcp_prequeue),然後軟

中斷就直接傳回. 而處理prequeue就放到程序上下文(tcp_recvmsg調用中)去處理了.

最後在分析tcp_v4_rcv和tcp_recvmsg之前,我們要知道tcp_v4_rcv還是處于軟中斷上下文,

而tcp_recvmsg是處于程序上下文,是以比如socket_lock_t才會提供一個owned來鎖住對應的sock。

而我們也就是需要這3個隊列來進行軟中斷上下文和程序上下文之間的通信。最終當資料拷貝到對應隊列,

則軟中斷調用傳回。這裡要注意的是相同的函數在軟中斷上下文和程序上下文種調用是不同的,我們下面就會看到(比如tcp_rcv_established函數) 。

首先資料包進入軟中斷上下文的tcp_v4_rcv函數

int tcp_v4_rcv(struct sk_buff *skb)

{

const struct iphdr *iph;

struct tcphdr *th;

struct sock *sk;

int ret;

struct net *net = dev_net(skb->dev);

if (skb->pkt_type != PACKET_HOST)

goto discard_it;

/* Count it even if it's bad */

TCP_INC_STATS_BH(net, TCP_MIB_INSEGS);

if (!pskb_may_pull(skb, sizeof(struct tcphdr)))

goto discard_it;

/* 。。。。。。*/

bh_lock_sock_nested(sk);

ret = 0;

if (!sock_owned_by_user(sk)) {

#ifdef CONFIG_NET_DMA

struct tcp_sock *tp = tcp_sk(sk);

if (!tp->ucopy.dma_chan && tp->ucopy.pinned_list)

tp->ucopy.dma_chan = dma_find_channel(DMA_MEMCPY);

if (tp->ucopy.dma_chan)

ret = tcp_v4_do_rcv(sk, skb);

else

#endif

{

if (!tcp_prequeue(sk, skb))

ret = tcp_v4_do_rcv(sk, skb);

}

} else

sk_add_backlog(sk, skb);

bh_unlock_sock(sk);

sock_put(sk);

return ret;

該函數的處理過程是:

首先bh_lock_sock_nested調用加自旋鎖

然後判斷目前sock是否被使用者程序占用(sock_owned_by_user函數判斷)。如果沒有的話,就調用tcp_prequeue将資料包加入

prequeue隊列中;否則調用sk_add_backlog将它加入backlog隊列中。

tcp_prequeue調用流程如下

static inline int tcp_prequeue(struct sock *sk, struct sk_buff *skb)

{

struct tcp_sock *tp = tcp_sk(sk);

if (!sysctl_tcp_low_latency && tp->ucopy.task) {

__skb_queue_tail(&tp->ucopy.prequeue, skb);

tp->ucopy.memory += skb->truesize;

if (tp->ucopy.memory > sk->sk_rcvbuf) {

struct sk_buff *skb1;

BUG_ON(sock_owned_by_user(sk));

while ((skb1 = __skb_dequeue(&tp->ucopy.prequeue)) != NULL) {

sk_backlog_rcv(sk, skb1);

NET_INC_STATS_BH(sock_net(sk), LINUX_MIB_TCPPREQUEUEDROPPED);

}

tp->ucopy.memory = 0;

} else if (skb_queue_len(&tp->ucopy.prequeue) == 1) {

wake_up_interruptible(sk->sk_sleep);

if (!inet_csk_ack_scheduled(sk))

inet_csk_reset_xmit_timer(sk, ICSK_TIME_DACK,

(3 * TCP_RTO_MIN) / 4,

TCP_RTO_MAX);

}

return 1;

}

return 0;

}

該函數的本意是将資料包加入prequeue隊列,以待tcp_recvmsg函數調用(通過函數tcp_prequeue_process),這時就傳回0;

如果ucopy.task為NULL的話,表示目前沒有pending的程序,函數傳回1,資料包在軟中斷(函數tcp_v4_do_rcv)中處理。

還有一種情況是prequeue已滿,則在軟中斷上下文中處理該隊列中的所有資料包(函數 sk_backlog_rcv)。

最後,如果發現該skb使得prequeue從空變為非空,則調用wake_up_interruptible(sk->sk_sleep)喚醒在該sock上的等待程序

(該程序在tcp_recvmsg函數中通過sk_wait_data調用進入該sock的等待隊列)。

不管是軟中斷中的資料包處理還是系統調用中的資料包的處理,都是調用tcp_v4_do_rcv。在連接配接建立後,該函數的作用是處理資料包,

資料包加入receive queue中。

先分析資料包如何接收到使用者程序的——tcp_recvmsg函數。

 /*

* This routine copies from a sock struct into the user buffer.

*

* Technical note: in 2.3 we work on _locked_ socket, so that

* tricks with *seq access order and skb->users are not required.

* Probably, code can be easily improved even more.

*/

int tcp_recvmsg(struct kiocb *iocb, struct sock *sk, struct msghdr *msg,

size_t len, int nonblock, int flags, int *addr_len)

{

struct tcp_sock *tp = tcp_sk(sk);

int copied = 0;

u32 peek_seq;

u32 *seq;

unsigned long used;

int err;

int target; /* Read at least this many bytes */

long timeo;

struct task_struct *user_recv = NULL;

int copied_early = 0;

struct sk_buff *skb;

u32 urg_hole = 0;

/* 需要強調的是這裡的鎖操作隻是把sk->sk_lock.owned置為1,表示目前sock上有使用者程序

而沒有對其spinlock加鎖,是以軟中斷可以把資料包加入backlog中,但此時軟中斷不能對prequeue和receive queue 操作*/

lock_sock(sk);

TCP_CHECK_TIMER(sk);

err = -ENOTCONN;

if (sk->sk_state == TCP_LISTEN)

goto out;

timeo = sock_rcvtimeo(sk, nonblock);

/* Urgent data needs to be handled specially. */

if (flags & MSG_OOB)

goto recv_urg;

/* copied_seq表示下一被應用程式讀取的序号,當設定了MSG_PEEK時,seq指針指向一個自動變量,其含義在于當讀取後,下次讀取任然從原有的位置開始 */

seq = &tp->copied_seq;

if (flags & MSG_PEEK) {

peek_seq = tp->copied_seq;

seq = &peek_seq;

}

/* 如果設定了MSG_WAITALL,則讀取資料直到指定長度len */

target = sock_rcvlowat(sk, flags & MSG_WAITALL, len);

下一步從receive queue中讀取資料包

do {

u32 offset;

/* Are we at urgent data? Stop if we have read anything or have SIGURG pending. */

if (tp->urg_data && tp->urg_seq == *seq) {

if (copied)

break;

if (signal_pending(current)) {

copied = timeo ? sock_intr_errno(timeo) : -EAGAIN;

break;

}

}

/* Next get a buffer. */

/* 從receive queue 中找到可用的資料包 */

skb = skb_peek(&sk->sk_receive_queue);

do {

if (!skb)

break;

/* Now that we have two receive queues this

* shouldn't happen.

*/

/* 存在hole,不可能發生*/

if (before(*seq, TCP_SKB_CB(skb)->seq)) {

printk(KERN_INFO "recvmsg bug: copied %X "

"seq %X/n", *seq, TCP_SKB_CB(skb)->seq);

break;

}

/* offset不為0,表示資料有重疊 */

offset = *seq - TCP_SKB_CB(skb)->seq;

/* SYN占用一個序号 */

if (tcp_hdr(skb)->syn)

offset--;

/* 目前skb有資料可讀 */

if (offset < skb->len)

goto found_ok_skb;

if (tcp_hdr(skb)->fin)

goto found_fin_ok;

WARN_ON(!(flags & MSG_PEEK));

skb = skb->next;

} while (skb != (struct sk_buff *)&sk->sk_receive_queue);

receive queue的特征是

(1) already acked

(2) guaranteed in order

(3) contain no holes but

(4) apparently may contain overlapping data(資料可能重疊)

當receive queue沒有可用資料或已經讀取完後,進入下面流程

/* Well, if we have backlog, try to process it now yet. */

/* copied表示已經讀取的資料量,target表示最少讀取量,如果copied大于target并且backlog隊列為空,則接收過程結束*/

if (copied >= target && !sk->sk_backlog.tail)

break;

/* 下面是出錯以及信号處理*/

if (copied) {

if (sk->sk_err ||

sk->sk_state == TCP_CLOSE ||

(sk->sk_shutdown & RCV_SHUTDOWN) ||

!timeo ||

signal_pending(current))

break;

} else {

if (sock_flag(sk, SOCK_DONE))

break;

if (sk->sk_err) {

copied = sock_error(sk);

break;

}

if (sk->sk_shutdown & RCV_SHUTDOWN)

break;

if (sk->sk_state == TCP_CLOSE) {

if (!sock_flag(sk, SOCK_DONE)) {

/* This occurs when user tries to read

* from never connected socket.

*/

copied = -ENOTCONN;

break;

}

break;

}

if (!timeo) {

copied = -EAGAIN;

break;

}

if (signal_pending(current)) {

copied = sock_intr_errno(timeo);

break;

}

}

接下來程式調用函數

tcp_cleanup_rbuf(sk, copied);

該函數的主要作用是發送一個通告視窗更新的ACK,因為使用者程序消費了讀緩存中的資料。

流程到此的條件是:

● the receive queue is empty, 

● no serious errors or state changes were noted and

● we haven't consumed sufficient data to return to the caller.

/* 第一次到來時,task和user_recv都為NULL,是以裝載該程序為sock的目前任務*/

if (!sysctl_tcp_low_latency && tp->ucopy.task == user_recv) {

/* Install new reader */

if (!user_recv && !(flags & (MSG_TRUNC | MSG_PEEK))) {

user_recv = current;

tp->ucopy.task = user_recv;

tp->ucopy.iov = msg->msg_iov;

}

tp->ucopy.len = len;

WARN_ON(tp->copied_seq != tp->rcv_nxt &&

!(flags & (MSG_PEEK | MSG_TRUNC)));

/* Ugly... If prequeue is not empty, we have to

* process it before releasing socket, otherwise

* order will be broken at second iteration.

* More elegant solution is required!!!

*

* Look: we have the following (pseudo)queues:

*

* 1. packets in flight

* 2. backlog

* 3. prequeue

* 4. receive_queue

*

* Each queue can be processed only if the next ones

* are empty. At this point we have empty receive_queue.

* But prequeue _can_ be not empty after 2nd iteration,

* when we jumped to start of loop because backlog

* processing added something to receive_queue.

* We cannot release_sock(), because backlog contains

* packets arrived _after_ prequeued ones.

*

* Shortly, algorithm is clear --- to process all

* the queues in order. We could make it more directly,

* requeueing packets from backlog to prequeue, if

* is not empty. It is more elegant, but eats cycles,

* unfortunately.

*/

/* prequeue不為空,先處理 */

if (!skb_queue_empty(&tp->ucopy.prequeue))

goto do_prequeue;

/* __ Set realtime policy in scheduler __ */

}

if (copied >= target) {

/* 讀取了足夠的資料,但是backlog中還有資料,是以調用release_sock來處理該

隊列中的資料包(tcp_v4_do_recv函數)*/

/* Do not sleep, just process backlog. */

release_sock(sk);

lock_sock(sk);

} else

/* 資料讀取未完成,也不确定backlog中是否有資料,是以需要一個等待的操作*/

sk_wait_data(sk, &timeo);

分析sk_wait_data函數

int sk_wait_data(struct sock *sk, long *timeo)

{

int rc;

DEFINE_WAIT(wait);

/* 加入等待隊列,該等待隊列當有資料包進入prequeue或receive queue時喚醒*/

prepare_to_wait(sk->sk_sleep, &wait, TASK_INTERRUPTIBLE);

set_bit(SOCK_ASYNC_WAITDATA, &sk->sk_socket->flags);

rc = sk_wait_event(sk, timeo, !skb_queue_empty(&sk->sk_receive_queue));

clear_bit(SOCK_ASYNC_WAITDATA, &sk->sk_socket->flags);

finish_wait(sk->sk_sleep, &wait);

return rc;

}

#define sk_wait_event(__sk, __timeo, __condition) /

({ int __rc;

/* 解鎖時可能會處理backlog中資料包,如果有的話,__rc就為1,無需等待

;沒有可處理的話,就置used成員為0,這樣軟中斷可以接收資料到prequeue隊列中,重而喚醒本程序 */ /

release_sock(__sk); /

__rc = __condition; /

if (!__rc) { /

*(__timeo) = schedule_timeout(*(__timeo)); /

} /

lock_sock(__sk); /

__rc = __condition; /

__rc; /

})

if (user_recv) {

int chunk;

/* ucopy.len初始值為len,但在tcp_rcv_established中會減小,減少量為copy到使用者程序中的資料大小*/

/* __ Restore normal policy in scheduler __ */

if ((chunk = len - tp->ucopy.len) != 0) {

NET_ADD_STATS_USER(sock_net(sk), LINUX_MIB_TCPDIRECTCOPYFROMBACKLOG, chunk);

len -= chunk;

copied += chunk;

}

/* 確定資料有序*/

if (tp->rcv_nxt == tp->copied_seq &&

!skb_queue_empty(&tp->ucopy.prequeue)) {

do_prequeue: /* 調用 tcp_v4_do_rcv處理prequeue中skb */

tcp_prequeue_process(sk);

if ((chunk = len - tp->ucopy.len) != 0) {

NET_ADD_STATS_USER(sock_net(sk), LINUX_MIB_TCPDIRECTCOPYFROMPREQUEUE, chunk);

len -= chunk;

copied += chunk;

}

}

}

if ((flags & MSG_PEEK) &&

(peek_seq - copied - urg_hole != tp->copied_seq)) {

if (net_ratelimit())

printk(KERN_DEBUG "TCP(%s:%d): Application bug, race in MSG_PEEK./n",

current->comm, task_pid_nr(current));

peek_seq = tp->copied_seq;

}

/* 開始下一次循環,接下來的是receive queue中skb的資料讀取,是以不進入 */

continue;

接下來的是receive queue中skb的資料讀取

found_ok_skb:

/* Ok so how much can we use? */

used = skb->len - offset;

if (len < used)

used = len;

/* Do we have urgent data here? */

if (tp->urg_data) {

u32 urg_offset = tp->urg_seq - *seq;

if (urg_offset < used) {

if (!urg_offset) {

if (!sock_flag(sk, SOCK_URGINLINE)) {

++*seq;

urg_hole++;

offset++;

used--;

if (!used)

goto skip_copy;

}

} else

used = urg_offset;

}

}

if (!(flags & MSG_TRUNC)) {

#ifdef CONFIG_NET_DMA

#endif

{

err = skb_copy_datagram_iovec(skb, offset,

msg->msg_iov, used);

if (err) {

/* Exception. Bailout! */

if (!copied)

copied = -EFAULT;

break;

}

}

}

*seq += used;

copied += used;

len -= used;

/* 調整TCP接收緩存空間 */

tcp_rcv_space_adjust(sk);

skip_copy:

if (tp->urg_data && after(tp->copied_seq, tp->urg_seq)) {

tp->urg_data = 0;

tcp_fast_path_check(sk);

}

if (used + offset < skb->len)

continue;

if (tcp_hdr(skb)->fin)

goto found_fin_ok;

if (!(flags & MSG_PEEK)) {

sk_eat_skb(sk, skb, copied_early);

copied_early = 0;

}

continue;

found_fin_ok:

/* Process the FIN. */

++*seq;

if (!(flags & MSG_PEEK)) {

sk_eat_skb(sk, skb, copied_early);

copied_early = 0;

}

break;

} while (len > 0);

最後在跳出循環後,prequeue隊列又一次被處理(因為其中可能還有資料,可以讀取到本程序中)

if (user_recv) {

if (!skb_queue_empty(&tp->ucopy.prequeue)) {

int chunk;

tp->ucopy.len = copied > 0 ? len : 0;

tcp_prequeue_process(sk);

if (copied > 0 && (chunk = len - tp->ucopy.len) != 0) {

NET_ADD_STATS_USER(sock_net(sk), LINUX_MIB_TCPDIRECTCOPYFROMPREQUEUE, chunk);

len -= chunk;

copied += chunk;

}

}

/* 處理完後task複位為NULL, 表示目前sock沒有程序占用 */

tp->ucopy.task = NULL;

tp->ucopy.len = 0;

}

資料包處理函數将在後面分析

繼續閱讀