天天看點

(二十七)admin-boot 項目之整合 websocket 實時推送消息

文章目錄

  • (二十七)整合 websocket 實時推送消息
    • 一、rabbitmq推送方案
    • 二、websocket方案(mica-mqtt-core)
    • 三、mica-mqtt方案測試

(二十七)整合 websocket 實時推送消息

基礎項目位址:

https://gitee.com/springzb/admin-boot

此次整合推送消息有兩種方案:

1. 采用 rabbitmq 的 rabbitmq_web_stomp 插件

2. 采用 websocket 的方案(mica-mqtt-core 底層采用 t-io 框架,mica-mqtt-core 提供了 mqtt 和 websocket 兩種通信協定)

方案一 适用于連接少、消息多的情況

方案二 适用于連接多、消息多的情況

rabbitmq 方案的具體實施請查看《(二十五)整合 rabbitmq 消息推送》

引入依賴:

<!--整合 rabbitmq-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

<!--mica-mqtt依賴-->
<dependency>
    <groupId>net.dreamlu</groupId>
    <artifactId>mica-mqtt-core</artifactId>
    <version>1.2.10</version>
</dependency>
           

統一配置:

package cn.mesmile.admin.modules.message;

import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;

/**
 * Settings for the message-push module, bound from configuration properties
 * under the {@code admin.message} prefix.
 *
 * @author zb
 */
@Data
@ConfigurationProperties(prefix = "admin.message")
public class MessageProperties {

    /** Whether message sending is enabled; off unless configured otherwise. */
    private Boolean enabled = false;

    /** Underlying service used to send messages; RabbitMQ unless configured otherwise. */
    private MessageServiceTypeEnum serviceType = MessageServiceTypeEnum.RABBIT_MQ;

    /** IP address of the MQTT server; defaults to {@code 0.0.0.0}. */
    private String mqttServerIp = "0.0.0.0";

    /** Port of the MQTT server; defaults to {@code 1883}. */
    private Integer mqttServerPort = 1883;

    /** WebSocket port; defaults to {@code 5883}. */
    private Integer mqttServerWebsocketPort = 5883;

}
           
package cn.mesmile.admin.modules.message;

/**
 * Underlying service used when sending messages.
 *
 * NOTE(review): no ROCKET_MQ sender implementation is visible alongside the
 * RabbitMQ and MQTT ones shown with this enum — confirm whether one exists.
 *
 * @author zb
 */
public enum  MessageServiceTypeEnum {

    /**
     * RabbitMQ
     */
    RABBIT_MQ,
    /**
     * RocketMQ
     */
    ROCKET_MQ,
    /**
     * MQTT
     */
    MQTT
    
}

           

一、rabbitmq推送方案

package cn.mesmile.admin.modules.message.operational;

import cn.mesmile.admin.common.rabbit.constant.RabbitConstant;
import cn.mesmile.admin.modules.message.vo.MqMessageVO;
import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.util.List;
import java.util.Set;

/**
 * rabbitmq方式實作接口
 * 生産者-直連交換機模式
 * 此模式下路由鍵比對,同一賬号多處線上都可接收到消息,未登入賬号無法接收推送,且再登入賬号不會有推送,路由消息丢失,适用于web端
 *
 * @author zb
 * @Description
 */
@Slf4j
@Component
@ConditionalOnProperty(
  name = "admin.message.service-type",
  havingValue = "rabbit_mq"
)
public class RabbitMqSendServiceImpl implements ISendService{

  /**
     rabbitmq-plugins enable rabbitmq_web_stomp rabbitmq_web_stomp_examples
     直連交換機
  String MESSAGE_EXCHANGE_DIRECT = "message.direct.exchange";
   普通通知消息
   String MESSAGE_QUEUE_SINGLE_ROUTE_KEY = "message.single_";
   群發通知消息
  String MESSAGE_QUEUE_GROUP_ROUTE_KEY = "message.group_";
   */

    @Resource
    public RabbitTemplate rabbitTemplate;

  @Override
  public void sendSingleMessage(String receiveUserId, MqMessageVO mqMessageVO) {
    try {
      rabbitTemplate.convertAndSend(RabbitConstant.MESSAGE_EXCHANGE_DIRECT,
          RabbitConstant.MESSAGE_QUEUE_SINGLE_ROUTE_KEY + receiveUserId, JSONObject.toJSONString(mqMessageVO));
    }catch (Exception e){
      log.error("發送消息失敗:",e);
    }
  }

  @Override
  public void sendGroupMessage(Set<String> userSet, MqMessageVO mqMessageVO) {
    try {
      userSet.forEach(
          userId-> rabbitTemplate.convertAndSend(RabbitConstant.MESSAGE_EXCHANGE_DIRECT,
          RabbitConstant.MESSAGE_QUEUE_SINGLE_ROUTE_KEY +userId, JSONObject.toJSONString(mqMessageVO))
      );
    }catch (Exception e){
      log.error("發送消息失敗:",e);
    }
  }
}
           

采用 rabbitmq 方案時,前端需要安裝 stomp 插件,用于對接 rabbitmq 消息

安裝使用stomp插件請參考:https://blog.csdn.net/suprezheng/article/details/126613531 【插件】

二、websocket方案(mica-mqtt-core)

mica-mqtt-core

https://gitee.com/596392912/mica-mqtt

package cn.mesmile.admin.modules.message;

import lombok.extern.slf4j.Slf4j;
import net.dreamlu.iot.mqtt.codec.ByteBufferUtil;
import net.dreamlu.iot.mqtt.core.server.MqttServer;
import net.dreamlu.iot.mqtt.core.server.event.IMqttConnectStatusListener;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.tio.core.ChannelContext;

/**
 * Message-push configuration, active only when {@code admin.message.enabled=true}.
 * Registers {@link MessageProperties} and, for the MQTT service type, an embedded
 * mica-mqtt {@link MqttServer}.
 *
 * @author zb
 */
@Slf4j
@Configuration(
        proxyBeanMethods = false
)
@EnableConfigurationProperties({MessageProperties.class})
@ConditionalOnProperty(
        value = {"admin.message.enabled"},
        havingValue = "true"
)
public class MessageConfiguration {

    /**
     * Builds and starts the MQTT server; the bean is only created when
     * {@code admin.message.service-type=mqtt}.
     *
     * <p>The server is started inside this factory method, so it must also be stopped
     * when the application context closes. Spring's inferred destroy method only picks
     * up public no-arg {@code close}/{@code shutdown} methods, therefore {@code stop}
     * is named explicitly; otherwise the MQTT and WebSocket ports stay bound after the
     * context is closed.
     *
     * <p>Client tool download: https://mqttx.app/
     * <br>mqtt://127.0.0.1:1883
     * <br>ws://127.0.0.1:5883
     *
     * <p>TODO switch to the Spring Boot starter:
     * https://gitee.com/596392912/mica-mqtt/blob/master/starter/mica-mqtt-server-spring-boot-starter/README.md
     *
     * @param messageProperties bound {@code admin.message} settings supplying ip and ports
     * @return the started server
     */
    @ConditionalOnProperty(
            value = {"admin.message.service-type"},
            havingValue = "mqtt"
    )
    @Bean(destroyMethod = "stop")
    public MqttServer get(MessageProperties messageProperties) {
        // Author's note: to accept more connections (lower memory use), add the JVM option -Xss129k
        return MqttServer.create()
                // property default: 0.0.0.0
                .ip(messageProperties.getMqttServerIp())
                // property default: 1883
                .port(messageProperties.getMqttServerPort())
                .webPort(messageProperties.getMqttServerWebsocketPort())
                // Author's note: library default is 8092 (default max MQTT message size); a smaller
                // value lowers memory use, and t-io will try to parse oversized messages in several passes
                .readBufferSize(512)
                // Maximum packet body length; raise it for large payloads (author's note: default 8092)
                .maxBytesInMessage(1024 * 10)
                // Custom authentication.
                // NOTE(review): this handler returns true unconditionally, i.e. every client is
                // accepted regardless of user name / password — replace before exposing the ports.
                .authHandler((context, uniqueId, clientId, userName, password) -> true)
                // Listener for messages published by clients: logs them only
                .messageListener((context, clientId, message) -> {
                    log.info("clientId:{} message:{} payload:{}", clientId, message, ByteBufferUtil.toString(message.getPayload()));
                })
                // Logs client connect / disconnect events
                .connectStatusListener(new IMqttConnectStatusListener() {
                    @Override
                    public void online(ChannelContext channelContext, String clientId) {
                        log.info("--------用戶端【上線】----------{}, 用戶端id:{}",channelContext, clientId);
                    }

                    @Override
                    public void offline(ChannelContext channelContext, String clientId) {
                        log.info("--------用戶端【下線】----------{}, 用戶端id:{}",channelContext, clientId);
                    }
                })
                .httpEnable(true)
                // Enables debug logging (always on here; consider making it configurable)
                .debug()
                .start();
    }


}

           
package cn.mesmile.admin.modules.message.operational;

import cn.mesmile.admin.modules.message.vo.MqMessageVO;
import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import net.dreamlu.iot.mqtt.core.server.MqttServer;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Set;

/**
 * @author zb
 * @Description
 */
@Slf4j
@ConditionalOnProperty(
    value = {"admin.message.service-type"},
    havingValue = "mqtt"
)
@Component
public class MqttSendServiceImpl implements  ISendService{

  @Resource
  private MqttServer mqttServer;

  @Override
  public void sendSingleMessage(String receiveUserId, MqMessageVO mqMessageVO) {
    boolean b = mqttServer.publishAll("/userId_" + receiveUserId,
        ByteBuffer.wrap(JSONObject.toJSONString(mqMessageVO).getBytes(StandardCharsets.UTF_8)));
  }

  @Override
  public void sendGroupMessage(Set<String> userSet, MqMessageVO mqMessageVO) {
    userSet.forEach(
        userId-> this.sendSingleMessage(userId, mqMessageVO)
    );
  }

}
           

三、mica-mqtt方案測試

@Resource
    private ISendService sendService;

  /**
   * Test endpoint: sends a fixed sample message to user "123" through the injected
   * send service and returns a fixed response.
   * Client tool for watching the push: https://mqttx.app/
   */
  @GetMapping("/message")
  public R message(){
      final String receiver = "123";

      MqMessageVO sample = new MqMessageVO();
      sample.setReceiveUser(receiver);
      sample.setContent("設定内容11111111111");

      sendService.sendSingleMessage(receiver, sample);
      return R.data("test");
  }
           

用戶端工具下載: https://mqttx.app/

mqtt://127.0.0.1:1883

ws://127.0.0.1:5883

(二十七)admin-boot 項目之整合 websocket 實時推送消息