天天看點

websocket redis實作叢集即時消息聊天websocket與redismq實作叢集消息聊天

轉至:https://www.cnblogs.com/zwcry/p/9723447.html

websocket與redismq實作叢集消息聊天

1.application.properties

server.port=8081

#thymeleaf配置
#是否啟用模闆緩存。
spring.thymeleaf.cache=false
#是否為Web架構啟用Thymeleaf視圖解析。
spring.thymeleaf.enabled=true
#在SpringEL表達式中啟用SpringEL編譯器。
spring.thymeleaf.enable-spring-el-compiler=true
#模闆檔案編碼。
spring.thymeleaf.encoding=UTF-8
#要應用于模闆的模闆模式。另請參見Thymeleaf的TemplateMode枚舉。
spring.thymeleaf.mode=HTML5
#在建構URL時添加字首以檢視名稱的字首。
spring.thymeleaf.prefix=classpath:/templates/
#Content-Type寫入HTTP響應的值。
spring.thymeleaf.servlet.content-type=text/html
#在建構URL時附加到視圖名稱的字尾。
spring.thymeleaf.suffix=.html

##單伺服器
spring.redis.host=192.168.159.129
##單端口
spring.redis.port=6379
## 連接配接池最大連接配接數(使用負值表示沒有限制) 
spring.redis.pool.max-active=300
## Redis資料庫索引(預設為0) 
spring.redis.database=0
## 連接配接池最大阻塞等待時間(使用負值表示沒有限制) 
spring.redis.pool.max-wait=-1
## 連接配接池中的最大空閑連接配接 
spring.redis.pool.max-idle=100
## 連接配接池中的最小空閑連接配接 
spring.redis.pool.min-idle=20
## 連接配接逾時時間(毫秒) 
spring.redis.timeout=60000
           

2.pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    
    <groupId>com.szw.learn</groupId>
    <artifactId>websocket_redis_mq_01</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>websocket_redis_mq_01</name>
    
    
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>1.5.16.RELEASE</version>
    </parent>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
        <maven.test.skip>true</maven.test.skip>
        <skipTests>true</skipTests>
        <thymeleaf.version>3.0.7.RELEASE</thymeleaf.version>
        <thymeleaf-layout-dialect.version>2.1.2</thymeleaf-layout-dialect.version>
        <start-class>com.szw.learn.WsMqApplication</start-class>
    </properties>

    <dependencies>
        <!-- 使用web啟動器 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <!-- 測試 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>

        <!-- 模闆引擎 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-thymeleaf</artifactId>
        </dependency>
        
        <!-- redis id與1.5之前的變了 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>
        
        <!-- websocket -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-websocket</artifactId>
        </dependency>
    </dependencies>

    <repositories>
        <repository>
            <id>nexus-aliyun</id>
            <name>Nexus aliyun</name>
            <url>http://maven.aliyun.com/nexus/content/groups/public</url>
            <releases>
                <enabled>true</enabled>
            </releases>
            <snapshots>
                <enabled>false</enabled>
            </snapshots>
        </repository>
    </repositories>
    <pluginRepositories>
        <pluginRepository>
            <id>nexus-aliyun</id>
            <name>Nexus aliyun</name>
            <url>http://maven.aliyun.com/nexus/content/groups/public</url>
            <releases>
                <enabled>true</enabled>
            </releases>
            <snapshots>
                <enabled>false</enabled>
            </snapshots>
        </pluginRepository>
    </pluginRepositories>

    <build>
        <plugins>
            <!-- 要将源碼放上去,需要加入這個插件 -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-source-plugin</artifactId>
                <configuration>
                    <attach>true</attach>
                </configuration>
                <executions>
                    <execution>
                        <phase>compile</phase>
                        <goals>
                            <goal>jar</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <!-- 打包 -->
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <configuration>
                    <fork>true</fork>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
           

3.SpringUtils.java

package com.szw.learn.util;

import org.springframework.beans.BeansException;
import org.springframework.beans.factory.NoSuchBeanDefinitionException;
import org.springframework.beans.factory.config.BeanFactoryPostProcessor;
import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
import org.springframework.stereotype.Repository;

@Repository
public final class SpringUtils implements BeanFactoryPostProcessor {

    private static ConfigurableListableBeanFactory beanFactory; // Spring應用上下文環境

    @Override
    public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException {
        SpringUtils.beanFactory = beanFactory;
    }
    
    public static ConfigurableListableBeanFactory getBeanFactory() {
        return beanFactory;
    }

    /**
     * 擷取對象
     *
     * @param name
     * @return Object 一個以所給名字注冊的bean的執行個體
     * @throws org.springframework.beans.BeansException
     *
     */
    @SuppressWarnings("unchecked")
    public static <T> T getBean(String name) throws BeansException {
        return (T) getBeanFactory().getBean(name);
    }

    /**
     * 擷取類型為requiredType的對象
     *
     * @param clz
     * @return
     * @throws org.springframework.beans.BeansException
     *
     */
    public static <T> T getBean(Class<T> clz) throws BeansException {
        T result = (T) getBeanFactory().getBean(clz);
        return result;
    }

    /**
     * 如果BeanFactory包含一個與所給名稱比對的bean定義,則傳回true
     *
     * @param name
     * @return boolean
     */
    public static boolean containsBean(String name) {
        return getBeanFactory().containsBean(name);
    }

    /**
     * 判斷以給定名字注冊的bean定義是一個singleton還是一個prototype。 如果與給定名字相應的bean定義沒有被找到,将會抛出一個異常(NoSuchBeanDefinitionException)
     *
     * @param name
     * @return boolean
     * @throws org.springframework.beans.factory.NoSuchBeanDefinitionException
     *
     */
    public static boolean isSingleton(String name) throws NoSuchBeanDefinitionException {
        return getBeanFactory().isSingleton(name);
    }

    /**
     * @param name
     * @return Class 注冊對象的類型
     * @throws org.springframework.beans.factory.NoSuchBeanDefinitionException
     *
     */
    public static Class<?> getType(String name) throws NoSuchBeanDefinitionException {
        return getBeanFactory().getType(name);
    }

    /**
     * 如果給定的bean名字在bean定義中有别名,則傳回這些别名
     *
     * @param name
     * @return
     * @throws org.springframework.beans.factory.NoSuchBeanDefinitionException
     *
     */
    public static String[] getAliases(String name) throws NoSuchBeanDefinitionException {
        return getBeanFactory().getAliases(name);
    }

}




4.redis

釋出service:



package com.szw.learn.redismq;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;

/**
 * @author 七脈 描述:釋出service
 */
@Component
public class PublishService {
    @Autowired
    StringRedisTemplate redisTemplate;

    /**
     * @author 七脈 描述:釋出方法
     * @param channel 消息釋出訂閱 主題
     * @param message 消息資訊
     */
    public void publish(String channel, Object message) {
        // 該方法封裝的 connection.publish(rawChannel, rawMessage);
        redisTemplate.convertAndSend(channel, message);
    }
}
           

訂閱監聽類:

package com.szw.learn.redismq;

import java.io.IOException;

import javax.websocket.Session;

import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.core.StringRedisTemplate;

/**
 * @author 七脈 描述:訂閱監聽類
 */
public class SubscribeListener implements MessageListener {
    
    private StringRedisTemplate stringRedisTemplate;
    
    private Session session;
    
    /**
     * 訂閱接收釋出者的消息
     */
    @Override
    public void onMessage(Message message, byte[] pattern) {
        String msg = new String(message.getBody());
        System.out.println(new String(pattern) + "主題釋出:" + msg);
        if(null!=session){
            try {
                session.getBasicRemote().sendText(msg);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
    
    public StringRedisTemplate getStringRedisTemplate() {
        return stringRedisTemplate;
    }

    public void setStringRedisTemplate(StringRedisTemplate stringRedisTemplate) {
        this.stringRedisTemplate = stringRedisTemplate;
    }

    public Session getSession() {
        return session;
    }

    public void setSession(Session session) {
        this.session = session;
    }
    
}


注冊redis監聽容器:



package com.szw.learn.redismq;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.jedis.JedisConnectionFactory;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;

@Configuration
public class RedisConfig {
    @Autowired
    private JedisConnectionFactory jedisConnectionFactory;
    
    /**
     * @author 七脈 描述:需要手動注冊RedisMessageListenerContainer加入IOC容器
     * @return
     */
    @Bean
    public RedisMessageListenerContainer redisMessageListenerContainer() {

        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        
        container.setConnectionFactory(jedisConnectionFactory);

        return container;

    }
}
           

5.websocket

websocket注冊:

package com.szw.learn.websocket;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;

@Configuration
public class WebsocketConfig {
    /**
     * <br>描 述:    @Endpoint注解的websocket交給ServerEndpointExporter自動注冊管理
     * @return
     */
    @Bean
    public ServerEndpointExporter serverEndpointExporter(){
        return new ServerEndpointExporter();
    }
}
           

websocket端點:

package com.szw.learn.websocket;

import java.io.IOException;
import java.util.concurrent.CopyOnWriteArraySet;

import javax.websocket.OnClose;
import javax.websocket.OnError;
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;

import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.listener.ChannelTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.stereotype.Component;

import com.szw.learn.redismq.PublishService;
import com.szw.learn.redismq.SubscribeListener;
import com.szw.learn.util.SpringUtils;
/**
 *@ServerEndpoint(value="/websocket")value值必須以/開路
 *備注:@ServerEndpoint注解類不支援使用@Autowire
 *{topic}指:向哪個頻道主題裡發消息
 *{myname}指:這個消息是誰的。真實環境裡可以使用目前登入使用者資訊
 */
@Component
@ServerEndpoint(value="/websocket/{topic}/{myname}")
public class WebsocketEndpoint {
    
    /**
     * 因為@ServerEndpoint不支援注入,是以使用SpringUtils擷取IOC執行個體
     */
    private StringRedisTemplate redisTampate = SpringUtils.getBean(StringRedisTemplate.class);
    
    private RedisMessageListenerContainer redisMessageListenerContainer = SpringUtils.getBean(RedisMessageListenerContainer.class);
    
    //存放該伺服器該ws的所有連接配接。用處:比如向所有連接配接該ws的使用者發送通知消息。
    private static CopyOnWriteArraySet<WebsocketEndpoint> sessions = new CopyOnWriteArraySet<>();
    
    private Session session;
    
    @OnOpen
    public void onOpen(Session session,@PathParam("topic")String topic){
        System.out.println("java websocket:打開連接配接");
        this.session = session;
        sessions.add(this);
        SubscribeListener subscribeListener = new SubscribeListener();
        subscribeListener.setSession(session);
        subscribeListener.setStringRedisTemplate(redisTampate);
        //設定訂閱topic
        redisMessageListenerContainer.addMessageListener(subscribeListener, new ChannelTopic(topic));
    }
    
    @OnClose
    public void onClose(Session session){
        System.out.println("java websocket:關閉連接配接");
        sessions.remove(this);
    }
    
    @OnMessage
    public void onMessage(Session session,String message,@PathParam("topic")String topic,@PathParam("myname")String myname) throws IOException{
        message = myname+":"+message;
        System.out.println("java websocket 收到消息=="+message);
        PublishService publishService = SpringUtils.getBean(PublishService.class);
        publishService.publish(topic, message);
    }
    
    @OnError
    public void onError(Session session,Throwable error){
        System.out.println("java websocket 出現錯誤");
    }

    public Session getSession() {
        return session;
    }

    public void setSession(Session session) {
        this.session = session;
    }
}
           

測試controller

package com.szw.learn.websocket;

import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.servlet.ModelAndView;

@Controller
@RequestMapping("websocket")
public class WebsocketController {
    
    @Value("${server.port}")
    private String port;
    
    public static final String INDEX = "websocket/index";
    
    /**
     * @author 七脈
     * 描述:聊天頁
     * @param topic 釋出訂閱的頻道主題
     * @param myname 釋出者的顯示名稱
     * @return
     */
    @RequestMapping("index/{topic}/{myname}")
    public ModelAndView index(@PathVariable("topic")String topic,@PathVariable("myname")String myname){
        ModelAndView mav = new ModelAndView(INDEX);
        mav.addObject("port", port);
        mav.addObject("topic",topic);
        mav.addObject("myname",myname);
        return mav;
    }
}
           

6.啟動類

package com.szw.learn;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class WsMqApplication {
    public static void main(String[] args) {
        System.setProperty("spring.devtools.restart.enabled", "false");
        SpringApplication.run(WsMqApplication.class, args);
    }
}
           

7.測試頁面

<!doctype html>
<html xmlns:th="http://www.thymeleaf.org">
<head>
<meta charset="utf-8"></meta>
<title>websocket叢集</title>
</head>
<body>
    本服務端口号:[[${port}]],使用redismq實作websocket叢集<br/>
    [[${topic}]] 頻道 聊天中。。。<br/>
    <input id="input_id" type="text" /><button onclick="sendMessage()">發送</button>    <button onclick="closeWebsocket()">關閉</button>
    <div id="message_id"></div>
</body>
<script type="text/javascript">
    document.getElementById('input_id').focus();
    var websocket = null;
    //目前浏覽前是否支援websocket
    if("WebSocket" in window){
        var url = "ws://127.0.0.1:[[${port}]]/websocket/[[${topic}]]/[[${myname}]]";
        websocket = new WebSocket(url);
    }else{
        alert("浏覽器不支援websocket");
    }
    
    websocket.onopen = function(event){
        setMessage("打開連接配接");
    }
    
    websocket.onclose = function(event){
        setMessage("關閉連接配接");
    }
    
    websocket.onmessage = function(event){
        setMessage(event.data);
    }
    
    websocket.onerror = function(event){
        setMessage("連接配接異常");
    }
    
    //監聽視窗關閉事件,當視窗關閉時,主動去關閉websocket連接配接,防止連接配接還沒斷開就關閉視窗,server端會抛異常。
    window.onbeforeunload = function(){
        closeWebsocket();
    }
    
    //關閉websocket
    function closeWebsocket(){
        //3代表已經關閉
        if(3!=websocket.readyState){
            websocket.close();
        }else{
            alert("websocket之前已經關閉");
        }
    }
    
    //将消息顯示在網頁上
    function setMessage(message){
        document.getElementById('message_id').innerHTML += message + '<br/>';
    }
     
    //發送消息
    function sendMessage(){
        //1代表正在連接配接
        if(1==websocket.readyState){
            var message = document.getElementById('input_id').value;
            //setMessage(message);
            websocket.send(message);
        }else{
            alert("websocket未連接配接");
        }
        document.getElementById('input_id').value="";
        document.getElementById('input_id').focus();
    }
</script>
</html>
           

繼續閱讀