雲栖号資訊:【 點選檢視更多行業資訊】
在這裡您可以找到不同行業的第一手的上雲資訊,還在等什麼,快來!
前言
談到RPC肯定繞不開TCP通信,而主流的RPC架構都依賴于Netty等通信架構,這時候我們還要考慮是使用長連接配接還是短連接配接:
- 短連接配接:每次通信結束後關閉連接配接,下次通信需要重新建立連接配接;優點就是無需管理連接配接,無需保活連接配接;
-
長連接配接:每次通信結束不關閉連接配接,連接配接可以複用,保證了性能;缺點就是連接配接需要統一管理,并且需要保活;
主流的RPC架構都會追求性能選擇使用長連接配接,是以如何保活連接配接就是一個重要的話題,也是本文的主題,下面會重點介紹一些保活政策;
為什麼需要保活
上面介紹的長連接配接、短連接配接并不是TCP提供的功能,是以長連接配接是需要應用端自己來實作的,包括:連接配接的統一管理,如何保活等;如何保活之前我們了解一下為什麼需要保活?
主要原因是網絡不是100%可靠的,我們建立好的連接配接可能由于網絡原因導緻連接配接已經不可用了,如果連接配接一直有消息往來,那麼系統馬上可以感覺到連接配接斷開;
但是我們系統可能長時間沒有消息來往,導緻系統不能及時感覺到連接配接不可用,也就是不能及時處理重連或者釋放連接配接;常見的保活政策使用心跳機制由應用層來實作,還有網絡層提供的TCP Keepalive保活探測機制;
TCP Keepalive機制
TCP Keepalive是作業系統實作的功能,并不是TCP協定的一部分,需要在作業系統下進行相關配置,開啟此功能後,如果連接配接在一段時間内沒有資料往來,TCP将發送Keepalive探針來确認連接配接的可用性,Keepalive幾個核心參數配置:
- tcp_keepalive_time:連接配接多長時間沒有資料往來發送探針請求,預設為7200s(2h);
- tcp_keepalive_probes:探測失敗重試的次數預設為10次;
-
tcp_keepalive_intvl:重試的間隔時間預設75s;
以上參數可以修改到/etc/sysctl.conf檔案中;是否使用Keepalive用來保活就夠了,其實還不夠,Keepalive隻是在網絡層就行保活,如果網絡本身沒有問題,但是系統由于其他原因已經不可用了,這時候Keepalive并不能發現;是以往往還需要結合心跳機制來一起使用;
心跳機制
何為心跳機制,簡單來講就是用戶端啟動一個定時器用來定時發送請求,服務端接到請求進行響應,如果多次沒有接受到響應,那麼用戶端認為連接配接已經斷開,可以斷開半打開的連接配接或者進行重連處理;下面以Dubbo為例來看看是如何具體實施的;
Dubbo2.6.X
在HeaderExchangeClient中啟動了定時器ScheduledThreadPoolExecutor來定期執行心跳請求:
ScheduledThreadPoolExecutor scheduled = new ScheduledThreadPoolExecutor(2,
new NamedThreadFactory("dubbo-remoting-client-heartbeat", true));
在執行個體化HeaderExchangeClient時啟動心跳定時器:
private void startHeartbeatTimer() {
stopHeartbeatTimer();
if (heartbeat > 0) {
heartbeatTimer = scheduled.scheduleWithFixedDelay(
new HeartBeatTask(new HeartBeatTask.ChannelProvider() {
@Override
public Collection<Channel> getChannels() {
return Collections.<Channel>singletonList(HeaderExchangeClient.this);
}
}, heartbeat, heartbeatTimeout),
heartbeat, heartbeat, TimeUnit.MILLISECONDS);
}
}
heartbeat預設為60秒,heartbeatTimeout預設為heartbeat*3,可以了解至少出現三次心跳請求還未收到回複才會任務連接配接已經斷開;HeartBeatTask為執行心跳的任務:
public void run() {
long now = System.currentTimeMillis();
for (Channel channel : channelProvider.getChannels()) {
if (channel.isClosed()) {
continue;
}
Long lastRead = (Long) channel.getAttribute(HeaderExchangeHandler.KEY\_READ\_TIMESTAMP);
Long lastWrite = (Long) channel.getAttribute(HeaderExchangeHandler.KEY\_WRITE\_TIMESTAMP);
if ((lastRead != null && now - lastRead > heartbeat)
|| (lastWrite != null && now - lastWrite > heartbeat)) {
// 發送心跳
}
if (lastRead != null && now - lastRead > heartbeatTimeout) {
if (channel instanceof Client) {
((Client) channel).reconnect();
} else {
channel.close();
}
}
}
}
因為Dubbo雙端都會發送心跳請求,是以可以發現有兩個時間點分别是:lastRead和lastWrite;當然時間和最後讀取,最後寫的時間間隔大于heartbeat就會發送心跳請求;
如果多次心跳未傳回結果,也就是最後讀取消息時間大于heartbeatTimeout會判定目前是Client還是Server,如果是Client會發起reconnect,Server會關閉連接配接,這樣的考慮是合理的,用戶端調用是強依賴可用連接配接的,而服務端可以等待用戶端重建立立連接配接;
以上隻是介紹的Client,同樣Server端也有相同的心跳處理,在可以檢視HeaderExchangeServer;
Dubbo2.7.0
Dubbo2.7.0的心跳機制在2.6.X的基礎上得到了加強,同樣在HeaderExchangeClient中使用HashedWheelTimer開啟心跳檢測,這是Netty提供的一個時間輪定時器,在任務非常多,并且任務執行時間很短的情況下,HashedWheelTimer比Schedule性能更好,特别适合心跳檢測;
HashedWheelTimer heartbeatTimer = new HashedWheelTimer(new NamedThreadFactory("dubbo-client-heartbeat", true), tickDuration,
TimeUnit.MILLISECONDS, Constants.TICKS\_PER\_WHEEL);
分别啟動了兩個定時任務:startHeartBeatTask和startReconnectTask:
private void startHeartbeatTimer() {
AbstractTimerTask.ChannelProvider cp = () -> Collections.singletonList(HeaderExchangeClient.this);
long heartbeatTick = calculateLeastDuration(heartbeat);
long heartbeatTimeoutTick = calculateLeastDuration(heartbeatTimeout);
HeartbeatTimerTask heartBeatTimerTask = new HeartbeatTimerTask(cp, heartbeatTick, heartbeat);
ReconnectTimerTask reconnectTimerTask = new ReconnectTimerTask(cp, heartbeatTimeoutTick, heartbeatTimeout); // init task and start timer.
heartbeatTimer.newTimeout(heartBeatTimerTask, heartbeatTick, TimeUnit.MILLISECONDS);
heartbeatTimer.newTimeout(reconnectTimerTask, heartbeatTimeoutTick, TimeUnit.MILLISECONDS);
}
HeartbeatTimerTask:用來定時發送心跳請求,心跳間隔時間預設為60秒;這裡重新計算了時間,其實就是在原來的基礎上除以3,其實就是縮短了檢測間隔時間,增大了及時發現死鍊的機率;分别看一下兩個任務:
protected void doTask(Channel channel) {
Long lastRead = lastRead(channel);
Long lastWrite = lastWrite(channel); if ((lastRead != null && now() - lastRead > heartbeat)
|| (lastWrite != null && now() - lastWrite > heartbeat)) {
Request req = new Request();
req.setVersion(Version.getProtocolVersion());
req.setTwoWay(true);
req.setEvent(Request.HEARTBEAT_EVENT);
channel.send(req);
}
}
同上檢測最後讀寫時間和heartbeat的大小,注:普通請求和心跳請求都會更新讀寫時間;Netty 在 Dubbo 中是如何應用的?這篇推薦大家看一下。
protected void doTask(Channel channel) {
Long lastRead = lastRead(channel);
Long now = now(); if (lastRead != null && now - lastRead > heartbeatTimeout) { if (channel instanceof Client) {
((Client) channel).reconnect();
} else {
channel.close();
}
}
}
同樣的在逾時的情況下,Client重連,Server關閉連接配接;同樣Server端也有相同的心跳處理,在可以檢視HeaderExchangeServer;
Dubbo2.7.1-X
在Dubbo2.7.1之後,借助了Netty提供的IdleStateHandler來實作心跳機制服務:
public IdleStateHandler(
long readerIdleTime, long writerIdleTime, long allIdleTime,
TimeUnit unit) {
this(false, readerIdleTime, writerIdleTime, allIdleTime, unit);
}
- readerIdleTime:讀逾時時間;
- writerIdleTime:寫逾時時間;
- allIdleTime:所有類型的逾時時間;
根據設定的逾時時間,循環檢查讀寫事件多久沒有發生了,在pipeline中加入IdleSateHandler之後,可以在此pipeline的任意Handler的userEventTriggered方法之中檢測IdleStateEvent事件;下面看看具體Client和Server端添加的IdleStateHandler:
Client端
protected void initChannel(Channel ch) throws Exception {
final NettyClientHandler nettyClientHandler = new NettyClientHandler(getUrl(), this);
int heartbeatInterval = UrlUtils.getHeartbeat(getUrl());
ch.pipeline().addLast("client-idle-handler", new IdleStateHandler(heartbeatInterval, 0, 0, MILLISECONDS))
.addLast("handler", nettyClientHandler);
}
Client端在NettyClient中添加了IdleStateHandler,指定了讀寫逾時時間預設為60秒;60秒内沒有讀寫事件發生,會觸發IdleStateEvent事件在NettyClientHandler處理:
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { if (evt instanceof IdleStateEvent) { try {
NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);
Request req = new Request();
req.setVersion(Version.getProtocolVersion());
req.setTwoWay(true);
req.setEvent(Request.HEARTBEAT_EVENT);
channel.send(req);
} finally {
NettyChannel.removeChannelIfDisconnected(ctx.channel());
}
} else {
super.userEventTriggered(ctx, evt);
}
}
可以發現接收到IdleStateEvent事件發送了心跳請求;至于Client端如何處理重連,同樣在HeaderExchangeClient中使用HashedWheelTimer定時器啟動了兩個任務:心跳任務和重連任務,感覺這裡已經不需要心跳任務了,至于重連任務其實也可以放到userEventTriggered中處理;
Server端
protected void initChannel(NioSocketChannel ch) throws Exception { int idleTimeout = UrlUtils.getIdleTimeout(getUrl()); final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this);
ch.pipeline().addLast("server-idle-handler", new IdleStateHandler(0, 0, idleTimeout, MILLISECONDS))
.addLast("handler", nettyServerHandler);
}
Server端指定的逾時時間預設為60*3秒,在NettyServerHandler中處理userEventTriggered
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { if (evt instanceof IdleStateEvent) {
NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler); try {
channel.close();
} finally {
NettyChannel.removeChannelIfDisconnected(ctx.channel());
}
}
super.userEventTriggered(ctx, evt);
}
Server端在指定的逾時時間内沒有發生讀寫,會直接關閉連接配接;相比之前現在隻有Client發送心跳,單向發送心跳;
同樣的在HeaderExchangeServer中并沒有啟動多個認為,僅僅啟動了一個CloseTimerTask,用來檢測逾時時間關閉連接配接;感覺這個任務是不是也可以不需要了,IdleStateHandler已經實作了此功能;
綜上:在使用IdleStateHandler的情況下來同時在HeaderExchangeClient啟動心跳+重連機制,HeaderExchangeServer啟動了關閉連接配接機制;主要是因為IdleStateHandler是Netty架構特有了,而Dubbo是支援多種底層通訊架構的包括Mina,Grizzy等,應該是為了相容此類架構存在的;
總結
本文首先介紹了RPC中引入的長連接配接方式,繼而引出長連接配接的保活機制,為什麼需要保活?然後分别介紹了網絡層保活機制TCP Keepalive機制,應用層心跳機制;最後已Dubbo為例看各個版本中對心跳機制的進化。
【雲栖号線上課堂】每天都有産品技術專家分享!
課程位址:
https://yqh.aliyun.com/zhibo立即加入社群,與專家面對面,及時了解課程最新動态!
【雲栖号線上課堂 社群】
https://c.tb.cn/F3.Z8gvnK
原文釋出時間:2020-06-05
本文作者: ksfzhaohui317
本文來自:“
網際網路架構師 微信公衆号”,了解相關資訊可以關注“
網際網路架構師”