天天看點

分析 Netty 死鎖異常 BlockingOperationException

最近,我發現一些BlockOperationException異常出現在我的Netty4項目中,為什麼會出現這個異常?有人說,在Netty的ServerBootstrap啟動伺服器的時候,使用sync()或await()方法會造成死鎖,可我發現異常是出現在ChannelRead過程中,而且Bootstrap用的是bossGroup,而ChannelRead用的是workerGroup,兩者使用的EventLoop應該是不用的,我認為是不會互相影響的,那究竟是什麼原因産生思索異常呢?

我将這個問題釋出到了StackOverflow進行提問(https://stackoverflow.com/questions/46020266/what-causes-blockingoperationexception-in-netty-4),非常幸運的得到了Norman Maurer(Netty的核心貢獻者之一)的解答。

分析 Netty 死鎖異常 BlockingOperationException

下面我将整個問題的分析思路整理出來,與大家分享。

正文

在使用Netty的ServerBootstrap啟動伺服器的時候,使用sync()方法會導緻阻塞。

public void init() throws Exception {
    logger.info("start tcp server ...");
    Class clazz = NioServerSocketChannel.class;
    // Server 服務啟動
    ServerBootstrap bootstrap = new ServerBootstrap();
    bootstrap.group(boosGroup, workerGroup);
    bootstrap.channel(clazz);
    bootstrap.childHandler(new ServerChannelInitializer(serverConfig));
    // 可選參數
    bootstrap.childOption(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, 32 * 1024);
    // 綁定接口,同步等待成功
    logger.info("start tcp server at port[" + port + "].");
    ChannelFuture future = bootstrap.bind(port).sync();
    future.addListener(new ChannelFutureListener() {
        public void operationComplete(ChannelFuture future) throws Exception {
            if (future.isSuccess()) {
                logger.info("Server have success bind to " + port);
            } else {
                logger.error("Server fail bind to " + port);
                throw new InitErrorException("Server start fail !", future.cause());
            }
        }
    });
}
           

在這一行

ChannelFuture future = bootstrap.bind(port).sync();
           

bootstrap.bind()傳回一個ChannelFuture,檢視源代碼DefaultChannelGroupFuture,sync()方法會調用await(),在await()中對ChannelFuture對象進行了加鎖。

public Promise<V> sync() throws InterruptedException {
    await();
    rethrowIfFailed(); // 異步操作失敗抛出異常
    return this;
}
           

sync()和await()很類似。

public Promise<V> await() throws InterruptedException {
    // 異步操作已經完成,直接傳回
    if (isDone()) {
        return this;    
    }
    if (Thread.interrupted()) {
        throw new InterruptedException(toString());
    }
    // 同步使修改waiters的線程隻有一個
    synchronized (this) {
        // 未完成則一直循環  
        while (!isDone()) { // 等待直到異步操作完成
            // 死鎖檢測
            checkDeadLock();
            incWaiters();   // ++waiters;
            try {
                wait(); // JDK方法
            } finally {
                decWaiters(); // --waiters
            }
        }
    }
    return this;
}
           

注意其中的checkDeadLock()方法用來進行死鎖檢測:

protected void checkDeadLock() {
    EventExecutor e = executor();
    if (e != null && e.inEventLoop()) {
        throw new BlockingOperationException(toString());
    }
}
           

e.inEventLoop()表示目前線程和executor的執行線程是同一個,即該線程上的一個任務等待該線程上的其他任務喚醒自己。我們知道線程的執行是線性,即前面的代碼執行完畢才能執行後面的代碼,是以這裡産生了一個死鎖。

在ChannelHandler方法中調用sync()或await()方法,會有可能引起死鎖,而在實踐中也偶發出現BlockingOperationException死鎖異常:

io.netty.util.concurrent.BlockingOperationException:AbstractChannel$CloseFuture(incomplete)
    at io.netty.util.concurrent.DefaultPromise.checkDeadLock(DefaultPromise.java:391)
    at io.netty.channel.DefaultChannelPromise.checkDeadLock(DefaultChannelPromise.java:157)
    at io.netty.util.concurrent.DefaultPromise.await(DefaultPromise.java:252)
    at io.netty.channel.DefaultChannelPromise.await(DefaultChannelPromise.java:129)
    at io.netty.channel.DefaultChannelPromise.await(DefaultChannelPromise.java:28)
    at io.netty.util.concurrent.DefaultPromise.sync(DefaultPromise.java:219)
    at io.netty.channel.DefaultChannelPromise.sync(DefaultChannelPromise.java:117)
    at io.netty.channel.DefaultChannelPromise.sync(DefaultChannelPromise.java:28)
           

那麼什麼樣的代碼會産生這種情況呢,下面給出項目中出現死鎖的代碼:

private void channelWrite(T message) {
    boolean success = true;
    boolean sent = true;
    int timeout = 60;
    try {
        ChannelFuture cf = cxt.write(message);
        cxt.flush();
        if (sent) {
            success = cf.await(timeout);
        }
        if (cf.isSuccess()) {
            logger.debug("send success.");
        }
        Throwable cause = cf.cause();
        if (cause != null) {
            this.fireError(new PushException(cause));
        }
    } catch (LostConnectException e) {
        this.fireError(new PushException(e));
    } catch (Exception e) {
        this.fireError(new PushException(e));
    } catch (Throwable e) {
        this.fireError(new PushException("Failed to send message“, e));
    }
    if (!success) {
        this.fireError(new PushException("Failed to send message"));
    }
}
           

在這一行

ChannelFuture cf = cxt.write(message);
cxt.flush();
           

write方法隻是把發送資料放入一個緩存,而不會真實的發送,而flush則是将放入緩存的資料發送資料,如果不flush會發生什麼情況呢?目前線程會進入wait(),而分發送資料的代碼沒有被執行,因為發送資料的方法也是在目前線程中執行,這樣死鎖就産生了。

實踐中,使用了writeAndFlush方法,仍會小機率的出現死鎖異常,這又是為何?同時存在幾個疑惑和猜測:

  • ServerBootstrap啟動使用EventLoopGroup是bossGroup,而ChannelHandler進行IO讀寫使用EventLoopGroup是workerGroup,兩者的EventExecutor應該是互不影響的。

    而且,實踐中,死鎖異常也并不出現在Netty啟動過程中,多發生在Channel的writeAndFlush後await,是以推斷sync方法雖然會調用await方法,但并不是引起死鎖的主因。

  • 源碼分析checkDeadLock,BlockingOperationException異常抛出,是目前線程和executor的執行線程是同一個。在GIthub/Netty檢視了BlockingOperationException的問題,同樣說不能在同一個IO線程裡調用sync()或await()。
分析 Netty 死鎖異常 BlockingOperationException
  • e.inEventLoop()為true則會抛BlockingOperationException異常,checkDeadLock方法之後就會進入await方法,即current thread進入休眠等待notify或interrupt,如果notify或interrupt的thread是current thread,那麼current thread将出現死鎖,無法被喚醒。
  • 誰會notify或interrupt喚起current thread?

    current thread調用writeAndFlush方法發送資料并傳回一個ChannelFuture,并調用ChannelFuture的await方法進入休眠等待,當發送isDone()時,executor線程池fetch一個線程去喚醒(notify event)current thread,但如果fetch到current thread就會出現死鎖,因為目前線程已經休眠。

    我們回憶下wait和notify,調用某個對象的wait()方法能讓目前線程阻塞,調用該對象的notify()方法能夠喚醒這個個正在等待的線程。wait()、notify()實作的是線程間的協作。

  • 可checkDeadLock是在wait之前進行檢測的,那麼Netty在current thread進入休眠之前,就應該已經fetch出喚醒thread。可猜測BlockingOperationException異常,可能是executor在fetch線程時将current thread作為notify thread而進行的自我容錯檢測。

Netty 的線程模型:

分析 Netty 死鎖異常 BlockingOperationException

EventLoopGroup負責配置設定一個EventLoop到每個新建立的Channel。每個EventLoop處理綁定Channel的所有event和task。每個EventLoop和一個線程關聯。同一個EventLoop可能會被配置設定到多個Channel。

在Netty4,所有的I/O操作和event都是由配置設定給EventLoop的那一個Thread來處理的。

分析 Netty 死鎖異常 BlockingOperationException

讓我們再來分析一下cxt.write(message)的源代碼:

private void write(Object msg, boolean flush, ChannelPromise promise) {
    AbstractChannelHandlerContext next = findContextOutbound();
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        next.invokeWrite(msg, promise);
        if (flush) {
            next.invokeFlush();
        }
    } else {
        int size = channel.estimatorHandle().size(msg);
        if (size > 0) {
            ChannelOutboundBuffer buffer = channel.unsafe().outboundBuffer();
            if (buffer != null) {
      	        buffer.incrementPendingOutboundBytes(size);
            }
        }
        Runnable task;
        if (flush) {
            task = WriteAndFlushTask.newInstance(next, msg, size, promise);
        }  else {
            task = WriteTask.newInstance(next, msg, size, promise);
        }
        safeExecute(executor, task, promise, msg);
    }
}

public EventExecutor executor() {
    if (executor == null) {
        return channel().eventLoop();
    } else {
        return executor;
    }
}

private static void safeExecute(EventExecutor executor, Runnable runnable, 
                                            ChannelPromise promise, Object msg) {
    try {
        executor.execute(runnable);
    } catch (Throwable cause) {
        try {
            promise.setFailure(cause);
        } finally {
            if (msg != null) {
                ReferenceCountUtil.release(msg);
            }
        }
    }
}
           

注意這行:

EventExecutor executor = next.executor();
           

,擷取目前channel所綁定的eventLoop。如果目前調用線程就是配置設定給該Channel的EventLoop,代碼被執行。否則,EventLoop将task放入一個内部的隊列延後執行。

分析 Netty 死鎖異常 BlockingOperationException

EventLoop和EventExecutor什麼關系?

public interface EventLoop extends EventExecutor, EventLoopGroup {
    @Override
    EventLoopGroup parent();
}
           

是以,我們大緻分析出,在執行write方法時,Netty會判斷current thread是否就是分給該Channe的EventLoop,如果是則行線程執行IO操作,否則送出executor等待配置設定。當執行await方法時,會從executor裡fetch出執行線程,這裡就需要checkDeadLock,判斷執行線程和current threads是否時同一個線程,如果是就檢測為死鎖抛出異常BlockingOperationException。

分析 Netty 死鎖異常 BlockingOperationException

那如何解決?官方建議優先使用addListener(GenericFutureListener),而非await()。

// BAD - NEVER DO THIS
@Override
public void channelRead(ChannelHandlerContext ctx, GoodByeMessage msg) {
    ChannelFuture future = ctx.channel().close();
    future.awaitUninterruptibly();
    // Perform post-closure operation
    // ...
}
// GOOD
@Override
public void channelRead(ChannelHandlerContext ctx,  GoodByeMessage msg) {
    ChannelFuture future = ctx.channel().close();
    future.addListener(new ChannelFutureListener() {
        public void operationComplete(ChannelFuture future) {
            // Perform post-closure operation
            // ...
        }
    });
}
           

項目代碼改造為:

private void pushMessage(T message) {
    try {
        ChannelFuture cf = cxt.writeAndFlush(message);
        cf.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws PushException {
                if (future.isSuccess()) {
                    logger.debug("send success.");
                } else {
                    throw new PushException("Failed to send message.");
                }
                Throwable cause = future.cause();
                if (cause != null) {
                    throw new PushException(cause);
                }
            }
        });
    } catch (LostConnectException e) {
        this.fireError(new PushException(e));
    } catch (Exception e) {
        this.fireError(new PushException(e));
    } catch (Throwable e) {
        this.fireError(new PushException(e));
    }
}