前言
由於項目需要,目前要使用 Emqttd 搭建一個聊天室,於是自己寫了個 demo,特此記錄下來。
代碼
使用IDEA搭建一個Spring Boot工程
pom.xml 檔案,此處我只列出 dependencies 部分
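<dependencies>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.49</version>
    </dependency>
    <dependency>
        <groupId>org.fusesource.mqtt-client</groupId>
        <artifactId>mqtt-client</artifactId>
        <version>1.14</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-freemarker</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-devtools</artifactId>
        <scope>runtime</scope>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>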
application.properties檔案
# HTTP port of the Spring Boot application; also embedded in the MQTT client id below.
server.port=8080
# Address of the Emqttd broker. The value is a placeholder: replace it with your own server address.
emqt.server=填你的Emqttd伺服器位址
# Broker port used to build emqt.host.
emqt.port=1883
# Broker URI assembled from emqt.server and emqt.port; injected into EmqtConfig via @Value.
emqt.host=tcp://${emqt.server}:${emqt.port}
# MQTT client id used by this application; it embeds server.port.
emqt.clientId=spring-boot-client-${server.port}
# Topic filter the server-side client subscribes to. The key is spelled "subcribe" and must stay
# that way, because EmqtConfig reads it under exactly this name.
emqt.subcribe.topic=/live_test/#
EmqtConfig.java,用於初始化 MQTT 用戶端、配置監聽器等
import org.fusesource.hawtbuf.Buffer;
import org.fusesource.hawtbuf.UTF8Buffer;
import org.fusesource.hawtdispatch.DispatchQueue;
import org.fusesource.mqtt.client.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
/**
 * Spring configuration that builds the MQTT client, opens a callback-style connection to the
 * broker and keeps a per-live-room counter of "online" notifications.
 *
 * <p>Browser clients publish to {@code /liveOnline/<liveId>} after subscribing to a room; every
 * such message increments the counter of that room. The counter is only ever incremented.
 *
 * <p>The counter map is a {@link ConcurrentHashMap} updated with atomic operations, so
 * {@link #getOnlineCount(Long)} may be called from web request threads while the MQTT listener
 * is updating it.
 */
@Configuration
public class EmqtConfig {

    /** Topic prefix on which clients announce that they joined a live room. */
    private static final String ONLINE_TOPIC_PREFIX = "/liveOnline";

    /** Matches every character that is not an ASCII digit; compiled once and reused. */
    private static final Pattern NON_DIGITS = Pattern.compile("[^0-9]");

    /** Number of online notifications received so far, keyed by live room id. */
    private static final Map<Long, Integer> onlineMap = new ConcurrentHashMap<>();

    private final Logger logger = LoggerFactory.getLogger(this.getClass());

    @Value("${emqt.host}")
    private String host;

    @Value("${emqt.clientId}")
    private String clientId;

    @Value("${emqt.subcribe.topic}")
    private String topicName;

    /**
     * Creates the MQTT client settings object from the configured host and client id.
     *
     * @return the configured client, or {@code null} when configuration fails (the failure is
     *         logged)
     */
    @Bean
    public MQTT mqtt() {
        try {
            logger.info("====連接配接到mqtt===");
            MQTT mqtt = new MQTT();
            mqtt.setHost(host);
            mqtt.setClientId(clientId);
            mqtt.setReconnectDelay(100);
            mqtt.setKeepAlive((short) 20);
            return mqtt;
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
            return null;
        }
    }

    /**
     * Opens the callback connection, registers the message listener and subscribes to the
     * configured chat topic plus {@code /liveOnline/#}. Connect and subscribe results are
     * reported through the callbacks passed to the client and are only logged here.
     *
     * @param mqtt the client settings created by {@link #mqtt()}
     * @return the connection, or {@code null} when setting it up throws (the failure is logged)
     */
    @Bean
    public CallbackConnection callbackConnection(MQTT mqtt) {
        try {
            CallbackConnection connection = mqtt.callbackConnection();
            connection.listener(new Listener() {
                @Override
                public void onConnected() {
                    logger.info("連接配接成功");
                }

                @Override
                public void onDisconnected() {
                    logger.info("斷開連接配接");
                }

                @Override
                public void onPublish(UTF8Buffer topic, Buffer message, Runnable callback) {
                    try {
                        String receivedTopic = topic.toString();
                        logger.info("收到topic:{}消息為:{}", receivedTopic, message.utf8());
                        if (receivedTopic.startsWith(ONLINE_TOPIC_PREFIX)) {
                            recordOnline(receivedTopic);
                        }
                    } finally {
                        // Always hand control back to the client, even if handling failed.
                        callback.run();
                    }
                }

                @Override
                public void onFailure(Throwable throwable) {
                    logger.error(throwable.getMessage(), throwable);
                }
            });
            connection.connect(new Callback<Void>() {
                @Override
                public void onSuccess(Void aVoid) {
                    // Original author's note: after connecting, the client is subscribed to a
                    // default topic ($client/mengsu) - broker behaviour, not verified here.
                    logger.info("{}連接配接成功", clientId);
                }

                @Override
                public void onFailure(Throwable throwable) {
                    logger.error(throwable.getMessage(), throwable);
                }
            });
            // Subscribe to the chat topic filter and to the online notifications.
            Topic[] topics = new Topic[] {
                new Topic(topicName, QoS.EXACTLY_ONCE),
                new Topic(ONLINE_TOPIC_PREFIX + "/#", QoS.EXACTLY_ONCE)
            };
            connection.subscribe(topics, new Callback<byte[]>() {
                @Override
                public void onSuccess(byte[] bytes) {
                    logger.info("{} topic訂閱成功", clientId);
                }

                @Override
                public void onFailure(Throwable throwable) {
                    logger.error("{} topic訂閱 失敗", clientId, throwable);
                }
            });
            DispatchQueue dispatchQueue = connection.getDispatchQueue();
            dispatchQueue.execute(new Runnable() {
                @Override
                public void run() {
                    // Placeholder: subscribe / publish / disconnect operations can be run here.
                    System.out.println("在這裡進行相應的訂閱、釋出、停止連接配接等等操作");
                }
            });
            return connection;
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
            return null;
        }
    }

    /**
     * Increments the online counter of the live room named in an online-notification topic.
     * Topics that carry no usable numeric id are logged and ignored instead of throwing inside
     * the MQTT listener.
     */
    private void recordOnline(String onlineTopic) {
        Long liveId = findNum(onlineTopic);
        if (liveId == null) {
            logger.warn("Ignoring online notification without a numeric live id: {}", onlineTopic);
            return;
        }
        // merge() is atomic on ConcurrentHashMap, so no external lock is needed.
        onlineMap.merge(liveId, 1, Integer::sum);
    }

    /**
     * Extracts the number formed by all ASCII digits in {@code str}, in order of appearance.
     *
     * @param str text to scan, e.g. {@code /liveOnline/42}
     * @return the parsed number, or {@code null} when {@code str} contains no digits or the
     *         digits do not fit into a {@code long}
     */
    private static Long findNum(String str) {
        String digits = NON_DIGITS.matcher(str).replaceAll("");
        if (digits.isEmpty()) {
            return null;
        }
        try {
            return Long.valueOf(digits);
        } catch (NumberFormatException tooLarge) {
            return null;
        }
    }

    /**
     * Returns how many online notifications have been received for a live room.
     *
     * @param liveId id of the live room; may be {@code null}
     * @return the current count, or {@code 0} for an unknown or {@code null} id
     */
    public int getOnlineCount(Long liveId) {
        if (liveId == null) {
            // ConcurrentHashMap rejects null keys; a missing id simply has nobody online.
            return 0;
        }
        return onlineMap.getOrDefault(liveId, 0);
    }
}
至此,MQTT用戶端就配置好了,下面是controller
IndexController.java
import com.alibaba.fastjson.JSONObject;
import com.example.emqtdemo.emqt.EmqtConfig;
import org.fusesource.mqtt.client.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
import javax.annotation.Resource;
import javax.servlet.http.HttpServletRequest;
/**
 * Web controller of the chat demo: renders the chat page, relays chat messages to the MQTT
 * broker and exposes the online counter kept by {@link EmqtConfig}.
 */
@Controller
public class IndexController {

    // java.util.logging is referenced by its full name so that no additional import is needed.
    private static final java.util.logging.Logger LOGGER =
            java.util.logging.Logger.getLogger(IndexController.class.getName());

    @Autowired
    private HttpServletRequest request;

    @Resource
    private CallbackConnection callbackConnection;

    @Resource
    private EmqtConfig emqtConfig;

    /**
     * Renders the chat page for one user in one live room.
     *
     * @param liveId   id of the live room
     * @param username name of the user entering the room
     * @return the view name {@code index}
     */
    @RequestMapping("/")
    public String index(Long liveId, String username) {
        request.setAttribute("liveId", liveId);
        request.setAttribute("username", username);
        request.setAttribute("clientId", "liveroom" + liveId + username);
        request.setAttribute("topic", "/live_test/" + liveId);
        return "index";
    }

    /**
     * Publishes a chat message to the broker as a JSON object with the keys {@code clientId}
     * and {@code msg}. The publish result arrives later through a callback, so a
     * {@code success: true} response only means the message was handed to the MQTT client.
     *
     * <p>NOTE(review): the caller chooses the topic freely, so any HTTP client can publish to
     * any topic through the server's connection - consider restricting the allowed topics.
     *
     * @param topic    topic to publish to; required
     * @param clientId id of the sending client, copied into the payload
     * @param msg      message text, copied into the payload
     * @return a JSON object with {@code success} and {@code content}
     */
    @RequestMapping("/send")
    @ResponseBody
    public Object send(String topic, String clientId, String msg) {
        JSONObject response = new JSONObject();
        if (topic == null || topic.isEmpty()) {
            // Without a topic there is nothing to publish to; answer instead of failing with an NPE.
            response.put("success", false);
            response.put("content", "topic is required");
            return response;
        }

        JSONObject content = new JSONObject();
        content.put("clientId", clientId);
        content.put("msg", msg);
        // Encode explicitly as UTF-8: the platform default charset may not be able to
        // represent the chat text.
        byte[] payload = content.toJSONString().getBytes(java.nio.charset.StandardCharsets.UTF_8);

        final String targetTopic = topic;
        callbackConnection.publish(targetTopic, payload, QoS.EXACTLY_ONCE, false, new Callback<Void>() {
            @Override
            public void onSuccess(Void aVoid) {
                // Nothing to do: the HTTP response has already been sent.
            }

            @Override
            public void onFailure(Throwable throwable) {
                LOGGER.log(java.util.logging.Level.WARNING,
                        "Publishing to topic " + targetTopic + " failed", throwable);
            }
        });

        response.put("success", true);
        response.put("content", msg);
        return response;
    }

    /**
     * Returns the number of online notifications received for a live room.
     *
     * @param liveId id of the live room
     * @return a JSON object with {@code success} and the count in {@code content}
     */
    @RequestMapping("/getOnlineCount")
    @ResponseBody
    public Object getOnlineCount(Long liveId) {
        int onlineCount = emqtConfig.getOnlineCount(liveId);
        JSONObject response = new JSONObject();
        response.put("success", true);
        response.put("content", onlineCount);
        return response;
    }
}
index.ftl 檔案,用於展示
title
${username!}成功進入了${liveId!}直播間
目前線上人數:0
輸入消息:
發送
取消
// Values rendered by FreeMarker from the request attributes set in IndexController.index.
// username is a raw request parameter (and clientId is built from it), so the strings are
// escaped with ?js_string to keep them from breaking out of the JavaScript string literal.
var clientId = '${(clientId!)?js_string}';
var username = '${(username!)?js_string}';
var topic = '${(topic!)?js_string}';
// ?c prints the number without locale-specific grouping (1234 rather than 1,234), so the
// value can be sent back to the server as a Long; a missing liveId renders as ''.
var liveId = '${(liveId?c)!}';
index.js,用於控制頁面的一些邏輯
// Page logic of the chat demo. Relies on globals provided by the page: jQuery ($), the MQTT.js
// browser bundle (mqtt) and the template variables clientId, username, topic and liveId.
$(document).ready(function () {
    // The MQTT.js bundle exposes a global `mqtt` object.
    console.log(mqtt)

    // Connection options.
    const options = {
        connectTimeout: 4000, // connect timeout in milliseconds
        // Authentication
        clientId: clientId, // client id; should be unique per client
        username: username, // name of the current user
        password: '123', // NOTE(review): hard-coded demo password
    }
    const client = mqtt.connect('ws://your address:8083/mqtt', options)

    client.on('connect', () => {
        console.log('成功連接配接伺服器')
        $("#connectionTip").html("成功連接配接到消息伺服器")
        // Subscribe to the room topic. subscribe() takes (topic, options, callback); a fourth
        // argument is ignored, so the online notification is sent from inside the callback.
        client.subscribe(topic, { qos: 2 }, (error) => {
            if (error) {
                console.error(error)
                return
            }
            console.log('訂閱成功')
            onSubscribeSuccess()
        })
    })

    // NOTE(review): MQTT.js documents no argument for 'reconnect', so `error` is probably
    // undefined here - confirm before relying on it.
    client.on('reconnect', (error) => {
        console.log('正在重連:' + error)
    })

    client.on('error', (error) => {
        console.log('連接配接失敗:' + error)
    })

    // Tells the server that this client joined the live room; the server counts these messages.
    // NOTE(review): this runs on every successful subscribe, presumably including the ones
    // after a reconnect, which would count the same client again.
    function onSubscribeSuccess() {
        client.publish('/liveOnline/' + liveId, liveId, { qos: 2, retain: false }, (error) => {
            console.log(error || '釋出成功')
        })
    }

    // Show every received message in the message area.
    client.on('message', (receivedTopic, message) => {
        const text = message.toString()
        console.log('收到來自', receivedTopic, '的消息', text);
        // The payload is written by other users: insert it as text, never as HTML.
        const line = $("<p></p>").text(` 收到來自 ${receivedTopic} 的消息:${text}`)
        $("#msgContent").append(line);
    })

    // Send the typed message through the server, which publishes it to the room topic.
    $("#send").click(function () {
        const content = $("input[name='content']").val();
        $.post("/send", { clientId: clientId, msg: content, topic: topic }, function (data) {
            if (data.success) {
                console.log("發送成功")
            }
        }, 'json')
    })

    // Refresh the online counter every 3 seconds.
    setInterval(function () {
        $.post("/getOnlineCount", { liveId: liveId }, function (data) {
            if (data.success) {
                $("#onlineCount").html(data.content)
            }
        }, 'json')
    }, 3000)
})
參考的文章
代碼位址