天天看點

Linux核心分析 - 網絡[十二]:UDP子產品 - 收發

  核心版本:2.6.34

UDP封包接收

       UDP封包的接收可以分為兩個部分:協定棧收到udp封包,插入相應隊列中;使用者調用recvfrom()或recv()系統調用從隊列中取出封包,這裡的隊列就是sk->sk_receive_queue,它是封包中轉的紐帶,兩部分的聯系如下圖所示。

Linux核心分析 - 網絡[十二]:UDP子產品 - 收發

第一部分:協定棧如何收取udp封包的。

      udp子產品的注冊在inet_init()中,當收到的是udp封包,會調用udp_protocol中的handler函數udp_rcv()。

if (inet_add_protocol(&udp_protocol, IPPROTO_UDP) < 0)
 printk(KERN_CRIT "inet_init: Cannot add UDP protocol\n");
           

      udp_rcv() -> __udp4_lib_rcv() 完成udp封包接收,初始化udp的校驗和,并不驗證校驗和的正确性。

if (udp4_csum_init(skb, uh, proto))
 goto csum_error;
           

      在udptable中以套接字的[saddr, sport, daddr, dport]查找相應的sk,在上一篇中已詳細講過”sk的查找”,這裡封包的source源端口相當于源主機的端口,dest目的端口相當于本地端口。

sk = __udp4_lib_lookup_skb(skb, uh->source, uh->dest, udptable);
           

      如果udptable中存在相應的sk,即有socket在接收,則通過udp_queue_rcv_skb()将封包skb入隊列,該函數稍後分析,總之,封包會被放到sk->sk_receive_queue隊列上,然後sock_put()減少sk的引用計算,并傳回。之後的接收工作的完成将有賴于使用者的操作。

if (sk != NULL) {
 int ret = udp_queue_rcv_skb(sk, skb);
 sock_put(sk);
if (ret > 0)
  return -ret;
 return 0;
}
           

      當沒有在udptable中找到sk時,則本機沒有socket會接收它,是以要發送icmp不可達封包,在此之前,還要驗證校驗和udp_lib_checksum_complete(),如果校驗和錯誤,則直接丢棄封包;如果校驗和正确,則會增加mib中的統計,并發送icmp端口不可達封包,然後丢棄該封包。

if (udp_lib_checksum_complete(skb))
 goto csum_error;
UDP_INC_STATS_BH(net, UDP_MIB_NOPORTS, proto == IPPROTO_UDPLITE);
icmp_send(skb, ICMP_DEST_UNREACH, ICMP_PORT_UNREACH, 0);
kfree_skb(skb);
           

udp_queue_rcv_skb() 封包入隊列

      sock_woned_by_user()判斷sk->sk_lock.owned的值,如果等于1,表示sk處于占用狀态,此時不能向sk接收隊列中添加skb,執行else if部分,sk_add_backlog()将skb添加到sk->sk_backlog隊列上;如果等于0,表示sk沒被占用,執行if部分,__udp_queue_rcv_skb()将skb添加到sk->sk_receive_queue隊列上。

bh_lock_sock(sk);
if (!sock_owned_by_user(sk))
 rc = __udp_queue_rcv_skb(sk, skb);
else if (sk_add_backlog(sk, skb)) {
 bh_unlock_sock(sk);
 goto drop;
}
bh_unlock_sock(sk);
           

      那麼何時sk會被占用?何時sk->sk_backlog上的skb被處理的?

      建立socket時,sys_socket() -> inet_create() -> sk_alloc() -> sock_lock_init() -> sock_lock_init_class_and_name()初始化sk->sk_lock_owned=0。

      比如當銷毀socket時,udp_destroy_sock()會調用lock_sock()對sk加鎖,操作完後,調用release_sock()對sk解鎖。

void udp_destroy_sock(struct sock *sk)
{
 lock_sock(sk);
 udp_flush_pending_frames(sk);
 release_sock(sk);
}
           

      實際上,lock_sock()設定sk->sk_lock.owned=1;而release_sock()設定sk->sk_lock.owned=0,并處理sk_backlog隊列上的封包,release_sock() -> __release_sock(),對于sk_backlog隊列上的每個封包,調用sk_backlog_rcv() -> sk->sk_backlog_rcv()。同樣是在socket的建立中,sk->sk_backlog_rcv = sk->sk_prot->backlog_rcv()即__udp_queue_rcv_skb(),這個函數的作用上面已經講過,将skb添加到sk_receive_queue,這樣,所有的sk_backlog上的封包轉移到了sk_receive_queue上。簡單來說,sk_backlog隊列的作用就是,鎖定時封包臨時存放在此,解鎖時,封包移到sk_receive_queue隊列。 

Linux核心分析 - 網絡[十二]:UDP子產品 - 收發

第二部分:使用者如何收取封包

      使用者可以調用sys_recvfrom()或sys_recv()來接收封包,所不同的是,sys_recvfrom()可能通過參數獲得封包的來源位址,而sys_recv()則不可以,但對接收封包并沒有影響。在使用者調用recvfrom()或recv()接收封包前,發給該socket的封包都會被添加到sk->sk_receive_queue上,recvfrom()和recv()要做的就是從sk_receive_queue上取出封包,拷貝到使用者空間,供使用者使用。

      sys_recv() -> sys_recvfrom()

      sys_recvfrom() -> sk->ops->recvmsg() 

                            ==> sock_common_recvmsg() -> sk->sk_prot->recvmsg()

                            ==> udp_recvmsg()

sys_recvfrom()

      調用sock_recvmsg()接收udp封包,存放在msg中,如果接收到封包,從核心到使用者空間拷貝封包的源位址到addr中,addr是recvfrom()調用的傳入參數,表示封包源的位址。而封包的内容是在udp_recvmsg()中從核心拷貝到使用者空間的。

err = sock_recvmsg(sock, &msg, size, flags);
if (err >= 0 && addr != NULL) {
 err2 = move_addr_to_user((struct sockaddr *)&address,
    msg.msg_namelen, addr, addr_len);
 if (err2 < 0)
  err = err2;
}
           

udp_recvmsg() 接收udp封包

      這個函數有三個關鍵操作:

        1. 取到資料包 -- __skb_recv_datagram()

        2. 拷貝資料 -- skb_copy_datagram_iovec()或skb_copy_and csum_datagram_iovec()

        3. 必要時計算校驗和 – skb_copy_and_csum_datagram_iovec() 

Linux核心分析 - 網絡[十二]:UDP子產品 - 收發

      __skb_recv_datagram(),它會從sk->sk_receive_queue上取出一個skb,前面已經分析到,核心收到發往該socket的封包會放在sk->sk_receive_queue。

skb = __skb_recv_datagram(sk, flags | (noblock ? MSG_DONTWAIT : 0), &peeked, &err);
           

      如果沒有封包,有兩種情況:使用了非阻塞接收,且使用者接收時還沒有封包到來;使用阻塞接收,但之前沒有封包,且在sk->sk_rcvtimeo時間内都沒有封包到來。沒有封包,傳回錯誤值。

if (!skb)
 goto out;
           

      len是recvfrom()傳入buf的大小,ulen是封包内容的長度,如果ulen > len,那麼隻需要使用buf的ulen長度就可以了;如果len < ulen,那麼buf不夠封包填充,隻能對封包截斷,取前len個位元組。

ulen = skb->len - sizeof(struct udphdr);
if (len > ulen)
 len = ulen;
else if (len < ulen)
 msg->msg_flags |= MSG_TRUNC;
           

      如果封包被截斷或使用UDP-Lite,那麼需要提前驗證校驗和,udp_lib_checksum_complete()完成校驗和計算,函數在下面具體分析。

if (len < ulen || UDP_SKB_CB(skb)->partial_cov) {
 if (udp_lib_checksum_complete(skb))
  goto csum_copy_err;
}
           

      如果封包不用驗證校驗和,那麼執行if部分,調用skb_copy_datagram_iovec()直接拷貝封包到buf中就可以了;如果封包需要驗證校驗和,那麼執行else部分,調用skb_copy_and_csum_datagram_iovec()拷貝封包到buf,并在拷貝過程中計算校驗和。這也是為什麼在核心收到udp封包時為什麼先驗證校驗和再處理的原因,udp封包可能很大,校驗和的計算可能很耗時,将其放在拷貝過程中可以節約開銷,當然它的代價是一些校驗和錯誤的封包也會被添加到socket的接收隊列上,直到使用者真正接收時它們才會被丢棄。

if (skb_csum_unnecessary(skb))
 err = skb_copy_datagram_iovec(skb, sizeof(struct udphdr), msg->msg_iov, len);
else {
 err = skb_copy_and_csum_datagram_iovec(skb, sizeof(struct udphdr), msg->msg_iov);
 if (err == -EINVAL)
  goto csum_copy_err;
}
           

      拷貝位址到msg->msg_name中,在sys_recvfrom()中msg->msg_name=&address,然後address會從核心拷貝給使用者空間的addr。

if (sin) {
 sin->sin_family = AF_INET;
 sin->sin_port = udp_hdr(skb)->source;
 sin->sin_addr.s_addr = ip_hdr(skb)->saddr;
 memset(sin->sin_zero, 0, sizeof(sin->sin_zero));
}
           

      下面來重點看核心操作的三個函數:

__skb_recv_datagram()   從sk_receive_queue上取一個skb

      核心代碼段如下,skb_peek()從sk->sk_receive_queue中取出一個skb,如果有的話,則傳回skb,作為使用者此次接收的封包,當然還有對skb的後續處理,但該函數隻是取出一個skb;如果還沒有的話,則使用wait_for_packet()等待封包到來,其中參數timeo代表等待的時間,如果使用非阻塞接收的話,timeo會設定為0(即目前沒有skb的話則直接傳回,不進行等待),否則設定為sk->sk_rcvtimeo。

do {
 ……
 skb = skb_peek(&sk->sk_receive_queue);
 if (skb) {
  *peeked = skb->peeked;
  if (flags & MSG_PEEK) {
   skb->peeked = 1;
   atomic_inc(&skb->users);
  } else
   __skb_unlink(skb, &sk->sk_receive_queue);
 }
 if (skb)
  return skb;
……
} while (!wait_for_packet(sk, err, &timeo));
           

skb_copy_datagram_iovec()   拷貝skb内容到msg中

      拷貝可以分三部分:線性位址空間的拷貝,聚合/發散位址空間的拷貝,非線性位址空間的拷貝。第二部分需要硬體的支援,這裡讨論另兩部分。

      在skb的buff中的是線性位址空間,在skb的frag_list上的是非線性位址空間;當沒有分片發生的,用線性位址空間就足夠了,但是當封包過長而分片時,第一個分片會使用線性位址空間,其餘的分片将被鍊到skb的frag_list上,即非線性位址空間,具體可以參考”ipv4子產品”中分片部分。

      拷貝封包内容時,就要将線性和非線性空間的内容都拷貝過去。下面是拷貝線性位址空間的代碼段,start是封包的線性部分長度(skb->len-skb->datalen),copy是線性位址空間的大小,offset是相對skb的偏移(即此次拷貝從哪裡開始),以udp封包為例,這幾個值如下圖所示。memcpy_toiovec()拷貝核心到to中,要注意的是它改變了to的成員變量。

int start = skb_headlen(skb);
int i, copy = start - offset;
if (copy > 0) {
 if (copy > len)
  copy = len;
 if (memcpy_toiovec(to, skb->data + offset, copy))
  goto fault;
 if ((len -= copy) == 0)
  return 0;
 offset += copy;
}
           
Linux核心分析 - 網絡[十二]:UDP子產品 - 收發

      下面是拷貝非線性位址空間的代碼段,周遊skb的frag_list連結清單,對上面的每個分片,拷貝内容到to中,這裡start, end的值不重要,重要的是它們的內插補點end-start,表示了目前分片frag_iter的長度,使用skb_copy_datagram_iovec()拷貝目前分片内容,即把每個分片都作為單獨封包來處理。不過對于分片,感覺隻有拷貝的第一部分和第二部分,在IP層分片重組時,并沒有将分片鍊在分片的frag_list上的情況,而都鍊在頭分片的frag_list上。

skb_walk_frags(skb, frag_iter) {
 int end;
 end = start + frag_iter->len;
 if ((copy = end - offset) > 0) {
  if (copy > len)
   copy = len;
  if (skb_copy_datagram_iovec(frag_iter,
    offset - start, to, copy))
   goto fault;
  if ((len -= copy) == 0)
   return 0;
  offset += copy;
 }
 start = end;
}
           

      還是以一個例子來說明,主機收到一個udp封包,内容長度為4000 bytes,MTU是1500,傳入buff數組大小也為4000。根據MTU,封包會會被分成三片,分片IP報内容大小依次是1480, 1480, 1040。每個分片都有一個20節字的IP封包,第一個分片還有一個8節字的udp報頭。接收時資料拷貝情況如下: 

Linux核心分析 - 網絡[十二]:UDP子產品 - 收發
Linux核心分析 - 網絡[十二]:UDP子產品 - 收發
Linux核心分析 - 網絡[十二]:UDP子產品 - 收發

      分片一是第一個分片,包含UDP封包,在拷貝時要跳過,因為使用的是udp socket接收,隻要封包内容就可以了。三張圖檔代表了三次調用skb_copy_datagram_iovec()的情況,iov是存儲内容的buff,最終結果是三個分片共4000位元組拷貝到了iov中。

memcpy_toiovec()函數需要注意,不僅因為它改變了iovec的成員值,還因為最後的iov++。在udp socket的接收recvfrom()中,msg.msg_iov = &iov,而iov定義成struct iovec iov,即傳入參數iov實際隻有一個的空間,那麼在iov++後,iov将指向非法的位址。這裡隻考慮udp使用時的情況,memcpy_toiovec()調用的前一句是,這裡len是接收buff的長度:

if (copy > len)
 copy = len;
           

      而memcpy_toiovec()中又有int copy = min_t(unsigned int, iov->iov_len, len),這裡len是上面傳入的copy,iov_len是接收buff長度,這兩句保證了函數中copy值與len相等,即完成一次拷貝後,len-=copy會使len==0,雖然iov++指向了非法記憶體,但由于while(len > 0)已退出,是以不會使用iov做任何事情。其次,函數中的iov++并不會對參數iov産生影響,即函數完成iov還是傳入的值。最後,拷貝完後會修改iov_len和iov_base的值,iov_len表示可用長度,iov_base表示起始拷貝位置。

int memcpy_toiovec(struct iovec *iov, unsigned char *kdata, int len)
{
 while (len > 0) {
  if (iov->iov_len) {
   int copy = min_t(unsigned int, iov->iov_len, len);
   if (copy_to_user(iov->iov_base, kdata, copy))
    return -EFAULT;
   kdata += copy;
   len -= copy;
   iov->iov_len -= copy;
   iov->iov_base += copy;
  }
  iov++;
 }
 return 0;
}
           

skb_copy_and_csum_datagram_iovec()   拷貝skb内容到msg中,同時計算校驗和

      這個函數提高了校驗和計算效率,因為它合并了拷貝與計算操作,這樣隻要一次周遊操作就可以了。與skb_copy_datagram_iovec()相比,它在每次拷貝skb内容時,計算下這次拷貝内容的校驗和。

csum = csum_partial(skb->data, hlen, skb->csum);
if (skb_copy_and_csum_datagram(skb, hlen, iov->iov_base, chunk, &csum))
 goto fault; 
           

UDP封包發送

      發送時有兩種調用方式:sys_send()和sys_sendto(),兩者的差別在于sys_sendto()需要給入目的位址的參數;而sys_send()調用前需要調用sys_connect()來綁定目的位址資訊;兩者的後續調用是相同的。如果調用sys_sendto()發送,位址資訊在sys_sendto()中從使用者空間拷貝到核心空間,而封包内容在udp_sendmsg()中從使用者空間拷貝到核心空間。

      sys_send() -> sys_sendto()

      sys_sendto() -> sock_sendmsg() -> __sock_sendmsg() -> sock->ops->sendmsg()

                         ==> inet_sendmsg() -> sk->sk_prot->sendmsg()

                         ==> udp_sendmsg()

      udp_sendmsg()的核心流程如下圖所示,隻列出了核心的函數調用了參數指派,大緻步驟是:擷取資訊 -> 擷取路由項rt -> 添加資料 -> 發送資料。 

Linux核心分析 - 網絡[十二]:UDP子產品 - 收發

      udp_sock結構體中的pending用于辨別目前udp_sock上是否有待發送資料,如果有的話,則直接goto do_append_data繼續添加資料;否則先要做些初始化工作,再才添加資料。實際上,pending!=0表示此調用前已經有資料在udp_sock中的,每次調和sendto()發送資料時,pending初始等于0;在添加資料時,設定up->pending = AF_INET。直到最後調用udp_push_pending_frames()将資料發送給IP層或skb_queue_empty(&sk->sk_write_queue)發送連結清單上為空,這時設定up->pending = 0。是以,這裡可以看到,封包發送時pending值的變化: 

Linux核心分析 - 網絡[十二]:UDP子產品 - 收發

      通常使用sendto()發送都是一次調用對應一個封包,即pending=0->AF_INET->0;但如果調用sendto()時參數用到了MSG_MORE标志,則pending=0->AF_INET,直到調用sendto()時未使用MSG_MORE标志,表示此次發送資料是最後一部分資料時,pending=AF_INET->0。

if (up->pending) {
 lock_sock(sk);
 if (likely(up->pending)) {
  if (unlikely(up->pending != AF_INET)) {
   release_sock(sk);
   return -EINVAL;
  }
  goto do_append_data;
 }
 release_sock(sk);
}
           

      如果pending=0沒有待發送資料,執行初始化操作:封包長度、位址資訊、路由項。

      ulen初始為sendto()傳入的資料長度,由于是第一部分資料(如果沒有後續資料,則就是封包),ulen要添加udp報頭的8位元組。

ulen += sizeof(struct udphdr);
           

      這段代碼擷取要發送資料的目的位址和端口号。一種情況是調用sendto()發送資料,此時目的的資訊以參數傳入,存儲在msg->msg_name中,是以從中取出daddr和dport;另一種情況是調用connect(), send()發送資料,在connect()調用時綁定了目的的資訊,存儲在inet中,并且由于是調用了connect(),sk->sk_state會設定為TCP_ESTABLISHED。以後調用send()發送資料時,無需要再給入目的資訊參數,是以從inet中取出dadr和dport。而connected表示了該socket是否已綁定目的。

if (msg->msg_name) {
 struct sockaddr_in * usin = (struct sockaddr_in *)msg->msg_name;
 if (msg->msg_namelen < sizeof(*usin))
  return -EINVAL;
 if (usin->sin_family != AF_INET) {
  if (usin->sin_family != AF_UNSPEC)
   return -EAFNOSUPPORT;
 }

 daddr = usin->sin_addr.s_addr;
 dport = usin->sin_port;
 if (dport == 0)
  return -EINVAL;
} else {
 if (sk->sk_state != TCP_ESTABLISHED)
  return -EDESTADDRREQ;
 daddr = inet->inet_daddr;
 dport = inet->inet_dport;
 connected = 1;
}
           

      下一步是擷取路由項rt,如果已連接配接(調用過connect),則路由資訊在connect()時已擷取,直接拿就可以了;如果未連接配接或拿到的路由項已被删除,則需要重新在路由表中查找,還是使用ip_route_output_flow()來查找,如果是連接配接狀态的socket,則要用新找到的rt來更新socket,當然,前提條件是之前的rt已過期。

if (rt == NULL) {
 ……
 err = ip_route_output_flow(net, &rt, &fl, sk, 1);
 ……
 if (connected)
  sk_dst_set(sk, dst_clone(&rt->u.dst));
}
           

      存儲資訊daddr, dport, saddr, sport到cork.fl中,它們會在生成udp報頭和計算udp校驗和時用到。up->pending=AF_INET辨別了資料添加的開始,下面将開始資料的添加工作。

inet->cork.fl.fl4_dst = daddr;
inet->cork.fl.fl_ip_dport = dport;
inet->cork.fl.fl4_src = saddr;
inet->cork.fl.fl_ip_sport = inet->inet_sport;
up->pending = AF_INET;
           

      如果pending!=0或執行完初始化操作,則直接執行添加資料操作:

      up->len表示要發送資料的總長度,包括udp報頭,是以每發送一部分資料就要累加它的長度,在發送後up->len被清0。然後調用ip_append_data()添加資料到sk->sk_write_queue,它會處理資料分片等問題,在 ”ICMP子產品” 中有詳細分析過。

up->len += ulen;
getfrag  =  is_udplite ?  udplite_getfrag : ip_generic_getfrag;
err = ip_append_data(sk, getfrag, msg->msg_iov, ulen,
  sizeof(struct udphdr), &ipc, &rt,
  corkreq ? msg->msg_flags|MSG_MORE : msg->msg_flags);
           

      ip_append_data()添加資料正确會傳回0,否則udp_flush_pending_frames()丢棄将添加的資料;如果添加資料正确,且沒有後續的資料到來(由MSG_MORE來辨別),則udp_push_pending_frames()将資料發送給IP層,下面将詳細分析這個函數。最後一種情況是當sk_write_queue上為空時,它觸發的條件必須是發送多個封包且sk_write_queue上為空,而實際上在ip_append_data過後sk_write_queue不會為空的,是以正常情況下并不會發生。哪種情況會發生呢?重置pending值為0就是在這裡完成的,三個條件語句都會将pending設定為0。

if (err)
 udp_flush_pending_frames(sk);
else if (!corkreq)
 err = udp_push_pending_frames(sk);
else if (unlikely(skb_queue_empty(&sk->sk_write_queue)))
 up->pending = 0;
           

       資料已經處理完成,釋放取到的路由項rt,如果有IP選項,也釋放它。如果發送資料成功,傳回發送的長度len;否則根據錯誤值err進行錯誤處理并傳回err。

ip_rt_put(rt);
if (free)
 kfree(ipc.opt);
if (!err)
 return len;
if (err == -ENOBUFS || test_bit(SOCK_NOSPACE, &sk->sk_socket->flags)) {
 UDP_INC_STATS_USER(sock_net(sk), UDP_MIB_SNDBUFERRORS, is_udplite);
}
return err;
           

      在 “ICMP子產品” 中往IP層發送資料使用的是ip_push_pending_frames()。而在UDP子產品中往IP層發送資料使用的是ip_push_pending_frames()。而在UDP子產品中往IP層發送資料的udp_push_pending_frames()隻是對ip_push_pending_frames()的封裝,主要是增加對UDP的報頭的處理。同理,udp_flush_pending_frames()也是,隻是它更簡單,僅僅重置了up->len和up->pending的值,重置後可以開始一個新封包。那麼udp_push_pending_frames()封裝了哪些處理呢。

udp_push_pending_frames() 發送資料給IP層

      設定udp報頭,包括源端口source,目的端口dest,封包長度len。

uh = udp_hdr(skb);
uh->source = fl->fl_ip_sport;
uh->dest = fl->fl_ip_dport;
uh->len = htons(up->len);
uh->check = 0;
           

      計算udp報頭中的校驗和,包括了僞報頭、udp報頭和封包内容。

if (is_udplite)
 csum  = udplite_csum_outgoing(sk, skb);
else if (sk->sk_no_check == UDP_CSUM_NOXMIT) {   /* UDP csum disabled */
 skb->ip_summed = CHECKSUM_NONE;
 goto send;
} else if (skb->ip_summed == CHECKSUM_PARTIAL) { /* UDP hardware csum */
 udp4_hwcsum_outgoing(sk, skb, fl->fl4_src, fl->fl4_dst, up->len);
 goto send;
} else       /*   `normal' UDP    */
 csum = udp_csum_outgoing(sk, skb);
uh->check = csum_tcpudp_magic(fl->fl4_src, fl->fl4_dst, up->len, sk->sk_protocol, csum);
           

      将封包發送給IP層,這個函數已經分析過了。

err = ip_push_pending_frames(sk);
           

      同樣,在發送完封包後,重置len和pending的值,以便開始下一個封包發送。

up->len = 0;
up->pending = 0;
           

繼續閱讀