天天看點

emqttd java 即時通訊_使用Emqttd搭建一個聊天室

前言

由于項目需要,目前需要使用Emqttd搭建一個聊天室,自己寫了個demo,特記錄下來

代碼

使用IDEA搭建一個Spring Boot工程

pom.xml檔案,此處我隻列出dependencies部分

com.alibaba

fastjson

1.2.49

org.fusesource.mqtt-client

mqtt-client

1.14

org.springframework.boot

spring-boot-starter-freemarker

org.springframework.boot

spring-boot-starter-web

org.springframework.boot

spring-boot-devtools

runtime

org.projectlombok

lombok

true

org.springframework.boot

spring-boot-starter-test

test

application.properties檔案

server.port=8080

emqt.server=填你的Emqttd伺服器位址

emqt.port=1883

emqt.host=tcp://${emqt.server}:${emqt.port}

emqt.clientId=spring-boot-client-${server.port}

emqt.subcribe.topic=/live_test/#

EmqtConfig.java,用于初始化MQTT用戶端,配置監聽器等

import org.fusesource.hawtbuf.Buffer;

import org.fusesource.hawtbuf.UTF8Buffer;

import org.fusesource.hawtdispatch.DispatchQueue;

import org.fusesource.mqtt.client.*;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

import org.springframework.beans.factory.annotation.Value;

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

import java.util.Map;

import java.util.concurrent.ConcurrentHashMap;

import java.util.concurrent.atomic.AtomicInteger;

import java.util.concurrent.locks.Lock;

import java.util.concurrent.locks.ReentrantLock;

import java.util.regex.Matcher;

import java.util.regex.Pattern;

@Configuration

public class EmqtConfig {

@Value("${emqt.host}")

private String host;

@Value("${emqt.clientId}")

private String clientId;

@Value("${emqt.subcribe.topic}")

private String topicName;

private static Lock lock = new ReentrantLock();

private static Map onlineMap = new ConcurrentHashMap<>();

private final Logger logger = LoggerFactory.getLogger(this.getClass());

@Bean

public MQTT mqtt() {

try {

logger.info("====連接配接到mqtt===");

MQTT mqtt = new MQTT();

mqtt.setHost(host);

mqtt.setClientId(clientId);

mqtt.setReconnectDelay(100);

mqtt.setKeepAlive((short) 20);

return mqtt;

} catch (Exception e) {

logger.error(e.getMessage(), e);

return null;

}

}

@Bean

public CallbackConnection callbackConnection(MQTT mqtt) {

try {

CallbackConnection connection = mqtt.callbackConnection();

connection.listener(new Listener() {

@Override

public void onConnected() {

logger.info("連接配接成功");

}

@Override

public void onDisconnected() {

logger.info("斷開連接配接");

}

@Override

public void onPublish(UTF8Buffer topic, Buffer message, Runnable callback) {

try {

lock.lock();

logger.info("收到topic:" + topic.toString() + "消息為:" + message.utf8());

//表示監聽成功

String topicName = topic.toString();

if (topicName.startsWith("/liveOnline")) {

Long liveId = findNum(topicName);

Integer integer = onlineMap.get(liveId);

if (integer == null) {

integer = 0;

}

onlineMap.put(liveId, ++integer);

}

}finally {

callback.run();

lock.unlock();

}

}

@Override

public void onFailure(Throwable throwable) {

logger.error(throwable.getMessage(), throwable);

}

});

connection.connect(new Callback() {

@Override

public void onSuccess(Void aVoid) {

//連接配接成功後會預設訂閱主題($client/mengsu)

logger.info(clientId + "連接配接成功");

}

@Override

public void onFailure(Throwable throwable) {

logger.error(throwable.getMessage(), throwable);

}

});

//建立一個主題

Topic[] topic = new Topic[]{new Topic(topicName, QoS.EXACTLY_ONCE),new Topic("/liveOnline/#",QoS.EXACTLY_ONCE)};

connection.subscribe(topic, new Callback() {

@Override

public void onSuccess(byte[] bytes) {

logger.info(clientId + " topic訂閱成功");

}

@Override

public void onFailure(Throwable throwable) {

logger.info(clientId + " topic訂閱 失敗");

logger.error(throwable.getMessage(), throwable);

}

});

DispatchQueue dispatchQueue = connection.getDispatchQueue();

dispatchQueue.execute(new Runnable() {

public void run() {

//在這裡進行相應的訂閱、釋出、停止連接配接等等操作

System.out.println("在這裡進行相應的訂閱、釋出、停止連接配接等等操作");

}

});

return connection;

} catch (Exception e) {

logger.error(e.getMessage(), e);

return null;

}

}

private static Long findNum(String str) {

String regEx="[^0-9]";

Pattern p = Pattern.compile(regEx);

Matcher m = p.matcher(str);

String result = m.replaceAll("").trim();

return Long.valueOf(result);

}

public int getOnlineCount(Long liveId){

try {

lock.lock();

Integer integer = onlineMap.get(liveId);

return integer == null ? 0 : integer;

}finally {

lock.unlock();

}

}

}

至此,MQTT用戶端就配置好了,下面是controller

IndexController.java

import com.alibaba.fastjson.JSONObject;

import com.example.emqtdemo.emqt.EmqtConfig;

import org.fusesource.mqtt.client.*;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.stereotype.Controller;

import org.springframework.web.bind.annotation.RequestMapping;

import org.springframework.web.bind.annotation.ResponseBody;

import javax.annotation.Resource;

import javax.servlet.http.HttpServletRequest;

@Controller

public class IndexController {

@Autowired

private HttpServletRequest request;

@Resource

private CallbackConnection callbackConnection;

@Resource

private EmqtConfig emqtConfig;

@RequestMapping("/")

public String index(Long liveId,String username){

request.setAttribute("liveId", liveId);

request.setAttribute("username", username);

request.setAttribute("clientId","liveroom" + liveId + username);

request.setAttribute("topic","/live_test/" + liveId);

return "index";

}

// 發送消息

@RequestMapping("/send")

@ResponseBody

public Object send(String topic, String clientId,String msg) {

JSONObject content = new JSONObject();

content.put("clientId", clientId);

content.put("msg", msg);

callbackConnection.publish(topic, content.toJSONString().getBytes(), QoS.EXACTLY_ONCE, false,new Callback() {

@Override

public void onSuccess(Void aVoid) {

}

@Override

public void onFailure(Throwable throwable) {

}

});

JSONObject jsonObject = new JSONObject();

jsonObject.put("success", true);

jsonObject.put("content", msg);

return jsonObject;

}

// 擷取線上人數

@RequestMapping("/getOnlineCount")

@ResponseBody

public Object getOnlineCount(Long liveId) {

int onlineCount = emqtConfig.getOnlineCount(liveId);

JSONObject jsonObject = new JSONObject();

jsonObject.put("success", true);

jsonObject.put("content", onlineCount);

return jsonObject;

}

}

index.ftl檔案,用于展示

title

${username!}成功進入了${liveId!}直播間

目前線上人數:0

輸入消息:

發送

取消

var clientId = '${clientId!}';

var username = '${username!}';

var topic = '${topic!}';

var liveId = '${liveId!}';

index.js,用于控制頁面的一些邏輯

$(document).ready(function () {

// 将在全局初始化一個 mqtt 變量

console.log(mqtt)

// 連接配接選項

const options = {

connectTimeout: 4000, // 逾時時間

// 認證資訊

clientId: clientId, // 用戶端id 這個自己填 盡量唯一

username: username, // 取目前使用者的名字

password: '123',

}

const client = mqtt.connect('ws://your address:8083/mqtt', options)

// let topic = "/live_dev/${liveId}"

client.on('connect', (e) => {

console.log('成功連接配接伺服器')

$("#connectionTip").html("成功連接配接到消息伺服器")

// 訂閱一個主題

client.subscribe(topic, { qos: 2 }, (error) => {

if (!error) {

console.log('訂閱成功')

}

},onSubscribeSuccess)

})

client.on('reconnect', (error) => {

console.log('正在重連:' + error)

})

client.on('error', (error) => {

console.log('連接配接失敗:' + error)

})

function onSubscribeSuccess() {

client.publish('/liveOnline/' + liveId, liveId, { qos: 2, rein: false }, (error) => {

console.log(error || '釋出成功')

})

}

function onUnubscribeSuccess() {

console.log("onUnubscribeSuccess")

}

// 監聽接收消息事件

client.on('message', (topic, message,callback) => {

console.log('收到來自', topic, '的消息', message.toString());

let html = ` 收到來自 ${topic} 的消息:${ message.toString()}

`;

$("#msgContent").append(html);

})

$("#send").click(function () {

let conent = $("input[name='content']").val();

$.post("/send",{clientId:clientId,msg:conent,topic:topic},function (data) {

if (data.success){

console.log("發送成功")

}

},'json')

})

setInterval(function () {

$.post("/getOnlineCount",{liveId:liveId},function (data) {

if (data.success){

$("#onlineCount").html(data.content)

}

},'json')

},3000)

})

參考的文章

代碼位址