天天看點

Netty-Pipeline流水線詳解Pipeline流水線

文章目錄

  • 詳解Pipeline流水線
    • 背景
    • pipeline入站處理流程
      • code
    • Pipeline出站處理流程
      • code
      • 出站處理流程

詳解Pipeline流水線

背景

一條netty通道需要很多的handler業務處理器來處理業務,每條通道内部都有一條流水線Pipeline将handler裝配起來Netty的業務處理器流水線是基于責任鍊設計模式來設計的, 内部是一個雙向連結清單結構,支援動态添加或删除Handler業務處理器

pipeline入站處理流程

為了完整的延時Pipeline入站處理流程,建立三個入站處理器,然後加入到流水線中

code

package com.wangyg.netty.ch06;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.embedded.EmbeddedChannel;
import org.junit.Test;
import util.Logger;

import java.nio.ByteBuffer;

public class InPipeline {
    static class SimpleInhandlerA extends ChannelInboundHandlerAdapter{
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            Logger.info("入站處理器A: 被回調 ");
            super.channelRead(ctx, msg);
        }
    }

    static  class SimpleInHandlerB extends  ChannelInboundHandlerAdapter{
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            Logger.info("入站處理器B: 被回調 ");
            super.channelRead(ctx, msg);
        }
    }

    static  class SimpleInhandlerC extends ChannelInboundHandlerAdapter{
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            Logger.info(" 入站處理器C : 被回調");
            super.channelRead(ctx, msg);
        }
    }

    @Test
    public void testPipelineInBound(){
        ChannelInitializer i = new ChannelInitializer<EmbeddedChannel>() {

            @Override
            protected void initChannel(EmbeddedChannel ch) throws Exception {
                //添加A
                ch.pipeline().addLast(new SimpleInhandlerA());
                //添加B
                ch.pipeline().addLast(new SimpleInHandlerB());
                //添加C
                ch.pipeline().addLast(new SimpleInhandlerC());
            }
        };


        EmbeddedChannel channel = new EmbeddedChannel(i); //傳入channel 初始化
        ByteBuf buffer = Unpooled.buffer();
        buffer.writeInt(1); //寫入1
        //相同到寫入一個入站封包(資料包)
        channel.writeInbound(buffer);

        try {
            Thread.sleep(10);
        } catch (Exception e) {
            e.printStackTrace();
        }

    }
}

           

入站順序

A -> B ->C

Pipeline出站處理流程

建立三個出站處理器,添加順序為 A ->B ->C

code

package com.wangyg.netty.ch06;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.embedded.EmbeddedChannel;
import org.junit.Test;
import util.Logger;

public class OutPipeline {
    public class SimpleOutHandlerA extends ChannelOutboundHandlerAdapter {
        @Override
        public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
            Logger.info("出站處理器A: 被回調");
            super.write(ctx, msg, promise);
        }
    }

    public class SimpleOutHandlerB extends ChannelOutboundHandlerAdapter {
        @Override
        public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
            Logger.info("出站處理器B: 被回調");
            super.write(ctx, msg, promise);
        }
    }

    public class SimpleOutHandlerC extends ChannelOutboundHandlerAdapter {
        @Override
        public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
            Logger.info("出站處理器C: 被回調");
            super.write(ctx, msg, promise);
        }
    }

    @Test
    public void testPipelineOutBound() {
        ChannelInitializer i = new ChannelInitializer<EmbeddedChannel>() {
            @Override
            protected void initChannel(EmbeddedChannel ch) throws Exception {
                ch.pipeline().addLast(new SimpleOutHandlerA());
                ch.pipeline().addLast(new SimpleOutHandlerB());
                ch.pipeline().addLast(new SimpleOutHandlerC());
            }
        };
        EmbeddedChannel channel = new EmbeddedChannel(i);

        ByteBuf buffer = Unpooled.buffer();
        buffer.writeInt(1);
        //向通道寫入一個出站封包(資料包)
        channel.writeOutbound(buffer);
        try {
            Thread.sleep(Integer.MAX_VALUE);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

           

出站處理流程

package com.wangyg.netty.ch06;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.embedded.EmbeddedChannel;
import org.junit.Test;
import util.Logger;

public class OutPipeline {
    public class SimpleOutHandlerA extends ChannelOutboundHandlerAdapter {
        @Override
        public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
            Logger.info("出站處理器A: 被回調");
            super.write(ctx, msg, promise);
        }
    }

    public class SimpleOutHandlerB extends ChannelOutboundHandlerAdapter {
        @Override
        public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
            Logger.info("出站處理器B: 被回調");
            super.write(ctx, msg, promise);
        }
    }

    public class SimpleOutHandlerC extends ChannelOutboundHandlerAdapter {
        @Override
        public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
            Logger.info("出站處理器C: 被回調");
            super.write(ctx, msg, promise);
        }
    }

    @Test
    public void testPipelineOutBound() {
        ChannelInitializer i = new ChannelInitializer<EmbeddedChannel>() {
            @Override
            protected void initChannel(EmbeddedChannel ch) throws Exception {
                ch.pipeline().addLast(new SimpleOutHandlerA());
                ch.pipeline().addLast(new SimpleOutHandlerB());
                ch.pipeline().addLast(new SimpleOutHandlerC());
            }
        };
        EmbeddedChannel channel = new EmbeddedChannel(i);

        ByteBuf buffer = Unpooled.buffer();
        buffer.writeInt(1);
        //向通道寫入一個出站封包(資料包)
        channel.writeOutbound(buffer);
        try {
            Thread.sleep(Integer.MAX_VALUE);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

           

出站順序

C-> B ->A