1、介绍
Apache MINA 是一个网络应用框架,有助于用户非常方便地开发高性能、高伸缩性的网络应用。它通过Java NIO提供了一个抽象的、事件驱动的、异步的位于各种传输协议(如TCP/IP和UDP/IP)之上的API,
Apache MINA 通常可被称之为:
NIO 框架库;
客户端/服务器框架库;
或者一个网络socket库。
然而,它所提供的功能远不止这些。
(以上内容大致翻译自Apache MINA网站)
如期官方文档的介绍,Apache MINA 是一个网络应用程序框架,它对Java中的socket和NIO进行了有效和清晰的封装,方便开发人员开发TCP/UDP程序,从而抛开在使用原始的socket时需要考虑的各种繁杂而又烦人问题(线程、性能、会话等),把更多精力专著在应用中的业务逻辑的开发上。
Apache MINA 有两个主要版本:2.0 和 1.1,2.0与1.1有较大的区别,其采用java NIO进行开发,使得性能得到有效的提升,在接口方面也有不小的变化,具体信息可以参见其网站说明。
下面的介绍以 Apache MINA 2.0 为例。
2、服务器端
import im.filter.CoderFactory;
import im.handler.ServerHandler;
import java.io.IOException;
import java.net.InetSocketAddress;
import org.apache.mina.core.service.IoAcceptor;
import org.apache.mina.core.session.IdleStatus;
import org.apache.mina.filter.codec.ProtocolCodecFilter;
import org.apache.mina.filter.executor.ExecutorFilter;
import org.apache.mina.filter.logging.LoggingFilter;
import org.apache.mina.transport.socket.nio.NioSocketAcceptor;
public class SocketServer {
private static final int PORT = 2023; // 链接端口号
private static IoAcceptor acceptor;
public static void startMinaServer() {
acceptor = new NioSocketAcceptor();
acceptor.getFilterChain().addLast("logger", new LoggingFilter());//日志过滤
acceptor.getFilterChain().addLast("codec",new ProtocolCodecFilter(new CoderFactory()));//消息协议过滤
acceptor.getFilterChain().addLast("executor",new ExecutorFilter(5, 800));//
acceptor.setHandler(new ServerHandler());//自己处理的业务
acceptor.setDefaultLocalAddress(new InetSocketAddress(PORT));//端口
acceptor.getSessionConfig().setReadBufferSize(2048);//服务端读缓存大小
acceptor.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, 12);//客户端和服务器心跳间隔时间(单位:秒)
try {
acceptor.bind();
} catch (IOException e) {
e.printStackTrace();
}
}
执行startMinaServer方法,Mina的服务就开启了。
其中:a)LoggingFilter 类是Mina自带的日志过滤器
b)ProtocolCoderFilter类是mina自带的,协议编码过滤器,其构造函数带参数ProtocolCodecFactory 接口;CoderFactory 类实现了ProtocolCodecFactory接口
类CoderFactory.java
import java.nio.charset.Charset;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.ProtocolCodecFactory;
import org.apache.mina.filter.codec.ProtocolDecoder;
import org.apache.mina.filter.codec.ProtocolEncoder;
public class CoderFactory implements ProtocolCodecFactory{
private final Encoder encoder;
private final Decoder decoder;
public CoderFactory() {
this(Charset.defaultCharset());
}
public CoderFactory(Charset charSet) {
this.encoder = new Encoder(charSet);
this.decoder = new Decoder(charSet);
}
@Override
public ProtocolDecoder getDecoder(IoSession arg0) throws Exception {
return decoder;
}
@Override
public ProtocolEncoder getEncoder(IoSession arg0) throws Exception {
return encoder;
}
}
c)ExecutorFilter类是Mina自带的执行线程模型,第一个参数是初始化线程数,第二个参数是最大线程数
d)ServerHandler类是自定义的,处理自己业务的类:
ServerHandler.java
import im.entity.Header;
import im.entity.LoginBean;
import im.utils.SessionMap;
import org.apache.mina.core.service.IoHandler;
import org.apache.mina.core.session.IdleStatus;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.transport.socket.SocketSessionConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.gson.Gson;
public class ServerHandler implements IoHandler{
private final static Logger LOGGER = LoggerFactory.getLogger(ServerHandler.class);
@Override
public void exceptionCaught(IoSession session, Throwable throwable)
throws Exception {
LOGGER.info("exceptionCaught"+throwable.getMessage());
}
@Override
public void messageReceived(IoSession session, Object message) throws Exception {
LOGGER.info("messageReceived");
Header header = (Header)message;
LOGGER.info("收到的消息请求"+new Gson().toJson(message));
new Execute(session, header).handler();//根据消息类型,处理不同的操作
}
@Override
public void messageSent(IoSession session, Object message) throws Exception {
LOGGER.info("messageSent");
}
@Override
public void sessionClosed(IoSession session) throws Exception {
LOGGER.info("sessionClosed");
LoginBean bean = (LoginBean)session.getAttribute("USERNAME");
LOGGER.info(bean.getUsername()+"退出登录");
if(null!=bean.getUsername()){
SessionMap.unregisterSession(bean.getId());
}
session.close(true);
LOGGER.info(session+"关闭");
}
@Override
public void sessionCreated(IoSession session) throws Exception {
LOGGER.debug("sessionCreated");
SocketSessionConfig cfg = (SocketSessionConfig) session.getConfig();
cfg.setReceiveBufferSize(2 * 1024 * 1024);
cfg.setReadBufferSize(2 * 1024 * 1024);
cfg.setKeepAlive(true);//是否保持长连接
cfg.setSoLinger(0);
}
@Override
public void sessionIdle(IoSession session, IdleStatus idleStatus) throws Exception {
LOGGER.info("sessionIdle");
LoginBean bean = (LoginBean)session.getAttribute("USERNAME");
if(null!=bean){
LOGGER.info(bean.getUsername()+"退出登录");
if(null!=bean.getUsername()){
SessionMap.unregisterSession(bean.getId());
}
}
session.close(true);
}
@Override
public void sessionOpened(IoSession session) throws Exception {
LOGGER.info("sessionOpened");
}
}
e)Execute类,自定义类,根据消息类型处理不同的业务
Execute.java
import im.entity.Header;
import im.entity.LoginBean;
import im.utils.SessionMap;
import org.apache.mina.core.session.IoSession;
public class Execute {
private IoSession ioSession;
private Header header;
public Execute(IoSession ioSession, Header header) {
this.ioSession = ioSession;
this.header = header;
}
public void handler() {
if (null != header.getId() && !"".equals(header.getId())) {
IoSession session = SessionMap.getSession(header.getId());
if (null != session) {
if (!session.equals(ioSession)) {
SessionMap.unregisterSession(header.getId());
SessionMap.registerSession((LoginBean) session
.getAttribute("USERNAME"), ioSession);
}
}
}
if ((null != header.getUsername() && !"".equals(header.getUsername()))) {
IoSession session = SessionMap.getSession(header.getUsername());
if (null != session) {
if (!session.equals(ioSession)) {
SessionMap.unregisterSession(header.getUsername());
SessionMap.registerSession((LoginBean) session
.getAttribute("USERNAME"), ioSession);
}
}
}
System.out.println("---------------------" + header.getType()
+ " Systemtime:" + System.currentTimeMillis());
// 登陆请求
if ("login".equals(header.getType())) {
LoginHandler.execute(ioSession, header);
// 单聊
} else if ("single".equals(header.getType())) {
SingleHandler.execute(ioSession, header);
// 群聊
} else if ("group".equals(header.getType())) {
GroupHandler.execute(ioSession, header);
// 客户端离线消息删除请求
} else if ("offlinemessagereceive".equals(header.getType())) {
DelOffLineMessageHandler.execute(ioSession, header);
// 消息转发
} else if ("forward".equals(header.getType())) {
ForwardMessageHandler.execute(ioSession, header);
// 获取离线消息
} else if ("offline".equals(header.getType())) {
FindOfflLineMessageHandler.execute(ioSession, header);
// 获取指定人聊天记录
} else if ("findsinglemessage".equals(header.getType())) {
FindSingleMessageHandler.execute(ioSession, header);
// 获取指定群组聊天记录
} else if ("findgroupmessage".equals(header.getType())) {
FindGroupMessageHandler.execute(ioSession, header);
// 获取离线通讯录
} else if ("offlineaddress".equals(header.getType())) {
FindOfflineAddress.execute(ioSession, header);
} else if ("delsingleonemsg".equals(header.getType())) {
// 删除一条消息,在客户端长按删除
DelSingleOneMessageHandler.execute(ioSession, header);
// 删除全部单聊消息
} else if ("delsingleallmsg".equals(header.getType())) {
DelSingleAllMessageHandler.execute(ioSession, header);
// 删除群组消息
} else if ("delgrouponemsg".equals(header.getType())) {
DelGroupOneMessageHandler.execute(ioSession, header);
// 客户端更新离线通讯录之后的回执
} else if ("contactserverupdate".equals(header.getType())) {
SontactServerUpdate.execute(ioSession, header);
// web服务器发送更新请求(通讯录)
} else if ("serverupdate".equals(header.getType())) {
SendMessageAllHandler.execute(header);
// web服务器发送更新请求(群组)
} else if ("serverroom".equals(header.getType())) {
SendGroupMessageHandler.execute(header);
// web发送即时公告 分享
} else if ("notice".equals(header.getType())) {
NoticeMessageHandler.execute(header);
// 获取新朋友,对外发布
} else if ("addfriend".equals(header.getType())) {
AddFriendHandler.execute(header);
} else if ("findnewfriend".equals(header.getType())) {
FindNewFriendHandler.execute(ioSession, header);
} else if ("findnewfriendreceive".equals(header.getType())) {
DelNewFriendHandler.execute(header);
} else if ("agreeAddFriend".equals(header.getType())) {
AgreeAddFriendHandler.execute(ioSession, header);
// 所有http请求都用socket
} else if ("url".equals(header.getType())) {
HttpClient.execute(ioSession, header);
// 心跳设置 时间长则没必要这么频繁
} else if ("heard".equals(header.getType())) {
HeardHandler.execute(ioSession, header);
// 下线
} else if ("logout".equals(header.getType())) {
SessionMap.unregisterSession(header.getUsername());
ioSession.close(true);
// 支付
} else if ("".equalsIgnoreCase(header.getType())) {
ReapalPushHandler.execute(header);
}
}
}
f)SessionMap类,自定义类,用来存储连接服务器的所有客户端的长连接信息
import im.entity.LoginBean;
import java.util.HashMap;
import java.util.Map;
import org.apache.log4j.Logger;
import org.apache.mina.core.session.IoSession;
public class SessionMap
{
private final static Logger logger = Logger.getLogger(SessionMap.class);
public static Map<String, IoSession> SESSION_MAP = new HashMap<String, IoSession>();
public static void registerSession(LoginBean bean ,IoSession IoSession)
{
synchronized (SESSION_MAP)
{
IoSession.setAttribute("USERNAME", bean);
SessionMap.SESSION_MAP.put(getSessionid(bean.getId()), IoSession);
logger.info("注册Session成功");
}
}
public static IoSession getSession(String username)
{
synchronized (SESSION_MAP)
{
return SessionMap.SESSION_MAP.get(getSessionid(username));
}
}
public static void unregisterSession(String username)
{
synchronized (SESSION_MAP)
{
SessionMap.SESSION_MAP.remove(getSessionid(username));
logger.info("注销sessionid成功");
}
}
public static String getSessionid(String username)
{
return username.toUpperCase();
}
}
g)Header类 自定义类,消息格式类
Header.java
public class Header {
private String id; // 用户id
private String msgid; // 消息的唯一标识
private String to; // 消息接收人
private String from; // 消息发送人
private String username; // 用户名
private String password; // 密码
private String msgtype; // 消息类型
private String type; // 请求类型
private String error; // 错位信息
private long contentLength; // 消息长度
private int groupid; // 群组id
private int length;
private String device; // 设备号
private String date; // 时间
private String result; // 登陆时 返回结果
private String content; // 消息内容
private String voicetime; // 语音时长
private String loginpwdstatus; // 是否强制修改密码
private String imageWidth; // 图片宽度
private String imageHeight; // 图片高度
private String deletewithuser;
private String userids; // 转发多个用户 用户集合
private String groupids; // 转发多个群组 群组集合
private String sortid; // 排序id
private String isoffline; // 是否离线
private String offlinecount; // 离线数量
}