天天看點

聊天室 - Netty WebSocket初試

背景

最近項目上面需要用到聊天室,在Ajax輪詢和WebSocket之間考慮了下,決定還是采用WebSocket來實作這個項目。采用WebSocket實作,那麼就必須進行伺服器的技術選型,主要考慮的有Java(Netty/Jetty)、node.js(socket.io)、PHP(swoole/workerman)。

但是PHP語言的伺服器就全部放棄了,論速度估計是比不上前面兩者(沒測試,但有人好像測試過,不過PHP的一般情況下貌似的确速度方面不如其他語言,而且workerman貌似不太快,swoole安裝還挺麻煩的,相對來講),最關鍵的是我感覺PHP的一個非常重要的就是可以在虛拟主機中運作,但是PHP的WebSocket基本上必須得虛拟伺服器,那我要他何用!!!

在Java和node.js中進行選擇的主要原因是:伺服器中已經配好Java環境了,就懶得搞node.js了。

Netty和Jetty對比,好像是Netty更好一些,我從網上看到的,說錯了,别打我!!!

好了,那就Netty了,順便還可以學點新的玩意。以下内容其實大部分來自慕課網的netty websocket課程,主講人是濟癫,其中有個地方好像有BUG,随手改了,另外Netty5被廢了,于是我用的包其實是Netty4的。

PS:網絡上絕大部分内容應該都是來自慕課網的課程或者是《Netty權威指南》,其實慕課網的這些個應該是來自《Netty權威指南》。

伺服器代碼

其實沒有什麼好說的,直接上代碼吧!

import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.concurrent.GlobalEventExecutor;

public class NettyConfig {
	
	/*
	 *   存儲每個用戶端接入的配置量
	 */
	public static ChannelGroup cg = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

}
           

這個沒有任何改動,都是慕課網原裝的!

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
import io.netty.handler.codec.http.websocketx.PingWebSocketFrame;
import io.netty.handler.codec.http.websocketx.PongWebSocketFrame;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketServerHandshaker;
import io.netty.handler.codec.http.websocketx.WebSocketServerHandshakerFactory;
import io.netty.util.CharsetUtil;

// 接收/處理/響應用戶端WebSocket請求的核心業務處理類
public class MwWebSocketHandler extends SimpleChannelInboundHandler<Object> {

	private WebSocketServerHandshaker handshaker;
	private static final String WEB_SOCKET_URL = "ws://127.0.0.1:8888/websocket";

	// 服務端處理用戶端WebSocket請求的核心方法
	@Override
	protected void channelRead0(ChannelHandlerContext arg0, Object arg1) throws Exception {
		// TODO Auto-generated method stub
		if(arg1 instanceof FullHttpRequest) {
			handHttpRequest(arg0, (FullHttpRequest)arg1);
		} else if (arg1 instanceof WebSocketFrame) {
			handWebsocketFrame(arg0, (WebSocketFrame)arg1);
		}
		
	}
	
	// 處理用戶端與服務端之間的WebSocket業務
	private void handWebsocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) {
		// 判斷是否是關閉WebSocket的指令
		if(frame instanceof CloseWebSocketFrame) {
			handshaker.close(ctx.channel(), (CloseWebSocketFrame)frame.retain());
			return;
		}
		// 判斷是否是ping指令 
		if (frame instanceof PingWebSocketFrame) {
			ctx.channel().write(new PongWebSocketFrame(frame.content().retain()));
			return;
		}
		// 判斷是否是二進制消息
		if (!(frame instanceof TextWebSocketFrame)) {
			System.out.println("該例子不處理二進制消息!");
			throw new RuntimeException("【 " + this.getClass().getName() + "】不支援消息!");
		}
		// 傳回應答消息
		// 擷取用戶端向服務端發送的消息
		String request = ((TextWebSocketFrame)frame).text();
		System.out.println("服務端收到用戶端的消息====>>>" + request);
		TextWebSocketFrame tws = new TextWebSocketFrame((ctx.channel().id() + "===>>>" + request);
		NettyConfig.cg.writeAndFlush(tws);
	}
	
	// 處理用戶端向服務端發起HTTP握手請求的業務
	@SuppressWarnings("deprecation")
	private void handHttpRequest(ChannelHandlerContext ctx, FullHttpRequest req) {
		if(!req.getDecoderResult().isSuccess() || !("websocket".equals(req.headers().get("Upgrade")))) {
			sendHttpRequest(ctx, req, new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_REQUEST));
			return;
		}
		WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory(WEB_SOCKET_URL, null, false);
		handshaker = wsFactory.newHandshaker(req);
		if(handshaker == null) {
			WebSocketServerHandshakerFactory.sendUnsupportedWebSocketVersionResponse(ctx.channel());
		} else {
			handshaker.handshake(ctx.channel(), req);
		}
	}
	
	// 服務端向用戶端響應消息
	@SuppressWarnings("deprecation")
	private void sendHttpRequest(ChannelHandlerContext ctx, FullHttpRequest req, DefaultFullHttpResponse res) {
		if(res.getStatus().code() != 200) {
			ByteBuf buf = Unpooled.copiedBuffer(res.getStatus().toString(), CharsetUtil.UTF_8);
			res.content().writeBytes(buf);
			buf.release();
		}
		// 服務端向用戶端發送資料
		ChannelFuture cf = ctx.channel().writeAndFlush(res);
		if(res.getStatus().code() != 200) {
			cf.addListener(ChannelFutureListener.CLOSE);
		}
	}

	
	// 用戶端與服務端建立連接配接時調用
	@Override
	public void channelActive(ChannelHandlerContext ctx) throws Exception {
		// TODO Auto-generated method stub
		NettyConfig.cg.add(ctx.channel());
		System.out.println("用戶端與服務端連接配接開啟...");
	}

	// 用戶端與服務端斷開連接配接時調用
	@Override
	public void channelInactive(ChannelHandlerContext ctx) throws Exception {
		// TODO Auto-generated method stub
		NettyConfig.cg.remove(ctx.channel());
		System.out.println("用戶端與服務端連接配接關閉。");
	}

	
	// 服務端接收用戶端發送過來結束時調用
	@Override
	public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
		// TODO Auto-generated method stub
		ctx.flush();
	}

	// 工程出現異常時調用
	@Override
	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
		// TODO Auto-generated method stub
		cause.printStackTrace();
		ctx.close();
	}

}

           

把傳回消息時會增加一個時間的地方去掉了,另外Nettt4中的處理 “服務端處理用戶端WebSocket請求的核心方法”是channelRead0,不再是 messageReceived了,有幾個函數被标記為deprecation了,以上這些就是使用netty4的差異吧。另外有一個BUG是:如果用戶端斷開請求時,伺服器會彈出上面的那個自己定義的二進制異常,包括重新整理、浏覽器關閉和websocket主動關閉。其實原因在于在判斷frame是否為CloseWebSocketFrame後,如果是的話,除了執行代碼塊後的代碼,還會繼續往下執行,然後是判斷PingWebSocketFrame,再然後判斷是否是TextWebScoketFrame,如果不是的話,就是二進制代碼,但是上面說了這個frame其實是CloseWebSocketFrame,是以自然會抛出異常。那麼解決方案其實很簡單,在判斷是否為CloseWebSocketFrame時,如果是的話執行完close後,那就return出去!

import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.stream.ChunkedWriteHandler;

// 初始化連接配接時的各個元件
public class MwWebSocketChannelHandler extends ChannelInitializer<SocketChannel> {

	@Override
	protected void initChannel(SocketChannel arg0) throws Exception {
		// TODO Auto-generated method stub
		arg0.pipeline().addLast("http-codec", new HttpServerCodec());
		arg0.pipeline().addLast("aggregator", new HttpObjectAggregator(65536));
		arg0.pipeline().addLast("http-chunked", new ChunkedWriteHandler());
		arg0.pipeline().addLast("handler", new MwWebSocketHandler());
	}

}
           

這個沒啥說的,繼續!

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;

// 程式的入口,負責啟動應用
public class Main {
	public static void main(String[] args) {
		EventLoopGroup bossGroup = new NioEventLoopGroup();
		EventLoopGroup workGroup = new NioEventLoopGroup();
		try {
			ServerBootstrap b = new ServerBootstrap();
			b.group(bossGroup, workGroup);
			b.channel(NioServerSocketChannel.class);
			b.childHandler(new MwWebSocketChannelHandler());
			System.out.println("服務端開啟,等待用戶端連接配接...");
			Channel ch = b.bind(8888).sync().channel();
			ch.closeFuture().sync();
		} catch (Exception e) {
			// TODO: handle exception
			e.printStackTrace();
		} finally {
			// TODO: handle finally clause
			bossGroup.shutdownGracefully();
			workGroup.shutdownGracefully();
		}
	}
}
           

同上

用戶端代碼

用戶端代碼我增加了一個關閉按鈕,其實關閉、重新整理浏覽器也會自動關閉websocket的。

<html> 
	<head>
		<meta http-equiv="Content-Type" content="text/html; charset=utf-8" />
		<title>WebSocket用戶端</title>
	</head>
	<body>
		<form onSubmit="return false;">
			<input type="text" name="message" value="" />
			<br/><br/>
			<input type="button" value="發送WebSocket消息" onclick="send(this.form.message.value)" />
			<input type="button" value="關閉" onclick="close_socket()" />
			<hr color="red" />
			<h2>用戶端接收到服務端傳回的應答消息</h2>
			<textarea id="responseContent" style="width:100%;height:300px"></textarea>
		</form>
		<script type="text/javascript">
			var socket;
			if(!window.WebSocket) {
				window.WebSocket = window.MozWebSocket;
			}
			if(window.WebSocket) {
				console.log("debug2");
				socket = new WebSocket("ws://127.0.0.1:8888/websocket");
				socket.onmessage = function(event) {
					var ta = document.getElementById("responseContent");
					ta.value += event.data + "\r\n";
				}
				socket.onopen = function(event) {
					var ta = document.getElementById("responseContent");
					ta.value = "您的浏覽器支援WebSocket,已連接配接伺服器...\r\n";
				}
				socket.onclose = function(event) {
					var ta = document.getElementById("responseContent");
					ta.value = "WebSocket 連接配接關閉!\r\n";
				}
			} else {
				alert("您的浏覽器不支援WebSocket");
			}
			function send(message) {
				console.log(message);
				if (!window.WebSocket) {
					return;
				}
				if (socket.readyState = WebSocket.OPEN) {
					socket.send(message);
				} else {
					alert("腳本沒有連接配接成功");
				}
			}
			function close_socket() {
				socket.close();
			}
		</script>
	</body>
</html>
           

OK啦,對于一個簡單的聊天室是可以實作基本的功能了,不過這玩意如果上交給上司,那我就該另投履歷了,是以之後還會有一些别的東西加入,例如:

1.使用者的識别!

2.使用者的分組,不可能所有的使用者都在一起聊吧!!

3.心跳機制,就算沒有什麼意外,難免會被防火牆/網關去中斷!!

4.前端的美化,現在都9102年了,還用這個界面!!!

5.以及其他功能的實作!

之後有空的時候還會更新的!!!

注意:是有空的時候!!!!!