天天看點

netty~ 基於netty實現服務端的長連接

描述

        socket長連接即服務端不主動斷開客戶端channel的連接;客戶端需要定時向服務端發送心跳檢測,服務端需要將逾期未進行心跳檢測的socket關閉。

        服務端關閉過期的channel連接: Netty提供了ScheduledFuture,可以通過ChannelHandlerContext.executor().schedule()創建,支援延時提交,也支援取消任務,為自動關閉提供了一個很好的實現方案。

實作Demo

消息定義

/**
 * Wire message exchanged between client and server.
 *
 * <p>A message is a one-byte type, a four-byte length and an optional textual
 * content. This class is a plain mutable holder; it performs no validation.
 */
public class Msg {
    /**
     * Message type:
     * <ul>
     *   <li>1 - heartbeat message</li>
     *   <li>2 - ordinary message</li>
     * </ul>
     */
    private byte type;

    /** Message length. */
    private int length;

    /** Message content. */
    private String content;

    /** @return the message type code */
    public byte getType() {
        return this.type;
    }

    /** @param type the message type code to store */
    public void setType(byte type) {
        this.type = type;
    }

    /** @return the stored message length */
    public int getLength() {
        return this.length;
    }

    /** @param length the message length to store */
    public void setLength(int length) {
        this.length = length;
    }

    /** @return the message content, possibly {@code null} */
    public String getContent() {
        return this.content;
    }

    /** @param content the message content to store, may be {@code null} */
    public void setContent(String content) {
        this.content = content;
    }

    /**
     * Renders all three fields in the form
     * {@code Msg{type=T, length=L, content='C'}}.
     */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("Msg{");
        text.append("type=").append(type);
        text.append(", length=").append(length);
        text.append(", content='").append(content).append('\'');
        text.append('}');
        return text.toString();
    }
}
           

消息編碼

/**
 * Serializes a {@link Msg} into the wire format
 * {@code [type: 1 byte][length: 4 bytes][content: length bytes]}.
 */
public class MsgEncoder extends MessageToByteEncoder<Msg> {
    /**
     * Writes one frame for {@code msg} into {@code byteBuf}.
     *
     * <p>The length header is always derived from the bytes that are actually
     * written, not from {@code msg.getLength()}. A message whose stored length
     * disagrees with its content (for example content set without updating the
     * length, or a null content with a positive length) would otherwise produce
     * a frame the receiver cannot delimit.
     *
     * <p>The content is encoded as UTF-8 so the frame does not depend on the
     * default charset of the JVM that happens to run the encoder.
     *
     * @param ctx     the handler context (unused)
     * @param msg     the message to serialize
     * @param byteBuf the outbound buffer the frame is appended to
     */
    @Override
    protected void encode(ChannelHandlerContext ctx, Msg msg, ByteBuf byteBuf) throws Exception {
        String content = msg.getContent();
        byte[] payload = StringUtil.isNullOrEmpty(content)
                ? new byte[0]
                : content.getBytes(java.nio.charset.StandardCharsets.UTF_8);

        byteBuf.writeByte(msg.getType());
        // Header must match the payload exactly, otherwise the stream loses framing.
        byteBuf.writeInt(payload.length);
        byteBuf.writeBytes(payload);
    }
}
           

消息解碼

/**
 * Decodes the wire format {@code [type: 1 byte][length: 4 bytes][content]}
 * into {@link Msg} objects, one field per decoder state.
 *
 * <p>The length header comes from the remote peer and is therefore validated
 * before any buffer is allocated for the content.
 */
public class MsgDecoder extends ReplayingDecoder<MsgDecoder.MsgState> {
    /**
     * Decoder states; each names the field that is read next.
     * TYPE:    message type
     * LENGTH:  message length
     * CONTENT: message content
     */
    public enum MsgState {
        TYPE,
        LENGTH,
        CONTENT
    }

    /** Content size limit, in bytes, used by the no-argument constructor (1 MiB). */
    public static final int DEFAULT_MAX_CONTENT_LENGTH = 1024 * 1024;

    /** Largest content length, in bytes, this decoder accepts. */
    private final int maxContentLength;

    /** Message currently being assembled across states. */
    private Msg msg;

    /** Creates a decoder that accepts up to {@link #DEFAULT_MAX_CONTENT_LENGTH} content bytes. */
    public MsgDecoder() {
        this(DEFAULT_MAX_CONTENT_LENGTH);
    }

    /**
     * Creates a decoder with an explicit content size limit.
     *
     * @param maxContentLength largest accepted content length in bytes, must not be negative
     * @throws IllegalArgumentException if {@code maxContentLength} is negative
     */
    public MsgDecoder(int maxContentLength) {
        super(MsgState.TYPE);
        if (maxContentLength < 0) {
            throw new IllegalArgumentException("maxContentLength must be >= 0: " + maxContentLength);
        }
        this.maxContentLength = maxContentLength;
    }

    /**
     * Reads the field for the current state and advances the checkpoint; a
     * completed message is added to {@code out}.
     *
     * @throws IllegalStateException if the length header is negative or exceeds
     *                               the configured maximum, or the state is unknown
     */
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf byteBuf, List<Object> out) throws Exception {
        MsgState state = state();
        switch (state) {
            case TYPE:
                msg = new Msg();
                byte type = byteBuf.readByte();
                msg.setType(type);
                checkpoint(MsgState.LENGTH);
                break;
            case LENGTH:
                int length = byteBuf.readInt();
                // The peer controls this value; reject it before it sizes an allocation.
                if (length < 0 || length > maxContentLength) {
                    throw new IllegalStateException(
                            "invalid content length:" + length + ", max:" + maxContentLength);
                }
                msg.setLength(length);
                if (length > 0) {
                    checkpoint(MsgState.CONTENT);
                } else {
                    // Header-only frame (e.g. a heartbeat): nothing more to read.
                    out.add(msg);
                    checkpoint(MsgState.TYPE);
                }
                break;
            case CONTENT:
                byte[] bytes = new byte[msg.getLength()];
                byteBuf.readBytes(bytes);
                // Fixed charset so decoding does not depend on the JVM default.
                String content = new String(bytes, java.nio.charset.StandardCharsets.UTF_8);
                msg.setContent(content);
                out.add(msg);
                checkpoint(MsgState.TYPE);
                break;
            default:
                throw new IllegalStateException("invalid state:" + state);
        }
    }
}
           

消息處理

/**
 * Server-side business handler: tracks connected channels, closes channels
 * whose heartbeat is overdue, echoes heartbeats and broadcasts ordinary
 * messages to every tracked channel.
 *
 * <p>The channel registry is static and the handler is sharable, so the
 * registry is reached from several event-loop threads; it is therefore a
 * concurrent map. The timer stored for a channel is only touched from that
 * channel's own event loop (read callbacks, scheduled tasks and close
 * listeners all run there).
 */
@ChannelHandler.Sharable
public class MsgHandler extends SimpleChannelInboundHandler<Msg> {

    /** Seconds a newly registered channel may go without a heartbeat. */
    private static final long INITIAL_TIMEOUT_SECONDS = 10;

    /** Seconds allowed between one heartbeat and the next. */
    private static final long HEARTBEAT_TIMEOUT_SECONDS = 5;

    /**
     * Tracked channels. Keyed by the channel itself rather than by its hash
     * code, because two distinct channels may share a hash code.
     */
    private static final Map<Channel, ChannelCache> channelCache =
            new java.util.concurrent.ConcurrentHashMap<>();

    /**
     * Starts tracking the channel as soon as it is connected, so a client that
     * never sends a single frame is still closed when the initial timeout expires.
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        register(ctx);
        super.channelActive(ctx);
    }

    /**
     * Handles one decoded message: type 1 re-arms the expiry timer and echoes
     * the heartbeat, type 2 is written to every tracked channel, other types
     * are ignored.
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Msg msg) throws Exception {
        System.out.println("收到消息,消息内容" + msg);

        Channel channel = ctx.channel();
        // Normally already registered in channelActive; this covers a handler added later.
        ChannelCache cache = register(ctx);

        switch (msg.getType()) {
            // heartbeat
            case 1: {
                // Arm the new timer first, then cancel and replace the old one.
                ScheduledFuture<?> timer = scheduleClose(ctx, HEARTBEAT_TIMEOUT_SECONDS);
                cache.getScheduledFuture().cancel(true);
                cache.setScheduledFuture(timer);
                channel.writeAndFlush(msg);
                break;
            }
            // ordinary message
            case 2: {
                for (ChannelCache peer : channelCache.values()) {
                    peer.getChannel().writeAndFlush(msg);
                }
                break;
            }
            default:
                // Unknown type codes are ignored.
                break;
        }
    }

    /** Prints the failure and closes the connection it occurred on. */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        if (null != cause) {
            cause.printStackTrace();
        }
        if (null != ctx) {
            ctx.close();
        }
    }

    /**
     * Returns the registry entry of the context's channel, creating it (with an
     * initial expiry timer and a close listener) when the channel is not tracked yet.
     */
    private ChannelCache register(ChannelHandlerContext ctx) {
        Channel channel = ctx.channel();
        ChannelCache existing = channelCache.get(channel);
        if (existing != null) {
            return existing;
        }

        ChannelCache created =
                new ChannelCache(channel, scheduleClose(ctx, INITIAL_TIMEOUT_SECONDS));
        // Put before adding the listener: if the channel is already closed the
        // listener may run immediately and must find the entry to remove it.
        channelCache.put(channel, created);
        channel.closeFuture().addListener(future -> {
            ChannelCache removed = channelCache.remove(channel);
            if (removed != null) {
                // The channel is gone; do not leave its close task queued.
                removed.getScheduledFuture().cancel(false);
            }
        });
        return created;
    }

    /** Schedules a task on the channel's executor that closes the channel after the delay. */
    private static ScheduledFuture<?> scheduleClose(ChannelHandlerContext ctx, long delaySeconds) {
        Channel channel = ctx.channel();
        return ctx.executor().schedule(
                () -> {
                    channel.close();
                }, delaySeconds, TimeUnit.SECONDS);
    }
}
           

channel緩存

public class ChannelCache {
    private Channel channel;

    private ScheduledFuture scheduledFuture;

    public ChannelCache(Channel channel, ScheduledFuture scheduledFuture) {
        this.channel = channel;
        this.scheduledFuture = scheduledFuture;
    }

   。。。。
}
           

服務端

/**
 * Netty-based server.
 *
 * <p>Idea: a long-lived socket connection means the server does not drop the
 * client's channel; the client sends heartbeats periodically and the server
 * closes sockets whose heartbeat is overdue.
 *
 * <p>Closing expired channels: a task scheduled through
 * {@code ChannelHandlerContext.executor().schedule()} supports delayed
 * execution and cancellation, which is what the automatic close relies on.
 */
public class LongConnServer {
    /** TCP port the server listens on. */
    private static final int PORT = 9999;

    public static void main(String[] args) throws Exception {
        new LongConnServer().start();
    }

    /**
     * Binds the server to {@link #PORT} and returns once the port is bound.
     *
     * <p>The event loop group is released when the server channel closes, and
     * also when startup fails; otherwise its non-daemon threads would keep the
     * JVM alive after, for example, a "port already in use" failure.
     *
     * @throws Exception if binding fails or the calling thread is interrupted
     */
    public void start() throws Exception {
        NioEventLoopGroup group = new NioEventLoopGroup();
        boolean bound = false;
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(group)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel ch)
                                throws Exception {
                            ch.pipeline()
                                    .addLast("decoder", new MsgDecoder())
                                    .addLast("encoder", new MsgEncoder())
                                    .addLast("handler", new MsgHandler());
                        }
                    })
                    // determining the number of connections queued
                    .option(ChannelOption.SO_BACKLOG, 128)
                    .childOption(ChannelOption.SO_KEEPALIVE, Boolean.TRUE);

            b.bind(PORT).sync()
                    .channel()
                    .closeFuture()
                    // Free the loop threads once the listening channel is closed.
                    .addListener(future -> {
                        group.shutdownGracefully();
                    });
            bound = true;
        } finally {
            if (!bound) {
                group.shutdownGracefully();
            }
        }
    }
}
           

用戶端

/**
 * Blocking-socket client for the long-connection demo.
 *
 * <p>It runs three activities on one socket: a reader that prints whatever the
 * server sends, a heartbeat sender (every 3 seconds) and, on the calling
 * thread, an ordinary-message sender (every second).
 *
 * @author houkai
 */
public class LongConnClient {

    String host = "127.0.0.1";
    int port = 9999;

    /** Serializes frame writes: two threads send on the same socket. */
    private final Object writeLock = new Object();

    public static void main(String[] args) throws Exception {
        new LongConnClient().testLongConn();
    }

    /**
     * Connects to {@code host:port} and exchanges frames until the connection
     * fails or the calling thread is interrupted; it never returns normally.
     *
     * <p>The helper threads are daemons and stop once the socket is closed, so
     * a lost connection ends the client instead of leaving threads retrying forever.
     *
     * @throws Exception when connecting or sending fails, or on interruption
     */
    public void testLongConn() throws Exception {
        final Socket socket = new Socket();
        try {
            socket.connect(new InetSocketAddress(host, port));

            // Separate thread: print the server's responses.
            Thread reader = new Thread(() -> readResponse(socket));
            reader.setDaemon(true);
            reader.start();

            // Separate thread: send a heartbeat every 3 seconds.
            Thread heartbeat = new Thread(() -> {
                try {
                    while (!socket.isClosed()) {
                        heartCheck(socket);
                        Thread.sleep(3000);
                    }
                } catch (IOException e) {
                    // A failure caused by our own shutdown is expected; stay quiet then.
                    if (!socket.isClosed()) {
                        e.printStackTrace();
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            heartbeat.setDaemon(true);
            heartbeat.start();

            // Calling thread: send one ordinary message per second.
            while (true) {
                byte[] content = ("hello, I'm  " + hashCode())
                        .getBytes(java.nio.charset.StandardCharsets.UTF_8);
                ByteBuffer byteBuffer = ByteBuffer.allocate(content.length + 5);
                byteBuffer.put((byte) 2);
                byteBuffer.putInt(content.length);
                byteBuffer.put(content);
                send(socket, byteBuffer.array());
                Thread.sleep(1000);
            }
        } finally {
            socket.close();
        }
    }

    /**
     * Sends one heartbeat frame: type 1 with a zero content length.
     */
    private void heartCheck(Socket socket) throws IOException {
        ByteBuffer byteBuffer = ByteBuffer.allocate(5);
        byteBuffer.put((byte) 1);
        byteBuffer.putInt(0);
        send(socket, byteBuffer.array());
    }

    /**
     * Prints the server's output until the stream ends, then closes the socket
     * so the sending threads stop as well.
     */
    private void readResponse(final Socket socket) {
        try {
            InputStream in = socket.getInputStream();
            byte[] buffer = new byte[1024];
            int n;
            while ((n = in.read(buffer)) > 0) {
                System.out.println(
                        new String(buffer, 0, n, java.nio.charset.StandardCharsets.UTF_8));
            }
            // End of stream: the server closed the connection.
            socket.close();
        } catch (IOException e) {
            if (!socket.isClosed()) {
                e.printStackTrace();
            }
        }
    }

    /** Writes one complete frame while holding the write lock so frames never interleave. */
    private void send(Socket socket, byte[] frame) throws IOException {
        synchronized (writeLock) {
            socket.getOutputStream().write(frame);
        }
    }
}
           

繼續閱讀