為什麼使用netty
1.netty傳輸速度快
零拷貝
具體來講,如果要從IO中讀取資料,分為兩個步驟:
(1)從IO流中讀取出來放到緩沖區,程式從緩沖區中讀取,再放到堆中,此時資料就會被拷貝兩次才能到達堆或者堆記憶體中。如果資料量很大,那麼就會造成資源的浪費
(2)Netty其實就是利用NIO中的零拷貝特性,當Netty需要接收資料或者傳輸資料的時候,就會新開辟一塊堆記憶體,然後資料就直接通過IO讀取到了新開辟的堆記憶體中,這樣也就加快了資料傳輸的速度。
2.異步非阻塞的IO
線程在通路某一個資源的時候,該資源是否準備就緒的一種處理方式,如果說該資源目前沒準備就緒,這個時候就會有兩種處理方式:阻塞與非阻塞,netty解決了NIO目前發現的所有bug。
(1)阻塞:這個線程會一直持續等待這個資源就緒并處理完畢,直到響應傳回一個結果,這個時候線程是一直阻塞狀态,不可以去做任何事情
(2)非阻塞:這個線程直接傳回結果,不會持續等待這個資源處理完畢才響應,它會去請求别的資源。
同步與異步
這裡的 “同步與異步” 指的是通路資料的一種機制,類似于Ajax。
(1)同步:主動請求,并且會等待IO操作完成之後,IO會有一個通知
(2)異步:當一個線程主動請求資料之後,可以繼續處理其他任務,發起其他請求,多個請求完成之後再逐一的通過異步形式
3.支援高并發,Netty的三種線程模型
1.Reactor線程模型:
(1)單線程模型:所有的IO操作都由同一個NIO線程處理,僅限于一些小型應用場景。但在高負載、高并發等情況下使用單線程肯定就不太合理,主要是因為NIO的一個線程同時要去處理成千上萬的請求 的時候,在性能上會支撐不了,即便CPU負載100%,對于海量消息的處理,編碼解碼以及讀取、發送消息等情況,依然滿足不了。
(2)多線程模型:由一組NIO線程處理IO操作
(3)主從線程模型:一組線程池接受請求,一組線程池處理IO
2.當NIO的線程負載過重之後,整體服務性能處理就會變慢,結果就是導緻用戶端在向服務端發起請求、連結就會逾時,由于用戶端一般都會有一種逾時機制,反複地向服務端再次發起請求,此時就相當于陷入了死循環,更加加重了伺服器負載。
4.代碼示例
maven依賴
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.25.Final</version>
</dependency>
搭建netty服務
package com.imooc.netty;
import org.springframework.stereotype.Component;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
@Component
public class WSServer {
private static class SingletionWSServer {
static final WSServer instance = new WSServer();
}
public static WSServer getInstance() {
return SingletionWSServer.instance;
}
private EventLoopGroup mainGroup;
private EventLoopGroup subGroup;
private ServerBootstrap server;
private ChannelFuture future;
public WSServer() {
mainGroup = new NioEventLoopGroup();
subGroup = new NioEventLoopGroup();
server = new ServerBootstrap();
server.group(mainGroup, subGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new WSServerInitialzer());
}
public void start() {
this.future = server.bind(8088);
System.err.println("netty websocket server 啟動完畢...");
}
}
啟動netty
package com.imooc;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.stereotype.Component;
import com.imooc.netty.WSServer;
@Component
public class NettyBooter implements ApplicationListener<ContextRefreshedEvent> {
@Override
public void onApplicationEvent(ContextRefreshedEvent event) {
if (event.getApplicationContext().getParent() == null) {
try {
WSServer.getInstance().start();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
Channel
package com.imooc.netty;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.stream.ChunkedWriteHandler;
import io.netty.handler.timeout.IdleStateHandler;
public class WSServerInitialzer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
// websocket 基于http協定,是以要有http編解碼器
pipeline.addLast(new HttpServerCodec());
// 對寫大資料流的支援
pipeline.addLast(new ChunkedWriteHandler());
// 對httpMessage進行聚合,聚合成FullHttpRequest或FullHttpResponse
// 幾乎在netty中的程式設計,都會使用到此hanler
pipeline.addLast(new HttpObjectAggregator(1024*64));
// ====================== 以上是用于支援http協定 ======================
// ====================== 增加心跳支援 start ======================
// 針對用戶端,如果在1分鐘時沒有向服務端發送讀寫心跳(ALL),則主動斷開
// 如果是讀空閑或者寫空閑,不處理
pipeline.addLast(new IdleStateHandler(50, 52, 54));
// 自定義的空閑狀态檢測
pipeline.addLast(new HeartBeatHandler());
// ====================== 增加心跳支援 end ======================
// ====================== 以下是支援httpWebsocket ======================
/**
* websocket 伺服器處理的協定,用于指定給用戶端連接配接通路的路由 : /ws
* 本handler會幫你處理一些繁重的複雜的事
* 會幫你處理握手動作: handshaking(close, ping, pong) ping + pong = 心跳
* 對于websocket來講,都是以frames進行傳輸的,不同的資料類型對應的frames也不同
*/
pipeline.addLast(new WebSocketServerProtocolHandler("/ws"));
// 自定義的handler
pipeline.addLast(new ChatHandler());
}
}
自定義的空閑狀态檢測
package com.imooc.netty;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
/**
* @Description: 用于檢測channel的心跳handler
* 繼承ChannelInboundHandlerAdapter,進而不需要實作channelRead0方法
*/
public class HeartBeatHandler extends ChannelInboundHandlerAdapter {
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
// 判斷evt是否是IdleStateEvent(用于觸發使用者事件,包含 讀空閑/寫空閑/讀寫空閑 )
if (evt instanceof IdleStateEvent) {
IdleStateEvent event = (IdleStateEvent)evt; // 強制類型轉換
if (event.state() == IdleState.READER_IDLE) {
System.out.println("進入讀空閑...");
} else if (event.state() == IdleState.WRITER_IDLE) {
System.out.println("進入寫空閑...");
} else if (event.state() == IdleState.ALL_IDLE) {
System.out.println("channel關閉前,users的數量為:" + ChatHandler.users.size());
Channel channel = ctx.channel();
// 關閉無用的channel,以防資源浪費
channel.close();
System.out.println("channel關閉後,users的數量為:" + ChatHandler.users.size());
}
}
}
}
自定義處理消息的handler
package com.imooc.netty;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.lang3.StringUtils;
import com.imooc.SpringUtil;
import com.imooc.enums.MsgActionEnum;
import com.imooc.service.UserService;
import com.imooc.utils.JsonUtils;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.util.concurrent.GlobalEventExecutor;
/**
*
* @Description: 處理消息的handler
* TextWebSocketFrame: 在netty中,是用于為websocket專門處理文本的對象,frame是消息的載體
*/
public class ChatHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
// 用于記錄和管理所有用戶端的channle
public static ChannelGroup users =
new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
@Override
protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg)
throws Exception {
// 擷取用戶端傳輸過來的消息
String content = msg.text();
Channel currentChannel = ctx.channel();
// 1. 擷取用戶端發來的消息
DataContent dataContent = JsonUtils.jsonToPojo(content, DataContent.class);
Integer action = dataContent.getAction();
// 2. 判斷消息類型,根據不同的類型來處理不同的業務
if (action == MsgActionEnum.CONNECT.type) {
// 2.1 當websocket 第一次open的時候,初始化channel,把用的channel和userid關聯起來
String senderId = dataContent.getChatMsg().getSenderId();
UserChannelRel.put(senderId, currentChannel);
// 測試
for (Channel c : users) {
System.out.println(c.id().asLongText());
}
UserChannelRel.output();
} else if (action == MsgActionEnum.CHAT.type) {
// 2.2 聊天類型的消息,把聊天記錄儲存到資料庫,同時标記消息的簽收狀态[未簽收]
ChatMsg chatMsg = dataContent.getChatMsg();
String msgText = chatMsg.getMsg();
String receiverId = chatMsg.getReceiverId();
String senderId = chatMsg.getSenderId();
// 儲存消息到資料庫,并且标記為 未簽收
UserService userService = (UserService)SpringUtil.getBean("userServiceImpl");
String msgId = userService.saveMsg(chatMsg);
chatMsg.setMsgId(msgId);
DataContent dataContentMsg = new DataContent();
dataContentMsg.setChatMsg(chatMsg);
// 發送消息
// 從全局使用者Channel關系中擷取接受方的channel
Channel receiverChannel = UserChannelRel.get(receiverId);
if (receiverChannel == null) {
// TODO channel為空代表使用者離線,推送消息(JPush,個推,小米推送)
} else {
// 當receiverChannel不為空的時候,從ChannelGroup去查找對應的channel是否存在
Channel findChannel = users.find(receiverChannel.id());
if (findChannel != null) {
// 使用者線上
receiverChannel.writeAndFlush(
new TextWebSocketFrame(
JsonUtils.objectToJson(dataContentMsg)));
} else {
// 使用者離線 TODO 推送消息
}
}
} else if (action == MsgActionEnum.SIGNED.type) {
// 2.3 簽收消息類型,針對具體的消息進行簽收,修改資料庫中對應消息的簽收狀态[已簽收]
UserService userService = (UserService)SpringUtil.getBean("userServiceImpl");
// 擴充字段在signed類型的消息中,代表需要去簽收的消息id,逗号間隔
String msgIdsStr = dataContent.getExtand();
String msgIds[] = msgIdsStr.split(",");
List<String> msgIdList = new ArrayList<>();
for (String mid : msgIds) {
if (StringUtils.isNotBlank(mid)) {
msgIdList.add(mid);
}
}
System.out.println(msgIdList.toString());
if (msgIdList != null && !msgIdList.isEmpty() && msgIdList.size() > 0) {
// 批量簽收
userService.updateMsgSigned(msgIdList);
}
} else if (action == MsgActionEnum.KEEPALIVE.type) {
// 2.4 心跳類型的消息
System.out.println("收到來自channel為[" + currentChannel + "]的心跳包...");
}
}
/**
* 當用戶端連接配接服務端之後(打開連接配接)
* 擷取用戶端的channle,并且放到ChannelGroup中去進行管理
*/
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
users.add(ctx.channel());
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
String channelId = ctx.channel().id().asShortText();
System.out.println("用戶端被移除,channelId為:" + channelId);
// 當觸發handlerRemoved,ChannelGroup會自動移除對應用戶端的channel
users.remove(ctx.channel());
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
// 發生異常之後關閉連接配接(關閉channel),随後從ChannelGroup中移除
ctx.channel().close();
users.remove(ctx.channel());
}
}
package com.imooc.enums;
/**
*
* @Description: 發送消息的動作 枚舉
*/
public enum MsgActionEnum {
CONNECT(1, "第一次(或重連)初始化連接配接"),
CHAT(2, "聊天消息"),
SIGNED(3, "消息簽收"),
KEEPALIVE(4, "用戶端保持心跳"),
PULL_FRIEND(5, "拉取好友");
public final Integer type;
public final String content;
MsgActionEnum(Integer type, String content){
this.type = type;
this.content = content;
}
public Integer getType() {
return type;
}
}
package com.imooc.netty;
import java.io.Serializable;
public class DataContent implements Serializable {
private static final long serialVersionUID = 8021381444738260454L;
private Integer action; // 動作類型
private ChatMsg chatMsg; // 使用者的聊天内容entity
private String extand; // 擴充字段
public Integer getAction() {
return action;
}
public void setAction(Integer action) {
this.action = action;
}
public ChatMsg getChatMsg() {
return chatMsg;
}
public void setChatMsg(ChatMsg chatMsg) {
this.chatMsg = chatMsg;
}
public String getExtand() {
return extand;
}
public void setExtand(String extand) {
this.extand = extand;
}
}
package com.imooc.netty;
import java.io.Serializable;
public class ChatMsg implements Serializable {
private static final long serialVersionUID = 3611169682695799175L;
private String senderId; // 發送者的使用者id
private String receiverId; // 接受者的使用者id
private String msg; // 聊天内容
private String msgId; // 用于消息的簽收
public String getSenderId() {
return senderId;
}
public void setSenderId(String senderId) {
this.senderId = senderId;
}
public String getReceiverId() {
return receiverId;
}
public void setReceiverId(String receiverId) {
this.receiverId = receiverId;
}
public String getMsg() {
return msg;
}
public void setMsg(String msg) {
this.msg = msg;
}
public String getMsgId() {
return msgId;
}
public void setMsgId(String msgId) {
this.msgId = msgId;
}
}