天天看点

Netty多线程服务器与客户端

发此篇博客的目的是,如果有网络通讯方面的大牛看到,希望能给与一些指导.

我相信很多人都能用netty开发出自己的客户端和服务器,但是,此服务器和客户端的可重用性有多高呢?我一直想弄个性能还算不错,然后其他人在此架构上做简单的命令处理即可.

开发这个服务器和客户端的原因是正是如此,也可以说是一个简单的网络平台.让其具备一定的2次开发功能.

以下代码只是一个初步是设想,有兴趣的朋友我们可以一起讨论讨论.

如果你完全不了解netty,请先自己学习一下.^_^,因为我也是菜鸟,无法解释那么多的类是干什么的.^_^

关于netty的下载和其他个jar请自行下载

首先是服务器的初步实现.因为Netty是基于事件的,再加上其无阻塞的特性.我们必须要牢记:

数据发送后,代码不会被阻塞,而是顺序运行,也就是说,做了一件事件后,这件事情不一定已经成功,所以我们不能在下一行代码中百分百的确定其已经发送到了对方,因此,你会发行其很多方法都会返回一个"Future".只要注意到这一点,Netty的使用难度就不是很大了.

(一)handler处理篇

首先,是handler,初次接触netty的朋友要注意,handler不是一个单例.即每个channel下都会有自己的一个handler实例.

Java代码

public class ServerHandler extends SimpleChannelUpstreamHandler {

private static final Logger logger = Logger.getLogger(

ServerHandler.class.getName());

private final ThreadLocal<Boolean> COMMAND_FLAG = new ThreadLocal<Boolean>();

private final ServerChannelGroup serverChannelGroup = ServerChannelGroup.newInstance();

@Override

public void handleUpstream(ChannelHandlerContext ctx, ChannelEvent e)

throws Exception {

if (e instanceof ChannelStateEvent) {

logger.log(Level.INFO, "Channel state changed: {0}", e);

}

super.handleUpstream(ctx, e);

}

@Override

public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)

throws Exception {

System.out.println(this);

String request = (String) e.getMessage();

//如果接受到客户端发送的bye指令,那么就给客户端回复一个bye指令,客户端接受到后,主动关闭连接

//服务器端通过ChannelFutureListener.CLOSE,当它认为客户端已经接受到服务器发送的bye后,也主动关闭连接

if (request.toLowerCase().equals("bye")) {

ChannelFuture future = e.getChannel().write("bye\r\n");

future.addListener(ChannelFutureListener.CLOSE);

} else {

//以下是我初步解析客户端发送过来的数据,然后决定处理方式

RecevieData receivedaData = MessageDecoder.decode(request);

if (null != receivedaData) {

//服务器第5版

if (VersionCode.V5.equals(receivedaData.getVersion())) {

//然后判断命令是否存在

for (String s : CommandCode.COMMANDS) {

if (s.equals(receivedaData.getActionType())) {

COMMAND_FLAG.set(true);

if (s.equals(CommandCode.KEEP_ALIVE)) {

serverChannelGroup.addChannel(e.getChannel());

}

break;

} else {

COMMAND_FLAG.set(false);

}

}

if (COMMAND_FLAG.get()) {

COMMAND_FLAG.set(false);

//将这个命令传递给下一个handler来处理.

//这里的"下一个handler"即为用户自己定义的处理handler

ctx.sendUpstream(e);

} else {

e.getChannel().write(MessageEncoder.encode(receivedaData, StatusCode.NOT_FOUND, StatusCode.NOT_FOUND_TEXT));

}

} else {

//版本错误

e.getChannel().write(MessageEncoder.encode(receivedaData, StatusCode.VERSION_NOT_SUPPORTED, StatusCode.VERSION_NOT_SUPPORTED_TXET));

}

} else {

//如果格式错误,那么直接返回

e.getChannel().write(MessageEncoder.encode(receivedaData, null, null));

}

}

}

@Override

public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)

throws Exception {

logger.log(Level.WARNING, "Server side Unexpected exception from downstream.",

e.getCause());

e.getChannel().close();

ListenerManager.getListener(ConnectClosedByPeerListener.class).connectClosedByPeer(e.getCause());

}

}

在上面这个handler中,我使用了ctx.sendUpstream(e);来处理,个人觉得此处为了要实现执行运行时代码,也可以使用接口等方式.但既然netty提供了sendUpstream 的方法,我们用这个岂不是更方便^_^

下面是使用SSL连接的handler

Java代码

public class ServerSSLHandler extends SimpleChannelUpstreamHandler {

private static final Logger logger = Logger.getLogger(

ServerSSLHandler.class.getName());

private final ThreadLocal<Boolean> COMMAND_FLAG = new ThreadLocal<Boolean>();

private final ServerChannelGroup serverChannelGroup = ServerChannelGroup.newInstance();

@Override

public void handleUpstream(ChannelHandlerContext ctx, ChannelEvent e)

throws Exception {

if (e instanceof ChannelStateEvent) {

logger.log(Level.INFO, "Channel state changed: {0}", e);

}

super.handleUpstream(ctx, e);

}

@Override

public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e)

throws Exception {

//ssl握手

SslHandler sslHandler = ctx.getPipeline().get(SslHandler.class);

sslHandler.handshake();

}

@Override

public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)

throws Exception {

System.out.println(this);

String request = (String) e.getMessage();

//如果接受到客户端发送的bye指令,那么就给客户端回复一个bye指令,客户端接受到后,主动关闭连接

//服务器端通过ChannelFutureListener.CLOSE,当它认为客户端已经接受到服务器发送的bye后,也主动关闭连接

if (request.toLowerCase().equals("bye")) {

ChannelFuture future = e.getChannel().write("bye\r\n");

future.addListener(ChannelFutureListener.CLOSE);

} else {

//以下是我初步解析客户端发送过来的数据,然后决定处理方式

RecevieData receivedaData = MessageDecoder.decode(request);

if (null != receivedaData) {

//服务器第5版

if (VersionCode.V5.equals(receivedaData.getVersion())) {

//然后判断命令是否存在

for (String s : CommandCode.COMMANDS) {

if (s.equals(receivedaData.getActionType())) {

COMMAND_FLAG.set(true);

if (s.equals(CommandCode.KEEP_ALIVE)) {

serverChannelGroup.addChannel(e.getChannel());

}

break;

} else {

COMMAND_FLAG.set(false);

}

}

if (COMMAND_FLAG.get()) {

COMMAND_FLAG.set(false);

//将这个命令传递给下一个handler来处理.

//这里的"下一个handler"即为用户自己定义的处理handler

ctx.sendUpstream(e);

} else {

e.getChannel().write(MessageEncoder.encode(receivedaData, StatusCode.NOT_FOUND, StatusCode.NOT_FOUND_TEXT));

}

} else {

//版本错误

e.getChannel().write(MessageEncoder.encode(receivedaData, StatusCode.VERSION_NOT_SUPPORTED, StatusCode.VERSION_NOT_SUPPORTED_TXET));

}

} else {

//如果格式错误,那么直接返回

e.getChannel().write(MessageEncoder.encode(receivedaData, null, null));

}

}

}

@Override

public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)

throws Exception {

logger.log(Level.WARNING, "Server side Unexpected exception from downstream.",

e.getCause());

e.getChannel().close();

ListenerManager.getListener(ConnectClosedByPeerListener.class).connectClosedByPeer(e.getCause());

}

}

关于SSL连接需要用到的一些其他东西,稍后在介绍

当我们有了2个handler后,当然就是要把他们添加到我们的Pipeline中

Java代码

public class ServerPipelineFactory implements

ChannelPipelineFactory {

public ChannelPipeline getPipeline() {

ChannelPipeline pipeline = pipeline();

ServerConfig config = ServerConfig.getInstance();

try {

if (config.ssl()) {

SSLEngine engine =

SecureSslContextFactory.getServerContext().createSSLEngine();

//说明是服务器端SslContext

engine.setUseClientMode(false);

pipeline.addLast("ssl", new SslHandler(engine));

}

//此Decoder可以自动解析一句以\r\n结束的命令,我为了方便,也用了这个Decoder

//使用这个Decoder,我不用刻意发送命令长度用于解析,只要没有收到\r\n说明数据还

//没有发送完毕.这个Decoder会等到收到\r\n后调用下个handler

pipeline.addLast("framer", new DelimiterBasedFrameDecoder(

8192, Delimiters.lineDelimiter()));

//字串解码,可以自己设置charset

pipeline.addLast("decoder", new StringDecoder());

//字串编码,可以自己设置charset

pipeline.addLast("encoder", new StringEncoder());

if (config.ssl()) {

//如果开启了SSL,那么使用sslhandler

pipeline.addLast("sslhandler", new ServerSSLHandler());

} else {

//如果没有开启SSL,那么使用普通handler

pipeline.addLast("handler", new ServerHandler());

}

//遍历配置文件中的服务器handler,将其添加进Pipeline链中

for (Element e : config.handler()) {

pipeline.addLast(e.attribute(e.getQName("id")).getValue().trim(),

(ChannelHandler) Class.forName(e.attribute(e.getQName("class")).getValue().trim()).newInstance());

}

} catch (DocumentException ex) {

Logger.getLogger(ServerPipelineFactory.class.getName()).log(Level.SEVERE, ex.getMessage(), ex);

} catch (InstantiationException ex) {

Logger.getLogger(ServerPipelineFactory.class.getName()).log(Level.SEVERE, ex.getMessage(), ex);

} catch (IllegalAccessException ex) {

Logger.getLogger(ServerPipelineFactory.class.getName()).log(Level.SEVERE, ex.getMessage(), ex);

} catch (ClassNotFoundException ex) {

Logger.getLogger(ServerPipelineFactory.class.getName()).log(Level.SEVERE, ex.getMessage(), ex);

}

return pipeline;

}

}

下面是xml处理类

Java代码

public class ServerConfig {

private static final String HOST = "host";

private static final String PORT = "port";

private static final String HANDLER = "handler";

private static final String CLIENTHANDLER = "clienthandler";

private static final String SSL = "ssl";

private static final ServerConfig SERVER_CONFIG = new ServerConfig();

private static final String XML_PATH = "lib/server.xml";

private static final SAXReader SAR_READER = new SAXReader();

private ServerConfig() {

super();

}

public String host() throws DocumentException {

return this.rootElement().element(HOST).getTextTrim().trim();

}

public int port() throws DocumentException {

return Integer.parseInt(this.rootElement().element(PORT).getTextTrim().trim());

}

public boolean ssl() throws DocumentException {

return Integer.parseInt(this.rootElement().element(SSL).getTextTrim().trim()) == 1 ? true : false;

}

public List<Element> handler() throws DocumentException {

return this.rootElement().elements(HANDLER);

}

public List<Element> clienthandler() throws DocumentException {

return this.rootElement().elements(CLIENTHANDLER);

}

private Element rootElement() throws DocumentException {

return SAR_READER.read(new File(XML_PATH)).getRootElement();

}

public static ServerConfig getInstance() {

return SERVER_CONFIG;

}

}

server.xml,放到lib下即可,注意其中的handler 以及clienthandler 项,如果你新建了自己的handler,那么需要在此xml中配置一下.

Xml代码

<?xml version="1.0" encoding="UTF-8"?>

<root>

<!-- 配置主机地址 -->

<host>127.0.0.1</host>

<!-- 配置服务端口 -->

<port>8080</port>

<!-- 是否启用ssl,1为启用,0为停用 -->

<ssl>0</ssl>

<!--服务器业务handler -->

<handler id="timeHandler" class="com.chinatenet.nio.server.handler.ServerTimeHandler" />

<!--客户端业务handler -->

<clienthandler id="timeHandler" class="com.chinatenet.nio.client.handler.ClientTimeHandler" />

</root>

到此,一个简单的可扩展handler的服务器雏形就出来了

下面,我们添加一个自定义的服务器处理handler进来

Java代码

public class ServerTimeHandler extends SimpleChannelUpstreamHandler {

@Override

public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {

RecevieData receivedaData = MessageDecoder.decode((String) e.getMessage());

if (CommandCode.GET_TIME.equals(receivedaData.getActionType())

|| CommandCode.KEEP_ALIVE.equals(receivedaData.getActionType())) {

if (VersionCode.V5.equals(receivedaData.getVersion())) {

//回复客户端后,即可进行自己的业务.当然.这里可以根据需要,看

//是先回复再处理还是等处理结果出来后,将结果返回客户端

e.getChannel().write(MessageEncoder.encode(receivedaData, StatusCode.OK,

System.currentTimeMillis() / 1000 + ""));

} else {

//版本错误

e.getChannel().write(MessageEncoder.encode(receivedaData, StatusCode.VERSION_NOT_SUPPORTED,

StatusCode.VERSION_NOT_SUPPORTED_TXET));

}

} else {

//如果不是此handler处理的命令,那么流下去

ctx.sendUpstream(e);

}

}

}

最后测试一下

Java代码

public class Server {

public static void main(String[] args) throws DocumentException {

ServerBootstrap bootstrap = new ServerBootstrap(

new NioServerSocketChannelFactory(

Executors.newCachedThreadPool(),

Executors.newCachedThreadPool()));

bootstrap.setPipelineFactory(new ServerPipelineFactory());

//因为我要用到长连接

bootstrap.setOption("child.tcpNoDelay", true);

bootstrap.setOption("child.keepAlive", true);

ServerConfig config = ServerConfig.getInstance();

bootstrap.bind(new InetSocketAddress(Integer.valueOf(config.port())));

}

}

总结:在整个服务器编码中,刚开始会遇到"远程主机强迫关闭了一个现有的连接。"等这类错误,最后修改成"相互告知对方我要关闭了"再进行关闭就可以了.

最后再完善一下异常处理即可.