
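A Brief Analysis of epoll and NIO

epoll is the I/O multiplexing mechanism on Linux. It can monitor read/write and other events on many file descriptors at once; as soon as a descriptor becomes ready (typically because a read or write event has occurred), the event is reported to the application that registered interest so that it can handle it.

The old way of network programming

Take a chat program implemented with sockets as an example.

Server side: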

public static void main(String[] args) {
        ServerSocket server = null;
        try {
            server = new ServerSocket(PROT);
            System.out.println(" server start .. ");
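            // accept() below blocks
            while (true) { // loop so that requests from multiple clients can be accepted
                Socket socket = server.accept(); // blocks until a client connects
                // start a new thread to run this client's task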
                new Thread(new ServerHandler(socket)).start();
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (server != null) {
                try {
                    server.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
            server = null;
        }
    }           

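Every client connection starts a new thread. With 10,000 clients there will be 10,000 threads, which costs memory for the thread stacks and a great deal of CPU time for context switching.

A thread pool can of course be used, but it does not solve the underlying problem: each connection still occupies one thread for as long as it blocks.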
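Why a pool does not remove the bottleneck can be seen in a small sketch. It is not taken from the chat program: the class BlockedPoolDemo and its method are made up for illustration, and a latch stands in for a handler that is stuck in a blocking read. With a pool of 2 threads and 5 "clients", only 2 handlers ever get to start; the rest wait in the queue until a thread is freed.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo: each task stands in for a handler stuck in a blocking read.
class BlockedPoolDemo {

    // Submits `clients` blocking handlers to a pool of `poolSize` threads and
    // returns how many handlers were able to start while all of them block.
    static int handlersStarted(int poolSize, int clients) {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        CountDownLatch release = new CountDownLatch(1);
        CountDownLatch busy = new CountDownLatch(Math.min(poolSize, clients));
        AtomicInteger started = new AtomicInteger();
        try {
            for (int i = 0; i < clients; i++) {
                pool.execute(() -> {
                    started.incrementAndGet();
                    busy.countDown();
                    try {
                        release.await(); // simulates waiting for data from a slow client
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            busy.await();         // every usable pool thread is now occupied
            return started.get(); // the remaining handlers are still queued
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            release.countDown();
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(handlersStarted(2, 5)); // prints 2
    }
}
```

The pool caps the number of threads, but it also caps the number of clients that can be served at the same moment, which is the problem NIO addresses.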

Using NIO

while (true) {
            try {
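                // 1 the selector (multiplexer) must start listening; this call blocks
                this.seletor.select();
                // 2 get the set of keys the selector has selected
                Iterator<SelectionKey> keys = this.seletor.selectedKeys().iterator();
                // 3 iterate over them
                while (keys.hasNext()) {
                    // 4 take one selected key
                    SelectionKey key = keys.next();
                    // 5 remove it from the selected set so it is not handled twice
                    keys.remove();
                    // 6 if the key is valid
                    if (key.isValid()) {
                        // 7 a new connection is ready to be accepted
                        if (key.isAcceptable()) {
                            this.accept(key);
                        }
                        // 8 the channel is readable
                        if (key.isReadable()) {
                            this.read(key);
                        }
                        // 9 the channel is writable: write data
                        //   (validity is re-checked because read() may have closed the channel)
                        if (key.isValid() && key.isWritable()) {
                            this.write(key); //ssc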
                        }
                    }
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }           

When a client connects successfully:

private void accept(SelectionKey key) {
        try {
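            // 1 get the server channel
            ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
            // 2 accept the pending connection
            SocketChannel sc = ssc.accept();
            // 3 switch the new channel to non-blocking mode
            sc.configureBlocking(false);
            // 4 register it with the selector, interested in read events
            sc.register(this.seletor, SelectionKey.OP_READ);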
        } catch (IOException e) {
            e.printStackTrace();
        }
    }           

As we can see, there is only ever one thread, no matter how many clients connect.
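The listings above leave out how the selector and the server channel are created. The following is a minimal end-to-end sketch under stated assumptions: the class MiniEchoServer, its echo behaviour, and the roundTrip helper are invented for illustration and are not the chat program's code. It binds to an OS-chosen loopback port, runs the whole server on one selector thread, and echoes back whatever a client sends.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;

// Hypothetical single-threaded echo server; all names here are illustrative.
class MiniEchoServer implements Runnable, AutoCloseable {
    private final Selector selector;
    private final ServerSocketChannel server;

    MiniEchoServer() {
        try {
            selector = Selector.open();
            server = ServerSocketChannel.open();
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
            server.configureBlocking(false); // required before register()
            server.register(selector, SelectionKey.OP_ACCEPT);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    int port() { return server.socket().getLocalPort(); }

    @Override
    public void run() {
        try {
            while (selector.isOpen()) {
                selector.select(); // the only call that blocks
                Iterator<SelectionKey> keys = selector.selectedKeys().iterator();
                while (keys.hasNext()) {
                    SelectionKey key = keys.next();
                    keys.remove();
                    if (!key.isValid()) continue;
                    if (key.isAcceptable()) {
                        SocketChannel sc = server.accept();
                        if (sc != null) {
                            sc.configureBlocking(false);
                            sc.register(selector, SelectionKey.OP_READ);
                        }
                    } else if (key.isReadable()) {
                        echo((SocketChannel) key.channel());
                    }
                }
            }
        } catch (IOException | IllegalStateException e) {
            // Selector, key or channel was closed; let the loop thread end.
        }
    }

    private void echo(SocketChannel sc) throws IOException {
        ByteBuffer buf = ByteBuffer.allocate(256);
        if (sc.read(buf) < 0) { // peer closed the connection
            sc.close();
            return;
        }
        buf.flip();
        while (buf.hasRemaining()) {
            sc.write(buf); // a fuller server would register OP_WRITE instead of spinning
        }
    }

    @Override
    public void close() throws IOException {
        selector.close();
        server.close();
    }

    // Starts a server, sends `message` from a blocking client, returns the echo.
    static String roundTrip(String message) {
        try (MiniEchoServer srv = new MiniEchoServer()) {
            Thread loop = new Thread(srv, "selector-loop");
            loop.setDaemon(true);
            loop.start();
            byte[] out = message.getBytes(StandardCharsets.UTF_8);
            try (SocketChannel client = SocketChannel.open(
                    new InetSocketAddress(InetAddress.getLoopbackAddress(), srv.port()))) {
                client.write(ByteBuffer.wrap(out));
                ByteBuffer in = ByteBuffer.allocate(out.length);
                while (in.hasRemaining() && client.read(in) >= 0) { }
                return new String(in.array(), 0, in.position(), StandardCharsets.UTF_8);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Calling MiniEchoServer.roundTrip("hello") should return the same string. However many clients connect, the server side still runs on the single "selector-loop" thread.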

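Note: this does not mean that nothing blocks.

seletor.select() does block, but the read and write operations that follow do not, unlike the traditional

inputStream.read()

which hangs until there is data to read.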
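The contrast can be checked directly. In the sketch below (the class NonBlockingReadDemo is hypothetical), a client connects over loopback but sends nothing, and the server side reads from the accepted channel in non-blocking mode. Instead of hanging, the read returns at once with 0 bytes.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

// Hypothetical demo class; not part of the original chat program.
class NonBlockingReadDemo {

    // Returns the result of a non-blocking read on a connection with no pending data.
    static int readWithoutData() {
        try (ServerSocketChannel server = ServerSocketChannel.open()) {
            // Port 0 asks the OS for any free port on the loopback interface.
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
            try (SocketChannel client = SocketChannel.open(server.getLocalAddress());
                 SocketChannel accepted = server.accept()) {
                accepted.configureBlocking(false);
                // The client has written nothing, so there is nothing to read.
                return accepted.read(ByteBuffer.allocate(16));
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("bytes read: " + readWithoutData()); // prints "bytes read: 0"
    }
}
```

With the channel left in its default blocking mode, the same read would wait until the client sent something or closed the connection.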

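Differences between NIO and traditional IO

Traditional IO

  1. Every client connection produces a socket, and a thread is created for every socket;
  2. The program has to poll each socket itself to find out whether it is readable or writable;
  3. Read and write operations may block until they can proceed;
  4. Traditional sockets are stream-oriented.

NIO

  1. A single thread can handle the reads and writes of n sockets;
  2. There is no need to poll every socket; only one call needs to be polled:

    this.seletor.select()

  3. It is buffer-oriented.

Why does NIO know which sockets are ready (readable or writable) without polling all of them?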

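Because the selector leaves the bookkeeping to the operating system: when any registered socket becomes ready, the kernel records it (on Linux this is typically done through epoll's internal callback), and select() hands back exactly those sockets, so the application learns about it right away.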
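A short experiment makes this visible. The sketch below is illustrative only (the class ReadyKeysDemo is made up): it registers two loopback connections for OP_READ, sends data on just one of them, and asks the selector which keys are ready. Only the connection that actually received data is reported.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;

// Hypothetical demo class; names are illustrative.
class ReadyKeysDemo {

    // Registers two connections for OP_READ, sends data on only one of them,
    // and returns how many keys the selector reports as ready.
    static int countReadyKeys() {
        try (Selector selector = Selector.open();
             ServerSocketChannel server = ServerSocketChannel.open()) {
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
            try (SocketChannel clientA = SocketChannel.open(server.getLocalAddress());
                 SocketChannel serverSideA = server.accept();
                 SocketChannel clientB = SocketChannel.open(server.getLocalAddress());
                 SocketChannel serverSideB = server.accept()) {
                serverSideA.configureBlocking(false);
                serverSideB.configureBlocking(false);
                serverSideA.register(selector, SelectionKey.OP_READ);
                serverSideB.register(selector, SelectionKey.OP_READ);

                // Only connection A gets data; connection B stays idle.
                clientA.write(ByteBuffer.wrap("hi".getBytes(StandardCharsets.UTF_8)));

                // Wait up to 2 seconds for readiness; the idle connection is not selected.
                selector.select(2000);
                return selector.selectedKeys().size();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("ready keys: " + countReadyKeys());
    }
}
```

The application never inspects connection B; it only iterates over what selectedKeys() contains.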

epoll

Reference:

http://man7.org/linux/man-pages/man7/epoll.7.html

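epoll is an enhanced version of poll.

epoll provides three system calls:

epoll_create

Creates an epoll instance and returns a file descriptor that refers to it; all subsequent calls to the epoll interface use this file descriptor.

epoll_ctl

The control interface for an epoll instance.

Signature: int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);

It takes four parameters: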

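Parameter  Meaning
epfd       the epoll instance
op         the operation type, one of EPOLL_CTL_ADD, EPOLL_CTL_MOD, EPOLL_CTL_DEL; EPOLL_CTL_ADD registers a target file descriptor with the epoll instance
fd         the target file descriptor
event      the events the target file descriptor is interested in, such as readable or writable; the event structure is as follows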
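
    typedef union epoll_data {
        void     *ptr;
        int       fd;
        uint32_t  u32;
        uint64_t  u64;
    } epoll_data_t;

    struct epoll_event {
        uint32_t     events;  /* epoll events */
        epoll_data_t data;    /* user data variable */
    };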

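events is an integer bit mask formed by ORing together values such as:

EPOLLIN: readable;

EPOLLOUT: writable;

EPOLLERR: an error condition occurred;

and so on. For the full list, see the epoll_ctl man page.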
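How such a mask is built and tested can be sketched in a few lines. Java has no epoll constants of its own, so the class EpollMaskDemo below is hypothetical and simply copies the numeric values that Linux defines in <sys/epoll.h>; only the bit arithmetic is the point.

```java
// Hypothetical demo class; Java itself does not expose these constants.
class EpollMaskDemo {
    // Numeric values as defined in Linux <sys/epoll.h>.
    static final int EPOLLIN  = 0x001; // readable
    static final int EPOLLOUT = 0x004; // writable
    static final int EPOLLERR = 0x008; // error condition

    // True if `flag` is set in `mask`.
    static boolean has(int mask, int flag) {
        return (mask & flag) != 0;
    }

    public static void main(String[] args) {
        int interested = EPOLLIN | EPOLLOUT; // interested in both reads and writes
        System.out.println(has(interested, EPOLLIN));  // true
        System.out.println(has(interested, EPOLLERR)); // false
    }
}
```

The same OR and AND pattern is what SelectionKey's interest and ready sets use on the Java side.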

epoll_wait

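Waits for I/O events on the epoll instance.

Signature:

int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);

Parameter  Meaning
events     receives the pending events of the target file descriptors, such as readable or writable
maxevents  the maximum number of events to return; must be greater than 0
timeout    how long epoll_wait may block, in milliseconds

When does the blocking end?

  1. A file descriptor delivers one of the events registered earlier through epoll_ctl;
  2. The call is interrupted by a signal handler;
  3. The timeout expires.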
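On the Java side the closest counterpart is Selector.select(long timeout). The sketch below (class SelectReturnDemo is hypothetical) shows two of the ways a wait can end. Java has no direct equivalent of the signal case, so Selector.wakeup() is used here as a rough analogue, which is an assumption of this example rather than something the epoll man page states.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.Selector;

// Hypothetical demo class; names are illustrative.
class SelectReturnDemo {

    // Timeout case: nothing is registered, so select can only return by timing out.
    static int selectUntilTimeout(long millis) {
        if (millis <= 0) {
            // select(0) means "block indefinitely", which is not wanted here.
            throw new IllegalArgumentException("millis must be positive");
        }
        try (Selector selector = Selector.open()) {
            return selector.select(millis);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Interruption analogue: a prior wakeup() makes the next select() return at once.
    static int selectAfterWakeup() {
        try (Selector selector = Selector.open()) {
            selector.wakeup();
            return selector.select(); // would block indefinitely without the wakeup
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(selectUntilTimeout(50)); // 0 keys were ready
        System.out.println(selectAfterWakeup());    // 0 keys were ready
    }
}
```

In both cases the return value is 0 because no channel became ready; the call ended for another reason.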

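The main differences between epoll and poll (epoll's advantages)

  1. It can monitor more file descriptors;
  2. The full set of file descriptors does not have to be copied from user space to kernel space on every wait;
  3. Not every file descriptor has to be traversed on every wait.

Why epoll does not need to traverse every file descriptor to find pending events

Put simply, epoll uses an event callback mechanism (similar to the observer pattern). Many later frameworks, Node.js among them, use the same kind of event callback mechanism.

While epoll monitors fd events it maintains a ready queue. As soon as an fd becomes ready (that is, it has a pending event such as readable or writable), it is placed on this queue. When the application calls .select(), which on Linux typically ends up in epoll_wait, there is no need to traverse all the fds again; it only has to look at the ready queue.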
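The idea of the ready queue can be imitated with a toy model. The class ToyEpoll below is purely illustrative: it mimics the bookkeeping in plain Java collections and is in no way the kernel's implementation. The method onReady plays the role of the callback, and waitForEvents plays the role of epoll_wait.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Toy model only: it imitates the bookkeeping, not the kernel implementation.
class ToyEpoll {
    private final Set<Integer> interest = new HashSet<>();    // every watched fd
    private final Set<Integer> ready = new LinkedHashSet<>(); // fds with pending events, in arrival order

    // Stands in for epoll_ctl(EPOLL_CTL_ADD).
    void add(int fd) {
        interest.add(fd);
    }

    // Stands in for the callback that fires when a watched fd becomes ready.
    void onReady(int fd) {
        if (interest.contains(fd)) {
            ready.add(fd);
        }
    }

    // Stands in for epoll_wait: drains the ready set without scanning `interest`.
    List<Integer> waitForEvents(int maxEvents) {
        List<Integer> events = new ArrayList<>();
        Iterator<Integer> it = ready.iterator();
        while (it.hasNext() && events.size() < maxEvents) {
            events.add(it.next());
            it.remove();
        }
        return events;
    }

    // Watches 10,000 fds, marks two of them ready, and returns what a wait reports.
    static List<Integer> demo() {
        ToyEpoll ep = new ToyEpoll();
        for (int fd = 0; fd < 10_000; fd++) {
            ep.add(fd);
        }
        ep.onReady(42);
        ep.onReady(7);
        ep.onReady(42); // already queued, not added twice
        return ep.waitForEvents(16);
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [42, 7]
    }
}
```

The work done by waitForEvents depends on how many fds are ready, not on how many are being watched, which is the property the paragraph above describes.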

NIO select source code analysis

Registering a channel (socket)

see /Users/xxx/Downloads/jdk_src2/sun/nio/ch/SelectorImpl.java

protected final SelectionKey register(AbstractSelectableChannel ch,
                                          int ops,
                                          Object attachment)
    {
        if (!(ch instanceof SelChImpl))
            throw new IllegalSelectorException();
        SelectionKeyImpl k = new SelectionKeyImpl((SelChImpl)ch, this);
        k.attach(attachment);
        synchronized (publicKeys) {
            implRegister(k);
        }
        k.interestOps(ops);
        return k;
    }
           

Here,

  1. implRegister(k) stores the channel's file descriptor in its slot of the poll array;
  2. k.interestOps(ops) stores the operations to be monitored on the channel.

Possible values of ops

  1. SelectionKey.OP_CONNECT
  2. SelectionKey.OP_WRITE
  3. SelectionKey.OP_READ
  4. SelectionKey.OP_ACCEPT

Implementation of implRegister

See /Users/whuanghkl/Downloads/rt.jar.source/classes/sun/nio/ch/AbstractPollSelectorImpl.java

protected void implRegister(SelectionKeyImpl ski) {
        synchronized (closeLock) {
            if (closed)
                throw new ClosedSelectorException();

            // Check to see if the array is large enough
            if (channelArray.length == totalChannels) {
                // Make a larger array
                int newSize = pollWrapper.totalChannels * 2;
                SelectionKeyImpl temp[] = new SelectionKeyImpl[newSize];
                // Copy over
                for (int i=channelOffset; i<totalChannels; i++)
                    temp[i] = channelArray[i];
                channelArray = temp;
                // Grow the NativeObject poll array
                pollWrapper.grow(newSize);
            }
            channelArray[totalChannels] = ski;
            ski.setIndex(totalChannels);
            pollWrapper.addEntry(ski.channel);
            totalChannels++;
            keys.add(ski);
        }
    }
    
    
    void addEntry(SelChImpl var1) {
        this.putDescriptor(this.totalChannels, IOUtil.fdVal(var1.getFD()));
        this.putEventOps(this.totalChannels, 0);
        this.putReventOps(this.totalChannels, 0);
        ++this.totalChannels;
    }
               

Windows implementation

See /Users/xxx/Downloads/openjdk-8u40-src-b25-10_feb_2015/openjdk/jdk/src/windows/classes/sun/nio/ch/WindowsSelectorImpl.java

protected void implRegister(SelectionKeyImpl ski) {
        synchronized (closeLock) {
            if (pollWrapper == null)
                throw new ClosedSelectorException();
            growIfNeeded();
            channelArray[totalChannels] = ski;
            ski.setIndex(totalChannels);
            fdMap.put(ski);
            keys.add(ski);
            pollWrapper.addEntry(totalChannels, ski);
            totalChannels++;
        }
    }           

Key method: pollWrapper.addEntry(totalChannels, ski);

void addEntry(SelChImpl var1) {
        this.putDescriptor(this.totalChannels, IOUtil.fdVal(var1.getFD()));
        this.putEventOps(this.totalChannels, 0);
        this.putReventOps(this.totalChannels, 0);
        ++this.totalChannels;
    }           

Registering (monitoring) the operations a channel is interested in

k.interestOps(ops)

public SelectionKey interestOps(int ops) {
        ensureValid();
        return nioInterestOps(ops);
    }
    
    public SelectionKey nioInterestOps(int ops) {
        if ((ops & ~channel().validOps()) != 0)
            throw new IllegalArgumentException();
        channel.translateAndSetInterestOps(ops, this);
        interestOps = ops;
        return this;
    }
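The validOps() check in nioInterestOps can be observed from application code through the public API. In the sketch below the class ValidOpsDemo is hypothetical; it only calls validOps() and register(), and shows that a ServerSocketChannel registered for OP_READ is rejected with IllegalArgumentException.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

// Hypothetical demo class; names are illustrative.
class ValidOpsDemo {

    // The set of operations a SocketChannel accepts as interest ops.
    static int socketChannelValidOps() {
        try (SocketChannel sc = SocketChannel.open()) {
            return sc.validOps();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Returns true if registering a ServerSocketChannel for OP_READ is rejected.
    static boolean serverRejectsRead() {
        try (Selector selector = Selector.open();
             ServerSocketChannel ssc = ServerSocketChannel.open()) {
            ssc.configureBlocking(false); // avoid IllegalBlockingModeException
            try {
                ssc.register(selector, SelectionKey.OP_READ);
                return false;
            } catch (IllegalArgumentException expected) {
                return true; // OP_READ is not in the server channel's validOps()
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        int expected = SelectionKey.OP_READ | SelectionKey.OP_WRITE | SelectionKey.OP_CONNECT;
        System.out.println(socketChannelValidOps() == expected); // true
        System.out.println(serverRejectsRead());                 // true
    }
}
```

A server channel only supports OP_ACCEPT, which is why the earlier accept logic registers the accepted SocketChannel, not the server channel, for OP_READ.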
               

The call channel.translateAndSetInterestOps(ops, this) invokes void translateAndSetInterestOps(int ops, SelectionKeyImpl sk);

For translateAndSetInterestOps, see /Users/xxx/Downloads/jdk_src2/sun/nio/ch/SocketChannelImpl.java

/**
     * Translates an interest operation set into a native poll event set
     */
    public void translateAndSetInterestOps(int ops, SelectionKeyImpl sk) {
        int newOps = 0;
        if ((ops & SelectionKey.OP_READ) != 0)
            newOps |= Net.POLLIN;
        if ((ops & SelectionKey.OP_WRITE) != 0)
            newOps |= Net.POLLOUT;
        if ((ops & SelectionKey.OP_CONNECT) != 0)
            newOps |= Net.POLLCONN;
        sk.selector.putEventOps(sk, newOps);
    }           

In /Users/xxx/Downloads/jdk_src2/sun/nio/ch/AbstractPollSelectorImpl.java:

public void putEventOps(SelectionKeyImpl sk, int ops) {
        synchronized (closeLock) {
            if (closed)
                throw new ClosedSelectorException();
            pollWrapper.putEventOps(sk.getIndex(), ops);
        }
    }

  void putEventOps(int i, int event) {
        int offset = SIZE_POLLFD * i + EVENT_OFFSET;
        pollArray.putShort(offset, (short)event);
    }
               

Common unsafe operations explained

putInt writes an int value at the specified offset

/**
     * Writes an int at the specified offset from this native object's
     * base address.
     *
     * @param  offset
     *         The offset at which to write the int
     *
     * @param  value
     *         The int value to be written
     */
    final void putInt(int offset, int value) {
        unsafe.putInt(offset + address, value);
    }           

poll

private int poll() throws IOException{ // poll for the main thread
            return poll0(pollWrapper.pollArrayAddress,
                         Math.min(totalChannels, MAX_SELECTABLE_FDS),
                         readFds, writeFds, exceptFds, timeout);
        }

        private int poll(int index) throws IOException {
            // poll for helper threads
            return  poll0(pollWrapper.pollArrayAddress +
                     (pollArrayIndex * PollArrayWrapper.SIZE_POLLFD),
                     Math.min(MAX_SELECTABLE_FDS,
                             totalChannels - (index + 1) * MAX_SELECTABLE_FDS),
                     readFds, writeFds, exceptFds, timeout);
        }           

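This uses the operating system's facilities to monitor the sockets.

select

What does select do?

  1. It calls poll, which blocks, to find the channels in the channel list that are ready for an operation;
  2. poll returns once at least one channel is ready;
  3. It updates selectedKeys according to the readFds, writeFds and exceptFds returned by the operating system call.

    See /Users/xxx/Downloads/jdk_src2/sun/nio/ch/AbstractPollSelectorImpl.java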

/**
     * Copy the information in the pollfd structs into the opss
     * of the corresponding Channels. Add the ready keys to the
     * ready queue.
     */
    protected int updateSelectedKeys() {
        int numKeysUpdated = 0;
        // Skip zeroth entry; it is for interrupts only
        for (int i=channelOffset; i<totalChannels; i++) {
            int rOps = pollWrapper.getReventOps(i);
            if (rOps != 0) {
                SelectionKeyImpl sk = channelArray[i];
                pollWrapper.putReventOps(i, 0);
                if (selectedKeys.contains(sk)) {
                    if (sk.channel.translateAndSetReadyOps(rOps, sk)) {
                        numKeysUpdated++;
                    }
                } else {
                    sk.channel.translateAndSetReadyOps(rOps, sk);
                    if ((sk.nioReadyOps() & sk.nioInterestOps()) != 0) {
                        selectedKeys.add(sk);
                        numKeysUpdated++;
                    }
                }
            }
        }
        return numKeysUpdated;
    }
              

Processing the monitoring results

private int processSelectedKeys(long updateCount) {
            int numKeysUpdated = 0;
            numKeysUpdated += processFDSet(updateCount, readFds,
                                           Net.POLLIN,
                                           false);
            numKeysUpdated += processFDSet(updateCount, writeFds,
                                           Net.POLLCONN |
                                           Net.POLLOUT,
                                           false);
            numKeysUpdated += processFDSet(updateCount, exceptFds,
                                           Net.POLLIN |
                                           Net.POLLCONN |
                                           Net.POLLOUT,
                                           true);
            return numKeysUpdated;
        }           

AbstractPollArrayWrapper source

/**
 * Manipulates a native array of pollfd structs.
 *
 * @author Mike McCloskey
 * @since 1.4
 */

public abstract class AbstractPollArrayWrapper {

    // Miscellaneous constants
    static final short SIZE_POLLFD   = 8;
    static final short FD_OFFSET     = 0;
    static final short EVENT_OFFSET  = 4;
    static final short REVENT_OFFSET = 6;

    // The poll fd array
    protected AllocatedNativeObject pollArray;

    // Number of valid entries in the pollArray
    protected int totalChannels = 0;

    // Base address of the native pollArray
    protected long pollArrayAddress;

    // Access methods for fd structures
    int getEventOps(int i) {
        int offset = SIZE_POLLFD * i + EVENT_OFFSET;
        return pollArray.getShort(offset);
    }

    int getReventOps(int i) {
        int offset = SIZE_POLLFD * i + REVENT_OFFSET;
        return pollArray.getShort(offset);
    }

    int getDescriptor(int i) {
        int offset = SIZE_POLLFD * i + FD_OFFSET;
        return pollArray.getInt(offset);
    }

    void putEventOps(int i, int event) {
        int offset = SIZE_POLLFD * i + EVENT_OFFSET;
        pollArray.putShort(offset, (short)event);
    }

    void putReventOps(int i, int revent) {
        int offset = SIZE_POLLFD * i + REVENT_OFFSET;
        pollArray.putShort(offset, (short)revent);
    }

    void putDescriptor(int i, int fd) {
        int offset = SIZE_POLLFD * i + FD_OFFSET;
        pollArray.putInt(offset, fd);
    }

}
           

An int is four bytes

See /Users/xxx/Downloads/jdk_src2/sun/nio/ch/NativeObject.java

/**
     * Reads an address from this native object at the given offset and
     * constructs a native object using that address.
     *
     * @param  offset
     *         The offset of the address to be read.  Note that the size of an
     *         address is implementation-dependent.
     *
     * @return The native object created using the address read from the
     *         given offset
     */
    NativeObject getObject(int offset) {
        long newAddress = 0L;
        switch (addressSize()) {
            case 8:
                newAddress = unsafe.getLong(offset + address);
                break;
            case 4:
                newAddress = unsafe.getInt(offset + address) & 0x00000000FFFFFFFF;
                break;
            default:
                throw new InternalError("Address size not supported");
        }

        return new NativeObject(newAddress);
    }           

Structure of a poll file descriptor (pollfd)

See /Users/xxx/Downloads/jdk_src2/sun/nio/ch/PollArrayWrapper.java

Manipulates a native array of pollfd structs on Solaris:
 
  typedef struct pollfd {
     int fd;
     short events;
     short revents;
  } pollfd_t;
 
           

Each descriptor entry occupies 8 bytes: 4 for the int fd, plus 2 each for the short events and revents fields.
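The offset arithmetic used by AbstractPollArrayWrapper can be tried out without any JDK internals. The sketch below is an approximation under stated assumptions: the class PollFdArray is hypothetical, and a direct ByteBuffer stands in for the JDK's AllocatedNativeObject and its Unsafe-based putInt/putShort. Only the constants and the formula SIZE_POLLFD * i + offset are taken from the listing above.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

// Hypothetical stand-in: a direct ByteBuffer replaces the JDK's native memory wrapper.
class PollFdArray {
    // Same layout constants as AbstractPollArrayWrapper.
    static final int SIZE_POLLFD   = 8;
    static final int FD_OFFSET     = 0;
    static final int EVENT_OFFSET  = 4;
    static final int REVENT_OFFSET = 6;

    private final ByteBuffer pollArray;

    PollFdArray(int entries) {
        // Direct buffers start out zero-filled; native byte order mimics a C struct array.
        pollArray = ByteBuffer.allocateDirect(entries * SIZE_POLLFD)
                              .order(ByteOrder.nativeOrder());
    }

    void putDescriptor(int i, int fd) {
        pollArray.putInt(SIZE_POLLFD * i + FD_OFFSET, fd);
    }

    void putEventOps(int i, int event) {
        pollArray.putShort(SIZE_POLLFD * i + EVENT_OFFSET, (short) event);
    }

    int getDescriptor(int i) {
        return pollArray.getInt(SIZE_POLLFD * i + FD_OFFSET);
    }

    int getEventOps(int i) {
        return pollArray.getShort(SIZE_POLLFD * i + EVENT_OFFSET);
    }

    // Reads a short at an absolute byte offset, to show where a field really lives.
    int rawShortAt(int byteOffset) {
        return pollArray.getShort(byteOffset);
    }

    public static void main(String[] args) {
        PollFdArray array = new PollFdArray(4);
        array.putDescriptor(2, 7);    // entry 2 starts at byte 2 * 8 = 16
        array.putEventOps(2, 0x0005); // its events field lives at byte 16 + 4 = 20
        System.out.println(array.getDescriptor(2)); // 7
        System.out.println(array.rawShortAt(20));   // 5
    }
}
```

Entry i always begins at byte 8 * i, so the selector can address any channel's fd, events and revents fields by index alone.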


JDK source

https://yddmax.github.io/2017/06/05/openjdk%E6%BA%90%E7%A0%81%E7%9B%AE%E5%BD%95/

Afterword

epoll is fairly low-level and not easy to understand.

To deepen your understanding, it helps to look at the JavaScript event loop or the Node.js event loop

https://juejin.im/entry/5b6058fde51d45348a2ffc65


https://linux.die.net/man/2/epoll_wait https://juejin.im/post/5b0524f8518825428a2631ee