websocket簡單使用
<!-- webSocket start-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
<!-- webSocket end-->
<!-- 微信支付api-->
<dependency>
<groupId>com.github.wechatpay-apiv3</groupId>
<artifactId>wechatpay-apache-httpclient</artifactId>
<version>0.2.2</version>
</dependency>
@Configuration
public class WebSocketConfig {
@Bean
public ServerEndpointExporter serverEndpointExporter() {
return new ServerEndpointExporter();
}
}
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.serializer.SerializerFeature;
import com.hikvision.pgmrs.entity.MessageInfoEntity;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.Collection;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
/**
* 事件訂閱websocket服務端
* @author
* @date 2020/8/3
* @param
* @return
*/
@ServerEndpoint(value = "/webSocket/message/{personId}")
@Component
@Slf4j
public class WebSocketServer {
/**用戶端線上連接配接數量,靜态變量,線程安全。*/
private static AtomicInteger onlineNum = new AtomicInteger();
/** session池,,用來存放每個用戶端對應的WebSocket session對象。*/
private static ConcurrentHashMap<String, Session> sessionPools = new ConcurrentHashMap<>();
/**接收personId*/
private String personId="";
public void sendMessage(MessageInfoEntity messageInfoEntity){
log.info("websocket推送給前端實時消息: {}",JSONObject.toJSON(messageInfoEntity));
String message = JSONObject.toJSONString(messageInfoEntity,SerializerFeature.WriteMapNullValue);
Integer messageType = messageInfoEntity.getMessageType();
switch (messageType){
case 1:
case 6:
String originalPersonId = messageInfoEntity.getOriginalPersonId();
if(Objects.nonNull(originalPersonId)){
sendInfoToPerson(originalPersonId, message);
}
break;
case 3:
case 5:
String applyPersonId = messageInfoEntity.getApplyPersonId();
if(Objects.nonNull(applyPersonId)){
sendInfoToPerson(applyPersonId, message);
}
break;
case 7:
// case 9:
String meetingDelayPersonId = messageInfoEntity.getMeetingDelayPersonId();
if(Objects.nonNull(meetingDelayPersonId)){
sendInfoToPerson(meetingDelayPersonId, message);
}
break;
case 2:
case 4:
case 8:
case 9:
sendInfoToAll(message);
break;
case 10:
case 11:
String presentPersonId = messageInfoEntity.getPresentPersonId();
if(Objects.nonNull(presentPersonId)){
sendInfoToPerson(presentPersonId, message);
}
default:
log.info("未比對到對應的消息類型: {}", message);
}
}
/**
* 發送消息
* @author
* @date 2020/8/3
* @param session, message
* @return void
*/
private void sendMessage(Session session, String message) throws IOException {
if(session != null){
synchronized (session) {
session.getBasicRemote().sendText(message);
}
}
}
/*
* 給指定使用者發送資訊
* @author
* @date 2020/8/3
* @param personId, message
* @return void
*/
public void sendInfoToPerson(String personId, String message){
Session session = sessionPools.get(personId);
if(Objects.nonNull(session)){
try {
sendMessage(session, message);
log.info("推送個人消息: {}, 到{}", message, personId);
} catch (IOException e) {
e.printStackTrace();
}
}else{
log.info("推送個人消息: {}, 到{}, session中未找到對應人員", message, personId);
}
}
/*
* 給指定角色發送資訊
* @author
* @date 2020/8/3
* @param personId, message
* @return void
*/
public void sendInfoToAll(String message){
Collection<Session> sessionList = sessionPools.values();
for(Session session : sessionList){
try {
sendMessage(session, message);
} catch (IOException e) {
e.printStackTrace();
}
}
log.info("推送全部消息: {}", message);
}
/**
* 建立連接配接成功調用 @PathParam(value = "personId") String personId
* @author
* @date 2020/8/3
* @param session, personId
* @return void
*/
@OnOpen
public void onOpen(Session session, @PathParam(value = "personId") String personId){
this.personId = personId;
sessionPools.put(personId, session);
addOnlineCount();
log.info("使用者{}通過webSocket訂閱消息成功, 目前總線上使用者為{}", personId, onlineNum);
}
/**
* 關閉連接配接時調用
* @author
* @date 2020/8/3
* @param
* @return void
*/
@OnClose
public void onClose(Session session){
if(sessionPools.containsKey(personId)){
sessionPools.remove(personId);
subOnlineCount();
}
log.info("使用者{}退出, 目前總線上使用者為{}", personId, onlineNum);
}
/**
* 收到用戶端資訊
* @author
* @date 2020/8/3
* @param message
* @return void
*/
@OnMessage
public void onMessage(String message){
log.info("接收到用戶端消息:{}",message);
}
/**
* 錯誤時調用
* @author
* @date 2020/8/3
* @param session, throwable
* @return void
*/
@OnError
public void onError(Session session, Throwable throwable){
log.info("websocket發生錯誤,摘要{},堆棧{}",throwable.getMessage(),throwable.getStackTrace());
throwable.printStackTrace();
}
public static void addOnlineCount(){
onlineNum.incrementAndGet();
}
public static void subOnlineCount() {
onlineNum.decrementAndGet();
}
}