什麼是MQTT
MQTT官網
MQTT-WIKI
MQTT系列教程1(基本概念介紹)
MQTT消息隊列遙測傳輸(英語:Message Queuing Telemetry Transport)是ISO 标準(ISO/IECPRF 20922)下基于釋出 (Publish)/訂閱 (Subscribe)範式的消息協定,可視為“資料傳遞的橋梁”它工作在TCP/IP協定族上,是為硬體性能低下的遠端裝置以及網絡狀況糟糕的情況下而設計的釋出/訂閱型消息協定,為此,它需要一個消息中間件,以解決目前繁重的資料傳輸協定,如:HTTP。![]()
MQTT的初步學習以及編寫用戶端程式什麼是MQTTMQTT實際應用
曆史
IBM公司的安迪·斯坦福-克拉克及Arcom公司的阿蘭·尼普于1999年撰寫了該協定的第一個版本
MQTT 相較于HTTP, 能節省更多的資源,帶來較少的傳輸負擔,也因為這樣,在制造業中,讓更多人發現 IoT 在裝置、廠房的無限可能,發現原來要取機台的溫度這麼容易,要了解廠區的産量這麼友善。
概覽
消息代理和用戶端
MQTT 協定定義了兩種網絡實體:消息代理(message broker)與用戶端(client)。
其中,消息代理用于接收來自用戶端的消息并轉發至目标用戶端。
MQTT 用戶端可以是任何運作有 MQTT 庫并通過網絡連接配接至消息代理的裝置,例如微型控制器或大型伺服器。
主題
資訊的傳輸是通過主題(topic)管理的。釋出者有需要分發的資料時,其向連接配接的消息代理發送攜帶有資料的控制消息。代理會向訂閱此主題的用戶端分發此資料。釋出者不需要知道訂閱者的資料和具體位置;同樣,訂閱者不需要配置釋出者的相關資訊。
如果消息代理接受到某個主題上的消息,且這個主題沒有任何訂閱,那麼代理就會丢棄之,除非釋出者将其标記為保留消息(retained message)。
特性
1.MQTT 控制消息最小隻有 2 位元組的資料。最多可以承載 256 Mb 的資料。有 14 種預定義的消息類型用于:連接配接用戶端與代理、斷開連接配接、釋出資料、确認資料接收、監督用戶端與代理的連接配接。
2.MQTT 基于 TCP 協定,用于資料傳輸。變體 MQTT-SN 用于在藍牙上傳輸,基于 UDP。
3.MQTT 協定使用普通文本發送連接配接認證書,且并不包含任何安全或認證相關的措施。但可以使用傳輸層安全來加密并保護發送的資料,以防止攔截、修改或僞造。
4.MQTT 預設端口為 1883。加密的端口為 8883
QOS
以下是每一個服務品質級别的具體描述
0:最多一次傳送 (隻負責傳送,發送過後就不管資料的傳送情況)
1:至少一次傳送 (确認資料傳遞)
2:正好一次傳送 (保證資料傳遞成功)
MQTT實際應用
1、在本機上安裝MQTT伺服器和用戶端軟體
1-在CentOs上面部署EMQ
EMQ 是一個百萬級分布式開源物聯網 MQTT 消息伺服器。
官網位址: https://www.emqx.io/cn/
首先執行如下指令安裝 unzip:
yum -y install unzip
接着我們通路如下位址找到适合我們系統的版本:
https://www.emqx.io/cn/downloads#broker
照着下方的提示就可以安裝了
安裝文檔
Shell 腳本一鍵安裝 (Linux)
包管理器安裝 (Linux)
1.安裝所需要的依賴包
sudo yum install -y yum-utils device-mapper-persistent-data lvm2
2.使用以下指令設定穩定存儲庫
以 CentOS8為例
sudo yum-config-manager --add-repo https://repos.emqx.io/emqx-ce/redhat/centos/8/emqx-ce.repo
3.安裝最新版本的 EMQ X Broker
sudo yum install emqx
4.啟動 EMQ X Broker
#直接啟動
emqx start
emqx 4.0.0 is started successfully!
emqx_ctl status
Node '[email protected]' is started
emqx v4.0.0 is running
#systemctl 啟動
sudo systemctl start emqx
#service 啟動
sudo service emqx start
5.停止 EMQ X Broker
emqx stop
6.解除安裝 EMQ X Broker
sudo yum remove emqx
7.開放端口
當 EMQ 啟動之後我們就可以使用用戶端進行連接配接了,各個服務端口如下:
1883:MQTT 協定端口
8883:MQTT/SSL 端口
8083:MQTT/WebSocket 端口
8080:HTTP API 端口
18083:Dashboard 管理控制台端口
/sbin/iptables -I INPUT -p tcp --dport 1883 -j ACCEPT
/sbin/iptables -I INPUT -p tcp --dport 8883 -j ACCEPT
/sbin/iptables -I INPUT -p tcp --dport 18083 -j ACCEPT
/sbin/iptables -I INPUT -p tcp --dport 8080 -j ACCEPT
/sbin/iptables -I INPUT -p tcp --dport 8083 -j ACCEPT
8.EMQ 提供了一個後端 Web 控制台
使用者可通過 Web 控制台,檢視伺服器運作狀态、統計資料、用戶端(Client)、會話(Session)、主題(Topic)、訂閱(Subscription)、插件(Plugin)。
通路位址:http://伺服器IP:18083
預設使用者名:admin
預設密碼:public
這裡我們把界面改成中文
上文我們安裝好伺服器端(EMQ)之後,接下來就可以使用 MQTT 的用戶端對 MQTT 伺服器的基本功能進行相關的測試了。下面介紹一個十分好用的 MQTT 用戶端工具:MQTTBox。
2-用戶端工具MQTTBox的安裝和使用
MQTTBox 是一個帶有可視化的界面的 MQTT 的用戶端工具,它具有如下特點:
- 支援 TCP、TLS、Web Sockets 和安全的 Web Sockets 連接配接 MQTT 伺服器
- 支援各種 MQTT 用戶端的設定
- 支援釋出和訂閱多個主題
- 支援主題的單級和多級訂閱
- 複制/重新釋出有效負載
- 支援檢視每個主題已釋出/已訂閱消息的曆史記錄
下載下傳
MQTTBOX可以直接在Chrom浏覽器裡面下載下傳
MQTTBox - Chrome 線上應用程式商店
下載下傳好之後在應用裡面就可以打開
使用
打開建立一個新的連接配接
填寫連接配接名稱(可以随意輸入)、協定和主機位址後,點選“Save”儲存。
這裡是我個人的伺服器IP
儲存成功後進入如下的界面,頂部綠色“Connected”按鈕表明目前 MQTT 連接配接已經成功。
首先我們開啟一個訂閱,填入要訂閱的主題,并選擇 QoS 字段後,點選“Subscribe”按鈕。
設定好訂閱的主題後,在釋出這邊輸入釋出的主題(必須和訂閱那邊的輸入的主題相同),選好 QoS 字段,Payload 裡輸入任意要發送的内容,完畢後點選“Publish”按鈕。
我這裡截圖的時候已經釋出過一次了
然後訂閱端這邊就會收到由伺服器端轉發的消息内容。
打開 EMQ 的管理者控制台,可以看到一些相關的統計資料已經發生了變化。比如:
- “qos0/received”的值為 2,表示 EMQ 收到了 2 條 QoS0 的消息
-
“qos0/sent”的值為 2,表示 EMQ 轉發了2條 QoS0 的消息。
這和我釋出了兩次相符合
MQTT的初步學習以及編寫用戶端程式什麼是MQTTMQTT實際應用
練習消息釋出與訂閱,比如自定義一個天氣預報的消息主題。
2、利用網上提供的MQTT服務
編寫MQTT用戶端程式(python、java或c#、c/c++,任意一種程式設計語言),自定義一個天氣預報主題,完成訂閱與釋出。思考MQTT與前面REST協定的差別。
可利用的免費線上MQTT網站: https://www.emqx.io/cn/mqtt/public-mqtt5-broker
Java實作MQTT釋出和訂閱
MQTT–入門
釋出端
依賴:
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.2.0</version>
</dependency>
代碼:
package demo.ein;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
public class Mqtt1 {
public static void main(String[] args) {
String topic = "The Weather";
String content = "test message,from ein";
int qos = 0;
String broker = "tcp://45.32.36.70:1883";
String userName = "ein";
String password = "123456";
String clientId = "963c923a-9e0a-43da-af1e-4eab7f646d70";
// 記憶體存儲
MemoryPersistence persistence = new MemoryPersistence();
try {
// 建立用戶端
MqttClient sampleClient = new MqttClient(broker, clientId, persistence);
// 建立連結參數
MqttConnectOptions connOpts = new MqttConnectOptions();
// 在重新啟動和重新連接配接時記住狀态
connOpts.setCleanSession(false);
// 設定連接配接的使用者名
connOpts.setUserName(userName);
connOpts.setPassword(password.toCharArray());
// 建立連接配接
sampleClient.connect(connOpts);
// 建立消息
MqttMessage message = new MqttMessage(content.getBytes());
// 設定消息的服務品質
message.setQos(qos);
// 釋出消息
sampleClient.publish(topic, message);
// 斷開連接配接
sampleClient.disconnect();
// 關閉用戶端
sampleClient.close();
System.out.println("ok");
} catch (MqttException me) {
System.out.println("reason " + me.getReasonCode());
System.out.println("msg " + me.getMessage());
System.out.println("loc " + me.getLocalizedMessage());
System.out.println("cause " + me.getCause());
System.out.println("excep " + me);
me.printStackTrace();
}
}
}
使用Mqtt-Box測試
代碼運作無誤,能正常釋出消息
訂閱端
package demo.ein;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
/**
* 訂閱
* @author issuser
*
*/
public class MqttRecive {
public static void main(String[] args) throws MqttException {
String HOST = "tcp://45.32.36.70:1883";
String TOPIC = "天氣預報";
int qos = 0;
String clientid = "963c923a-9e0a-43da-af1e-4eab7f646d70";
String userName = "ein";
String passWord = "123456";
try {
// host為主機名,test為clientid即連接配接MQTT的用戶端ID,一般以用戶端唯一辨別符表示,MemoryPersistence設定clientid的儲存形式,預設為以記憶體儲存
MqttClient client = new MqttClient(HOST, clientid, new MemoryPersistence());
// MQTT的連接配接設定
MqttConnectOptions options = new MqttConnectOptions();
// 設定是否清空session,這裡如果設定為false表示伺服器會保留用戶端的連接配接記錄,這裡設定為true表示每次連接配接到伺服器都以新的身份連接配接
options.setCleanSession(true);
// 設定連接配接的使用者名
options.setUserName(userName);
// 設定連接配接的密碼
options.setPassword(passWord.toCharArray());
// 設定逾時時間 機關為秒
options.setConnectionTimeout(10);
// 設定會話心跳時間 機關為秒 伺服器會每隔1.5*20秒的時間向用戶端發送個消息判斷用戶端是否線上,但這個方法并沒有重連的機制
options.setKeepAliveInterval(20);
// 設定回調函數
client.setCallback(new MqttCallback() {
public void connectionLost(Throwable cause) {
System.out.println("connectionLost");
}
public void messageArrived(String topic, MqttMessage message) throws Exception {
System.out.println("topic:" + topic);
System.out.println("Qos:" + message.getQos());
System.out.println("message content:" + new String(message.getPayload()));
}
public void deliveryComplete(IMqttDeliveryToken token) {
System.out.println("deliveryComplete---------" + token.isComplete());
}
});
client.connect(options);
// 訂閱消息
client.subscribe(TOPIC, qos);
System.out.println("訂閱完成");
} catch (Exception e) {
e.printStackTrace();
}
}
}
測試成功:
釋出端 ·改:
但是僅僅是這樣好像還不太夠
于是在釋出端添加了阿裡雲的天氣預報接口調用
package demo.ein;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import java.util.HashMap;
import java.util.Map;
import org.apache.http.util.EntityUtils;
import com.aliyuncs.http.HttpResponse;
public class Mqtt1 {
public static void main(String[] args) throws Exception {
String topic = "The Weather";
String content = "test message,from ein";
int qos = 0;
String broker = "tcp://45.32.36.70:1883";
String userName = "ein";
String password = "123456";
String clientId = "963c923a-9e0a-43da-af1e-4eab7f646d70";
// 記憶體存儲
MemoryPersistence persistence = new MemoryPersistence();
String host = "https://jisutqybmf.market.alicloudapi.com";
String path = "/weather/query";
String method = "ANY";
//GET/POST 任意
String appcode = "fdf6fefc645849ceb7dee6fea319ffe8";
Map<String, String> headers = new HashMap<String, String>();
//最後在header中的格式(中間是英文空格)為Authorization:APPCODE 83359fd73fe94948385f570e3c139105
headers.put("Authorization", "APPCODE " + appcode);
Map<String, String> querys = new HashMap<String, String>();
querys.put("city", "重慶");
querys.put("citycode", "citycode");
querys.put("cityid", "cityid");
querys.put("ip", "ip");
querys.put("location", "location");
try {
org.apache.http.HttpResponse response = HttpUtils.doGet(host, path, method, headers, querys);
System.out.println(response.toString());
//擷取response的body
String str=EntityUtils.toString(((org.apache.http.HttpResponse) response).getEntity(),"utf-8");
String[] strarray=str.split("}"); //遇到逗号就分割
// 建立用戶端
MqttClient sampleClient = new MqttClient(broker, clientId, persistence);
// 建立連結參數
MqttConnectOptions connOpts = new MqttConnectOptions();
// 在重新啟動和重新連接配接時記住狀态
connOpts.setCleanSession(false);
// 設定連接配接的使用者名
connOpts.setUserName(userName);
connOpts.setPassword(password.toCharArray());
// 建立連接配接
sampleClient.connect(connOpts);
for (int i = 0; i < strarray.length; i++) {
// 建立消息
MqttMessage message = new MqttMessage(strarray[i].getBytes());
// 設定消息的服務品質
message.setQos(qos);
// 釋出消息
sampleClient.publish(topic, message);
}
// 斷開連接配接
sampleClient.disconnect();
// 關閉用戶端
sampleClient.close();
System.out.println("ok");
} catch (MqttException me) {
System.out.println("reason " + me.getReasonCode());
System.out.println("msg " + me.getMessage());
System.out.println("loc " + me.getLocalizedMessage());
System.out.println("cause " + me.getCause());
System.out.println("excep " + me);
me.printStackTrace();
}
}
}
在Mqtt-Box的訂閱端可以看到收到了重慶的天氣情況
訂閱端 ·改:
既然釋出端都改了,
訂閱端不給整好點也說不過去
訂閱端用C#來寫
C#得先安裝mosquitto
mosquitto 在 Windows 上的安裝
建立一個控制台應用程式
然後需要安裝名為M2Mqtt的NuGet程式包
這是目前最好用的C#庫是 eclipse出的M2Mqtt庫,項目的位址是https://github.com/eclipse/paho.mqtt.m2mqtt
下載下傳穩定4.3.0版本
代碼部分:
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using uPLibrary.Networking.M2Mqtt;
using System.Net;
using uPLibrary.Networking.M2Mqtt.Messages;
namespace ConsoleApp1
{
class Program
{
static void Main(string[] args)
{
string topic = "The Weather";
string host = "45.32.36.70";
// 執行個體化Mqtt用戶端
MqttClient client = new MqttClient(host);
// 注冊接收消息事件
client.MqttMsgPublishReceived += client_MqttMsgPublishReceived;
string clientId = Guid.NewGuid().ToString();
client.Connect(clientId);
// 訂閱主題 "天氣預報", 訂閱品質 QoS 0
client.Subscribe(new string[] { topic }, new byte[] { MqttMsgBase.QOS_LEVEL_AT_MOST_ONCE });
}
static void client_MqttMsgPublishReceived(object sender, MqttMsgPublishEventArgs e)
{
// 列印訂閱的釋出端消息
Console.WriteLine(string.Format("subscriber,topic:{0},content:{1}", e.Topic, Encoding.UTF8.GetString(e.Message)));
}
}
}
配合前面的釋出端·改
最後實作效果如下
粗糙是粗糙了點,但是成功了