推荐：Java网络编程汇总
Java网络编程-Socket编程初涉四（NIO模型的简易多人聊天室）
什么是NIO模型呢？
Java网络编程-NIO原理
之前版本
Java网络编程-Socket编程初涉三（伪异步I/O模型的简易多人聊天室）
服务器
这里使用`Channel`、`Selector`、`Buffer`来实现一个NIO模型的服务器，服务器的主要逻辑如下：
- 首先创建`ServerSocketChannel`（服务器管道），拿到关于该`ServerSocketChannel`的`ServerSocket`，并且将`ServerSocket`绑定端口（源码中也间接实现了监听端口，之前的版本有说过，这里就不再赘述了），再创建`Selector`，将创建的`ServerSocketChannel`的`ACCEPT`事件注册到`Selector`上（也就是当有客户端连接服务器时触发事件）。
- 当有客户端连接后（即`Selector`监听到`ServerSocketChannel`的`ACCEPT`事件触发了），就获取该客户端的`SocketChannel`，并将`SocketChannel`的`READ`事件注册到`Selector`上（当有客户端发送消息给服务器时触发）。
- 当有客户端发送消息给服务器时，服务器将该消息转发给其他用户，因为在线用户的`SocketChannel`都在`Selector`上注册了`READ`事件，所以服务器可以很方便的拿到用户的`SocketChannel`（所有在线用户的管道），所以服务器转发消息就变得十分简单了，最后还要判断用户输入的消息是否是要准备退出，如果用户要退出，就移除该用户`SocketChannel`的`READ`事件，并且提醒`Selector`。
像BIO模型和伪异步I/O模型实现的服务器，当有客户端连接服务器后，服务器需要创建一个线程来与用户进行通信，当客户端连接请求并发数很大时，服务器需要创建的线程就非常多了，这样带来的上下文切换、内存开销都非常大，并且创建的线程还是阻塞式的，性能较差。而NIO模型，客户端与服务器通信是通过`Channel`、`Buffer`（BIO模型和伪异步I/O模型是使用`Stream`），并且当客户端连接服务器时，NIO模型不需要创建线程，只需要将用户管道的`READ`事件注册到`Selector`即可，用户发送消息给服务器就会触发该事件，而不需要创建线程去一直阻塞式的监听用户发送过来消息，有了`Selector`，实现非阻塞通信就变得很简单了。
代码中注释还是比较详细的，相信大家都能看懂。如果对`ByteBuffer`的读、写模式不太懂，可以看下面这篇博客的相关部分。
使用Java的IO与NIO来Copy文件的四种方法实现以及性能对比
package nio.chatroom.server;
import java.io.*;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.nio.charset.Charset;
import java.util.Set;
/**
 * Single-threaded multi-user chat room server built on NIO.
 *
 * <p>One {@link Selector} watches the listening {@link ServerSocketChannel} for ACCEPT
 * events and every connected {@link SocketChannel} for READ events. Each message a client
 * sends is prefixed with that client's name and forwarded to all other connected clients.
 * A client leaves by sending {@code quit} (case-insensitive) or by closing its connection.
 *
 * <p>All channel and buffer access happens on the thread that calls {@link #start()}.
 */
public class ChatServer {
    /** Port used by the no-argument constructor. */
    private static final int DEFAULT_PORT = 8888;
    /** Message with which a client announces that it is leaving. */
    private static final String QUIT = "quit";
    /** Capacity, in bytes, of the shared read buffer. */
    private static final int BUFFER = 1024;

    private ServerSocketChannel server;
    private Selector selector;
    /** Read buffer, reused for every incoming message. */
    private ByteBuffer rBuffer = ByteBuffer.allocate(BUFFER);
    private Charset charset = Charset.forName("UTF-8");
    /** Port this server listens on. */
    private int port;

    /** Creates a server listening on the default port. */
    public ChatServer() {
        this(DEFAULT_PORT);
    }

    /**
     * Creates a server listening on the given port.
     *
     * @param port TCP port to bind when {@link #start()} is called
     */
    public ChatServer(int port) {
        this.port = port;
    }

    /**
     * Binds the listening channel and runs the select loop until a fatal I/O error occurs.
     * On exit every registered channel, the selector and the listening channel are closed.
     */
    public void start() {
        try {
            server = ServerSocketChannel.open();
            server.configureBlocking(false);
            // Bind through the ServerSocket associated with the channel.
            server.socket().bind(new InetSocketAddress(port));
            selector = Selector.open();
            server.register(selector, SelectionKey.OP_ACCEPT);
            System.out.println("启动服务器，监听端口：" + port + "...");
            while (true) {
                // Blocks until at least one registered channel is ready.
                selector.select();
                Set<SelectionKey> selectionKeys = selector.selectedKeys();
                for (SelectionKey key : selectionKeys) {
                    try {
                        handles(key);
                    } catch (IOException e) {
                        // A failure on one connection must not take down the whole server:
                        // report it and drop only the affected client.
                        e.printStackTrace();
                        if (key.channel() instanceof SocketChannel) {
                            disconnect(key);
                        }
                    }
                }
                // The selector never removes keys from the selected set itself.
                selectionKeys.clear();
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            shutdown();
        }
    }

    /** Returns whether {@code msg} is the quit command, ignoring case. */
    private boolean readyToQuit(String msg) {
        return QUIT.equalsIgnoreCase(msg);
    }

    /** Closes {@code closeable} if it is non-null, printing any {@link IOException}. */
    private void close(Closeable closeable) {
        if (closeable != null) {
            try {
                closeable.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    /** Cancels the key's registration and closes its channel. */
    private void disconnect(SelectionKey key) {
        key.cancel();
        close(key.channel());
    }

    /** Closes every channel still registered with the selector, then the selector and the listener. */
    private void shutdown() {
        if (selector != null && selector.isOpen()) {
            // Iterate over a snapshot so closing channels cannot disturb the iteration.
            for (SelectionKey key : selector.keys().toArray(new SelectionKey[0])) {
                close(key.channel());
            }
        }
        close(selector);
        close(server);
    }

    /**
     * Reads the bytes currently available from {@code client}, up to the buffer capacity,
     * and decodes them as UTF-8.
     *
     * @return the decoded text; empty when nothing could be read, which includes the peer
     *         having closed the connection
     * @throws IOException if reading from the channel fails
     */
    private String receive(SocketChannel client) throws IOException {
        // Fill phase: read until no more data is available, the buffer is full, or EOF.
        rBuffer.clear();
        while (client.read(rBuffer) > 0) {
            // keep reading
        }
        // Drain phase: flip so the decoder sees exactly what was read.
        rBuffer.flip();
        return String.valueOf(charset.decode(rBuffer));
    }

    /**
     * Sends {@code fwdMsg}, prefixed with the sender's name, to every connected client
     * except the sender. A recipient whose connection fails is disconnected; the remaining
     * recipients still receive the message.
     *
     * @param client the sender, excluded from the recipients
     * @param fwdMsg the message text to forward
     */
    private void forwardMessage(SocketChannel client, String fwdMsg) throws IOException {
        // Encode once and write the encoder's own buffer: copying it into a fixed-size
        // buffer would overflow when the prefixed message exceeds that buffer's capacity.
        ByteBuffer encoded = charset.encode(getClientName(client) + ":" + fwdMsg);
        for (SelectionKey key : selector.keys()) {
            Channel connectedClient = key.channel();
            if (!(connectedClient instanceof SocketChannel)
                    || !key.isValid()
                    || client.equals(connectedClient)) {
                continue;
            }
            // Each recipient gets an independent position/limit over the same bytes.
            ByteBuffer out = encoded.duplicate();
            try {
                // NOTE: on a non-blocking channel this loop spins while the recipient's
                // send buffer is full; queuing with OP_WRITE would avoid that.
                while (out.hasRemaining()) {
                    ((SocketChannel) connectedClient).write(out);
                }
            } catch (IOException e) {
                e.printStackTrace();
                disconnect(key);
            }
        }
    }

    /** Returns the display name of a client, derived from its remote port. */
    private String getClientName(SocketChannel client) {
        return "客户端[" + client.socket().getPort() + "]";
    }

    /**
     * Handles one selected key: accepts a new client on ACCEPT, or receives and forwards
     * a message on READ.
     *
     * @throws IOException if accepting, configuring or reading the channel fails
     */
    private void handles(SelectionKey key) throws IOException {
        // The key may have been cancelled earlier in the same selection round.
        if (!key.isValid()) {
            return;
        }
        if (key.isAcceptable()) {
            // ACCEPT event: a client has connected.
            ServerSocketChannel listener = (ServerSocketChannel) key.channel();
            SocketChannel client = listener.accept();
            if (client == null) {
                // Non-blocking accept returns null when no connection is pending.
                return;
            }
            try {
                client.configureBlocking(false);
                client.register(selector, SelectionKey.OP_READ);
            } catch (IOException e) {
                close(client);
                throw e;
            }
            System.out.println(getClientName(client) + "已连接");
        } else if (key.isReadable()) {
            // READ event: a client has sent a message (or closed its connection).
            SocketChannel client = (SocketChannel) key.channel();
            String fwdMsg = receive(client);
            if (fwdMsg.isEmpty()) {
                // Nothing readable: treat the client as gone and release its channel.
                disconnect(key);
            } else {
                forwardMessage(client, fwdMsg);
                if (readyToQuit(fwdMsg)) {
                    // Capture the name before the socket is closed.
                    String name = getClientName(client);
                    disconnect(key);
                    System.out.println(name + "已断开");
                }
            }
        }
    }

    public static void main(String[] args) {
        ChatServer server = new ChatServer();
        server.start();
    }
}
客户端
如果看懂了服务器端的代码逻辑，客户端代码逻辑应该很容易看懂。
- 获取一个`SocketChannel`，并且设置成非阻塞（需要手动设置，默认为阻塞），再获取一个`Selector`，并将`SocketChannel`的`CONNECT`事件注册到`Selector`上（客户端连接服务器时触发）。
- 当有`SocketChannel`的`CONNECT`事件触发后，也就意味着有客户端连接上了服务器，所以需要创建一个线程来监听用户的输入（这个实现不需要变），并且还需要将`SocketChannel`的`READ`事件注册到`Selector`上(当服务器转发其他用户的消息时触发，触发后，客户端直接打印服务器转发的消息即可)。
package nio.chatroom.client;
import java.io.*;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedSelectorException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.Set;
public class ChatClient {
private static final String DEFAULT_SERVER_HOST = "127.0.0.1";
private static final int DEFAULT_SERVER_PORT = 8888;
private static final int BUFFER = 1024;
private static final String QUIT = "quit";
private String host;
private int port;
private SocketChannel client;
private ByteBuffer rBuffer = ByteBuffer.allocate(BUFFER);
private ByteBuffer wBuffer = ByteBuffer.allocate(BUFFER);
private Selector selector;
private Charset charset = Charset.forName("UTF-8");
public ChatClient(){
this(DEFAULT_SERVER_HOST , DEFAULT_SERVER_PORT);
}
public ChatClient(String host , int port){
this.host = host;
this.port = port;
}
// æ£æ¥ç¨æ·æ¯å¦åå¤éåº
public boolean readyToQuit(String msg){
return QUIT.equalsIgnoreCase(msg);
}
public void close(Closeable closeable){
if(closeable != null){
try {
closeable.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
public void start(){
try {
client = SocketChannel.open();
// 设置éé»å¡
client.configureBlocking(false);
selector = Selector.open();
client.register(selector , SelectionKey.OP_CONNECT);
client.connect(new InetSocketAddress(host , port));
while(true){
selector.select();
Set<SelectionKey> selectionKeys = selector.selectedKeys();
for(SelectionKey key : selectionKeys){
handles(key);
}
}
} catch (IOException e) {
e.printStackTrace();
} catch (ClosedSelectorException e){
// ç¨æ·æ£å¸¸éåºï¼äº§ççå¼å¸¸
} finally {
close(selector);
}
}
private void handles(SelectionKey key) throws IOException {
// CONNECTäºä»¶ - è¿æ¥å°±ç»ªäºä»¶
if(key.isConnectable()){
SocketChannel client = (SocketChannel) key.channel();
if(client.isConnectionPending()){
client.finishConnect();
// å¤çç¨æ·çè¾å
¥
new Thread(new UserInputHandler(this)).start();
}
client.register(selector , SelectionKey.OP_READ);
}
// READäºä»¶ - æå¡å¨è½¬åæ¶æ¯
else if(key.isReadable()){
SocketChannel client = (SocketChannel) key.channel();
String msg = receive(client);
if(msg.isEmpty()){
// æå¡å¨å¼å¸¸
close(selector);
}
else{
System.out.println(msg);
}
}
}
private String receive(SocketChannel client) throws IOException {
// å模å¼
rBuffer.clear();
while(client.read(rBuffer) > 0);
// å模å¼
rBuffer.flip();
return String.valueOf(charset.decode(rBuffer));
}
public void send(String msg) throws IOException {
if(msg.isEmpty()){
return ;
}
// å模å¼
wBuffer.clear();
wBuffer.put(charset.encode(msg));
wBuffer.flip();
while(wBuffer.hasRemaining()){
client.write(wBuffer);
}
// æ£æ¥ç¨æ·æ¯å¦åå¤éåº
if(readyToQuit(msg)){
close(selector);
}
}
public static void main(String[] args) {
ChatClient client = new ChatClient();
client.start();
}
}
这个类没有改动，就不说了。
package nio.chatroom.client;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
public class UserInputHandler implements Runnable{
private ChatClient client;
public UserInputHandler(ChatClient client){
this.client = client;
}
@Override
public void run() {
try {
// çå¾
ç¨æ·è¾å
¥æ¶æ¯
BufferedReader consoleReader = new BufferedReader(
new InputStreamReader(System.in)
);
while(true){
String input = consoleReader.readLine();
// åæå¡å¨åéæ¶æ¯
client.send(input);
//æ£æ¥ç¨æ·æ¯å¦åå¤éåº
if(client.readyToQuit(input)){
break;
}
}
} catch (IOException e){
e.printStackTrace();
}
}
}
这里我们便实现了一个NIO模型的简易多人聊天室，大家可以动手试一试。
测试