IO-nio原生实现聊天室
文章目录
- IO-nio原生实现聊天室
-
- 概述
- 调用图
- Selector的keys()和selectedKeys()分析
- 现在有点想上代码了
-
- Server端
- Client
- 循环利用Buffer讲解
- 运行图片
- 项目地址
- 总结
概述
公司使用了netty框架做了一个在线通讯的基础框架,客户需要在线和同事进行交流,在这里我通过原生NIO api做了一个简易的聊天框架。中间遇到了很多问题,好在最后都解决了。在这里做一次记录,望共勉。
调用图

调用顺序
- 注意:Selector只能管理非阻塞的Channel
- ServerSocketChannel开启Socket监听服务
- 创建一个Selector
- ServerSocketChannel向Selector上注册,并表明感兴趣的事件,服务端注册的一般是OP_ACCEPT事件。
- 轮询询问Selector是否有新的事件(SelectionKey)过来。
- 客户端连接上服务端(TCP/IP)
- 服务端的Selector会发出一个OP_ACCEPT事件,表明有客户端连接上了服务端,需要服务端做处理
- 服务端接收到客户端的信息,会创建一个SocketChannel,该通道就是和客户端进行通信的,此后围绕着该通道进行read和dispatch操作即可。
- 服务端需要将与客户端建立的通道SocketChannel托管至Selector,并表明对OP_READ事件感兴趣,客户端经由通道发送消息至服务端,Selector会触发一个OP_READ事件。
- 在接收到OP_READ事件后,此后通过Buffer来进行读操作,这里采用DirectBuffer来减少拷贝次数。
- 在读取完毕后,我们需要对消息做分发,通过通道将消息发送至客户端。到此就完成了一次响应。
Selector的keys()和selectedKeys()分析
// Constructor excerpt: chooses which views of the internal key sets are exposed
// through publicKeys and publicSelectedKeys, depending on Util.atBugLevel("1.4").
protected SelectorImpl(SelectorProvider var1) {
super(var1);
if (Util.atBugLevel("1.4")) {
// Compatibility branch: the internal sets are exposed directly, without a wrapper
// (the article's author notes that both are HashSet instances).
this.publicKeys = this.keys;
this.publicSelectedKeys = this.selectedKeys;
} else {
// Per the article's author, JDK 1.8 takes this branch.
// Read-only view: callers cannot add, remove or otherwise modify the key set.
this.publicKeys = Collections.unmodifiableSet(this.keys);
// Per the article's author: elements cannot be added to this view, but they can be removed.
this.publicSelectedKeys = Util.ungrowableSet(this.selectedKeys);
}
}
- **keys()**即keys,该Set里是注册进Selector的通道,是不可以修改的,对此Set的修改是根据系统通知来的,客户端离线,即连接断开,那么在下一次系统唤醒Selector的时候,会更新此Set。
- **selectedKeys()**即selectedKeys,该Set容纳的是事件集合,系统发送至Selector的事件存在此集合,一般而言,消费过的事件需要移除,不移除的话,轮询导致消息会重复消费。正确做法,一般是在消费之前先移除该Key,然后进行消息dispatch,如果标志消息处理失败,可以做回滚操作(本次没有做这个处理)。
现在有点想上代码了
Server端
package com.yzz.nio.chatroom.server;
import org.apache.log4j.Logger;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.nio.charset.Charset;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* describe:
* E-mail:[email protected] date:2019/1/15
*
* @Since 0.0.1
*/
public class ChatRoomServer implements Runnable {
//éæ©å¨ï¼è´è´£çå¬å®¢æ·ç«¯æ¶æ¯
private Selector selector;
//å°å
private SocketAddress socketAddress;
//æå¡ç«¯Channel
private ServerSocketChannel serverSocketChannel;
//é»è®¤ç«¯å£å·
public static final int DEFAULT_PORT = 6633;
//log4j
private Logger log = Logger.getLogger(this.getClass());
//éè¿è¿æ¥æ± å建çä¸ä¸ªå线ç¨ï¼è´è´£å»å¤çæå¡ç«¯ä¸å¡
private ExecutorService singlePool = Executors.newSingleThreadExecutor();
/**
* ç§ææé ï¼å¤çéè¿startå¯å¨æå¡
*
* @param port 端å£å·
* @throws IOException
*/
private ChatRoomServer(int port) throws IOException {
socketAddress = new InetSocketAddress(port);
//è·åServerSocketChannelééå®ä¾ï¼è¿éä»
ç¸å½äºæå¼ä¸ä¸ªéé
serverSocketChannel = ServerSocketChannel.open();
//ç»å®å°åï¼è¿ä¸æ¥å®æä¹åï¼ç«¯å£å
¶å®å·²ç»å¯ä»¥è¢«è®¿é®äº
serverSocketChannel.bind(socketAddress);
//éé»å¡ï¼Selectoråªæ¥åéé»å¡ç对象
serverSocketChannel.configureBlocking(false);
//éæ©å¨å¼å§æä¾æå¡
selector = Selector.open();
//å°ä¼æå¡ç«¯ééæ³¨åè¿Selectorï¼å¹¶å¯¹OP_ACCEPTæå
´è¶£
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
log.info("chat room has started listening " + port);
}
/**
* UnmodifiableSet åªè¯» selector.keys()
* ä¸å¯growçSetï¼selector.selectedKeys()
*
* @throws IOException
*/
@Override
public void run() {
try {
//轮询çå¬
listening();
} catch (Exception e) {
e.printStackTrace();
} finally {
//åæ¶èµæº
try {
//å
³éæå¡ç«¯éé
if (serverSocketChannel != null) serverSocketChannel.close();
//å
³ééæ©å¨
if (null != selector) selector.close();
//å
³é线ç¨
singlePool.shutdownNow();
} catch (IOException e) {
e.printStackTrace();
}
}
}
private void listening() throws IOException {
//è¿éå¿
é¡»è¦è½®è¯¢è·åSelectoréé¢çäºä»¶
while (true) {
int waiter = 0;
//ç®åçå¾
被å¤ççäºä»¶ä¸ªæ°
waiter = selector.select();
if (waiter == 0) continue;
//ungrowableSet,ä¸è½å¢å æ°çå
ç´
Set<SelectionKey> keys = selector.selectedKeys();
//è·åè¿ä»£å¨
Iterator<SelectionKey> set = keys.iterator();
while (set.hasNext()) {
SelectionKey key = set.next();
//æä»¥ï¼åé¢å¨çº¿äººæ°çç»è®¡è¦æ³¨æ
set.remove();
//æä¾è½¬åæå¡
dispatch(key);
}
}
}
public void dispatch(SelectionKey key) {
SocketChannel clientChannel = null;
try {
switch (key.interestOps()) {
//è¿éæ¯æå¡ç«¯å¨æ¥æ¶å°å®¢æ·ç«¯è¿æ¥çæ¶å触åçäºä»¶
case SelectionKey.OP_ACCEPT:
//æ¥æ¶å®¢æ·ç«¯çè¿æ¥è¯·æ±ï¼å¹¶å»ºç«å客æ·ç«¯çè¿æ¥
clientChannel = serverSocketChannel.accept();
//设置éé»å¡(selector åªå¯ä»¥æ¥æ¶éé»å¡çChannel)
clientChannel.configureBlocking(false);
//å°å客æ·ç«¯äº¤äºçééæ³¨åå°Selectorï¼å¯¹OP_READæå
´è¶£
clientChannel.register(selector, SelectionKey.OP_READ);
log.info("ä¸çº¿æé," + "å½åå¨çº¿äººæ°ï¼" + (selector.keys().size() - 1) + "人");
break;
case SelectionKey.OP_READ:
//éè¿è¯»å客æ·ç«¯åéè¿æ¥çæ¶æ¯ï¼å¨è¿è¡è½¬å
clientChannel = (SocketChannel) key.channel();
ByteBuffer byteBuffer = receiveMsg(clientChannel);
dispatchMsg(clientChannel, byteBuffer);
break;
}
} catch (IOException e) {
log.warn(e.getMessage());
//ç§»é¤å½åkeyï¼è¿éç§»é¤äºå¯ä»¥ï¼ä½æ¯keysä¸ä¼ç«å³å»ç§»é¤ï¼åªæSelectorè¢«åæ¬¡å¤éçæ¶åæä¼è¢«ç§»é¤
key.cancel();
log.info("ä¸çº¿æé," + "å½åå¨çº¿äººæ°ï¼" + (selector.keys().size() - 2) + "人");
}
}
/**
* 读客æ·ç«¯çæ¶æ¯
* è¿éè§å®æå¡ç«¯åéç»å®¢æ·ç«¯ç第ä¸ä¸ªåèæ¯æ è¯æ¯å¦æ¯èªèº«åéçæ¶æ¯
* 1æ è¯æ¬äºº 0æ è¯å
¶ä»äºº
*
* @param client
* @return
* @throws IOException
*/
public ByteBuffer receiveMsg(SocketChannel client) throws IOException {
ByteBuffer byteBuffer = ByteBuffer.allocateDirect(128);
client.read(byteBuffer);
return byteBuffer;
}
/**
* è½¬åæ¶æ¯
*
* @param client
* @param byteBuffer
* @throws IOException
*/
public void dispatchMsg(SocketChannel client, ByteBuffer byteBuffer) throws IOException {
//ä¸å¯è¢«ä¿®æ¹
Set<SelectionKey> keys = selector.keys();
//è¿ééè¦è½¬æ¢ç¶æä¸ºå position = 0ï¼limit=1+message.length
byteBuffer.flip();
for (SelectionKey key : keys) {
SelectableChannel channel = key.channel();
//夿æ¯ä¸æ¯SocketChannelï¼è½¬åæ¯é对客æ·ç«¯ééèè¨ç
if (channel != null && channel instanceof SocketChannel) {
SocketChannel targetClient = (SocketChannel) channel;
//å¦ææ¯æ¬äººï¼éè¦å°ç¬¬ä¸ä½æ¹ä¸ºbyte 0
//åæ¶æ¯ è¿æ¶åposition=0ï¼limit=1+message.length
if (channel == client) {
byteBuffer.put(0, (byte) 0);
} else {
//æ å¿ä½ 1代表æ¬äººï¼0代表å
¶ä»äºº æ¤æ¶ position=1 limit=128
byteBuffer.put(0, (byte) 1);
}
targetClient.write(byteBuffer);
//éå¤ä½¿ç¨
byteBuffer.position(0);
}
}
}
/**
* 使ç¨é»è®¤ç«¯å£
* @return
*/
public static ChatRoomServer start() {
return start(DEFAULT_PORT);
}
/**
* 使ç¨èªå®ä¹ç«¯å£
* @param port
* @return
*/
public static ChatRoomServer start(int port) {
ChatRoomServer chartRoomServer = null;
try {
chartRoomServer = new ChatRoomServer(port);
chartRoomServer.singlePool.execute(chartRoomServer);
} catch (IOException e) {
e.printStackTrace();
}
return chartRoomServer;
}
}
Client
package com.yzz.nio.chatroom.client;
import org.apache.log4j.Logger;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.Iterator;
import java.util.Scanner;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
 * Console chat client: one thread forwards tokens typed on standard input to
 * the server, a second thread prints whatever the server broadcasts.
 *
 * <p>Every chunk received from the server starts with a tag byte; a tag of
 * {@code 0} marks a message this client sent itself.
 *
 * E-mail:[email protected] date:2019/1/15
 *
 * @since 0.0.1
 */
public class ChatRoomClient {
    /** Tag value the server puts on the echo of this client's own message. */
    private static final byte TAG_SELF = 0;

    private static final Logger log = Logger.getLogger(ChatRoomClient.class);

    private Charset charset = Charset.forName("UTF-8");
    private SocketChannel socketChannel;
    private Scanner scanner = new Scanner(System.in);
    private ByteBuffer byteBuffer = ByteBuffer.allocateDirect(128);
    /** Two workers: one reader, one writer. */
    private ExecutorService pool = Executors.newFixedThreadPool(2);
    /** Set by {@link #clear()} to make both loops exit. */
    private volatile boolean stop;

    /**
     * Connects to the server (blocking).
     *
     * @param host server host
     * @param port server port
     * @throws IOException if the connection cannot be established
     */
    private ChatRoomClient(String host, int port) throws IOException {
        SocketAddress socketAddress = new InetSocketAddress(host, port);
        socketChannel = SocketChannel.open(socketAddress);
        log.info("client has connected remote " + host + ":" + port);
    }

    /**
     * Sends each whitespace-delimited token read from standard input to the
     * server. Returns when input is exhausted, the client is stopped, or the
     * connection fails (in which case all resources are released).
     */
    public void write() {
        try {
            // hasNext() returns false at end of input instead of letting next() throw.
            while (!stop && scanner.hasNext()) {
                String msg = scanner.next();
                ByteBuffer b = charset.encode(msg);
                socketChannel.write(b);
            }
        } catch (IOException e) {
            if (!stop) {
                log.error("failed to send message", e);
            }
            clear();
        } catch (IllegalStateException ignored) {
            // The scanner was closed by clear() on the reader thread: we are shutting down.
        }
    }

    /**
     * Prints every chunk received from the server, appending a marker to the
     * client's own messages. Returns and releases all resources when the server
     * closes the connection, the read fails, or the client is stopped.
     */
    public void read() {
        try {
            while (!stop) {
                // Blocks until data arrives; -1 means the server closed the connection.
                if (socketChannel.read(byteBuffer) < 0) {
                    log.info("server closed the connection");
                    break;
                }
                byteBuffer.flip();
                if (byteBuffer.hasRemaining()) {
                    byte tag = byteBuffer.get();
                    CharBuffer charBuffer = charset.decode(byteBuffer);
                    if (tag == TAG_SELF) {
                        System.out.println(charBuffer + "(本人)");
                    } else {
                        System.out.println(charBuffer);
                    }
                }
                byteBuffer.clear();
            }
        } catch (IOException e) {
            // A close triggered by clear() on the other thread is not an error.
            if (!stop) {
                log.error("failed to read from server", e);
            }
        } finally {
            clear();
        }
    }

    /**
     * Stops both loops and releases the channel, the scanner and the pool.
     * Safe to call more than once; each resource is released independently so
     * one failure does not skip the rest.
     */
    private void clear() {
        stop = true;
        try {
            if (null != socketChannel) {
                socketChannel.close();
            }
        } catch (IOException e) {
            log.warn("failed to close channel", e);
        }
        try {
            if (scanner != null) {
                scanner.close();
            }
        } catch (RuntimeException e) {
            log.warn("failed to close scanner", e);
        }
        if (pool != null && !pool.isShutdown()) {
            pool.shutdownNow();
        }
    }

    /**
     * Connects to the server and starts the reader and writer threads.
     *
     * @param host server host
     * @param port server port
     */
    public static void start(String host, int port) {
        try {
            ChatRoomClient chatRoomClient = new ChatRoomClient(host, port);
            chatRoomClient.pool.execute(chatRoomClient::read);
            chatRoomClient.pool.execute(chatRoomClient::write);
        } catch (IOException e) {
            log.error("failed to connect to " + host + ":" + port, e);
        }
    }
}
循环利用Buffer讲解
/**
 * Reads one chunk from a client into a fresh 128-byte direct buffer.
 * Index 0 is reserved for the tag byte that the dispatching code fills in
 * per recipient, so the payload read from the client starts at index 1.
 *
 * @param client channel to read from
 * @return the buffer, still in write mode (position just past the payload)
 * @throws IOException if the read fails or the client has closed the connection
 */
public ByteBuffer receiveMsg(SocketChannel client) throws IOException {
    ByteBuffer byteBuffer = ByteBuffer.allocateDirect(128);
    // Placeholder for the tag; overwritten before the buffer is sent.
    byteBuffer.put((byte) 0);
    // read() returns -1 once the peer has closed the connection. Reporting it as
    // an IOException lets the caller's existing failure handling drop the client
    // instead of forwarding an empty message over and over.
    if (client.read(byteBuffer) < 0) {
        throw new IOException("client closed the connection");
    }
    return byteBuffer;
}
/**
* è½¬åæ¶æ¯
* æ å¿ä½ 1代表æ¬äººï¼0代表å
¶ä»äºº
*
* @param client
* @param byteBuffer
* @throws IOException
*/
public void dispatchMsg(SocketChannel client, ByteBuffer byteBuffer) throws IOException {
//ä¸å¯è¢«ä¿®æ¹
Set<SelectionKey> keys = selector.keys();
//è¿ééè¦è½¬æ¢ç¶æä¸ºè¯» position = 0ï¼limit=message.length
byteBuffer.flip();
for (SelectionKey key : keys) {
SelectableChannel channel = key.channel();
//夿æ¯ä¸æ¯SocketChannelï¼è½¬åæ¯é对客æ·ç«¯ééèè¨ç
if (channel != null && channel instanceof SocketChannel) {
SocketChannel targetClient = (SocketChannel) channel;
//å¦ææ¯æ¬äººï¼éè¦å°ç¬¬ä¸ä½æ¹ä¸ºbyte 0
//åæ¶æ¯ è¿æ¶åposition=0ï¼limit=1+message.length
if (channel == client) {
//æ å¿ä½ 1代表æ¬äººï¼0代表å
¶ä»äºº æ¤æ¶ position=1 limit=128
byteBuffer.put(0, (byte) 1);
}else {
byteBuffer.put(0, (byte) 0);
}
targetClient.write(byteBuffer);
//éå¤ä½¿ç¨
byteBuffer.position(0);
}
}
}
时序图
首先我在这里规定buffer的第一个字节存放的是tag,00000001表示本人,00000000表示其他人,这样在客户端可以区分出本人和其他人的消息,通过人为设置position和恰当时机的flip,在重复调用的过程中,重复利用了这个DirectBuffer,从而提高了性能,减少了内存的开销。
运行图片
项目地址
https://github.com/yinzhongzheng/nio-chatroom
总结
在做完这个简易的小东西后,更加深入理解了NIO的运行流程,和编码需要注意的一些问题,欢迎大家提问。