天天看點

Netty4.0入門

<!-- Maven dependency: io.netty:netty-all, version 4.0.34.Final (goes inside the <dependencies> element of pom.xml). -->
<dependency>
			<groupId>io.netty</groupId>
			<artifactId>netty-all</artifactId>
			<version>4.0.34.Final</version>
		</dependency>
           

建構伺服器

/**
 * 
 */
package com.zuk.server;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

/**
 * Entry point of the demo Netty server.
 *
 * <p>Binds a NIO server socket on {@link #PORT} and installs, for every accepted
 * connection, a pipeline of request decoder, message handler and response encoder.
 */
public class ZukServer {

	/** TCP port the server listens on. */
	private static final int PORT = 10102;

	/** Value used for {@code SO_BACKLOG} (size of the pending-connection queue). */
	private static final int BACKLOG = 2048;

	/**
	 * Configures and starts the server.
	 *
	 * <p>On success the method prints {@code "server started."} and returns; the event
	 * loop groups are intentionally left running so the server keeps serving. If the
	 * bind fails or the thread is interrupted while waiting for it, both event loop
	 * groups are shut down so that their threads do not linger after a failed start.
	 *
	 * @param args command-line arguments (unused)
	 */
	public static void main(String[] args) {

		// Created outside the try block so the finally block can release them.
		EventLoopGroup parentGroup = new NioEventLoopGroup();
		EventLoopGroup childGroup = new NioEventLoopGroup();
		boolean started = false;
		try {
			ServerBootstrap server = new ServerBootstrap();
			server.group(parentGroup, childGroup);

			server.channel(NioServerSocketChannel.class);

			server.childHandler(new ChannelInitializer<SocketChannel>() {
				@Override
				public void initChannel(SocketChannel ch) throws Exception {
					// Decode inbound bytes into messages.
					ch.pipeline().addLast(new ZukServerRequestDecoder());
					// Handle decoded messages.
					ch.pipeline().addLast(new ZukServerHandler());
					// Encode outbound replies into bytes.
					ch.pipeline().addLast(new ZukServerResponseEncoder());
				}
			});

			server.option(ChannelOption.SO_BACKLOG, BACKLOG);
			// sync() waits for the bind to finish and rethrows the failure cause, if any.
			server.bind(PORT).sync();
			started = true;

			System.out.println("server started.");

		} catch (InterruptedException e) {
			// Restore the interrupt flag so callers further up can observe it.
			Thread.currentThread().interrupt();
			e.printStackTrace();
		} finally {
			if (!started) {
				// Start-up failed: release the event loop threads instead of leaking them.
				parentGroup.shutdownGracefully();
				childGroup.shutdownGracefully();
			}
		}

	}

}
           

消息解碼

/**
 * 
 */
package com.zuk.server;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;

import java.util.List;

/**
 * Inbound decoder that turns received bytes into a {@link String} message.
 *
 * <p>No message framing is applied: every readable byte currently in the buffer is
 * consumed and emitted as one string, decoded as UTF-8.
 *
 * @author HUANGLIAO322
 */
public class ZukServerRequestDecoder extends ByteToMessageDecoder{

	/** Charset used to decode payload bytes; explicit so it does not depend on the platform default. */
	private static final java.nio.charset.Charset UTF_8 = java.nio.charset.Charset.forName("UTF-8");

	/**
	 * Reads all readable bytes from {@code buffer}, decodes them as UTF-8 and adds the
	 * resulting string to {@code out}.
	 *
	 * @param ctx    the handler context (unused)
	 * @param buffer the cumulated inbound bytes
	 * @param out    receives the decoded string, if any bytes were readable
	 */
	@Override
	protected void decode(ChannelHandlerContext ctx, ByteBuf buffer, List<Object> out) throws Exception {

		System.out.println("ZukRequestDecoder.decode");

		if(buffer.readableBytes()>0){
			byte[] data = new byte[buffer.readableBytes()];
			buffer.readBytes(data);
			String str = new String(data, UTF_8);
			System.out.println("str:"+str);
			// Pass the decoded message on to the next handler in the pipeline.
			out.add(str);
		}
		else
		{
			System.out.println("buffer is zero.");
		}
	}
}
           

消息處理

/**
 * 
 */
package com.zuk.server;

import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

/**
 * Server-side message handler: logs each inbound message and answers it with a greeting.
 *
 * @author HUANGLIAO322
 */
public class ZukServerHandler  extends SimpleChannelInboundHandler<Object> {

	/**
	 * Prints the received message and writes a greeting that embeds it back to the channel.
	 *
	 * @param ctx the handler context whose channel receives the reply
	 * @param msg the inbound message
	 */
	@Override
	protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
		System.out.println("msg:" + msg);

		final String reply = "hi," + msg + ",i'm from server.";
		// Write through the channel so the reply traverses the whole outbound pipeline.
		ctx.channel().writeAndFlush(reply);
	}

}
           

消息編碼

/**
 * 
 */
package com.zuk.server;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;

import com.alibaba.fastjson.JSON;

/**
 * Outbound encoder that writes a {@link String} reply to the wire as UTF-8 bytes.
 *
 * @author HUANGLIAO322
 */
public class ZukServerResponseEncoder extends MessageToByteEncoder<Object>{

	/** Charset used to encode payload bytes; explicit so it does not depend on the platform default. */
	private static final java.nio.charset.Charset UTF_8 = java.nio.charset.Charset.forName("UTF-8");

	/**
	 * Logs the outgoing message and writes its UTF-8 bytes into {@code buffer}.
	 *
	 * @param ctx    the handler context (unused)
	 * @param msg    the outgoing message; cast to {@link String}, so any other type
	 *               fails with a {@link ClassCastException}
	 * @param buffer the buffer that receives the encoded bytes
	 */
	@Override
	protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf buffer)
			throws Exception {
		System.out.println("ZukResponseEncoder.encode:"+JSON.toJSON(msg));
		buffer.writeBytes(((String)msg).getBytes(UTF_8));

	} 
	
}
           

建構用戶端

/**
 * 
 */
package com.zuk.client;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

import java.net.InetSocketAddress;

/**
 * Demo Netty client: connects to the local server, sends 100 messages one second
 * apart, then closes the connection and releases its event loop group.
 */
public class ZukClient {

	/**
	 * Connects to {@code 127.0.0.1:10102}, sends {@code "pafc_1"} through
	 * {@code "pafc_100"} with a one-second pause before each, and closes the channel.
	 *
	 * <p>The event loop group is always shut down in a {@code finally} block, so its
	 * threads are released both after a normal run and when connecting or sending fails.
	 *
	 * @param args command-line arguments (unused)
	 * @throws Exception if connecting fails or the thread is interrupted
	 */
	public static void main(String[] args) throws Exception {

		EventLoopGroup group = new NioEventLoopGroup();
		try {
			Bootstrap bootstrap = new Bootstrap();
			bootstrap.group(group);

			bootstrap.channel(NioSocketChannel.class);

			bootstrap.handler(new ChannelInitializer<SocketChannel>() {
				@Override
				public void initChannel(SocketChannel ch) throws Exception {
					ch.pipeline().addLast(new ZukClientResponseDecoder());
					ch.pipeline().addLast(new ZukClientRequestEncoder());
					ch.pipeline().addLast(new ZukClientHandler());
				}
			});

			// Connect to the server and wait until the connection is established.
			ChannelFuture connect = bootstrap.connect(new InetSocketAddress("127.0.0.1", 10102));
			connect.sync();
			Channel channel = connect.channel();

			int i = 0;
			while(i++<100){
				Thread.sleep(1000);
				channel.writeAndFlush("pafc_"+i);
			}

			// Wait for the close to complete before tearing down the event loop.
			channel.close().sync();
		} finally {
			// Without this the event loop threads keep running after main returns.
			group.shutdownGracefully();
		}

	}
	
}
           

用戶端消息處理

/**
 * 
 */
package com.zuk.client;

import com.alibaba.fastjson.JSON;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

/**
 * Client-side message handler: prints every inbound message as a JSON string.
 */
public class ZukClientHandler extends SimpleChannelInboundHandler<Object> {

	/**
	 * Serializes the received message with fastjson and prints it to standard output.
	 *
	 * @param ctx the handler context (unused)
	 * @param msg the inbound message
	 */
	@Override
	protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
		final String rendered = JSON.toJSONString(msg);
		System.out.println("ZukClientHandler.msg:" + rendered);
	}

}
           
package com.zuk.client;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;

import com.alibaba.fastjson.JSON;
import com.cn.common.core.model.RequestMessage;
 
/**
 * Outbound encoder that writes a {@link String} request to the wire as UTF-8 bytes.
 */
public class ZukClientRequestEncoder extends MessageToByteEncoder<Object>{

	/** Charset used to encode payload bytes; explicit so it does not depend on the platform default. */
	private static final java.nio.charset.Charset UTF_8 = java.nio.charset.Charset.forName("UTF-8");

	/**
	 * Logs the outgoing message and writes its UTF-8 bytes into {@code buffer}.
	 *
	 * @param ctx    the handler context (unused)
	 * @param obj    the outgoing message; cast to {@link String}, so any other type
	 *               fails with a {@link ClassCastException}
	 * @param buffer the buffer that receives the encoded bytes
	 */
	@Override
	protected void encode(ChannelHandlerContext ctx, Object obj, ByteBuf buffer) throws Exception {

		// NOTE(review): the log label says "ZukResponseEncoder" although this is the
		// client request encoder; kept unchanged in case log output is relied upon.
		System.out.println("ZukResponseEncoder.encode:"+JSON.toJSON(obj));
		buffer.writeBytes(((String)obj).getBytes(UTF_8));

	}
}
           
package com.zuk.client;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;

import java.util.List;

import com.cn.common.core.model.Response;
 
/**
 * Inbound decoder that turns bytes received from the server into a {@link String}.
 *
 * <p>No message framing is applied: every readable byte currently in the buffer is
 * consumed and emitted as one string, decoded as UTF-8. A disabled, header-based
 * decoder is kept below in commented-out form for reference.
 */
public class ZukClientResponseDecoder extends ByteToMessageDecoder{
	
	/**
	 * Base length of a data packet. Only referenced by the commented-out decoder below,
	 * where the terms correspond to the header flag (int), module (short), command
	 * (short), state code (int) and data length (int) it reads.
	 */
	public static int BASE_LENTH = 4 + 2 + 2 + 4 + 4;

	/** Charset used to decode payload bytes; explicit so it does not depend on the platform default. */
	private static final java.nio.charset.Charset UTF_8 = java.nio.charset.Charset.forName("UTF-8");

	/**
	 * Reads all readable bytes from {@code buffer}, decodes them as UTF-8 and adds the
	 * resulting string to {@code out}.
	 *
	 * @param ctx    the handler context (unused)
	 * @param buffer the cumulated inbound bytes
	 * @param out    receives the decoded string, if any bytes were readable
	 */
	@Override
	protected void decode(ChannelHandlerContext ctx, ByteBuf buffer, List<Object> out) throws Exception {
		
		System.out.println("ZukClientResponseDecoder.decode");
		
		if(buffer.readableBytes()>0){
			byte[] data = new byte[buffer.readableBytes()];
			buffer.readBytes(data);
			String str = new String(data, UTF_8);
			System.out.println("str:"+str);
			// Pass the decoded message on to the next handler in the pipeline.
			out.add(str);
		}
		else
		{
			System.out.println("buffer is zero.");
		}
		
		// Disabled reference implementation of a header/length-framed decoder:
//		while(true){
//			if(buffer.readableBytes() >= BASE_LENTH){
//				// start position of the first readable packet
//				int beginIndex;
//				
//				while(true) {
//					// reader position where the packet header begins
//					beginIndex = buffer.readerIndex();
//					// mark the initial reader position
//					buffer.markReaderIndex();
//					if (buffer.readInt() == ConstantValue.HEADER_FLAG) {
//						break;
//					}
//					// header flag not found: skip one byte
//					buffer.resetReaderIndex();
//					buffer.readByte();
//					
//					// not enough bytes left for a packet
//					if(buffer.readableBytes() < BASE_LENTH){
//						return ;
//					}
//				}
//				// read the module number and the command number
//				short module = buffer.readShort();
//				short cmd = buffer.readShort();
//				
//				int stateCode = buffer.readInt();
//				
//				// read the data length
//				int lenth = buffer.readInt();
//				if(lenth < 0 ){
//					ctx.channel().close();
//				}
//				
//				// the packet has not fully arrived yet
//				if(buffer.readableBytes() < lenth){
//					buffer.readerIndex(beginIndex);
//					return ;
//				}
//				
//				// read the data part
//				byte[] data = new byte[lenth];
//				buffer.readBytes(data);
//				
//				Response response = new Response();
//				response.setModule(module);
//				response.setCmd(cmd);
//				response.setStateCode(stateCode);
//				response.setData(data);
//				// pass the decoded message object on to the next handler
//				out.add(response);
//			}else{
//				break;
//			}
//		}
	}

}