天天看點

Redis:redission 源代碼剖析1 連接配接建立

redission作為redis  官方推薦的java用戶端。 redission使用netty4.x作為網絡層。 redission使用異步io方式操作。這與jedis 同步io操作方式完全不同。

但是redission也提供了同步操作方式。

在org.redisson.client 包下面,RedisClient 提供了對netty client端的包裝。RedisClient 提供了同步或者異步連接配接redis伺服器的方式,同時也提供了斷線重連的機制。

ConnectionWatchdog   過濾器實作了斷線重連的機制。

在redission中也提供了編碼和解碼器。

RedisClient 使用ChannelGroup來管理實際的socketchannel連結。

在netty中,實際輸入輸出操作都是通過Channel通道來進行。雖然,netty和apache  mina架構非常類似,這也加快了使用apache  mina的人遷移到netty上面。

但是在netty 中使用Channel 管理輸入輸出流。在apache  mina 是使用IoSession來管理輸入輸出流。

RedisClient 代碼注釋如下:

//redisclient  use  netty4.x
public class RedisClient {
    //構造用戶端
    private final Bootstrap bootstrap;
    //連接配接位址
    private final InetSocketAddress addr;
    //管理用戶端
    private final ChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

    private final long timeout;

    public RedisClient(String host, int port) {
        this(new NioEventLoopGroup(), NioSocketChannel.class, host, port, 60*1000);
    }

    public RedisClient(EventLoopGroup group, Class<? extends SocketChannel> socketChannelClass, String host, int port, int timeout) {
    	//連接配接端口
        addr = new InetSocketAddress(host, port);
        //構造用戶端
        bootstrap = new Bootstrap().channel(socketChannelClass).group(group).remoteAddress(addr);
        //配置業務處理。這裡實作了斷線重連機制和編碼解碼功能
        bootstrap.handler(new ChannelInitializer<Channel>() {
            @Override
            protected void initChannel(Channel ch) throws Exception {
            	
                ch.pipeline().addFirst(new ConnectionWatchdog(bootstrap, channels),
                                        new CommandEncoder(),
                                        new CommandsListEncoder(),
                                        new CommandsQueue(),
                                        new CommandDecoder());
            }
        });
         //設定連結逾時時間
        bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, timeout);
        this.timeout = timeout;
    }

    public InetSocketAddress getAddr() {
        return addr;
    }

    long getTimeout() {
        return timeout;
    }

    public Bootstrap getBootstrap() {
        return bootstrap;
    }
    //同步連接配接方法
    public RedisConnection connect() {
        try {
            ChannelFuture future = bootstrap.connect();
            future.syncUninterruptibly();
            return new RedisConnection(this, future.channel());
        } catch (Exception e) {
            throw new RedisConnectionException("Unable to connect to " + addr, e);
        }
    }
    //異步連接配接方式
    public Future<RedisConnection> connectAsync() {
        final Promise<RedisConnection> f = bootstrap.group().next().newPromise();
        ChannelFuture channelFuture = bootstrap.connect();
        channelFuture.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                if (future.isSuccess()) {
                    RedisConnection c = new RedisConnection(RedisClient.this, future.channel());
                    f.setSuccess(c);
                } else {
                    f.setFailure(future.cause());
                }
            }
        });
        return f;
    }

    public RedisPubSubConnection connectPubSub() {
        try {
            ChannelFuture future = bootstrap.connect();
            future.syncUninterruptibly();
            return new RedisPubSubConnection(this, future.channel());
        } catch (Exception e) {
            throw new RedisConnectionException("Unable to connect to " + addr, e);
        }
    }

    public Future<RedisPubSubConnection> connectPubSubAsync() {
        final Promise<RedisPubSubConnection> f = bootstrap.group().next().newPromise();
        ChannelFuture channelFuture = bootstrap.connect();
        channelFuture.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                if (future.isSuccess()) {
                    RedisPubSubConnection c = new RedisPubSubConnection(RedisClient.this, future.channel());
                    f.setSuccess(c);
                } else {
                    f.setFailure(future.cause());
                }
            }
        });
        return f;
    }
    //同步關閉用戶端功能
    public void shutdown() {
        shutdownAsync().syncUninterruptibly();
    }
    //異步關閉用戶端功能
    public ChannelGroupFuture shutdownAsync() {
        return channels.close();
    }

    @Override
    public String toString() {
        return "RedisClient [addr=" + addr + "]";
    }
}           

在這裡    private final ChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);  來管理通道。

在RedisClient 實作了斷線重連機制。ConnectionWatchdog

public class ConnectionWatchdog extends ChannelInboundHandlerAdapter  代碼如下:

public class ConnectionWatchdog extends ChannelInboundHandlerAdapter {

    private final Logger log = LoggerFactory.getLogger(getClass());

    private final Bootstrap bootstrap;
    private final ChannelGroup channels;
    private static final int BACKOFF_CAP = 12;

    public ConnectionWatchdog(Bootstrap bootstrap, ChannelGroup channels) {
        this.bootstrap = bootstrap;
        this.channels  = channels;
    }
    //當連接配接建立時,持有通道的引用
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        channels.add(ctx.channel());
        ctx.fireChannelActive();
    }
    //當斷開連結時,使用短線重連功能更
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        RedisConnection connection = RedisConnection.getFrom(ctx.channel());
        if (!connection.isClosed()) {
        	//擷取連接配接線程池來做重新連結
            EventLoopGroup group = ctx.channel().eventLoop().parent();
            //開辟一個新線程,處理新連接配接任務
            reconnect(group, connection);
        }
        ctx.fireChannelInactive();
    }
    //斷線重連,執行一次任務
    private void reconnect(final EventLoopGroup group, final RedisConnection connection){
        group.schedule(new Runnable() {
            @Override
            public void run() {
                tryReconnect(group, connection, 1);
            }
        }, 100, TimeUnit.MILLISECONDS);
    }

    private void tryReconnect(final EventLoopGroup group, final RedisConnection connection, final int attempts) {
        if (connection.isClosed() || group.isShuttingDown()) {
            return;
        }

        log.debug("reconnecting {} to {} ", connection, connection.getRedisClient().getAddr(), connection);

        bootstrap.connect().addListener(new ChannelFutureListener() {

            @Override
            public void operationComplete(final ChannelFuture future) throws Exception {
                if (connection.isClosed()) {
                    return;
                }

                try {
                    if (future.isSuccess()) {
                        log.debug("{} connected to {}", connection, connection.getRedisClient().getAddr());
                        reconnect(connection, future.channel());
                        return;
                    }
                } catch (RedisException e) {
                    log.warn("Can't connect " + connection + " to " + connection.getRedisClient().getAddr(), e);
                }
                //根據失敗嘗試次數,來決定嘗試連接配接的次數
                int timeout = 2 << attempts;
                group.schedule(new Runnable() {
                    @Override
                    public void run() {
                        tryReconnect(group, connection, Math.min(BACKOFF_CAP, attempts + 1));
                    }
                }, timeout, TimeUnit.MILLISECONDS);
            }
        });
    }

    private void reconnect(final RedisConnection connection, final Channel channel) {
        if (connection.getReconnectListener() != null) {
            // new connection used only for channel init
            RedisConnection rc = new RedisConnection(connection.getRedisClient(), channel);
            Promise<RedisConnection> connectionFuture = bootstrap.group().next().newPromise();
            connection.getReconnectListener().onReconnect(rc, connectionFuture);
            connectionFuture.addListener(new FutureListener<RedisConnection>() {
                @Override
                public void operationComplete(Future<RedisConnection> future) throws Exception {
                    if (future.isSuccess()) {
                        connection.updateChannel(channel);
                        resubscribe(connection);
                    }
                }
            });
        } else {
            connection.updateChannel(channel);
            resubscribe(connection);
        }
    }

    private void resubscribe(RedisConnection connection) {
        if (connection instanceof RedisPubSubConnection) {
            RedisPubSubConnection conn = (RedisPubSubConnection) connection;
            for (Entry<String, Codec> entry : conn.getChannels().entrySet()) {
                conn.subscribe(entry.getValue(), entry.getKey());
            }
            for (Entry<String, Codec> entry : conn.getPatternChannels().entrySet()) {
                conn.psubscribe(entry.getValue(), entry.getKey());
            }
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.channel().close();
    }
}           

在channelActive 時間觸發時, 連接配接被添加到    private final ChannelGroup channels;  進行管理。

//當連接配接建立時,持有通道的引用
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        channels.add(ctx.channel());
        ctx.fireChannelActive();
    }           

當channelActive 連結斷開時,觸發斷線重連。

//當斷開連結時,使用短線重連功能更
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        RedisConnection connection = RedisConnection.getFrom(ctx.channel());
        if (!connection.isClosed()) {
        	//擷取連接配接線程池來做重新連結
            EventLoopGroup group = ctx.channel().eventLoop().parent();
            //開辟一個新線程,處理新連接配接任務
            reconnect(group, connection);
        }
        ctx.fireChannelInactive();
    }           

在這裡擷取了netty本身的線程池來管理新建立的線程。

在連接配接過程中涉及到監聽器。

public interface ReconnectListener {

    void onReconnect(RedisConnection redisConnection, Promise<RedisConnection> connectionFuture) throws RedisException;

}           

在redission中 RedisConnection  類中提供了同步或者異步讀寫資料的方法。

public class RedisConnection implements RedisCommands {

    private static final AttributeKey<RedisConnection> CONNECTION = AttributeKey.valueOf("connection");
    //對于netty client 的封裝
    final RedisClient redisClient;
    //表示連接配接的狀态
    private volatile boolean closed;
    //實際socketchannel通道,所有輸入輸出資料都是通過這個通道進行的。
    volatile Channel channel;
    //斷線重連監聽器
    private ReconnectListener reconnectListener;
    private long lastUsageTime;


    public RedisConnection(RedisClient redisClient, Channel channel) {
        super();
        this.redisClient = redisClient;

        updateChannel(channel);
        lastUsageTime = System.currentTimeMillis();
    }
}           

其實,redission 使用netty4.x  異步io 操作,但是通過public interface Future  來使用了同步操作。

繼續閱讀