天天看點

IO模型-bio 多路複用epoll

主要研究 NIO與EPOLL

探讨深度 Java代碼 -> HotSpot源碼 -> linux函數

一. BIO

1 public static void main(String[] args) throws IOException {
2	ServerSocket serverSocket = new ServerSocket(9000);
3	
4	while (true) {
5	    Socket socket = serverSocket.accept();	
6	    byte[] buffer = new byte[1024];
7	    int read = socket.getInputStream().read(buffer);
8	    if (read != 0) {
9	        String data = new String(buffer, 0, read);
10	        System.out.println(data);
11	        OutputStream outputStream = socket.getOutputStream();
12	
13	        outputStream.write(data.getBytes());
14	        outputStream.flush();
15	        socket.close();
	    }
	}
}
           

終端指令:

IO模型-bio 多路複用epoll

debug啟動時, 會發現pc指針會在第5行阻塞, 我們啟終端 telnet localhost 9000

指針會繼續執行, 再次阻塞在第7行, 這時我們在終端 send bio test, pc會繼續執行

期間終端再啟一個控制台, 去telnet發現也是被阻塞的 後發送會等第一個執行完再執行後面的

idea console如下:

IO模型-bio 多路複用epoll

二. NIO

  1. 不使用多路複用, 單純的非阻塞
public static void main(String[] args) throws IOException {
	List<SocketChannel> clientSocketList = new ArrayList<>(1024);
    ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
    serverSocketChannel.socket().bind(new InetSocketAddress(9000));
    serverSocketChannel.configureBlocking(false);

    while (true) {
        SocketChannel clientSocket = serverSocketChannel.accept();
        if (clientSocket != null) {
            clientSocket.configureBlocking(false);
            clientSocketList.add(clientSocket);
        }

        Iterator<SocketChannel> iterator = clientSocketList.iterator();

        while (iterator.hasNext()) {
            SocketChannel sc = iterator.next();
            ByteBuffer dst = ByteBuffer.allocate(1024);
            int len = sc.read(dst);

            if (len > 0) {
                System.out.println("接收到消息: " + new String(dst.array()));
            }
        }
    }
}
           

從上面代碼我們就發現設定了一個很重要的參數configureBlocking(false), 設定為非阻塞, accept和read都不阻塞, 進來的socket都放到了一個容器了, 然後循環便利這個容器, 看是否有inputStream流資料, 有則輸出, 繼續下次循環.

開啟兩個終端telnet localhost 9000

第一個send a

第二個send b

IDEA console如下:

IO模型-bio 多路複用epoll

這段代碼例子, (可以了解為)就是linux的select的工作原理, 而poll就是把clientSocketList容器的最大值設定為Integer.MAX_VALUE.

  1. 重點在這裡!!!

    重點在這裡!!!

    重點在這裡!!!

    重點在這裡!!!

    重點在這裡!!!

    使用多路複用Selector

public static void main(String[] args) throws IOException {
    ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
    serverSocketChannel.socket().bind(new InetSocketAddress(9000));
    serverSocketChannel.configureBlocking(false);
    // linux epoll_create
    Selector selector = Selector.open();
    // linux epoll_ctl
    serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

    while (true) {
        // linux epoll_wait rdList epoll的就緒隊列有事件 才會繼續執行
        selector.select();

        Set<SelectionKey> selectionKeys = selector.selectedKeys();
        Iterator<SelectionKey> iterator = selectionKeys.iterator();


        while (iterator.hasNext()) {
            SelectionKey sk = iterator.next();

            if (sk.isAcceptable()) {
                ServerSocketChannel server = (ServerSocketChannel) sk.channel();
                SocketChannel sc = server.accept();
                sc.configureBlocking(false);
                sc.register(selector, SelectionKey.OP_READ);
                System.out.println("用戶端連接配接成功");
            } else if (sk.isReadable()) {
                SocketChannel sc = (SocketChannel) sk.channel();
                ByteBuffer dst = ByteBuffer.allocate(1024);
                int len = sc.read(dst);

                if (len > 0) {
                    System.out.println("接收到消息: " + new String(dst.array()));
                }
            }
            iterator.remove();
        }
    }
}
           

第一個終端 telnet localhost 9000

第二個終端 telnet localhost 9000

第一個終端 send a

第二個終端 seng b

第一個終端 send c

IDEA console如下:

IO模型-bio 多路複用epoll

!!! 這段代碼, 主要引入了一個在linux運作jdk時候, 就是EPOLL的相關操作. 服務端和連接配接進來的socket分别注冊了OPEN和READ事件, 沒有事件時會阻塞在select()方法裡, 當有相關事件就會進行接下來的代碼邏輯執行.

接下來我們重點看Selector的 open, register, select三個方法, 到底做了什麼東西.

// open方法 jdk源碼
public static Selector open() throws IOException {
    return SelectorProvider.provider().openSelector();
}

public static SelectorProvider provider() {
    synchronized (lock) {
        if (provider != null)
            return provider;
        return AccessController.doPrivileged(
            new PrivilegedAction<SelectorProvider>() {
                public SelectorProvider run() {
                        if (loadProviderFromProperty())
                            return provider;
                        if (loadProviderAsService())
                            return provider;
                        // 重點create方法 這裡直接點進去 會根據下載下傳的jdk pc版本進入不同方法 我們直接去openjdk源碼裡看Epoll的實作
                        provider = sun.nio.ch.DefaultSelectorProvider.create();
                        return provider;
                    }
                });
    }
}
           

!!! 源碼分析

// open jdk源碼
public static SelectorProvider create() {
    String osname = AccessController
        .doPrivileged(new GetPropertyAction("os.name"));
    if (osname.equals("SunOS"))
        return createProvider("sun.nio.ch.DevPollSelectorProvider");
    if (osname.equals("Linux"))
        return createProvider("sun.nio.ch.EPollSelectorProvider");
    // 到這個類裡去看
    return new sun.nio.ch.PollSelectorProvider();
}

public class EPollSelectorProvider
    extends SelectorProviderImpl
{
    public AbstractSelector openSelector() throws IOException {
    	// return了一個實作類
        return new EPollSelectorImpl(this);
    }

    public Channel inheritedChannel() throws IOException {
        return InheritedChannel.getChannel();
    }
}

EPollSelectorImpl(SelectorProvider sp) throws IOException {
    super(sp);
    long pipeFds = IOUtil.makePipe(false);
    fd0 = (int) (pipeFds >>> 32);
    fd1 = (int) pipeFds;
    try {
    	// new了一個EPollArrayWrapper
        pollWrapper = new EPollArrayWrapper();
        pollWrapper.initInterrupt(fd0, fd1);
        fdToKey = new HashMap<>();
    } catch (Throwable t) {
        try {
            FileDispatcherImpl.closeIntFD(fd0);
        } catch (IOException ioe0) {
            t.addSuppressed(ioe0);
        }
        try {
            FileDispatcherImpl.closeIntFD(fd1);
        } catch (IOException ioe1) {
            t.addSuppressed(ioe1);
        }
        throw t;
    }
}

// EPollArrayWrapper裡
EPollArrayWrapper() throws IOException {
    // creates the epoll file descriptor
    // 我們都知道linux裡 一切皆是檔案 是以建立了一個epoll
    epfd = epollCreate();

    // the epoll_event array passed to epoll_wait
    int allocationSize = NUM_EPOLLEVENTS * SIZE_EPOLLEVENT;
    pollArray = new AllocatedNativeObject(allocationSize, true);
    pollArrayAddress = pollArray.address();

    // eventHigh needed when using file descriptors > 64k
    if (OPEN_MAX > MAX_UPDATE_ARRAY_SIZE)
        eventsHigh = new HashMap<>();
}

// 三個核心native方法
private native int epollCreate();
private native void epollCtl(int epfd, int opcode, int fd, int events);
private native int epollWait(long pollAddress, int numfds, long timeout, int epfd) throws IOException;

// 我們先拿epollCreate()舉例, 我們要去看c語言如何實作這個方法, 那麼拿類名加下劃線和方法名, 
// EPollArrayWrapper_epollCreate
JNIEXPORT jint JNICALL
Java_sun_nio_ch_EPollArrayWrapper_epollCreate(JNIEnv *env, jobject this)
{
    /*
     * epoll_create expects a size as a hint to the kernel about how to
     * dimension internal structures. We can't predict the size in advance.
     */
     // 就是這個epoll_create linux中的方法 C語言可以直接調用它 
     // 在linux裡用man指令可以檢視相關方法的較長的描述
    int epfd = epoll_create(256);
    if (epfd < 0) {
       JNU_ThrowIOExceptionWithLastError(env, "epoll_create failed");
    }
    return epfd;
}
           

以上就是selector的open分析 jdk源碼 -> hotspot源碼 -> linux文法概要

register 對應 epollCtl, epollCtl相當于注冊一個事件

select 對應 epollWait, epollWait相當于綁定了這個事件 并且監聽

三. AIO

public static void main(String[] args) throws IOException, InterruptedException {
    AsynchronousServerSocketChannel serverChannel = AsynchronousServerSocketChannel.open().bind(new InetSocketAddress(9000));

    serverChannel.accept(null, new CompletionHandler<AsynchronousSocketChannel, Object>() {
        @Override
        public void completed(AsynchronousSocketChannel socketChannel, Object attachment) {
            try {
                System.out.println("2--"+Thread.currentThread().getName());
                // 再此接收用戶端連接配接,如果不寫這行代碼後面的用戶端連接配接連不上服務端
                serverChannel.accept(attachment, this);
                System.out.println(socketChannel.getRemoteAddress());
                ByteBuffer buffer = ByteBuffer.allocate(1024);
                socketChannel.read(buffer, buffer, new CompletionHandler<Integer, ByteBuffer>() {
                    @Override
                    public void completed(Integer result, ByteBuffer buffer) {
                        System.out.println("3--"+Thread.currentThread().getName());
                        buffer.flip();
                        System.out.println(new String(buffer.array(), 0, result));
                        socketChannel.write(ByteBuffer.wrap("HelloClient".getBytes()));
                    }

                    @Override
                    public void failed(Throwable exc, ByteBuffer buffer) {
                        exc.printStackTrace();
                    }
                });
            } catch (IOException e) {
                e.printStackTrace();
            }
        }

        @Override
        public void failed(Throwable exc, Object attachment) {
            exc.printStackTrace();
        }
    });

    System.out.println("1--"+Thread.currentThread().getName());
    Thread.sleep(Integer.MAX_VALUE);
}
           

啟動第一個終端執行 telnet localhost 9000和send a

啟動第二個終端執行 telnet localhost 9000和send b

IDEA console如下:

IO模型-bio 多路複用epoll

BIO, NIO在代碼案例中, 都是main線程去執行的, 而這個AIO就是線程如此多.

四. 總結

BIO, NIO, AIO主要的差別分為兩個阻塞與非阻塞, 同步與非同步

從第一個和第二個例子, 我們可以了解出阻塞與非阻塞的差別.

從第三個和第四個例子, 我們可以看出同步與非同步的差別.

IO模型-bio 多路複用epoll

1.4之後出現NIO, 先是采用linux的 select 後采用poll

1.5及之後采用了epoll的IO多路複用架構

IO模型-bio 多路複用epoll

繼續閱讀