開篇
回想研究所學生期間在H3C做項目的時候第一次接觸epoll的異步事件,心血來潮看了下java的NIO的實作,希望同樣感興趣的人一起看看。Netty是java NIO的集大成者,一定要看看。
java NIO server demo

socket server端工作标準流程
- 建立socket: 建立ServerSocketChannel,通過ServerSocketChannel.open()方法。
- 綁定socket:ServerSocketChannel綁定端口,通過serverSocketChannel.bind()方法。
- 前置準備: 建立selector對象,通過Selector.open()方法。
- 前置準備: 注冊Channel到selector并綁定事件,通過serverSocketChannel.register()。
- 監聽端口号: 通過listen()方法開始進入監聽。
- 處理事件: while循環中等待select操作傳回區分連接配接還是資料進行不同處理。
public class NIOServer {
private Selector selector;
public void initServer(int port) throws IOException {
// 獲得一個ServerSocketChannel通道
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
// 設定通道為非阻塞
serverSocketChannel.configureBlocking(false);
// 将該通道對應的ServerSocket綁定到port端口
serverSocketChannel.bind(new InetSocketAddress(port));
// 獲得一個通道管理器
this.selector = Selector.open();
// 将通道管理器和該通道綁定,并為該通道注冊SelectionKey.OP_ACCEPT事件,注冊該事件後,
// 當該事件到達時,selector.select()會傳回,如果該事件沒到達selector.select()會一直阻塞。
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
}
public void listen() throws IOException {
System.out.println("服務端啟動成功!");
// 輪詢通路selector
while (true) {
// 當注冊的事件到達時,方法傳回;否則,該方法會一直阻塞
selector.select();
// 獲得selector中選中的項的疊代器,選中的項為注冊的事件
Iterator<SelectionKey> ite = this.selector.selectedKeys().iterator();
while (ite.hasNext()) {
SelectionKey key = (SelectionKey) ite.next();
// 删除已選的key,以防重複處理
ite.remove();
if (key.isAcceptable()) {// 用戶端請求連接配接事件
ServerSocketChannel server = (ServerSocketChannel) key.channel();
// 獲得和用戶端連接配接的通道
SocketChannel channel = server.accept();
// 設定成非阻塞
channel.configureBlocking(false);
// 在這裡可以給用戶端發送資訊哦
channel.write(ByteBuffer.wrap(new String("向用戶端發送了一條資訊")
.getBytes("utf-8")));
// 在和用戶端連接配接成功之後,為了可以接收到用戶端的資訊,需要給通道設定讀的權限。
channel.register(this.selector, SelectionKey.OP_READ);
} else if (key.isReadable()) {// 獲得了可讀的事件
read(key);
}
}
}
}
public void read(SelectionKey key) throws IOException {
// 伺服器可讀取消息:得到事件發生的Socket通道
SocketChannel channel = (SocketChannel) key.channel();
// 建立讀取的緩沖區
ByteBuffer buffer = ByteBuffer.allocate(512);
channel.read(buffer);
byte[] data = buffer.array();
String msg = new String(data).trim();
System.out.println("服務端收到資訊:" + msg);
ByteBuffer outBuffer = ByteBuffer.wrap(msg.getBytes("utf-8"));
channel.write(outBuffer);// 将消息回送給用戶端
}
public static void main(String[] args) throws IOException {
NIOServer server = new NIOServer();
server.initServer(8000);
server.listen();
}
}
ServerSocketChannel和Selector初始化過程
在java NIO Server的标準過程中,有兩個核心的操作需要深入分析一下,分别是ServerSocketChannel.open() 和 Selector.open()兩個過程,這裡針對這兩個對象的初始化流程進行下細緻的分解。
// ServerSocketChannel的初始化過程
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
// Selector的初始化過程
selector = Selector.open();
通用邏輯抽取
- ServerSocketChannel.open()=SelectorProvider.provider().openServerSocketChannel()
- Selector.open()=SelectorProvider.provider().openSelector()
兩者有共同點在于都調用了SelectorProvider.provider()方法,是以先把相同部分進行分析。
public abstract class ServerSocketChannel extends AbstractSelectableChannel
implements NetworkChannel
{
protected ServerSocketChannel(SelectorProvider provider) {
super(provider);
}
public static ServerSocketChannel open() throws IOException {
return SelectorProvider.provider().openServerSocketChannel();
}
}
public abstract class Selector implements Closeable {
protected Selector() { }
public static Selector open() throws IOException {
return SelectorProvider.provider().openSelector();
}
}
SelectorProvider對象建立
- SelectorProvider.provider()方法會在内部建立唯一的SelectorProvider對象,通過鎖來保證建立唯一對象。
- SelectorProvider對象通過DefaultSelectorProvider.create()方法進行建立。
- DefaultSelectorProvider.create()方法内部根據實際系統建立不同的對象,以linux環境中EPollSelectorProvider對象為例繼續分析。
public abstract class SelectorProvider {
private static final Object lock = new Object();
private static SelectorProvider provider = null;
public static SelectorProvider provider() {
synchronized (lock) {
if (provider != null)
return provider;
return AccessController.doPrivileged(
new PrivilegedAction<SelectorProvider>() {
public SelectorProvider run() {
if (loadProviderFromProperty())
return provider;
if (loadProviderAsService())
return provider;
provider = sun.nio.ch.DefaultSelectorProvider.create();
return provider;
}
});
}
}
}
public class DefaultSelectorProvider {
public static SelectorProvider create() {
String osname = AccessController.doPrivileged(
new GetPropertyAction("os.name"));
if ("SunOS".equals(osname)) {
return new sun.nio.ch.DevPollSelectorProvider();
}
// use EPollSelectorProvider for Linux kernels >= 2.6
if ("Linux".equals(osname)) {
String osversion = AccessController.doPrivileged(
new GetPropertyAction("os.version"));
String[] vers = osversion.split("\\.", 0);
if (vers.length >= 2) {
try {
int major = Integer.parseInt(vers[0]);
int minor = Integer.parseInt(vers[1]);
if (major > 2 || (major == 2 && minor >= 6)) {
return new sun.nio.ch.EPollSelectorProvider();
}
} catch (NumberFormatException x) {
// format not recognized
}
}
}
return new sun.nio.ch.PollSelectorProvider();
}
}
EPollSelectorProvider的操作過程
- SelectorProvider.provider().openServerSocketChannel()調用EPollSelectorProvider的openServerSocketChannel()方法傳回EPollSelectorImpl對象。
- SelectorProvider.provider().openSelector()調用EPollSelectorProvider的openSelector()方法傳回ServerSocketChannelImpl對象。
繼續分析ServerSocketChannelImpl對象和EPollSelectorImpl對象。
public class EPollSelectorProvider extends SelectorProviderImpl
{
public AbstractSelector openSelector() throws IOException {
return new EPollSelectorImpl(this);
}
public Channel inheritedChannel() throws IOException {
return InheritedChannel.getChannel();
}
}
public abstract class SelectorProviderImpl extends SelectorProvider
{
public DatagramChannel openDatagramChannel() throws IOException {
return new DatagramChannelImpl(this);
}
public DatagramChannel openDatagramChannel(ProtocolFamily family) throws IOException {
return new DatagramChannelImpl(this, family);
}
public Pipe openPipe() throws IOException {
return new PipeImpl(this);
}
public abstract AbstractSelector openSelector() throws IOException;
public ServerSocketChannel openServerSocketChannel() throws IOException {
return new ServerSocketChannelImpl(this);
}
public SocketChannel openSocketChannel() throws IOException {
return new SocketChannelImpl(this);
}
}
EPollSelectorImpl對象
- EPollSelectorImpl構造函數建立内部通信的socket對IOUtil.makePipe(false)。
- EPollSelectorImpl的doSelect方法負責傳回事件到來的fds。
- EPollSelectorImpl的fdToKey的map儲存fd和SelectionKey的映射。
class EPollSelectorImpl extends SelectorImpl
{
// File descriptors used for interrupt
protected int fd0;
protected int fd1;
// The poll object
EPollArrayWrapper pollWrapper;
// Maps from file descriptors to keys
private Map<Integer,SelectionKeyImpl> fdToKey;
// True if this Selector has been closed
private volatile boolean closed = false;
// Lock for interrupt triggering and clearing
private Object interruptLock = new Object();
private boolean interruptTriggered = false;
EPollSelectorImpl(SelectorProvider sp) {
super(sp);
long pipeFds = IOUtil.makePipe(false);
fd0 = (int) (pipeFds >>> 32);
fd1 = (int) pipeFds;
pollWrapper = new EPollArrayWrapper();
pollWrapper.initInterrupt(fd0, fd1);
fdToKey = new HashMap<Integer,SelectionKeyImpl>();
}
protected int doSelect(long timeout) throws IOException
{
if (closed)
throw new ClosedSelectorException();
processDeregisterQueue();
try {
begin();
// 等待事件到來,收集事件到來的socket的fd并用來處理
pollWrapper.poll(timeout);
} finally {
end();
}
processDeregisterQueue();
// 更新需要寫入的keys
int numKeysUpdated = updateSelectedKeys();
if (pollWrapper.interrupted()) {
// Clear the wakeup pipe
pollWrapper.putEventOps(pollWrapper.interruptedIndex(), 0);
synchronized (interruptLock) {
pollWrapper.clearInterrupted();
IOUtil.drain(fd0);
interruptTriggered = false;
}
}
return numKeysUpdated;
}
private int updateSelectedKeys() {
int entries = pollWrapper.updated;
int numKeysUpdated = 0;
for (int i=0; i<entries; i++) {
int nextFD = pollWrapper.getDescriptor(i);
SelectionKeyImpl ski = fdToKey.get(Integer.valueOf(nextFD));
// ski is null in the case of an interrupt
if (ski != null) {
int rOps = pollWrapper.getEventOps(i);
if (selectedKeys.contains(ski)) {
if (ski.channel.translateAndSetReadyOps(rOps, ski)) {
numKeysUpdated++;
}
} else {
ski.channel.translateAndSetReadyOps(rOps, ski);
if ((ski.nioReadyOps() & ski.nioInterestOps()) != 0) {
// selectedKeys儲存ski也就是事件到的socket連接配接
// ski的對象資料結構需要好好研究一下
selectedKeys.add(ski);
numKeysUpdated++;
}
}
}
}
return numKeysUpdated;
}
}
ServerSocketChannelImpl對象
- ServerSocketChannelImpl extends ServerSocketChannel
- ServerSocketChannel extends AbstractSelectableChannel
- ServerSocketChannelImpl對象提供bind()&accept()方法
- ServerSocketChannelImpl的accept方法内部建立新連接配接的SocketChannelImpl對象傳回
class ServerSocketChannelImpl extends ServerSocketChannel implements SelChImpl
{
private final Object stateLock = new Object();
private SocketAddress localAddress;
ServerSocket socket;
ServerSocketChannelImpl(SelectorProvider sp) throws IOException {
super(sp);
this.fd = Net.serverSocket(true);
this.fdVal = IOUtil.fdVal(fd);
this.state = ST_INUSE;
}
ServerSocketChannelImpl(SelectorProvider sp,
FileDescriptor fd,
boolean bound) throws IOException
{
super(sp);
this.fd = fd;
this.fdVal = IOUtil.fdVal(fd);
this.state = ST_INUSE;
if (bound)
localAddress = Net.localAddress(fd);
}
@Override
public ServerSocketChannel bind(SocketAddress local, int backlog) throws IOException {
// 省略相關代碼
}
public SocketChannel accept() throws IOException {
// 省略相關代碼
}
}
public abstract class AbstractSelectableChannel extends SelectableChannel
{
protected AbstractSelectableChannel(SelectorProvider provider) {
this.provider = provider;
}
public final SelectionKey register(Selector sel, int ops,
Object att)
throws ClosedChannelException
{
synchronized (regLock) {
if (!isOpen())
throw new ClosedChannelException();
if ((ops & ~validOps()) != 0)
throw new IllegalArgumentException();
if (blocking)
throw new IllegalBlockingModeException();
SelectionKey k = findKey(sel);
if (k != null) {
k.interestOps(ops);
k.attach(att);
}
if (k == null) {
// New registration
synchronized (keyLock) {
if (!isOpen())
throw new ClosedChannelException();
k = ((AbstractSelector)sel).register(this, ops, att);
addKey(k);
}
}
return k;
}
}
}
select過程
select執行過程
- 執行selector.select()操作時實際是調用了子類實作的doSelect()方法。
- 進一步跟進子類的doSelect()方法。
abstract class SelectorImpl extends AbstractSelector
{
// 儲存事件到來的keys
protected Set<SelectionKey> selectedKeys;
protected HashSet<SelectionKey> keys;
private Set<SelectionKey> publicKeys; // Immutable
private Set<SelectionKey> publicSelectedKeys; // Removal allowed, but not addition
protected abstract int doSelect(long timeout) throws IOException;
private int lockAndDoSelect(long timeout) throws IOException {
synchronized (this) {
if (!isOpen())
throw new ClosedSelectorException();
synchronized (publicKeys) {
synchronized (publicSelectedKeys) {
return doSelect(timeout);
}
}
}
}
public int select(long timeout) throws IOException
{
if (timeout < 0)
throw new IllegalArgumentException("Negative timeout");
return lockAndDoSelect((timeout == 0) ? -1 : timeout);
}
public int select() throws IOException {
return select(0);
}
public Set<SelectionKey> selectedKeys() {
if (!isOpen() && !Util.atBugLevel("1.4"))
throw new ClosedSelectorException();
return publicSelectedKeys;
}
}
- pollWrapper.poll(timeout)以逾時等待的形式等待epoll的消息通知。
- 通過updateSelectedKeys方法收集有事件到達的fds儲存到selectedKeys。
class EPollSelectorImpl extends SelectorImpl
{
protected int doSelect(long timeout) throws IOException
{
if (closed)
throw new ClosedSelectorException();
processDeregisterQueue();
try {
begin();
// 等待事件到來,收集事件到來的socket的fd并用來處理
pollWrapper.poll(timeout);
} finally {
end();
}
processDeregisterQueue();
// 更新需要寫入的keys
int numKeysUpdated = updateSelectedKeys();
if (pollWrapper.interrupted()) {
// Clear the wakeup pipe
pollWrapper.putEventOps(pollWrapper.interruptedIndex(), 0);
synchronized (interruptLock) {
pollWrapper.clearInterrupted();
IOUtil.drain(fd0);
interruptTriggered = false;
}
}
return numKeysUpdated;
}
private int updateSelectedKeys() {
int entries = pollWrapper.updated;
int numKeysUpdated = 0;
for (int i=0; i<entries; i++) {
int nextFD = pollWrapper.getDescriptor(i);
SelectionKeyImpl ski = fdToKey.get(Integer.valueOf(nextFD));
// ski is null in the case of an interrupt
if (ski != null) {
int rOps = pollWrapper.getEventOps(i);
if (selectedKeys.contains(ski)) {
if (ski.channel.translateAndSetReadyOps(rOps, ski)) {
numKeysUpdated++;
}
} else {
ski.channel.translateAndSetReadyOps(rOps, ski);
if ((ski.nioReadyOps() & ski.nioInterestOps()) != 0) {
// selectedKeys儲存ski也就是事件到的socket連接配接
// ski的對象資料結構需要好好研究一下
selectedKeys.add(ski);
numKeysUpdated++;
}
}
}
}
return numKeysUpdated;
}
}
accept過程
- accept的過程很簡單就是accept新socket并建立SocketChannelImpl傳回即可。
- SocketChannelImpl對象後面需要注冊到Selector當中是以需要進一步分析。
public SocketChannel accept() throws IOException {
// 省略相關代碼
try {
// 省略相關代碼
// 新accept的socket放在newfd當中
n = accept0(this.fd, newfd, isaa);
}
}
IOUtil.configureBlocking(newfd, true);
InetSocketAddress isa = isaa[0];
// 通過SocketChannelImpl包裝newfd對象
sc = new SocketChannelImpl(provider(), newfd, isa);
// 省略相關代碼
return sc;
}
}
SocketChannelImpl對象
- SocketChannelImpl可以了解為普通Socket的封裝,包括read/write等方法
- SocketChannelImpl extends SocketChannel extends AbstractSelectableChannel
- AbstractSelectableChannel提供register到selector對象的方法
class SocketChannelImpl extends SocketChannel implements SelChImpl
{
SocketChannelImpl(SelectorProvider sp) throws IOException {
super(sp);
this.fd = Net.socket(true);
this.fdVal = IOUtil.fdVal(fd);
this.state = ST_UNCONNECTED;
}
SocketChannelImpl(SelectorProvider sp,
FileDescriptor fd,
boolean bound)
throws IOException
{
super(sp);
this.fd = fd;
this.fdVal = IOUtil.fdVal(fd);
this.state = ST_UNCONNECTED;
if (bound)
this.localAddress = Net.localAddress(fd);
}
SocketChannelImpl(SelectorProvider sp,
FileDescriptor fd, InetSocketAddress remote)
throws IOException
{
super(sp);
this.fd = fd;
this.fdVal = IOUtil.fdVal(fd);
this.state = ST_CONNECTED;
this.localAddress = Net.localAddress(fd);
this.remoteAddress = remote;
}
public long read(ByteBuffer[] dsts, int offset, int length)
throws IOException
{
// 讀資料的邏輯
}
public int write(ByteBuffer buf) throws IOException {
// 寫資料的邏輯
}
}
register過程
- register過程并沒有調用epollCtl方法添加fd到selector當中
- register過程真正是儲存fd到待綁定的清單當中
- 在SelectorImpl中執行pollWrapper.poll(timeout)方法先把fd清單執行epollCtl添加selector當中,在通過epollWait擷取事件到來
public abstract class AbstractSelectableChannel extends SelectableChannel
{
public final SelectionKey register(Selector sel, int ops,
Object att)
throws ClosedChannelException
{
synchronized (regLock) {
if (!isOpen())
throw new ClosedChannelException();
if ((ops & ~validOps()) != 0)
throw new IllegalArgumentException();
if (blocking)
throw new IllegalBlockingModeException();
SelectionKey k = findKey(sel);
if (k != null) {
k.interestOps(ops);
k.attach(att);
}
if (k == null) {
// New registration
synchronized (keyLock) {
if (!isOpen())
throw new ClosedChannelException();
k = ((AbstractSelector)sel).register(this, ops, att);
addKey(k);
}
}
return k;
}
}
}
abstract class SelectorImpl extends AbstractSelector
{
protected final SelectionKey register(AbstractSelectableChannel ch,
int ops,
Object attachment)
{
if (!(ch instanceof SelChImpl))
throw new IllegalSelectorException();
SelectionKeyImpl k = new SelectionKeyImpl((SelChImpl)ch, this);
k.attach(attachment);
synchronized (publicKeys) {
implRegister(k);
}
k.interestOps(ops);
return k;
}
}
abstract class AbstractPollSelectorImpl extends SelectorImpl
{
protected void implRegister(SelectionKeyImpl ski) {
synchronized (closeLock) {
if (closed)
throw new ClosedSelectorException();
// Check to see if the array is large enough
if (channelArray.length == totalChannels) {
// Make a larger array
int newSize = pollWrapper.totalChannels * 2;
SelectionKeyImpl temp[] = new SelectionKeyImpl[newSize];
// Copy over
for (int i=channelOffset; i<totalChannels; i++)
temp[i] = channelArray[i];
channelArray = temp;
// Grow the NativeObject poll array
pollWrapper.grow(newSize);
}
channelArray[totalChannels] = ski;
ski.setIndex(totalChannels);
// 核心的将channel添加到pollWrapper當中
pollWrapper.addEntry(ski.channel);
totalChannels++;
keys.add(ski);
}
}
}
class EPollArrayWrapper {
int poll(long timeout) throws IOException {
updateRegistrations();
updated = epollWait(pollArrayAddress, NUM_EPOLLEVENTS, timeout, epfd);
for (int i=0; i<updated; i++) {
if (getDescriptor(i) == incomingInterruptFD) {
interruptedIndex = i;
interrupted = true;
break;
}
}
return updated;
}
void updateRegistrations() {
synchronized (updateList) {
Updator u = null;
while ((u = updateList.poll()) != null) {
SelChImpl ch = u.channel;
if (!ch.isOpen())
continue;
// if the events are 0 then file descriptor is put into "idle
// set" to prevent it being polled
if (u.events == 0) {
boolean added = idleSet.add(u.channel);
// if added to idle set then remove from epoll if registered
if (added && (u.opcode == EPOLL_CTL_MOD))
epollCtl(epfd, EPOLL_CTL_DEL, ch.getFDVal(), 0);
} else {
// events are specified. If file descriptor was in idle set
// it must be re-registered (by converting opcode to ADD)
boolean idle = false;
if (!idleSet.isEmpty())
idle = idleSet.remove(u.channel);
int opcode = (idle) ? EPOLL_CTL_ADD : u.opcode;
epollCtl(epfd, opcode, ch.getFDVal(), u.events);
}
}
}
}
}