天天看點

網絡IO模型演進03——poll模型示例

1. 簡介

  • 在 nio 模型中,需要在應用程式不斷的做read系統調用,消耗系統資源,這個複雜度是O(n),但是這些read中有很多是無效的,進而引出了poll模型。
  • 在poll模型中,省去了在使用者空間做O(n)複雜度的read系統調用,節省了大量的資源

2. 筆者環境

  • Ubuntu 18.04
  • JDK1.8

3. 示例代碼

  • vim SocketMultiplexingSingleThread.java
  • javac SocketMultiplexingSingleThread.java
  • strace -ff -o out java -Djava.nio.channels.spi.SelectorProvider=sun.nio.ch.PollSelectorProvider SocketMultiplexingSingleThread
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Set;

public class SocketMultiplexingSingleThread {

    private ServerSocketChannel server = null;
    private Selector selector = null;
    int port = 9090;

    public void initServer() {
        try {
            server = ServerSocketChannel.open();
            server.configureBlocking(false);
            server.bind(new InetSocketAddress(port));

            selector = Selector.open();
            server.register(selector, SelectionKey.OP_ACCEPT);


        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public void start() {
        initServer();
        System.out.println("init server finish");
        try {
            while (true) {

                Set<SelectionKey> keys = selector.keys();
                System.out.println("keys size : "+keys.size());
                
                  while (selector.select() > 0) {
                    Set<SelectionKey> selectionKeys = selector.selectedKeys();  //傳回的有狀态的fd集合
                    Iterator<SelectionKey> iter = selectionKeys.iterator();

                    while (iter.hasNext()) {
                        SelectionKey key = iter.next();
                        iter.remove();
                        if (key.isAcceptable()) {
                            acceptHandler(key);
                        } else if (key.isReadable()) {
                            readHandler(key);  //連read 還有 write都處理了
                        }
                    }
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public void acceptHandler(SelectionKey key) {
        try {
            ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
            SocketChannel client = ssc.accept(); //調用accept接受用戶端
            client.configureBlocking(false);
            ByteBuffer buffer = ByteBuffer.allocate(8192);
            client.register(selector, SelectionKey.OP_READ, buffer);
            System.out.println("-------------------------------------------");
            System.out.println("new Client " + client.getRemoteAddress());
            System.out.println("-------------------------------------------");
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public void readHandler(SelectionKey key) {
        SocketChannel client = (SocketChannel) key.channel();
        ByteBuffer buffer = (ByteBuffer) key.attachment();
        buffer.clear();
        int read = 0;
        try {
            while (true) {
                read = client.read(buffer);
                if (read > 0) {
                    buffer.flip();
                    while (buffer.hasRemaining()) {
                        client.write(buffer);
                    }
                    buffer.clear();
                } else if (read == 0) {
                    break;
                } else {
                    client.close();
                    break;
                }
            }
        } catch (IOException e) {
            e.printStackTrace();

        }
    }

    public static void main(String[] args) {
        SocketMultiplexingSingleThread service = new SocketMultiplexingSingleThread();
        service.start();
    }
}

           
  • 這裡strace 是 檢視系統調用指令,會将結果輸出到out為字首的檔案中,後面的id是線程id
  • 可以看到此時控制台輸出 init server finish
$ strace -ff -o out java   -Djava.nio.channels.spi.SelectorProvider=sun.nio.ch.PollSelectorProvider  SocketMultiplexingSingleThread
init server finish
keys size : 1
           

4. 檢視系統調用

  • 打開一個新的指令行,檢視系統調用日志,查找輸出 init server 的檔案
$ grep 'init server' out.*
out.1083:write(1, "init server finish", 18)      = 18
           
  • 打開out.1083檔案,查找上面 out.1083:write(1, “init server finish”, 18) 這一行
  • vim out.1083
socket(AF_INET6, SOCK_STREAM, IPPROTO_IP) = 4
setsockopt(4, SOL_IPV6, IPV6_V6ONLY, [0], 4) = 0
setsockopt(4, SOL_SOCKET, SO_REUSEADDR, [1], 4) = 0
fcntl(4, F_GETFL)                       = 0x2 (flags O_RDWR)
fcntl(4, F_SETFL, O_RDWR|O_NONBLOCK)    = 0


......

bind(4, {sa_family=AF_INET6, sin6_port=htons(9090), inet_pton(AF_INET6, "::", &sin6_addr), sin6_flowinfo=htonl(0), sin6_scope_id=0}, 28) = 0
listen(4, 50)                           = 0

......

write(1, "init server finish", 18)      = 18
write(1, "\n", 1)                       = 1
write(1, "keys size : 1", 13)           = 13
write(1, "\n", 1) 

......


poll([{fd=5, events=POLLIN}, {fd=4, events=POLLIN}], 2, -1
           

這裡有幾個關鍵的資訊,

  • socket的檔案描述符是4
  • 在bind 和 listen系統調用之後,執行了 poll ,直接将檔案描述符集合 和 對應關注的事件傳入,
  • 因為沒有用戶端連接配接,阻塞在這裡

5. 用戶端連接配接

  • 新打開一個指令行,進行連接配接
  • nc localhost 9090
  • 繼續檢視系統調用 tail -f out.1083
poll([{fd=5, events=POLLIN}, {fd=4, events=POLLIN}], 2, -1) = 1 ([{fd=4, revents=POLLIN}])

......

accept(4, {sa_family=AF_INET6, sin6_port=htons(54076), inet_pton(AF_INET6, "::ffff:127.0.0.1", &sin6_addr), sin6_flowinfo=htonl(0), sin6_scope_id=0}, [28]) = 7


......


fcntl(7, F_SETFL, O_RDWR|O_NONBLOCK)    = 0
write(1, "--------------------------------"..., 43) = 43
write(1, "\n", 1)                       = 1
write(1, "new Client /127.0.0.1:54076", 27) = 27
write(1, "\n", 1)                       = 1
write(1, "--------------------------------"..., 43) = 43
write(1, "\n", 1)                       = 1
poll([{fd=5, events=POLLIN}, {fd=4, events=POLLIN}, {fd=7, events=POLLIN}], 3, -1


           
  • accept 傳回的檔案描述符是7
  • 繼續将 檔案描述符集合 和 關注的事件傳入 poll 系統調用中

6. 用戶端發送資料

  • 在nc 視窗下發送資料
$ nc localhost 9090
abcdefg
           
  • tail -f out.1083
poll([{fd=5, events=POLLIN}, {fd=4, events=POLLIN}, {fd=7, events=POLLIN}], 3, -1) = 1 ([{fd=7, revents=POLLIN}])


read(7, "abcdefg\n", 8192)              = 8

......

write(7, "abcdefg\n", 8)                = 8
read(7, 0x7fd2800bcc50, 8192)           = -1 EAGAIN (Resource temporarily unavailable)
poll([{fd=5, events=POLLIN}, {fd=4, events=POLLIN}, {fd=7, events=POLLIN}], 3, -1

           
  • 用戶端發送資料 poll 系統調用傳回 有事件發生的檔案描述符集合 ([{fd=7, revents=POLLIN}])
  • read資料之後,再次read 發現傳回-1之後,繼續調用 poll
  • 這時poll會阻塞

7. 再連接配接一個用戶端,檢視系統調用

  • 新打開一個指令行,進行連接配接
  • nc localhost 9090
  • tail -f out.1083
poll([{fd=5, events=POLLIN}, {fd=4, events=POLLIN}, {fd=7, events=POLLIN}], 3, -1) = 1 ([{fd=4, revents=POLLIN}])
accept(4, {sa_family=AF_INET6, sin6_port=htons(54982), inet_pton(AF_INET6, "::ffff:127.0.0.1", &sin6_addr), sin6_flowinfo=htonl(0), sin6_scope_id=0}, [28]) = 9
fcntl(9, F_GETFL)                       = 0x2 (flags O_RDWR)
getsockname(9, {sa_family=AF_INET6, sin6_port=htons(9090), inet_pton(AF_INET6, "::ffff:127.0.0.1", &sin6_addr), sin6_flowinfo=htonl(0), sin6_scope_id=0}, [28]) = 0
getsockname(9, {sa_family=AF_INET6, sin6_port=htons(9090), inet_pton(AF_INET6, "::ffff:127.0.0.1", &sin6_addr), sin6_flowinfo=htonl(0), sin6_scope_id=0}, [28]) = 0
fcntl(9, F_GETFL)                       = 0x2 (flags O_RDWR)
fcntl(9, F_SETFL, O_RDWR|O_NONBLOCK)    = 0
write(1, "--------------------------------"..., 43) = 43
write(1, "\n", 1)                       = 1
write(1, "new Client /127.0.0.1:54982", 27) = 27
write(1, "\n", 1)                       = 1
write(1, "--------------------------------"..., 43) = 43
write(1, "\n", 1)                       = 1
poll([{fd=5, events=POLLIN}, {fd=4, events=POLLIN}, {fd=7, events=POLLIN}, {fd=9, events=POLLIN}], 4, -1

           

可以看出,發生的系統調用和第一個用戶端連接配接時, 基本相同

8. 兩個用戶端交替發送資料

poll([{fd=5, events=POLLIN}, {fd=4, events=POLLIN}, {fd=7, events=POLLIN}, {fd=9, events=POLLIN}], 4, -1) = 1 ([{fd=9, revents=POLLIN}])
read(9, "12345678\n", 8192)             = 9
write(9, "12345678\n", 9)               = 9
read(9, 0x7fd2800bcc50, 8192)           = -1 EAGAIN (Resource temporarily unavailable)
poll([{fd=5, events=POLLIN}, {fd=4, events=POLLIN}, {fd=7, events=POLLIN}, {fd=9, events=POLLIN}], 4, -1) = 1 ([{fd=7, revents=POLLIN}])
read(7, "abcdefg\n", 8192)              = 8
write(7, "abcdefg\n", 8)                = 8
read(7, 0x7fd2800bcc50, 8192)           = -1 EAGAIN (Resource temporarily unavailable)
poll([{fd=5, events=POLLIN}, {fd=4, events=POLLIN}, {fd=7, events=POLLIN}, {fd=9, events=POLLIN}], 4, -1
           

9.總結

  1. poll涉及的系統調用
  • socket
  • bind
  • listen
  • poll 傳入檔案描述符集合和對應的關注的事件,是一個阻塞調用,如果這些檔案描述符都沒有發生對應的事件,則poll會阻塞
  • accept 在poll 傳回之後,再執行accept
  • read 在poll 傳回之後,再執行read

繼續閱讀