
Implementing an MQTT server with Spring: building a Spring Boot server with MQTT and protobuf

The code is open source on GitHub. If it helps you, a star is welcome.

Architecture

[Figure: Mqtt-server-arch.jpg, MQTT server architecture diagram]

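Server side components

Broker: the core of MQTT, responsible for managing message delivery; it behaves like a pub-sub queue

Application Server: handles RESTful requests and forwards them as MQTT messages

Publisher: essentially an MQTT client that publishes server-side messages to the broker

Subscriber: essentially an MQTT client that subscribes to client-side messages from the broker

Client side

Publisher: publishes client-side messages to the broker

Subscriber: subscribes to server-side messages from the broker

Client: sends RESTful requests to the Application Server to trigger message pub/sub

Summary: structurally, the broker is the actual MQTT server. From a business point of view, the Application Server (which wraps the MQTT client pub/sub) and the broker together make up the server side.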
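To make the broker's role concrete, here is a minimal in-memory sketch of the pub-sub pattern described above. It is only an illustration and says nothing about how mosquitto works internally: the names ToyBroker and ToyBrokerDemo are made up for this example, topics are matched by exact string equality, and there is no networking, QoS, or persistence.

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Hypothetical toy broker: keeps a list of subscriber callbacks per topic.
class ToyBroker {
    private final Map<String, List<Consumer<String>>> subscribers = new ConcurrentHashMap<>();

    // A subscriber registers interest in one topic (exact match only).
    void subscribe(String topic, Consumer<String> handler) {
        subscribers.computeIfAbsent(topic, t -> new CopyOnWriteArrayList<>()).add(handler);
    }

    // A publisher hands a message to the broker, which fans it out.
    // Returns the number of subscribers that received the message.
    int publish(String topic, String message) {
        List<Consumer<String>> handlers = subscribers.getOrDefault(topic, Collections.emptyList());
        handlers.forEach(handler -> handler.accept(message));
        return handlers.size();
    }
}

class ToyBrokerDemo {
    public static void main(String[] args) {
        ToyBroker broker = new ToyBroker();
        // Server-side subscriber listening for client messages.
        broker.subscribe("mqtt/loop/message", m -> System.out.println("server got: " + m));
        // Client-side publisher; it never talks to the subscriber directly.
        broker.publish("mqtt/loop/message", "hello mqtt");
    }
}
```

The point of the sketch is the decoupling: publisher and subscriber only know the broker and a topic name, which is why the Application Server can sit on either side of a conversation.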

Installing mosquitto and basic usage

Installation

# Install Mosquitto Broker
sudo apt-get update
sudo apt-get install mosquitto
# Install the Clients
sudo apt-get install mosquitto-clients

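Start, stop, and check status

# Check status
sudo service mosquitto status
# Run mosquitto in the foreground with the default config; -v enables verbose logging
# (stop the service first if it is already running, since both would listen on the same port)
sudo mosquitto -c /etc/mosquitto/mosquitto.conf -v
# Stop
sudo service mosquitto stop
# Start
sudo service mosquitto start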

Testing pub/sub with mosquitto

Note: the publisher and the subscriber must not use the same client ID. The broker keeps only one connection per client ID, so two clients sharing an ID keep disconnecting each other and the reconnect messages flood the terminal.

# Simple publish test. -h host -t topic -m message
mosquitto_pub -h localhost -t mqtt-test -m 'hello mqtt'
# Simple subscribe test.
mosquitto_sub -h localhost -t mqtt-test
# Publish and subscribe with a user name and password: -u user -P password
mosquitto_pub -u admin -P admin -h localhost -t mqtt/loop/message -m 'test mqtt'
mosquitto_sub -u admin -P admin -h localhost -t mqtt/loop/message
# Set the client ID with -i (id to use for this client)
mosquitto_sub -u admin -P admin -i shuai-ubuntu-test -h localhost -t mqtt/loop/message
mosquitto_pub -u admin -P admin -i shuai-ubuntu-test-client -h localhost -t mqtt/loop/message -m 'test mqtt client'
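The client ID rule applies equally to the Java clients built later in this article. As a rough sketch, a distinct ID per client can be derived from a readable prefix plus a random suffix. The helper name ClientIds and the 8-character suffix length are assumptions made for this example, not part of the original project.

```java
import java.util.UUID;

// Hypothetical helper: derives a distinct client ID from a readable prefix.
class ClientIds {
    static String unique(String prefix) {
        // 8 hex characters of a random UUID are enough to tell local test clients apart.
        String suffix = UUID.randomUUID().toString().replace("-", "").substring(0, 8);
        return prefix + "-" + suffix;
    }

    public static void main(String[] args) {
        System.out.println(ClientIds.unique("mqtt_server_pub"));
        System.out.println(ClientIds.unique("mqtt_server_sub"));
    }
}
```

Eclipse Paho also ships a static MqttClient.generateClientId() method that serves the same purpose. A fixed ID is still the better choice when the client relies on a persistent session, because the broker ties session state to the client ID.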

Viewing the broker log

The default mosquitto log location is /var/log/mosquitto/xxx.log

tail -f /var/log/mosquitto/mosquitto.log

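Building the Java MQTT server (Spring Boot + MQTT)

Requirements

mosquitto broker

You can use the public broker hosted by Eclipse, which is reportedly also based on mosquitto. Its address is iot.eclipse.org

Or you can install and deploy mosquitto yourself (the approach used in this article)

Spring Boot (2.1.5.RELEASE)

Eclipse Paho

curl/postman

Creating the Spring Boot project

1. Initialize the Spring Boot project with the Spring Initializr in IDEA

Use Spring Boot version 2.1.5.RELEASE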

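2. Add the following dependencies to the pom

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-stream</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-mqtt</artifactId>
</dependency>
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <version>1.16.10</version>
    <scope>provided</scope>
</dependency>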

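3. MQTT Configuration

Configure the broker address, port, whether to use SSL, user name, and password.

public abstract class MQTTConfig {

    // Default broker host and QoS level shared by publisher and subscriber
    protected final String broker = "10.156.2.132";
    protected final int qos = 2;

    protected Boolean hasSSL = false;
    protected Integer port = 1883;

    // Credentials used when withUserNamePass is true
    protected final String userName = "admin";
    protected final String password = "admin";

    // URI scheme prefixes for plain TCP and SSL connections
    protected final String TCP = "tcp://";
    protected final String SSL = "ssl://";

    // Connect with explicit settings
    protected abstract void config(String broker, Integer port, Boolean ssl, Boolean withUserNamePass);

    // Connect with the defaults above
    protected abstract void config();
}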
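The subclasses of MQTTConfig combine these fields into a broker URI of the form scheme://host:port. As a small sketch of that concatenation with some input checking added, consider the helper below. The class name BrokerUrl and the validation rules are assumptions for illustration; the original code builds the string inline.

```java
// Hypothetical helper mirroring the string concatenation done in config().
class BrokerUrl {
    static final String TCP = "tcp://";
    static final String SSL = "ssl://";

    static String of(String host, int port, boolean ssl) {
        if (host == null || host.isEmpty()) {
            throw new IllegalArgumentException("host must not be empty");
        }
        if (port < 1 || port > 65535) {
            throw new IllegalArgumentException("port out of range: " + port);
        }
        return (ssl ? SSL : TCP) + host + ":" + port;
    }

    public static void main(String[] args) {
        System.out.println(BrokerUrl.of("10.156.2.132", 1883, false)); // tcp://10.156.2.132:1883
        System.out.println(BrokerUrl.of("localhost", 8883, true));     // ssl://localhost:8883
    }
}
```

Port 1883 is the conventional plain MQTT port and 8883 the conventional one for MQTT over TLS, so switching the ssl flag usually goes together with a different port.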

4. Publisher

Interface definition

public interface IMQTTPublisher {

    public void publishMessage(String topic, String message);

    public void disconnect();
}

Class definition

import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

@Component
public class MQTTPublisher extends MQTTConfig implements MqttCallback, IMQTTPublisher {

    private static final Logger logger = LoggerFactory.getLogger(MQTTPublisher.class);

    final private String colon = ":";
    final private String clientId = "mqtt_server_pub";

    private String brokerUrl = null;
    private MqttClient mqttClient = null;
    private MqttConnectOptions connectionOptions = null;
    private MemoryPersistence persistence = null;

    // Connects with the defaults from MQTTConfig
    private MQTTPublisher() {
        this.config();
    }

    private MQTTPublisher(String broker, Integer port, Boolean ssl, Boolean withUserNamePass) {
        this.config(broker, port, ssl, withUserNamePass);
    }

    // Note: each call creates a new publisher that uses the same client ID
    public static MQTTPublisher getInstance() {
        return new MQTTPublisher();
    }

    public static MQTTPublisher getInstance(String broker, Integer port, Boolean ssl, Boolean withUserNamePass) {
        return new MQTTPublisher(broker, port, ssl, withUserNamePass);
    }

    @Override
    protected void config() {
        this.brokerUrl = this.TCP + this.broker + colon + this.port;
        this.persistence = new MemoryPersistence();
        this.connectionOptions = new MqttConnectOptions();
        try {
            this.mqttClient = new MqttClient(brokerUrl, clientId, persistence);
            this.connectionOptions.setCleanSession(true);
            // Register the callback before connecting so that no early event is missed
            this.mqttClient.setCallback(this);
            this.mqttClient.connect(this.connectionOptions);
        } catch (MqttException me) {
            logger.error("ERROR", me);
        }
    }

    @Override
    protected void config(String broker, Integer port, Boolean ssl, Boolean withUserNamePass) {
        String protocol = this.TCP;
        if (Boolean.TRUE.equals(ssl)) {
            protocol = this.SSL;
        }
        // Use the broker passed in as a parameter, not the default field
        this.brokerUrl = protocol + broker + colon + port;
        this.persistence = new MemoryPersistence();
        this.connectionOptions = new MqttConnectOptions();
        try {
            this.mqttClient = new MqttClient(brokerUrl, clientId, persistence);
            this.connectionOptions.setCleanSession(true);
            if (Boolean.TRUE.equals(withUserNamePass)) {
                if (password != null) {
                    this.connectionOptions.setPassword(this.password.toCharArray());
                }
                if (userName != null) {
                    this.connectionOptions.setUserName(this.userName);
                }
            }
            this.mqttClient.setCallback(this);
            this.mqttClient.connect(this.connectionOptions);
        } catch (MqttException me) {
            logger.error("ERROR", me);
        }
    }

    @Override
    public void publishMessage(String topic, String message) {
        try {
            MqttMessage mqttmessage = new MqttMessage(message.getBytes());
            mqttmessage.setQos(this.qos);
            this.mqttClient.publish(topic, mqttmessage);
        } catch (MqttException me) {
            logger.error("ERROR", me);
        }
    }

    @Override
    public void connectionLost(Throwable arg0) {
        logger.info("Connection Lost");
    }

    @Override
    public void deliveryComplete(IMqttDeliveryToken arg0) {
        logger.info("delivery completed");
    }

    @Override
    public void messageArrived(String arg0, MqttMessage arg1) throws Exception {
        // Leave it blank for Publisher
    }

    @Override
    public void disconnect() {
        try {
            this.mqttClient.disconnect();
        } catch (MqttException me) {
            logger.error("ERROR", me);
        }
    }
}
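One detail worth noting: getInstance() above returns a new MQTTPublisher on every call, and every instance connects with the same client ID mqtt_server_pub, so two live instances would disconnect each other at the broker. If a single shared instance is what is wanted (an assumption about the intent), the initialization-on-demand holder idiom is one way to get it. The class SharedPublisher below is a stand-in that only counts constructions; it does not connect to any broker.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Stand-in for a publisher; it only counts how many times it was constructed.
class SharedPublisher {
    static final AtomicInteger CREATED = new AtomicInteger();

    private SharedPublisher() {
        CREATED.incrementAndGet(); // a real publisher would connect to the broker here
    }

    // The JVM initializes Holder lazily and exactly once, on first access.
    private static class Holder {
        static final SharedPublisher INSTANCE = new SharedPublisher();
    }

    static SharedPublisher getInstance() {
        return Holder.INSTANCE;
    }

    public static void main(String[] args) {
        System.out.println(getInstance() == getInstance()); // true
        System.out.println(CREATED.get());                  // 1
    }
}
```

Inside the Spring application this is rarely needed, because a @Component bean is singleton-scoped by default and can simply be injected.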

5. Subscriber

Interface definition

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public interface IMQTTSubscriber {

    public static final Logger logger = LoggerFactory.getLogger(IMQTTSubscriber.class);

    public void subscribeMessage(String topic);

    public void disconnect();
}

Class definition

import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

import java.sql.Timestamp;

@Component
public class MQTTSubscriber extends MQTTConfig implements MqttCallback, IMQTTSubscriber {

    private static final Logger logger = LoggerFactory.getLogger(MQTTSubscriber.class);

    final private String colon = ":";
    final private String clientId = "mqtt_server_sub";

    private String brokerUrl = null;
    private MqttClient mqttClient = null;
    private MqttConnectOptions connectionOptions = null;
    private MemoryPersistence persistence = null;

    // Connects with the defaults from MQTTConfig
    public MQTTSubscriber() {
        this.config();
    }

    @Override
    public void connectionLost(Throwable cause) {
        logger.info("Connection Lost");
    }

    @Override
    public void messageArrived(String topic, MqttMessage message) throws Exception {
        // Called when a message arrives from the server that matches any
        // subscription made by the client
        String time = new Timestamp(System.currentTimeMillis()).toString();
        System.out.println();
        System.out.println("***********************************************************************");
        System.out.println("Message Arrived at Time: " + time + " Topic: " + topic + " Message: "
                + new String(message.getPayload()));
        System.out.println("***********************************************************************");
        System.out.println();
    }

    @Override
    public void deliveryComplete(IMqttDeliveryToken token) {
        // Leave it blank for subscriber
    }

    @Override
    public void subscribeMessage(String topic) {
        try {
            this.mqttClient.subscribe(topic, this.qos);
        } catch (MqttException me) {
            logger.error("ERROR", me);
        }
    }

    @Override
    public void disconnect() {
        try {
            this.mqttClient.disconnect();
        } catch (MqttException me) {
            logger.error("ERROR", me);
        }
    }

    @Override
    protected void config(String broker, Integer port, Boolean ssl, Boolean withUserNamePass) {
        String protocol = this.TCP;
        if (Boolean.TRUE.equals(ssl)) {
            protocol = this.SSL;
        }
        // Use the broker passed in as a parameter, not the default field
        this.brokerUrl = protocol + broker + colon + port;
        this.persistence = new MemoryPersistence();
        this.connectionOptions = new MqttConnectOptions();
        try {
            this.mqttClient = new MqttClient(brokerUrl, clientId, persistence);
            this.connectionOptions.setCleanSession(true);
            if (Boolean.TRUE.equals(withUserNamePass)) {
                if (password != null) {
                    this.connectionOptions.setPassword(this.password.toCharArray());
                }
                if (userName != null) {
                    this.connectionOptions.setUserName(this.userName);
                }
            }
            // Register the callback before connecting so that no early message is missed
            this.mqttClient.setCallback(this);
            this.mqttClient.connect(this.connectionOptions);
        } catch (MqttException me) {
            logger.error("ERROR", me);
        }
    }

    @Override
    protected void config() {
        this.brokerUrl = this.TCP + this.broker + colon + this.port;
        this.persistence = new MemoryPersistence();
        this.connectionOptions = new MqttConnectOptions();
        try {
            this.mqttClient = new MqttClient(brokerUrl, clientId, persistence);
            this.connectionOptions.setCleanSession(true);
            this.mqttClient.setCallback(this);
            this.mqttClient.connect(this.connectionOptions);
        } catch (MqttException me) {
            logger.error("ERROR", me);
        }
    }
}
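Both connectionLost callbacks above only log the event, so a dropped connection stays dropped. One common remedy is to retry the connection with an exponentially growing delay. The sketch below covers only the delay calculation; the class name Backoff and its parameters are assumptions for this example, and the actual reconnect and re-subscribe calls are left out.

```java
// Hypothetical helper: exponential backoff with an upper bound.
class Backoff {
    private final long baseMillis;
    private final long maxMillis;

    Backoff(long baseMillis, long maxMillis) {
        this.baseMillis = baseMillis;
        this.maxMillis = maxMillis;
    }

    // attempt 0 -> base, attempt 1 -> 2 * base, and so on, capped at maxMillis.
    long delayMillis(int attempt) {
        if (attempt < 0) {
            throw new IllegalArgumentException("attempt must be >= 0");
        }
        // Cap the shift so that typical base values (a few seconds) cannot overflow a long.
        int shift = Math.min(attempt, 30);
        return Math.min(maxMillis, baseMillis << shift);
    }

    public static void main(String[] args) {
        Backoff backoff = new Backoff(1000, 30000);
        for (int attempt = 0; attempt < 7; attempt++) {
            System.out.println("attempt " + attempt + " -> wait " + backoff.delayMillis(attempt) + " ms");
        }
    }
}
```

Paho's MqttConnectOptions also offers setAutomaticReconnect(true) as a built-in alternative. Either way, with setCleanSession(true) the broker forgets subscriptions when the connection drops, so the subscriber has to call subscribe again after reconnecting.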

6. Building the RESTful interface

Create the controller

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;

import javax.annotation.PostConstruct;

@Slf4j
@RestController
public class DemoRestController {

    public static final String TOPIC_LOOP_TEST = "mqtt/loop/message";

    @Autowired
    IMQTTPublisher publisher;

    @Autowired
    IMQTTSubscriber subscriber;

    // Subscribe to the loop topic once the beans are wired
    @PostConstruct
    public void init() {
        subscriber.subscribeMessage(TOPIC_LOOP_TEST);
    }

    // Forward the request body to the broker as an MQTT message
    @RequestMapping(value = "/mqtt/loop/message", method = RequestMethod.POST)
    public String index(@RequestBody String data) {
        publisher.publishMessage(TOPIC_LOOP_TEST, data);
        return "Success";
    }
}
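The controller hands the request body to publishMessage as a String. The publisher turns it into bytes with message.getBytes() and the subscriber turns it back with new String(message.getPayload()), both of which use the platform default charset. That works as long as both ends run with the same default, but non-ASCII text can be garbled otherwise. A minimal sketch of pinning the charset explicitly follows; the helper name PayloadCodec is hypothetical.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper that pins the charset on both sides of the broker.
class PayloadCodec {
    // What the publisher would pass to new MqttMessage(...)
    static byte[] encode(String message) {
        return message.getBytes(StandardCharsets.UTF_8);
    }

    // What the subscriber would apply to message.getPayload()
    static String decode(byte[] payload) {
        return new String(payload, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // "\u6e29\u5ea6" is the Chinese word for "temperature", "\u00b0" is the degree sign
        String original = "\u6e29\u5ea6 25\u00b0C";
        byte[] wire = encode(original);
        System.out.println(wire.length + " bytes on the wire");
        System.out.println(decode(wire).equals(original));
    }
}
```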

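7. Test the API with curl

❯ curl -X POST "http://127.0.0.1:8080/mqtt/loop/message" -d "test"
Success%

# The Spring Boot console shows the echo from our own subscription
***********************************************************************
Message Arrived at Time: 2019-05-21 16:11:13.675 Topic: mqtt/loop/message Message: test=
***********************************************************************

The % after Success is the shell marking output that does not end with a newline. The trailing = in the message most likely comes from curl -d sending the body as application/x-www-form-urlencoded, so that test is handled as a form field with an empty value; adding -H "Content-Type: text/plain" to the curl command should deliver the body unchanged.

You can also debug by calling port 8080 from postman.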
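The same request can also be issued from Java. The sketch below builds it with the java.net.http API and declares the body as text/plain so that it is not treated as form data. Note the assumptions: java.net.http requires Java 11 or later, which is newer than the 1.8 target of the server project, so this is meant for a separate test client; the class name LoopRequest is made up for this example.

```java
import java.net.URI;
import java.net.http.HttpRequest;

// Hypothetical test-client helper for the /mqtt/loop/message endpoint.
class LoopRequest {
    // Builds a POST carrying the raw text body, declared as text/plain.
    static HttpRequest build(String baseUrl, String body) {
        return HttpRequest.newBuilder(URI.create(baseUrl + "/mqtt/loop/message"))
                .header("Content-Type", "text/plain; charset=UTF-8")
                .POST(HttpRequest.BodyPublishers.ofString(body))
                .build();
    }

    public static void main(String[] args) {
        HttpRequest request = build("http://127.0.0.1:8080", "test");
        System.out.println(request.method() + " " + request.uri());
        // To send it against a running server:
        // HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString())
    }
}
```

The request is only built here, not sent, so the sketch runs without a server; sending it needs the Spring Boot application and the broker to be up.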

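Building the Java MQTT server (Spring Boot + MQTT + protobuf)

Building on the project above, wrap the pub/sub messages with protobuf.

1. The proto file

Put the .proto file under src/main/proto/

2. Generate the protobuf Java code with maven

Add to the properties section of the pom

<java.version>1.8</java.version>
<grpc.version>1.6.1</grpc.version>
<protobuf.version>3.3.0</protobuf.version>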

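Add to the dependencies section of the pom

<dependency>
    <groupId>io.grpc</groupId>
    <artifactId>grpc-netty</artifactId>
    <version>${grpc.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>io.grpc</groupId>
    <artifactId>grpc-protobuf</artifactId>
    <version>${grpc.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>io.grpc</groupId>
    <artifactId>grpc-stub</artifactId>
    <version>${grpc.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>com.google.protobuf</groupId>
    <artifactId>protobuf-java</artifactId>
    <version>${protobuf.version}</version>
</dependency>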

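Add to the build section of the pom: the extension goes under extensions, the plugins under plugins

<extensions>
    <extension>
        <groupId>kr.motd.maven</groupId>
        <artifactId>os-maven-plugin</artifactId>
        <version>1.5.0.Final</version>
    </extension>
</extensions>
<plugins>
    <plugin>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-maven-plugin</artifactId>
    </plugin>
    <plugin>
        <groupId>org.xolstice.maven.plugins</groupId>
        <artifactId>protobuf-maven-plugin</artifactId>
        <version>0.5.0</version>
        <configuration>
            <protocArtifact>com.google.protobuf:protoc:${protobuf.version}:exe:${os.detected.classifier}</protocArtifact>
            <pluginId>grpc-java</pluginId>
            <pluginArtifact>io.grpc:protoc-gen-grpc-java:${grpc.version}:exe:${os.detected.classifier}</pluginArtifact>
        </configuration>
        <executions>
            <execution>
                <goals>
                    <goal>compile</goal>
                    <goal>compile-custom</goal>
                </goals>
            </execution>
        </executions>
    </plugin>
</plugins>

In the IDE, open the Maven Projects panel on the right and run Lifecycle -> compile to generate the Java classes for the proto files.

The generated sources are placed under target/generated-sources/protobuf/java/ in the directory matching the package name.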

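3. Wrap the MQTT message with proto

Create a builder with the generated newBuilder method and set the fields defined in the proto through the builder's set methods, for example:

// group_name is a String defined elsewhere
KylinProto.Group.Builder builder = KylinProto.Group.newBuilder();
KylinProto.Group group = builder.setThreshold(85.f)
        .setTop(1)
        .setGroup(group_name).build();
// Arguments: topic, payload bytes, QoS 2, not retained.
// This needs a publish(topic, byte[], qos, retained) method on the publisher;
// the IMQTTPublisher interface shown earlier only declares publishMessage(String, String).
publisher.publish(topic, group.toByteArray(), 2, false);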
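The publish(topic, bytes, qos, retained) call above takes raw bytes, whereas the IMQTTPublisher interface from the first part only accepts a String. One possible way to widen the interface is sketched below. The names BinaryPublisher and RecordingPublisher are hypothetical and not taken from the original project; RecordingPublisher is a test double that records what would be sent instead of contacting a broker.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical widened interface: raw bytes are the primary form, text is a convenience.
interface BinaryPublisher {
    void publish(String topic, byte[] payload, int qos, boolean retained);

    // Text messages are encoded as UTF-8 and sent with QoS 2, not retained.
    default void publishMessage(String topic, String message) {
        publish(topic, message.getBytes(StandardCharsets.UTF_8), 2, false);
    }
}

// Test double that records topics and payloads in call order.
class RecordingPublisher implements BinaryPublisher {
    final List<String> topics = new ArrayList<>();
    final List<byte[]> payloads = new ArrayList<>();

    @Override
    public void publish(String topic, byte[] payload, int qos, boolean retained) {
        if (qos < 0 || qos > 2) {
            throw new IllegalArgumentException("qos must be 0, 1 or 2");
        }
        topics.add(topic);
        payloads.add(payload.clone()); // defensive copy of the caller's array
    }
}
```

In the real publisher, the body of publish could delegate to Paho's MqttClient.publish(String topic, byte[] payload, int qos, boolean retained). On the subscriber side, the generated class provides a static parseFrom(byte[]) method, so KylinProto.Group.parseFrom(message.getPayload()) would recover the message instead of converting the payload to a String.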