天天看点

基于mina的短连接组件以及与spring、spring boot的集成

尽管各类标准的系统间交互组件已经非常流行,但采用自定义报文、基于原生socket进行系统间数据交互的模式依然存在。

原生socket在做一般的测试性开发时,确实能简便的达成目标。但在做企业应用时,若是在报文交互的需求上,再增加关于传输状态、处理策略等实际需求时,则显得吃力。

mina作为高性能开源网格框架,其功能相当丰富。尽管本文的短连接需求从实现上来看,使用mina作为核心,显得大材小用,但是,从实际应用上,能很好的满足业务需要及便捷的使用(本文很大意义上是马上造了一个简单的轮子)。

为了不陷入细节,后文将首先介绍可使用本文组件的场景、以及如何使用,然后再对组件的内部实现进行介绍,最后提供一个组件的下载地址。

一、本文假定的需求场景

1、两个系统间使用自定义报文进行短连接同步交互。即:一方为服务方,一直处于监听状态,另外一方作为客户端连接后,进行数据发送,当收到回应或者设定的超时时间到达后,断开与服务方的连接;

*2、假设自定义报文的格式为:8字节长度+对象的json字符串(UTF-8编码),其中“8字节长度”的值不包含这8个字节,其值为包含“对象的json字符串”经过UTF-8编码之后得到的长度值,8字节长度的编码为long的高节节在前。

    *:关于第2点数据报文的定义在后文的介绍中,主要是影响工具类MessageUtils以及消息解码方法的实现。若采用其他的报文格式,则需要调整MessageUtils中相关方法的实现,另外就是关于长度,需要调整MessageDecoder的doDecode解码方法的实现

二、使用本文的组件准备相关

本文基于的mina版本是2.0.17,jdk为1.7。

在使用本文提供的组件时,对maven项目来说,需要做如下依赖:

<dependency>
    <groupId>org.apache.mina</groupId>
    <artifactId>mina-core</artifactId>
    <version>2.0.17</version>
</dependency>
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.46</version>
</dependency>
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-api</artifactId>
    <version>1.7.25</version>
</dependency>

<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-log4j12</artifactId>
    <version>1.7.25</version>
</dependency>
           

对非maven项目,则需要在你设定的lib里包含以上的jar包。

三、如何使用

本文组件的目标是在mina的基础上,将server、client的初始化实现的尽量简单且贴近实际,因此,一般情况下,你仅需构造一个MessageServer、若干个MessageShortClient对象(可在短连接的业务实现处即时构造),然后对server、client对象各自实现一个业务层的MessageProcessor对象并进行关联,对业务层的MessageProcessor,可选择性的实现其方法。

以下服务端测试代码、客务端测试代码是对以上使用方法的直观解释。

1、服务端测试代码:

package com.bn.zbase;

import com.alibaba.fastjson.JSON;
import com.bn.zbase.message.*;
import com.bn.zbase.message.biz.MessageProcessor;
import com.bn.zbase.message.biz.DefaultServerMessageProcessor;
import com.bn.zbase.message.util.MessageUtils;
import org.apache.mina.core.session.IoSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Created by zcn on 2018/4/20.
 */
public class BizServerTests {
    private static final Logger log = LoggerFactory.getLogger(BizServerTests.class);

    public static void main(String[] args) {
        MessageProcessor messageProcessor = new DefaultServerMessageProcessor() {
            @Override
            public void processReceivedMessage(IoSession session, String strJsonMessage) {
                log.info("接收到客户端消息:" + strJsonMessage);
                Object objRet = JSON.parse(strJsonMessage);
                session.write(MessageUtils.encodeMessageObj2IoBuffer(objRet));
            }
        };

        MessageServer messageServer = new MessageServer("localhost", 8989);
        messageServer.setMessageProcessor(messageProcessor);
        messageServer.start();

        try {
            Thread.sleep(30 * 10000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        messageServer.shutdown();
    }
}
           

2、客户端代码:

package com.bn.zbase;

import com.alibaba.fastjson.JSON;
import com.bn.zbase.message.biz.MessageProcessor;
import com.bn.zbase.message.biz.DefaultClientMessageProcessor;
import com.bn.zbase.message.MessageShortClient;
import com.bn.zbase.message.util.MessageSendResult;
import org.apache.mina.core.session.IoSession;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;

/**
 * Created by zcn on 2018/4/23.
 */
public class BizClientTests {
    private static final Logger log = LoggerFactory.getLogger(BizClientTests.class);

    @Test
    public void testSendObj() throws IOException {
        DiagramEntity objMessage = new DiagramEntity();
        objMessage.setName("张三三");
        objMessage.setCode("X001");

        MessageProcessor clientProcessor = new DefaultClientMessageProcessor() {
            @Override
            public void processReceivedMessage(IoSession session, String strJsonMessage) {
                DiagramEntity objRet = JSON.parseObject(strJsonMessage, DiagramEntity.class);
                log.info("收到服务器反馈:" + JSON.toJSONString(objRet));
            }

            @Override
            public void complete(MessageSendResult sendResult, Object objOriginalMessage) {
                log.info("操作完成状态:" + sendResult.getDesc());
            }
        };

        for (int i = 0; i < 1000; i++) {
            MessageShortClient messageShortClient = new MessageShortClient("localhost", 8989, 10);
            messageShortClient.setMessageProcessor(clientProcessor);
            messageShortClient.sendMessage(objMessage);

            try {
                Thread.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

class DiagramEntity {
    private String name;
    private String code;

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getCode() {
        return code;
    }

    public void setCode(String code) {
        this.code = code;
    }
}

           

四、实现细节

1、结构

基于mina的短连接组件以及与spring、spring boot的集成

上图为本组件的实现类图:

1.1 在mina派生上,按照其约定,扩展了IoHandler,将其分为ClientMessageHandler、ServerMessageHandler,当然,其实也没做什么特别的工作,主要是在事件处理方法内,添加了业务可能使用的几个探点(探点的实现是MessageProcessor);

1.2 还是mina派生,对编解码器MessageCodecFactory以及具体的编、解码单元(MessageEncoder、MessageDecoder)进行了扩展,这部分也是标准处理(为适应自定义的报文格式),参考了网友的文章,在组件内部的代码内也做了说明;

1.3 报文数据转换工具类MessageUtils。本类提供了适用于假定报文格式的常用数据转换方法,其使用贯穿于整个组件,你可以随时使用其进行数据转换;

1.4 业务层报文数据处理接口MessageProcessor以及其实现类。mina的IoHandler已经对常用的会话、消息事件等做了抽象,然而我们在这里仍做了一个自己的实现:一方面是由于作者想提供一个相对简单明了的业务层处理派生体系;另外一方面,当面对连接参数错误等情况时,一时没有找到IoHandler如何实现判断的方法(当然,之后会更深入的查看mina)。

    MessageProcessor接口的实现是你在实际业务应用时,应该必需要做的,否则按照默认的体系,你除了很方便的创建出server、client对象,将干不了任何业务相关的事儿……

1.5 枚举MessageSendResult。定义了几个常用的发送结果状态,当你在实现MessageProcessor.complete方法进行发送完成后的处理时,可以获取到某种状态,这些状态当前有以下:

OK("会话正常完成", 0),
CONNECT_ERROR("连接错误", 1),
CONNECT_PARAERROR("连接参数错误", 2),
CONNECT_NORESPONSE("未收到服务器应答", 3);
           

1.6 最后,也是必需要使用到的两个类:MessageServer、MessageShortClient。

对MessageServer,如“三、如何使用”节中的“服务端测试代码”内示例,你通常仅需要使用带参的构造函数(MessageServer(hostname, port))来构造一个server对象,然后设置好其业务消息处理对象MessageProcessor,最后start就完成了(程序结束的时候你应该再使用shutdown关掉服务器)。当然在,spring中集成时,并不需要shutdown。

对MessageClient,当你使用带参构造函数MessageClient(destHostname,destPort)时,默认的空闲超时时间是10秒,即当你发送完消息(sendMessage)后,最多10秒钟,短连接结束,关于连接完成的发送结果状态,你应该使用MessageProcessor的complete完成处理。

五、与spring集成

尽管我们当前使用的spring版本已经基本都是支持注解构造bean了,这里我仍然使用xml配置的方式对组件在spring中的集成做说明。其实是非常简单的,仅需要集成MessageServer:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.1.xsd">
  <bean id="defaultMinaServerMessageProcessor" class="com.bn.zbase.message.biz.DefaultServerMessageProcessor"></bean>
  
  <bean id="minaMessageServer" class="com.bn.zbase.message.MessageServer" init-method="start">
    <property name="hostname" value="127.0.0.1"></property>
    <property name="port" value="8989"></property>
    <property name="messageProcessor" ref="defaultMinaServerMessageProcessor"></property>
  </bean>
</beans>
           

六、与spring boot集成

spring boot的方式当然是通吃了,这里使用注解配置实现:

package com.bn.zbase.config;

import com.alibaba.fastjson.JSON;
import com.bn.zbase.message.MessageServer;
import com.bn.zbase.message.biz.MessageProcessor;
import com.bn.zbase.message.biz.DefaultServerMessageProcessor;
import com.bn.zbase.message.terminal.AbstractMessageTerminal;
import com.bn.zbase.message.util.MessageUtils;
import org.apache.mina.core.session.IoSession;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Created by zcn on 2018/4/24.
 */
@Configuration
public class MinaMessageConfiguration {

    @Bean(name="messageServer", initMethod = "start")
    public AbstractMessageTerminal messageServer() {
        MessageProcessor messageProcessor = new DefaultServerMessageProcessor() {
            @Override
            public void processReceivedMessage(IoSession session, String strJsonMessage) {
                System.out.println("收到客户端信息:" + strJsonMessage);
                Object objRet = JSON.parse(strJsonMessage);
                session.write(MessageUtils.encodeMessageObj2IoBuffer(objRet));
            }
        };
        MessageServer obj = new MessageServer("localhost", 8989);
        obj.setMessageProcessor(messageProcessor);
        return obj;
    }
}
           

七、其他问题

1、MessageShortClient类并非是线程安全的,事实上,也没有必要,因为本来就是用完即焚;

2、开源问题,在最后的组件下载中,当前提供的是一个源码类型的jar包,并没有提供本组件的实现工程,原因是本组件是基于当前一个未完成的开源框架内实现的(子功能模块),在未来完成开源框架后,将会提供整个项目。

八、组件下载

见: https://download.csdn.net/download/smartcore/10374257