轉至:https://www.cnblogs.com/zwcry/p/9723447.html
使用 WebSocket 與 Redis 發布/訂閱(redismq)實作叢集訊息聊天
1.application.properties
# HTTP port of the embedded server (also injected into the test controller via ${server.port}).
server.port=8081
# ---- Thymeleaf configuration ----
# Whether to enable template caching.
spring.thymeleaf.cache=false
# Whether to enable Thymeleaf view resolution for the web framework.
spring.thymeleaf.enabled=true
# Enable the SpringEL compiler in SpringEL expressions.
# NOTE(review): this key appears to be bound only by Spring Boot 2.x; on the 1.5.x parent
# declared in pom.xml it is presumably ignored - confirm before relying on it.
spring.thymeleaf.enable-spring-el-compiler=true
# Template file encoding.
spring.thymeleaf.encoding=UTF-8
# Template mode to apply to templates (see Thymeleaf's TemplateMode enum).
# "HTML5" is a deprecated alias under Thymeleaf 3 (the version pinned in pom.xml); "HTML" is the current name.
spring.thymeleaf.mode=HTML
# Prefix prepended to view names when building a template URL.
spring.thymeleaf.prefix=classpath:/templates/
# Content-Type value written to HTTP responses.
# Spring Boot 1.5.x binds this as spring.thymeleaf.content-type; the
# spring.thymeleaf.servlet.content-type form only exists from Spring Boot 2.0 onwards.
spring.thymeleaf.content-type=text/html
# Suffix appended to view names when building a template URL.
spring.thymeleaf.suffix=.html
## Redis server host (single server)
spring.redis.host=192.168.159.129
## Redis server port
spring.redis.port=6379
## Maximum number of connections in the pool (a negative value means no limit)
spring.redis.pool.max-active=300
## Redis database index (default is 0)
spring.redis.database=0
## Maximum time to block waiting for a pooled connection (a negative value means no limit)
spring.redis.pool.max-wait=-1
## Maximum number of idle connections in the pool
spring.redis.pool.max-idle=100
## Minimum number of idle connections in the pool
spring.redis.pool.min-idle=20
## Connection timeout in milliseconds
spring.redis.timeout=60000
2.pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven build for the WebSocket + Redis pub/sub chat demo (Spring Boot 1.5.x). -->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.szw.learn</groupId>
    <artifactId>websocket_redis_mq_01</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>websocket_redis_mq_01</name>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>1.5.16.RELEASE</version>
    </parent>
    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
        <!-- Both flags are set, so test compilation and test execution are skipped. -->
        <maven.test.skip>true</maven.test.skip>
        <skipTests>true</skipTests>
        <!-- Override the parent's managed Thymeleaf versions to use Thymeleaf 3. -->
        <thymeleaf.version>3.0.7.RELEASE</thymeleaf.version>
        <thymeleaf-layout-dialect.version>2.1.2</thymeleaf-layout-dialect.version>
        <start-class>com.szw.learn.WsMqApplication</start-class>
    </properties>
    <dependencies>
        <!-- Web starter -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <!-- Testing -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <!-- Template engine -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-thymeleaf</artifactId>
        </dependency>
        <!-- Redis; per the original author's note, the artifactId differs from the one used before Spring Boot 1.5 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>
        <!-- WebSocket -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-websocket</artifactId>
        </dependency>
    </dependencies>
    <!-- HTTPS is used for the mirrors because Maven 3.8.1+ blocks plain-HTTP external repositories by default. -->
    <repositories>
        <repository>
            <id>nexus-aliyun</id>
            <name>Nexus aliyun</name>
            <url>https://maven.aliyun.com/nexus/content/groups/public</url>
            <releases>
                <enabled>true</enabled>
            </releases>
            <snapshots>
                <enabled>false</enabled>
            </snapshots>
        </repository>
    </repositories>
    <pluginRepositories>
        <pluginRepository>
            <id>nexus-aliyun</id>
            <name>Nexus aliyun</name>
            <url>https://maven.aliyun.com/nexus/content/groups/public</url>
            <releases>
                <enabled>true</enabled>
            </releases>
            <snapshots>
                <enabled>false</enabled>
            </snapshots>
        </pluginRepository>
    </pluginRepositories>
    <build>
        <plugins>
            <!-- Needed to publish a sources jar alongside the main artifact -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-source-plugin</artifactId>
                <configuration>
                    <attach>true</attach>
                </configuration>
                <executions>
                    <execution>
                        <phase>compile</phase>
                        <goals>
                            <goal>jar</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <!-- Packaging as an executable Spring Boot jar -->
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <configuration>
                    <fork>true</fork>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
3.SpringUtils.java
package com.szw.learn.util;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.NoSuchBeanDefinitionException;
import org.springframework.beans.factory.config.BeanFactoryPostProcessor;
import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
import org.springframework.stereotype.Repository;
/**
 * Static gateway to the Spring bean factory for code that cannot use injection.
 *
 * <p>The factory handed to {@link #postProcessBeanFactory} is stored in a static field;
 * every static helper below delegates to that stored factory. The helpers will fail with a
 * {@link NullPointerException} if they are called before the factory has been stored.
 */
@Repository
public final class SpringUtils implements BeanFactoryPostProcessor {

    /** The bean factory captured by {@link #postProcessBeanFactory}. */
    private static ConfigurableListableBeanFactory beanFactory;

    /**
     * Remembers the supplied bean factory so the static helpers can reach it.
     *
     * @param beanFactory the factory to store
     * @throws BeansException declared by the interface; not thrown by this implementation
     */
    @Override
    public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException {
        SpringUtils.beanFactory = beanFactory;
    }

    /**
     * @return the stored bean factory, or {@code null} if none has been stored yet
     */
    public static ConfigurableListableBeanFactory getBeanFactory() {
        return beanFactory;
    }

    /**
     * Looks up a bean by name and casts it to the caller's expected type.
     *
     * @param name the bean name
     * @return the bean instance registered under {@code name}
     * @throws BeansException if the factory cannot provide the bean
     */
    @SuppressWarnings("unchecked")
    public static <T> T getBean(String name) throws BeansException {
        Object found = beanFactory.getBean(name);
        // Unchecked by design: the caller decides the target type.
        return (T) found;
    }

    /**
     * Looks up a bean by its required type.
     *
     * @param clz the type the bean must match
     * @return the bean instance of type {@code clz}
     * @throws BeansException if the factory cannot provide the bean
     */
    public static <T> T getBean(Class<T> clz) throws BeansException {
        return beanFactory.getBean(clz);
    }

    /**
     * @param name the bean name to test
     * @return {@code true} if the factory reports a bean for {@code name}
     */
    public static boolean containsBean(String name) {
        return beanFactory.containsBean(name);
    }

    /**
     * Tells whether the bean registered under {@code name} is a singleton.
     *
     * @param name the bean name
     * @return {@code true} for a singleton bean
     * @throws NoSuchBeanDefinitionException if no bean definition exists for {@code name}
     */
    public static boolean isSingleton(String name) throws NoSuchBeanDefinitionException {
        return beanFactory.isSingleton(name);
    }

    /**
     * @param name the bean name
     * @return the type of the bean registered under {@code name}
     * @throws NoSuchBeanDefinitionException if no bean definition exists for {@code name}
     */
    public static Class<?> getType(String name) throws NoSuchBeanDefinitionException {
        return beanFactory.getType(name);
    }

    /**
     * @param name the bean name
     * @return the aliases of the given bean name, as reported by the factory
     * @throws NoSuchBeanDefinitionException if no bean definition exists for {@code name}
     */
    public static String[] getAliases(String name) throws NoSuchBeanDefinitionException {
        return beanFactory.getAliases(name);
    }
}
4.redis
發布(publish)service:
package com.szw.learn.redismq;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
/**
 * Publishing side of the Redis publish/subscribe chat: forwards messages to a channel.
 *
 * @author 七脈
 */
@Component
public class PublishService {

    /** Template used to send messages to Redis. */
    @Autowired
    StringRedisTemplate redisTemplate;

    /**
     * Publishes a message on the given channel.
     *
     * @param channel the publish/subscribe topic to send to
     * @param message the message payload
     */
    public void publish(String channel, Object message) {
        // Per the original author's note, this wraps connection.publish(rawChannel, rawMessage).
        this.redisTemplate.convertAndSend(channel, message);
    }
}
訂閱監聽類:
package com.szw.learn.redismq;
import java.io.IOException;
import javax.websocket.Session;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.core.StringRedisTemplate;
/**
 * Redis subscription listener that relays every received message to one WebSocket session.
 *
 * <p>One instance is created per WebSocket connection; the session to write to is supplied
 * through {@link #setSession(Session)}.
 *
 * @author 七脈
 */
public class SubscribeListener implements MessageListener {

    private StringRedisTemplate stringRedisTemplate;

    /** Written by the WebSocket thread, read by listener threads, hence volatile. */
    private volatile Session session;

    /**
     * Receives a published message and forwards its text to the WebSocket session, if one is
     * set and still open.
     *
     * @param message the Redis message; its body is decoded as UTF-8 text
     * @param pattern the pattern (or channel) the message matched; may be {@code null}, in which
     *                case the message's own channel is used for the log line
     */
    @Override
    public void onMessage(Message message, byte[] pattern) {
        // Decode explicitly as UTF-8 instead of the platform default charset so that
        // non-ASCII chat text survives regardless of the JVM's file.encoding.
        String msg = new String(message.getBody(), java.nio.charset.StandardCharsets.UTF_8);
        byte[] source = (pattern != null) ? pattern : message.getChannel();
        String topic = (source != null) ? new String(source, java.nio.charset.StandardCharsets.UTF_8) : "";
        System.out.println(topic + "主題釋出:" + msg);

        Session target = session;
        if (target == null || !target.isOpen()) {
            // Nothing to deliver to: the client is gone or was never attached.
            return;
        }
        try {
            // Messages may be dispatched on several threads; a basic remote endpoint does not
            // allow overlapping sendText calls, so writes to one session are serialized.
            synchronized (target) {
                target.getBasicRemote().sendText(msg);
            }
        } catch (IOException | IllegalStateException e) {
            // IllegalStateException covers a session that closed between the check and the send.
            e.printStackTrace();
        }
    }

    public StringRedisTemplate getStringRedisTemplate() {
        return stringRedisTemplate;
    }

    public void setStringRedisTemplate(StringRedisTemplate stringRedisTemplate) {
        this.stringRedisTemplate = stringRedisTemplate;
    }

    public Session getSession() {
        return session;
    }

    public void setSession(Session session) {
        this.session = session;
    }
}
注冊redis監聽容器:
package com.szw.learn.redismq;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.jedis.JedisConnectionFactory;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
/**
 * Spring configuration that exposes the Redis message listener container as a bean.
 */
@Configuration
public class RedisConfig {

    /** Connection factory the listener container uses to reach Redis. */
    @Autowired
    private JedisConnectionFactory jedisConnectionFactory;

    /**
     * Creates the {@link RedisMessageListenerContainer}; per the original author's note it has
     * to be registered manually.
     *
     * @author 七脈
     * @return a container wired to the injected connection factory
     */
    @Bean
    public RedisMessageListenerContainer redisMessageListenerContainer() {
        final RedisMessageListenerContainer listenerContainer = new RedisMessageListenerContainer();
        listenerContainer.setConnectionFactory(this.jedisConnectionFactory);
        return listenerContainer;
    }
}
5.websocket
websocket注冊:
package com.szw.learn.websocket;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;
/**
 * Spring configuration for the WebSocket layer.
 */
@Configuration
public class WebsocketConfig {

    /**
     * Exposes a {@link ServerEndpointExporter} bean; per the original author's note, it takes
     * care of registering the classes annotated with {@code @ServerEndpoint}.
     *
     * @return a new exporter instance
     */
    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        final ServerEndpointExporter exporter = new ServerEndpointExporter();
        return exporter;
    }
}
websocket端點:
package com.szw.learn.websocket;
import java.io.IOException;
import java.util.concurrent.CopyOnWriteArraySet;
import javax.websocket.OnClose;
import javax.websocket.OnError;
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.listener.ChannelTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.stereotype.Component;
import com.szw.learn.redismq.PublishService;
import com.szw.learn.redismq.SubscribeListener;
import com.szw.learn.util.SpringUtils;
/**
*@ServerEndpoint(value="/websocket")value值必須以/開路
*備注:@ServerEndpoint注解類不支援使用@Autowire
*{topic}指:向哪個頻道主題裡發消息
*{myname}指:這個消息是誰的。真實環境裡可以使用目前登入使用者資訊
*/
@Component
@ServerEndpoint(value="/websocket/{topic}/{myname}")
public class WebsocketEndpoint {
/**
* 因為@ServerEndpoint不支援注入,是以使用SpringUtils擷取IOC執行個體
*/
private StringRedisTemplate redisTampate = SpringUtils.getBean(StringRedisTemplate.class);
private RedisMessageListenerContainer redisMessageListenerContainer = SpringUtils.getBean(RedisMessageListenerContainer.class);
//存放該伺服器該ws的所有連接配接。用處:比如向所有連接配接該ws的使用者發送通知消息。
private static CopyOnWriteArraySet<WebsocketEndpoint> sessions = new CopyOnWriteArraySet<>();
private Session session;
@OnOpen
public void onOpen(Session session,@PathParam("topic")String topic){
System.out.println("java websocket:打開連接配接");
this.session = session;
sessions.add(this);
SubscribeListener subscribeListener = new SubscribeListener();
subscribeListener.setSession(session);
subscribeListener.setStringRedisTemplate(redisTampate);
//設定訂閱topic
redisMessageListenerContainer.addMessageListener(subscribeListener, new ChannelTopic(topic));
}
@OnClose
public void onClose(Session session){
System.out.println("java websocket:關閉連接配接");
sessions.remove(this);
}
@OnMessage
public void onMessage(Session session,String message,@PathParam("topic")String topic,@PathParam("myname")String myname) throws IOException{
message = myname+":"+message;
System.out.println("java websocket 收到消息=="+message);
PublishService publishService = SpringUtils.getBean(PublishService.class);
publishService.publish(topic, message);
}
@OnError
public void onError(Session session,Throwable error){
System.out.println("java websocket 出現錯誤");
}
public Session getSession() {
return session;
}
public void setSession(Session session) {
this.session = session;
}
}
測試controller
package com.szw.learn.websocket;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.servlet.ModelAndView;
/**
 * Test controller that serves the chat page.
 */
@Controller
@RequestMapping("websocket")
public class WebsocketController {

    /** View name of the chat page. */
    public static final String INDEX = "websocket/index";

    /** Value of the {@code server.port} property, shown on the page and used in the ws:// URL. */
    @Value("${server.port}")
    private String port;

    /**
     * Renders the chat page.
     *
     * @author 七脈
     * @param topic  the publish/subscribe channel to chat in
     * @param myname the display name of the sender
     * @return the {@link #INDEX} view with {@code port}, {@code topic} and {@code myname} in the model
     */
    @RequestMapping("index/{topic}/{myname}")
    public ModelAndView index(@PathVariable("topic") String topic, @PathVariable("myname") String myname) {
        return new ModelAndView(INDEX)
                .addObject("port", port)
                .addObject("topic", topic)
                .addObject("myname", myname);
    }
}
6.啟動類
package com.szw.learn;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
 * Entry point of the WebSocket + Redis chat demo.
 */
@SpringBootApplication
public class WsMqApplication {

    /**
     * Turns off devtools restart through a system property and boots the application.
     *
     * @param args command-line arguments passed on to Spring Boot
     */
    public static void main(String[] args) {
        System.setProperty("spring.devtools.restart.enabled", "false");
        SpringApplication application = new SpringApplication(WsMqApplication.class);
        application.run(args);
    }
}
7.測試頁面
<!doctype html>
<html xmlns:th="http://www.thymeleaf.org">
<head>
<meta charset="utf-8"></meta>
<title>websocket叢集</title>
</head>
<body>
本服務端口号:[[${port}]],使用redismq實作websocket叢集<br/>
[[${topic}]] 頻道 聊天中。。。<br/>
<input id="input_id" type="text" /><button onclick="sendMessage()">發送</button> <button onclick="closeWebsocket()">關閉</button>
<div id="message_id"></div>
</body>
<script type="text/javascript">
document.getElementById('input_id').focus();
var websocket = null;
//目前浏覽前是否支援websocket
if("WebSocket" in window){
var url = "ws://127.0.0.1:[[${port}]]/websocket/[[${topic}]]/[[${myname}]]";
websocket = new WebSocket(url);
}else{
alert("浏覽器不支援websocket");
}
websocket.onopen = function(event){
setMessage("打開連接配接");
}
websocket.onclose = function(event){
setMessage("關閉連接配接");
}
websocket.onmessage = function(event){
setMessage(event.data);
}
websocket.onerror = function(event){
setMessage("連接配接異常");
}
//監聽視窗關閉事件,當視窗關閉時,主動去關閉websocket連接配接,防止連接配接還沒斷開就關閉視窗,server端會抛異常。
window.onbeforeunload = function(){
closeWebsocket();
}
//關閉websocket
function closeWebsocket(){
//3代表已經關閉
if(3!=websocket.readyState){
websocket.close();
}else{
alert("websocket之前已經關閉");
}
}
//将消息顯示在網頁上
function setMessage(message){
document.getElementById('message_id').innerHTML += message + '<br/>';
}
//發送消息
function sendMessage(){
//1代表正在連接配接
if(1==websocket.readyState){
var message = document.getElementById('input_id').value;
//setMessage(message);
websocket.send(message);
}else{
alert("websocket未連接配接");
}
document.getElementById('input_id').value="";
document.getElementById('input_id').focus();
}
</script>
</html>