基于springMVC
1.先修改pom文件,添加websocket依赖包
依赖包
2.配置XML 拦截器
bean需要导入对应的包,否则下面的标签会报错
拦截器类,指向自己拦截器类所在的位置,准备工作做完,开始贴后台代码了
3.JAVA后台实现部分代码
先创建一个websocket’config类,指定自己需要拦截的websocket地址,根据自己的实际情况而定,包含了websocket请求的重定向功能,
/**
* 配置类
* @author k.li
*
*/
@Configuration
@EnableWebMvc
@EnableWebSocket
@Service
public class SpringWebSocketConfig extends WebMvcConfigurerAdapter implements WebSocketConfigurer {
@Override
public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
// "/mcloud/cloudclub/cloudService/websocket/*",
//允许连接的域,只能以http或https开头
String[] allowsOrigins = {"http://www.xxx.com"};
//WebIM WebSocket通道
String[] path = new String[]{
"/websocket",
"/meicloud/test/*"
};
registry.addHandler(webSocketHandler(),path).addInterceptors(new SpringWebSocketHandlerInterceptor())
.addInterceptors(interceptors()).setAllowedOrigins("*");
/*registry.addHandler(chatWebSocketHandler(),"/webSocketIMServer").setAllowedOrigins(allowsOrigins).addInterceptors(myInterceptor());
registry.addHandler(chatWebSocketHandler(), "/sockjs/webSocketIMServer").setAllowedOrigins(allowsOrigins).addInterceptors(myInterceptor()).withSockJS();*/
}
@Bean
public TextWebSocketHandler webSocketHandler(){
return new SpringWebSocketHandler();
}
@Bean
public HttpSessionHandshakeInterceptor interceptors(){
return new SpringWebSocketHandlerInterceptor();
}
定义拦截器类,使用websocket应该知道,首先他需要和HTTP一样进行三次握手,不懂的先百度,然后在进行websocket通讯,发送点对点消息数据,这个根据自己的业务,比如你需要拦截用户登录session,取用户名密码,也可以在这个类做,具体代码如下
import java.util.Map;
import javax.servlet.http.HttpSession;
import org.springframework.http.HttpHeaders;
import org.springframework.http.server.ServerHttpRequest;
import org.springframework.http.server.ServerHttpResponse;
import org.springframework.http.server.ServletServerHttpRequest;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.server.support.HttpSessionHandshakeInterceptor;
/**
* websocket 拦截器定义
* @author k.li
*
*/
public class SpringWebSocketHandlerInterceptor extends HttpSessionHandshakeInterceptor {
@Override
public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler,
Map<String, Object> attributes) throws Exception {
//根据具体业务进行拦截
if (request instanceof ServletServerHttpRequest) {
ServletServerHttpRequest servletRequest = (ServletServerHttpRequest) request;
HttpSession session = servletRequest.getServletRequest().getSession(false);
HttpHeaders headers = request.getHeaders();
if(headers.get("username") != null){
attributes.put("meiid",headers.get("useranme"));
}
if(!attributes.containsKey("username") ||!attributes.containsKey("meiid") ||!attributes.containsKey("meiid") ){
return false;
}else{
return super.beforeHandshake(request, response, wsHandler, attributes);
}
//return false;
}
// 握手后
@Override
public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler,
Exception ex) {
super.afterHandshake(request, response, wsHandler, ex);
}
好了。下面到最关键的消息处理类,我是根据我自己的业务写的,你们需要添加自己的业务进去,地方我会注释说明,搬代码要会灵活用用
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.socket.CloseStatus;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketSession;
import org.springframework.web.socket.handler.TextWebSocketHandler;
import com.mbk.mcloud.model.GroupUserMsg;
import com.mbk.mcloud.model.MyFriendMsg;
import com.mbk.mcloud.model.MyFriendVerify;
import com.mbk.mcloud.model.User;
import com.mbk.mcloud.service.GroupService;
import com.mbk.mcloud.service.MyFriendService;
import net.sf.json.JSONObject;
public class SpringWebSocketHandler extends TextWebSocketHandler {
@Autowired
private MyFriendService myFriendService;
@Autowired
private GroupService groupService;
private static Logger logger = LoggerFactory.getLogger(SpringWebSocketHandler.class);
public static int onlineNumber = 0;
private static final ConcurrentHashMap<String, WebSocketSession> users; // Map来存储WebSocketSession,key用USER_ID
// 即在线用户列表
// 保存用户ID
private static ConcurrentHashMap<String, String> sessionMap = new ConcurrentHashMap<String, String>();
static {
users = new ConcurrentHashMap<String, WebSocketSession>();
}
MyFriendMsg msg = new MyFriendMsg();
public SpringWebSocketHandler() {
}
/**
* 连接成功时候,会触发页面上onopen方法
*/
public void afterConnectionEstablished(WebSocketSession session) throws Exception {
//连接
logger.info("send msg!!!!!!!!!!!!!!!!!!!!!!!!!!!");
// 验证用户名密码,验证通过则建立连接开始通讯,否则关闭连接
if (session.getAttributes().containsKey("meiid") && session.getAttributes().containsKey("pwd")
&& session.getAttributes().containsKey("device_id")) {
String meiid = session.getAttributes().get("meiid").toString();
String pwd = session.getAttributes().get("pwd").toString();
String device_id = session.getAttributes().get("device_id").toString();
boolean isAuth = isUserPwdAuth(meiid, pwd);
if (isAuth) {
logger.info("websocket connect success!");
session.sendMessage(new TextMessage("webscoket connect success!"));
Map<String, Object> dMap = new HashMap<>();
meiid = meiid.substring(1, meiid.length() - 1);
device_id = device_id.substring(1, device_id.length() - 1);
String key = meiid + "_" + device_id;
users.put(key, session);
System.out.println(" 当前在线人数:" + users.size());
// 保存登录设备信息
dMap.put("meiid", meiid);
dMap.put("device_id", device_id);
// 先判断数据库有有没有该条记录
try {
logger.info(" groupService ExistDevice mothed start.....");
int isT = groupService.ExistDevice(dMap);
logger.info(" groupService ExistDevice mothed end......");
// 没查到保存 查到了不保存
if (isT == 0) {
logger.info(" groupService addLoginDevice mothed start.....");
groupService.addLoginDevice(meiid, device_id);
logger.info(" groupService addLoginDevice mothed end.....");
}
} catch (Exception e) {
e.getMessage();
e.printStackTrace();
logger.error("addLoginDevice mothed error! {}", e.getStackTrace().toString());
}
// 判断自己有没有离线好友消息(需要知道用户的meiid)
try {
List<MyFriendMsg> offLineMsg = isOffLineMsg(meiid);
// 判断有没有好友离线消息
if (offLineMsg.size() > 0) {
// 发送离线消息给自己
for (MyFriendMsg msg : offLineMsg) {
if (msg.getDeviceId().equals(device_id)) {
Map<String, Object> map = new HashMap<>();
msg.setMsgStatus(0);
// 修改状态
logger.info("updateOffLineMsgByMsgRecMeiId mothed start.........");
// 更新好友消息状态
myFriendService.updateOffLineMsgByMsgRecMeiId(msg);
logger.info("updateOffLineMsgByMsgRecMeiId mothed end.........");
map.put("id", msg.getId());
map.put("targer_type", "0");
map.put("meiid", meiid);
map.put("targer_id", msg.getMsgSendMeiId());
map.put("msg_type", msg.getMsgType());
map.put("msg_content", msg.getMsgText());
map.put("msg_time", msg.getMsgSendTime().getTime());
map.put("function", "40001");
session.sendMessage(new TextMessage(JSONObject.fromObject(map).toString()));
}
}
}
} catch (Exception e) {
e.getMessage();
e.printStackTrace();
logger.error("updateOffLineMsgByMsgRecMeiId mothed error {}", e.getStackTrace().toString());
}
// 离线群消息
try {
// 判断自己有没有离线群消息
List<GroupUserMsg> offLineGroupMsg = isOffLineGroupMsg(meiid);
if (offLineGroupMsg.size() > 0) {
for (GroupUserMsg msg : offLineGroupMsg) {
if (msg.getDeviceId().equals(device_id)) {
Map<String, Object> map = new HashMap<>();
msg.setStatus(0);
// 修改消息状态
logger.info("updateGroupUserMsg mothed start.........");
groupService.updateGroupUserMsg(msg);
logger.info("updateGroupUserMsg mothed end.........");
map.put("id", msg.getId());
map.put("targer_type", "1");
map.put("meiid", meiid);
map.put("targer_id", msg.getGroupId());
map.put("msg_content", msg.getMsgContent());
map.put("msg_time", msg.getSendTime().getTime());
map.put("status", msg.getStatus());
map.put("function", "40001");
session.sendMessage(new TextMessage(JSONObject.fromObject(map).toString()));
}
}
}
} catch (Exception e) {
e.getMessage();
e.printStackTrace();
logger.error("updateGroupUserMsg mothed error {}", e.getStackTrace().toString());
}
try {
// 判断好友申请表有没有我的离线消息
List<MyFriendVerify> verifyList = isOffLineMyFriendVerify(meiid);
if (verifyList.size() > 0) {
for (MyFriendVerify mf : verifyList) {
if (mf.getDeviceId().equals(device_id)) {
mf.setIsSend(0);
logger.info("updateMyFriendVerifyStatus mothed start.........");
myFriendService.updateMyFriendVerifyStatus(mf);
logger.info("updateMyFriendVerifyStatus mothed start.........");
logger.info("selectToUserTabelById mothed start.........");
User user = myFriendService.selectToUserTabelById(String.valueOf(mf.getFriendMeiId()));
logger.info("selectToUserTabelById mothed start.........");
Map<String, Object> msg = new HashMap<>();
// 组装数据
if (user.getEmail() != null) {
msg.put("userid", user.getEmail());
}
if (user.getMobile() != null) {
msg.put("userid", user.getMobile());
}
if (user.getFaceUrl() != null) {
msg.put("faceurl", user.getFaceUrl());
} else {
msg.put("faceurl", "");
}
if (user.getUserName() != null) {
msg.put("username", user.getUserName());
} else {
msg.put("username", "");
}
msg.put("id", mf.getId());
msg.put("meiid", mf.getFriendMeiId());
msg.put("friend_meiid", mf.getMeiId());
msg.put("create_time", mf.getCreateTime().getTime());
msg.put("status", mf.getStatus());
msg.put("function", "40002");
session.sendMessage(new TextMessage(JSONObject.fromObject(msg).toString()));
}
}
}
} catch (Exception e) {
e.getMessage();
e.printStackTrace();
logger.error("get friends verify mothed error {}", e.getStackTrace().toString());
}
} else {
session.close();
logger.error("websocket connect error! message {}!", "用户名密码参数验证失败!");
}
} else {
session.close();
logger.error("websocket connect error! message {}!", "用户名密码参数验证失败!");
}
}
/**
* 关闭连接时触发
*/
public void afterConnectionClosed(WebSocketSession session, CloseStatus closeStatus) throws Exception {
try {
logger.info("closed websocket start .......");
String meiid = session.getAttributes().get("meiid").toString();
meiid = meiid.substring(1, meiid.length() - 1);
String device_id = session.getAttributes().get("device_id").toString();
device_id = device_id.substring(1, device_id.length() - 1);
String key = meiid + "_" + device_id;
users.remove(key);
System.out.println("剩余在线用户" + users.size());
logger.info("closed websocket end .......");
} catch (Exception e) {
e.getMessage();
e.printStackTrace();
logger.error("closed websocket error!{}",e.getStackTrace().toString());
}
super.afterConnectionClosed(session, closeStatus);
// System.out.println(i);
}
/**
* 关闭连接异常处理
*/
public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception {
try {
logger.info("websocket error close start...........");
if (session.isOpen()) {
session.close();
}
String meiid = session.getAttributes().get("meiid").toString();
meiid = meiid.substring(1, meiid.length() - 1);
String device_id = session.getAttributes().get("device_id").toString();
device_id = device_id.substring(1, device_id.length() - 1);
String key = meiid + "_" + device_id;
users.remove(key);
logger.info("websocket error close end ...........");
} catch (Exception e) {
e.getMessage();
e.printStackTrace();
logger.error("websocket error close error{}" ,e.getStackTrace().toString());
}
}
// 判断自己有没有离线消息
public List<MyFriendMsg> isOffLineMsg(String meiid) {
return myFriendService.OffLineMsgByMsgRecMeiId(meiid);
}
// 判断自己有没有离线群消息
public List<GroupUserMsg> isOffLineGroupMsg(String meiid) {
// 查询离线群消息
return groupService.isOffLineGroupMsg(meiid);
}
// 判断自己有没有离线的好友申请消息
public List<MyFriendVerify> isOffLineMyFriendVerify(String meiid) {
// 查询离线群消息
return groupService.isOffLineMyFriendVerify(meiid);
}
/*
* 验证用户名密码
*/
public boolean isUserPwdAuth(String meiid, String pwd) {
boolean isAuth = false;
try {
logger.info("isUserPwdAuth mothed start.............");
meiid = meiid.substring(1, meiid.length() - 1);
pwd = pwd.substring(1, pwd.length() - 1);
List<User> list = myFriendService.isUserPwdAuth(meiid, pwd);
logger.info("isUserPwdAuth mothed end.............");
if (list.size() > 0) {
isAuth = true;
}
} catch (Exception e) {
e.getMessage();
e.printStackTrace();
logger.error("isUserPwdAuth error {}",e.getStackTrace().toString());
}
return isAuth;
}
/**
* js调用websocket.send时候,会调用该方法
*/
@Override
protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
super.handleTextMessage(session, message);
}
public boolean supportsPartialMessages() {
return false;
}
/**
* 给某个用户发送消息
*
* @param userName
* @param message
*/
public boolean sendMessageToUser(String meiid, TextMessage message) {
//TODO 获取队列消息
boolean isStatus = false;
for (String s : users.keySet()) {
if (s.equals(meiid)) {
try {
if (users.get(s).isOpen()) {
users.get(s).sendMessage(message);
isStatus = true;
}
} catch (Exception e) {
e.getMessage();
e.getStackTrace();
logger.error("sendFriendMsg error {}{}",e.getMessage(),e.getStackTrace().toString());
}
break;
}
}
return isStatus;
}
// 群发消息
public boolean sendMessageToGroup(String meiid, TextMessage message) {
// 查看成员在不在线
boolean isStatus = false;
for (String s : users.keySet()) {
if (s.equals(meiid)) {
try {
if (users.get(s).isOpen()) {
users.get(s).sendMessage(message);
isStatus = true;
}
} catch (Exception e) {
e.getMessage();
e.getStackTrace();
logger.error("sendGroupMsg error {}{}",e.getMessage(),e.getStackTrace().toString());
}
break;
}
}
return isStatus;
}
}
这是我的消息处理类,然后我们到controller层实现消息发送,具体看代码,这是单对单好友消息发送
需要注意的是发送的key是我自己组装的,你们要根据具体的业务去自己组装,
@RequestMapping(value = "/sendFriend", produces = { "application/json;charset=UTF-8" })
@ResponseBody
public String sendFriend(HttpServletRequest request, HttpServletResponse response) {
String meiid = StringUtil.trim(jsonObj.getString("meiid"));
String friend_meiid = StringUtils.trim(jsonObj.getString("friend_meiid"));
String msg_content = StringUtils.trim(jsonObj.getString("msg_content"));
String msg_type = StringUtils.trim(jsonObj.getString("msg_type"));
// 新增设备id
try {
Map<String, Object> loginMap = new HashMap<>();
// 保存消息 次数(根据登录设备决定)
map.put("id", id);
map.put("targer_type", "0");
map.put("meiid", meiid);
map.put("targer_id", friend_meiid);
map.put("msg_content", msg.getMsgText());
map.put("msg_time", msg.getMsgSendTime().getTime());
//keyz指向的是自己websocket的ID,我这里是自己组装的
String key = friend_meiid + "_" + ld.getDevice();
boolean isTrue = infoHandler().sendMessageToUser(key,
new TextMessage(JSONObject.fromObject(map).toString()));
if (!isTrue) {
msg.setMsgStatus(1);
myFriendService.updateOffLineMsgByMsgRecMeiId(msg);
}
// 发送测试数据
retMap.put("code", SysConstants.RET_CODE_SUCCESS);
retMap.put("message", "成功");
return jsonFmt(retMap, logger, commonparamVo);
} catch (Exception e) {
retMap.put("code", SysConstants.RET_CODE_999);
retMap.put("message", "数据库读取异常");
e.printStackTrace();
return jsonFmt(retMap, logger, commonparamVo);
}
}
群消息发送
@RequestMapping(value = “/sendgroup”, produces = { “application/json;charset=UTF-8” })
@ResponseBody
public String sendgroup(HttpServletRequest request, HttpServletResponse response) throws Exception {
业务参数请求
try {
// 1选保存一条群消息记录
// 组装发送数据
Map<String, Object> map = new HashMap<>();
map.put("id", userMsgId);
map.put("targer_type", "1");
map.put("meiid", userMsg.getRecMeiid());
map.put("targer_id", group_id);
map.put("msg_content", msg_content);
map.put("msg_type", msg.getGroupMsgType());
map.put("msg_time", msg.getSendTime().getTime());
String key = ids + "_" + ld.getDevice();
// 给群成员好友挨着发送消息
if (!String.valueOf(userMsg.getSendMeiid()).equals(String.valueOf(userMsg.getRecMeiid()))) {
boolean isTrue = infoHandler().sendMessageToGroup(key,
new TextMessage(JSONObject.fromObject(map).toString()));
if (!isTrue) {
userMsg.setStatus(1);
// 如果群成员不在线,就修改消息状态
groupService.updateGroupUserMsg(userMsg);
}
}
}
}
}
}
retMap.put("code", SysConstants.RET_CODE_SUCCESS);
retMap.put("message", "成功");
return jsonFmt(retMap, logger, commonparamVo);
} catch (Exception e) {
retMap.put("code", SysConstants.RET_CODE_999);
retMap.put("message", "数据库读取异常");
e.printStackTrace();
return jsonFmt(retMap, logger, commonparamVo);
}
}
到这里完整后台代码就实现了,在附送一个前端测试连接的代码
修改WS连接地址即可直接使用,我当时是测试wensocket连接数加的一个循环
<%@ page language="java" import="java.util.*" pageEncoding="UTF-8"%>
<%
String path = request.getContextPath();
String basePath = request.getScheme() + "://" + request.getServerName() + ":" + request.getServerPort()
+ path + "/";
String ppp = request.getServerName() + ":" + request.getServerPort() + path + "/";
%>
<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN">
<html>
<head>
<base href="<%=basePath%>" target="_blank" rel="external nofollow" >
<title>My JSP 'MyJsp.jsp' starting page</title>
<script src="baseui/js/plugins/jquery.js"></script>
</head>
<script>
function test() {
//每秒刷新一次
var ws = new WebSocket("ws://XXXXXXXXXXXXXX/websocket");
ws.onopen = function(evt) {
console.log("Connection open ...");
ws.send("Hello WebSockets!");
};
ws.onmessage = function(evt) {
console.log("Received Message: " + evt.data);
//ws.close();
};
ws.onclose = function(evt) {
console.log("Connection closed.");
};
}
var c=0;
function showLogin()
{
test();
console.log(c++);
}
setInterval("showLogin()","1000");
</script>
<body>
<button type="button" onclick="showLogin()">连接websocket</button>
<div id="msgcount"></div>
</body>
</html>
好了,文档分享到这,有问题或者疑问,可以随时留言!!!