要分析粘包問題,首先要從資料讀開始分析。那麼,netty從哪開始讀的呢?
-----------------------------------------------------------------------------
messageReceived
java.lang.Exception
at org.jboss.netty.channel.SimpleChannelHandler.stack(SimpleChannelHandler.java:331)
at org.jboss.netty.example.echo.EchoServerHandler.messageReceived(EchoServerHandler.java:47)
at org.jboss.netty.channel.SimpleChannelHandler.handleUpstream(SimpleChannelHandler.java:95)
at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:559)
at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:268)
at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:255)
at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:97)
at org.jboss.netty.channel.socket.nio.AbstractNioWorker.process(AbstractNioWorker.java:108)
at org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:337)
at org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:89)
at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:187)
at org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108)
at org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42)
at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
at java.lang.Thread.run(Thread.java:662)
以上是Netty讀取消息以及觸發我們的MessageReceived處理邏輯的整個線程棧。
下面就讓我們從NioWorker.read開始分析
=============================================================
什麼時候執行read操作?
代碼如下:
for (Iterator<SelectionKey> i = selectedKeys.iterator(); i.hasNext();) {
SelectionKey k = i.next();
i.remove();
try {
int readyOps = k.readyOps();
if ((readyOps & SelectionKey.OP_READ) != 0 || readyOps == 0) {
if (!read(k)) {
// Connection already closed - no need to handle write.
continue;
}
}
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
writeFromSelectorLoop(k);
}
} catch (CancelledKeyException e) {
close(k);
}
if (cleanUpCancelledKeys()) {
break; // break the loop to avoid ConcurrentModificationException
}
}
其實也就是說,當對某個key進行OP_READ檢測後,如果确實發生了資料可讀事件,就執行讀操作。
讀操作,又做了哪些事情?
============================================================
代碼如下:
@Override
protected boolean read(SelectionKey k) {
final SocketChannel ch = (SocketChannel) k.channel();//擷取對應的channel
final NioSocketChannel channel = (NioSocketChannel) k.attachment();//擷取對應的NioSocketChannel
final ReceiveBufferSizePredictor predictor =
channel.getConfig().getReceiveBufferSizePredictor();
final int predictedRecvBufSize = predictor.nextReceiveBufferSize();
final ChannelBufferFactory bufferFactory = channel.getConfig().getBufferFactory();
int ret = 0;
int readBytes = 0;
boolean failure = true;
ByteBuffer bb = recvBufferPool.get(predictedRecvBufSize).order(bufferFactory.getDefaultOrder());
//擷取臨時緩沖區
try {
while ((ret = ch.read(bb)) > 0) {//盡量往裡面讀資料
readBytes += ret;
if (!bb.hasRemaining()) {
break;
}
}
failure = false;
} catch (ClosedChannelException e) {
// Can happen, and does not need a user attention.
} catch (Throwable t) {
fireExceptionCaught(channel, t);
}
if (readBytes > 0) {//如果讀到了資料
bb.flip();//轉化成讀模式
final ChannelBuffer buffer = bufferFactory.getBuffer(readBytes);
buffer.setBytes(0, bb);//這裡是關鍵的複制操作。
buffer.writerIndex(readBytes);//org.jboss.netty.buffer.BigEndianHeapChannelBuffer
// Update the predictor.
predictor.previousReceiveBufferSize(readBytes);
// Fire the event.
fireMessageReceived(channel, buffer);//收到消息了,就觸發消息處理邏輯
}
if (ret < 0 || failure) {
k.cancel(); // Some JDK implementations run into an infinite loop without this.
close(channel, succeededFuture(channel));
return false;
}
return true;
}
這裡的bufferFactory
org.jboss.netty.buffer.HeapChannelBufferFactory
buffer對應的類是
org.jboss.netty.buffer.BigEndianHeapChannelBuffer
好,不管怎麼樣,現在擷取了消息,怎麼觸發消息處理機制呢?
---------------------------------------------------代碼如下:
public static void fireMessageReceived(Channel channel, Object message, SocketAddress remoteAddress) {
channel.getPipeline().sendUpstream(
new UpstreamMessageEvent(channel, message, remoteAddress));
}
是以,這裡,其實交給對應的channel的處理管道pipiline來處理,這裡的管道可能有好幾個對象,這個由addLast函數添加。
注意2點:1 sendUpstream,這是處理的方向 2 UpstreamMessageEvent這是一個上行事件,隻有有能力處理的ctx裡的handler才可以處理。
------------------假如我們在自定義邏輯裡加入了一個繼承了SimpleChannelHandler的類或者SimpleChannelUpstreamHandler的類的對象(通過addLast函數)。
那麼,消息就會最終傳遞到我們這個函數裡。
----------------------------------------而這個函數,就是大部分時候,我們自己要增加的邏輯。
其實最基本的問題就是如何擷取本次的内容。
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
try{
BigEndianHeapChannelBuffer behcb= (BigEndianHeapChannelBuffer)e.getMessage();
System.out.println("\n\n\nreceived: "+new String(behcb.array(),"UTF-8"));
}catch(Exception e11){
}
}
其實根本原因在于,Netty剛開始會把socket裡的資料讀到一個ByteBuffer直接配置設定的緩沖區裡,比如說1024個位元組的容量,
最後讀了256個位元組,然後這256個資料會複制到BigEndianHeapChannelBuffer 的一個256長度的位元組數組裡。
是以我們此時通過這個256位元組數組,獲得了本次所讀的256個位元組的值,問題就解決了。
版權聲明:本文為CSDN部落客「weixin_34185512」的原創文章,遵循CC 4.0 BY-SA版權協定,轉載請附上原文出處連結及本聲明。
原文連結:https://blog.csdn.net/weixin_34185512/article/details/92588404