天天看點

Mina基礎(六):Mina整合Spring之前的準備工作——統一通信類、擴充Session及其管理、服務端handler細化、心跳機制及處理統一通信類擴充Session及其管理服務端handler細化 心跳機制及處理

此章節比較零散,主要為與Spring整合及業務處理做準備,沒有涉及到具體的配置,都是一些工具類的實作。

具體的整合Spring,使用的一下的這些類,下面的一章描述了如何使用這些類,看的比較暈的,可以多看看直接的配置,了解mina的運作流程。

完整的項目架構:

Mina基礎(六):Mina整合Spring之前的準備工作——統一通信類、擴充Session及其管理、服務端handler細化、心跳機制及處理統一通信類擴充Session及其管理服務端handler細化 心跳機制及處理

統一通信類

  • 規範消息類型

    目的:使用統一的封裝類型,服務端接收固定的消息對象,服務端發送固定的消息對象,規範用戶端、服務端的互動;

    實作:服務端接收SentBody對象,服務端發送ReplyBody對象

    (參考:spring內建mina 實作消息推送以及轉發)

Message消息常量:

/**
 * @author ZERO
 * @Description 消息常量
 */
public class Message {
    public static class ReturnCode {
 
        public static String CODE_404 = "404";
 
        public static String CODE_403 = "403";  //該賬号未綁定
 
        public static String CODE_405 = "405"; //事物未定義
 
        public static String CODE_200 = "200"; //成功
 
        public static String CODE_500 = "500"; //未知錯誤
 
    }
 
 
    public static final String SESSION_KEY = "account";
 
 
    /**
     * 服務端心跳請求指令
     */
    public static final String CMD_HEARTBEAT_REQUEST = "hb_request";
 
    /**
     * 用戶端心跳響應指令
     */
    public static final String CMD_HEARTBEAT_RESPONSE = "hb_response";
 
    /**
     * 逾時次數
     */
    public static final String TIME_OUT_NUM = "timeOutNum";
 
    public static class MessageType {
        // 使用者 踢出下線消息類型
        public static String TYPE_999 = "999";
    }
 
}           

SentBody服務端接收消息對象:

/**
 * @author ZERO
 * @Description 服務端接收消息對象
 */
public class SentBody implements Serializable {
 
    private static final long serialVersionUID = 1L;
 
    private String key;
 
    private HashMap<String, String> data;
 
    private long timestamp;
 
    public SentBody() {
        data = new HashMap<String, String>();
        timestamp = System.currentTimeMillis();
    }
 
    public String getKey() {
        return key;
    }
 
    public String get(String k) {
        return data.get(k);
    }
 
    public void put(String k, String v) {
        data.put(k, v);
    }
 
    public long getTimestamp() {
        return timestamp;
    }
 
    public void setTimestamp(long timestamp) {
        this.timestamp = timestamp;
    }
 
    public void setKey(String key) {
        this.key = key;
    }
 
    public void remove(String k) {
        data.remove(k);
    }
 
    public HashMap<String, String> getData() {
        return data;
    }
 
    @Override
    public String toString() {
        StringBuffer buffer = new StringBuffer();
        buffer.append("<?xml version=\"1.0\" encoding=\"UTF-8\"?>");
        buffer.append("<sent>");
        buffer.append("<key>").append(key).append("</key>");
        buffer.append("<timestamp>").append(timestamp).append("</timestamp>");
        buffer.append("<data>");
        for (String key : data.keySet()) {
            buffer.append("<" + key + ">").append(data.get(key)).append(
                    "</" + key + ">");
        }
        buffer.append("</data>");
        buffer.append("</sent>");
        return buffer.toString();
    }
 
    public String toXmlString() {
        return toString();
    }
}           

ReplyBody服務端發送消息對象:

/**
 * @author ZERO
 * @Description 服務端發送消息對象
 */
public class ReplyBody implements Serializable {

    private static final long serialVersionUID = 1L;

    /**
     * 請求key
     */
    private String key;

    /**
     * 傳回碼
     */
    private String code;

    /**
     * 傳回說明
     */
    private String message;

    /**
     * 傳回資料集合
     */
    private HashMap<String, String> data;


    private long timestamp;

    public ReplyBody()
    {
        data = new HashMap<String, String>();
        timestamp = System.currentTimeMillis();
    }
    public long getTimestamp() {
        return timestamp;
    }

    public void setTimestamp(long timestamp) {
        this.timestamp = timestamp;
    }



    public String getKey() {
        return key;
    }

    public void setKey(String key) {
        this.key = key;
    }

    public void put(String k, String v) {
        data.put(k, v);
    }

    public String get(String k) {
        return data.get(k);
    }

    public void remove(String k) {
        data.remove(k);
    }

    public String getMessage() {
        return message;
    }

    public void setMessage(String message) {
        this.message = message;
    }

    public HashMap<String, String> getData() {
        return data;
    }

    public void setData(HashMap<String, String> data) {
        this.data = data;
    }

    public String getCode() {
        return code;
    }

    public void setCode(String code) {
        this.code = code;
    }


    public String toString()
    {

        StringBuilder buffer = new StringBuilder();
        buffer.append("<?xml version=\"1.0\" encoding=\"UTF-8\"?>");
        buffer.append("<reply>");
        buffer.append("<key>").append(this.getKey()).append("</key>");
        buffer.append("<timestamp>").append(timestamp).append("</timestamp>");
        buffer.append("<code>").append(code).append("</code>");
        buffer.append("<message>").append(message).append("</message>");
        buffer.append("<data>");
        for(String key:this.getData().keySet())
        {
            buffer.append("<"+key+">").append(this.get(key)).append("</"+key+">");
        }
        buffer.append("</data>");
        buffer.append("</reply>");
        return buffer.toString();
    }


    public String toXmlString() {
        return toString();
    }

    public String toJson() {
        return new Gson().toJson(this, ReplyBody.class);
    }
}
           

擴充Session及其管理

目的:友善對session會話進行管理,友善對session會話集合擷取和删除

實作:服務端接收到新的Session後,構造一個封裝類,實作session 的部分方法,并額外實作方法

Session 封裝類:

/**
 * IoSession包裝類
 */
public class PcmSession implements Serializable {
 
    // 不參與序列化
    private transient IoSession session;
 
    // 全局ID
    private String gid;
    // session在本機器的ID
    private Long nid;
    // session綁定的服務ip
    private String host;
    // session綁定的賬号
    private String account;
    // session綁定的賬戶消息
    private String message;
    // 掃描數量
    private String scanNum;
    // 經度
    private String longitude;
    // 緯度
    private String latitude;
    // session綁定時間
    private Long bindTime;
    // 心跳時間
    private Long heartbeat;
 
    public PcmSession(){}
 
    public PcmSession(IoSession session) {
        this.session = session;
        this.host = (String) session.getAttribute("address");
        this.nid = session.getId();
    }
 
    /**
     * 将key-value自定義屬性,存儲到IO會話中
     */
    public void setAttribute(String key, Object value) {
        if (null != session) {
            session.setAttribute(key, value);
        }
    }
 
    /**
     * 從IO的會話中,擷取key的value
     */
    public Object getAttribute(String key) {
        if (null != session) {
            return session.getAttribute(key);
        }
        return null;
    }
 
    /**
     *  在IO的會話中,判斷是否存在包含key-value
     */
    public boolean containsAttribute(String key) {
        if (null != session) {
            return session.containsAttribute(key);
        }
        return false;
    }
 
    /**
     * 從IO的會話中,删除key
     */
    public void removeAttribute(String key) {
        if (null != session) {
            session.removeAttribute(key);
        }
    }
 
    /**
     *  擷取IP位址
     */
    public SocketAddress getRemoteAddress() {
        if (null != session) {
            return session.getRemoteAddress();
        }
        return null;
    }
 
    /**
     * 将消息對象 message發送到目前連接配接的對等體(異步)
     * 當消息被真正發送到對等體的時候,IoHandler.messageSent(IoSession,Object)會被調用。
     * @param msg 發送的消息
     */
    public void write(Object msg) {
        if (null != session) {
            CustomPack pack = new CustomPack((String) msg);
            session.write(pack).isWritten();
        }
    }
 
    /**
     * 發送消息重載,是否為請求
     * @param msg 發送的消息
     * @param isRequest 是否為請求
     */
    public void write(Object msg, boolean isRequest) {
        if (null != session) {
            byte flag = isRequest ? CustomPack.REQUEST : CustomPack.RESPONSE;
            CustomPack pack = new CustomPack(flag, (String) msg);
            session.write(pack).isWritten();
        }
    }
 
    /**
     * 會話是否已經連接配接
     */
    public boolean isConnected() {
        if (null != session) {
            return session.isConnected();
        }
        return false;
    }
 
    /**
     *  會話是否為本地連接配接
     */
    public boolean isLocalHost() {
        try {
            String ip = InetAddress.getLocalHost().getHostAddress();
            return ip.endsWith(host);
        } catch (UnknownHostException e) {
            e.printStackTrace();
        }
        return false;
    }
 
    /**
     * 關閉目前連接配接。如果參數 immediately為 true的話
     * 連接配接會等到隊列中所有的資料發送請求都完成之後才關閉;否則的話就立即關閉。
     */
    public void close(boolean immediately) {
        if (null != session) {
            if (immediately) {
                session.closeNow();
            } else {
                session.closeOnFlush();
            }
        }
    }
 
    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (null == obj) {
            return false;
        }
        if (this.getClass() != obj.getClass()) {
            return false;
        }
        // 強轉為目前類
        PcmSession session = (PcmSession) obj;
        if (session.nid != null && nid != null) {
            return session.nid.longValue() == nid.longValue() && session.host.equals(host);
        }
        return false;
    }
 
    @Override
    public String toString() {
        StringBuilder buffer = new StringBuilder();
        buffer.append("{");
        buffer.append("\"").append("gid").append("\":").append("\"").append(gid).append("\"").append(",");
        buffer.append("\"").append("nid").append("\":").append(nid).append(",");
        buffer.append("\"").append("host").append("\":").append("\"").append(host).append("\"").append(",");
        buffer.append("\"").append("account").append("\":").append("\"").append(account).append("\"").append(",");
        buffer.append("\"").append("message").append("\":").append(message).append(",");
        buffer.append("\"").append("longitude").append("\":").append(longitude).append(",");
        buffer.append("\"").append("latitude").append("\":").append(latitude).append(",");
        buffer.append("\"").append("bindTime").append("\":").append(bindTime).append(",");
        buffer.append("\"").append("heartbeat").append("\":").append(heartbeat);
        buffer.append("}");
        return buffer.toString();
    }
 
    public IoSession getSession() {
        return session;
    }
 
    public void setSession(IoSession session) {
        this.session = session;
    }
 
    public String getGid() {
        return gid;
    }
 
    public void setGid(String gid) {
        this.gid = gid;
    }
 
    public Long getNid() {
        return nid;
    }
 
    public void setNid(Long nid) {
        this.nid = nid;
    }
 
    public String getHost() {
        return host;
    }
 
    public void setHost(String host) {
        this.host = host;
    }
 
    public String getAccount() {
        return account;
    }
 
    public void setAccount(String account) {
        this.account = account;
    }
 
    public String getMessage() {
        return message;
    }
 
    public void setMessage(String message) {
        this.message = message;
    }
 
    public String getScanNum() {
        return scanNum;
    }
 
    public void setScanNum(String scanNum) {
        this.scanNum = scanNum;
    }
 
    public String getLongitude() {
        return longitude;
    }
 
    public void setLongitude(String longitude) {
        this.longitude = longitude;
    }
 
    public String getLatitude() {
        return latitude;
    }
 
    public void setLatitude(String latitude) {
        this.latitude = latitude;
    }
 
    public Long getBindTime() {
        return bindTime;
    }
 
    public void setBindTime(Long bindTime) {
        this.bindTime = bindTime;
    }
 
    public Long getHeartbeat() {
        return heartbeat;
    }
 
    public void setHeartbeat(Long heartbeat) {
        this.heartbeat = heartbeat;
    }
 
}           

Session管理接口:

/**
 * Session 管理接口
 */
public interface SessionManager {
    /**
     * 添加session
     */
    void addSession(String account, PcmSession session);
 
    /**
     * 擷取session
     */
    PcmSession getSession(String account);
 
    /**
     * 替換Session
     */
    void replaceSession(String account, PcmSession session);
 
    /**
     * 删除session
     */
    void removeSession(String account);
 
    /**
     * 删除session
     */
    void removeSession(PcmSession pcmSession);
 
}           

    SessionManager接口實作類DefaultSessionManagerImpl

    實作:内部建立一個線程安全的ConcurrentHashMap,存放封裝的session對象

/**
 * 預設session管理接口實作類
 */
public class DefaultSessionManagerImpl extends Observable implements SessionManager {
 
    /**
     * 存放session的線程安全的map集合
     */
    private static ConcurrentHashMap<String, PcmSession> sessions = new ConcurrentHashMap<>();
 
    /**
     * 線程安全的自增類,用于統計連接配接數
     */
    private static final AtomicInteger connectionsCounter = new AtomicInteger(0);
 
 
    /**
     * 添加session
     * @param account
     * @param session
     */
    @Override
    public void addSession(String account, PcmSession session) {
        if (null != session) {
            sessions.put(account, session);
            connectionsCounter.incrementAndGet();
            // 被觀察者方法,拉模型
            setChanged();
            notifyObservers();
        }
    }
 
    /**
     * 擷取session
     * @param account
     * @return
     */
    @Override
    public PcmSession getSession(String account) {
        return sessions.get(account);
    }
 
    /**
     * 替換session方法,通過account
     */
    @Override
    public void replaceSession(String account, PcmSession session) {
        sessions.put(account, session);
        // 被觀察者方法,拉模型
        setChanged();
        notifyObservers();
    }
 
 
    /**
     * 移除session通過account
     * @param account
     */
    @Override
    public void removeSession(String account) {
        sessions.remove(account);
        connectionsCounter.decrementAndGet();
        // 被觀察者方法,拉模型
        setChanged();
        notifyObservers();
    }
 
    /**
     * 移除session通過session
     * @param pcmSession
     */
    @Override
    public void removeSession(PcmSession pcmSession) {
        String account = (String) pcmSession.getAttribute(Message.SESSION_KEY);
        removeSession(account);
    }
 
    public static ConcurrentHashMap<String, PcmSession> getSessions() {
        return sessions;
    }
 
 
}           

服務端handler細化

    目的:友善對用戶端發送過來的資料處理,

    實作:服務端接收到用戶端的消息後,根據消息中封裝的key,使用對應的key處理方式,實作一個統一的接口

定義處理接口:

/**
 * Mina的請求處理接口,必須實作此接口
 */
public interface RequestHandler {
    ReplyBody process(PcmSession session, SentBody sent);
}           

這裡我們實作了三個具體的handler是BindHandler、PushMessageHandler、SessionClosedHandler分别代表是綁定、推送、關閉。

BindHandler接口實作:

/**
 * 綁定處理handler
 */
public class BindHandler implements RequestHandler{
 
    private final Logger logger = LogManager.getLogger(BindHandler.class);
 
    /**
     * 邏輯處理方法
     * @param newSession 新的會話
     * @param message 接收的資訊
     * @return
     */
    @Override
    public ReplyBody process(PcmSession newSession, SentBody message) {
        ReplyBody reply = new ReplyBody();
        // 擷取會話管理類
        SessionManager sessionManager = (DefaultSessionManagerImpl) SpringContextUtil.getBean("pcmSessionManager");
        try {
            String account = message.get(Message.SESSION_KEY);
            newSession.setAccount(account);
            newSession.setAttribute(Message.SESSION_KEY, account);
            newSession.setAttribute(Message.TIME_OUT_NUM, 0); // 逾時次數設為0
            newSession.setGid(UuidUtil.get32UUID());
            // 設定部分所需資訊
            newSession.setMessage(message.get("message"));
            newSession.setScanNum(message.get("scanNum"));
            newSession.setLongitude(message.get("longitude"));
            newSession.setLatitude(message.get("latitude"));
            // 設定綁定時間,第一次心跳時間
            newSession.setBindTime(System.currentTimeMillis());
            newSession.setHeartbeat(System.currentTimeMillis());
            // 由于用戶端斷線服務端可能會無法獲知的情況,用戶端重連時,需要關閉舊的連接配接
            PcmSession oldSession = sessionManager.getSession(account);
            if (oldSession != null && !oldSession.equals(newSession)) {
                // 移除account屬性
                oldSession.removeAttribute(Message.SESSION_KEY);
                // 替換oldSession
                sessionManager.replaceSession(account, newSession);
                // 發送t下線的消息
                ReplyBody rb = new ReplyBody();
                rb.setCode(Message.MessageType.TYPE_999);
                rb.put(Message.SESSION_KEY, account);
                // 判斷目前會話是否是屬于本地的會話
                if (oldSession.isLocalHost()) {
                    oldSession.write(rb.toJson());
                    oldSession.close(true);
                    logger.info(">>>>>>>>>>>>>>>>>> 終端使用者:" + account + "已在别處登陸,目前連接配接已被關閉 <<<<<<<<<<<<<<<<<" );
                } else {
                    // 不是則需要發往目标伺服器處理
                    // 本服務為提供此功能,需要自行添加
                }
            }
            if (oldSession == null) {
                sessionManager.addSession(account, newSession);
            }
            reply.setCode(Message.ReturnCode.CODE_200);
        } catch (Exception e) {
            reply.setCode(Message.ReturnCode.CODE_500);
            e.printStackTrace();
        }
        if (reply.getCode().equals(Message.ReturnCode.CODE_200)) {
            logger.info(">>>>>>>>>>>>>>>>>> 終端使用者:" + message.get(Message.SESSION_KEY) + "綁定成功 <<<<<<<<<<<<<<<<<<<<");
        } else {
            logger.info(">>>>>>>>>>>>>>>>>> 終端使用者:" + message.get(Message.SESSION_KEY) + "綁定失敗 <<<<<<<<<<<<<<<<<<<<");
        }
        return reply;
    }
}           

PushMessageHandler實作:

/**
 * 推送消息的handler
 */
public class PushMessageHandler implements RequestHandler{
 
    private final Logger logger = LogManager.getLogger(PushMessageHandler.class);
 
    @Override
    public ReplyBody process(PcmSession session, SentBody sent) {
        ReplyBody reply = new ReplyBody();
        // 擷取綁定的賬戶
        String account = sent.getData().get(Message.SESSION_KEY);
        SessionManager sessionManager = (DefaultSessionManagerImpl)SpringContextUtil.getBean("pcmSessionManager");
        // 擷取會話
        PcmSession ios = sessionManager.getSession(account);
        if (ios != null) {
            sent.remove(Message.SESSION_KEY);
            reply.setKey(sent.getKey());
            reply.setMessage("推送的消息");
            reply.setData(sent.getData());
            reply.setCode(Message.ReturnCode.CODE_200);
            ios.write(reply.toJson());
            logger.info(">>>>>>>>>>>>>>>>>> 伺服器發送消息成功,接收使用者:" + session.getAccount() + " >>>>>>>>>>>>>>>>>>");
        } else {
            reply.setCode(Message.ReturnCode.CODE_500);
            reply.setMessage("Mina push message fail");
        }
        return reply;
    }
}           

SessionClosedHandler實作:

/**
 * 會話關閉處理
 */
public class SessionClosedHandler implements RequestHandler{
 
    /**
     * 邏輯處理方法
     * @param session
     * @param message
     * @return
     */
    @Override
    public ReplyBody process(PcmSession session, SentBody message) {
        ReplyBody rb = new ReplyBody();
        // 擷取會話管理類
        SessionManager sessionManager = (DefaultSessionManagerImpl) SpringContextUtil.getBean("pcmSessionManager");
        if (session.getAttribute(Message.SESSION_KEY) == null) {
            return null;
        }
        // 在管理類的map中移除
        String account = session.getAttribute(Message.SESSION_KEY).toString();
        sessionManager.removeSession(account);
        return null;
    }
}           

Mina服務端消息handler:

/**
 * 服務端handler
 */
public class ServiceHandler extends IoHandlerAdapter {
 
    private final Logger logger = LogManager.getLogger(ServiceHandler.class);
    // 存放本地處理的handler
    private HashMap<String, RequestHandler> handlers = new HashMap<String, RequestHandler>();
 
    /**
     * 接收到消息時
     * @param session
     * @param message
     * @throws Exception
     */
    @Override
    public void messageReceived(IoSession session, Object message) throws Exception {
        logger.info("<<<<<<<<<<<<<<<<<<<< 擷取一條資訊,來自sessionId:" + session.getId() + " <<<<<<<<<<<<<<<<<<<<");
        // 轉為自定義協定,取出内容,轉為接受的對象
        SentBody sentBody = new Gson().fromJson(((CustomPack)message).getContent(), SentBody.class);
        ReplyBody rb = new ReplyBody();
        PcmSession pcmSession = new PcmSession(session);
        String key = sentBody.getKey();
        // 根據key的不同調用不同的handler
        RequestHandler handler = handlers.get(key);
        // 如果沒有這個handler
        if (handler == null) {
            rb.setCode(Message.ReturnCode.CODE_405);
            rb.setMessage("Service undefined this handler :" + key);
        } else {
            rb = handler.process(pcmSession, sentBody);
        }
        rb.setKey(key);
        pcmSession.write(rb.toJson(), false);
    }
 
    /**
     * 發送消息
     * @param session
     * @param message
     * @throws Exception
     */
    @Override
    public void messageSent(IoSession session, Object message) throws Exception {
        // session.closeOnFlush(); 需要短連接配接,則發送後關閉連接配接
        logger.debug(">>>>>>>>>>>>>>>>>>>> 發送消息成功 >>>>>>>>>>>>>>>>>>>>");
    }
 
    /**
     * 建立連接配接時
     * @param session
     * @throws Exception
     */
    @Override
    public void sessionCreated(IoSession session) throws Exception {
        InetSocketAddress isa = (InetSocketAddress) session.getRemoteAddress();
        // IP
        String address = isa.getAddress().getHostAddress();
        session.setAttribute("address", address);
        logger.info(">>>>>>>>>>>>>>>>>> 來自" + address + " 的終端上線,sessionId:" + session.getId() + "  <<<<<<<<<<<<<");
    }
 
    /**
     * 打開連接配接時
     * @param session
     * @throws Exception
     */
    @Override
    public void sessionOpened(IoSession session) throws Exception {
        logger.debug("Open a connection ...");
    }
 
    /**
     * 連接配接空閑時
     * @param session
     * @param status
     * @throws Exception
     */
    @Override
    public void sessionIdle(IoSession session, IdleStatus status) throws Exception {
        logger.debug("sessionIdle ... from " + session.getRemoteAddress());
    }
 
    /**
     * 關閉連接配接時
     * @param session
     * @throws Exception
     */
    @Override
    public void sessionClosed(IoSession session) throws Exception {
        PcmSession pcmSession = new PcmSession(session);
        // 擷取連接配接關閉的handler,進行處理
        try {
            RequestHandler handler = handlers.get("clientClose");
            if (handler != null && pcmSession.containsAttribute(Message.SESSION_KEY)) {
                handler.process(pcmSession, null);
                logger.info(">>>>>>>>>>>>>>>>>> 終端使用者:" + session.getAttribute(Message.SESSION_KEY) + "已下線 <<<<<<<<<<<<<<<<<" );
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
//        pcmSession.close(true);
    }
 
    /**
     * 捕獲到異常
     * @param session
     * @param cause
     * @throws Exception
     */
    @Override
    public void exceptionCaught(IoSession session, Throwable cause) throws Exception {
        logger.error(">>>>>>>>>>>>>>>>>> 終端使用者:" + session.getAttribute(Message.SESSION_KEY) + "連接配接發生異常,即将關閉連接配接,原因:" + cause.getMessage() + " <<<<<<<<<<<<<<<<<");
    }
 
    public HashMap<String, RequestHandler> getHandlers() {
        return handlers;
    }
 
    public void setHandlers(HashMap<String, RequestHandler> handlers) {
        this.handlers = handlers;
        logger.info(">>>>>>>>>>>>>>>>>> Mina服務端啟動成功 <<<<<<<<<<<<<<<<<");
    }
}           

心跳機制及處理

先簡單介紹下keepAlive的機制:

    首先,需要搞清楚TCP keepalive是幹什麼用的。從名字了解就能夠知道,keepalive就是用來檢測一個tcp connection是否還連接配接正常。當一個tcpconnection建立好之後,如果雙方都不發送資料的話,tcp協定本身是不會發送其它的任何資料的,也就是說,在一個idle的connection上,兩個socket之間不産生任何的資料交換。從另一個方面講,當一個connection建立之後,連結雙方可以長時間的不發送任何資料,比如幾天,幾星期甚至幾個月,但該connection仍然存在。

    是以,這就可能出現一個問題。舉例來說,server和client建立了一個connection,server負責接收client的request。當connection建立好之後,client由于某種原因機器停機了。但server端并不知道,是以server就會一直監聽着這個connection,但其實這個connection已經失效了。

    keepalive就是為這樣的場景準備的。當把一個socket設定成了keepalive,那麼這個socket空閑一段時間後,它就會向對方發送資料來确認對方仍然存在。放在上面的例子中,如果client停機了,那麼server所發送的keepalive資料就不會有response,這樣server就能夠确認client完蛋了(至少從表面上看是這樣)。

MINA本身提供了一個過濾器類: org.apache.mina.filter.keepalive . KeepAliveFilter ,該過濾器用于在IO空閑的時候發送并且回報心跳包(keep-alive request/response)。 

該類構造函數中參數有三個分别是: 

(1)KeepAvlieMessageFactory:   該執行個體引用用于判斷接受與發送的包是否是心跳包,以及心跳請求包的實作 

(2)IdleStatus:   該過濾器所關注的空閑狀态,預設認為讀取空閑。 即當讀取通道空閑的時候發送心跳包 

(3)KeepAliveRequestTimeoutHandler: 心跳包請求後逾時無回報情況下的處理機制  預設為CLOSE  即關閉連接配接 

參考:

http://www.cnblogs.com/pricks/p/3832882.html

https://blog.csdn.net/kkk0526/article/details/51732437

首先,實作KeepAvlieMessageFactory接口:

/**
 * 心跳實作類
 * 服務端發送的是hb_request,那麼用戶端就應該傳回hb_response
 */
public class KeepAliveFactoryImpl implements KeepAliveMessageFactory {
 
    private final Logger logger = LogManager.getLogger(KeepAliveFactoryImpl.class);
 
    // 服務端需要發送請求,用戶端無需發送
    private boolean sendHbRequest;
 
    public KeepAliveFactoryImpl(boolean isServer) {
        this.sendHbRequest = isServer;
    }
 
    /**
     * 服務端心跳發送請求指令
     */
    private static final String HEART_BEAT_REQUEST = Message.CMD_HEARTBEAT_REQUEST;
 
    /**
     * 用戶端心跳響應指令
     */
    private static final String HEART_BEAT_RESPONSE = Message.CMD_HEARTBEAT_RESPONSE;
 
    // 在需要發送心跳時,用來擷取一個心跳請求包[發送端使用]
    @Override
    public Object getRequest(IoSession session) {
        // 是否需要發送心跳請求
        if (sendHbRequest) {
            return new CustomPack(HEART_BEAT_REQUEST);
        }
        return null;
    }
 
    // 在需要回複心跳時,用來擷取一個心跳回複包[接收端使用]
    @Override
    public Object getResponse(IoSession session, Object request) {
        return new CustomPack(CustomPack.RESPONSE, HEART_BEAT_RESPONSE);
    }
 
    // 用來判斷接收到的消息是不是一個心跳請求包,是就傳回true[接收端使用]
    @Override
    public boolean isRequest(IoSession session, Object message) {
        if (message instanceof CustomPack) {
            CustomPack pack = (CustomPack) message;
            if (pack.getContent().equals(Message.CMD_HEARTBEAT_REQUEST)) {
                // 将逾時次數置為0
                session.setAttribute(Message.TIME_OUT_NUM, 0);
                return true;
            }
            return false;
        }
        return false;
    }
 
    // 用來判斷接收到的消息是不是一個心跳回複包,是就傳回true[發送端使用]
    @Override
    public boolean isResponse(IoSession session, Object message) {
        if (message instanceof CustomPack) {
            CustomPack pack = (CustomPack) message;
            return pack.getContent().equals(Message.CMD_HEARTBEAT_RESPONSE);
        }
        return false;
    }
 
}           

之後實作逾時處理類實作KeepAliveRequestTimeoutHandler接口,業務屬性為:逾時3次後,關閉連接配接:

/**
 * 心跳逾時處理類
 */
public class KeepAliveRequestTimeoutHandlerImpl implements KeepAliveRequestTimeoutHandler {
 
    private static final Logger logger = LogManager.getLogger(KeepAliveRequestTimeoutHandlerImpl.class);
 
    /**
     * 逾時的最大次數
     */
    private int timeoutNum = 3;
 
    public KeepAliveRequestTimeoutHandlerImpl() {}
 
    public KeepAliveRequestTimeoutHandlerImpl(int timeoutNum) {
        this.timeoutNum = timeoutNum;
    }
 
    @Override
    public void keepAliveRequestTimedOut(KeepAliveFilter filter, IoSession session) throws Exception {
        int isTimeoutNum = (int) session.getAttribute(Message.TIME_OUT_NUM);
        // 沒有超過最大次數,逾時次數加1
        if (isTimeoutNum <= timeoutNum) {
            session.setAttribute(Message.TIME_OUT_NUM, isTimeoutNum + 1);
        } else {
            // 超過最大次數,關閉會話連接配接
            SessionManager sessionManager = (SessionManager) SpringContextUtil.getBean("pcmSessionManager");
            String account = (String) session.getAttribute(Message.SESSION_KEY);
            sessionManager.removeSession(account);
            logger.info("<<<<<<<<<<<<<<<<<<<< 終端使用者:" + account + " 心跳超過三次無應答,已被關閉 <<<<<<<<<<<<<<<<<<<<");
        }
    }
}           

心跳機制實作及處理類完畢。

具體的如何配置,請檢視下一章節。待續

繼續閱讀