天天看点

MINA学习笔记一

1、介绍

Apache MINA 是一个网络应用框架,有助于用户非常方便地开发高性能、高伸缩性的网络应用。它通过Java NIO提供了一个抽象的、事件驱动的、异步的位于各种传输协议(如TCP/IP和UDP/IP)之上的API,

Apache MINA 通常可被称之为:

NIO 框架库;

客户端/服务器框架库;

或者一个网络socket库。

然而,它所提供的功能远不止这些。

(以上内容大致翻译自Apache MINA网站)

如期官方文档的介绍,Apache MINA 是一个网络应用程序框架,它对Java中的socket和NIO进行了有效和清晰的封装,方便开发人员开发TCP/UDP程序,从而抛开在使用原始的socket时需要考虑的各种繁杂而又烦人问题(线程、性能、会话等),把更多精力专著在应用中的业务逻辑的开发上。

Apache MINA 有两个主要版本:2.0 和 1.1,2.0与1.1有较大的区别,其采用java NIO进行开发,使得性能得到有效的提升,在接口方面也有不小的变化,具体信息可以参见其网站说明。

下面的介绍以 Apache MINA 2.0 为例。

2、服务器端

import im.filter.CoderFactory;
import im.handler.ServerHandler;

import java.io.IOException;
import java.net.InetSocketAddress;

import org.apache.mina.core.service.IoAcceptor;
import org.apache.mina.core.session.IdleStatus;
import org.apache.mina.filter.codec.ProtocolCodecFilter;
import org.apache.mina.filter.executor.ExecutorFilter;
import org.apache.mina.filter.logging.LoggingFilter;
import org.apache.mina.transport.socket.nio.NioSocketAcceptor;

public class SocketServer {
    private static final int PORT = 2023; // 链接端口号
    private static IoAcceptor acceptor;
    public static void startMinaServer() {
        acceptor = new NioSocketAcceptor();
        acceptor.getFilterChain().addLast("logger", new LoggingFilter());//日志过滤
        acceptor.getFilterChain().addLast("codec",new ProtocolCodecFilter(new CoderFactory()));//消息协议过滤
        acceptor.getFilterChain().addLast("executor",new ExecutorFilter(5, 800));//
        acceptor.setHandler(new ServerHandler());//自己处理的业务
        acceptor.setDefaultLocalAddress(new InetSocketAddress(PORT));//端口
        acceptor.getSessionConfig().setReadBufferSize(2048);//服务端读缓存大小
        acceptor.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, 12);//客户端和服务器心跳间隔时间(单位:秒)
        try {
            acceptor.bind();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
           

执行startMinaServer方法,Mina的服务就开启了。

其中:a)LoggingFilter 类是Mina自带的日志过滤器

b)ProtocolCoderFilter类是mina自带的,协议编码过滤器,其构造函数带参数ProtocolCodecFactory 接口;CoderFactory 类实现了ProtocolCodecFactory接口

类CoderFactory.java

import java.nio.charset.Charset;

import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.ProtocolCodecFactory;
import org.apache.mina.filter.codec.ProtocolDecoder;
import org.apache.mina.filter.codec.ProtocolEncoder;

public class CoderFactory implements ProtocolCodecFactory{
            
            private final Encoder encoder;  
	    private final Decoder decoder;  
	   
	    public CoderFactory() {  
	        this(Charset.defaultCharset());  
	    }  
	   
	    public CoderFactory(Charset charSet) {  
	        this.encoder = new Encoder(charSet);  
	        this.decoder = new Decoder(charSet);  
	    }  
	   
	@Override
	public ProtocolDecoder getDecoder(IoSession arg0) throws Exception {
		return decoder;
	}

	@Override
	public ProtocolEncoder getEncoder(IoSession arg0) throws Exception {
		return encoder;
	} 
}
           

c)ExecutorFilter类是Mina自带的执行线程模型,第一个参数是初始化线程数,第二个参数是最大线程数

d)ServerHandler类是自定义的,处理自己业务的类:

ServerHandler.java
           
import im.entity.Header;
import im.entity.LoginBean;
import im.utils.SessionMap;

import org.apache.mina.core.service.IoHandler;
import org.apache.mina.core.session.IdleStatus;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.transport.socket.SocketSessionConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.gson.Gson;

public class ServerHandler implements IoHandler{

    private final static Logger LOGGER = LoggerFactory.getLogger(ServerHandler.class);
    @Override
    public void exceptionCaught(IoSession session, Throwable throwable)
            throws Exception {
        LOGGER.info("exceptionCaught"+throwable.getMessage());
    }

    @Override
    public void messageReceived(IoSession session, Object message) throws Exception {
        LOGGER.info("messageReceived");
        Header header = (Header)message;
        LOGGER.info("收到的消息请求"+new Gson().toJson(message));
        new Execute(session, header).handler();//根据消息类型,处理不同的操作
    }

    @Override
    public void messageSent(IoSession session, Object message) throws Exception {
        LOGGER.info("messageSent");
    }

    @Override
    public void sessionClosed(IoSession session) throws Exception {
        LOGGER.info("sessionClosed");
        LoginBean bean = (LoginBean)session.getAttribute("USERNAME");
        LOGGER.info(bean.getUsername()+"退出登录");
        if(null!=bean.getUsername()){
            SessionMap.unregisterSession(bean.getId());
        }
        session.close(true);
        LOGGER.info(session+"关闭");
    }

    @Override
    public void sessionCreated(IoSession session) throws Exception {
        LOGGER.debug("sessionCreated");
        SocketSessionConfig cfg = (SocketSessionConfig) session.getConfig();   
         cfg.setReceiveBufferSize(2 * 1024 * 1024);   
         cfg.setReadBufferSize(2 * 1024 * 1024);   
         cfg.setKeepAlive(true);//是否保持长连接
         cfg.setSoLinger(0); 
    }

    @Override
    public void sessionIdle(IoSession session, IdleStatus idleStatus) throws Exception {
        LOGGER.info("sessionIdle");
        LoginBean bean = (LoginBean)session.getAttribute("USERNAME");
        if(null!=bean){
            LOGGER.info(bean.getUsername()+"退出登录");
            if(null!=bean.getUsername()){
                SessionMap.unregisterSession(bean.getId());
            }
        }
        session.close(true);
    }

    @Override
    public void sessionOpened(IoSession session) throws Exception {
        LOGGER.info("sessionOpened");
    }
}
           

e)Execute类,自定义类,根据消息类型处理不同的业务

Execute.java

import im.entity.Header;
import im.entity.LoginBean;
import im.utils.SessionMap;

import org.apache.mina.core.session.IoSession;

public class Execute {
	private IoSession ioSession;
	private Header header;

	public Execute(IoSession ioSession, Header header) {
		this.ioSession = ioSession;
		this.header = header;
	}

	public void handler() {
		if (null != header.getId() && !"".equals(header.getId())) {
			IoSession session = SessionMap.getSession(header.getId());
			if (null != session) {
				if (!session.equals(ioSession)) {
					SessionMap.unregisterSession(header.getId());
					SessionMap.registerSession((LoginBean) session
							.getAttribute("USERNAME"), ioSession);
				}
			}
		}
		if ((null != header.getUsername() && !"".equals(header.getUsername()))) {
			IoSession session = SessionMap.getSession(header.getUsername());
			if (null != session) {
				if (!session.equals(ioSession)) {
					SessionMap.unregisterSession(header.getUsername());
					SessionMap.registerSession((LoginBean) session
							.getAttribute("USERNAME"), ioSession);
				}
			}
		}
		System.out.println("---------------------" + header.getType()
				+ "  Systemtime:" + System.currentTimeMillis());
		// 登陆请求
		if ("login".equals(header.getType())) {
			LoginHandler.execute(ioSession, header);
			// 单聊
		} else if ("single".equals(header.getType())) {
			SingleHandler.execute(ioSession, header);
			// 群聊
		} else if ("group".equals(header.getType())) {
			GroupHandler.execute(ioSession, header);
			// 客户端离线消息删除请求
		} else if ("offlinemessagereceive".equals(header.getType())) {
			DelOffLineMessageHandler.execute(ioSession, header);
			// 消息转发
		} else if ("forward".equals(header.getType())) {
			ForwardMessageHandler.execute(ioSession, header);
			// 获取离线消息
		} else if ("offline".equals(header.getType())) {
			FindOfflLineMessageHandler.execute(ioSession, header);
			// 获取指定人聊天记录
		} else if ("findsinglemessage".equals(header.getType())) {
			FindSingleMessageHandler.execute(ioSession, header);
			// 获取指定群组聊天记录
		} else if ("findgroupmessage".equals(header.getType())) {
			FindGroupMessageHandler.execute(ioSession, header);
			// 获取离线通讯录
		} else if ("offlineaddress".equals(header.getType())) {
			FindOfflineAddress.execute(ioSession, header);
		} else if ("delsingleonemsg".equals(header.getType())) {
			// 删除一条消息,在客户端长按删除
			DelSingleOneMessageHandler.execute(ioSession, header);
			// 删除全部单聊消息
		} else if ("delsingleallmsg".equals(header.getType())) {
			DelSingleAllMessageHandler.execute(ioSession, header);
			// 删除群组消息
		} else if ("delgrouponemsg".equals(header.getType())) {
			DelGroupOneMessageHandler.execute(ioSession, header);
			// 客户端更新离线通讯录之后的回执
		} else if ("contactserverupdate".equals(header.getType())) {
			SontactServerUpdate.execute(ioSession, header);
			// web服务器发送更新请求(通讯录)
		} else if ("serverupdate".equals(header.getType())) {
			SendMessageAllHandler.execute(header);
			// web服务器发送更新请求(群组)
		} else if ("serverroom".equals(header.getType())) {
			SendGroupMessageHandler.execute(header);
			// web发送即时公告 分享
		} else if ("notice".equals(header.getType())) {
			NoticeMessageHandler.execute(header);
			// 获取新朋友,对外发布
		} else if ("addfriend".equals(header.getType())) {
			AddFriendHandler.execute(header);
		} else if ("findnewfriend".equals(header.getType())) {
			FindNewFriendHandler.execute(ioSession, header);
		} else if ("findnewfriendreceive".equals(header.getType())) {
			DelNewFriendHandler.execute(header);
		} else if ("agreeAddFriend".equals(header.getType())) {
			AgreeAddFriendHandler.execute(ioSession, header);
			// 所有http请求都用socket
		} else if ("url".equals(header.getType())) {
			HttpClient.execute(ioSession, header);
			// 心跳设置 时间长则没必要这么频繁
		} else if ("heard".equals(header.getType())) {
			HeardHandler.execute(ioSession, header);
			// 下线
		} else if ("logout".equals(header.getType())) {
			SessionMap.unregisterSession(header.getUsername());
			ioSession.close(true);
			// 支付
		} else if ("".equalsIgnoreCase(header.getType())) {
			ReapalPushHandler.execute(header);
		}
	}
}
           

f)SessionMap类,自定义类,用来存储连接服务器的所有客户端的长连接信息

import im.entity.LoginBean;

import java.util.HashMap;
import java.util.Map;

import org.apache.log4j.Logger;
import org.apache.mina.core.session.IoSession;

public class SessionMap
{
    private final static Logger            logger        = Logger.getLogger(SessionMap.class);
    public static Map<String, IoSession>    SESSION_MAP    = new HashMap<String, IoSession>();
    public static void registerSession(LoginBean bean ,IoSession IoSession)
    {
        synchronized (SESSION_MAP)
        {
            IoSession.setAttribute("USERNAME", bean);
            SessionMap.SESSION_MAP.put(getSessionid(bean.getId()), IoSession);
            logger.info("注册Session成功");
        }
    }
    
    public static IoSession getSession(String username)
    {
        synchronized (SESSION_MAP)
        {
            return SessionMap.SESSION_MAP.get(getSessionid(username));
        }
    }
    public static void unregisterSession(String username)
    {
        synchronized (SESSION_MAP)
        {
            SessionMap.SESSION_MAP.remove(getSessionid(username));
            logger.info("注销sessionid成功");
        }
    }
    public static String getSessionid(String username)
    {
        return username.toUpperCase();
    }
}
           

g)Header类  自定义类,消息格式类

Header.java

public class Header {
	private String id; // 用户id
	private String msgid; // 消息的唯一标识
	private String to; // 消息接收人
	private String from; // 消息发送人
	private String username; // 用户名
	private String password; // 密码
	private String msgtype; // 消息类型
	private String type; // 请求类型
	private String error; // 错位信息
	private long contentLength; // 消息长度
	private int groupid; // 群组id
	private int length;
	private String device; // 设备号
	private String date; // 时间
	private String result; // 登陆时 返回结果
	private String content; // 消息内容
	private String voicetime; // 语音时长
	private String loginpwdstatus; // 是否强制修改密码
	private String imageWidth; // 图片宽度
	private String imageHeight; // 图片高度
	private String deletewithuser;
	private String userids; // 转发多个用户 用户集合
	private String groupids; // 转发多个群组 群组集合
	private String sortid; // 排序id
	private String isoffline; // 是否离线
	private String offlinecount; // 离线数量
}
           

继续阅读