天天看點

Spring Boot 整合 Netty：本章具體講解 Spring Boot 中如何整合 Netty

本章具體講解如何在 Spring Boot 中整合 Netty。

1.搭建一個Springboot項目

一、服務端

1.項目結構目錄

Springboot內建Netty本章具體講解SpringBoot中如何內建Netty

2.導入jar包

<!-- Netty all-in-one artifact.
     NOTE(review): 5.0.0.Alpha2 is an alpha build, and the server handler in this article
     overrides messageReceived(...); confirm API compatibility before changing the version. -->
<dependency>
        <groupId>io.netty</groupId>
        <artifactId>netty-all</artifactId>
        <version>5.0.0.Alpha2</version>
  </dependency>
           

3.yml 配置

# Port used to build the InetSocketAddress the Netty server binds to (${tcp.port}).
tcp:
  port: 8555
# Thread count passed to the boss NioEventLoopGroup (${boss.thread.count}).
boss:
  thread:
    count: 2
# Thread count passed to the worker NioEventLoopGroup (${worker.thread.count}).
worker:
  thread:
    count: 2
# Socket options collected into the tcpChannelOptions map.
so:
  # Stored under ChannelOption.SO_KEEPALIVE (${so.keepalive}).
  keepalive:  true
  # Stored under ChannelOption.SO_BACKLOG (${so.backlog}).
  backlog: 100

# NOTE(review): presumably the Spring Boot embedded web server port; it is not read by the
# Netty configuration shown in this article - confirm.
server:
  port: 8888
           

4.建立TCP服務

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.net.InetSocketAddress;

/**
 * Binds the injected {@link ServerBootstrap} to the injected TCP address once the bean has
 * been constructed, and closes the listening channel when the bean is destroyed.
 *
 * @Author:hemingzhu
 * @date:2019/07/10 15:34
 */
@Component
public class TCPServer {
    @Autowired
    @Qualifier("serverBootstrap")
    private ServerBootstrap b;

    @Autowired
    @Qualifier("tcpSocketAddress")
    private InetSocketAddress tcpPort;

    /** Result of the bind; stays {@code null} if {@link #start()} never completed. */
    private ChannelFuture serverChannelFuture;

    /**
     * Binds the server socket and blocks until the bind has finished.
     *
     * @throws Exception if the bind fails or the waiting thread is interrupted
     */
    @PostConstruct
    public void start() throws Exception {
        System.out.println("Starting server at " + tcpPort);
        serverChannelFuture = b.bind(tcpPort).sync();
    }

    /**
     * Closes the listening channel and waits for the close to finish.
     *
     * <p>{@code closeFuture().sync()} only waits for somebody else to close the channel, so
     * using it here would block the shutdown; the channel has to be closed explicitly.
     *
     * @throws Exception if closing fails or the waiting thread is interrupted
     */
    @PreDestroy
    public void stop() throws Exception {
        if (serverChannelFuture == null) {
            // start() failed before the bind completed, so there is nothing to close.
            return;
        }
        serverChannelFuture.channel().close().sync();
    }

    /** @return the bootstrap used to bind the server */
    public ServerBootstrap getB() {
        return b;
    }

    /** @param b the bootstrap used to bind the server */
    public void setB(ServerBootstrap b) {
        this.b = b;
    }

    /** @return the address the server binds to */
    public InetSocketAddress getTcpPort() {
        return tcpPort;
    }

    /** @param tcpPort the address the server binds to */
    public void setTcpPort(InetSocketAddress tcpPort) {
        this.tcpPort = tcpPort;
    }
}
           

5.初始化通道

import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Component;

/**
 * Channel initializer that installs the string decoder, the business handler and the string
 * encoder, in that order, on every new {@link SocketChannel}.
 *
 * <p>NOTE(review): the ServerBootstrap configuration shown alongside this class registers the
 * NettyHandle bean directly as child handler, so this initializer appears to be unused there -
 * confirm before relying on it.
 *
 * @Author:hemingzhu
 * @date:2019/07/10 15:30
 */
@Component
@Qualifier("springProtocolInitializer")
public class StringProtocolInitalizer extends ChannelInitializer<SocketChannel> {

    @Autowired
    StringDecoder stringDecoder;

    @Autowired
    StringEncoder stringEncoder;

    @Autowired
    NettyHandle nettyHandle;

    /**
     * Appends the three injected handlers to the tail of the channel's pipeline.
     *
     * @param ch the channel being initialised
     */
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ch.pipeline()
                .addLast("decoder", this.stringDecoder)
                .addLast("handler", this.nettyHandle)
                .addLast("encoder", this.stringEncoder);
    }

    /** @return the decoder installed as "decoder" */
    public StringDecoder getStringDecoder() {
        return this.stringDecoder;
    }

    /** @return the encoder installed as "encoder" */
    public StringEncoder getStringEncoder() {
        return this.stringEncoder;
    }

    /** @return the handler installed as "handler" */
    public NettyHandle getNettyHandle() {
        return this.nettyHandle;
    }

    /** @param stringDecoder the decoder to install as "decoder" */
    public void setStringDecoder(StringDecoder stringDecoder) {
        this.stringDecoder = stringDecoder;
    }

    /** @param stringEncoder the encoder to install as "encoder" */
    public void setStringEncoder(StringEncoder stringEncoder) {
        this.stringEncoder = stringEncoder;
    }

    /** @param nettyHandle the handler to install as "handler" */
    public void setNettyHandle(NettyHandle nettyHandle) {
        this.nettyHandle = nettyHandle;
    }
}
           

6.Netty配置

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelOption;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.support.PropertySourcesPlaceholderConfigurer;

import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

/**
 * @Author:hemingzhu
 * @date:2019/07/10 15:26
 * @Explanation:
 */
@Configuration
public class NettyConfigTest {

    //讀取yml中配置
    @Value("${boss.thread.count}")
    private int bossCount;

    @Value("${worker.thread.count}")
    private int workerCount;

    @Value("${tcp.port}")
    private int tcpPort;

    @Value("${so.keepalive}")
    private boolean keepAlive;

    @Value("${so.backlog}")
    private int backlog;

    @Autowired
    private NettyHandle nettyHandle;

    //bootstrap配置
    @SuppressWarnings("unchecked")
    @Bean(name = "serverBootstrap")
    public ServerBootstrap bootstrap() {
        ServerBootstrap b = new ServerBootstrap();
        b.group(bossGroup(), workerGroup())
                .channel(NioServerSocketChannel.class)
                .childHandler(nettyHandle);
        Map<ChannelOption<?>, Object> tcpChannelOptions = tcpChannelOptions();
        Set<ChannelOption<?>> keySet = tcpChannelOptions.keySet();
        for (@SuppressWarnings("rawtypes")
                ChannelOption option : keySet) {
            b.option(option, tcpChannelOptions.get(option));
        }
        return b;
    }

    @Bean(name = "bossGroup", destroyMethod = "shutdownGracefully")
    public NioEventLoopGroup bossGroup() {
        return new NioEventLoopGroup(bossCount);
    }

    @Bean(name = "workerGroup", destroyMethod = "shutdownGracefully")
    public NioEventLoopGroup workerGroup() {
        return new NioEventLoopGroup(workerCount);
    }

    @Bean(name = "tcpSocketAddress")
    public InetSocketAddress tcpPort() {
        return new InetSocketAddress(tcpPort);
    }

    @Bean(name = "tcpChannelOptions")
    public Map<ChannelOption<?>, Object> tcpChannelOptions() {
        Map<ChannelOption<?>, Object> options = new HashMap<ChannelOption<?>, Object>();
        options.put(ChannelOption.SO_KEEPALIVE, keepAlive);
        options.put(ChannelOption.SO_BACKLOG, backlog);
        return options;
    }

    @Bean(name = "stringEncoder")
    public StringEncoder stringEncoder() {
        return new StringEncoder();
    }

    @Bean(name = "stringDecoder")
    public StringDecoder stringDecoder() {
        return new StringDecoder();
    }

    /**
     * Necessary to make the Value annotations work.
     *
     * @return
     */
    @Bean
    public static PropertySourcesPlaceholderConfigurer propertyPlaceholderConfigurer() {
        return new PropertySourcesPlaceholderConfigurer();
    }
}
           

7.NettyHandle事件處理

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Component;

import java.io.UnsupportedEncodingException;

/**
 * Shared server-side handler: greets a client on connect, logs every raw message it receives
 * and answers it with a fixed acknowledgement.
 *
 * <p>{@link #channelRead} is overridden and handles the raw {@link ByteBuf} itself, so
 * {@link #messageReceived} is not reached through this class's own read path.
 *
 * @Author:hemingzhu
 * @date:2019/07/10 15:26
 */
@Component
@Qualifier("serverHandler")
@ChannelHandler.Sharable
public class NettyHandle extends SimpleChannelInboundHandler<String> {
    private static final Logger log = LoggerFactory.getLogger(NettyHandle.class);

    /**
     * Charset used for both decoding and encoding. The client handler in this project writes
     * and reads UTF-8, so decoding as GBK or encoding with the platform default would garble
     * non-ASCII text.
     */
    private static final java.nio.charset.Charset WIRE_CHARSET =
            java.nio.charset.StandardCharsets.UTF_8;

    /**
     * String-based handling: closes the channel when the message contains "bye", otherwise
     * echoes the message back with a prefix.
     */
    @Override
    protected void messageReceived(ChannelHandlerContext ctx, String msg) throws Exception {
        log.info("client msg:{}", msg);
        String clientIdToLong = ctx.channel().id().asLongText();
        log.info("client long id:{}", clientIdToLong);
        String clientIdToShort = ctx.channel().id().asShortText();
        log.info("client short id:{}", clientIdToShort);
        if (msg.contains("bye")) {
            // close
            ctx.channel().close();
        } else {
            // send to client
            ctx.channel().writeAndFlush("Yoru msg is:" + msg);
        }
    }

    /**
     * Reads the inbound buffer, logs its text and replies with an acknowledgement.
     *
     * @param ctx the handler context
     * @param msg the inbound message; cast to {@link ByteBuf} without a type check
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws UnsupportedEncodingException {
        log.info("server 讀取資料……");
        byte[] req = readClientData((ByteBuf) msg);
        String body = new String(req, WIRE_CHARSET); // the received text
        log.info("用戶端的資料------>{}", body);

        sendInfo(ctx, "收到");
    }

    /** Encodes {@code info} and writes it to the peer, flushing immediately. */
    private void sendInfo(ChannelHandlerContext ctx, String info) {
        // writeAndFlush already flushes, so no separate flush() is needed.
        ctx.writeAndFlush(Unpooled.copiedBuffer(info, WIRE_CHARSET));
    }

    /** Copies the readable bytes out of {@code msg} and releases the buffer. */
    private byte[] readClientData(ByteBuf msg) {
        try {
            byte[] req = new byte[msg.readableBytes()];
            msg.readBytes(req);
            return req;
        } finally {
            // Release even if the copy fails so the buffer is not leaked.
            msg.release();
        }
    }

    /** Sends a greeting as soon as the connection is active. */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        log.info("開始連接配接");
        sendInfo(ctx, "連接配接成功");

        super.channelActive(ctx);
    }

    /** Logs the failure with its cause, notifies the peer if still possible, then closes. */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        log.error("異常關閉", cause);
        if (ctx.channel().isActive()) {
            sendInfo(ctx, "異常");
        }
        ctx.close();
    }

    /** Logs the disconnect. Nothing is written: the channel is already closed at this point. */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        log.info("離線");
        super.channelInactive(ctx);
    }
}
           

二、用戶端

1.NClient

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

import java.net.InetSocketAddress;

/**
 * Minimal Netty client: connects to the given host and port, installs an
 * {@link NClientHandler} and blocks until the connection is closed.
 *
 * @Author:hemingzhu
 * @date:2019/07/08 14:01
 */
public class NClient {

    private final String host;
    private final int port;

    /**
     * @param host the server host to connect to
     * @param port the server port to connect to
     */
    public NClient(String host, int port) {
        this.host = host;
        this.port = port;
    }

    /**
     * Connects to the server and blocks until the channel is closed, then shuts the event
     * loop group down.
     *
     * @throws Exception if connecting fails or the waiting thread is interrupted
     */
    public void start() throws Exception {
        // Created before the try block so the finally clause never sees a null group; a null
        // there would raise a NullPointerException that hides the original failure.
        EventLoopGroup nioEventLoopGroup = new NioEventLoopGroup();
        try {
            // Bootstrap that drives the client start-up.
            Bootstrap bootstrap = new Bootstrap();
            // Attach the event loop group, the channel type and the remote server address.
            bootstrap.group(nioEventLoopGroup)
                    .channel(NioSocketChannel.class)
                    .remoteAddress(new InetSocketAddress(host, port))
                    .handler(new ChannelInitializer<SocketChannel>() {
                        // Adds the client handler to each new channel's pipeline.
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new NClientHandler());
                        }
                    });
            // Connect to the server and wait for the connection to be established.
            ChannelFuture f = bootstrap.connect().sync();
            // Block until the channel is closed.
            f.channel().closeFuture().sync();
        } finally {
            // Shut the event loop group down to free its resources.
            nioEventLoopGroup.shutdownGracefully().sync();
        }
    }
}
           

2.NClientHandler

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;
import java.util.ArrayList;
import java.util.List;

/**
 * Client-side handler: sends a greeting when the connection opens, prints every message it
 * receives and answers each one with a fixed text.
 *
 * @Author:hemingzhu
 * @date:2019/07/08 14:07
 */
public class NClientHandler extends ChannelInboundHandlerAdapter {

    /**
     * Contexts of the currently open connections.
     * NOTE(review): a plain ArrayList shared through a static field; confirm it is only
     * touched from a single thread before using it with several connections.
     */
    public static List<ChannelHandlerContext> cts = new ArrayList<ChannelHandlerContext>();

    /**
     * Registers the context and sends the greeting to the server.
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("用戶端與服務端通道-開啟:" + ctx.channel().localAddress() + "channelActive");
        cts.add(ctx);
        String sendInfo = "你好服務端";
        System.out.println("用戶端準備發送的資料包:" + sendInfo);
        write(ctx, sendInfo);
    }

    /**
     * Encodes {@code mess} as UTF-8 and writes it to the server, flushing immediately.
     *
     * @param ctx  the context to write through
     * @param mess the text to send
     */
    public void write(ChannelHandlerContext ctx, String mess) throws Exception {
        // writeAndFlush already flushes, so no separate flush() is needed.
        ctx.writeAndFlush(Unpooled.copiedBuffer(mess, CharsetUtil.UTF_8));
    }

    /**
     * Prints the received text and answers with a fixed reply.
     *
     * @param msg the inbound message; cast to {@link ByteBuf} without a type check
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        // Read the data.
        byte[] req = readClientData((ByteBuf) msg);
        String body = new String(req, CharsetUtil.UTF_8); // the received text
        System.out.println("用戶端的資料------>" + body);
        // Write the reply.
        write(ctx, "wits寫的資料");
    }

    /** Copies the readable bytes of a Netty buffer into a byte array and releases the buffer. */
    private byte[] readClientData(ByteBuf msg) {
        try {
            byte[] req = new byte[msg.readableBytes()];
            msg.readBytes(req);
            return req;
        } finally {
            // Release even if the copy fails so the buffer is not leaked.
            msg.release();
        }
    }

    /**
     * Called when the channel becomes inactive, i.e. the connection to the server is closed
     * and no more data can be transferred. The context is dropped from {@link #cts} here so
     * that a normal disconnect does not leave a stale entry behind.
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        cts.remove(ctx);
        System.out.println("用戶端與服務端通道-關閉:" + ctx.channel().localAddress() + "channelInactive");
    }

    /** Drops the context, closes the connection and prints the failure message. */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cts.remove(ctx);
        ctx.close();
        System.out.println("異常退出:" + cause.getMessage());
    }

}
           

3.TestMain

import com.herbert.client.NClient;
import com.herbert.finalPool.ConstantPool;

/**
 * Command-line entry point that starts a single {@link NClient}.
 *
 * @Author:hemingzhu
 * @date:2019/07/08 14:01
 */
public class TestMain {

    /**
     * Builds a client from the ConstantPool host and port and starts it.
     * NOTE(review): the original note gives the target as 127.0.0.1/65535; ConstantPool is
     * not shown here, so confirm the port matches the one the server binds to.
     */
    public static void main(String[] args) throws Exception {
        NClient client = new NClient(ConstantPool.HOST, ConstantPool.PORT);
        client.start();
    }

}
           

繼續閱讀