天天看點

spring實作mqtt服務端_SpringBoot 內建Mqtt,protobuf服務端搭建

代碼已開源在GitHub,如果有幫助歡迎star。

Architecture

spring實作mqtt服務端_SpringBoot 內建Mqtt,protobuf服務端搭建

Mqtt-server-arch.jpg

Server side 構成

broker (mqtt核心:用于消息的發送管理) 類似 pub-sub 隊列

Application Server用于處理RestFul的請求,轉發為Mqtt消息

Publisher 本質是Mqtt client,用于釋出server端消息到broker

Subscriber 本質是Mqtt client,用于從broker訂閱client端消息

Client side

Publisher用于釋出client端消息到broker

Subscriber用于從broker訂閱server端的消息

Client 用于發送RestFul 請求給Application Server觸發消息pub/sub

總結:從結構上Broker算是Mqtt的本質上的Server端,從業務上講封裝了Mqtt Client pub/sub的Application server和Broker共同構成了業務上的Server端

安裝mosquitto及基本使用

安裝

# Install Mosquitto Broker

sudo apt-get update

sudo apt-get install mosquitto

# Install the Clients

sudo apt-get install mosquitto-clients

開啟、停止檢視狀态

# 檢視狀态

sudo service mosquitto status

# 使用預設配置打開mosquitto, 使用-v打開log功能

sudo mosquitto -c /etc/mosquitto/mosquitto.conf -v

# 停止

sudo service mosquitto stop

#開啟

sudo service mosquitto start

使用mosquitto測試pub/sub

注意 pub和sub的clientid不能相同,相同會刷屏。

# 簡單測試釋出。 -h host -t topic -m message

mosquitto_pub -h localhost -t mqtt-test -m 'hello mqtt'

# 簡單測試訂閱。

mosquitto_sub -h localhost -t mqtt-test

# 釋出設定使用者密碼 -u user -P password

mosquitto_pub -u admin -P admin -h localhost -t mqtt/loop/message -m 'test mqtt'

mosquitto_sub -u admin -P admin -h localhost -t mqtt/loop/message

# 指定釋出clientid -i (id to use for this client)

mosquitto_sub -u admin -P admin -i shuai-ubuntu-test -h localhost -t mqtt/loop/message

mosquitto_pub -u admin -P admin -i shuai-ubuntu-test-client -h localhost -t mqtt/loop/message -m 'test mqtt client'

檢視broker的log

mosquitto的預設log 位址是:/var/log/mosquitto/xxx.log

tailf /var/log/mosquitto/mosquitto.log

建構Java-Mqtt-Server(Springboot + Mqtt)

requirement依賴

mosquitto broker

可以使用Eclipse公開的broker,據說底層也是mosquitto。位址為iot.eclipse.org

可以部署安裝mosquitto(本文方案)

springboot (2.1.5.RELEASE)

Eclipse Paho

curl/postman

建構springboot項目

1. 使用idea springboot initializer 初始化springboot工程

使用springboot版本2.1.5.RELEASE

2. pom中添加

org.springframework.boot

spring-boot-starter-integration

org.springframework.integration

spring-integration-stream

org.springframework.integration

spring-integration-mqtt

org.projectlombok

lombok

1.16.10

provided

3. MQTT Configuration

配置broker位址,

端口号,

是否使用ssl,

使用者名

密碼

public abstract class MQTTConfig {

protected final String broker = "10.156.2.132";

protected final int qos = 2;

protected Boolean hasSSL = false;

protected Integer port = 1883;

protected final String userName = "admin";

protected final String password = "admin";

protected final String TCP = "tcp://";

protected final String SSL = "ssl://";

protected abstract void config(String broker, Integer port, Boolean ssl, Boolean withUserNamePass);

protected abstract void config();

}

4. Publisher推送者

定義接口

public interface IMQTTPublisher {

public void publishMessage(String topic, String message);

public void disconnect();

}

定義類

import org.eclipse.paho.client.mqttv3.*;

import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

import org.springframework.stereotype.Component;

@Component

public class MQTTPublisher extends MQTTConfig implements MqttCallback, IMQTTPublisher {

private String brokerUrl = null;

final private String colon = ":";

final private String clientId = "mqtt_server_pub";

private MqttClient mqttClient = null;

private MqttConnectOptions connectionOptions = null;

private MemoryPersistence persistence = null;

private static final Logger logger = LoggerFactory.getLogger(MQTTPublisher.class);

private MQTTPublisher() {

this.config();

}

private MQTTPublisher(String broker, Integer port, Boolean ssl, Boolean withUserNamePass) {

this.config(broker, port, ssl, withUserNamePass);

}

public static MQTTPublisher getInstance() {

return new MQTTPublisher();

}

public static MQTTPublisher getInstance(String broker, Integer port, Boolean ssl, Boolean withUserNamePass) {

return new MQTTPublisher(broker, port, ssl, withUserNamePass);

}

@Override

protected void config() {

this.brokerUrl = this.TCP + this.broker + colon + this.port;

this.persistence = new MemoryPersistence();

this.connectionOptions = new MqttConnectOptions();

try {

this.mqttClient = new MqttClient(brokerUrl, clientId, persistence);

this.connectionOptions.setCleanSession(true);

this.mqttClient.connect(this.connectionOptions);

this.mqttClient.setCallback(this);

} catch (MqttException me) {

logger.error("ERROR", me);

}

}

@Override

protected void config(String broker, Integer port, Boolean ssl, Boolean withUserNamePass) {

String protocal = this.TCP;

if (true == ssl) {

protocal = this.SSL;

}

this.brokerUrl = protocal + this.broker + colon + port;

this.persistence = new MemoryPersistence();

this.connectionOptions = new MqttConnectOptions();

try {

this.mqttClient = new MqttClient(brokerUrl, clientId, persistence);

this.connectionOptions.setCleanSession(true);

if (true == withUserNamePass) {

if (password != null) {

this.connectionOptions.setPassword(this.password.toCharArray());

}

if (userName != null) {

this.connectionOptions.setUserName(this.userName);

}

}

this.mqttClient.connect(this.connectionOptions);

this.mqttClient.setCallback(this);

} catch (MqttException me) {

logger.error("ERROR", me);

}

}

@Override

public void publishMessage(String topic, String message) {

try {

MqttMessage mqttmessage = new MqttMessage(message.getBytes());

mqttmessage.setQos(this.qos);

this.mqttClient.publish(topic, mqttmessage);

} catch (MqttException me) {

logger.error("ERROR", me);

}

}

@Override

public void connectionLost(Throwable arg0) {

logger.info("Connection Lost");

}

@Override

public void deliveryComplete(IMqttDeliveryToken arg0) {

logger.info("delivery completed");

}

@Override

public void messageArrived(String arg0, MqttMessage arg1) throws Exception {

// Leave it blank for Publisher

}

@Override

public void disconnect() {

try {

this.mqttClient.disconnect();

} catch (MqttException me) {

logger.error("ERROR", me);

}

}

}

5. Subscriber 訂閱者

定義接口

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

public interface IMQTTSubscriber {

public static final Logger logger = LoggerFactory.getLogger(IMQTTSubscriber.class);

public void subscribeMessage(String topic);

public void disconnect();

}

類定義

import org.eclipse.paho.client.mqttv3.*;

import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

import org.springframework.stereotype.Component;

import java.sql.Timestamp;

@Component

public class MQTTSubscriber extends MQTTConfig implements MqttCallback, IMQTTSubscriber {

private String brokerUrl = null;

final private String colon = ":";

final private String clientId = "mqtt_server_sub";

private MqttClient mqttClient = null;

private MqttConnectOptions connectionOptions = null;

private MemoryPersistence persistence = null;

private static final Logger logger = LoggerFactory.getLogger(MQTTSubscriber.class);

public MQTTSubscriber() {

this.config();

}

@Override

public void connectionLost(Throwable cause) {

logger.info("Connection Lost");

}

@Override

public void messageArrived(String topic, MqttMessage message) throws Exception {

// Called when a message arrives from the server that matches any

// subscription made by the client

String time = new Timestamp(System.currentTimeMillis()).toString();

System.out.println();

System.out.println("***********************************************************************");

System.out.println("Message Arrived at Time: " + time + " Topic: " + topic + " Message: "

+ new String(message.getPayload()));

System.out.println("***********************************************************************");

System.out.println();

}

@Override

public void deliveryComplete(IMqttDeliveryToken token) {

// Leave it blank for subscriber

}

@Override

public void subscribeMessage(String topic) {

try {

this.mqttClient.subscribe(topic, this.qos);

} catch (MqttException me) {

me.printStackTrace();

}

}

public void disconnect() {

try {

this.mqttClient.disconnect();

} catch (MqttException me) {

logger.error("ERROR", me);

}

}

@Override

protected void config(String broker, Integer port, Boolean ssl, Boolean withUserNamePass) {

String protocal = this.TCP;

if (true == ssl) {

protocal = this.SSL;

}

this.brokerUrl = protocal + this.broker + colon + port;

this.persistence = new MemoryPersistence();

this.connectionOptions = new MqttConnectOptions();

try {

this.mqttClient = new MqttClient(brokerUrl, clientId, persistence);

this.connectionOptions.setCleanSession(true);

if (true == withUserNamePass) {

if (password != null) {

this.connectionOptions.setPassword(this.password.toCharArray());

}

if (userName != null) {

this.connectionOptions.setUserName(this.userName);

}

}

this.mqttClient.connect(this.connectionOptions);

this.mqttClient.setCallback(this);

} catch (MqttException me) {

me.printStackTrace();

}

}

@Override

protected void config() {

this.brokerUrl = this.TCP + this.broker + colon + this.port;

this.persistence = new MemoryPersistence();

this.connectionOptions = new MqttConnectOptions();

try {

this.mqttClient = new MqttClient(brokerUrl, clientId, persistence);

this.connectionOptions.setCleanSession(true);

this.mqttClient.connect(this.connectionOptions);

this.mqttClient.setCallback(this);

} catch (MqttException me) {

me.printStackTrace();

}

}

}

6. 建構 RestFul接口

建構Controller

import lombok.extern.slf4j.Slf4j;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.web.bind.annotation.*;

import javax.annotation.PostConstruct;

@Slf4j

@RestController

public class DemoRestController {

public static String TOPIC_LOOP_TEST = "mqtt/loop/message";

@Autowired

IMQTTPublisher publisher;

@Autowired

IMQTTSubscriber subscriber;

@PostConstruct

public void init() {

subscriber.subscribeMessage(TOPIC_LOOP_TEST);

}

@RequestMapping(value = "/mqtt/loop/message", method = RequestMethod.POST)

public String index(@RequestBody String data) {

publisher.publishMessage(TOPIC_LOOP_TEST, data);

return "Success";

}

}

7. 使用curl指令進行api調用測試

❯ curl -X POST "http://127.0.0.1:8080/mqtt/loop/message" -d "test"

Success%

# springboot 視窗中可以看到自己sub的回顯

***********************************************************************

Message Arrived at Time: 2019-05-21 16:11:13.675 Topic: mqtt/loop/message Message: test=

***********************************************************************

也可以使用postman 調用8080 端口調試。

建構Java-Mqtt-Server (Springboot + Mqtt +protobuf)

在現有基礎上添加protobuf包裝pub/sub 消息

1. proto檔案

将.proto檔案放到src/main/proto/下

2. 使用maven生成protobuf java代碼

pom中properties中添加

1.8

1.6.1

3.3.0

pom dependencies中添加

io.grpc

grpc-netty

${grpc.version}

provided

io.grpc

grpc-protobuf

${grpc.version}

provided

io.grpc

grpc-stub

${grpc.version}

provided

com.google.protobuf

protobuf-java

${protobuf.version}

pom build中添加,pom plugins中添加

kr.motd.maven

os-maven-plugin

1.5.0.Final

org.springframework.boot

spring-boot-maven-plugin

org.xolstice.maven.plugins

protobuf-maven-plugin

0.5.0

com.google.protobuf:protoc:${protobuf.version}:exe:${os.detected.classifier}

grpc-java

io.grpc:protoc-gen-grpc-java:${grpc.version}:exe:${os.detected.classifier}

compile

compile-custom

使用IDE中右側Maven Projects -> Lifecycle ->compile 生成java對應的protobuf檔案

生成的路徑在:target/generated-sources/protobuf/java/對應的包名下

3. 使用proto封裝mqtt message

使用Proto中的newBuilder建構builder。使用builder中的set方法設定proto中的參數,例如:

KylinProto.Group.Builder builder = KylinProto.Group.newBuilder();

KylinProto.Group group = builder.setThreshold(85.f)

.setTop(1)

.setGroup(group_name).build();

publisher.publish(topic, group.toByteArray(), 2, false);