描述
socket長連接配接即服務端不斷開用戶端channel的連接配接,用戶端需要定時向服務端進行心跳檢測,服務端需要将過期未進行心跳檢測的socket關閉。
服務端關閉過期的channel連接配接: Netty提供了ScheduledFuture,可以通過ChannelHandlerContext.executor().schedule()建立,支援延時送出,也支援取消任務,為自動關閉提供了一個很好的實作方案。
實作Demo
消息定義
public class Msg {
/**
消息類型:
1:心跳檢測消息
2:普通消息
*/
private byte type;
/**消息長度*/
private int length;
/**消息内容*/
private String content;
public byte getType() {
return type;
}
public void setType(byte type) {
this.type = type;
}
public int getLength() {
return length;
}
public void setLength(int length) {
this.length = length;
}
public String getContent() {
return content;
}
public void setContent(String content) {
this.content = content;
}
@Override
public String toString() {
return "Msg{" +
"type=" + type +
", length=" + length +
", content='" + content + '\'' +
'}';
}
}
消息編碼
public class MsgEncoder extends MessageToByteEncoder<Msg> {
@Override
protected void encode(ChannelHandlerContext ctx, Msg msg, ByteBuf byteBuf) throws Exception {
byteBuf.writeByte(msg.getType());
byteBuf.writeInt(msg.getLength());
if (!StringUtil.isNullOrEmpty(msg.getContent())) {
byteBuf.writeBytes(msg.getContent().getBytes());
}
}
}
消息解碼
public class MsgDecoder extends ReplayingDecoder<MsgDecoder.MsgState> {
/**
* 狀态類型通常是一個Enum ; 使用Void如果狀态管理是未使用
* TYPE: 消息類型
* LENGTH: 消息長度
* CONTENT: 消息内容
*/
public enum MsgState {
TYPE,
LENGTH,
CONTENT
}
public MsgDecoder() {
super(MsgState.TYPE);
}
private Msg msg;
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf byteBuf, List<Object> out) throws Exception {
MsgState state = state();
switch (state) {
case TYPE:
msg = new Msg();
byte type = byteBuf.readByte();
msg.setType(type);
checkpoint(MsgState.LENGTH);
break;
case LENGTH:
int length = byteBuf.readInt();
msg.setLength(length);
if (length > 0) {
checkpoint(MsgState.CONTENT);
} else {
out.add(msg);
checkpoint(MsgState.TYPE);
}
break;
case CONTENT:
byte[] bytes = new byte[msg.getLength()];
byteBuf.readBytes(bytes);
String content = new String(bytes);
msg.setContent(content);
out.add(msg);
checkpoint(MsgState.TYPE);
break;
default:
throw new IllegalStateException("invalid state:" + state);
}
}
}
消息處理
@ChannelHandler.Sharable
public class MsgHandler extends SimpleChannelInboundHandler<Msg> {
private static Map<Integer, ChannelCache> channelCache = new HashMap<>();
@Override
protected void channelRead0(ChannelHandlerContext ctx, Msg msg) throws Exception {
System.out.println("收到消息,消息内容" + msg);
Channel channel = ctx.channel();
final int hashCode = channel.hashCode();
//判斷channel在緩存中
if (!channelCache.containsKey(hashCode)) {
//添加通道關閉的監聽器,當通道關閉時将channel從緩存中移除
channel.closeFuture().addListener(future -> {
channelCache.remove(hashCode);
});
//建立并執行定時任務 10秒後服務端主動将channel關閉
ScheduledFuture scheduledFuture = ctx.executor().schedule(
() -> {
channel.close();
}, 10, TimeUnit.SECONDS);
//将管道資訊放入緩存
channelCache.put(hashCode, new ChannelCache(channel, scheduledFuture));
}
switch (msg.getType()) {
//心跳檢測
case 1: {
//建立一個新的定時器
ScheduledFuture scheduledFuture = ctx.executor().schedule(
() -> channel.close(), 5, TimeUnit.SECONDS);
//重新設定channel過期定時器并将老的定時器取消
ChannelCache cache = channelCache.get(hashCode);
cache.getScheduledFuture().cancel(true);
cache.setScheduledFuture(scheduledFuture);
ctx.channel().writeAndFlush(msg);
break;
}
//普通消息
case 2: {
channelCache.entrySet().stream().forEach(entry -> {
Channel otherChannel = entry.getValue().getChannel();
otherChannel.writeAndFlush(msg);
});
break;
}
}
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
super.channelReadComplete(ctx);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
if (null != cause) {
cause.printStackTrace();
}
if (null != ctx) {
ctx.close();
}
}
}
channel緩存
public class ChannelCache {
private Channel channel;
private ScheduledFuture scheduledFuture;
public ChannelCache(Channel channel, ScheduledFuture scheduledFuture) {
this.channel = channel;
this.scheduledFuture = scheduledFuture;
}
。。。。
}
服務端
/**
* 基于netty的服務端
* 思路:
* socket長連接配接即服務端不斷開用戶端channel的連接配接,用戶端需要定時向服務端進行心跳檢測,服務端需要将過期未進行心跳檢測的socket關閉。
* 服務端關閉過期的channel連接配接:
* Netty提供了ScheduledFuture,可以通過ChannelHandlerContext.executor().schedule()建立,支援延時送出,也支援取消任務,
* 為自動關閉提供了一個很好的實作方案。
*/
public class LongConnServer {
private static final int port = 9999;
public static void main(String[] args) throws Exception {
LongConnServer server = new LongConnServer();
server.start();
}
public void start() throws Exception {
ServerBootstrap b = new ServerBootstrap();
NioEventLoopGroup group = new NioEventLoopGroup();
b.group(group)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch)
throws Exception {
ch.pipeline()
.addLast("decoder", new MsgDecoder())
.addLast("encoder", new MsgEncoder())
.addLast("handler", new MsgHandler());
}
})
// determining the number of connections queued
.option(ChannelOption.SO_BACKLOG, 128)
.childOption(ChannelOption.SO_KEEPALIVE, Boolean.TRUE);
b.bind(port).sync();
}
}
用戶端
/**
* @describe: socket用戶端
* @author: houkai
*/
public class LongConnClient {
String host = "127.0.0.1";
int port = 9999;
public static void main(String[] args) throws Exception {
new LongConnClient().testLongConn();
}
public void testLongConn() throws Exception {
final Socket socket = new Socket();
socket.connect(new InetSocketAddress(host, port));
//獨立的線程 擷取服務端的響應消息
new Thread(() -> {
while (true) {
readResponse(socket);
}
}).start();
//每隔3秒進行一次心跳檢測
new Thread(() -> {
while (true) {
try {
heartCheck(socket);
Thread.sleep(3000);
} catch (IOException | InterruptedException e) {
e.printStackTrace();
}
}
}).start();
//用戶端每一秒向服務端發送一跳消息
while (true) {
byte[] content = ("hello, I'm " + hashCode()).getBytes();
ByteBuffer byteBuffer = ByteBuffer.allocate(content.length + 5);
byteBuffer.put((byte) 2);
byteBuffer.putInt(content.length);
byteBuffer.put(content);
socket.getOutputStream().write(byteBuffer.array());
Thread.sleep(1000);
}
}
/**
* 心跳檢測
*/
private void heartCheck(Socket socket) throws IOException {
ByteBuffer byteBuffer = ByteBuffer.allocate(5);
byteBuffer.put((byte) 1);
byteBuffer.putInt(0);
socket.getOutputStream().write(byteBuffer.array());
}
/**
* 讀取響應的消息
*/
private void readResponse(final Socket socket) {
try {
InputStream in = socket.getInputStream();
byte[] buffer = new byte[1024];
int n;
while ((n = in.read(buffer)) > 0) {
System.out.println(new String(buffer, 0, n));
}
} catch (IOException e) {
e.printStackTrace();
}
}
}