什么是MQTT
MQTT官网
MQTT-WIKI
MQTT系列教程1(基本概念介绍)
MQTT消息队列遥测传输(英语:Message Queuing Telemetry Transport)是ISO 标准(ISO/IECPRF 20922)下基于发布 (Publish)/订阅 (Subscribe)范式的消息协议,可视为“资料传递的桥梁”它工作在TCP/IP协议族上,是为硬件性能低下的远程设备以及网络状况糟糕的情况下而设计的发布/订阅型消息协议,为此,它需要一个消息中间件,以解决当前繁重的资料传输协议,如:HTTP。![]()
MQTT的初步学习以及编写客户端程序什么是MQTTMQTT实际应用
历史
IBM公司的安迪·斯坦福-克拉克及Arcom公司的阿兰·尼普于1999年撰写了该协议的第一个版本
MQTT 相较于HTTP, 能节省更多的资源,带来较少的传输负担,也因为这样,在制造业中,让更多人发现 IoT 在设备、厂房的无限可能,发现原来要取机台的温度这么容易,要了解厂区的产量这么方便。
概览
消息代理和客户端
MQTT 协议定义了两种网络实体:消息代理(message broker)与客户端(client)。
其中,消息代理用于接收来自客户端的消息并转发至目标客户端。
MQTT 客户端可以是任何运行有 MQTT 库并通过网络连接至消息代理的设备,例如微型控制器或大型服务器。
主题
信息的传输是通过主题(topic)管理的。发布者有需要分发的数据时,其向连接的消息代理发送携带有数据的控制消息。代理会向订阅此主题的客户端分发此数据。发布者不需要知道订阅者的数据和具体位置;同样,订阅者不需要配置发布者的相关信息。
如果消息代理接受到某个主题上的消息,且这个主题没有任何订阅,那么代理就会丢弃之,除非发布者将其标记为保留消息(retained message)。
特性
1.MQTT 控制消息最小只有 2 字节的数据。最多可以承载 256 Mb 的数据。有 14 种预定义的消息类型用于:连接客户端与代理、断开连接、发布数据、确认数据接收、监督客户端与代理的连接。
2.MQTT 基于 TCP 协议,用于数据传输。变体 MQTT-SN 用于在蓝牙上传输,基于 UDP。
3.MQTT 协议使用普通文本发送连接认证书,且并不包含任何安全或认证相关的措施。但可以使用传输层安全来加密并保护发送的数据,以防止拦截、修改或伪造。
4.MQTT 默认端口为 1883。加密的端口为 8883
QOS
以下是每一个服务质量级别的具体描述
0:最多一次传送 (只负责传送,发送过后就不管数据的传送情况)
1:至少一次传送 (确认数据交付)
2:正好一次传送 (保证数据交付成功)
MQTT实际应用
1、在本机上安装MQTT服务器和客户端软件
1-在CentOs上面部署EMQ
EMQ 是一个百万级分布式开源物联网 MQTT 消息服务器。
官网地址: https://www.emqx.io/cn/
首先执行如下命令安装 unzip:
yum -y install unzip
接着我们访问如下地址找到适合我们系统的版本:
https://www.emqx.io/cn/downloads#broker
照着下方的提示就可以安装了
安装文档
Shell 脚本一键安装 (Linux)
包管理器安装 (Linux)
1.安装所需要的依赖包
sudo yum install -y yum-utils device-mapper-persistent-data lvm2
2.使用以下命令设置稳定存储库
以 CentOS8为例
sudo yum-config-manager --add-repo https://repos.emqx.io/emqx-ce/redhat/centos/8/emqx-ce.repo
3.安装最新版本的 EMQ X Broker
sudo yum install emqx
4.启动 EMQ X Broker
#直接启动
emqx start
emqx 4.0.0 is started successfully!
emqx_ctl status
Node '[email protected]' is started
emqx v4.0.0 is running
#systemctl 启动
sudo systemctl start emqx
#service 启动
sudo service emqx start
5.停止 EMQ X Broker
emqx stop
6.卸载 EMQ X Broker
sudo yum remove emqx
7.开放端口
当 EMQ 启动之后我们就可以使用客户端进行连接了,各个服务端口如下:
1883:MQTT 协议端口
8883:MQTT/SSL 端口
8083:MQTT/WebSocket 端口
8080:HTTP API 端口
18083:Dashboard 管理控制台端口
/sbin/iptables -I INPUT -p tcp --dport 1883 -j ACCEPT
/sbin/iptables -I INPUT -p tcp --dport 8883 -j ACCEPT
/sbin/iptables -I INPUT -p tcp --dport 18083 -j ACCEPT
/sbin/iptables -I INPUT -p tcp --dport 8080 -j ACCEPT
/sbin/iptables -I INPUT -p tcp --dport 8083 -j ACCEPT
8.EMQ 提供了一个后端 Web 控制台
用户可通过 Web 控制台,查看服务器运行状态、统计数据、客户端(Client)、会话(Session)、主题(Topic)、订阅(Subscription)、插件(Plugin)。
访问地址:http://服务器IP:18083
默认用户名:admin
默认密码:public
这里我们把界面改成中文
上文我们安装好服务器端(EMQ)之后,接下来就可以使用 MQTT 的客户端对 MQTT 服务器的基本功能进行相关的测试了。下面介绍一个十分好用的 MQTT 客户端工具:MQTTBox。
2-客户端工具MQTTBox的安装和使用
MQTTBox 是一个带有可视化的界面的 MQTT 的客户端工具,它具有如下特点:
- 支持 TCP、TLS、Web Sockets 和安全的 Web Sockets 连接 MQTT 服务器
- 支持各种 MQTT 客户端的设置
- 支持发布和订阅多个主题
- 支持主题的单级和多级订阅
- 复制/重新发布有效负载
- 支持查看每个主题已发布/已订阅消息的历史记录
下载
MQTTBOX可以直接在Chrom浏览器里面下载
MQTTBox - Chrome 線上應用程式商店
下载好之后在应用里面就可以打开
使用
打开创建一个新的连接
填写连接名称(可以随意输入)、协议和主机地址后,点击“Save”保存。
这里是我个人的服务器IP
保存成功后进入如下的界面,顶部绿色“Connected”按钮表明当前 MQTT 连接已经成功。
首先我们开启一个订阅,填入要订阅的主题,并选择 QoS 字段后,点击“Subscribe”按钮。
设置好订阅的主题后,在发布这边输入发布的主题(必须和订阅那边的输入的主题相同),选好 QoS 字段,Payload 里输入任意要发送的内容,完毕后点击“Publish”按钮。
我这里截图的时候已经发布过一次了
然后订阅端这边就会收到由服务器端转发的消息内容。
打开 EMQ 的管理员控制台,可以看到一些相关的统计数据已经发生了变化。比如:
- “qos0/received”的值为 2,表示 EMQ 收到了 2 条 QoS0 的消息
-
“qos0/sent”的值为 2,表示 EMQ 转发了2条 QoS0 的消息。
这和我发布了两次相符合
MQTT的初步学习以及编写客户端程序什么是MQTTMQTT实际应用
练习消息发布与订阅,比如自定义一个天气预报的消息主题。
2、利用网上提供的MQTT服务
编写MQTT客户端程序(python、java或c#、c/c++,任意一种编程语言),自定义一个天气预报主题,完成订阅与发布。思考MQTT与前面REST协议的区别。
可利用的免费在线MQTT网站: https://www.emqx.io/cn/mqtt/public-mqtt5-broker
Java实现MQTT发布和订阅
MQTT–入门
发布端
依赖:
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.2.0</version>
</dependency>
代码:
package demo.ein;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
public class Mqtt1 {
public static void main(String[] args) {
String topic = "The Weather";
String content = "test message,from ein";
int qos = 0;
String broker = "tcp://45.32.36.70:1883";
String userName = "ein";
String password = "123456";
String clientId = "963c923a-9e0a-43da-af1e-4eab7f646d70";
// 内存存储
MemoryPersistence persistence = new MemoryPersistence();
try {
// 创建客户端
MqttClient sampleClient = new MqttClient(broker, clientId, persistence);
// 创建链接参数
MqttConnectOptions connOpts = new MqttConnectOptions();
// 在重新启动和重新连接时记住状态
connOpts.setCleanSession(false);
// 设置连接的用户名
connOpts.setUserName(userName);
connOpts.setPassword(password.toCharArray());
// 建立连接
sampleClient.connect(connOpts);
// 创建消息
MqttMessage message = new MqttMessage(content.getBytes());
// 设置消息的服务质量
message.setQos(qos);
// 发布消息
sampleClient.publish(topic, message);
// 断开连接
sampleClient.disconnect();
// 关闭客户端
sampleClient.close();
System.out.println("ok");
} catch (MqttException me) {
System.out.println("reason " + me.getReasonCode());
System.out.println("msg " + me.getMessage());
System.out.println("loc " + me.getLocalizedMessage());
System.out.println("cause " + me.getCause());
System.out.println("excep " + me);
me.printStackTrace();
}
}
}
使用Mqtt-Box测试
代码运行无误,能正常发布消息
订阅端
package demo.ein;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
/**
* 订阅
* @author issuser
*
*/
public class MqttRecive {
public static void main(String[] args) throws MqttException {
String HOST = "tcp://45.32.36.70:1883";
String TOPIC = "天气预报";
int qos = 0;
String clientid = "963c923a-9e0a-43da-af1e-4eab7f646d70";
String userName = "ein";
String passWord = "123456";
try {
// host为主机名,test为clientid即连接MQTT的客户端ID,一般以客户端唯一标识符表示,MemoryPersistence设置clientid的保存形式,默认为以内存保存
MqttClient client = new MqttClient(HOST, clientid, new MemoryPersistence());
// MQTT的连接设置
MqttConnectOptions options = new MqttConnectOptions();
// 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,这里设置为true表示每次连接到服务器都以新的身份连接
options.setCleanSession(true);
// 设置连接的用户名
options.setUserName(userName);
// 设置连接的密码
options.setPassword(passWord.toCharArray());
// 设置超时时间 单位为秒
options.setConnectionTimeout(10);
// 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并没有重连的机制
options.setKeepAliveInterval(20);
// 设置回调函数
client.setCallback(new MqttCallback() {
public void connectionLost(Throwable cause) {
System.out.println("connectionLost");
}
public void messageArrived(String topic, MqttMessage message) throws Exception {
System.out.println("topic:" + topic);
System.out.println("Qos:" + message.getQos());
System.out.println("message content:" + new String(message.getPayload()));
}
public void deliveryComplete(IMqttDeliveryToken token) {
System.out.println("deliveryComplete---------" + token.isComplete());
}
});
client.connect(options);
// 订阅消息
client.subscribe(TOPIC, qos);
System.out.println("订阅完成");
} catch (Exception e) {
e.printStackTrace();
}
}
}
测试成功:
发布端 ·改:
但是仅仅是这样好像还不太够
于是在发布端添加了阿里云的天气预报接口调用
package demo.ein;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import java.util.HashMap;
import java.util.Map;
import org.apache.http.util.EntityUtils;
import com.aliyuncs.http.HttpResponse;
public class Mqtt1 {
public static void main(String[] args) throws Exception {
String topic = "The Weather";
String content = "test message,from ein";
int qos = 0;
String broker = "tcp://45.32.36.70:1883";
String userName = "ein";
String password = "123456";
String clientId = "963c923a-9e0a-43da-af1e-4eab7f646d70";
// 内存存储
MemoryPersistence persistence = new MemoryPersistence();
String host = "https://jisutqybmf.market.alicloudapi.com";
String path = "/weather/query";
String method = "ANY";
//GET/POST 任意
String appcode = "fdf6fefc645849ceb7dee6fea319ffe8";
Map<String, String> headers = new HashMap<String, String>();
//最后在header中的格式(中间是英文空格)为Authorization:APPCODE 83359fd73fe94948385f570e3c139105
headers.put("Authorization", "APPCODE " + appcode);
Map<String, String> querys = new HashMap<String, String>();
querys.put("city", "重庆");
querys.put("citycode", "citycode");
querys.put("cityid", "cityid");
querys.put("ip", "ip");
querys.put("location", "location");
try {
org.apache.http.HttpResponse response = HttpUtils.doGet(host, path, method, headers, querys);
System.out.println(response.toString());
//获取response的body
String str=EntityUtils.toString(((org.apache.http.HttpResponse) response).getEntity(),"utf-8");
String[] strarray=str.split("}"); //遇到逗号就分割
// 创建客户端
MqttClient sampleClient = new MqttClient(broker, clientId, persistence);
// 创建链接参数
MqttConnectOptions connOpts = new MqttConnectOptions();
// 在重新启动和重新连接时记住状态
connOpts.setCleanSession(false);
// 设置连接的用户名
connOpts.setUserName(userName);
connOpts.setPassword(password.toCharArray());
// 建立连接
sampleClient.connect(connOpts);
for (int i = 0; i < strarray.length; i++) {
// 创建消息
MqttMessage message = new MqttMessage(strarray[i].getBytes());
// 设置消息的服务质量
message.setQos(qos);
// 发布消息
sampleClient.publish(topic, message);
}
// 断开连接
sampleClient.disconnect();
// 关闭客户端
sampleClient.close();
System.out.println("ok");
} catch (MqttException me) {
System.out.println("reason " + me.getReasonCode());
System.out.println("msg " + me.getMessage());
System.out.println("loc " + me.getLocalizedMessage());
System.out.println("cause " + me.getCause());
System.out.println("excep " + me);
me.printStackTrace();
}
}
}
在Mqtt-Box的订阅端可以看到收到了重庆的天气情况
订阅端 ·改:
既然发布端都改了,
订阅端不给整好点也说不过去
订阅端用C#来写
C#得先安装mosquitto
mosquitto 在 Windows 上的安装
新建一个控制台应用程序
然后需要安装名为M2Mqtt的NuGet程序包
这是目前最好用的C#库是 eclipse出的M2Mqtt库,项目的地址是https://github.com/eclipse/paho.mqtt.m2mqtt
下载稳定4.3.0版本
代码部分:
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using uPLibrary.Networking.M2Mqtt;
using System.Net;
using uPLibrary.Networking.M2Mqtt.Messages;
namespace ConsoleApp1
{
class Program
{
static void Main(string[] args)
{
string topic = "The Weather";
string host = "45.32.36.70";
// 实例化Mqtt客户端
MqttClient client = new MqttClient(host);
// 注册接收消息事件
client.MqttMsgPublishReceived += client_MqttMsgPublishReceived;
string clientId = Guid.NewGuid().ToString();
client.Connect(clientId);
// 订阅主题 "天气预报", 订阅质量 QoS 0
client.Subscribe(new string[] { topic }, new byte[] { MqttMsgBase.QOS_LEVEL_AT_MOST_ONCE });
}
static void client_MqttMsgPublishReceived(object sender, MqttMsgPublishEventArgs e)
{
// 打印订阅的发布端消息
Console.WriteLine(string.Format("subscriber,topic:{0},content:{1}", e.Topic, Encoding.UTF8.GetString(e.Message)));
}
}
}
配合前面的发布端·改
最后实现效果如下
粗糙是粗糙了点,但是成功了