文章目錄
- 詳解Pipeline流水線
-
- 背景
- pipeline入站處理流程
-
- code
- Pipeline出站處理流程
-
- code
- 出站處理流程
詳解Pipeline流水線
背景
一條Netty通道需要很多的Handler業務處理器來處理業務,每條通道內部都有一條流水線Pipeline將Handler裝配起來。Netty的業務處理器流水線是基於責任鏈設計模式來設計的,內部是一個雙向鏈結串列結構,支援動態添加或刪除Handler業務處理器。
pipeline入站處理流程
為了完整地演示Pipeline入站處理流程,建立三個入站處理器,然後加入到流水線中。
code
package com.wangyg.netty.ch06;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.embedded.EmbeddedChannel;
import org.junit.Test;
import util.Logger;
import java.nio.ByteBuffer;
/**
 * Demonstrates how an inbound message travels through a Netty pipeline.
 *
 * <p>Three inbound handlers are appended in the order A, B, C. Each one logs
 * that it was called and then delegates to the adapter's {@code channelRead}.
 */
public class InPipeline {

    /** Inbound handler A: logs the callback, then delegates to the adapter. */
    static class SimpleInhandlerA extends ChannelInboundHandlerAdapter {
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            Logger.info("入站處理器A: 被回調 ");
            // Per Netty's adapter documentation this forwards the message to the
            // next inbound handler; without it the chain would stop here.
            super.channelRead(ctx, msg);
        }
    }

    /** Inbound handler B: logs the callback, then delegates to the adapter. */
    static class SimpleInHandlerB extends ChannelInboundHandlerAdapter {
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            Logger.info("入站處理器B: 被回調 ");
            super.channelRead(ctx, msg);
        }
    }

    /** Inbound handler C: logs the callback, then delegates to the adapter. */
    static class SimpleInhandlerC extends ChannelInboundHandlerAdapter {
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            Logger.info(" 入站處理器C : 被回調");
            super.channelRead(ctx, msg);
        }
    }

    /**
     * Writes a single inbound packet into an embedded channel whose pipeline
     * holds handlers A, B and C, so the callback order can be read from the log.
     */
    @Test
    public void testPipelineInBound() {
        ChannelInitializer<EmbeddedChannel> initializer = new ChannelInitializer<EmbeddedChannel>() {
            @Override
            protected void initChannel(EmbeddedChannel ch) throws Exception {
                // Registration order: A, then B, then C.
                ch.pipeline().addLast(new SimpleInhandlerA());
                ch.pipeline().addLast(new SimpleInHandlerB());
                ch.pipeline().addLast(new SimpleInhandlerC());
            }
        };
        // The initializer populates the pipeline when the channel is registered.
        EmbeddedChannel channel = new EmbeddedChannel(initializer);

        ByteBuf buffer = Unpooled.buffer();
        buffer.writeInt(1); // payload: a single int with value 1

        try {
            // Simulates one inbound packet arriving on the channel. EmbeddedChannel
            // dispatches on the calling thread, so no sleep is needed afterwards.
            channel.writeInbound(buffer);
        } finally {
            // The message ends up queued in the embedded channel; close the channel
            // and release everything still queued so the buffer is not leaked.
            channel.finishAndReleaseAll();
        }
    }
}
入站順序
A -> B -> C(入站事件從流水線頭部向尾部傳播,因此回調順序與添加順序一致)
Pipeline出站處理流程
建立三個出站處理器,添加順序為 A ->B ->C
code
package com.wangyg.netty.ch06;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.embedded.EmbeddedChannel;
import org.junit.Test;
import util.Logger;
/**
 * Demonstrates how an outbound message travels through a Netty pipeline.
 *
 * <p>Three outbound handlers are appended in the order A, B, C. Each one logs
 * that it was called and then delegates to the adapter's {@code write}.
 * Per Netty's pipeline documentation, outbound events travel from the tail of
 * the pipeline towards the head, so the expected log order is C, B, A.
 */
public class OutPipeline {

    /** Outbound handler A: logs the callback, then delegates to the adapter. */
    public class SimpleOutHandlerA extends ChannelOutboundHandlerAdapter {
        @Override
        public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
            Logger.info("出站處理器A: 被回調");
            // Per Netty's adapter documentation this forwards the write to the
            // next outbound handler; without it the chain would stop here.
            super.write(ctx, msg, promise);
        }
    }

    /** Outbound handler B: logs the callback, then delegates to the adapter. */
    public class SimpleOutHandlerB extends ChannelOutboundHandlerAdapter {
        @Override
        public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
            Logger.info("出站處理器B: 被回調");
            super.write(ctx, msg, promise);
        }
    }

    /** Outbound handler C: logs the callback, then delegates to the adapter. */
    public class SimpleOutHandlerC extends ChannelOutboundHandlerAdapter {
        @Override
        public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
            Logger.info("出站處理器C: 被回調");
            super.write(ctx, msg, promise);
        }
    }

    /**
     * Writes a single outbound packet into an embedded channel whose pipeline
     * holds handlers A, B and C, so the callback order can be read from the log.
     */
    @Test
    public void testPipelineOutBound() {
        ChannelInitializer<EmbeddedChannel> initializer = new ChannelInitializer<EmbeddedChannel>() {
            @Override
            protected void initChannel(EmbeddedChannel ch) throws Exception {
                // Registration order: A, then B, then C.
                ch.pipeline().addLast(new SimpleOutHandlerA());
                ch.pipeline().addLast(new SimpleOutHandlerB());
                ch.pipeline().addLast(new SimpleOutHandlerC());
            }
        };
        EmbeddedChannel channel = new EmbeddedChannel(initializer);

        ByteBuf buffer = Unpooled.buffer();
        buffer.writeInt(1); // payload: a single int with value 1

        try {
            // Writes one outbound packet to the channel. EmbeddedChannel dispatches
            // on the calling thread, so the handlers have already run when this
            // returns; the former Thread.sleep(Integer.MAX_VALUE) only kept the
            // test from ever finishing.
            channel.writeOutbound(buffer);
        } finally {
            // Close the channel and release the queued outbound buffer.
            channel.finishAndReleaseAll();
        }
    }
}
出站處理流程
package com.wangyg.netty.ch06;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.embedded.EmbeddedChannel;
import org.junit.Test;
import util.Logger;
/**
 * Demonstrates how an outbound message travels through a Netty pipeline.
 *
 * <p>Three outbound handlers are appended in the order A, B, C. Each one logs
 * that it was called and then delegates to the adapter's {@code write}.
 * Per Netty's pipeline documentation, outbound events travel from the tail of
 * the pipeline towards the head, so the expected log order is C, B, A.
 */
public class OutPipeline {

    /** Outbound handler A: logs the callback, then delegates to the adapter. */
    public class SimpleOutHandlerA extends ChannelOutboundHandlerAdapter {
        @Override
        public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
            Logger.info("出站處理器A: 被回調");
            // Per Netty's adapter documentation this forwards the write to the
            // next outbound handler; without it the chain would stop here.
            super.write(ctx, msg, promise);
        }
    }

    /** Outbound handler B: logs the callback, then delegates to the adapter. */
    public class SimpleOutHandlerB extends ChannelOutboundHandlerAdapter {
        @Override
        public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
            Logger.info("出站處理器B: 被回調");
            super.write(ctx, msg, promise);
        }
    }

    /** Outbound handler C: logs the callback, then delegates to the adapter. */
    public class SimpleOutHandlerC extends ChannelOutboundHandlerAdapter {
        @Override
        public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
            Logger.info("出站處理器C: 被回調");
            super.write(ctx, msg, promise);
        }
    }

    /**
     * Writes a single outbound packet into an embedded channel whose pipeline
     * holds handlers A, B and C, so the callback order can be read from the log.
     */
    @Test
    public void testPipelineOutBound() {
        ChannelInitializer<EmbeddedChannel> initializer = new ChannelInitializer<EmbeddedChannel>() {
            @Override
            protected void initChannel(EmbeddedChannel ch) throws Exception {
                // Registration order: A, then B, then C.
                ch.pipeline().addLast(new SimpleOutHandlerA());
                ch.pipeline().addLast(new SimpleOutHandlerB());
                ch.pipeline().addLast(new SimpleOutHandlerC());
            }
        };
        EmbeddedChannel channel = new EmbeddedChannel(initializer);

        ByteBuf buffer = Unpooled.buffer();
        buffer.writeInt(1); // payload: a single int with value 1

        try {
            // Writes one outbound packet to the channel. EmbeddedChannel dispatches
            // on the calling thread, so the handlers have already run when this
            // returns; the former Thread.sleep(Integer.MAX_VALUE) only kept the
            // test from ever finishing.
            channel.writeOutbound(buffer);
        } finally {
            // Close the channel and release the queued outbound buffer.
            channel.finishAndReleaseAll();
        }
    }
}
出站順序
C -> B -> A(出站事件從流水線尾部向頭部傳播,因此回調順序與添加順序相反)