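Redisson is one of the Java clients recommended by the Redis project. It uses Netty 4.x as its network layer and performs I/O asynchronously, which is quite different from the synchronous I/O model of Jedis.
Redisson also offers synchronous operations, however.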
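In the org.redisson.client package, RedisClient wraps the Netty client. It can connect to a Redis server either synchronously or asynchronously, and it also supports reconnecting after a dropped connection.
The ConnectionWatchdog handler implements the reconnection mechanism.
Redisson also provides its own encoders and decoders.
RedisClient uses a ChannelGroup to manage the actual socket channel connections.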
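In Netty, all actual input and output goes through a Channel. Netty and Apache MINA are architecturally very similar, which makes it easier for Apache MINA users to migrate to Netty.
The difference is that Netty manages input and output through a Channel, while Apache MINA uses an IoSession.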
The RedisClient code, with comments, is as follows:
// RedisClient is built on Netty 4.x
public class RedisClient {
    // Bootstrap used to build the client
    private final Bootstrap bootstrap;
    // Address of the Redis server
    private final InetSocketAddress addr;
    // Tracks the channels opened by this client
    private final ChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
    private final long timeout;

    public RedisClient(String host, int port) {
        this(new NioEventLoopGroup(), NioSocketChannel.class, host, port, 60*1000);
    }

    public RedisClient(EventLoopGroup group, Class<? extends SocketChannel> socketChannelClass, String host, int port, int timeout) {
        // Server host and port
        addr = new InetSocketAddress(host, port);
        // Build the client bootstrap
        bootstrap = new Bootstrap().channel(socketChannelClass).group(group).remoteAddress(addr);
        // Configure the pipeline: reconnection handling plus command encoding and decoding
        bootstrap.handler(new ChannelInitializer<Channel>() {
            @Override
            protected void initChannel(Channel ch) throws Exception {
                ch.pipeline().addFirst(new ConnectionWatchdog(bootstrap, channels),
                        new CommandEncoder(),
                        new CommandsListEncoder(),
                        new CommandsQueue(),
                        new CommandDecoder());
            }
        });
        // Set the connect timeout
        bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, timeout);
        this.timeout = timeout;
    }

    public InetSocketAddress getAddr() {
        return addr;
    }

    long getTimeout() {
        return timeout;
    }

    public Bootstrap getBootstrap() {
        return bootstrap;
    }

    // Synchronous connect: blocks until the connection attempt completes
    public RedisConnection connect() {
        try {
            ChannelFuture future = bootstrap.connect();
            future.syncUninterruptibly();
            return new RedisConnection(this, future.channel());
        } catch (Exception e) {
            throw new RedisConnectionException("Unable to connect to " + addr, e);
        }
    }

    // Asynchronous connect: returns a Future that completes when the connection is ready
    public Future<RedisConnection> connectAsync() {
        final Promise<RedisConnection> f = bootstrap.group().next().newPromise();
        ChannelFuture channelFuture = bootstrap.connect();
        channelFuture.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                if (future.isSuccess()) {
                    RedisConnection c = new RedisConnection(RedisClient.this, future.channel());
                    f.setSuccess(c);
                } else {
                    f.setFailure(future.cause());
                }
            }
        });
        return f;
    }

    public RedisPubSubConnection connectPubSub() {
        try {
            ChannelFuture future = bootstrap.connect();
            future.syncUninterruptibly();
            return new RedisPubSubConnection(this, future.channel());
        } catch (Exception e) {
            throw new RedisConnectionException("Unable to connect to " + addr, e);
        }
    }

    public Future<RedisPubSubConnection> connectPubSubAsync() {
        final Promise<RedisPubSubConnection> f = bootstrap.group().next().newPromise();
        ChannelFuture channelFuture = bootstrap.connect();
        channelFuture.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                if (future.isSuccess()) {
                    RedisPubSubConnection c = new RedisPubSubConnection(RedisClient.this, future.channel());
                    f.setSuccess(c);
                } else {
                    f.setFailure(future.cause());
                }
            }
        });
        return f;
    }

    // Synchronous shutdown: waits until all channels are closed
    public void shutdown() {
        shutdownAsync().syncUninterruptibly();
    }

    // Asynchronous shutdown: closes every channel in the group
    public ChannelGroupFuture shutdownAsync() {
        return channels.close();
    }

    @Override
    public String toString() {
        return "RedisClient [addr=" + addr + "]";
    }
}
Here, the field private final ChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); is what manages the channels.
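The role of the channel group can be illustrated without Netty. The following is a minimal sketch using only the JDK; ConnectionRegistrySketch and Conn are hypothetical names invented for this illustration and are not part of Redisson or Netty. It shows the same idea: every opened connection is registered, a closed connection drops out of the set, and shutdown closes whatever is still open.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/** JDK-only analogy for a channel group; not Netty's actual implementation. */
final class ConnectionRegistrySketch {
    /** Hypothetical minimal connection that stands in for a Netty Channel. */
    static final class Conn implements AutoCloseable {
        private final Set<Conn> owner;
        private volatile boolean open = true;

        Conn(Set<Conn> owner) {
            this.owner = owner;
        }

        boolean isOpen() {
            return open;
        }

        @Override
        public void close() {
            open = false;
            owner.remove(this); // a closed connection leaves the registry
        }
    }

    // Thread-safe set of the connections that are currently open.
    private final Set<Conn> channels = ConcurrentHashMap.newKeySet();

    /** Opens a connection and registers it, as channelActive does with channels.add(...). */
    Conn open() {
        Conn c = new Conn(channels);
        channels.add(c);
        return c;
    }

    int size() {
        return channels.size();
    }

    /** Closes every tracked connection, comparable to channels.close() in shutdownAsync(). */
    void shutdown() {
        for (Conn c : channels) {
            c.close();
        }
    }
}
```

Keeping the bookkeeping in one place means the shutdown path does not need to know who opened which connection.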
RedisClient implements reconnection through ConnectionWatchdog, which extends ChannelInboundHandlerAdapter.
The code is as follows:
public class ConnectionWatchdog extends ChannelInboundHandlerAdapter {
    private final Logger log = LoggerFactory.getLogger(getClass());
    private final Bootstrap bootstrap;
    private final ChannelGroup channels;
    private static final int BACKOFF_CAP = 12;

    public ConnectionWatchdog(Bootstrap bootstrap, ChannelGroup channels) {
        this.bootstrap = bootstrap;
        this.channels = channels;
    }

    // When the connection is established, keep a reference to the channel
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        channels.add(ctx.channel());
        ctx.fireChannelActive();
    }

    // When the connection drops, start the reconnect logic
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        RedisConnection connection = RedisConnection.getFrom(ctx.channel());
        if (!connection.isClosed()) {
            // Get the event loop group that owns this channel
            EventLoopGroup group = ctx.channel().eventLoop().parent();
            // Schedule the reconnect task on that group
            reconnect(group, connection);
        }
        ctx.fireChannelInactive();
    }

    // Schedule the first reconnect attempt, to run once after 100 ms
    private void reconnect(final EventLoopGroup group, final RedisConnection connection){
        group.schedule(new Runnable() {
            @Override
            public void run() {
                tryReconnect(group, connection, 1);
            }
        }, 100, TimeUnit.MILLISECONDS);
    }

    private void tryReconnect(final EventLoopGroup group, final RedisConnection connection, final int attempts) {
        if (connection.isClosed() || group.isShuttingDown()) {
            return;
        }
        log.debug("reconnecting {} to {} ", connection, connection.getRedisClient().getAddr(), connection);
        bootstrap.connect().addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(final ChannelFuture future) throws Exception {
                if (connection.isClosed()) {
                    return;
                }
                try {
                    if (future.isSuccess()) {
                        log.debug("{} connected to {}", connection, connection.getRedisClient().getAddr());
                        reconnect(connection, future.channel());
                        return;
                    }
                } catch (RedisException e) {
                    log.warn("Can't connect " + connection + " to " + connection.getRedisClient().getAddr(), e);
                }
                // The delay before the next attempt grows exponentially with the number of failed attempts
                int timeout = 2 << attempts;
                group.schedule(new Runnable() {
                    @Override
                    public void run() {
                        tryReconnect(group, connection, Math.min(BACKOFF_CAP, attempts + 1));
                    }
                }, timeout, TimeUnit.MILLISECONDS);
            }
        });
    }

    private void reconnect(final RedisConnection connection, final Channel channel) {
        if (connection.getReconnectListener() != null) {
            // new connection used only for channel init
            RedisConnection rc = new RedisConnection(connection.getRedisClient(), channel);
            Promise<RedisConnection> connectionFuture = bootstrap.group().next().newPromise();
            connection.getReconnectListener().onReconnect(rc, connectionFuture);
            connectionFuture.addListener(new FutureListener<RedisConnection>() {
                @Override
                public void operationComplete(Future<RedisConnection> future) throws Exception {
                    if (future.isSuccess()) {
                        connection.updateChannel(channel);
                        resubscribe(connection);
                    }
                }
            });
        } else {
            connection.updateChannel(channel);
            resubscribe(connection);
        }
    }

    private void resubscribe(RedisConnection connection) {
        if (connection instanceof RedisPubSubConnection) {
            RedisPubSubConnection conn = (RedisPubSubConnection) connection;
            for (Entry<String, Codec> entry : conn.getChannels().entrySet()) {
                conn.subscribe(entry.getValue(), entry.getKey());
            }
            for (Entry<String, Codec> entry : conn.getPatternChannels().entrySet()) {
                conn.psubscribe(entry.getValue(), entry.getKey());
            }
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.channel().close();
    }
}
When the channelActive event fires, the channel is added to private final ChannelGroup channels; so that it can be managed.
// When the connection is established, keep a reference to the channel
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
    channels.add(ctx.channel());
    ctx.fireChannelActive();
}
When the connection drops, channelInactive fires and triggers the reconnect.
// When the connection drops, start the reconnect logic
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
    RedisConnection connection = RedisConnection.getFrom(ctx.channel());
    if (!connection.isClosed()) {
        // Get the event loop group that owns this channel
        EventLoopGroup group = ctx.channel().eventLoop().parent();
        // Schedule the reconnect task on that group
        reconnect(group, connection);
    }
    ctx.fireChannelInactive();
}
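Here the handler obtains Netty's own EventLoopGroup and schedules the reconnect task on it, rather than creating a thread of its own.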
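The retry timing in tryReconnect is easier to see with the arithmetic written out. The sketch below reproduces only the two expressions from the ConnectionWatchdog listing, 2 << attempts and Math.min(BACKOFF_CAP, attempts + 1); the class ReconnectBackoff and its method names are hypothetical and exist only for this illustration. The first attempt is scheduled separately, after a fixed 100 ms.

```java
import java.util.ArrayList;
import java.util.List;

/** Worked example of the delay schedule used between failed reconnect attempts. */
final class ReconnectBackoff {
    // Same value as BACKOFF_CAP in ConnectionWatchdog.
    static final int BACKOFF_CAP = 12;

    /** Delay in milliseconds before the retry that follows a failed attempt. */
    static int delayMillis(int attempts) {
        return 2 << attempts; // equals 2^(attempts + 1)
    }

    /** Attempt counter handed to the next retry; it stops growing at the cap. */
    static int nextAttempt(int attempts) {
        return Math.min(BACKOFF_CAP, attempts + 1);
    }

    /** Delays for the given number of consecutive failures, starting from attempt 1. */
    static List<Integer> schedule(int failures) {
        List<Integer> delays = new ArrayList<>();
        int attempts = 1;
        for (int i = 0; i < failures; i++) {
            delays.add(delayMillis(attempts));
            attempts = nextAttempt(attempts);
        }
        return delays;
    }

    public static void main(String[] args) {
        System.out.println(schedule(5));     // prints [4, 8, 16, 32, 64]
        System.out.println(delayMillis(12)); // prints 8192, the longest delay
    }
}
```

Because the counter is capped at 12, the delay doubles after each failure until it reaches 8192 ms and then stays there for as long as the server is unreachable.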
A listener is involved in the reconnection process.
public interface ReconnectListener {
    void onReconnect(RedisConnection redisConnection, Promise<RedisConnection> connectionFuture) throws RedisException;
}
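The way the listener gates the channel swap can be sketched with JDK types only. In the sketch below, CompletableFuture stands in for Netty's Promise, a String stands in for the Channel, and Listener and Connection are hypothetical simplifications of ReconnectListener and RedisConnection. It follows the control flow of ConnectionWatchdog.reconnect(connection, channel): with no listener the new channel is adopted at once, otherwise only after the listener completes the promise successfully. What a real listener does before completing the promise is not shown in this article; re-running connection setup is only an assumption.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

/** JDK-only sketch of the listener handshake; not Redisson's actual classes. */
final class ReconnectHandshakeSketch {
    /** Hypothetical stand-in for ReconnectListener. */
    interface Listener {
        void onReconnect(String newChannel, CompletableFuture<String> ready);
    }

    /** Hypothetical stand-in for RedisConnection: it only holds the current channel. */
    static final class Connection {
        final AtomicReference<String> channel = new AtomicReference<>();
        volatile Listener listener;

        Connection(String channel) {
            this.channel.set(channel);
        }
    }

    static void reconnect(Connection connection, String newChannel) {
        Listener listener = connection.listener;
        if (listener == null) {
            connection.channel.set(newChannel); // no listener: swap immediately
            return;
        }
        CompletableFuture<String> ready = new CompletableFuture<>();
        listener.onReconnect(newChannel, ready);
        // Swap only on success; a failed future leaves the old channel in place.
        ready.thenAccept(ignored -> connection.channel.set(newChannel));
    }

    public static void main(String[] args) {
        Connection c = new Connection("channel-1");
        // Assumed listener behaviour: finish some setup, then signal success.
        c.listener = (ch, ready) -> ready.complete(ch);
        reconnect(c, "channel-2");
        System.out.println(c.channel.get()); // prints channel-2
    }
}
```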
In Redisson, the RedisConnection class provides methods for reading and writing data either synchronously or asynchronously.
public class RedisConnection implements RedisCommands {
    private static final AttributeKey<RedisConnection> CONNECTION = AttributeKey.valueOf("connection");
    // The wrapper around the Netty client
    final RedisClient redisClient;
    // Whether this connection has been closed
    private volatile boolean closed;
    // The actual socket channel; all input and output goes through it
    volatile Channel channel;
    // Listener invoked on reconnect
    private ReconnectListener reconnectListener;
    private long lastUsageTime;

    public RedisConnection(RedisClient redisClient, Channel channel) {
        super();
        this.redisClient = redisClient;
        updateChannel(channel);
        lastUsageTime = System.currentTimeMillis();
    }
}
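In short, Redisson performs asynchronous I/O on top of Netty 4.x, and it builds its synchronous operations on the Future interface by waiting for the asynchronous result.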
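The same pattern can be shown with the JDK alone. The following sketch uses java.util.concurrent.CompletableFuture in place of Netty's Future, and the class and method names are hypothetical. It is meant only to illustrate how a blocking method can be a thin wrapper around an asynchronous one, in the way connect() calls syncUninterruptibly() on the future returned by bootstrap.connect().

```java
import java.util.concurrent.CompletableFuture;

/** JDK-only sketch of a synchronous API layered on an asynchronous one. */
final class SyncOverAsyncSketch {
    /** Hypothetical asynchronous operation: the result is produced on another thread. */
    static CompletableFuture<String> connectAsync(String host, int port) {
        return CompletableFuture.supplyAsync(() -> "connected to " + host + ":" + port);
    }

    /** Synchronous variant: start the asynchronous call, then block until it finishes. */
    static String connect(String host, int port) {
        return connectAsync(host, port).join();
    }

    public static void main(String[] args) {
        // Asynchronous use: register a callback and carry on.
        CompletableFuture<Void> done =
                connectAsync("localhost", 6379).thenAccept(System.out::println);
        // Synchronous use: the caller waits for the result.
        System.out.println(connect("localhost", 6379));
        done.join(); // make sure the callback has run before exiting
    }
}
```

With this layering only the asynchronous path contains real logic, and the synchronous method adds nothing but the wait.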