天天看点

java使用netty

Netty为什么会高效?回答就是良好的线程模型,和内存管理。在Java的NIO例子中就我将客户端的操作单独放在一个线程中处理了,这么做的原因在于如果将客户端连接串起来,后来的连接就要等前一个处理完,当然这并不意味着多线程比单线程有优势,而是在于每个客户端都需要进行读取准备好的缓存数据,再执行一些业务逻辑。如果业务逻辑耗时很久,那么顺序执行的方式没有多线程优势大。另一个方面目前多核CPU很常见了,多线程是个不错的选择。这些在第一节就说明过,也提到过NIO并不是提升了IO操作的速度,而是减少了CPU的浪费时间,这些概念不能搞混。

netty作为一款服务端框架,其优势在于自身良好的线程模型架构,关于netty的基本架构,我用几张图进行说明,netty主要处理方式有三种线程模型,1、 单线程 2、主从线程 3、主从多线程

1、事件分离器把接收到的客户事件分发到不同的事件处理器中,在netty中,处理器定义为channelHandler:如下图:

java使用netty

2、netty处理请求结构,理解起来就是,客户端发起请求,服务端开辟一个两个线程组,一个线程组负责接收请求,由于是线程组,可理解为线程池,能够处理的并发请求量就提升了,然后是工作线程,主线程组把请求转发给从线程组,也叫工作线程池,这个线程组负责执行具体的业务,

java使用netty

3、那么最终的处理结构就是,

java使用netty

下面用具体的代码来模拟一下上述过程的实现,这里为了演示,做一个客户端访问具体的端口号,由服务端返回一条信息,

4、pom文件:

<dependency>
			<groupId>io.netty</groupId>
			<artifactId>netty-all</artifactId>
			<version>5.0.0.Alpha2</version>
			<!-- <version>4.1.24.Final</version> -->
		</dependency>

5、测试主类,

           

package com.congge.sort.netty.day1;

import io.netty.bootstrap.ServerBootstrap;

import io.netty.channel.ChannelFuture;

import io.netty.channel.EventLoopGroup;

import io.netty.channel.nio.NioEventLoopGroup;

import io.netty.channel.socket.nio.NioServerSocketChannel;

/**

  • netty测试
  • @author asus

*/

public class HelloServer {

public static void main(String[] args) {
	
	//定义主线程组,用于接收客户端请求,但是并不做任何逻辑处理
	EventLoopGroup bossGroup = new NioEventLoopGroup();
	//从线程组,主线程组会把相应的请求转交给该线程组,由从线程组去做任务
	EventLoopGroup workerGroup = new NioEventLoopGroup();
	//创建netty服务器
	ServerBootstrap serverBootstrap = new ServerBootstrap();
	try {
		//服务器设置绑定两个线程组,并设置相应的助手类【handler】
		serverBootstrap.group(bossGroup, workerGroup)
						.channel(NioServerSocketChannel.class)				//设置nio双向通道
						.childHandler(new HelloServerInitializer());		//子处理器
		
		//启动server并绑定端口号
		ChannelFuture channelFuture = serverBootstrap.bind(8088).sync();
		//关闭监听的channel
		channelFuture.channel().closeFuture().sync();
	} catch (Exception e) {
		e.printStackTrace();
	}finally {
		bossGroup.shutdownGracefully();
		workerGroup.shutdownGracefully();
	}
	
}
           

}

package com.congge.sort.netty.day1;


import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpServerCodec;

/**
 * 初始化配置器,channel注册成功后,会执行里面相应的初始化方法
 * @author asus
 *
 */
public class HelloServerInitializer extends ChannelInitializer<SocketChannel>{

	@Override
	protected void initChannel(SocketChannel channel) throws Exception {
		
		//channel获取相应的管道
		ChannelPipeline pipeline = channel.pipeline();
		
		//为管道增加相应的handler,可理解为是拦截器,或者监听器,监听客户端建立连接后的信息
		//当请求到服务端,我们需要对写出到客户端的数据做编码处理
		pipeline.addLast("httpCode",new HttpServerCodec());
		
		//添加自定义助手类,可添加多个
		pipeline.addLast("myHandler",new MyHandler());
		
	}

}

           
package com.congge.sort.netty.day1;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpObject;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.util.CharsetUtil;

/**
 * 自定义助手类
 * @author asus
 *
 */
public class MyHandler extends SimpleChannelInboundHandler<HttpObject>{

	@Override
	protected void messageReceived(ChannelHandlerContext context, HttpObject msg) 
			throws Exception {
		
		//通过context的上下文获取channel
		Channel channel = context.channel();
		
		if(msg instanceof HttpRequest){
			//获取请求地址
			System.out.println(channel.remoteAddress());
		}
		
		//自定义相应客户端信息
		ByteBuf content = Unpooled.copiedBuffer("hello neyy <<<>>>",CharsetUtil.UTF_8);
		
		//侯建httpResponse对象
		FullHttpResponse response = new DefaultFullHttpResponse(
				HttpVersion.HTTP_1_1, 
				HttpResponseStatus.OK,
				content);
		
		//设置response对象的头信息
		response.headers().set(HttpHeaderNames.CONTENT_TYPE,"text/plain");
		response.headers().setInt(HttpHeaderNames.CONTENT_LENGTH,content.readableBytes());
		
		//将数据刷到客户端
		context.writeAndFlush(response);
		
	}

}