推荐：Java网络编程汇总
Java网络编程-Socket编程初涉六（AIO模型的简易多人聊天室）
这篇就不具体讲解代码了，我觉得我注释写的够详细了，不过，还是建议先看下面这篇博客。
- Java网络编程-Socket编程初涉五（AIO模型的简易客户端-服务器）
AIO模型的多人聊天室，和之前实现的基于NIO模型、伪异步IO模型、BIO模型实现的多人聊天室，它们的业务是一样的，只不过使用的组件不一样。
- Java网络编程-Socket编程初涉二（基于BIO模型的简易多人聊天室）
- Java网络编程-Socket编程初涉三（伪异步I/O模型的简易多人聊天室）
- Java网络编程-Socket编程初涉四（NIO模型的简易多人聊天室）
服务器
完整代码：
package aio.chatroom;
import java.io.Closeable;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousChannelGroup;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
 * Multi-user chat server built on the asynchronous channel (AIO) API.
 *
 * <p>Every accepted connection gets its own {@link ClientHandler}. Whatever a
 * client sends is printed on the server console and forwarded, prefixed with
 * the sender's name, to every other connected client. A client that sends
 * {@code quit} (case-insensitive), closes its socket, or fails is removed.
 *
 * <p>Limitation: a received chunk is decoded on its own, so a multi-byte
 * UTF-8 character split across two reads is not reassembled.
 */
public class ChatServer {
    private static final String LOCALHOST = "localhost";
    private static final int DEFAULT_PORT = 8888;
    private static final String QUIT = "quit";
    private static final int BUFFER = 1024;
    private static final int THREADPOOL_SIZE = 8;

    private AsynchronousChannelGroup channelGroup;
    private AsynchronousServerSocketChannel serverChannel;
    /** Currently connected clients; every access is guarded by {@code this}. */
    private final List<ClientHandler> connectedClients;
    private final Charset charset = Charset.forName("UTF-8");
    private final int port;

    /** Creates a server listening on the default port. */
    public ChatServer() {
        this(DEFAULT_PORT);
    }

    /**
     * Creates a server listening on the given port.
     *
     * @param port TCP port to bind on localhost
     */
    public ChatServer(int port) {
        this.port = port;
        this.connectedClients = new ArrayList<>();
    }

    /** Returns true when the message is the quit command (case-insensitive). */
    private boolean readyToQuit(String msg) {
        return QUIT.equalsIgnoreCase(msg);
    }

    /** Closes the resource if it is non-null, printing any I/O error. */
    private void close(Closeable closeable) {
        if (closeable != null) {
            try {
                closeable.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    /** Handles completion of {@code accept}: registers the client and starts reading from it. */
    private class AcceptHandler implements CompletionHandler<AsynchronousSocketChannel, Object> {
        @Override
        public void completed(AsynchronousSocketChannel clientChannel, Object attachment) {
            // Re-arm the accept first so the next client can connect while
            // this one is being set up.
            if (serverChannel.isOpen()) {
                serverChannel.accept(null, this);
            }
            if (clientChannel != null && clientChannel.isOpen()) {
                // One handler per user; the handler also represents the user
                // in the online list.
                ClientHandler handler = new ClientHandler(clientChannel);
                addClient(handler);
                // The buffer is passed twice: once as the read destination and
                // once as the attachment, so the completion callback can get
                // at the bytes that were read.
                ByteBuffer buffer = ByteBuffer.allocate(BUFFER);
                clientChannel.read(buffer, buffer, handler);
            }
        }

        @Override
        public void failed(Throwable exc, Object attachment) {
            System.out.println("客户端连接失败：" + exc);
        }
    }

    /**
     * Per-client state. As a completion handler it processes finished reads;
     * outbound messages go through {@link #send(ByteBuffer)}, which queues them
     * so that at most one write is in flight on the channel at any time.
     */
    private class ClientHandler implements CompletionHandler<Integer, ByteBuffer> {
        private final AsynchronousSocketChannel clientChannel;
        /** Messages not yet fully written, head first; guarded by itself. */
        private final List<ByteBuffer> pendingWrites = new ArrayList<>();
        /** True while a write is outstanding; guarded by {@code pendingWrites}. */
        private boolean writeInProgress;

        /** Drains {@code pendingWrites} one buffer at a time. */
        private final CompletionHandler<Integer, ByteBuffer> writeCompletion =
                new CompletionHandler<Integer, ByteBuffer>() {
                    @Override
                    public void completed(Integer written, ByteBuffer message) {
                        synchronized (pendingWrites) {
                            // A partial write leaves the head in place so the
                            // remainder is written next.
                            if (!message.hasRemaining()) {
                                pendingWrites.remove(0);
                            }
                            if (pendingWrites.isEmpty()) {
                                writeInProgress = false;
                                return;
                            }
                        }
                        writeHead();
                    }

                    @Override
                    public void failed(Throwable exc, ByteBuffer message) {
                        removeClient(ClientHandler.this);
                    }
                };

        public ClientHandler(AsynchronousSocketChannel clientChannel) {
            this.clientChannel = clientChannel;
        }

        /**
         * Queues a message for this client and starts writing if the channel
         * is idle. The buffer must not be touched by the caller afterwards.
         */
        private void send(ByteBuffer message) {
            synchronized (pendingWrites) {
                pendingWrites.add(message);
                if (writeInProgress) {
                    // The in-flight write's completion will pick this one up.
                    return;
                }
                writeInProgress = true;
            }
            writeHead();
        }

        /** Starts an asynchronous write of the buffer at the head of the queue. */
        private void writeHead() {
            ByteBuffer head;
            synchronized (pendingWrites) {
                head = pendingWrites.get(0);
            }
            clientChannel.write(head, head, writeCompletion);
        }

        /** Called when a read finishes; {@code buffer} holds the received bytes. */
        @Override
        public void completed(Integer result, ByteBuffer buffer) {
            if (result <= 0) {
                // Nothing read (-1 means the peer closed): drop the client.
                removeClient(this);
                return;
            }
            // Switch the buffer to read mode and decode the message.
            buffer.flip();
            String msg = receive(buffer);
            String fwdMsg = getClientName(clientChannel) + ":" + msg;
            System.out.println(fwdMsg);
            // Relay to every other user.
            forwardMessage(clientChannel, fwdMsg);
            // Back to write mode for the next read.
            buffer.clear();
            if (readyToQuit(msg)) {
                removeClient(this);
            } else {
                // Keep listening for this client's next message.
                clientChannel.read(buffer, buffer, this);
            }
        }

        @Override
        public void failed(Throwable exc, ByteBuffer buffer) {
            // Any read failure is treated as a client fault: just drop the client.
            removeClient(this);
        }
    }

    /**
     * Builds the display name of a client from its remote port.
     *
     * @return the name, or {@code null} when the remote address is unavailable
     */
    private String getClientName(AsynchronousSocketChannel clientChannel) {
        String clientName = null;
        try {
            InetSocketAddress remote = (InetSocketAddress) clientChannel.getRemoteAddress();
            if (remote != null) {
                clientName = "客户端[" + remote.getPort() + "]";
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
        return clientName;
    }

    /** Decodes the readable bytes of the buffer with the server charset. */
    private String receive(ByteBuffer buffer) {
        return String.valueOf(charset.decode(buffer));
    }

    /**
     * Sends a message to every connected client except the sender.
     *
     * @param clientChannel channel of the sender, which is skipped
     * @param fwdMsg        text to forward
     */
    private void forwardMessage(AsynchronousSocketChannel clientChannel, String fwdMsg) {
        // Iterate over a snapshot taken under the lock: a write that fails
        // immediately may call removeClient from inside this loop.
        List<ClientHandler> recipients;
        synchronized (this) {
            recipients = new ArrayList<>(connectedClients);
        }
        ByteBuffer encoded = charset.encode(fwdMsg);
        for (ClientHandler recipient : recipients) {
            if (recipient.clientChannel != clientChannel) {
                // Each recipient gets its own position/limit over the shared bytes.
                recipient.send(encoded.duplicate());
            }
        }
    }

    /** Adds a newly connected client to the online list. */
    private synchronized void addClient(ClientHandler clientHandler) {
        connectedClients.add(clientHandler);
        System.out.println(getClientName(clientHandler.clientChannel) + "连接成功");
    }

    /**
     * Removes a client from the online list and closes its channel. Does
     * nothing if the client was already removed, since a read failure and a
     * write failure can both report the same broken connection.
     */
    private synchronized void removeClient(ClientHandler clientHandler) {
        if (!connectedClients.remove(clientHandler)) {
            return;
        }
        System.out.println(getClientName(clientHandler.clientChannel) + "已退出");
        close(clientHandler.clientChannel);
    }

    /** Shuts down the channel group, if one was created. */
    private void shutdownChannelGroup() {
        if (channelGroup != null) {
            try {
                channelGroup.shutdownNow();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * Binds the server and serves clients until standard input reaches
     * end-of-stream, then releases the server channel and the channel group.
     */
    private void start() {
        try {
            // Thread pool that backs the channel group.
            ExecutorService executorService = Executors.newFixedThreadPool(THREADPOOL_SIZE);
            channelGroup = AsynchronousChannelGroup.withThreadPool(executorService);
            // Open the server channel as a member of that group.
            serverChannel = AsynchronousServerSocketChannel.open(channelGroup);
            serverChannel.bind(new InetSocketAddress(LOCALHOST, port));
            System.out.println("启动服务器，监听端口：" + port);
            // Issue exactly one accept; AcceptHandler re-arms it after each
            // connection. The callback needs no extra data, so the attachment
            // is null.
            serverChannel.accept(null, new AcceptHandler());
            // All work happens on the group's threads; park the main thread on
            // stdin, discarding input, until it reaches end-of-stream.
            while (System.in.read() != -1) {
                // keep waiting
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            close(serverChannel);
            // Without this the pool's threads keep the JVM alive after start()
            // returns (for example when bind fails).
            shutdownChannelGroup();
        }
    }

    public static void main(String[] args) {
        ChatServer server = new ChatServer();
        server.start();
    }
}
package aio.chatroom;
import java.io.Closeable;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.nio.charset.Charset;
public class ChatClient {
private static final String LOCALHOST = "localhost";
private static final int DEFAULT_PORT = 8888;
private static final String QUIT = "quit";
private static final int BUFFER = 1024;
private AsynchronousSocketChannel clientChannel;
private RWHandler handler;
private Charset charset = Charset.forName("UTF-8");
private String host;
private int port;
public ChatClient(){
this(LOCALHOST , DEFAULT_PORT);
}
public ChatClient(String host, int port) {
this.host = host;
this.port = port;
}
public boolean readyToQuit(String msg){
boolean flag = QUIT.equalsIgnoreCase(msg);
if(flag){
close(clientChannel);
}
return flag;
}
public synchronized void send(String msg){
ByteBuffer buffer = charset.encode(msg);
clientChannel.write(buffer , null , handler);
buffer.clear();
}
private synchronized void close(Closeable closeable){
if(closeable != null){
try {
closeable.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
private class ConnectHandler implements CompletionHandler<Void , Object> {
private ChatClient client;
public ConnectHandler(ChatClient client) {
this.client = client;
}
@Override
public void completed(Void result, Object attachment) {
handler = new RWHandler(clientChannel);
ByteBuffer buffer = ByteBuffer.allocate(BUFFER);
// å¼æ¥è°ç¨read ï¼ å½æå¡å¨ææ¶æ¯è½¬åç»è¯¥ç¨æ·ï¼ä¾¿è§¦ååè°å½æ°
clientChannel.read(buffer , buffer , handler);
// å建线ç¨çå¬ç¨æ·è¾å
¥ä¿¡æ¯
new Thread(new UserInputHandler(client)).start();
}
@Override
public void failed(Throwable exc, Object attachment) {
System.out.println("客æ·ç«¯è¿æ¥å¤±è´¥");
}
}
private class RWHandler implements CompletionHandler<Integer , Object>{
private AsynchronousSocketChannel clientChannel;
public RWHandler(AsynchronousSocketChannel clientChannel) {
this.clientChannel = clientChannel;
}
@Override
public void completed(Integer result, Object attachment) {
ByteBuffer buffer = (ByteBuffer) attachment;
// 读åæå¡å¨è½¬åçæ¶æ¯æå
if(buffer != null){
// 读åçæ¶æ¯ææ
if(result > 0){
// 读模å¼
buffer.flip();
String msg = String.valueOf(charset.decode(buffer));
System.out.println(msg);
// å模å¼
buffer.clear();
// åå¼æ¥è°ç¨readï¼ç¸å½äºä¸ç´å¨çå¬æå¡å¨æ¯å¦è½¬åæ¶æ¯è¿æ¥ï¼å¼å¸¸é¤å¤ï¼
clientChannel.read(buffer , buffer , this);
}
}
}
@Override
public void failed(Throwable exc, Object attachment) {
// ç®åå¤ç为客æ·ç«¯ä¸æå¡å¨æå¼è¿æ¥
close(clientChannel);
}
}
private void start(){
try {
// æå¼ç®¡é
clientChannel = AsynchronousSocketChannel.open();
// å¼æ¥è°ç¨connectè¿æ¥æå¡å¨
clientChannel.connect(new InetSocketAddress(host , port) , null , new ConnectHandler(this));
while(clientChannel.isOpen()){
// è¿é没æ³å°å¥½æ¹æ³æ¥æ¿ä»£è¿ä¸ªå¾ªå
}
} catch (IOException e) {
e.printStackTrace();
} finally {
close(clientChannel);
}
}
public static void main(String[] args) {
ChatClient client = new ChatClient();
client.start();
}
}
package aio.chatroom;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
public class UserInputHandler implements Runnable{
private ChatClient client;
public UserInputHandler(ChatClient client){
this.client = client;
}
@Override
public void run() {
try {
// çå¾
ç¨æ·è¾å
¥æ¶æ¯
BufferedReader consoleReader = new BufferedReader(
new InputStreamReader(System.in)
);
while(true){
String input = consoleReader.readLine();
// åæå¡å¨åéæ¶æ¯
client.send(input);
//æ£æ¥ç¨æ·æ¯å¦åå¤éåº
if(client.readyToQuit(input)){
break;
}
}
} catch (IOException e){
e.printStackTrace();
}
}
}