1. 基本介紹
産生TCP粘包和拆包問題的主要原因是,作業系統在發送TCP資料的時候,底層會有一個緩沖區,例如1024個位元組大小,如果一次請求發送的資料量比較小,沒達到緩沖區大小,TCP則會将多個請求合并為同一個請求進行發送,這就形成了粘包問題;如果一次請求發送的資料量比較大,超過了緩沖區大小,TCP就會将其拆分為多次發送,這就是拆包,也就是将一個大的包拆分為多個小包進行發送。如下圖展示了TCP粘包和拆包的一個示意圖:

上圖中示範了TCP粘包和拆包的三種情況:
- A和B兩個包都剛好滿足TCP緩沖區的大小,或者說其等待時間已經達到TCP等待時長,進而還是使用兩個獨立的包進行發送;
- A和B兩次請求間隔時間内較短,并且資料包較小,因而合并為同一個包發送給服務端;
- B包比較大,因而将其拆分為兩個包B_1和B_2進行發送,而這裡由于拆分後的B_2比較小,其又與A包合并在一起發送。
2.執行個體
MyServer
public class MyServer {
public static void main(String[] args) throws Exception{
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup,workerGroup).channel(NioServerSocketChannel.class).childHandler(new MyServerInitializer()); //自定義一個初始化類
ChannelFuture channelFuture = serverBootstrap.bind(7000).sync();
channelFuture.channel().closeFuture().sync();
}finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
MyServerInitializer
public class MyServerInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new MyServerHandler());
}
}
MyServerHandler
public class MyServerHandler extends SimpleChannelInboundHandler<ByteBuf>{
private int count;
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
//cause.printStackTrace();
ctx.close();
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
byte[] buffer = new byte[msg.readableBytes()];
msg.readBytes(buffer);
//将buffer轉成字元串
String message = new String(buffer, Charset.forName("utf-8"));
System.out.println("伺服器接收到資料 " + message);
System.out.println("伺服器接收到消息量=" + (++this.count));
//伺服器回送資料給用戶端, 回送一個随機id ,
ByteBuf responseByteBuf = Unpooled.copiedBuffer(UUID.randomUUID().toString() + " ", Charset.forName("utf-8"));
ctx.writeAndFlush(responseByteBuf);
}
}
MyClient
public class MyClient {
public static void main(String[] args) throws Exception{
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group).channel(NioSocketChannel.class)
.handler(new MyClientInitializer()); //自定義一個初始化類
ChannelFuture channelFuture = bootstrap.connect("localhost", 7000).sync();
channelFuture.channel().closeFuture().sync();
}finally {
group.shutdownGracefully();
}
}
}
MyClientInitializer
public class MyClientInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new MyClientHandler());
}
}
MyClientHandler
public class MyClientHandler extends SimpleChannelInboundHandler<ByteBuf> {
private int count;
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
//使用用戶端發送10條資料 hello,server 編号
for(int i= 0; i< 10; ++i) {
ByteBuf buffer = Unpooled.copiedBuffer("hello,server " + i, Charset.forName("utf-8"));
ctx.writeAndFlush(buffer);
}
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
byte[] buffer = new byte[msg.readableBytes()];
msg.readBytes(buffer);
String message = new String(buffer, Charset.forName("utf-8"));
System.out.println("用戶端接收到消息=" + message);
System.out.println("用戶端接收消息數量=" + (++this.count));
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
3.解決方案
解決辦法
3.1 方法一:FixedLengthFrameDecoder
消息定長,封包大小固定長度,不夠空格補全,發送和接收方遵循相同的約定, 這樣即使粘包了通過接收方程式設計實作擷取定長封包也能區分。
3.2 方法二:DelimiterBasedFrameDecoder
封包添加特殊分隔符,
例如每條封包結束都添加回車換行符(例如FTP協定) 或者指定特殊字元作為封包分隔符,接收方通過特殊分隔符切分封包區分。
ByteBuf buf = Unpooled.copiedBuffer("_split".getBytes());
socketChannel.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, buf));
3.3 使用自定義協定 + 編解碼器
執行個體:
- 要求用戶端發送 5 個 Message 對象, 用戶端每次發送一個 Message 對象
- 伺服器端每次接收一個Message, 分5次進行解碼, 每讀取到 一個Message , 會回複一個Message 對象 給用戶端.
/**
* 協定包
*
* @author Administrator
*/
public class MessageProtocol {
private int len;
private byte[] content;
public int getLen() {
return len;
}
public void setLen(int len) {
this.len = len;
}
public byte[] getContent() {
return content;
}
public void setContent(byte[] content) {
this.content = content;
}
}
public class MyClientHandler extends SimpleChannelInboundHandler<MessageProtocol> {
private int count;
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
//使用用戶端發送5條資料 "今天天氣冷,吃火鍋" 編号
for (int i = 0; i < 5; i++) {
String mes = "學好netty,go,go,go";
byte[] content = mes.getBytes(Charset.forName("utf-8"));
int length = mes.getBytes(Charset.forName("utf-8")).length;
//建立協定包對象
MessageProtocol messageProtocol = new MessageProtocol();
messageProtocol.setLen(length);
messageProtocol.setContent(content);
ctx.writeAndFlush(messageProtocol);
}
}
// @Override
protected void channelRead0(ChannelHandlerContext ctx, MessageProtocol msg) throws Exception {
int len = msg.getLen();
byte[] content = msg.getContent();
System.out.println("用戶端接收到消息如下");
System.out.println("長度=" + len);
System.out.println("内容=" + new String(content, Charset.forName("utf-8")));
System.out.println("用戶端接收消息數量=" + (++this.count));
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println("異常消息=" + cause.getMessage());
ctx.close();
}
}
public class MyMessageEncoder extends MessageToByteEncoder<MessageProtocol> {
@Override
protected void encode(ChannelHandlerContext ctx, MessageProtocol msg, ByteBuf out) throws Exception {
System.out.println("MyMessageEncoder encode 方法被調用");
out.writeInt(msg.getLen());
out.writeBytes(msg.getContent());
}
}
public class MyMessageDecoder extends ReplayingDecoder<Void> {
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
System.out.println("MyMessageDecoder decode 被調用");
//需要将得到二進制位元組碼-> MessageProtocol 資料包(對象)
int length = in.readInt();
byte[] content = new byte[length];
in.readBytes(content);
//封裝成 MessageProtocol 對象,放入 out, 傳遞下一個handler業務處理
MessageProtocol messageProtocol = new MessageProtocol();
messageProtocol.setLen(length);
messageProtocol.setContent(content);
out.add(messageProtocol);
}
}
public class MyServerHandler extends SimpleChannelInboundHandler<MessageProtocol> {
private int count;
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
//cause.printStackTrace();
ctx.close();
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, MessageProtocol msg) throws Exception {
//接收到資料,并處理
int len = msg.getLen();
byte[] content = msg.getContent();
System.out.println("============================");
System.out.println("伺服器接收到資訊如下");
System.out.println("長度=" + len);
System.out.println("内容=" + new String(content, StandardCharsets.UTF_8));
System.out.println("伺服器接收到消息包數量=" + (++this.count));
//回複消息
String responseContent = UUID.randomUUID().toString();
int responseLen = responseContent.getBytes(StandardCharsets.UTF_8).length;
byte[] responseContent2 = responseContent.getBytes(StandardCharsets.UTF_8);
//建構一個協定包
MessageProtocol messageProtocol = new MessageProtocol();
messageProtocol.setLen(responseLen);
messageProtocol.setContent(responseContent2);
ctx.writeAndFlush(messageProtocol);
}
}