package com.netty.core;
import java.security.cert.CertificateException;
import javax.net.ssl.SSLException;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.util.SelfSignedCertificate;
import org.apache.log4j.Logger;
import org.springframework.beans.factory.InitializingBean;
import com.netty.util.NettyPropertiesUtil;
public class NettyServer implements InitializingBean {
Logger logger = Logger.getLogger(NettyServer.class);
public boolean isSSL = true;
public int port = 8443;
public int threadSize = 1;
public int connnecttimeout = 10000;
public int sotimeout = 10000;
public void init() {
isSSL = Integer
.parseInt(NettyPropertiesUtil.getProperty("netty.isSSL")) == 1 ? true
: false;
port = Integer.parseInt(NettyPropertiesUtil.getProperty("netty.port"));
threadSize = Integer.parseInt(NettyPropertiesUtil
.getProperty("netty.nioeventloopgroup.size"));
connnecttimeout = Integer.parseInt(NettyPropertiesUtil
.getProperty("netty.serverbootstrap.connnecttimeout"));
sotimeout = Integer.parseInt(NettyPropertiesUtil
.getProperty("netty.serverbootstrap.sotimeout"));
}
@SuppressWarnings({ "deprecation" })
public void start() throws CertificateException, SSLException,
InterruptedException {
logger.info("netty server start");
final SslContext sslCtx;
if (isSSL) {
SelfSignedCertificate ssc = new SelfSignedCertificate();
sslCtx = SslContext.newServerContext(ssc.certificate(),
ssc.privateKey());
} else {
sslCtx = null;
}
EventLoopGroup bossGroup = new NioEventLoopGroup(threadSize);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.option(ChannelOption.SO_BACKLOG, 1024);
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new NettyServerInitializer(sslCtx));
b.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, connnecttimeout);
b.option(ChannelOption.SO_TIMEOUT, sotimeout);
Channel ch = b.bind(port).sync().channel();
logger.info("netty server end");
ch.closeFuture().sync();
logger.info("netty end ");
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
@Override
public void afterPropertiesSet() throws Exception {
try {
logger.info(" netty conf init start");
init();
logger.info(" netty conf init end");
} catch (Exception e) {
logger.info(" netty conf init fail");
e.printStackTrace();
}
}
}
package com.netty.core;
import org.apache.log4j.Logger;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.codec.http.HttpHeaders.Names;
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
Logger logger = Logger.getLogger(NettyServerHandler.class);
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof HttpRequest){
HttpRequest req = (HttpRequest) msg;
logger.info("RECV MSG: "+req.getUri());
if(HttpHeaders.is100ContinueExpected(req)){
ctx.write(new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.CONTINUE));
}
String resp="ok";
FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, Unpooled.wrappedBuffer(resp.getBytes()));
response.headers().set(Names.CONTENT_TYPE, "text/html; charset=utf-8");
response.headers().set(Names.CONTENT_LENGTH, response.content().readableBytes());
ctx.write(response).addListener(ChannelFutureListener.CLOSE);
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) {
ctx.flush();
}
}
package com.netty.core;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.ssl.SslContext;
public class NettyServerInitializer extends ChannelInitializer<SocketChannel> {
private final SslContext sslCtx;
public NettyServerInitializer() {
sslCtx=null;
}
public NettyServerInitializer(SslContext sslCtx) {
this.sslCtx = sslCtx;
}
@Override
public void initChannel(SocketChannel ch) {
ChannelPipeline p = ch.pipeline();
if (null!=sslCtx) {
p.addLast(sslCtx.newHandler(ch.alloc()));
}
p.addLast(new HttpServerCodec());
//完整參數封裝
p.addLast("aggregator", new HttpObjectAggregator(1048576));
p.addLast(new NettyServerHandler());
}
}
package test.netty;
import java.net.URI;
import org.junit.Test;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.http.DefaultFullHttpRequest;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpRequestEncoder;
import io.netty.handler.codec.http.HttpResponseDecoder;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.codec.http.HttpContent;
import io.netty.handler.codec.http.HttpResponse;
public class NettyClientTest {
public void connect(String host, int port) throws Exception {
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
b.group(workerGroup);
b.channel(NioSocketChannel.class);
b.option(ChannelOption.SO_KEEPALIVE, true);
b.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
// 用戶端接收到的是httpResponse響應,是以要使用HttpResponseDecoder進行解碼
ch.pipeline().addLast(new HttpResponseDecoder());
// 用戶端發送的是httprequest,是以要使用HttpRequestEncoder進行編碼
ch.pipeline().addLast(new HttpRequestEncoder());
ch.pipeline().addLast(new HttpClientInboundHandler());
}
});
// Start the client.
ChannelFuture f = b.connect(host, port).sync();
URI uri = new URI("http://127.0.0.1:8443");
String msg = "Are you ok?";
DefaultFullHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET,
uri.toASCIIString(), Unpooled.wrappedBuffer(msg.getBytes("UTF-8")));
// 建構http請求
request.headers().set(HttpHeaders.Names.HOST, host);
request.headers().set(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.KEEP_ALIVE);
request.headers().set(HttpHeaders.Names.CONTENT_LENGTH, request.content().readableBytes());
// 發送http請求
f.channel().write(request);
f.channel().flush();
f.channel().closeFuture().sync();
} finally {
workerGroup.shutdownGracefully();
}
}
@Test
public void Test(){
try{
NettyClientTest client = new NettyClientTest();
client.connect("127.0.0.1", 8443);
}catch(Exception e){
e.printStackTrace();
}
}
}
class HttpClientInboundHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof HttpResponse)
{
HttpResponse response = (HttpResponse) msg;
System.out.println("CONTENT_TYPE:" + response.headers().get(HttpHeaders.Names.CONTENT_TYPE));
}
if(msg instanceof HttpContent)
{
HttpContent content = (HttpContent)msg;
ByteBuf buf = content.content();
System.out.println(buf.toString(io.netty.util.CharsetUtil.UTF_8));
buf.release();
}
}
}