此章節比較零散,主要為與Spring整合及業務處理做準備,沒有涉及到具體的配置,都是一些工具類的實作。
具體的整合Spring,使用的一下的這些類,下面的一章描述了如何使用這些類,看的比較暈的,可以多看看直接的配置,了解mina的運作流程。
完整的項目架構:
統一通信類
- 規範消息類型
目的:使用統一的封裝類型,服務端接收固定的消息對象,服務端發送固定的消息對象,規範用戶端、服務端的互動;
實作:服務端接收SentBody對象,服務端發送ReplyBody對象
(參考:spring內建mina 實作消息推送以及轉發)
Message消息常量:
/**
* @author ZERO
* @Description 消息常量
*/
public class Message {
public static class ReturnCode {
public static String CODE_404 = "404";
public static String CODE_403 = "403"; //該賬号未綁定
public static String CODE_405 = "405"; //事物未定義
public static String CODE_200 = "200"; //成功
public static String CODE_500 = "500"; //未知錯誤
}
public static final String SESSION_KEY = "account";
/**
* 服務端心跳請求指令
*/
public static final String CMD_HEARTBEAT_REQUEST = "hb_request";
/**
* 用戶端心跳響應指令
*/
public static final String CMD_HEARTBEAT_RESPONSE = "hb_response";
/**
* 逾時次數
*/
public static final String TIME_OUT_NUM = "timeOutNum";
public static class MessageType {
// 使用者 踢出下線消息類型
public static String TYPE_999 = "999";
}
}
SentBody服務端接收消息對象:
/**
* @author ZERO
* @Description 服務端接收消息對象
*/
public class SentBody implements Serializable {
private static final long serialVersionUID = 1L;
private String key;
private HashMap<String, String> data;
private long timestamp;
public SentBody() {
data = new HashMap<String, String>();
timestamp = System.currentTimeMillis();
}
public String getKey() {
return key;
}
public String get(String k) {
return data.get(k);
}
public void put(String k, String v) {
data.put(k, v);
}
public long getTimestamp() {
return timestamp;
}
public void setTimestamp(long timestamp) {
this.timestamp = timestamp;
}
public void setKey(String key) {
this.key = key;
}
public void remove(String k) {
data.remove(k);
}
public HashMap<String, String> getData() {
return data;
}
@Override
public String toString() {
StringBuffer buffer = new StringBuffer();
buffer.append("<?xml version=\"1.0\" encoding=\"UTF-8\"?>");
buffer.append("<sent>");
buffer.append("<key>").append(key).append("</key>");
buffer.append("<timestamp>").append(timestamp).append("</timestamp>");
buffer.append("<data>");
for (String key : data.keySet()) {
buffer.append("<" + key + ">").append(data.get(key)).append(
"</" + key + ">");
}
buffer.append("</data>");
buffer.append("</sent>");
return buffer.toString();
}
public String toXmlString() {
return toString();
}
}
ReplyBody服務端發送消息對象:
/**
* @author ZERO
* @Description 服務端發送消息對象
*/
public class ReplyBody implements Serializable {
private static final long serialVersionUID = 1L;
/**
* 請求key
*/
private String key;
/**
* 傳回碼
*/
private String code;
/**
* 傳回說明
*/
private String message;
/**
* 傳回資料集合
*/
private HashMap<String, String> data;
private long timestamp;
public ReplyBody()
{
data = new HashMap<String, String>();
timestamp = System.currentTimeMillis();
}
public long getTimestamp() {
return timestamp;
}
public void setTimestamp(long timestamp) {
this.timestamp = timestamp;
}
public String getKey() {
return key;
}
public void setKey(String key) {
this.key = key;
}
public void put(String k, String v) {
data.put(k, v);
}
public String get(String k) {
return data.get(k);
}
public void remove(String k) {
data.remove(k);
}
public String getMessage() {
return message;
}
public void setMessage(String message) {
this.message = message;
}
public HashMap<String, String> getData() {
return data;
}
public void setData(HashMap<String, String> data) {
this.data = data;
}
public String getCode() {
return code;
}
public void setCode(String code) {
this.code = code;
}
public String toString()
{
StringBuilder buffer = new StringBuilder();
buffer.append("<?xml version=\"1.0\" encoding=\"UTF-8\"?>");
buffer.append("<reply>");
buffer.append("<key>").append(this.getKey()).append("</key>");
buffer.append("<timestamp>").append(timestamp).append("</timestamp>");
buffer.append("<code>").append(code).append("</code>");
buffer.append("<message>").append(message).append("</message>");
buffer.append("<data>");
for(String key:this.getData().keySet())
{
buffer.append("<"+key+">").append(this.get(key)).append("</"+key+">");
}
buffer.append("</data>");
buffer.append("</reply>");
return buffer.toString();
}
public String toXmlString() {
return toString();
}
public String toJson() {
return new Gson().toJson(this, ReplyBody.class);
}
}
擴充Session及其管理
目的:友善對session會話進行管理,友善對session會話集合擷取和删除
實作:服務端接收到新的Session後,構造一個封裝類,實作session 的部分方法,并額外實作方法
Session 封裝類:
/**
* IoSession包裝類
*/
public class PcmSession implements Serializable {
// 不參與序列化
private transient IoSession session;
// 全局ID
private String gid;
// session在本機器的ID
private Long nid;
// session綁定的服務ip
private String host;
// session綁定的賬号
private String account;
// session綁定的賬戶消息
private String message;
// 掃描數量
private String scanNum;
// 經度
private String longitude;
// 緯度
private String latitude;
// session綁定時間
private Long bindTime;
// 心跳時間
private Long heartbeat;
public PcmSession(){}
public PcmSession(IoSession session) {
this.session = session;
this.host = (String) session.getAttribute("address");
this.nid = session.getId();
}
/**
* 将key-value自定義屬性,存儲到IO會話中
*/
public void setAttribute(String key, Object value) {
if (null != session) {
session.setAttribute(key, value);
}
}
/**
* 從IO的會話中,擷取key的value
*/
public Object getAttribute(String key) {
if (null != session) {
return session.getAttribute(key);
}
return null;
}
/**
* 在IO的會話中,判斷是否存在包含key-value
*/
public boolean containsAttribute(String key) {
if (null != session) {
return session.containsAttribute(key);
}
return false;
}
/**
* 從IO的會話中,删除key
*/
public void removeAttribute(String key) {
if (null != session) {
session.removeAttribute(key);
}
}
/**
* 擷取IP位址
*/
public SocketAddress getRemoteAddress() {
if (null != session) {
return session.getRemoteAddress();
}
return null;
}
/**
* 将消息對象 message發送到目前連接配接的對等體(異步)
* 當消息被真正發送到對等體的時候,IoHandler.messageSent(IoSession,Object)會被調用。
* @param msg 發送的消息
*/
public void write(Object msg) {
if (null != session) {
CustomPack pack = new CustomPack((String) msg);
session.write(pack).isWritten();
}
}
/**
* 發送消息重載,是否為請求
* @param msg 發送的消息
* @param isRequest 是否為請求
*/
public void write(Object msg, boolean isRequest) {
if (null != session) {
byte flag = isRequest ? CustomPack.REQUEST : CustomPack.RESPONSE;
CustomPack pack = new CustomPack(flag, (String) msg);
session.write(pack).isWritten();
}
}
/**
* 會話是否已經連接配接
*/
public boolean isConnected() {
if (null != session) {
return session.isConnected();
}
return false;
}
/**
* 會話是否為本地連接配接
*/
public boolean isLocalHost() {
try {
String ip = InetAddress.getLocalHost().getHostAddress();
return ip.endsWith(host);
} catch (UnknownHostException e) {
e.printStackTrace();
}
return false;
}
/**
* 關閉目前連接配接。如果參數 immediately為 true的話
* 連接配接會等到隊列中所有的資料發送請求都完成之後才關閉;否則的話就立即關閉。
*/
public void close(boolean immediately) {
if (null != session) {
if (immediately) {
session.closeNow();
} else {
session.closeOnFlush();
}
}
}
@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (null == obj) {
return false;
}
if (this.getClass() != obj.getClass()) {
return false;
}
// 強轉為目前類
PcmSession session = (PcmSession) obj;
if (session.nid != null && nid != null) {
return session.nid.longValue() == nid.longValue() && session.host.equals(host);
}
return false;
}
@Override
public String toString() {
StringBuilder buffer = new StringBuilder();
buffer.append("{");
buffer.append("\"").append("gid").append("\":").append("\"").append(gid).append("\"").append(",");
buffer.append("\"").append("nid").append("\":").append(nid).append(",");
buffer.append("\"").append("host").append("\":").append("\"").append(host).append("\"").append(",");
buffer.append("\"").append("account").append("\":").append("\"").append(account).append("\"").append(",");
buffer.append("\"").append("message").append("\":").append(message).append(",");
buffer.append("\"").append("longitude").append("\":").append(longitude).append(",");
buffer.append("\"").append("latitude").append("\":").append(latitude).append(",");
buffer.append("\"").append("bindTime").append("\":").append(bindTime).append(",");
buffer.append("\"").append("heartbeat").append("\":").append(heartbeat);
buffer.append("}");
return buffer.toString();
}
public IoSession getSession() {
return session;
}
public void setSession(IoSession session) {
this.session = session;
}
public String getGid() {
return gid;
}
public void setGid(String gid) {
this.gid = gid;
}
public Long getNid() {
return nid;
}
public void setNid(Long nid) {
this.nid = nid;
}
public String getHost() {
return host;
}
public void setHost(String host) {
this.host = host;
}
public String getAccount() {
return account;
}
public void setAccount(String account) {
this.account = account;
}
public String getMessage() {
return message;
}
public void setMessage(String message) {
this.message = message;
}
public String getScanNum() {
return scanNum;
}
public void setScanNum(String scanNum) {
this.scanNum = scanNum;
}
public String getLongitude() {
return longitude;
}
public void setLongitude(String longitude) {
this.longitude = longitude;
}
public String getLatitude() {
return latitude;
}
public void setLatitude(String latitude) {
this.latitude = latitude;
}
public Long getBindTime() {
return bindTime;
}
public void setBindTime(Long bindTime) {
this.bindTime = bindTime;
}
public Long getHeartbeat() {
return heartbeat;
}
public void setHeartbeat(Long heartbeat) {
this.heartbeat = heartbeat;
}
}
Session管理接口:
/**
* Session 管理接口
*/
public interface SessionManager {
/**
* 添加session
*/
void addSession(String account, PcmSession session);
/**
* 擷取session
*/
PcmSession getSession(String account);
/**
* 替換Session
*/
void replaceSession(String account, PcmSession session);
/**
* 删除session
*/
void removeSession(String account);
/**
* 删除session
*/
void removeSession(PcmSession pcmSession);
}
SessionManager接口實作類DefaultSessionManagerImpl
實作:内部建立一個線程安全的ConcurrentHashMap,存放封裝的session對象
/**
* 預設session管理接口實作類
*/
public class DefaultSessionManagerImpl extends Observable implements SessionManager {
/**
* 存放session的線程安全的map集合
*/
private static ConcurrentHashMap<String, PcmSession> sessions = new ConcurrentHashMap<>();
/**
* 線程安全的自增類,用于統計連接配接數
*/
private static final AtomicInteger connectionsCounter = new AtomicInteger(0);
/**
* 添加session
* @param account
* @param session
*/
@Override
public void addSession(String account, PcmSession session) {
if (null != session) {
sessions.put(account, session);
connectionsCounter.incrementAndGet();
// 被觀察者方法,拉模型
setChanged();
notifyObservers();
}
}
/**
* 擷取session
* @param account
* @return
*/
@Override
public PcmSession getSession(String account) {
return sessions.get(account);
}
/**
* 替換session方法,通過account
*/
@Override
public void replaceSession(String account, PcmSession session) {
sessions.put(account, session);
// 被觀察者方法,拉模型
setChanged();
notifyObservers();
}
/**
* 移除session通過account
* @param account
*/
@Override
public void removeSession(String account) {
sessions.remove(account);
connectionsCounter.decrementAndGet();
// 被觀察者方法,拉模型
setChanged();
notifyObservers();
}
/**
* 移除session通過session
* @param pcmSession
*/
@Override
public void removeSession(PcmSession pcmSession) {
String account = (String) pcmSession.getAttribute(Message.SESSION_KEY);
removeSession(account);
}
public static ConcurrentHashMap<String, PcmSession> getSessions() {
return sessions;
}
}
服務端handler細化
目的:友善對用戶端發送過來的資料處理,
實作:服務端接收到用戶端的消息後,根據消息中封裝的key,使用對應的key處理方式,實作一個統一的接口
定義處理接口:
/**
* Mina的請求處理接口,必須實作此接口
*/
public interface RequestHandler {
ReplyBody process(PcmSession session, SentBody sent);
}
這裡我們實作了三個具體的handler是BindHandler、PushMessageHandler、SessionClosedHandler分别代表是綁定、推送、關閉。
BindHandler接口實作:
/**
* 綁定處理handler
*/
public class BindHandler implements RequestHandler{
private final Logger logger = LogManager.getLogger(BindHandler.class);
/**
* 邏輯處理方法
* @param newSession 新的會話
* @param message 接收的資訊
* @return
*/
@Override
public ReplyBody process(PcmSession newSession, SentBody message) {
ReplyBody reply = new ReplyBody();
// 擷取會話管理類
SessionManager sessionManager = (DefaultSessionManagerImpl) SpringContextUtil.getBean("pcmSessionManager");
try {
String account = message.get(Message.SESSION_KEY);
newSession.setAccount(account);
newSession.setAttribute(Message.SESSION_KEY, account);
newSession.setAttribute(Message.TIME_OUT_NUM, 0); // 逾時次數設為0
newSession.setGid(UuidUtil.get32UUID());
// 設定部分所需資訊
newSession.setMessage(message.get("message"));
newSession.setScanNum(message.get("scanNum"));
newSession.setLongitude(message.get("longitude"));
newSession.setLatitude(message.get("latitude"));
// 設定綁定時間,第一次心跳時間
newSession.setBindTime(System.currentTimeMillis());
newSession.setHeartbeat(System.currentTimeMillis());
// 由于用戶端斷線服務端可能會無法獲知的情況,用戶端重連時,需要關閉舊的連接配接
PcmSession oldSession = sessionManager.getSession(account);
if (oldSession != null && !oldSession.equals(newSession)) {
// 移除account屬性
oldSession.removeAttribute(Message.SESSION_KEY);
// 替換oldSession
sessionManager.replaceSession(account, newSession);
// 發送t下線的消息
ReplyBody rb = new ReplyBody();
rb.setCode(Message.MessageType.TYPE_999);
rb.put(Message.SESSION_KEY, account);
// 判斷目前會話是否是屬于本地的會話
if (oldSession.isLocalHost()) {
oldSession.write(rb.toJson());
oldSession.close(true);
logger.info(">>>>>>>>>>>>>>>>>> 終端使用者:" + account + "已在别處登陸,目前連接配接已被關閉 <<<<<<<<<<<<<<<<<" );
} else {
// 不是則需要發往目标伺服器處理
// 本服務為提供此功能,需要自行添加
}
}
if (oldSession == null) {
sessionManager.addSession(account, newSession);
}
reply.setCode(Message.ReturnCode.CODE_200);
} catch (Exception e) {
reply.setCode(Message.ReturnCode.CODE_500);
e.printStackTrace();
}
if (reply.getCode().equals(Message.ReturnCode.CODE_200)) {
logger.info(">>>>>>>>>>>>>>>>>> 終端使用者:" + message.get(Message.SESSION_KEY) + "綁定成功 <<<<<<<<<<<<<<<<<<<<");
} else {
logger.info(">>>>>>>>>>>>>>>>>> 終端使用者:" + message.get(Message.SESSION_KEY) + "綁定失敗 <<<<<<<<<<<<<<<<<<<<");
}
return reply;
}
}
PushMessageHandler實作:
/**
* 推送消息的handler
*/
public class PushMessageHandler implements RequestHandler{
private final Logger logger = LogManager.getLogger(PushMessageHandler.class);
@Override
public ReplyBody process(PcmSession session, SentBody sent) {
ReplyBody reply = new ReplyBody();
// 擷取綁定的賬戶
String account = sent.getData().get(Message.SESSION_KEY);
SessionManager sessionManager = (DefaultSessionManagerImpl)SpringContextUtil.getBean("pcmSessionManager");
// 擷取會話
PcmSession ios = sessionManager.getSession(account);
if (ios != null) {
sent.remove(Message.SESSION_KEY);
reply.setKey(sent.getKey());
reply.setMessage("推送的消息");
reply.setData(sent.getData());
reply.setCode(Message.ReturnCode.CODE_200);
ios.write(reply.toJson());
logger.info(">>>>>>>>>>>>>>>>>> 伺服器發送消息成功,接收使用者:" + session.getAccount() + " >>>>>>>>>>>>>>>>>>");
} else {
reply.setCode(Message.ReturnCode.CODE_500);
reply.setMessage("Mina push message fail");
}
return reply;
}
}
SessionClosedHandler實作:
/**
* 會話關閉處理
*/
public class SessionClosedHandler implements RequestHandler{
/**
* 邏輯處理方法
* @param session
* @param message
* @return
*/
@Override
public ReplyBody process(PcmSession session, SentBody message) {
ReplyBody rb = new ReplyBody();
// 擷取會話管理類
SessionManager sessionManager = (DefaultSessionManagerImpl) SpringContextUtil.getBean("pcmSessionManager");
if (session.getAttribute(Message.SESSION_KEY) == null) {
return null;
}
// 在管理類的map中移除
String account = session.getAttribute(Message.SESSION_KEY).toString();
sessionManager.removeSession(account);
return null;
}
}
Mina服務端消息handler:
/**
* 服務端handler
*/
public class ServiceHandler extends IoHandlerAdapter {
private final Logger logger = LogManager.getLogger(ServiceHandler.class);
// 存放本地處理的handler
private HashMap<String, RequestHandler> handlers = new HashMap<String, RequestHandler>();
/**
* 接收到消息時
* @param session
* @param message
* @throws Exception
*/
@Override
public void messageReceived(IoSession session, Object message) throws Exception {
logger.info("<<<<<<<<<<<<<<<<<<<< 擷取一條資訊,來自sessionId:" + session.getId() + " <<<<<<<<<<<<<<<<<<<<");
// 轉為自定義協定,取出内容,轉為接受的對象
SentBody sentBody = new Gson().fromJson(((CustomPack)message).getContent(), SentBody.class);
ReplyBody rb = new ReplyBody();
PcmSession pcmSession = new PcmSession(session);
String key = sentBody.getKey();
// 根據key的不同調用不同的handler
RequestHandler handler = handlers.get(key);
// 如果沒有這個handler
if (handler == null) {
rb.setCode(Message.ReturnCode.CODE_405);
rb.setMessage("Service undefined this handler :" + key);
} else {
rb = handler.process(pcmSession, sentBody);
}
rb.setKey(key);
pcmSession.write(rb.toJson(), false);
}
/**
* 發送消息
* @param session
* @param message
* @throws Exception
*/
@Override
public void messageSent(IoSession session, Object message) throws Exception {
// session.closeOnFlush(); 需要短連接配接,則發送後關閉連接配接
logger.debug(">>>>>>>>>>>>>>>>>>>> 發送消息成功 >>>>>>>>>>>>>>>>>>>>");
}
/**
* 建立連接配接時
* @param session
* @throws Exception
*/
@Override
public void sessionCreated(IoSession session) throws Exception {
InetSocketAddress isa = (InetSocketAddress) session.getRemoteAddress();
// IP
String address = isa.getAddress().getHostAddress();
session.setAttribute("address", address);
logger.info(">>>>>>>>>>>>>>>>>> 來自" + address + " 的終端上線,sessionId:" + session.getId() + " <<<<<<<<<<<<<");
}
/**
* 打開連接配接時
* @param session
* @throws Exception
*/
@Override
public void sessionOpened(IoSession session) throws Exception {
logger.debug("Open a connection ...");
}
/**
* 連接配接空閑時
* @param session
* @param status
* @throws Exception
*/
@Override
public void sessionIdle(IoSession session, IdleStatus status) throws Exception {
logger.debug("sessionIdle ... from " + session.getRemoteAddress());
}
/**
* 關閉連接配接時
* @param session
* @throws Exception
*/
@Override
public void sessionClosed(IoSession session) throws Exception {
PcmSession pcmSession = new PcmSession(session);
// 擷取連接配接關閉的handler,進行處理
try {
RequestHandler handler = handlers.get("clientClose");
if (handler != null && pcmSession.containsAttribute(Message.SESSION_KEY)) {
handler.process(pcmSession, null);
logger.info(">>>>>>>>>>>>>>>>>> 終端使用者:" + session.getAttribute(Message.SESSION_KEY) + "已下線 <<<<<<<<<<<<<<<<<" );
}
} catch (Exception e) {
e.printStackTrace();
}
// pcmSession.close(true);
}
/**
* 捕獲到異常
* @param session
* @param cause
* @throws Exception
*/
@Override
public void exceptionCaught(IoSession session, Throwable cause) throws Exception {
logger.error(">>>>>>>>>>>>>>>>>> 終端使用者:" + session.getAttribute(Message.SESSION_KEY) + "連接配接發生異常,即将關閉連接配接,原因:" + cause.getMessage() + " <<<<<<<<<<<<<<<<<");
}
public HashMap<String, RequestHandler> getHandlers() {
return handlers;
}
public void setHandlers(HashMap<String, RequestHandler> handlers) {
this.handlers = handlers;
logger.info(">>>>>>>>>>>>>>>>>> Mina服務端啟動成功 <<<<<<<<<<<<<<<<<");
}
}
心跳機制及處理
先簡單介紹下keepAlive的機制:
首先,需要搞清楚TCP keepalive是幹什麼用的。從名字了解就能夠知道,keepalive就是用來檢測一個tcp connection是否還連接配接正常。當一個tcpconnection建立好之後,如果雙方都不發送資料的話,tcp協定本身是不會發送其它的任何資料的,也就是說,在一個idle的connection上,兩個socket之間不産生任何的資料交換。從另一個方面講,當一個connection建立之後,連結雙方可以長時間的不發送任何資料,比如幾天,幾星期甚至幾個月,但該connection仍然存在。
是以,這就可能出現一個問題。舉例來說,server和client建立了一個connection,server負責接收client的request。當connection建立好之後,client由于某種原因機器停機了。但server端并不知道,是以server就會一直監聽着這個connection,但其實這個connection已經失效了。
keepalive就是為這樣的場景準備的。當把一個socket設定成了keepalive,那麼這個socket空閑一段時間後,它就會向對方發送資料來确認對方仍然存在。放在上面的例子中,如果client停機了,那麼server所發送的keepalive資料就不會有response,這樣server就能夠确認client完蛋了(至少從表面上看是這樣)。
MINA本身提供了一個過濾器類: org.apache.mina.filter.keepalive . KeepAliveFilter ,該過濾器用于在IO空閑的時候發送并且回報心跳包(keep-alive request/response)。
該類構造函數中參數有三個分别是:
(1)KeepAvlieMessageFactory: 該執行個體引用用于判斷接受與發送的包是否是心跳包,以及心跳請求包的實作
(2)IdleStatus: 該過濾器所關注的空閑狀态,預設認為讀取空閑。 即當讀取通道空閑的時候發送心跳包
(3)KeepAliveRequestTimeoutHandler: 心跳包請求後逾時無回報情況下的處理機制 預設為CLOSE 即關閉連接配接
參考:
http://www.cnblogs.com/pricks/p/3832882.html
https://blog.csdn.net/kkk0526/article/details/51732437
首先,實作KeepAvlieMessageFactory接口:
/**
* 心跳實作類
* 服務端發送的是hb_request,那麼用戶端就應該傳回hb_response
*/
public class KeepAliveFactoryImpl implements KeepAliveMessageFactory {
private final Logger logger = LogManager.getLogger(KeepAliveFactoryImpl.class);
// 服務端需要發送請求,用戶端無需發送
private boolean sendHbRequest;
public KeepAliveFactoryImpl(boolean isServer) {
this.sendHbRequest = isServer;
}
/**
* 服務端心跳發送請求指令
*/
private static final String HEART_BEAT_REQUEST = Message.CMD_HEARTBEAT_REQUEST;
/**
* 用戶端心跳響應指令
*/
private static final String HEART_BEAT_RESPONSE = Message.CMD_HEARTBEAT_RESPONSE;
// 在需要發送心跳時,用來擷取一個心跳請求包[發送端使用]
@Override
public Object getRequest(IoSession session) {
// 是否需要發送心跳請求
if (sendHbRequest) {
return new CustomPack(HEART_BEAT_REQUEST);
}
return null;
}
// 在需要回複心跳時,用來擷取一個心跳回複包[接收端使用]
@Override
public Object getResponse(IoSession session, Object request) {
return new CustomPack(CustomPack.RESPONSE, HEART_BEAT_RESPONSE);
}
// 用來判斷接收到的消息是不是一個心跳請求包,是就傳回true[接收端使用]
@Override
public boolean isRequest(IoSession session, Object message) {
if (message instanceof CustomPack) {
CustomPack pack = (CustomPack) message;
if (pack.getContent().equals(Message.CMD_HEARTBEAT_REQUEST)) {
// 将逾時次數置為0
session.setAttribute(Message.TIME_OUT_NUM, 0);
return true;
}
return false;
}
return false;
}
// 用來判斷接收到的消息是不是一個心跳回複包,是就傳回true[發送端使用]
@Override
public boolean isResponse(IoSession session, Object message) {
if (message instanceof CustomPack) {
CustomPack pack = (CustomPack) message;
return pack.getContent().equals(Message.CMD_HEARTBEAT_RESPONSE);
}
return false;
}
}
之後實作逾時處理類實作KeepAliveRequestTimeoutHandler接口,業務屬性為:逾時3次後,關閉連接配接:
/**
* 心跳逾時處理類
*/
public class KeepAliveRequestTimeoutHandlerImpl implements KeepAliveRequestTimeoutHandler {
private static final Logger logger = LogManager.getLogger(KeepAliveRequestTimeoutHandlerImpl.class);
/**
* 逾時的最大次數
*/
private int timeoutNum = 3;
public KeepAliveRequestTimeoutHandlerImpl() {}
public KeepAliveRequestTimeoutHandlerImpl(int timeoutNum) {
this.timeoutNum = timeoutNum;
}
@Override
public void keepAliveRequestTimedOut(KeepAliveFilter filter, IoSession session) throws Exception {
int isTimeoutNum = (int) session.getAttribute(Message.TIME_OUT_NUM);
// 沒有超過最大次數,逾時次數加1
if (isTimeoutNum <= timeoutNum) {
session.setAttribute(Message.TIME_OUT_NUM, isTimeoutNum + 1);
} else {
// 超過最大次數,關閉會話連接配接
SessionManager sessionManager = (SessionManager) SpringContextUtil.getBean("pcmSessionManager");
String account = (String) session.getAttribute(Message.SESSION_KEY);
sessionManager.removeSession(account);
logger.info("<<<<<<<<<<<<<<<<<<<< 終端使用者:" + account + " 心跳超過三次無應答,已被關閉 <<<<<<<<<<<<<<<<<<<<");
}
}
}
心跳機制實作及處理類完畢。
具體的如何配置,請檢視下一章節。待續