天天看點

netty心跳過程中 發送消息失敗_Netty 中的心跳檢測機制

心跳檢測一般存在于建立長連接配接 或者 需要保活的場景。

心跳的使用場景

長連接配接的應用場景非常的廣泛,比如監控系統,IM系統,即時報價系統,推送服務等等。像這些場景都是比較注重實時性,如果每次發送資料都要進行一次DNS解析,建立連接配接的過程肯定是極其影響體驗。

而長連接配接的維護必然需要一套機制來控制。比如 HTTP/1.0 通過在 header 頭中添加 Connection:Keep-Alive參數,如果目前請求需要保活則添加該參數作為辨別,否則服務端就不會保持該連接配接的狀态,發送完資料之後就關閉連接配接。HTTP/1.1以後 Keep-Alive 是預設打開的。

Netty 是 基于 TCP 協定開發的,在四層協定 TCP 協定的實作中也提供了 keepalive 封包用來探測對端是否可用。TCP 層将在定時時間到後發送相應的 KeepAlive 探針以确定連接配接可用性。

tcp-keepalive,作業系統核心支援,但是不預設開啟,應用需要自行開啟,開啟之後有三個參數會生效,來決定一個 keepalive 的行為。

net.ipv4.tcp_keepalive_time = 7200

net.ipv4.tcp_keepalive_probes = 9

net.ipv4.tcp_keepalive_intvl = 75

tcp_keepalive_time: 在 TCP 保活打開的情況下,最後一次資料交換到 TCP 發送第一個保活探測包的間隔,即允許的持續空閑時長,或者說每次正常發送心跳的周期,預設值為7200s(2h);

tcp_keepalive_probes: 在 tcp_keepalive_time 之後,沒有接收到對方确認,繼續發送保活探測包次數,預設值為9(次);

tcp_keepalive_intvl:在 tcp_keepalive_time 之後,沒有接收到對方确認,繼續發送保活探測包的發送頻率,預設值為75s。

TCP KeepAlive 是用于檢測連接配接的死活,而心跳機制則附帶一個額外的功能:檢測通訊雙方的存活狀态。兩者聽起來似乎是一個意思,但實際上卻大相徑庭。

考慮一種情況,某台伺服器因為某些原因導緻負載超高,CPU 100%,無法響應任何業務請求,但是使用 TCP 探針則仍舊能夠确定連接配接狀态,這就是典型的連接配接活着但業務提供方已死的狀态,對用戶端而言,這時的最好選擇就是斷線後重新連接配接其他伺服器,而不是一直認為目前伺服器是可用狀态一直向目前伺服器發送些必然會失敗的請求。

可以通過如下指令檢視系統tcp-keepalive參數配置:

sysctl -a | grep keepalive

cat /proc/sys/net/ipv4/tcp_keepalive_time

sysctl net.ipv4.tcp_keepalive_time

Netty 中也提供了設定 tcp-keepalive 的設定:

netty心跳過程中 發送消息失敗_Netty 中的心跳檢測機制

設定:ChannelOption.SO_KEEPALIVE, true 表示打開 TCP 的 keepAlive 設定。

是以基礎協定對應用來說不是那麼盡善盡美,一個 Netty 服務端可能會面臨上萬個連接配接,如何去維護這些連接配接是應用應該去處理的事情。在 Netty 中提供了 IdleStateHandler 類專門用于處理心跳。

IdleStateHandler 的構造函數如下:

public IdleStateHandler(long readerIdleTime, long writerIdleTime,

long allIdleTime,TimeUnit unit){

}

第一個參數是隔多久檢查一下讀事件是否發生,如果 channelRead() 方法超過 readerIdleTime 時間未被調用則會觸發逾時事件調用 userEventTrigger() 方法;

第二個參數是隔多久檢查一下寫事件是否發生,writerIdleTime 寫空閑逾時時間設定,如果 write() 方法超過 writerIdleTime 時間未被調用則會觸發逾時事件調用 userEventTrigger() 方法;

第三個參數是全能型參數,隔多久檢查讀寫事件;

第四個參數表示目前的時間機關。

是以這裡可以分别控制讀,寫,讀寫逾時的時間,機關為秒,如果是0表示不檢測,是以如果全是0,則相當于沒添加這個 IdleStateHandler,連接配接是個普通的短連接配接。

Netty 中的心跳邏輯

下面示範一下在 Netty 中如果使用 IdleStateHandler, 整體代碼流程請見 :

先上代碼:

Server端:

package com.rickiyang.learn.keepAlive;

import io.netty.bootstrap.ServerBootstrap;

import io.netty.channel.ChannelFuture;

import io.netty.channel.EventLoopGroup;

import io.netty.channel.nio.NioEventLoopGroup;

import io.netty.channel.socket.nio.NioServerSocketChannel;

import lombok.extern.slf4j.Slf4j;

@Slf4j

public class KpServer {

private int port;

public KpServer(int port) {

this.port = port;

}

public void start(){

EventLoopGroup bossGroup = new NioEventLoopGroup();

EventLoopGroup workGroup = new NioEventLoopGroup();

ServerBootstrap server = new ServerBootstrap().group(bossGroup,workGroup)

.channel(NioServerSocketChannel.class)

.childHandler(new ServerChannelInitializer());

try {

ChannelFuture future = server.bind(port).sync();

future.channel().closeFuture().sync();

} catch (InterruptedException e) {

log.error("server start fail",e);

}finally {

bossGroup.shutdownGracefully();

workGroup.shutdownGracefully();

}

}

public static void main(String[] args) {

KpServer server = new KpServer(7788);

server.start();

}

}

Initializer:

package com.rickiyang.learn.keepAlive;

import io.netty.channel.ChannelInitializer;

import io.netty.channel.ChannelPipeline;

import io.netty.channel.socket.SocketChannel;

import io.netty.handler.codec.string.StringDecoder;

import io.netty.handler.codec.string.StringEncoder;

import io.netty.handler.timeout.IdleStateHandler;

import java.util.concurrent.TimeUnit;

public class ServerChannelInitializer extends ChannelInitializer {

@Override

protected void initChannel(SocketChannel socketChannel) throws Exception {

ChannelPipeline pipeline = socketChannel.pipeline();

pipeline.addLast(new IdleStateHandler(5, 0, 0, TimeUnit.SECONDS));

// 字元串解碼 和 編碼

pipeline.addLast("decoder", new StringDecoder());

pipeline.addLast("encoder", new StringEncoder());

// 自己的邏輯Handler

pipeline.addLast("handler", new KpServerHandler());

}

}

Handler:

package com.rickiyang.learn.keepAlive;

import io.netty.channel.ChannelFutureListener;

import io.netty.channel.ChannelHandlerContext;

import io.netty.channel.SimpleChannelInboundHandler;

import io.netty.handler.timeout.IdleState;

import io.netty.handler.timeout.IdleStateEvent;

import lombok.extern.slf4j.Slf4j;

@Slf4j

public class KpServerHandler extends SimpleChannelInboundHandler {

@Override

public void channelActive(ChannelHandlerContext ctx) throws Exception {

log.info("server channelActive");

}

@Override

protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {

String message = (String) msg;

if ("heartbeat".equals(message)) {

log.info(ctx.channel().remoteAddress() + "===>server: " + message);

ctx.write("heartbeat");

ctx.flush();

}

}

@Override

public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {

if (evt instanceof IdleStateEvent) {

IdleStateEvent event = (IdleStateEvent) evt;

if (IdleState.READER_IDLE.equals((event.state()))) {

ctx.writeAndFlush("heartbeat").addListener(ChannelFutureListener.CLOSE_ON_FAILURE) ;

}

}

super.userEventTriggered(ctx, evt);

}

@Override

public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {

super.exceptionCaught(ctx, cause);

ctx.close();

}

}

用戶端代碼:

Client:

package com.rickiyang.learn.keepAlive;

import io.netty.bootstrap.Bootstrap;

import io.netty.channel.ChannelFuture;

import io.netty.channel.EventLoopGroup;

import io.netty.channel.nio.NioEventLoopGroup;

import io.netty.channel.socket.nio.NioSocketChannel;

import lombok.extern.slf4j.Slf4j;

@Slf4j

public class KpClient {

private int port;

private String address;

public KpClient(int port, String address) {

this.port = port;

this.address = address;

}

public void start(){

EventLoopGroup group = new NioEventLoopGroup();

Bootstrap bootstrap = new Bootstrap();

bootstrap.group(group)

.channel(NioSocketChannel.class)

.handler(new ClientChannelInitializer());

try {

ChannelFuture future = bootstrap.connect(address,port).sync();

future.channel().writeAndFlush("Hello world, i'm online");

future.channel().closeFuture().sync();

} catch (Exception e) {

log.error("client start fail",e);

}finally {

group.shutdownGracefully();

}

}

public static void main(String[] args) {

KpClient client = new KpClient(7788,"127.0.0.1");

client.start();

}

}

Initializer:

package com.rickiyang.learn.keepAlive;

import io.netty.channel.ChannelInitializer;

import io.netty.channel.ChannelPipeline;

import io.netty.channel.socket.SocketChannel;

import io.netty.handler.codec.string.StringDecoder;

import io.netty.handler.codec.string.StringEncoder;

import io.netty.handler.timeout.IdleStateHandler;

import java.util.concurrent.TimeUnit;

public class ClientChannelInitializer extends ChannelInitializer {

@Override

protected void initChannel(SocketChannel socketChannel) throws Exception {

ChannelPipeline pipeline = socketChannel.pipeline();

pipeline.addLast(new IdleStateHandler(0, 4, 0, TimeUnit.SECONDS));

pipeline.addLast("decoder", new StringDecoder());

pipeline.addLast("encoder", new StringEncoder());

// 用戶端的邏輯

pipeline.addLast("handler", new KpClientHandler());

}

}

Handler:

package com.rickiyang.learn.keepAlive;

import io.netty.buffer.ByteBuf;

import io.netty.buffer.Unpooled;

import io.netty.channel.ChannelFutureListener;

import io.netty.channel.ChannelHandlerContext;

import io.netty.channel.SimpleChannelInboundHandler;

import io.netty.handler.timeout.IdleState;

import io.netty.handler.timeout.IdleStateEvent;

import io.netty.util.CharsetUtil;

import lombok.extern.slf4j.Slf4j;

@Slf4j

public class KpClientHandler extends SimpleChannelInboundHandler {

private static final ByteBuf HEARTBEAT_SEQUENCE =

Unpooled.unreleasableBuffer(Unpooled.copiedBuffer("heartbeat", CharsetUtil.UTF_8));

@Override

protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {

String message = (String)msg;

if("heartbeat".equals(message)) {

log.info(ctx.channel().remoteAddress() + "===>client: " + msg);

}

}

@Override

public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {

if(evt instanceof IdleStateEvent) {

IdleStateEvent event = (IdleStateEvent) evt;

if(IdleState.WRITER_IDLE.equals(event.state())) {

ctx.writeAndFlush(HEARTBEAT_SEQUENCE.duplicate()).addListener(ChannelFutureListener.CLOSE_ON_FAILURE) ;

}

}

super.userEventTriggered(ctx, evt);

}

@Override

public void channelActive(ChannelHandlerContext ctx) throws Exception {

log.info("client channelActive");

ctx.fireChannelActive();

}

@Override

public void channelInactive(ChannelHandlerContext ctx) throws Exception {

log.info("Client is close");

}

}

解釋一下代碼的邏輯:

服務端添加了:

pipeline.addLast(new IdleStateHandler(5, 0, 0, TimeUnit.SECONDS));

每隔5s檢查一下是否有讀事件發生,如果沒有就處罰 handler 中的 userEventTriggered(ChannelHandlerContext ctx, Object evt)邏輯。

用戶端添加了:

new IdleStateHandler(0, 4, 0, TimeUnit.SECONDS)

每隔4s檢查一下是否有寫事件,如果沒有就觸發 handler 中的 userEventTriggered(ChannelHandlerContext ctx, Object evt)邏輯。

大家可以再本地啟動工程,看一下觸發的邏輯。

IdleStateHandler邏輯分析

心跳檢測也是一種 Handler,在啟動時添加到 ChannelPipeline 管道中,當有讀寫操作時消息在其中傳遞。首先我們看到 IdleStateHandler 繼承了 ChannelDuplexHandler:

public class IdleStateHandler extends ChannelDuplexHandler {

...

}

表明 IdleStateHandler 也可以同時處理入站和出站事件,是以可以同時監控讀事件和寫事件。

IdleStateHandler 的 channelActive() 方法在 socket 通道建立時被觸發:

@Override

public void channelActive(ChannelHandlerContext ctx) throws Exception {

initialize(ctx);

super.channelActive(ctx);

}

其中 channelActive() 方法調用 Initialize() 方法,根據配置的 readerIdleTime、writeIdleTIme 等逾時事件參數往任務隊列 taskQueue 中添加定時任務 task:

private void initialize(ChannelHandlerContext ctx) {

// Avoid the case where destroy() is called before scheduling timeouts.

// See: https://github.com/netty/netty/issues/143

//這裡判斷狀态,避免重複初始化

switch (state) {

case 1:

case 2:

return;

}

state = 1;

EventExecutor loop = ctx.executor();

//初始化最後一次讀寫時間

lastReadTime = lastWriteTime = System.nanoTime();

// 根據使用者設定的讀空閑時間啟動一個定時任務,讀空閑時間為頻率執行

// 這裡的 schedule 方法會調用 eventLoop 的 schedule 方法,将定時任務添加進隊列中

if (readerIdleTimeNanos > 0) {

readerIdleTimeout = loop.schedule(

new ReaderIdleTimeoutTask(ctx),

readerIdleTimeNanos, TimeUnit.NANOSECONDS);

}

// 根據使用者設定的寫空閑時間啟動一個定時任務,寫空閑時間為頻率執行

if (writerIdleTimeNanos > 0) {

writerIdleTimeout = loop.schedule(

new WriterIdleTimeoutTask(ctx),

writerIdleTimeNanos, TimeUnit.NANOSECONDS);

}

// 根據使用者設定的讀寫空閑時間啟動一個定時任務,讀寫空閑時間為頻率執行

if (allIdleTimeNanos > 0) {

allIdleTimeout = loop.schedule(

new AllIdleTimeoutTask(ctx),

allIdleTimeNanos, TimeUnit.NANOSECONDS);

}

}

看到這裡或者沒看這裡你也應該能想到,這種監控性的任務肯定是使用定時任務類似這種機制來進行。

上面有一個 state 字段:

private byte state;

0:初始狀态,1:已經初始化, 2: 已經銷毀。

上面的 switch 判斷隻有目前狀态為 0 即初始化狀态的時候才執行下面的操作,避免多次送出定時任務。

定時任務添加到對應線程 EventLoopExecutor 對應的任務隊列 taskQueue 中,在對應線程的 run() 方法中循環執行:

用目前時間減去最後一次 channelRead 方法調用的時間判斷是否空閑逾時;

如果空閑逾時則建立空閑逾時事件并傳遞到 channelPipeline 中。

隻要給定的參數大于0,就建立一個定時任務,每個事件都建立。同時,将 state 狀态設定為 1,防止重複初始化。

讀事件處理:ReaderIdleTimeoutTask

來看讀事件是如何處理的, ReaderIdleTimeoutTask:

private final class ReaderIdleTimeoutTask implements Runnable {

private final ChannelHandlerContext ctx;

ReaderIdleTimeoutTask(ChannelHandlerContext ctx) {

this.ctx = ctx;

}

@Override

public void run() {

if (!ctx.channel().isOpen()) {

return;

}

// nextDelay = 目前時間-最後一次時間

long nextDelay = readerIdleTimeNanos;

if (!reading) {

nextDelay -= System.nanoTime() - lastReadTime;

}

if (nextDelay <= 0) {

// 重新定義readerIdleTimeout schedule,與initialize方法設定的相同,繼續執行定時任務

readerIdleTimeout =

ctx.executor().schedule(this, readerIdleTimeNanos, TimeUnit.NANOSECONDS);

try {

// event = new IdleStateEvent(IdleState.READER_IDLE, true),将event設定為讀空閑

IdleStateEvent event = newIdleStateEvent(IdleState.READER_IDLE, firstReaderIdleEvent);

if (firstReaderIdleEvent) {

firstReaderIdleEvent = false;

}

//channelIdle的主要工作就是将evt傳輸給下一個Handler

channelIdle(ctx, event);

} catch (Throwable t) {

ctx.fireExceptionCaught(t);

}

} else {

// 如果nextDelay>0,則說明用戶端在規定時間内已經寫入資料了

// 重新定義readerIdleTimeout schedule,以nextDelay為執行頻率

readerIdleTimeout = ctx.executor().schedule(this, nextDelay, TimeUnit.NANOSECONDS);

}

}

}

nextDelay的初始化值為逾時秒數readerIdleTimeNanos,如果檢測的時候沒有正在讀,就計算多久沒讀了:

nextDelay = nextDelay - 目前時間 - 上次讀取時間

如果小于0,說明左邊的 readerIdleTimeNanos 小于空閑時間(目前時間 - 上次讀取時間),表示已經逾時,

建立 IdleStateEvent 事件,IdleState 枚舉值為 READER_IDLE,然後調用 channelIdle(ctx, event) 方法分發給下一個 ChannelInboundHandler。

總的來說,每次讀取操作都會記錄一個時間,定時任務時間到了,會計算目前時間和最後一次讀的時間的間隔,如果間隔超過了設定的時間,就觸發 UserEventTriggered() 方法。

寫事件處理:WriterIdleTimeoutTask

寫事件,WriterIdleTimeoutTask:

private final class WriterIdleTimeoutTask implements Runnable {

private final ChannelHandlerContext ctx;

WriterIdleTimeoutTask(ChannelHandlerContext ctx) {

this.ctx = ctx;

}

@Override

public void run() {

if (!ctx.channel().isOpen()) {

return;

}

long lastWriteTime = IdleStateHandler.this.lastWriteTime;

long nextDelay = writerIdleTimeNanos - (System.nanoTime() - lastWriteTime);

if (nextDelay <= 0) {

// Writer is idle - set a new timeout and notify the callback.

writerIdleTimeout = ctx.executor().schedule(

this, writerIdleTimeNanos, TimeUnit.NANOSECONDS);

try {

IdleStateEvent event = newIdleStateEvent(IdleState.WRITER_IDLE, firstWriterIdleEvent);

if (firstWriterIdleEvent) {

firstWriterIdleEvent = false;

}

channelIdle(ctx, event);

} catch (Throwable t) {

ctx.fireExceptionCaught(t);

}

} else {

// Write occurred before the timeout - set a new timeout with shorter delay.

writerIdleTimeout = ctx.executor().schedule(this, nextDelay, TimeUnit.NANOSECONDS);

}

}

}

寫逾時時間:

nextDelay = writerIdleTimeNanos - (System.nanoTime() - lastWriteTime)

寫逾時也是跟讀逾時同理,每次寫操作都記錄操作時間。

IdleStateHandler 心跳檢測主要是通過向線程任務隊列中添加定時任務,判斷 channelRead() 方法或 write() 方法是否調用空閑逾時,如果逾時則觸發逾時事件執行自定義 userEventTrigger() 方法。

Netty 通過 IdleStateHandler 實作最常見的心跳機制不是一種雙向心跳的 PING-PONG 模式,而是用戶端發送心跳資料包,服務端接收心跳但不回複,因為如果服務端同時有上千個連接配接,心跳的回複需要消耗大量網絡資源。

如果服務端一段時間内一直收到用戶端的心跳資料包則認為用戶端已經下線,将通道關閉避免資源的浪費。在這種心跳模式下服務端可以感覺用戶端的存活情況,無論是當機的正常下線還是網絡問題的非正常下線,服務端都能感覺到,而用戶端不能感覺到服務端的非正常下線。

要想實作用戶端感覺服務端的存活情況,需要進行雙向的心跳;Netty 中的 channelInactive() 方法是通過 Socket 連接配接關閉時揮手資料包觸發的,是以可以通過 channelInactive() 方法感覺正常的下線情況,但是因為網絡異常等非正常下線則無法感覺。上面的示例隻做了用戶端和服務端雙向心跳測試,大家可以補充一下如果一段時間内都收到的是用戶端的心跳包則判定連接配接無效關閉連接配接的邏輯。