我們知道在TCP長連接配接或者WebSocket長連接配接中一般我們都會使用心跳機制–即發送特殊的資料包來通告對方自己的業務還沒有辦完,不要關閉連結。
那麼心跳機制可以用來做什麼呢?
我們知道網絡的傳輸是不可靠的,當我們發起一個連結請求的過程之中會發生什麼事情誰都無法預料,或者斷電,伺服器重新開機,斷網線之類。
如果有這種情況的發生對方也無法判斷你是否還線上。是以這時候我們引入心跳機制,在長連結中雙方沒有資料互動的時候互相發送資料(可能是空包,也可能是特殊資料),對方收到該資料之後也回複相應的資料用以確定雙方都線上,這樣就可以確定目前連結是有效的。
1. 如何實作心跳機制
一般實作心跳機制由兩種方式:
TCP協定自帶的心跳機制來實作;
在應用層來實作。
但是TCP協定自帶的心跳機制系統預設是設定的是2小時的心跳頻率。它檢查不到機器斷電、網線拔出、防火牆這些斷線。而且邏輯層處理斷線可能也不是那麼好處理。另外該心跳機制是與TCP協定綁定的,那如果我們要是使用UDP協定豈不是用不了?是以一般我們都不用。
而一般我們自己實作呢大緻的政策是這樣的:
Client啟動一個定時器,不斷發送心跳;
Server收到心跳後,做出回應;
Server啟動一個定時器,判斷Client是否存在,這裡做判斷有兩種方法:時間差和簡單辨別。
時間差:
收到一個心跳包之後記錄目前時間;
判斷定時器到達時間,計算多久沒收到心跳時間=目前時間-上次收到心跳時間。如果改時間大于設定值則認為逾時。
簡單辨別:
收到心跳後設定連接配接辨別為true;
判斷定時器到達時間,如果未收到心跳則設定連接配接辨別為false;
今天我們來看一下Netty的心跳機制的實作,在Netty中提供了IdleStateHandler類來進行心跳的處理,它可以對一個 Channel 的 讀/寫設定定時器, 當 Channel 在一定事件間隔内沒有資料互動時(即處于 idle 狀态), 就會觸發指定的事件。
該類可以對三種類型的逾時做心跳機制檢測:
public IdleStateHandler(int readerIdleTimeSeconds, int writerIdleTimeSeconds, int allIdleTimeSeconds) {
this((long)readerIdleTimeSeconds, (long)writerIdleTimeSeconds, (long)allIdleTimeSeconds, TimeUnit.SECONDS);
}
readerIdleTimeSeconds:設定讀逾時時間;
writerIdleTimeSeconds:設定寫逾時時間;
allIdleTimeSeconds:同時為讀或寫設定逾時時間;
下面我們還是通過一個例子來講解IdleStateHandler的使用。
服務端:
public class HeartBeatServer {
private int port;
public HeartBeatServer(int port) {
this.port = port;
}
public void start(){
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workGroup = new NioEventLoopGroup();
ServerBootstrap server = new ServerBootstrap().group(bossGroup,workGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new HeartBeatServerChannelInitializer());
try {
ChannelFuture future = server.bind(port).sync();
future.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
bossGroup.shutdownGracefully();
workGroup.shutdownGracefully();
}
}
public static void main(String[] args) {
HeartBeatServer server = new HeartBeatServer(7788);
server.start();
}
}

public class HeartBeatServerHandler extends ChannelInboundHandlerAdapter{
private int loss_connect_time = 0;
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println(ctx.channel().remoteAddress() + "Server :" + msg.toString());
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if(evt instanceof IdleStateEvent){
//服務端對應着讀事件,當為READER_IDLE時觸發
IdleStateEvent event = (IdleStateEvent)evt;
if(event.state() == IdleState.READER_IDLE){
loss_connect_time++;
System.out.println("接收消息逾時");
if(loss_connect_time > 2){
System.out.println("關閉不活動的連結");
ctx.channel().close();
}
}else{
super.userEventTriggered(ctx,evt);
}
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
}
我們看到在handler中調用了userEventTriggered方法,IdleStateEvent的state()方法一個有三個值: READER_IDLE,WRITER_IDLE,ALL_IDLE。正好對應讀事件寫事件和讀寫事件。
再來寫一下用戶端:
public class HeartBeatsClient {
private int port;
private String address;
public HeartBeatsClient(int port, String address) {
this.port = port;
this.address = address;
}
public void start(){
EventLoopGroup group = new NioEventLoopGroup();
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group)
.channel(NioSocketChannel.class)
.handler(new HeartBeatsClientChannelInitializer());
try {
ChannelFuture future = bootstrap.connect(address,port).sync();
future.channel().closeFuture().sync();
} catch (Exception e) {
e.printStackTrace();
}finally {
group.shutdownGracefully();
}
}
public static void main(String[] args) {
HeartBeatsClient client = new HeartBeatsClient(7788,"127.0.0.1");
client.start();
}
}
public class HeartBeatClientHandler extends ChannelInboundHandlerAdapter {
private static final ByteBuf HEARTBEAT_SEQUENCE = Unpooled.unreleasableBuffer(Unpooled.copiedBuffer("Heartbeat",
CharsetUtil.UTF_8));
private static final int TRY_TIMES = 3;
private int currentTime = 0;
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("激活時間是:"+new Date());
System.out.println("連結已經激活");
ctx.fireChannelActive();
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
System.out.println("停止時間是:"+new Date());
System.out.println("關閉連結");
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
System.out.println("目前輪詢時間:"+new Date());
if (evt instanceof IdleStateEvent) {
IdleStateEvent event = (IdleStateEvent) evt;
if (event.state() == IdleState.WRITER_IDLE) {
if(currentTime <= TRY_TIMES){
System.out.println("currentTime:"+currentTime);
currentTime++;
ctx.channel().writeAndFlush(HEARTBEAT_SEQUENCE.duplicate());
}
}
}
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
String message = (String) msg;
System.out.println(message);
if (message.equals("Heartbeat")) {
ctx.write("has read message from server");
ctx.flush();
}
ReferenceCountUtil.release(msg);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
}
啟動服務端和用戶端我們看到輸出為:
我們再來屢一下思路:
首先用戶端激活channel,因為用戶端中并沒有發送消息是以會觸發用戶端的IdleStateHandler,它設定的寫逾時時間為3s;
然後觸發用戶端的事件機制進入userEventTriggered方法,在觸發器中計數并向用戶端發送消息;
服務端接收消息;
用戶端觸發器繼續輪詢發送消息,直到計數器滿不再向服務端發送消息;
服務端在IdleStateHandler設定的讀消息逾時時間5s内未收到消息,觸發了服務端中handler的userEventTriggered方法,于是關閉用戶端的連結。
大體我們的簡單心跳機制就是這樣的思路,通過事件觸發機制以及計數器的方式來實作,上面我們的案例中最後用戶端沒有發送消息的時候我們是強制斷開了用戶端的連結,那麼既然可以關閉,我們是不是也可是重新連結用戶端呢?因為萬一用戶端本身并不想關閉而是由于别的原因導緻他無法與服務端通信。下面我們來說一下重連機制。
當我們的服務端在未讀到用戶端消息逾時而關閉用戶端的時候我們一般在用戶端的finally塊中方的是關閉用戶端的代碼,這時我們可以做一下修改的,finally是一定會被執行新的,是以我們可以在finally塊中重新調用一下啟動用戶端的代碼,這樣就又重新啟動了用戶端了,上用戶端代碼:
/**
* 本Client為測試netty重連機制
* Server端代碼都一樣,是以不做修改
* 隻用在client端中做一下判斷即可
*/
public class HeartBeatsClient2 {
private int port;
private String address;
ChannelFuture future;
public HeartBeatsClient2(int port, String address) {
this.port = port;
this.address = address;
}
public void start(){
EventLoopGroup group = new NioEventLoopGroup();
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group)
.channel(NioSocketChannel.class)
.handler(new HeartBeatsClientChannelInitializer());
try {
future = bootstrap.connect(address,port).sync();
future.channel().closeFuture().sync();
} catch (Exception e) {
e.printStackTrace();
}finally {
//group.shutdownGracefully();
if (null != future) {
if (future.channel() != null && future.channel().isOpen()) {
future.channel().close();
}
}
System.out.println("準備重連");
start();
System.out.println("重連成功");
}
}
public static void main(String[] args) {
HeartBeatsClient2 client = new HeartBeatsClient2(7788,"127.0.0.1");
client.start();
}
}
其餘部分的代碼與上面的執行個體并無異同,隻需改造用戶端即可,我們再運作服務端和用戶端會看到用戶端雖然被關閉了,但是立馬又被重新開機: