天天看點

Netty3.10.1:關于MessageReceived

要分析粘包問題,首先要從資料讀開始分析。那麼,netty從哪開始讀的呢?

-----------------------------------------------------------------------------

messageReceived

java.lang.Exception

 at org.jboss.netty.channel.SimpleChannelHandler.stack(SimpleChannelHandler.java:331)

 at org.jboss.netty.example.echo.EchoServerHandler.messageReceived(EchoServerHandler.java:47)

 at org.jboss.netty.channel.SimpleChannelHandler.handleUpstream(SimpleChannelHandler.java:95)

 at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)

 at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:559)

 at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:268)

 at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:255)

 at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:97)

 at org.jboss.netty.channel.socket.nio.AbstractNioWorker.process(AbstractNioWorker.java:108)

 at org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:337)

 at org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:89)

 at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:187)

 at org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108)

 at org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42)

 at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)

 at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)

 at java.lang.Thread.run(Thread.java:662)

以上是Netty讀取消息以及觸發我們的MessageReceived處理邏輯的整個線程棧。

下面就讓我們從NioWorker.read開始分析

=============================================================

什麼時候執行read操作?

代碼如下:

 for (Iterator<SelectionKey> i = selectedKeys.iterator(); i.hasNext();) {

            SelectionKey k = i.next();

            i.remove();

            try {

                int readyOps = k.readyOps();

                if ((readyOps & SelectionKey.OP_READ) != 0 || readyOps == 0) {

                    if (!read(k)) {

                        // Connection already closed - no need to handle write.

                        continue;

                    }

                }

                if ((readyOps & SelectionKey.OP_WRITE) != 0) {

                    writeFromSelectorLoop(k);

                }

            } catch (CancelledKeyException e) {

                close(k);

            }

            if (cleanUpCancelledKeys()) {

                break; // break the loop to avoid ConcurrentModificationException

            }

        }

其實也就是說,當對某個key進行OP_READ檢測後,如果确實發生了資料可讀事件,就執行讀操作。

讀操作,又做了哪些事情?

============================================================

代碼如下:

@Override

    protected boolean read(SelectionKey k) {

        final SocketChannel ch = (SocketChannel) k.channel();//擷取對應的channel

        final NioSocketChannel channel = (NioSocketChannel) k.attachment();//擷取對應的NioSocketChannel 

        final ReceiveBufferSizePredictor predictor =

            channel.getConfig().getReceiveBufferSizePredictor();

        final int predictedRecvBufSize = predictor.nextReceiveBufferSize();

        final ChannelBufferFactory bufferFactory = channel.getConfig().getBufferFactory();

        int ret = 0;

        int readBytes = 0;

        boolean failure = true;

        ByteBuffer bb = recvBufferPool.get(predictedRecvBufSize).order(bufferFactory.getDefaultOrder());

        //擷取臨時緩沖區

        try {

            while ((ret = ch.read(bb)) > 0) {//盡量往裡面讀資料

                readBytes += ret;

                if (!bb.hasRemaining()) {

                    break;

                }

            }

            failure = false;

        } catch (ClosedChannelException e) {

            // Can happen, and does not need a user attention.

        } catch (Throwable t) {

            fireExceptionCaught(channel, t);

        }

        if (readBytes > 0) {//如果讀到了資料

            bb.flip();//轉化成讀模式

            final ChannelBuffer buffer = bufferFactory.getBuffer(readBytes);

            buffer.setBytes(0, bb);//這裡是關鍵的複制操作。

            buffer.writerIndex(readBytes);//org.jboss.netty.buffer.BigEndianHeapChannelBuffer

            // Update the predictor.

            predictor.previousReceiveBufferSize(readBytes);

            // Fire the event.

            fireMessageReceived(channel, buffer);//收到消息了,就觸發消息處理邏輯

        }

        if (ret < 0 || failure) {

            k.cancel(); // Some JDK implementations run into an infinite loop without this.

            close(channel, succeededFuture(channel));

            return false;

        }

        return true;

    }

這裡的bufferFactory

org.jboss.netty.buffer.HeapChannelBufferFactory

buffer對應的類是

org.jboss.netty.buffer.BigEndianHeapChannelBuffer

好,不管怎麼樣,現在擷取了消息,怎麼觸發消息處理機制呢?

---------------------------------------------------代碼如下:

public static void fireMessageReceived(Channel channel, Object message, SocketAddress remoteAddress) {

        channel.getPipeline().sendUpstream(

                new UpstreamMessageEvent(channel, message, remoteAddress));

    }

是以,這裡,其實交給對應的channel的處理管道pipiline來處理,這裡的管道可能有好幾個對象,這個由addLast函數添加。

注意2點:1 sendUpstream,這是處理的方向 2 UpstreamMessageEvent這是一個上行事件,隻有有能力處理的ctx裡的handler才可以處理。

------------------假如我們在自定義邏輯裡加入了一個繼承了SimpleChannelHandler的類或者SimpleChannelUpstreamHandler的類的對象(通過addLast函數)。

那麼,消息就會最終傳遞到我們這個函數裡。

----------------------------------------而這個函數,就是大部分時候,我們自己要增加的邏輯。

其實最基本的問題就是如何擷取本次的内容。

 public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {

      try{

          BigEndianHeapChannelBuffer behcb= (BigEndianHeapChannelBuffer)e.getMessage();

          System.out.println("\n\n\nreceived: "+new String(behcb.array(),"UTF-8"));

     }catch(Exception e11){

     }

}

其實根本原因在于,Netty剛開始會把socket裡的資料讀到一個ByteBuffer直接配置設定的緩沖區裡,比如說1024個位元組的容量,

最後讀了256個位元組,然後這256個資料會複制到BigEndianHeapChannelBuffer 的一個256長度的位元組數組裡。

是以我們此時通過這個256位元組數組,獲得了本次所讀的256個位元組的值,問題就解決了。

版權聲明:本文為CSDN部落客「weixin_34185512」的原創文章,遵循CC 4.0 BY-SA版權協定,轉載請附上原文出處連結及本聲明。

原文連結:https://blog.csdn.net/weixin_34185512/article/details/92588404