主要研究 NIO與EPOLL
探讨深度 Java代碼 -> HotSpot源碼 -> linux函數
一. BIO
1 public static void main(String[] args) throws IOException {
2 ServerSocket serverSocket = new ServerSocket(9000);
3
4 while (true) {
5 Socket socket = serverSocket.accept();
6 byte[] buffer = new byte[1024];
7 int read = socket.getInputStream().read(buffer);
8 if (read != 0) {
9 String data = new String(buffer, 0, read);
10 System.out.println(data);
11 OutputStream outputStream = socket.getOutputStream();
12
13 outputStream.write(data.getBytes());
14 outputStream.flush();
15 socket.close();
}
}
}
終端指令:
debug啟動時, 會發現pc指針會在第5行阻塞, 我們啟終端 telnet localhost 9000
指針會繼續執行, 再次阻塞在第7行, 這時我們在終端 send bio test, pc會繼續執行
期間終端再啟一個控制台, 去telnet發現也是被阻塞的 後發送會等第一個執行完再執行後面的
idea console如下:
二. NIO
- 不使用多路複用, 單純的非阻塞
public static void main(String[] args) throws IOException {
List<SocketChannel> clientSocketList = new ArrayList<>(1024);
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.socket().bind(new InetSocketAddress(9000));
serverSocketChannel.configureBlocking(false);
while (true) {
SocketChannel clientSocket = serverSocketChannel.accept();
if (clientSocket != null) {
clientSocket.configureBlocking(false);
clientSocketList.add(clientSocket);
}
Iterator<SocketChannel> iterator = clientSocketList.iterator();
while (iterator.hasNext()) {
SocketChannel sc = iterator.next();
ByteBuffer dst = ByteBuffer.allocate(1024);
int len = sc.read(dst);
if (len > 0) {
System.out.println("接收到消息: " + new String(dst.array()));
}
}
}
}
從上面代碼我們就發現設定了一個很重要的參數configureBlocking(false), 設定為非阻塞, accept和read都不阻塞, 進來的socket都放到了一個容器了, 然後循環便利這個容器, 看是否有inputStream流資料, 有則輸出, 繼續下次循環.
開啟兩個終端telnet localhost 9000
第一個send a
第二個send b
IDEA console如下:
這段代碼例子, (可以了解為)就是linux的select的工作原理, 而poll就是把clientSocketList容器的最大值設定為Integer.MAX_VALUE.
-
重點在這裡!!!
重點在這裡!!!
重點在這裡!!!
重點在這裡!!!
重點在這裡!!!
使用多路複用Selector
public static void main(String[] args) throws IOException {
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.socket().bind(new InetSocketAddress(9000));
serverSocketChannel.configureBlocking(false);
// linux epoll_create
Selector selector = Selector.open();
// linux epoll_ctl
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
while (true) {
// linux epoll_wait rdList epoll的就緒隊列有事件 才會繼續執行
selector.select();
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = selectionKeys.iterator();
while (iterator.hasNext()) {
SelectionKey sk = iterator.next();
if (sk.isAcceptable()) {
ServerSocketChannel server = (ServerSocketChannel) sk.channel();
SocketChannel sc = server.accept();
sc.configureBlocking(false);
sc.register(selector, SelectionKey.OP_READ);
System.out.println("用戶端連接配接成功");
} else if (sk.isReadable()) {
SocketChannel sc = (SocketChannel) sk.channel();
ByteBuffer dst = ByteBuffer.allocate(1024);
int len = sc.read(dst);
if (len > 0) {
System.out.println("接收到消息: " + new String(dst.array()));
}
}
iterator.remove();
}
}
}
第一個終端 telnet localhost 9000
第二個終端 telnet localhost 9000
第一個終端 send a
第二個終端 seng b
第一個終端 send c
IDEA console如下:
!!! 這段代碼, 主要引入了一個在linux運作jdk時候, 就是EPOLL的相關操作. 服務端和連接配接進來的socket分别注冊了OPEN和READ事件, 沒有事件時會阻塞在select()方法裡, 當有相關事件就會進行接下來的代碼邏輯執行.
接下來我們重點看Selector的 open, register, select三個方法, 到底做了什麼東西.
// open方法 jdk源碼
public static Selector open() throws IOException {
return SelectorProvider.provider().openSelector();
}
public static SelectorProvider provider() {
synchronized (lock) {
if (provider != null)
return provider;
return AccessController.doPrivileged(
new PrivilegedAction<SelectorProvider>() {
public SelectorProvider run() {
if (loadProviderFromProperty())
return provider;
if (loadProviderAsService())
return provider;
// 重點create方法 這裡直接點進去 會根據下載下傳的jdk pc版本進入不同方法 我們直接去openjdk源碼裡看Epoll的實作
provider = sun.nio.ch.DefaultSelectorProvider.create();
return provider;
}
});
}
}
!!! 源碼分析
// open jdk源碼
public static SelectorProvider create() {
String osname = AccessController
.doPrivileged(new GetPropertyAction("os.name"));
if (osname.equals("SunOS"))
return createProvider("sun.nio.ch.DevPollSelectorProvider");
if (osname.equals("Linux"))
return createProvider("sun.nio.ch.EPollSelectorProvider");
// 到這個類裡去看
return new sun.nio.ch.PollSelectorProvider();
}
public class EPollSelectorProvider
extends SelectorProviderImpl
{
public AbstractSelector openSelector() throws IOException {
// return了一個實作類
return new EPollSelectorImpl(this);
}
public Channel inheritedChannel() throws IOException {
return InheritedChannel.getChannel();
}
}
EPollSelectorImpl(SelectorProvider sp) throws IOException {
super(sp);
long pipeFds = IOUtil.makePipe(false);
fd0 = (int) (pipeFds >>> 32);
fd1 = (int) pipeFds;
try {
// new了一個EPollArrayWrapper
pollWrapper = new EPollArrayWrapper();
pollWrapper.initInterrupt(fd0, fd1);
fdToKey = new HashMap<>();
} catch (Throwable t) {
try {
FileDispatcherImpl.closeIntFD(fd0);
} catch (IOException ioe0) {
t.addSuppressed(ioe0);
}
try {
FileDispatcherImpl.closeIntFD(fd1);
} catch (IOException ioe1) {
t.addSuppressed(ioe1);
}
throw t;
}
}
// EPollArrayWrapper裡
EPollArrayWrapper() throws IOException {
// creates the epoll file descriptor
// 我們都知道linux裡 一切皆是檔案 是以建立了一個epoll
epfd = epollCreate();
// the epoll_event array passed to epoll_wait
int allocationSize = NUM_EPOLLEVENTS * SIZE_EPOLLEVENT;
pollArray = new AllocatedNativeObject(allocationSize, true);
pollArrayAddress = pollArray.address();
// eventHigh needed when using file descriptors > 64k
if (OPEN_MAX > MAX_UPDATE_ARRAY_SIZE)
eventsHigh = new HashMap<>();
}
// 三個核心native方法
private native int epollCreate();
private native void epollCtl(int epfd, int opcode, int fd, int events);
private native int epollWait(long pollAddress, int numfds, long timeout, int epfd) throws IOException;
// 我們先拿epollCreate()舉例, 我們要去看c語言如何實作這個方法, 那麼拿類名加下劃線和方法名,
// EPollArrayWrapper_epollCreate
JNIEXPORT jint JNICALL
Java_sun_nio_ch_EPollArrayWrapper_epollCreate(JNIEnv *env, jobject this)
{
/*
* epoll_create expects a size as a hint to the kernel about how to
* dimension internal structures. We can't predict the size in advance.
*/
// 就是這個epoll_create linux中的方法 C語言可以直接調用它
// 在linux裡用man指令可以檢視相關方法的較長的描述
int epfd = epoll_create(256);
if (epfd < 0) {
JNU_ThrowIOExceptionWithLastError(env, "epoll_create failed");
}
return epfd;
}
以上就是selector的open分析 jdk源碼 -> hotspot源碼 -> linux文法概要
register 對應 epollCtl, epollCtl相當于注冊一個事件
select 對應 epollWait, epollWait相當于綁定了這個事件 并且監聽
三. AIO
public static void main(String[] args) throws IOException, InterruptedException {
AsynchronousServerSocketChannel serverChannel = AsynchronousServerSocketChannel.open().bind(new InetSocketAddress(9000));
serverChannel.accept(null, new CompletionHandler<AsynchronousSocketChannel, Object>() {
@Override
public void completed(AsynchronousSocketChannel socketChannel, Object attachment) {
try {
System.out.println("2--"+Thread.currentThread().getName());
// 再此接收用戶端連接配接,如果不寫這行代碼後面的用戶端連接配接連不上服務端
serverChannel.accept(attachment, this);
System.out.println(socketChannel.getRemoteAddress());
ByteBuffer buffer = ByteBuffer.allocate(1024);
socketChannel.read(buffer, buffer, new CompletionHandler<Integer, ByteBuffer>() {
@Override
public void completed(Integer result, ByteBuffer buffer) {
System.out.println("3--"+Thread.currentThread().getName());
buffer.flip();
System.out.println(new String(buffer.array(), 0, result));
socketChannel.write(ByteBuffer.wrap("HelloClient".getBytes()));
}
@Override
public void failed(Throwable exc, ByteBuffer buffer) {
exc.printStackTrace();
}
});
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public void failed(Throwable exc, Object attachment) {
exc.printStackTrace();
}
});
System.out.println("1--"+Thread.currentThread().getName());
Thread.sleep(Integer.MAX_VALUE);
}
啟動第一個終端執行 telnet localhost 9000和send a
啟動第二個終端執行 telnet localhost 9000和send b
IDEA console如下:
BIO, NIO在代碼案例中, 都是main線程去執行的, 而這個AIO就是線程如此多.
四. 總結
BIO, NIO, AIO主要的差別分為兩個阻塞與非阻塞, 同步與非同步
從第一個和第二個例子, 我們可以了解出阻塞與非阻塞的差別.
從第三個和第四個例子, 我們可以看出同步與非同步的差別.
1.4之後出現NIO, 先是采用linux的 select 後采用poll
1.5及之後采用了epoll的IO多路複用架構