推薦:Java網絡程式設計彙總
Java網絡程式設計-Socket程式設計初涉五(AIO模型的簡易用戶端-伺服器)
什麼是AIO模型?
Java新一代網絡程式設計模型AIO原理
功能
這裡實作一個很簡單的用戶端-伺服器,用戶端連接配接伺服器後,發送消息給伺服器,伺服器直接傳回該消息給用戶端即可,這隻是個嘗鮮版,以後也會用AIO模型來實作多人聊天室。
伺服器
伺服器的主要邏輯如下:
- 打開管道
,綁定、監聽端口(啟動伺服器)。AsynchronousServerSocketChannel
- 伺服器異步調用
(通過accept
的實作類CompletionHandler
來實作異步調用,因為有回調機制),使用AcceptHandler
來減少System.in.read()
循環中頻繁異步調用while
。accept
- 當有用戶端連接配接成功後,觸發回調函數
(completed
類),然後再異步調用一次AcceptHandler
,等待下一個用戶端連接配接成功時觸發(先存着);回調函數accept
會把completed
傳過來,調用AsynchronousSocketChannel
的AsynchronousSocketChannel
方法,來讀取用戶端發送過來的消息。read
- 将
的AsynchronousSocketChannel
和read
方法實作異步調用(通過write
的實作類CompletionHandler
來實作異步調用,因為有回調機制),為了厘清楚是ClientHandler
還是read
,write
傳一個attachment
(Map裡面記錄必要資料Map
、buffer
)。當讀取用戶端發送的消息後,再将該消息發送給用戶端(都是通過type
,是以将它定義為AsynchronousSocketChannel
類的屬性),即異步調用ClientHandler
,觸發回調函數(read
的回調函數),在回調函數裡面再異步調用read
,觸發回調函數(write
的回調函數),在回調函數裡面再異步調用write
。read
可能會看起來懵懵懂懂,可以結合代碼來了解,代碼有一定的注釋。
伺服器完整代碼:
package aio.test;
import java.io.Closeable;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.util.HashMap;
import java.util.Map;
public class Server {
final String LOCALHOST = "localhost";
final int DEFAULT_PORT = 8888;
AsynchronousServerSocketChannel serverChannel;
private void close(Closeable closeable){
if(closeable != null){
try {
closeable.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
public void start(){
try {
// 打開通道
serverChannel = AsynchronousServerSocketChannel.open();
// 綁定、監聽端口
serverChannel.bind(new InetSocketAddress(DEFAULT_PORT));
System.out.println("啟動伺服器,監聽端口:"+DEFAULT_PORT+"...");
while(true){
// 異步調用
serverChannel.accept(null,new AcceptHandler());
// 不會頻繁調用accept的小技巧
System.in.read();
}
} catch (IOException e) {
e.printStackTrace();
} finally{
close(serverChannel);
}
}
private class AcceptHandler implements
CompletionHandler<AsynchronousSocketChannel , Object> {
@Override
public void completed(AsynchronousSocketChannel result, Object attachment) {
//此次調用accept完成,再一次調用accept,等待下一個用戶端連接配接
if(serverChannel.isOpen()) {
serverChannel.accept(null , this);
}
AsynchronousSocketChannel clientChannel = result;
if(clientChannel != null && clientChannel.isOpen()){
ClientHandler handler = new ClientHandler(clientChannel);
ByteBuffer buffer = ByteBuffer.allocate(1024);
Map<String , Object> info = new HashMap<>();
info.put("type", "read");
info.put("buffer" , buffer);
clientChannel.read(buffer , info, handler);
}
}
@Override
public void failed(Throwable exc, Object attachment) {
// 處理錯誤
}
}
private class ClientHandler implements
CompletionHandler<Integer , Object>{
private AsynchronousSocketChannel clientChannel;
public ClientHandler(AsynchronousSocketChannel channel){
this.clientChannel = channel;
}
@Override
public void completed(Integer result, Object attachment) {
Map<String , Object> info = (Map<String, Object>) attachment;
String type = (String) info.get("type");
if(type.equals("read")){
ByteBuffer buffer = (ByteBuffer) info.get("buffer");
// 讀模式
buffer.flip();
info.put("type" , "write");
clientChannel.write(buffer ,info , this);
// 寫模式(也相當于清空)
buffer.clear();
}
else if(type.equals("write")){
ByteBuffer buffer = ByteBuffer.allocate(1024);
info.put("type" , "read");
info.put("buffer" , buffer);
clientChannel.read(buffer , info , this);
}
}
@Override
public void failed(Throwable exc, Object attachment) {
// 處理錯誤
}
}
public static void main(String[] args) {
Server server = new Server();
server.start();
}
}
用戶端
為了讓大家了解其他實作異步調用的方法,用戶端使用
Future
來實作異步調用(正如該單詞的英文意思
将來
、
未來
)。
Future
實作簡單的異步調用,會簡單一點。
調用AIO模型元件的一些方法會傳回一個
Future
執行個體,再調用該執行個體的
get
方法會阻塞,等待該方法調用完成,阻塞便解除(具體看代碼吧,用戶端比較簡單,注釋還是比較全的)。
package aio.test;
import java.io.BufferedReader;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
public class Client {
final String LOCALHOST = "localhost";
final int DEFAULT_PORT = 8888;
AsynchronousSocketChannel clientChannel;
private void close(Closeable closeable){
if(closeable != null){
try {
closeable.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
public void start(){
try {
// 建立Channel
clientChannel = AsynchronousSocketChannel.open();
Future<Void> future = clientChannel.connect(
new InetSocketAddress(LOCALHOST , DEFAULT_PORT));
// 當未連接配接成功前,這裡是阻塞的
future.get();
// 等待使用者的輸入
BufferedReader consoleReader =
new BufferedReader(new InputStreamReader(System.in));
while(true){
String input = consoleReader.readLine();
byte[] inputBytes = input.getBytes();
// 得到buffer的模式是讀模式,可以Debug看一看
ByteBuffer buffer = ByteBuffer.wrap(inputBytes);
Future<Integer> writeResult = clientChannel.write(buffer);
// 等待成功寫入使用者管道
writeResult.get();
// 寫模式
buffer.clear();
Future<Integer> readResult = clientChannel.read(buffer);
// 等待成功讀取使用者管道
readResult.get();
String echo = new String(buffer.array());
System.out.println(echo);
}
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
} finally {
close(clientChannel);
}
}
public static void main(String[] args) {
Client client = new Client();
client.start();
}
}
測試
測試沒什麼問題。