天天看点

Netty 连接池 FixedChannelPool 应用

package com.zny.common.netty.connectpool;

import java.net.InetSocketAddress;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.pool.FixedChannelPool;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;

public class ConnectPoolManager {

private final NioEventLoopGroup group = new NioEventLoopGroup();

private final Bootstrap bs = new Bootstrap();

private FixedChannelPool fixpool = null;

private InetSocketAddress remoteaddress = null;

public ConnectPoolManager(String host, int port, int maxconnect) {

    bs.group(group)

          .channel(NioSocketChannel.class)

          .option(ChannelOption.TCP_NODELAY, true)

     .option(ChannelOption.SO_KEEPALIVE, true);

    remoteaddress = InetSocketAddress.createUnresolved(host, port);

    bs.remoteAddress(remoteaddress);

    fixpool = new FixedChannelPool(bs, new DemoPoolHandler(), maxconnect);

    //创建时,打开最多连接数

}

//申请连接,没有申请到(或者网络断开),返回null

public Channel acquire(int seconds) { 

try {

Future<Channel> fch = fixpool.acquire();

Channel ch = fch.get(seconds, TimeUnit.SECONDS);

return ch;

} catch (Exception e) {

e.printStackTrace();

}

return null;

}

//释放连接

public void release(Channel channel) { 

try {

if (channel != null) {

fixpool.release(channel);

}

} catch (Exception e) {

e.printStackTrace();

}

}

public DemoChannelHandler getChannelHandler(Channel channel) {

        ChannelPipeline pipeline = channel.pipeline();

        DemoChannelHandler ret = (DemoChannelHandler)(pipeline.last());

        return ret;

}

//单元测试

    public static void main(String[] args) {

   try {

    int maxconnect = 10;

    ConnectPoolManager pool = new ConnectPoolManager("127.0.0.1", 5001, maxconnect);

    Channel ch = pool.acquire(5);

        pool.release(ch);

        Channel[] chlist = new Channel[maxconnect];

        for(int i=0; i<maxconnect; i++) {

        chlist[i] = pool.acquire(1);

        }

        for(int i=0; i<maxconnect; i++) {

        pool.release(chlist[i]);

        }

   } catch (Exception ex) {

    ex.printStackTrace();

   }

    }

}

package ConnectPool;

import io.netty.channel.Channel;

import io.netty.channel.ChannelPipeline;

import io.netty.channel.pool.ChannelPoolHandler;

import io.netty.channel.socket.nio.NioSocketChannel;

public class DemoPoolHandler implements ChannelPoolHandler {

@Override

public void channelReleased(Channel ch) throws Exception {

System.out.println("channelReleased");

}

@Override

public void channelAcquired(Channel ch) throws Exception {

System.out.println("channelAcquired");

}

@Override

public void channelCreated(Channel ch) throws Exception {

System.out.println("channelCreated");

NioSocketChannel channel = (NioSocketChannel) ch;

        channel.config().setKeepAlive(true);

        channel.config().setTcpNoDelay(true);

        ChannelPipeline pipeline = channel.pipeline();

        DemoChannelHandler handler = new DemoChannelHandler();

        pipeline.addLast(handler);

}

}

package ConnectPool;

import io.netty.buffer.ByteBuf;

import io.netty.buffer.Unpooled;

import io.netty.channel.ChannelHandler.Sharable;

import io.netty.channel.ChannelHandlerContext;

import io.netty.channel.ChannelInboundHandlerAdapter;

@Sharable

public class DemoChannelHandler extends ChannelInboundHandlerAdapter {

private ChannelHandlerContext chctx;

    private final ByteBuf firstMessage;

    public DemoChannelHandler() {

        firstMessage = Unpooled.buffer(50);

        for (int i = 0; i < firstMessage.capacity(); i ++) {

            firstMessage.writeByte((byte)(i+48));

        }

    }

public boolean isWritable() {

    return this.chctx.channel().isWritable();

    }

    public void close() {

    if (chctx != null) {

    chctx.close();

    }

    }

    @Override

    public void channelActive(ChannelHandlerContext ctx) {

        this.chctx = ctx;

        ctx.writeAndFlush(firstMessage);

    }

    @Override

    public void channelRead(ChannelHandlerContext ctx, Object msg) {

        this.chctx = ctx;

        ctx.write(msg);

    }

    @Override

    public void channelReadComplete(ChannelHandlerContext ctx) {

       ctx.flush();

    }

}