本章具體講解SpringBoot中如何內建Netty
1.搭建一個Springboot項目
一、服務端
1.項目結構目錄
2.導入jar包
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>5.0.0.Alpha2</version>
</dependency>
3.yml 配置
tcp:
  port: 8555
boss:
  thread:
    count: 2
worker:
  thread:
    count: 2
so:
  keepalive: true
  backlog: 100
server:
  port: 8888
4.建立TCP服務
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.net.InetSocketAddress;
/**
 * Spring-managed lifecycle wrapper around the Netty TCP server.
 *
 * <p>After injection, {@link #start()} binds the {@code serverBootstrap} bean to the
 * {@code tcpSocketAddress} bean; {@link #stop()} closes the bound channel when the
 * bean is destroyed.
 *
 * @author hemingzhu
 * @since 2019/07/10
 */
@Component
public class TCPServer {

    @Autowired
    @Qualifier("serverBootstrap")
    private ServerBootstrap b;

    @Autowired
    @Qualifier("tcpSocketAddress")
    private InetSocketAddress tcpPort;

    /** Result of the bind; {@code null} until {@link #start()} has completed successfully. */
    private ChannelFuture serverChannelFuture;

    /**
     * Binds the server socket and blocks until the bind has completed.
     *
     * @throws Exception if the bind fails or the waiting thread is interrupted
     */
    @PostConstruct
    public void start() throws Exception {
        System.out.println("Starting server at " + tcpPort);
        serverChannelFuture = b.bind(tcpPort).sync();
    }

    /**
     * Closes the listening channel and waits for the close to finish.
     *
     * <p>The channel is closed explicitly: waiting on {@code closeFuture()} alone never
     * returns unless something else closes the channel, which would hang shutdown.
     *
     * @throws Exception if closing fails or the waiting thread is interrupted
     */
    @PreDestroy
    public void stop() throws Exception {
        if (serverChannelFuture == null) {
            // start() never bound successfully, so there is nothing to close.
            return;
        }
        serverChannelFuture.channel().close().sync();
    }

    public ServerBootstrap getB() {
        return b;
    }

    public void setB(ServerBootstrap b) {
        this.b = b;
    }

    public InetSocketAddress getTcpPort() {
        return tcpPort;
    }

    public void setTcpPort(InetSocketAddress tcpPort) {
        this.tcpPort = tcpPort;
    }
}
5.初始化通道
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Component;
/**
 * Channel initializer that installs a string-based pipeline on each new
 * {@link SocketChannel}: the injected decoder, then the injected {@link NettyHandle},
 * then the injected encoder, in that order.
 *
 * @author hemingzhu
 * @since 2019/07/10
 */
@Component
@Qualifier("springProtocolInitializer")
public class StringProtocolInitalizer extends ChannelInitializer<SocketChannel> {

    @Autowired
    StringDecoder stringDecoder;

    @Autowired
    StringEncoder stringEncoder;

    @Autowired
    NettyHandle nettyHandle;

    /**
     * Registers the three handlers under the names "decoder", "handler" and "encoder".
     *
     * @param ch the freshly created channel whose pipeline is being populated
     */
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ch.pipeline()
                .addLast("decoder", this.stringDecoder)
                .addLast("handler", this.nettyHandle)
                .addLast("encoder", this.stringEncoder);
    }

    public StringDecoder getStringDecoder() {
        return this.stringDecoder;
    }

    public void setStringDecoder(StringDecoder stringDecoder) {
        this.stringDecoder = stringDecoder;
    }

    public StringEncoder getStringEncoder() {
        return this.stringEncoder;
    }

    public void setStringEncoder(StringEncoder stringEncoder) {
        this.stringEncoder = stringEncoder;
    }

    public NettyHandle getNettyHandle() {
        return this.nettyHandle;
    }

    public void setNettyHandle(NettyHandle nettyHandle) {
        this.nettyHandle = nettyHandle;
    }
}
6.Netty配置
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelOption;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.support.PropertySourcesPlaceholderConfigurer;
import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
/**
 * Spring configuration that assembles the Netty server: event loop groups, the
 * {@link ServerBootstrap}, the bind address, socket options and the string codecs.
 *
 * <p>All tunables are read from the application properties via {@code @Value}.
 *
 * @author hemingzhu
 * @since 2019/07/10
 */
@Configuration
public class NettyConfigTest {

    // Values read from the yml configuration.
    @Value("${boss.thread.count}")
    private int bossCount;

    @Value("${worker.thread.count}")
    private int workerCount;

    @Value("${tcp.port}")
    private int tcpPort;

    @Value("${so.keepalive}")
    private boolean keepAlive;

    @Value("${so.backlog}")
    private int backlog;

    @Autowired
    private NettyHandle nettyHandle;

    /**
     * Builds the server bootstrap.
     *
     * <p>{@code SO_BACKLOG} is a property of the listening socket, so it is set with
     * {@code option}. {@code SO_KEEPALIVE} applies to accepted connections, so it is set
     * with {@code childOption}; passing it to {@code option} would target the listening
     * channel instead of the client connections.
     *
     * @return the configured, not yet bound, bootstrap
     */
    @Bean(name = "serverBootstrap")
    public ServerBootstrap bootstrap() {
        ServerBootstrap b = new ServerBootstrap();
        b.group(bossGroup(), workerGroup())
                .channel(NioServerSocketChannel.class)
                .childHandler(nettyHandle)
                .option(ChannelOption.SO_BACKLOG, backlog)
                .childOption(ChannelOption.SO_KEEPALIVE, keepAlive);
        return b;
    }

    /** @return the event loop group passed as the first (parent) group of the bootstrap */
    @Bean(name = "bossGroup", destroyMethod = "shutdownGracefully")
    public NioEventLoopGroup bossGroup() {
        return new NioEventLoopGroup(bossCount);
    }

    /** @return the event loop group passed as the second (child) group of the bootstrap */
    @Bean(name = "workerGroup", destroyMethod = "shutdownGracefully")
    public NioEventLoopGroup workerGroup() {
        return new NioEventLoopGroup(workerCount);
    }

    /** @return a wildcard-address socket address on the configured TCP port */
    @Bean(name = "tcpSocketAddress")
    public InetSocketAddress tcpPort() {
        return new InetSocketAddress(tcpPort);
    }

    /**
     * Exposes the configured socket options as a map bean.
     *
     * <p>Kept for any component that injects {@code tcpChannelOptions}; the bootstrap
     * itself applies the options directly because they target different channels.
     *
     * @return a new mutable map holding {@code SO_KEEPALIVE} and {@code SO_BACKLOG}
     */
    @Bean(name = "tcpChannelOptions")
    public Map<ChannelOption<?>, Object> tcpChannelOptions() {
        Map<ChannelOption<?>, Object> options = new HashMap<ChannelOption<?>, Object>();
        options.put(ChannelOption.SO_KEEPALIVE, keepAlive);
        options.put(ChannelOption.SO_BACKLOG, backlog);
        return options;
    }

    @Bean(name = "stringEncoder")
    public StringEncoder stringEncoder() {
        return new StringEncoder();
    }

    @Bean(name = "stringDecoder")
    public StringDecoder stringDecoder() {
        return new StringDecoder();
    }

    /**
     * Necessary to make the Value annotations work.
     *
     * @return the placeholder configurer that resolves {@code ${...}} expressions
     */
    @Bean
    public static PropertySourcesPlaceholderConfigurer propertyPlaceholderConfigurer() {
        return new PropertySourcesPlaceholderConfigurer();
    }
}
7.NettyHandle事件處理
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Component;
import java.io.UnsupportedEncodingException;
/**
 * Shared server-side handler that logs inbound traffic and sends short text replies.
 *
 * <p>{@link #channelRead} is overridden and casts every inbound message to
 * {@link ByteBuf}, so this handler expects raw buffers. {@link #messageReceived} is only
 * reached if something invokes the superclass dispatch.
 *
 * <p>Text is decoded and encoded as UTF-8 in both directions, matching the client in
 * this project, which writes and reads UTF-8.
 *
 * @author hemingzhu
 * @since 2019/07/10
 */
@Component
@Qualifier("serverHandler")
@ChannelHandler.Sharable
public class NettyHandle extends SimpleChannelInboundHandler<String> {

    private static final Logger log = LoggerFactory.getLogger(NettyHandle.class);

    /** Single charset for both reading and writing, so the two directions cannot drift apart. */
    private static final java.nio.charset.Charset WIRE_CHARSET =
            java.nio.charset.StandardCharsets.UTF_8;

    /**
     * Handles an already-decoded text message: logs it together with the channel ids,
     * closes the channel when the text contains "bye", otherwise echoes it back.
     *
     * @param ctx the handler context of the receiving channel
     * @param msg the decoded message text
     */
    @Override
    protected void messageReceived(ChannelHandlerContext ctx, String msg) throws Exception {
        log.info("client msg:{}", msg);
        String clientIdToLong = ctx.channel().id().asLongText();
        log.info("client long id:{}", clientIdToLong);
        String clientIdToShort = ctx.channel().id().asShortText();
        log.info("client short id:{}", clientIdToShort);
        if (msg.contains("bye")) {
            // close
            ctx.channel().close();
        } else {
            // send to client
            ctx.channel().writeAndFlush("Yoru msg is:" + msg);
        }
    }

    /**
     * Reads the raw inbound buffer, logs its text content and acknowledges it.
     *
     * @param ctx the handler context of the receiving channel
     * @param msg the inbound message; cast to {@link ByteBuf}
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws UnsupportedEncodingException {
        log.info("server 讀取資料……");
        // Copy the payload out of the buffer (the buffer is released by the helper).
        byte[] req = readClientData((ByteBuf) msg);
        String body = new String(req, WIRE_CHARSET);
        log.info("用戶端的資料------>{}", body);
        sendInfo(ctx, "收到");
    }

    /** Encodes {@code info} with the wire charset and writes it out in a single flush. */
    private void sendInfo(ChannelHandlerContext ctx, String info) {
        ByteBuf out = Unpooled.copiedBuffer(info, WIRE_CHARSET);
        ctx.writeAndFlush(out);
    }

    /**
     * Copies all readable bytes out of {@code msg} and releases it.
     *
     * @param msg the buffer to drain; released before this method returns
     * @return the bytes that were readable in {@code msg}
     */
    private byte[] readClientData(ByteBuf msg) {
        try {
            byte[] req = new byte[msg.readableBytes()];
            msg.readBytes(req);
            return req;
        } finally {
            // Release even if the copy fails so the buffer is not leaked.
            msg.release();
        }
    }

    /** Greets a newly connected client and forwards the event. */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        log.info("開始連接配接");
        sendInfo(ctx, "連接配接成功");
        super.channelActive(ctx);
    }

    /** Logs the failure with its cause, notifies the client, then closes the channel. */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        log.error("異常關閉", cause);
        sendInfo(ctx, "異常");
        ctx.close();
    }

    /**
     * Logs the disconnect and forwards the event.
     *
     * <p>No farewell message is written: the channel is already inactive at this point,
     * so a write could not be delivered.
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        log.info("離線");
        super.channelInactive(ctx);
    }
}
二、用戶端
1.NClient
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import java.net.InetSocketAddress;
/**
 * Minimal blocking Netty TCP client used to exercise the server.
 *
 * @author hemingzhu
 * @since 2019/07/08
 */
public class NClient {

    private final String host;
    private final int port;

    /**
     * @param host the server host name or address to connect to
     * @param port the server TCP port
     */
    public NClient(String host, int port) {
        this.host = host;
        this.port = port;
    }

    /**
     * Connects to the server, installs an {@link NClientHandler} on the channel and
     * blocks until the channel is closed. The event loop group is always shut down
     * before this method returns.
     *
     * @throws Exception if connecting fails or the waiting thread is interrupted
     */
    public void start() throws Exception {
        // Created before the try block so the finally clause never sees a null group.
        EventLoopGroup nioEventLoopGroup = new NioEventLoopGroup();
        try {
            // Bootstrap that drives the client start-up.
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(nioEventLoopGroup)
                    .channel(NioSocketChannel.class)
                    .remoteAddress(new InetSocketAddress(host, port))
                    .handler(new ChannelInitializer<SocketChannel>() {
                        // Adds the client handler to each channel's pipeline.
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new NClientHandler());
                        }
                    });
            // Connect and wait for the connection attempt to finish.
            ChannelFuture f = bootstrap.connect().sync();
            // Block until the channel is closed.
            f.channel().closeFuture().sync();
        } finally {
            // Release the event loop threads.
            nioEventLoopGroup.shutdownGracefully().sync();
        }
    }
}
2.NClientHandler
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;
import java.util.ArrayList;
import java.util.List;
/**
 * Client-side handler that greets the server on connect, prints every reply and
 * answers each reply with a fixed message.
 *
 * <p>Active handler contexts are tracked in the static {@link #cts} list and removed
 * again when the channel goes inactive or fails.
 *
 * @author hemingzhu
 * @since 2019/07/08
 */
public class NClientHandler extends ChannelInboundHandlerAdapter {

    /** Contexts of currently connected channels. */
    public static List<ChannelHandlerContext> cts = new ArrayList<ChannelHandlerContext>();

    /**
     * Registers the context and sends the initial greeting to the server.
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("用戶端與服務端通道-開啟:" + ctx.channel().localAddress() + "channelActive");
        cts.add(ctx);
        String sendInfo = "你好服務端";
        System.out.println("用戶端準備發送的資料包:" + sendInfo);
        write(ctx, sendInfo);
    }

    /**
     * Encodes {@code mess} as UTF-8 and writes it to the channel in a single flush.
     *
     * @param ctx the context to write through
     * @param mess the text to send
     */
    public void write(ChannelHandlerContext ctx, String mess) throws Exception {
        ctx.writeAndFlush(Unpooled.copiedBuffer(mess, CharsetUtil.UTF_8));
    }

    /**
     * Prints the received payload as UTF-8 text and sends a fixed reply.
     *
     * @param ctx the handler context of the receiving channel
     * @param msg the inbound message; cast to {@link ByteBuf}
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        // Read the data.
        byte[] req = readClientData((ByteBuf) msg);
        String body = new String(req, CharsetUtil.UTF_8);
        System.out.println("用戶端的資料------>" + body);
        // Write the reply.
        write(ctx, "wits寫的資料");
    }

    /**
     * Copies all readable bytes out of {@code msg} and releases it.
     *
     * @param msg the buffer to drain; released before this method returns
     * @return the bytes that were readable in {@code msg}
     */
    private byte[] readClientData(ByteBuf msg) {
        try {
            byte[] req = new byte[msg.readableBytes()];
            msg.readBytes(req);
            return req;
        } finally {
            // Release even if the copy fails so the buffer is not leaked.
            msg.release();
        }
    }

    /**
     * Called when the channel is no longer active. Drops the context from {@link #cts}
     * so closed channels do not accumulate in the static list.
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        cts.remove(ctx);
        System.out.println("用戶端與服務端通道-關閉:" + ctx.channel().localAddress() + "channelInactive");
    }

    /** Drops the context, closes the channel and prints the failure message. */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cts.remove(ctx);
        ctx.close();
        System.out.println("異常退出:" + cause.getMessage());
    }
}
3.TestMain
import com.herbert.client.NClient;
import com.herbert.finalPool.ConstantPool;
/**
 * Manual entry point that starts a single {@link NClient}.
 *
 * @author hemingzhu
 * @since 2019/07/08
 */
public class TestMain {

    /**
     * Creates a client for the host and port defined in {@code ConstantPool} and starts it.
     * The original note mentions 127.0.0.1/65535 as the target; the actual values live in
     * {@code ConstantPool}, which is not shown here.
     *
     * @param args unused
     */
    public static void main(String[] args) throws Exception {
        NClient client = new NClient(ConstantPool.HOST, ConstantPool.PORT);
        client.start();
    }
}