NIO架構Netty搭建簡易聊天室
Netty Reactor模型
單線程模型
使用者發起 IO 請求到 Reactor 線程
Ractor 線程将使用者的 IO 請求放入到通道,然後再進行後續處理
處理完成後,Reactor 線程重新獲得控制權,繼續其他用戶端的處理

缺點:
這種模型一個時間點隻有一個任務在執行,這個任務執行完了,再去執行下一個任務。
- 但單線程的 Reactor 模型每一個使用者事件都在一個線程中執行:
- 性能有極限,不能處理成百上千的事件
- 當負荷達到一定程度時,性能将會下降
- 某一個事件處理器發生故障,不能繼續處理其他事件
多線程模型
Reactor 多線程模型是由一組 NIO 線程來處理 IO 操作(之前是單個線程),是以在請求處理上會比上一中模型效率更高,可以處理更多的用戶端請求。
這種模式使用多個線程執行多個任務,任務可以同時執行
缺點:
但是如果并發仍然很大,Reactor 仍然無法處理大量的用戶端請求
主從多線程模型
這種線程模型是 Netty 推薦使用的線程模型
這種模型适用于高并發場景,一組線程池接收請求,一組線程池處理 IO
Netty聊天室案例搭建
後端部分
1.建立Maven項目引入netty依賴
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.paic</groupId>
<artifactId>netty_chat</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.25.Final</version>
</dependency>
</dependencies>
</project>
2.建立netty伺服器
package com.paic.netty_chat;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
/**
* @ProjectName: netty_chat
* @Package: com.paic.netty_chat
* @ClassName: WebsocketNettyServer
* @Author: Administrator
* @Description: ${description}
* @Date: 2019/5/7 21:59
* @Version: 1.0
*/
public class WebsocketNettyServer {
public static void main(String[] args) {
//建立兩個線程池
NioEventLoopGroup mainGrp = new NioEventLoopGroup();//主線程池
NioEventLoopGroup subGrp = new NioEventLoopGroup();//從線程池
try {
//建立Netty伺服器啟動對象
ServerBootstrap serverBootstrap = new ServerBootstrap();
//初始化伺服器啟動對象
serverBootstrap
//指定使用上面建立的兩個線程池
.group(mainGrp,subGrp)
//指定Netty通道類型
.channel(NioServerSocketChannel.class)
//指定通道初始化器用來加載當Channel收到消息事件後,如何進行業務處理
.childHandler(new WebsocketChannelInitializer());
//綁定伺服器端口,啟動伺服器
ChannelFuture future = serverBootstrap.bind(9090).sync();
//等待伺服器關閉
future.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
//關閉伺服器
mainGrp.shutdownGracefully();
subGrp.shutdownGracefully();
}
}
}
3.建立通道初始化器
package com.paic.netty_chat;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.stream.ChunkedWriteHandler;
/**
* @ProjectName: netty_chat
* @Package: com.paic.netty_chat
* @ClassName: WebsocketChannelInitializer
* @Author: Administrator
* @Description: ${description}
* @Date: 2019/5/7 22:07
* @Version: 1.0
*/
/**
* 通道初始化器
* 用來加載通道處理器
*/
public class WebsocketChannelInitializer extends ChannelInitializer<SocketChannel> {
protected void initChannel(SocketChannel ch) throws Exception {
//擷取管道,将一個個的ChannelHandler添加到管道中
ChannelPipeline pipeline = ch.pipeline();
//添加一個http的編解碼器
pipeline.addLast(new HttpServerCodec());
//添加一個用于支援大資料流的支援
pipeline.addLast(new ChunkedWriteHandler());
//添加一個聚合器,這個聚合器主要講HttpMessage聚合成FullHttpRequest/Response
pipeline.addLast(new HttpObjectAggregator(1024*64));
//需要指定接收請求的路由
// 必須使用以ws字尾結尾的url才能通路
pipeline.addLast(new WebSocketServerProtocolHandler("/ws"));
//添加自定義的Handler
pipeline.addLast(new ChatHandler());
}
}
4.實作自定義Handler
package com.paic.netty_chat;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.util.concurrent.EventExecutorGroup;
import io.netty.util.concurrent.GlobalEventExecutor;
import java.text.SimpleDateFormat;
import java.util.Date;
/**
* @ProjectName: netty_chat
* @Package: com.paic.netty_chat
* @ClassName: ChatHandler
* @Author: Administrator
* @Description: ${description}
* @Date: 2019/5/7 22:26
* @Version: 1.0
*/
public class ChatHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
//用來儲存所有用戶端連接配接
private static ChannelGroup clients = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
private SimpleDateFormat sdf =new SimpleDateFormat("yyyy-MM-dd hh:MM");
//當channel中有新的消息會自動調用
protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
//當接收到資料後會自動調用
//擷取用戶端發送過來的文本消息
String text = msg.text();
System.out.println("接收到消息資料為:"+text);
for(Channel client:clients){
client.writeAndFlush(new TextWebSocketFrame(sdf.format(new Date())+":"+text));
}
}
//當有新的用戶端連接配接伺服器之後,會自動調用這個方法
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
clients.add(ctx.channel());
}
}
前端部分
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>線上聊天室</title>
</head>
<body>
<input type="text" id ="message">
<input type="button" value ="發送消息" onclick="sendMsg()">
接收到的消息
<p id="server_message" style="background-color: #AAAAAA"></p>
<script>
var websocket =null;
//判斷目前浏覽器是否支援websocket
if(window.WebSocket){
websocket = new WebSocket("ws://127.0.0.1:9090/ws");
websocket.onopen = function () {
console.log("建立連接配接");
}
websocket.onclose = function () {
console.log("斷開連接配接");
}
websocket.onmessage = function (e) {
console.log("接收到伺服器消息:"+e.data);
var server_message = document.getElementById("server_message");
server_message.innerHTML +=e.data+"<br/>";
}
} else {
alert("目前浏覽器不支援webSocket");
}
function sendMsg(){
var message = document.getElementById("message");
websocket.send(message.value);
}
</script>
</body>
</html>