作者:陶輝
這篇文章将試圖說明應用程式如何接收網絡上發送過來的tcp消息流,由于篇幅所限,暫時忽略ack封包的回複和接收視窗的滑動。
為了快速掌握本文所要表達的思想,我們可以帶着以下問題閱讀:
1、應用程式調用read、recv等方法時,socket套接字可以設定為阻塞或者非阻塞,這兩種方式是如何工作的?
2、若socket為預設的阻塞套接字,此時recv方法傳入的len參數,是表示必須逾時(so_rcvtimeo)或者接收到len長度的消息,recv方法才會傳回嗎?而且,socket上可以設定一個屬性叫做so_rcvlowat,它會與len産生什麼樣的交集,又是決定recv等接收方法什麼時候傳回?
3、應用程式開始收取tcp消息,與程式所在的機器網卡上接收到網絡裡發來的tcp消息,這是兩個獨立的流程。它們之間是如何互相影響的?例如,應用程式正在收取消息時,核心通過網卡又在這條tcp連接配接上收到消息時,究竟是如何處理的?若應用程式沒有調用read或者recv時,核心收到tcp連接配接上的消息後又是怎樣處理的?
4、recv這樣的接收方法還可以傳入各種flags,例如msg_waitall、msg_peek、msg_trunk等等。它們是如何工作的?
5、1個socket套接字可能被多個程序在使用,出現并發通路時,核心是怎麼處理這種狀況的?
6、linux的sysctl系統參數中,有類似tcp_low_latency這樣的開關,預設為0或者配置為1時是如何影響tcp消息處理流程的?
書接上文。本文将通過三幅圖講述三種典型的接收tcp消息場景,理清核心為實作tcp消息的接收所實作的4個隊列容器。當然,了解核心的實作并不是目的,而是如何使用socket接口、如何配置作業系統核心參數,才能使tcp傳輸消息更高效,這才是最終目的。
很多同學不希望被核心代碼擾亂了思維,如何閱讀本文呢?
我會在圖1的步驟都介紹完了才來從代碼上說明tcp_v4_rcv等主要方法。像flags參數、非阻塞套接字會産生怎樣的效果我是在代碼介紹中說的。然後我會介紹圖2、圖3,介紹它們的步驟時我會穿插一些上文沒有涉及的少量代碼。不喜歡了解核心代碼的同學請直接看完圖1的步驟後,請跳到圖2、圖3中,我認為這3幅圖覆寫了主要的tcp接收場景,能夠幫助你理清其流程。
接收消息時調用的系統方法要比上一篇發送tcp消息複雜許多。接收tcp消息的過程可以一分為二:首先是pc上的網卡接收到網線傳來的封包,通過軟中斷核心拿到并且解析其為tcp封包,然後tcp子產品決定如何處理這個tcp封包。其次,使用者程序調用read、recv等方法擷取tcp消息,則是将核心已經從網卡上收到的消息流拷貝到使用者程序裡的記憶體中。
第一幅圖描述的場景是,tcp連接配接上将要收到的消息序号是s1(tcp上的每個封包都有序号,詳見《tcp/ip協定詳解》),此時作業系統核心依次收到了序号s1-s2的封包、s3-s4、s2-s3的封包,注意後兩個包亂序了。之後,使用者程序配置設定了一段len大小的記憶體用于接收tcp消息,此時,len是大于s4-s1的。另外,使用者程序始終沒有對這個socket設定過so_rcvlowat參數,是以,接收閥值so_rcvlowat使用預設值1。另外,系統參數tcp_low_latency設定為0,即從作業系統的總體效率出發,使用prequeue隊列提升吞吐量。當然,由于使用者程序收消息時,并沒有新包來臨,是以此圖中prequeue隊列始終為空。先不細表。
圖1如下:
上圖中有13個步驟,應用程序使用了阻塞套接字,調用recv等方法時flag标志位為0,使用者程序讀取套接字時沒有發生程序睡眠。核心在處理接收到的tcp封包時使用了4個隊列容器(當連結清單了解也可),分别為receive、out_of_order、prequeue、backlog隊列,本文會說明它們存在的意義。下面詳細說明這13個步驟。
1、當網卡接收到封包并判斷為tcp協定後,将會調用到核心的tcp_v4_rcv方法。此時,這個tcp連接配接上需要接收的下一個封包序号恰好就是s1,而這一步裡,網卡上收到了s1-s2的封包,是以,tcp_v4_rcv方法會把這個封包直接插入到receive隊列中。
注意:receive隊列是允許使用者程序直接讀取的,它是将已經接收到的tcp封包,去除了tcp頭部、排好序放入的、使用者程序可以直接按序讀取的隊列。由于socket不在程序上下文中(也就是沒有程序在讀socket),由于我們需要s1序号的封包,而恰好收到了s1-s2封包,是以,它進入了receive隊列。
2、接着,我們收到了s3-s4封包。在第1步結束後,這時我們需要收到的是s2序号,但到來的封包卻是s3打頭的,怎麼辦呢?進入out_of_order隊列!從這個隊列名稱就可以看出來,所有亂序的封包都會暫時放在這。
3、仍然沒有進入來讀取socket,但又過來了我們期望的s2-s3封包,它會像第1步一樣,直接進入receive隊列。不同的時,由于此時out_of_order隊列不像第1步是空的,是以,引發了接來的第4步。
4、每次向receive隊列插入封包時都會檢查out_of_order隊列。由于收到s2-s3封包後,期待的序号成為了s3,這樣,out_of_order隊列裡的唯一封包s3-s4封包将會移出本隊列而插入到receive隊列中(這件事由tcp_ofo_queue方法完成)。
5、終于有使用者程序開始讀取socket了。做過應用端程式設計的同學都知道,先要在程序裡配置設定一塊記憶體,接着調用read或者recv等方法,把記憶體的首位址和記憶體長度傳入,再把建立好連接配接的socket也傳入。當然,對這個socket還可以配置其屬性。這裡,假定沒有設定任何屬性,都使用預設值,是以,此時socket是阻塞式,它的so_rcvlowat是預設的1。當然,recv這樣的方法還會接收一個flag參數,它可以設定為msg_waitall、msg_peek、msg_trunk等等,這裡我們假定為最常用的0。程序調用了recv方法。
6、無論是何種接口,c庫和核心經過層層封裝,接收tcp消息最終一定會走到tcp_recvmsg方法。下面介紹代碼細節時,它會是重點。
7、在tcp_recvmsg方法裡,會首先鎖住socket。為什麼呢?是以socket是可以被多程序同時使用的,同時,核心中斷也會操作它,而下面的代碼都是核心的、操作資料的、有狀态的代碼,不可以被重入的,鎖住後,再有使用者程序進來時拿不到鎖就要休眠在這了。核心中斷看到被鎖住後也會做不同的處理,參見圖2、圖3。
8、此時,第1-4步已經為receive隊列裡準備好了3個封包。最上面的封包是s1-s2,将它拷貝到使用者态記憶體中。由于第5步flag參數并沒有攜帶msg_peek這樣的标志位,是以,再将s1-s2封包從receive隊列的頭部移除,從核心态釋放掉。反之,msg_peek标志位會導緻receive隊列不會删除封包。是以,msg_peek主要用于多程序讀取同一套接字的情形。
9、如第8步,拷貝s2-s3封包到使用者态記憶體中。當然,執行拷貝前都會檢查使用者态記憶體的剩餘空間是否足以放下目前這個封包,不足以時會直接傳回已經拷貝的位元組數。
10、同上。
11、receive隊列為空了,此時會先來檢查so_rcvlowat這個閥值。如果已經拷貝的位元組數到現在還小于它,那麼可能導緻程序會休眠,等待拷貝更多的資料。第5步已經說明過了,socket套接字使用的預設的so_rcvlowat,也就是1,這表明,隻要讀取到封包了,就認為可以傳回了。
做完這個檢查了,再檢查backlog隊列。backlog隊列是程序正在拷貝資料時,網卡收到的封包會進這個隊列。此時若backlog隊列有資料,就順帶處理下。圖3會覆寫這種場景。
12、在本圖對應的場景中,backlog隊列是沒有資料的,已經拷貝的位元組數為s4-s1,它是大于1的,是以,釋放第7步裡加的鎖,準備傳回使用者态了。
13、使用者程序代碼開始執行,此時recv等方法傳回的就是s4-s1,即從核心拷貝的位元組數。
圖1描述的場景是最簡單的1種場景,下面我們來看看上述步驟是怎樣通過核心代碼實作的(以下代碼為2.6.18核心代碼)。
我們知道,linux對中斷的處理是分為上半部和下半部的,這是處于系統整體效率的考慮。我們将要介紹的都是在網絡軟中斷的下半部裡,例如這個tcp_v4_rcv方法。圖1中的第1-4步都是在這個方法裡完成的。
<b>[cpp]</b> view plaincopy
1. <b>int</b> tcp_v4_rcv(<b>struct</b> sk_buff *skb)
2. {
3. … …
4. //是否有程序正在使用這個套接字,将會對處理流程産生影響
5. //或者從代碼層面上,隻要在tcp_recvmsg裡,執行lock_sock後隻能進入else,而release_sock後會進入if
6. <b>if</b> (!sock_owned_by_user(sk)) {
7. {
8. //當 tcp_prequeue 傳回0時,表示這個函數沒有處理該封包
9. <b>if</b> (!tcp_prequeue(sk, skb))//如果封包放在prequeue隊列,即表示延後處理,不占用軟中斷過長時間
10. ret = tcp_v4_do_rcv(sk, skb);//不使用prequeue或者沒有使用者程序讀socket時(圖3進入此分支),立刻開始處理這個封包
11. }
12. } <b>else</b>
13. sk_add_backlog(sk, skb);//如果程序正在操作套接字,就把skb指向的tcp封包插入到backlog隊列(圖3涉及此分支)
14. … …
15. }
圖1第1步裡,我們從網絡上收到了序号為s1-s2的包。此時,沒有使用者程序在讀取套接字,是以,sock_owned_by_user(sk)會傳回0。是以,tcp_prequeue方法将得到執行。簡單看看它:
1. <b>static</b> <b>inline</b> <b>int</b> tcp_prequeue(<b>struct</b> sock *sk, <b>struct</b> sk_buff *skb)
3. <b>struct</b> tcp_sock *tp = tcp_sk(sk);
4.
5. //檢查tcp_low_latency,預設其為0,表示使用prequeue隊列。tp->ucopy.task不為0,表示有程序啟動了拷貝tcp消息的流程
6. <b>if</b> (!sysctl_tcp_low_latency && tp->ucopy.task) {
7. //到這裡,通常是使用者程序讀資料時沒讀到指定大小的資料,休眠了。直接将封包插入prequeue隊列的末尾,延後處理
8. __skb_queue_tail(&tp->ucopy.prequeue, skb);
9. tp->ucopy.memory += skb->truesize;
10. //當然,雖然通常是延後處理,但如果tcp的接收緩沖區不夠用了,就會立刻處理prequeue隊列裡的所有封包
11. <b>if</b> (tp->ucopy.memory > sk->sk_rcvbuf) {
12. <b>while</b> ((skb1 = __skb_dequeue(&tp->ucopy.prequeue)) != null) {
13. //sk_backlog_rcv就是下文将要介紹的tcp_v4_do_rcv方法
14. sk->sk_backlog_rcv(sk, skb1);
15. }
16. } <b>else</b> <b>if</b> (skb_queue_len(&tp->ucopy.prequeue) == 1) {
17. //prequeue裡有封包了,喚醒正在休眠等待資料的程序,讓程序在它的上下文中處理這個prequeue隊列的封包
18. wake_up_interruptible(sk->sk_sleep);
19. }
20.
21. <b>return</b> 1;
22. }
23. //prequeue沒有處理
24. <b>return</b> 0;
25. }
由于tp->ucopy.task此時是null,是以我們收到的第1個封包在tcp_prequeue函數裡直接傳回了0,是以,将由 tcp_v4_do_rcv方法處理。
1. <b>int</b> tcp_v4_do_rcv(<b>struct</b> sock *sk, <b>struct</b> sk_buff *skb)
3. <b>if</b> (sk->sk_state == tcp_established) { /* fast path */
4. //當tcp連接配接已經建立好時,是由tcp_rcv_established方法處理接收封包的
5. <b>if</b> (tcp_rcv_established(sk, skb, skb->h.th, skb->len))
6. <b>goto</b> reset;
7.
8. <b>return</b> 0;
9. }
10. … …
11. }
tcp_rcv_established方法在圖1裡,主要調用tcp_data_queue方法将封包放入隊列中,繼續看看它又幹了些什麼事:
1. <b>static</b> <b>void</b> tcp_data_queue(<b>struct</b> sock *sk, <b>struct</b> sk_buff *skb)
5. //如果這個封包是待接收的封包(看seq),它有兩個出路:進入receive隊列,正如圖1;直接拷貝到使用者記憶體中,如圖3
6. <b>if</b> (tcp_skb_cb(skb)->seq == tp->rcv_nxt) {
7. //滑動視窗外的包暫不考慮,篇幅有限,下次再細談
8. <b>if</b> (tcp_receive_window(tp) == 0)
9. <b>goto</b> out_of_window;
10.
11. //如果有一個程序正在讀取socket,且正準備要拷貝的序号就是目前封包的seq序号
12. <b>if</b> (tp->ucopy.task == current &&
13. tp->copied_seq == tp->rcv_nxt && tp->ucopy.len &&
14. sock_owned_by_user(sk) && !tp->urg_data) {
15. //直接将封包内容拷貝到使用者态記憶體中,參見圖3
16. <b>if</b> (!skb_copy_datagram_iovec(skb, 0, tp->ucopy.iov, chunk)) {
17. tp->ucopy.len -= chunk;
18. tp->copied_seq += chunk;
19. }
20. }
21.
22. <b>if</b> (eaten <= 0) {
23. queue_and_out:
24. //如果沒有能夠直接拷貝到使用者記憶體中,那麼,插入receive隊列吧,正如圖1中的第1、3步
25. __skb_queue_tail(&sk->sk_receive_queue, skb);
26. }
27. //更新待接收的序号,例如圖1第1步中,更新為s2
28. tp->rcv_nxt = tcp_skb_cb(skb)->end_seq;
29.
30. //正如圖1第4步,這時會檢查out_of_order隊列,若它不為空,需要處理它
31. <b>if</b> (!skb_queue_empty(&tp->out_of_order_queue)) {
32. //tcp_ofo_queue方法會檢查out_of_order隊列中的所有封包
33. tcp_ofo_queue(sk);
34. }
35. }
36. … …
37.
38. //這個包是無序的,又在接收滑動視窗内,那麼就如圖1第2步,把封包插入到out_of_order隊列吧
39. <b>if</b> (!skb_peek(&tp->out_of_order_queue)) {
40. __skb_queue_head(&tp->out_of_order_queue,skb);
41. } <b>else</b> {
42. … …
43. __skb_append(skb1, skb, &tp->out_of_order_queue);
44. }
45. }
圖1第4步時,正是通過tcp_ofo_queue方法把之前亂序的s3-s4封包插入receive隊列的。
1. <b>static</b> <b>void</b> tcp_ofo_queue(<b>struct</b> sock *sk)
4. __u32 dsack_high = tp->rcv_nxt;
5. <b>struct</b> sk_buff *skb;
6. //周遊out_of_order隊列
7. <b>while</b> ((skb = skb_peek(&tp->out_of_order_queue)) != null) {
8. … …
9. //若這個封包可以按seq插入有序的receive隊列中,則将其移出out_of_order隊列
10. __skb_unlink(skb, &tp->out_of_order_queue);
11. //插入receive隊列
12. __skb_queue_tail(&sk->sk_receive_queue, skb);
13. //更新socket上待接收的下一個有序seq
14. tp->rcv_nxt = tcp_skb_cb(skb)->end_seq;
15. }
16. }
下面再介紹圖1第6步提到的tcp_recvmsg方法。
1. //參數裡的len就是read、recv方法裡的記憶體長度,flags正是方法的flags參數,nonblock則是阻塞、非阻塞标志位
2. <b>int</b> tcp_recvmsg(<b>struct</b> kiocb *iocb, <b>struct</b> sock *sk, <b>struct</b> msghdr *msg,
3. <b>size_t</b> len, <b>int</b> nonblock, <b>int</b> flags, <b>int</b> *addr_len)
4. {
5. //鎖住socket,防止多程序并發通路tcp連接配接,告知軟中斷目前socket在程序上下文中
6. lock_sock(sk);
8. //初始化errno這個錯誤碼
9. err = -enotconn;
11. //如果socket是阻塞套接字,則取出so_rcvtimeo作為讀逾時時間;若為非阻塞,則timeo為0。下面會看到timeo是如何生效的
12. timeo = sock_rcvtimeo(sk, nonblock);
13.
14. //擷取下一個要拷貝的位元組序号
15. //注意:seq的定義為u32 *seq;,它是32位指針。為何?因為下面每向使用者态記憶體拷貝後,會更新seq的值,這時就會直接更改套接字上的copied_seq
16. seq = &tp->copied_seq;
17. //當flags參數有msg_peek标志位時,意味着這次拷貝的内容,當再次讀取socket時(比如另一個程序)還能再次讀到
18. <b>if</b> (flags & msg_peek) {
19. //是以不會更新copied_seq,當然,下面會看到也不會删除封包,不會從receive隊列中移除封包
20. peek_seq = tp->copied_seq;
21. seq = &peek_seq;
23.
24. //擷取so_rcvlowat最低接收閥值,當然,target實際上是使用者态記憶體大小len和so_rcvlowat的最小值
25. //注意:flags參數中若攜帶msg_waitall标志位,則意味着必須等到讀取到len長度的消息才能傳回,此時target隻能是len
26. target = sock_rcvlowat(sk, flags & msg_waitall, len);
27.
28. //以下開始讀取消息
29. <b>do</b> {
30. //從receive隊列取出1個封包
31. skb = skb_peek(&sk->sk_receive_queue);
32. <b>do</b> {
33. //沒取到退出目前循環
34. <b>if</b> (!skb)
35. <b>break</b>;
36.
38. //offset是待拷貝序号在目前這個封包中的偏移量,在圖1、2、3中它都是0,隻有因為使用者記憶體不足以接收完1個封包時才為非0
39. offset = *seq - tcp_skb_cb(skb)->seq;
40. //有些時候,三次握手的syn包也會攜帶消息内容的,此時seq是多出1的(syn占1個序号),是以offset減1
41. <b>if</b> (skb->h.th->syn)
42. offset–;
43. //若偏移量還有這個封包之内,則認為它需要處理
44. <b>if</b> (offset < skb->len)
45. <b>goto</b> found_ok_skb;
46.
47. skb = skb->next;
48. } <b>while</b> (skb != (<b>struct</b> sk_buff *)&sk->sk_receive_queue);
49.
50. //如果receive隊列為空,則檢查已經拷貝的位元組數,是否達到了so_rcvlowat或者長度len。滿足了,且backlog隊列也為空,則可以傳回使用者态了,正如圖1的第11步
51. <b>if</b> (copied >= target && !sk->sk_backlog.tail)
52. <b>break</b>;
53.
54. //在tcp_recvmsg裡,copied就是已經拷貝的位元組數
55. <b>if</b> (copied) {
56. … …
57. } <b>else</b> {
58. //一個位元組都沒拷貝到,但如果shutdown關閉了socket,一樣直接傳回。當然,本文不涉及關閉連接配接
59. <b>if</b> (sk->sk_shutdown & rcv_shutdown)
60. <b>break</b>;
61.
62. //如果使用了非阻塞套接字,此時timeo為0
63. <b>if</b> (!timeo) {
64. //非阻塞套接字讀取不到資料時也會傳回,錯誤碼正是eagain
65. copied = -eagain;
66. <b>break</b>;
67. }
68. … …
69. }
70.
71. //tcp_low_latency預設是關閉的,圖1、圖2都是如此,圖3則例外,即圖3不會走進這個if
72. <b>if</b> (!sysctl_tcp_low_latency && tp->ucopy.task == user_recv) {
73. //prequeue隊列就是為了提高系統整體效率的,即prequeue隊列有可能不為空,這是因為程序休眠等待時可能有新封包到達prequeue隊列
74. <b>if</b> (!skb_queue_empty(&tp->ucopy.prequeue))
75. <b>goto</b> do_prequeue;
76. }
77.
78. //如果已經拷貝了的位元組數超過了最低閥值
79. <b>if</b> (copied >= target) {
80. //release_sock這個方法會周遊、處理backlog隊列中的封包
81. release_sock(sk);
82. lock_sock(sk);
83. } <b>else</b>
84. sk_wait_data(sk, &timeo);//沒有讀取到足夠長度的消息,是以會程序休眠,如果沒有被喚醒,最長睡眠timeo時間
85.
86. <b>if</b> (user_recv) {
87. <b>if</b> (tp->rcv_nxt == tp->copied_seq &&
88. !skb_queue_empty(&tp->ucopy.prequeue)) {
89. do_prequeue:
90. //接上面代碼段,開始處理prequeue隊列裡的封包
91. tcp_prequeue_process(sk);
92. }
93. }
94.
95. //繼續處理receive隊列的下一個封包
96. <b>continue</b>;
97.
98. found_ok_skb:
99. /* ok so how much can we use? */
100. //receive隊列的這個封包從其可以使用的偏移量offset,到總長度len之間,可以拷貝的長度為used
101. used = skb->len - offset;
102. //len是使用者态空閑記憶體,len更小時,當然隻能拷貝len長度消息,總不能導緻記憶體溢出吧
103. <b>if</b> (len < used)
104. used = len;
105.
106. //msg_trunc标志位表示不要管len這個使用者态記憶體有多大,隻管拷貝資料吧
107. <b>if</b> (!(flags & msg_trunc)) {
108. {
109. //向使用者态拷貝資料
110. err = skb_copy_datagram_iovec(skb, offset,
111. msg->msg_iov, used);
112. }
113. }
114.
115. //因為是指針,是以同時更新copied_seq–下一個待接收的序号
116. *seq += used;
117. //更新已經拷貝的長度
118. copied += used;
119. //更新使用者态記憶體的剩餘空閑空間長度
120. len -= used;
121.
122. … …
123. } <b>while</b> (len > 0);
124.
125. //已經裝載了接收器
126. <b>if</b> (user_recv) {
127. //prequeue隊列不為空則處理之
128. <b>if</b> (!skb_queue_empty(&tp->ucopy.prequeue)) {
129. tcp_prequeue_process(sk);
130. }
131.
132. //準備傳回使用者态,socket上不再裝載接收任務
133. tp->ucopy.task = null;
134. tp->ucopy.len = 0;
135. }
136.
137. //釋放socket時,還會檢查、處理backlog隊列中的封包
138. release_sock(sk);
139. //向使用者傳回已經拷貝的位元組數
140. <b>return</b> copied;
141. }
圖2給出了第2種場景,這裡涉及到prequeue隊列。使用者程序調用recv方法時,連接配接上沒有任何接收并緩存到核心的封包,而socket是阻塞的,是以程序睡眠了。然後網卡中收到了tcp連接配接上的封包,此時prequeue隊列開始産生作用。圖2中tcp_low_latency為預設的0,套接字socket的so_rcvlowat是預設的1,仍然是阻塞socket,如下圖:
簡單描述上述11個步驟:
1、使用者程序配置設定了一塊len大小的記憶體,将其傳入recv這樣的函數,同時socket參數皆為預設,即阻塞的、so_rcvlowat為1。調用接收方法,其中flags參數為0。
2、c庫和核心最終調用到tcp_recvmsg方法來處理。
3、鎖住socket。
4、由于此時receive、prequeue、backlog隊列都是空的,即沒有拷貝1個位元組的消息到使用者記憶體中,而我們的最低要求是拷貝至少so_rcvlowat為1長度的消息。此時,開始進入阻塞式套接字的等待流程。最長等待時間為so_rcvtimeo指定的時間。
這個等待函數叫做sk_wait_data,有必要看下其實作:
1. <b>int</b> sk_wait_data(<b>struct</b> sock *sk, <b>long</b> *timeo)
3. //注意,它的自動喚醒條件有兩個,要麼timeo時間到達,要麼receive隊列不為空
4. rc = sk_wait_event(sk, timeo, !skb_queue_empty(&sk->sk_receive_queue));
5. }
sk_wait_event也值得我們簡單看下:
1. #define sk_wait_event(__sk, __timeo, __condition) \
2. ({ <b>int</b> rc; \
3. release_sock(__sk); \
4. rc = __condition; \
5. <b>if</b> (!rc) { \
6. *(__timeo) = schedule_timeout(*(__timeo)); \
7. } \
8. lock_sock(__sk); \
9. rc = __condition; \
10. rc; \
11. })
注意,它在睡眠前會調用release_sock,這個方法會釋放socket鎖,使得下面的第5步中,新到的封包不再隻能進入backlog隊列。
5、這個套接字上期望接收的序号也是s1,此時網卡恰好收到了s1-s2的封包,在tcp_v4_rcv方法中,通過調用tcp_prequeue方法把封包插入到prequeue隊列中。
6、插入prequeue隊列後,此時會接着調用wake_up_interruptible方法,喚醒在socket上睡眠的程序。參見tcp_prequque方法。
7、使用者程序被喚醒後,重新調用lock_sock接管了這個socket,此後再進來的封包都隻能進入backlog隊列了。
8、程序醒來後,先去檢查receive隊列,當然仍然是空的;再去檢查prequeue隊列,發現有一個封包s1-s2,正好是socket連接配接待拷貝的起始序号s1,于是,從prequeue隊列中取出這個封包并把内容複制到使用者記憶體中,再釋放核心中的這個封包。
9、目前已經拷貝了s2-s1個位元組到使用者态,檢查這個長度是否超過了最低閥值(即len和so_rcvlowat的最小值)。
10、由于so_rcvlowat使用了預設的1,是以準備傳回使用者。此時會順帶再看看backlog隊列中有沒有資料,若有,則檢查這個無序的隊列中是否有可以直接拷貝給使用者的封包。當然,此時是沒有的。是以準備傳回,釋放socket鎖。
11、傳回使用者已經拷貝的位元組數。
圖3給出了第3種場景。這個場景中,我們把系統參數tcp_low_latency設為1,socket上設定了so_rcvlowat屬性的值。伺服器先是收到了s1-s2這個封包,但s2-s1的長度是小于so_rcvlowat的,使用者程序調用recv方法讀套接字時,雖然讀到了一些,但沒有達到最小閥值,是以程序睡眠了,與此同時,在睡眠前收到的亂序的s3-s4包直接進入backlog隊列。此時先到達了s2-s3包,由于沒有使用prequeue隊列,而它起始序号正是下一個待拷貝的值,是以直接拷貝到使用者記憶體中,總共拷貝位元組數已滿足so_rcvlowat的要求!最後在傳回使用者前把backlog隊列中s3-s4封包也拷貝給使用者了。如下圖:
簡明描述上述15個步驟:
1、核心收到封包s1-s2,s1正是這個socket連接配接上待接收的序号,是以,直接将它插入有序的receive隊列中。
2、使用者程序所處的linux作業系統上,将sysctl中的tcp_low_latency設定為1。這意味着,這台伺服器希望tcp程序能夠更及時的接收到tcp消息。使用者調用了recv方法接收socket上的消息,這個socket上設定了so_rcvlowat屬性為某個值n,這個n是大于s2-s1,也就是第1步收到的封包大小。這裡,仍然是阻塞socket,使用者依然是配置設定了足夠大的len長度記憶體以接收tcp消息。
3、通過tcp_recvmsg方法來完成接收工作。先鎖住socket,避免并發程序讀取同一socket的同時,也在告訴核心網絡軟中斷處理到這一socket時要有不同行為,如第6步。
4、準備處理核心各個接收隊列中的封包。
5、receive隊列中的有序封包可直接拷貝,在檢查到s2-s1是小于len之後,将封包内容拷貝到使用者态記憶體中。
6、在第5步進行的同時,socket是被鎖住的,這時核心又收到了一個s3-s4封包,是以封包直接進入backlog隊列。注意,這個封包不是有序的,因為此時連接配接上期待接收序号為s2。
7、在第5步,拷貝了s2-s1個位元組到使用者記憶體,它是小于so_rcvlowat的,是以,由于socket是阻塞型套接字(逾時時間在本文中忽略),程序将不得不轉入睡眠。轉入睡眠之前,還會幹一件事,就是處理backlog隊列裡的封包,圖2的第4步介紹過休眠方法sk_wait_data,它在睡眠前會執行release_sock方法,看看是如何實作的:
1. <b>void</b> fastcall release_sock(<b>struct</b> sock *sk)
3. mutex_release(&sk->sk_lock.dep_map, 1, _ret_ip_);
5. spin_lock_bh(&sk->sk_lock.slock);
6. //這裡會周遊backlog隊列中的每一個封包
7. <b>if</b> (sk->sk_backlog.tail)
8. __release_sock(sk);
9. //這裡是網絡中斷執行時,告訴核心,現在socket并不在程序上下文中
10. sk->sk_lock.owner = null;
11. <b>if</b> (waitqueue_active(&sk->sk_lock.wq))
12. wake_up(&sk->sk_lock.wq);
13. spin_unlock_bh(&sk->sk_lock.slock);
14. }
再看看__release_sock方法是如何周遊backlog隊列的:
1. <b>static</b> <b>void</b> __release_sock(<b>struct</b> sock *sk)
3. <b>struct</b> sk_buff *skb = sk->sk_backlog.head;
5. //周遊backlog隊列
6. <b>do</b> {
7. sk->sk_backlog.head = sk->sk_backlog.tail = null;
8. bh_unlock_sock(sk);
9.
10. <b>do</b> {
11. <b>struct</b> sk_buff *next = skb->next;
12.
13. skb->next = null;
14. //處理封包,其實就是tcp_v4_do_rcv方法,上文介紹過,不再贅述
15. sk->sk_backlog_rcv(sk, skb);
16.
17. cond_resched_softirq();
18.
19. skb = next;
20. } <b>while</b> (skb != null);
22. bh_lock_sock(sk);
23. } <b>while</b>((skb = sk->sk_backlog.head) != null);
24. }
此時周遊到s3-s4封包,但因為它是失序的,是以從backlog隊列中移入out_of_order隊列中(參見上文說過的tcp_ofo_queue方法)。
8、程序休眠,直到逾時或者receive隊列不為空。
9、核心接收到了s2-s3封包。注意,這裡由于打開了tcp_low_latency标志位,這個封包是不會進入prequeue隊列以待程序上下文處理的。
10、此時,由于s2是連接配接上正要接收的序号,同時,有一個使用者程序正在休眠等待接收資料中,且它要等待的資料起始序号正是s2,于是,這種種條件下,使得這一步同時也是網絡軟中斷執行上下文中,把s2-s3封包直接拷貝進使用者記憶體。
11、上文介紹tcp_data_queue方法時大家可以看到,每處理完1個有序封包(無論是拷貝到receive隊列還是直接複制到使用者記憶體)後都會檢查out_of_order隊列,看看是否有封包可以處理。那麼,s3-s4封包恰好是待處理的,于是拷貝進使用者記憶體。然後喚醒使用者程序。
12、使用者程序被喚醒了,當然喚醒後會先來拿到socket鎖。以下執行又在程序上下文中了。
13、此時會檢查已拷貝的位元組數是否大于so_rcvlowat,以及backlog隊列是否為空。兩者皆滿足,準備傳回。
14、釋放socket鎖,退出tcp_recvmsg方法。
15、傳回使用者已經複制的位元組數s4-s1。
好了,這3個場景讀完,想必大家對于tcp的接收流程是怎樣的已經非常清楚了,本文起始的6個問題也在這一大篇中都涉及到了。下一篇我們來讨論tcp連接配接的關閉。