文章目錄
-
- 服務端
- 用戶端
服務端
初始化一個ServerSocketChannel,綁定端口,然後使用Selector監聽accept事件。
當有accept發生時,表示有用戶端連接配接進來了,擷取用戶端的SocketChannel,然後注冊其read事件;用來接收用戶端發送的消息。
package chatroom;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.logging.Level;
import java.util.logging.Logger;
/**
* 服務端
*
* @author wenei
* @date 2021-07-20 20:36
*/
public class Server {
private static final Logger log = Logger.getLogger(Server.class.getName());
private int port;
private List<SocketChannel> clientChannelList = new ArrayList<>();
public Server(int port) {
this.port = port;
}
public void start() throws IOException {
// 初始化服務端channel
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.bind(new InetSocketAddress(port));
serverSocketChannel.configureBlocking(false);
// 建立Selector
Selector selector = Selector.open();
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
while (true) {
final int selectCount = selector.select();
if (selectCount <= 0) {
continue;
}
final Set<SelectionKey> selectionKeys = selector.selectedKeys();
final Iterator<SelectionKey> iterator = selectionKeys.iterator();
while (iterator.hasNext()) {
final SelectionKey key = iterator.next();
iterator.remove();
if (key.isAcceptable()) {
// 當有accept事件時,将新的連接配接加入Selector
ServerSocketChannel serverChannel = (ServerSocketChannel) key.channel();
SocketChannel accept = serverChannel.accept();
accept.configureBlocking(false);
clientChannelList.add(accept);
accept.register(selector, SelectionKey.OP_READ);
log.log(Level.INFO, "新連接配接 " + accept);
} else if (key.isReadable()) {
SocketChannel socketChannel = (SocketChannel) key.channel();
log.log(Level.INFO, "可讀連接配接 " + socketChannel);
ByteBuffer buffer = ByteBuffer.allocate(60);
try {
/**
* 當用戶端非正常退出時,read抛出異常,屬于被動性關閉;
* 當用戶端正常傳回時,傳回-1,但也是readable信号,是以需要處理
*/
final int read = socketChannel.read(buffer);
if (read == -1) {
log.log(Level.INFO, "連接配接主動關閉:" + socketChannel);
clientChannelList.remove(socketChannel);
socketChannel.close();
continue;
}
} catch (IOException e) {
log.log(Level.INFO, "連接配接被動關閉:" + socketChannel);
clientChannelList.remove(socketChannel);
socketChannel.close();
continue;
}
buffer.flip();
byte[] bytes = new byte[60];
int index = 0;
while (buffer.hasRemaining()) {
bytes[index++] = buffer.get();
}
bytes[index] = '\0';
log.log(Level.INFO, "接受資料: " + new String(bytes, StandardCharsets.UTF_8).trim());
// 廣播
clientChannelList.forEach(channel -> {
if (channel != socketChannel) {
buffer.flip();
try {
channel.write(buffer);
} catch (IOException e) {
e.printStackTrace();
}
}
});
// buffer.clear();
}
}
}
}
public static void main(String[] args) throws IOException {
new Server(10022).start();
}
}
用戶端
使用主線程擷取鍵盤輸入,然後傳給服務端。
使用子線程接收服務端發送的資訊并顯示。
package chatroom;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.Scanner;
import java.util.Set;
/**
* 用戶端
*
* @author wenei
* @date 2021-07-21 9:14
*/
public class Client {
/**
* 用戶端接收資訊線程
*/
static class ClientReceiveThread implements Runnable {
/**
* 用戶端socket
*/
private SocketChannel socketChannel;
public ClientReceiveThread(SocketChannel socketChannel) {
this.socketChannel = socketChannel;
}
@Override
public void run() {
try {
Selector selector = Selector.open();
socketChannel.register(selector, SelectionKey.OP_READ);
while (true) {
final int selectCount = selector.select(100);
if (Thread.currentThread().isInterrupted()) {
System.out.println("連接配接關閉");
socketChannel.close();
return;
}
if (selectCount <= 0) {
continue;
}
final Set<SelectionKey> selectionKeys = selector.selectedKeys();
final Iterator<SelectionKey> iterator = selectionKeys.iterator();
while (iterator.hasNext()) {
final SelectionKey key = iterator.next();
iterator.remove();
if (key.isReadable()) {
ByteBuffer recvBuffer = ByteBuffer.allocate(60);
socketChannel.read(recvBuffer);
recvBuffer.flip();
byte[] bytes = new byte[60];
int index = 0;
while (recvBuffer.hasRemaining()) {
bytes[index++] = recvBuffer.get();
}
bytes[index] = '\0';
System.out.println("接受資料: " + new String(bytes, StandardCharsets.UTF_8).trim());
}
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
private int port;
public Client(int port) {
this.port = port;
}
public void start() throws IOException {
SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress(port));
socketChannel.configureBlocking(false);
Scanner scanner = new Scanner(System.in);
ByteBuffer buffer = ByteBuffer.allocate(60);
Thread thread = new Thread(new ClientReceiveThread(socketChannel));
thread.start();
while (true) {
String data = scanner.nextLine();
if (data.equals("exit")) {
break;
}
System.out.println("輸入資料:" + data);
buffer.put(data.getBytes(StandardCharsets.UTF_8));
buffer.flip();
socketChannel.write(buffer);
buffer.clear();
}
thread.interrupt();
}
public static void main(String[] args) throws IOException {
new Client(10022).start();
}
}