epoll is an I/O multiplexing mechanism on Linux. It can monitor multiple file descriptors for read/write events; as soon as a descriptor becomes ready (typically because a read or write event occurred), it notifies the interested application so the application can handle the event.
The old way of network programming
Take a chat program implemented with sockets as an example.
Server side:
public static void main(String[] args) {
    ServerSocket server = null;
    try {
        server = new ServerSocket(PORT); // PORT is a constant defined elsewhere
        System.out.println(" server start .. ");
        // Loop so that multiple client requests can be accepted
        while (true) {
            Socket socket = server.accept(); // blocks until a client connects
            // Start a new thread to handle this client
            new Thread(new ServerHandler(socket)).start();
        }
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        if (server != null) {
            try {
                server.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        server = null;
    }
}
Every client connection starts a new thread: with 10,000 clients you end up with 10,000 threads, which severely wastes CPU.
A thread pool helps, of course, but it does not solve the problem at its root.
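The thread-pool variant can be sketched as follows. This is a minimal, self-contained simulation, not the original chat server: the class and method names are illustrative, and a short sleep stands in for a blocking socket read. It shows why the fix is not fundamental: a fixed pool caps the thread count, yet every in-flight connection still occupies a worker for as long as its I/O blocks.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class PooledServerSketch {

    // Handle n simulated connections on a fixed pool of `threads` workers
    // and return how many were handled. Each task blocks briefly, standing
    // in for a blocking read on a client socket.
    static int runConnections(int n, int threads) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        AtomicInteger handled = new AtomicInteger();
        for (int i = 0; i < n; i++) {
            pool.submit(() -> {
                try {
                    Thread.sleep(20); // simulates a blocking socket read
                } catch (InterruptedException ignored) { }
                handled.incrementAndGet();
            });
        }
        pool.shutdown();
        pool.awaitTermination(10, TimeUnit.SECONDS);
        return handled.get();
    }

    public static void main(String[] args) throws InterruptedException {
        // 16 connections share 4 threads instead of spawning 16 threads,
        // but each connection still ties up a worker while it blocks.
        System.out.println("handled=" + runConnections(16, 4));
    }
}
```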
Using NIO
while (true) {
    try {
        // 1. Block until the multiplexer has something to report
        this.seletor.select();
        // 2. Get the set of keys the multiplexer selected
        Iterator<SelectionKey> keys = this.seletor.selectedKeys().iterator();
        // 3. Iterate over them
        while (keys.hasNext()) {
            // 4. Take one selected key
            SelectionKey key = keys.next();
            // 5. Remove it from the set so it is not processed twice
            keys.remove();
            // 6. Only handle valid keys
            if (key.isValid()) {
                // 7. A new connection is ready to be accepted
                if (key.isAcceptable()) {
                    this.accept(key);
                }
                // 8. The channel is readable
                if (key.isReadable()) {
                    this.read(key);
                }
                // 9. The channel is writable
                if (key.isWritable()) {
                    this.write(key);
                }
            }
        }
    } catch (IOException e) {
        e.printStackTrace();
    }
}
When a client connects successfully:
private void accept(SelectionKey key) {
    try {
        // 1. Get the server channel the key belongs to
        ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
        // 2. Accept the connection; the selector reported it ready,
        //    so this returns immediately instead of blocking
        SocketChannel sc = ssc.accept();
        // 3. Switch the new channel to non-blocking mode
        sc.configureBlocking(false);
        // 4. Register it with the multiplexer, interested in reads
        sc.register(this.seletor, SelectionKey.OP_READ);
    } catch (IOException e) {
        e.printStackTrace();
    }
}
Notice that there is always just one thread, no matter how many clients connect.
Note: this does not mean there is no blocking at all.
seletor.select() does block, but the individual read/write operations do not, unlike the traditional
inputStream.read()
which hangs until there is data to read.
The difference between NIO and traditional IO
Traditional IO
- Each client connection produces a socket, and one thread is created per socket;
- Our program has to poll each socket itself to find out whether it is readable or writable;
- Reads and writes may block until they can proceed;
- Traditional sockets are stream-oriented.
NIO
- A single thread can handle the reads and writes of n sockets;
- There is no need to poll every socket; we only wait on this.seletor.select();
- NIO is buffer-oriented.
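The "buffer-oriented" point can be illustrated with ByteBuffer, the workhorse of NIO: data is written into a buffer, the buffer is flipped, and the same bytes are read back, in contrast to the one-way consumption of an InputStream. A minimal sketch, independent of the chat example:

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class BufferDemo {

    // Write a string into a ByteBuffer, flip it, and read it back.
    // flip() switches the buffer from writing mode to reading mode by
    // setting limit = position and position = 0.
    static String roundTrip(String msg) {
        ByteBuffer buf = ByteBuffer.allocate(64);
        buf.put(msg.getBytes(StandardCharsets.UTF_8)); // fill the buffer
        buf.flip();                                    // prepare for reading
        byte[] out = new byte[buf.remaining()];
        buf.get(out);                                  // drain the buffer
        return new String(out, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        System.out.println(roundTrip("hello nio")); // prints "hello nio"
    }
}
```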
Why does NIO know which sockets are ready (readable or writable) without polling all of them?
Because in NIO, whenever a socket becomes ready a hook is called back, so the application learns about it immediately.
epoll
Reference: http://man7.org/linux/man-pages/man7/epoll.7.html
epoll is an enhancement of poll.
epoll provides three system calls:
epoll_create
Creates an epoll instance, which is itself a file descriptor; all subsequent calls to the epoll interface use this file descriptor.
epoll_ctl
The interface for operating on an epoll instance.
Signature: int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
There are four parameters:
Parameter | Meaning |
---|---|
epfd | the epoll instance |
op | the operation type, one of EPOLL_CTL_ADD, EPOLL_CTL_MOD, EPOLL_CTL_DEL; EPOLL_CTL_ADD registers a target file descriptor with the epoll instance |
fd | the target file descriptor |
event | the events the target file descriptor is interested in, e.g. readable or writable; the event struct looks like this: |
struct epoll_event {
    uint32_t     events;  /* epoll events (bit mask) */
    epoll_data_t data;    /* user data */
};
events is a number: a bit mask built by OR-ing values such as:
EPOLLIN: readable;
EPOLLOUT: writable;
EPOLLERR: an error occurred;
and so on; see the epoll man page above for the full list.
epoll_wait
Waits for I/O events on the epoll instance.
Signature:
int epoll_wait(int epfd, struct epoll_event *events,
               int maxevents, int timeout);
Parameter | Meaning |
---|---|
epfd | the epoll instance |
events | receives the pending events of the ready file descriptors, e.g. readable or writable |
maxevents | the maximum number of events to return; must be greater than 0 |
timeout | how long epoll_wait may block before timing out |
When does epoll_wait return?
- when any file descriptor delivers an event (one registered earlier via epoll_ctl);
- when the call is interrupted by a signal handler;
- when the timeout expires.
The biggest differences between epoll and poll (epoll's advantages)
- it can monitor more file descriptors;
- it does not have to copy the whole set of file descriptors from user space to kernel space on every wait;
- it does not have to scan every file descriptor on every wait.
Why epoll does not need to scan all file descriptors to find pending events
In short, epoll uses an event-callback mechanism (similar to the observer pattern); many later frameworks, such as Node.js, adopted the same idea.
While epoll is watching fds it maintains a ready list: as soon as an fd becomes ready (i.e. has a pending event, such as readable or writable) it is placed on that list. When the application calls epoll_wait (or select() in Java), it does not re-scan all the fds; it only has to look at this ready list.
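On modern Linux JDKs the default Selector is implemented on top of epoll, so the three system calls map directly onto the NIO API. A runnable sketch of that mapping (the `≈` comments state the correspondence; the server/client setup is illustrative, using an ephemeral loopback port):

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

public class EpollMappingDemo {

    // Returns true if the selector reports the server channel as acceptable
    // after a client connects. Each step is annotated with the epoll call it
    // roughly corresponds to on Linux.
    static boolean acceptReady() throws IOException {
        Selector selector = Selector.open();                // ≈ epoll_create
        ServerSocketChannel server = ServerSocketChannel.open();
        server.bind(new InetSocketAddress("127.0.0.1", 0)); // ephemeral port
        server.configureBlocking(false);
        server.register(selector, SelectionKey.OP_ACCEPT);  // ≈ epoll_ctl(EPOLL_CTL_ADD)

        // A client connects, which makes the server channel "acceptable".
        SocketChannel client = SocketChannel.open(server.getLocalAddress());

        boolean acceptable = false;
        selector.select(5000);                              // ≈ epoll_wait
        for (SelectionKey key : selector.selectedKeys()) {
            if (key.isAcceptable()) {
                acceptable = true;
            }
        }
        client.close();
        server.close();
        selector.close();
        return acceptable;
    }

    public static void main(String[] args) throws IOException {
        System.out.println("acceptable=" + acceptReady());
    }
}
```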
NIO select source-code analysis
Registering a channel (a socket)
See sun/nio/ch/SelectorImpl.java:
protected final SelectionKey register(AbstractSelectableChannel ch,
                                      int ops,
                                      Object attachment)
{
    if (!(ch instanceof SelChImpl))
        throw new IllegalSelectorException();
    SelectionKeyImpl k = new SelectionKeyImpl((SelChImpl)ch, this);
    k.attach(attachment);
    synchronized (publicKeys) {
        implRegister(k);
    }
    k.interestOps(ops);
    return k;
}
Here,
- implRegister(k) records the position of the channel's file descriptor;
- k.interestOps(ops) records which operations we want to monitor on the channel.
Possible values of ops:
- SelectionKey.OP_CONNECT
- SelectionKey.OP_WRITE
- SelectionKey.OP_READ
The implementation of implRegister
See sun/nio/ch/AbstractPollSelectorImpl.java (the poll-based selector):
protected void implRegister(SelectionKeyImpl ski) {
    synchronized (closeLock) {
        if (closed)
            throw new ClosedSelectorException();
        // Check to see if the array is large enough
        if (channelArray.length == totalChannels) {
            // Make a larger array
            int newSize = pollWrapper.totalChannels * 2;
            SelectionKeyImpl temp[] = new SelectionKeyImpl[newSize];
            // Copy over
            for (int i=channelOffset; i<totalChannels; i++)
                temp[i] = channelArray[i];
            channelArray = temp;
            // Grow the NativeObject poll array
            pollWrapper.grow(newSize);
        }
        channelArray[totalChannels] = ski;
        ski.setIndex(totalChannels);
        pollWrapper.addEntry(ski.channel);
        totalChannels++;
        keys.add(ski);
    }
}
void addEntry(SelChImpl var1) {
    this.putDescriptor(this.totalChannels, IOUtil.fdVal(var1.getFD()));
    this.putEventOps(this.totalChannels, 0);
    this.putReventOps(this.totalChannels, 0);
    ++this.totalChannels;
}
The Windows implementation
See openjdk/jdk/src/windows/classes/sun/nio/ch/WindowsSelectorImpl.java:
protected void implRegister(SelectionKeyImpl ski) {
    synchronized (closeLock) {
        if (pollWrapper == null)
            throw new ClosedSelectorException();
        growIfNeeded();
        channelArray[totalChannels] = ski;
        ski.setIndex(totalChannels);
        fdMap.put(ski);
        keys.add(ski);
        pollWrapper.addEntry(totalChannels, ski);
        totalChannels++;
    }
}
The key call: pollWrapper.addEntry(totalChannels, ski), which writes the channel's file descriptor into the native poll array.
Registering the operations the channel is interested in
k.interestOps(ops):
public SelectionKey interestOps(int ops) {
    ensureValid();
    return nioInterestOps(ops);
}

public SelectionKey nioInterestOps(int ops) {
    if ((ops & ~channel().validOps()) != 0)
        throw new IllegalArgumentException();
    channel.translateAndSetInterestOps(ops, this);
    interestOps = ops;
    return this;
}
channel.translateAndSetInterestOps(ops, this) in turn calls void translateAndSetInterestOps(int ops, SelectionKeyImpl sk);
for translateAndSetInterestOps, see sun/nio/ch/SocketChannelImpl.java:
/**
 * Translates an interest operation set into a native poll event set
 */
public void translateAndSetInterestOps(int ops, SelectionKeyImpl sk) {
    int newOps = 0;
    if ((ops & SelectionKey.OP_READ) != 0)
        newOps |= Net.POLLIN;
    if ((ops & SelectionKey.OP_WRITE) != 0)
        newOps |= Net.POLLOUT;
    if ((ops & SelectionKey.OP_CONNECT) != 0)
        newOps |= Net.POLLCONN;
    sk.selector.putEventOps(sk, newOps);
}
In sun/nio/ch/AbstractPollSelectorImpl.java:
public void putEventOps(SelectionKeyImpl sk, int ops) {
    synchronized (closeLock) {
        if (closed)
            throw new ClosedSelectorException();
        pollWrapper.putEventOps(sk.getIndex(), ops);
    }
}

void putEventOps(int i, int event) {
    int offset = SIZE_POLLFD * i + EVENT_OFFSET;
    pollArray.putShort(offset, (short)event);
}
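The op translation above is plain bit arithmetic. A standalone sketch with the constants inlined: the OP_* values are fixed by the SelectionKey API, while POLLIN/POLLOUT are the poll(2) bits on Linux, and POLLCONN is a JDK-internal alias that this sketch assumes equals POLLOUT (as on Linux); on other platforms the native values differ.

```java
public class OpsTranslationDemo {
    // SelectionKey constants (fixed by the Java API)
    static final int OP_READ = 1 << 0, OP_WRITE = 1 << 2, OP_CONNECT = 1 << 3;
    // poll(2) event bits on Linux; POLLCONN assumed equal to POLLOUT here
    static final int POLLIN = 0x0001, POLLOUT = 0x0004, POLLCONN = POLLOUT;

    // Mirrors translateAndSetInterestOps: NIO interest ops -> native poll bits.
    static int translate(int ops) {
        int newOps = 0;
        if ((ops & OP_READ) != 0)    newOps |= POLLIN;
        if ((ops & OP_WRITE) != 0)   newOps |= POLLOUT;
        if ((ops & OP_CONNECT) != 0) newOps |= POLLCONN;
        return newOps;
    }

    public static void main(String[] args) {
        // OP_READ | OP_WRITE -> POLLIN | POLLOUT
        System.out.println(Integer.toHexString(translate(OP_READ | OP_WRITE))); // prints "5"
    }
}
```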
Common Unsafe operations explained
putInt writes an int value at the specified position:
/**
 * Writes an int at the specified offset from this native object's
 * base address.
 *
 * @param offset
 *        The offset at which to write the int
 *
 * @param value
 *        The int value to be written
 */
final void putInt(int offset, int value) {
    unsafe.putInt(offset + address, value);
}
poll
private int poll() throws IOException { // poll for the main thread
    return poll0(pollWrapper.pollArrayAddress,
                 Math.min(totalChannels, MAX_SELECTABLE_FDS),
                 readFds, writeFds, exceptFds, timeout);
}

private int poll(int index) throws IOException {
    // poll for helper threads
    return poll0(pollWrapper.pollArrayAddress +
                 (pollArrayIndex * PollArrayWrapper.SIZE_POLLFD),
                 Math.min(MAX_SELECTABLE_FDS,
                          totalChannels - (index + 1) * MAX_SELECTABLE_FDS),
                 readFds, writeFds, exceptFds, timeout);
}
This calls into the operating system to monitor the sockets.
select
What does select do?
- poll: block, waiting for a channel in the list to become operable;
- once some channel can be operated on, poll returns;
- based on the readFds, writeFds, and exceptFds returned by the OS call, update selectedKeys.
See sun/nio/ch/AbstractPollSelectorImpl.java:
/**
 * Copy the information in the pollfd structs into the opss
 * of the corresponding Channels. Add the ready keys to the
 * ready queue.
 */
protected int updateSelectedKeys() {
    int numKeysUpdated = 0;
    // Skip zeroth entry; it is for interrupts only
    for (int i=channelOffset; i<totalChannels; i++) {
        int rOps = pollWrapper.getReventOps(i);
        if (rOps != 0) {
            SelectionKeyImpl sk = channelArray[i];
            pollWrapper.putReventOps(i, 0);
            if (selectedKeys.contains(sk)) {
                if (sk.channel.translateAndSetReadyOps(rOps, sk)) {
                    numKeysUpdated++;
                }
            } else {
                sk.channel.translateAndSetReadyOps(rOps, sk);
                if ((sk.nioReadyOps() & sk.nioInterestOps()) != 0) {
                    selectedKeys.add(sk);
                    numKeysUpdated++;
                }
            }
        }
    }
    return numKeysUpdated;
}
Processing the monitoring results
private int processSelectedKeys(long updateCount) {
    int numKeysUpdated = 0;
    numKeysUpdated += processFDSet(updateCount, readFds,
                                   Net.POLLIN,
                                   false);
    numKeysUpdated += processFDSet(updateCount, writeFds,
                                   Net.POLLCONN |
                                   Net.POLLOUT,
                                   false);
    numKeysUpdated += processFDSet(updateCount, exceptFds,
                                   Net.POLLIN |
                                   Net.POLLCONN |
                                   Net.POLLOUT,
                                   true);
    return numKeysUpdated;
}
AbstractPollArrayWrapper source
/**
 * Manipulates a native array of pollfd structs.
 *
 * @author Mike McCloskey
 * @since 1.4
 */
public abstract class AbstractPollArrayWrapper {

    // Miscellaneous constants
    static final short SIZE_POLLFD   = 8;
    static final short FD_OFFSET     = 0;
    static final short EVENT_OFFSET  = 4;
    static final short REVENT_OFFSET = 6;

    // The poll fd array
    protected AllocatedNativeObject pollArray;

    // Number of valid entries in the pollArray
    protected int totalChannels = 0;

    // Base address of the native pollArray
    protected long pollArrayAddress;

    // Access methods for fd structures
    int getEventOps(int i) {
        int offset = SIZE_POLLFD * i + EVENT_OFFSET;
        return pollArray.getShort(offset);
    }

    int getReventOps(int i) {
        int offset = SIZE_POLLFD * i + REVENT_OFFSET;
        return pollArray.getShort(offset);
    }

    int getDescriptor(int i) {
        int offset = SIZE_POLLFD * i + FD_OFFSET;
        return pollArray.getInt(offset);
    }

    void putEventOps(int i, int event) {
        int offset = SIZE_POLLFD * i + EVENT_OFFSET;
        pollArray.putShort(offset, (short)event);
    }

    void putReventOps(int i, int revent) {
        int offset = SIZE_POLLFD * i + REVENT_OFFSET;
        pollArray.putShort(offset, (short)revent);
    }

    void putDescriptor(int i, int fd) {
        int offset = SIZE_POLLFD * i + FD_OFFSET;
        pollArray.putInt(offset, fd);
    }
}
An int is four bytes.
See sun/nio/ch/NativeObject.java:
/**
 * Reads an address from this native object at the given offset and
 * constructs a native object using that address.
 *
 * @param offset
 *        The offset of the address to be read. Note that the size of an
 *        address is implementation-dependent.
 *
 * @return The native object created using the address read from the
 *         given offset
 */
NativeObject getObject(int offset) {
    long newAddress = 0L;
    switch (addressSize()) {
        case 8:
            newAddress = unsafe.getLong(offset + address);
            break;
        case 4:
            newAddress = unsafe.getInt(offset + address) & 0x00000000FFFFFFFF;
            break;
        default:
            throw new InternalError("Address size not supported");
    }
    return new NativeObject(newAddress);
}
The structure of a poll file descriptor entry
See sun/nio/ch/PollArrayWrapper.java, which manipulates a native array of pollfd structs on Solaris:
typedef struct pollfd {
    int fd;
    short events;
    short revents;
} pollfd_t;
Each descriptor entry occupies 8 bytes: fd at offset 0 (4 bytes), events at offset 4 (2 bytes), and revents at offset 6 (2 bytes), which matches SIZE_POLLFD = 8 and the offsets in AbstractPollArrayWrapper above.
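The offset arithmetic in AbstractPollArrayWrapper can be reproduced with a plain ByteBuffer standing in for the native memory. This is a sketch: the real AllocatedNativeObject goes through Unsafe, but the layout math is identical.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

public class PollfdLayoutDemo {
    // Same constants as AbstractPollArrayWrapper
    static final int SIZE_POLLFD = 8, FD_OFFSET = 0, EVENT_OFFSET = 4, REVENT_OFFSET = 6;

    final ByteBuffer pollArray;

    PollfdLayoutDemo(int capacity) {
        // nativeOrder() mimics raw native memory
        pollArray = ByteBuffer.allocate(capacity * SIZE_POLLFD).order(ByteOrder.nativeOrder());
    }

    // Absolute puts/gets at the computed struct-field offsets
    void putDescriptor(int i, int fd)  { pollArray.putInt(SIZE_POLLFD * i + FD_OFFSET, fd); }
    void putEventOps(int i, int event) { pollArray.putShort(SIZE_POLLFD * i + EVENT_OFFSET, (short) event); }
    int  getDescriptor(int i)          { return pollArray.getInt(SIZE_POLLFD * i + FD_OFFSET); }
    int  getEventOps(int i)            { return pollArray.getShort(SIZE_POLLFD * i + EVENT_OFFSET); }

    public static void main(String[] args) {
        PollfdLayoutDemo demo = new PollfdLayoutDemo(4);
        demo.putDescriptor(2, 42); // entry 2 starts at byte 16
        demo.putEventOps(2, 0x1);  // POLLIN
        System.out.println(demo.getDescriptor(2) + " " + demo.getEventOps(2)); // prints "42 1"
    }
}
```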
JDK source
https://yddmax.github.io/2017/06/05/openjdk%E6%BA%90%E7%A0%81%E7%9B%AE%E5%BD%95/
Afterword
epoll is fairly low-level and not easy to grasp.
To deepen your understanding, look at how the JavaScript event loop and the Node.js event loop work:
https://juejin.im/entry/5b6058fde51d45348a2ffc65
https://linux.die.net/man/2/epoll_wait
https://juejin.im/post/5b0524f8518825428a2631ee