天天看點

netty+springboot實作聊天系統

為什麼使用netty

1.netty傳輸速度快

零拷貝

具體來講,如果要從IO中讀取資料,分為兩個步驟:

(1)從IO流中讀取出來放到緩沖區,程式從緩沖區中讀取,再放到堆中,此時資料就會被拷貝兩次才能到達堆或者堆記憶體中。如果資料量很大,那麼就會造成資源的浪費

(2)Netty其實就是利用NIO中的零拷貝特性,當Netty需要接收資料或者傳輸資料的時候,就會新開辟一塊堆記憶體,然後資料就直接通過IO讀取到了新開辟的堆記憶體中,這樣也就加快了資料傳輸的速度。

2.異步非阻塞的IO

線程在通路某一個資源的時候,該資源是否準備就緒的一種處理方式,如果說該資源目前沒準備就緒,這個時候就會有兩種處理方式:阻塞與非阻塞,netty解決了NIO目前發現的所有bug。

(1)阻塞:這個線程會一直持續等待這個資源就緒并處理完畢,直到響應傳回一個結果,這個時候線程是一直阻塞狀态,不可以去做任何事情

(2)非阻塞:這個線程直接傳回結果,不會持續等待這個資源處理完畢才響應,它會去請求别的資源。

同步與異步

這裡的 “同步與異步” 指的是通路資料的一種機制,類似于Ajax。

(1)同步:主動請求,并且會等待IO操作完成之後,IO會有一個通知

(2)異步:當一個線程主動請求資料之後,可以繼續處理其他任務,發起其他請求,多個請求完成之後再逐一的通過異步形式

3.支援高并發,Netty的三種線程模型

1.Reactor線程模型:

(1)單線程模型:所有的IO操作都由同一個NIO線程處理,僅限于一些小型應用場景。但在高負載、高并發等情況下使用單線程肯定就不太合理,主要是因為NIO的一個線程同時要去處理成千上萬的請求 的時候,在性能上會支撐不了,即便CPU負載100%,對于海量消息的處理,編碼解碼以及讀取、發送消息等情況,依然滿足不了。

(2)多線程模型:由一組NIO線程處理IO操作

(3)主從線程模型:一組線程池接受請求,一組線程池處理IO

2.當NIO的線程負載過重之後,整體服務性能處理就會變慢,結果就是導緻用戶端在向服務端發起請求、連結就會逾時,由于用戶端一般都會有一種逾時機制,反複地向服務端再次發起請求,此時就相當于陷入了死循環,更加加重了伺服器負載。

4.代碼示例

maven依賴

<dependency>
			<groupId>io.netty</groupId>
			<artifactId>netty-all</artifactId>
			<version>4.1.25.Final</version>
		</dependency>
           

搭建netty服務

package com.imooc.netty;

import org.springframework.stereotype.Component;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;

@Component
public class WSServer {

	private static class SingletionWSServer {
		static final WSServer instance = new WSServer();
	}
	
	public static WSServer getInstance() {
		return SingletionWSServer.instance;
	}
	
	private EventLoopGroup mainGroup;
	private EventLoopGroup subGroup;
	private ServerBootstrap server;
	private ChannelFuture future;
	
	public WSServer() {
		mainGroup = new NioEventLoopGroup();
		subGroup = new NioEventLoopGroup();
		server = new ServerBootstrap();
		server.group(mainGroup, subGroup)
			.channel(NioServerSocketChannel.class)
			.childHandler(new WSServerInitialzer());
	}
	
	public void start() {
		this.future = server.bind(8088);
		System.err.println("netty websocket server 啟動完畢...");
	}
}

           

啟動netty

package com.imooc;

import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.stereotype.Component;

import com.imooc.netty.WSServer;

@Component
public class NettyBooter implements ApplicationListener<ContextRefreshedEvent> {

	@Override
	public void onApplicationEvent(ContextRefreshedEvent event) {
		if (event.getApplicationContext().getParent() == null) {
			try {
				WSServer.getInstance().start();
			} catch (Exception e) {
				e.printStackTrace();
			}
		}
	}
	
}

           

Channel

package com.imooc.netty;

import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.stream.ChunkedWriteHandler;
import io.netty.handler.timeout.IdleStateHandler;

public class WSServerInitialzer extends ChannelInitializer<SocketChannel> {

	@Override
	protected void initChannel(SocketChannel ch) throws Exception {
		ChannelPipeline pipeline = ch.pipeline();
		
		// websocket 基于http協定,是以要有http編解碼器
		pipeline.addLast(new HttpServerCodec());
		// 對寫大資料流的支援 
		pipeline.addLast(new ChunkedWriteHandler());
		// 對httpMessage進行聚合,聚合成FullHttpRequest或FullHttpResponse
		// 幾乎在netty中的程式設計,都會使用到此hanler
		pipeline.addLast(new HttpObjectAggregator(1024*64));
		
		// ====================== 以上是用于支援http協定    ======================
		
		
		
		// ====================== 增加心跳支援 start    ======================
		// 針對用戶端,如果在1分鐘時沒有向服務端發送讀寫心跳(ALL),則主動斷開
		// 如果是讀空閑或者寫空閑,不處理
		pipeline.addLast(new IdleStateHandler(50, 52, 54));
		// 自定義的空閑狀态檢測
		pipeline.addLast(new HeartBeatHandler());
		// ====================== 增加心跳支援 end    ======================
		
		
		
		
		// ====================== 以下是支援httpWebsocket ======================
		
		/**
		 * websocket 伺服器處理的協定,用于指定給用戶端連接配接通路的路由 : /ws
		 * 本handler會幫你處理一些繁重的複雜的事
		 * 會幫你處理握手動作: handshaking(close, ping, pong) ping + pong = 心跳
		 * 對于websocket來講,都是以frames進行傳輸的,不同的資料類型對應的frames也不同
		 */
		pipeline.addLast(new WebSocketServerProtocolHandler("/ws"));
		
		// 自定義的handler
		pipeline.addLast(new ChatHandler());
	}

}

           

自定義的空閑狀态檢測

package com.imooc.netty;

import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;

/**
 * @Description: 用于檢測channel的心跳handler 
 * 				 繼承ChannelInboundHandlerAdapter,進而不需要實作channelRead0方法
 */
public class HeartBeatHandler extends ChannelInboundHandlerAdapter {

	@Override
	public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
		
		// 判斷evt是否是IdleStateEvent(用于觸發使用者事件,包含 讀空閑/寫空閑/讀寫空閑 )
		if (evt instanceof IdleStateEvent) {
			IdleStateEvent event = (IdleStateEvent)evt;		// 強制類型轉換
			
			if (event.state() == IdleState.READER_IDLE) {
				System.out.println("進入讀空閑...");
			} else if (event.state() == IdleState.WRITER_IDLE) {
				System.out.println("進入寫空閑...");
			} else if (event.state() == IdleState.ALL_IDLE) {
				
				System.out.println("channel關閉前,users的數量為:" + ChatHandler.users.size());
				
				Channel channel = ctx.channel();
				// 關閉無用的channel,以防資源浪費
				channel.close();
				
				System.out.println("channel關閉後,users的數量為:" + ChatHandler.users.size());
			}
		}
		
	}
	
}

           

自定義處理消息的handler

package com.imooc.netty;

import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;

import org.apache.commons.lang3.StringUtils;
import com.imooc.SpringUtil;
import com.imooc.enums.MsgActionEnum;
import com.imooc.service.UserService;
import com.imooc.utils.JsonUtils;

import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.util.concurrent.GlobalEventExecutor;

/**
 * 
 * @Description: 處理消息的handler
 * TextWebSocketFrame: 在netty中,是用于為websocket專門處理文本的對象,frame是消息的載體
 */
public class ChatHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {

	// 用于記錄和管理所有用戶端的channle
	public static ChannelGroup users = 
			new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
	
	@Override
	protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) 
			throws Exception {
		// 擷取用戶端傳輸過來的消息
		String content = msg.text();
		
		Channel currentChannel = ctx.channel();

		// 1. 擷取用戶端發來的消息
		DataContent dataContent = JsonUtils.jsonToPojo(content, DataContent.class);
		Integer action = dataContent.getAction();
		// 2. 判斷消息類型,根據不同的類型來處理不同的業務

		if (action == MsgActionEnum.CONNECT.type) {
			// 	2.1  當websocket 第一次open的時候,初始化channel,把用的channel和userid關聯起來
			String senderId = dataContent.getChatMsg().getSenderId();
			UserChannelRel.put(senderId, currentChannel);
			
			// 測試
			for (Channel c : users) {
				System.out.println(c.id().asLongText());
			}
			UserChannelRel.output();
		} else if (action == MsgActionEnum.CHAT.type) {
			//  2.2  聊天類型的消息,把聊天記錄儲存到資料庫,同時标記消息的簽收狀态[未簽收]
			ChatMsg chatMsg = dataContent.getChatMsg();
			String msgText = chatMsg.getMsg();
			String receiverId = chatMsg.getReceiverId();
			String senderId = chatMsg.getSenderId();
			
			// 儲存消息到資料庫,并且标記為 未簽收
			UserService userService = (UserService)SpringUtil.getBean("userServiceImpl");
			String msgId = userService.saveMsg(chatMsg);
			chatMsg.setMsgId(msgId);
			
			DataContent dataContentMsg = new DataContent();
			dataContentMsg.setChatMsg(chatMsg);
			
			// 發送消息
			// 從全局使用者Channel關系中擷取接受方的channel
			Channel receiverChannel = UserChannelRel.get(receiverId);
			if (receiverChannel == null) {
				// TODO channel為空代表使用者離線,推送消息(JPush,個推,小米推送)
			} else {
				// 當receiverChannel不為空的時候,從ChannelGroup去查找對應的channel是否存在
				Channel findChannel = users.find(receiverChannel.id());
				if (findChannel != null) {
					// 使用者線上
					receiverChannel.writeAndFlush(
							new TextWebSocketFrame(
									JsonUtils.objectToJson(dataContentMsg)));
				} else {
					// 使用者離線 TODO 推送消息
				}
			}
			
		} else if (action == MsgActionEnum.SIGNED.type) {
			//  2.3  簽收消息類型,針對具體的消息進行簽收,修改資料庫中對應消息的簽收狀态[已簽收]
			UserService userService = (UserService)SpringUtil.getBean("userServiceImpl");
			// 擴充字段在signed類型的消息中,代表需要去簽收的消息id,逗号間隔
			String msgIdsStr = dataContent.getExtand();
			String msgIds[] = msgIdsStr.split(",");
			
			List<String> msgIdList = new ArrayList<>();
			for (String mid : msgIds) {
				if (StringUtils.isNotBlank(mid)) {
					msgIdList.add(mid);
				}
			}
			
			System.out.println(msgIdList.toString());
			
			if (msgIdList != null && !msgIdList.isEmpty() && msgIdList.size() > 0) {
				// 批量簽收
				userService.updateMsgSigned(msgIdList);
			}
			
		} else if (action == MsgActionEnum.KEEPALIVE.type) {
			//  2.4  心跳類型的消息
			System.out.println("收到來自channel為[" + currentChannel + "]的心跳包...");
		}
	}
	
	/**
	 * 當用戶端連接配接服務端之後(打開連接配接)
	 * 擷取用戶端的channle,并且放到ChannelGroup中去進行管理
	 */
	@Override
	public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
		users.add(ctx.channel());
	}

	@Override
	public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
		
		String channelId = ctx.channel().id().asShortText();
		System.out.println("用戶端被移除,channelId為:" + channelId);
		
		// 當觸發handlerRemoved,ChannelGroup會自動移除對應用戶端的channel
		users.remove(ctx.channel());
	}

	@Override
	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
		cause.printStackTrace();
		// 發生異常之後關閉連接配接(關閉channel),随後從ChannelGroup中移除
		ctx.channel().close();
		users.remove(ctx.channel());
	}
}

           
package com.imooc.enums;

/**
 * 
 * @Description: 發送消息的動作 枚舉
 */
public enum MsgActionEnum {
	
	CONNECT(1, "第一次(或重連)初始化連接配接"),
	CHAT(2, "聊天消息"),	
	SIGNED(3, "消息簽收"),
	KEEPALIVE(4, "用戶端保持心跳"),
	PULL_FRIEND(5, "拉取好友");
	
	public final Integer type;
	public final String content;
	
	MsgActionEnum(Integer type, String content){
		this.type = type;
		this.content = content;
	}
	
	public Integer getType() {
		return type;
	}  
}

           
package com.imooc.netty;

import java.io.Serializable;

public class DataContent implements Serializable {

	private static final long serialVersionUID = 8021381444738260454L;

	private Integer action;		// 動作類型
	private ChatMsg chatMsg;	// 使用者的聊天内容entity
	private String extand;		// 擴充字段
	
	public Integer getAction() {
		return action;
	}
	public void setAction(Integer action) {
		this.action = action;
	}
	public ChatMsg getChatMsg() {
		return chatMsg;
	}
	public void setChatMsg(ChatMsg chatMsg) {
		this.chatMsg = chatMsg;
	}
	public String getExtand() {
		return extand;
	}
	public void setExtand(String extand) {
		this.extand = extand;
	}
}

           
package com.imooc.netty;

import java.io.Serializable;

public class ChatMsg implements Serializable {

	private static final long serialVersionUID = 3611169682695799175L;
	
	private String senderId;		// 發送者的使用者id	
	private String receiverId;		// 接受者的使用者id
	private String msg;				// 聊天内容
	private String msgId;			// 用于消息的簽收
	
	public String getSenderId() {
		return senderId;
	}
	public void setSenderId(String senderId) {
		this.senderId = senderId;
	}
	public String getReceiverId() {
		return receiverId;
	}
	public void setReceiverId(String receiverId) {
		this.receiverId = receiverId;
	}
	public String getMsg() {
		return msg;
	}
	public void setMsg(String msg) {
		this.msg = msg;
	}
	public String getMsgId() {
		return msgId;
	}
	public void setMsgId(String msgId) {
		this.msgId = msgId;
	}
	
}