MQTT協定
- MQTT簡介
- 特性
- MQTT協定格式
- 實作方式
- MQTT的搭建(ubuntu)
- MQTT權限配置
-
- 修改配置檔案
- 添加使用者資訊
- 添加Topic和使用者的關系
- 使用者認證測試
- MQTT實作(Java語言)
-
- Pom
- ClientMQTT
- ServerMQTT
- PushCallback
MQTT簡介
MQTT(Message Queuing Telemetry Transport,消息隊列遙測傳輸協定),是一種基于釋出/訂閱(publish/subscribe)模式的輕量級協定,該協定建構于TCP/IP協定之上,MQTT最大優點在于,可以以極少的代碼和有限的帶寬,為連接配接遠端裝置提供實時可靠的消息服務。作為一種低開銷、低帶寬占用的即時通訊協定,使其在物聯網、小型裝置、移動應用等方面有較廣泛的應用。
MQTT是一個基于用戶端-伺服器的消息釋出/訂閱傳輸協定。MQTT協定是輕量、簡單、開放和易于實作的,這些特點使它适用範圍非常廣泛。在很多情況下,包括受限的環境中,如:機器與機器(M2M)通信和物聯網(IoT)。其在,通過衛星鍊路通信傳感器、偶爾撥号的醫療裝置、智能家居、及一些小型化裝置中已廣泛使用。
特性
MQTT協定工作在低帶寬、不可靠的網絡的遠端傳感器和控制裝置通訊而設計的協定,它具有以下主要的幾項特性:
- 使用釋出/訂閱消息模式,提供一對多的消息釋出,解除應用程式耦合。
- 對負載内容屏蔽的消息傳輸。
-
使用TCP/IP提供網絡連接配接。
主流的MQTT是基于TCP連接配接進行資料推送的,但是同樣有基于UDP的版本,叫做MQTT-SN。這兩種版本由于基于不同的連接配接方式,優缺點自然也就各有不同了。
- 有三種消息釋出服務品質(QoS):
- QoS0:“至多一次”,消息釋出完全依賴底層TCP/IP網絡。會發生消息丢失或重複。這一級别可用于如下情況,環境傳感器資料,丢失一次讀記錄無所謂,因為不久後還會有第二次發送。這一種方式主要普通APP的推送,倘若你的智能裝置在消息推送時未聯網,推送過去沒收到,再次聯網也就收不到了。
- QoS1:“至少一次”,確定消息到達,但消息重複可能會發生。
- QoS2:“隻有一次”,確定消息到達一次。在一些要求比較嚴格的計費系統中,可以使用此級别。在計費系統中,消息重複或丢失會導緻不正确的結果。這種最高品質的消息釋出服務還可以用于即時通訊類的APP的推送,確定使用者收到且隻會收到一次。
- 小型傳輸,開銷很小(固定長度的頭部是2位元組),協定交換最小化,以降低網絡流量。
這就是為什麼在介紹裡說它非常适合“在物聯網領域,傳感器與伺服器的通信,資訊的收集”,要知道嵌入式裝置的運算能力和帶寬都相對薄弱,使用這種協定來傳遞消息再适合不過了。
MQTT協定格式
MQTT協定使用二進制資料包,包含三個部分,分别是固定頭,可變頭、消息體;
- 固定頭:存在于所有的MQTT資料包中,長度是2-5位元組,包括3部分内容,資料包類型(4bit),辨別位(4bit),資料包剩餘長度大小(1~4Byte),具體含義參考MQTT協定
- 可變頭:部分MQTT資料包包含
- 消息體:部分MQTT資料包包含
MQTT協定中,固定頭中的資料包剩餘長度大小包括可變頭和消息體長度,剩餘長度可用1~4Byte表示,4Byte最大可表示256MB(0xFFFFFF7F)。有些資料包沒有可變頭和消息體,比如PINGREQ資料包隻有2Byte,由此可以計算MQTT資料包的最大值和最小值
- 最大值:256MB+5Byte,其中256MB是剩餘長度最大值,包括可變頭和消息體;5Byte是固定頭,資料包類型(4bit),辨別位(4bit),資料包剩餘長度大小(4Byte);
- 最小值:2Byte,比如PINGREQ資料包,隻有固定頭,沒有可變頭,消息體;
實作方式
實作MQTT協定需要用戶端和伺服器端通訊完成,在通訊過程中,MQTT協定中有三種身份:釋出者(Publish)、代理(Broker)(伺服器)、訂閱者(Subscribe)。其中,消息的釋出者和訂閱者都是用戶端,消息代理是伺服器,消息釋出者可以同時是訂閱者。
MQTT Client: 隻要裝置基于MQTT協定連接配接了MQTT Broker ,就認為這個裝置是MQTT Client, MQTT Client 可以單獨作為釋出者和訂閱者,也可以同時是釋出者和訂閱者。
MQTT Broker: MQTT Broker 是MQTT協定的核心,主要作用是接收釋出者的消息,然後轉發給對應的訂閱者。Broker可以對Clinet接入進行授權,并對Client進行權限控制。常用的C語言編寫的MQTT Broker 開源庫有Mosquitto。如果學習MQTT需要Broker環境,可以通過這些開源庫自建,也可以用各大雲平台提供的Broker服務,如阿裡雲、騰訊雲。
MQTT傳輸的消息分為:主題(Topic)和負載(payload)兩部分:
- Topic,可以了解為消息的類型,訂閱者訂閱(Subscribe)後,就會收到該主題的消息内容(payload);
- payload,可以了解為消息的内容,是指訂閱者具體要使用的内容。
MQTT的搭建(ubuntu)
1.apt-get安裝mqtt相關包
apt-get install mosquitto
安裝用戶端
apt-get install mosquitto-clients
2.測試mosquitto是否正确運作
service mosquitto status
3.本機終端測試mqtt
打開一個終端,訂閱主題
mosquitto_sub -h 192.168.158.128 -t "mqtt" -v
- -h 指定要連接配接的MQTT伺服器
- -t 訂閱主題,此處為mqtt
- -v 列印更多的調試資訊
再打開一個終端,釋出主題
mosquitto_pub -h 192.168.158.128 -t "mqtt" -m "hello world"
- -h 指定要連接配接的MQTT伺服器
- -t 向指定主題推送消息
- -m 指定消息内容
MQTT權限配置
前面我們基于Mosquitto伺服器已經搭建成功了,但是預設是允許匿名使用者登入,對于正式上線的項目則是需要進行使用者認證(當然,使用者一般都會與資料庫映射,不過在這裡我們就會直接将使用者寫入配置檔案中)
1.Mosquitto伺服器的配置檔案為/etc/mosquitto/mosquitto.conf,關于使用者認證的方式和讀取的配置都在這個檔案中進行
配置檔案參數說明:
ID | allow_anonymous | password_file | acl_file | result |
---|---|---|---|---|
1 | True(預設) | 允許匿名方式登入 | ||
2 | False | password_file | 開啟使用者驗證機制 | |
3 | False | password_file | acl_file | 開啟使用者驗證機制,但通路控制不起作用 |
4 | True | password_file | acl_file | 使用者名及密碼不為空,将自動進行使用者驗證且受到通路控制的限制;使用者名及密碼為空,将不進行使用者驗證且受到通路控制的限制 |
5 | False | 無法啟動服務 |
allow_anonymous:允許匿名
password-file:密碼檔案
acl_file:通路控制清單
修改配置檔案
vi /etc/mosquitto/mosquitto.conf
allow_anonymous false
password_file /etc/mosquitto/pwfile
acl_file /etc/mosquitto/acl
添加使用者資訊
mosquitto_passwd -c /etc/mosquitto/pwfile zysheep
添加Topic和使用者的關系
user zysheep
topic write mtopic/#
user zysheep
topic read mtopic/#
主題層級分隔符
/
: 用于分割主題層級,
/
分割後的主題,這是消息主題層級設計中很重要的符号。 比方說: aaa/bbb和 aaa/bbb/ccc 和aaa/bbb/ccc/ddd ,這樣的消息主題格式,是一個層層遞進的關系,可通過多層通配符同時比對兩者,或者單層通配符隻比對一個。 這在現實場景中,可以應用到:公司的部門層級推送、國家城市層級推送等包含層級關系的場景。
單層通配符
+
: 單層通配符隻能比對一層主題。比如: aaa/+ 可以比對 aaa/bbb ,但是不能比對aaa/bbb/ccc。單獨的
+
号可以比對單層的所有推送
多層通配符
#
: 多層通配符可以比對于多層主題。比如: aaa/# ,不但可以比對aaa/bbb,還可以比對aaa/bbb/ccc/ddd。也就是說,多層通配符可以比對符合通配符之前主題層級的所有子集主題。單獨的
#
比對所有的消息主題.
使用者認證測試
1.重新開機Mosquitto步驟,檢視mosquitto的程序
ps -aux|grep mosquitto
2. 殺死程序
sudo kill -9 25372
3.重新啟動mosquitto
mosquitto -c /etc/mosquitto/mosquitto.conf
4.訂閱端啟動(不加使用者)
mosquitto_sub -h 192.168.158.128 -t "mtopic" -v
拒絕連接配接:未授權
5.訂閱端啟動(加使用者)
mosquitto_sub -h 192.168.158.128 -t "mtopic" -v -u zysheep -P 123456
- -h 指定要連接配接的MQTT伺服器
- -t 訂閱主題,此處為mqtt
- -v 列印更多的調試資訊
- -u 設定的使用者
- -P 使用者密碼
6.釋出端啟動
mosquitto_pub -h 192.168.158.128 -t mtopic -u zysheep -p 123456 -m "hello world"
MQTT實作(Java語言)
Pom
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.1.1</version>
</dependency>
ClientMQTT
package cn.zysheep.mybatisspringboot.mqtt;
import java.util.concurrent.ScheduledExecutorService;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttTopic;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
/**
* Subscribe 用戶端訂閱主題
*/
public class ClientMQTT {
public static final String HOST = "tcp://192.168.158.128:1883";
public static final String TOPIC = "mtopic/zysheep/123";
private static final String clientid = "client11";
private MqttClient client;
private MqttConnectOptions options;
private String userName = "zysheep";
private String passWord = "123456";
private ScheduledExecutorService scheduler;
private void start() {
try {
// host為主機名,clientid即連接配接MQTT的用戶端ID,一般以唯一辨別符表示,MemoryPersistence設定clientid的儲存形式,預設為以記憶體儲存
client = new MqttClient(HOST, clientid, new MemoryPersistence());
// MQTT的連接配接設定
options = new MqttConnectOptions();
// 設定是否清空session,這裡如果設定為false表示伺服器會保留用戶端的連接配接記錄,這裡設定為true表示每次連接配接到伺服器都以新的身份連接配接
options.setCleanSession(true);
// 設定連接配接的使用者名
options.setUserName(userName);
// 設定連接配接的密碼
options.setPassword(passWord.toCharArray());
// 設定逾時時間 機關為秒
options.setConnectionTimeout(10);
// 設定會話心跳時間 機關為秒 伺服器會每隔1.5*20秒的時間向用戶端發送個消息判斷用戶端是否線上,但這個方法并沒有重連的機制
options.setKeepAliveInterval(20);
// 設定回調
client.setCallback(new PushCallback());
MqttTopic topic = client.getTopic(TOPIC);
//setWill方法,如果項目中需要知道用戶端是否掉線可以調用該方法。設定最終端口的通知消息
options.setWill(topic, "close".getBytes(), 2, true);
client.connect(options);
//訂閱消息
int[] Qos = {1};
// qos 對消息處理的幾種機制。
// 0 表示的是訂閱者沒收到消息不會再次發送,消息會丢失。 最多一次,即:<=1
// 1 表示的是會嘗試重試,一直到接收到消息,但這種情況可能導緻訂閱者收到多次重複消息。 至少一次,即:>=1
// 2 多了一次去重的動作,確定訂閱者收到的消息有一次。 一次,即:=1
String[] topic1 = {TOPIC};
client.subscribe(topic1, Qos);
} catch (Exception e) {
e.printStackTrace();
}
}
public static void main(String[] args) throws MqttException {
ClientMQTT client = new ClientMQTT();
client.start();
}
}
ServerMQTT
package cn.zysheep.mybatisspringboot.mqtt;
import cn.zysheep.mybatisspringboot.entity.Wares;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.MqttPersistenceException;
import org.eclipse.paho.client.mqttv3.MqttTopic;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
/**
* 伺服器向多個用戶端推送主題,即不同用戶端可向服務端訂閱相同的主題
*
* Publish 服務端向主題釋出消息
*/
public class ServerMQTT {
//tcp://MQTT安裝的伺服器位址:MQTT定義的端口号
public static final String HOST = "tcp://192.168.158.128:1883";
//定義一個主題
public static final String TOPIC = "mtopic/zysheep/123";
//定義MQTT的ID,可以在MQTT服務配置中指定
private static final String clientid = "server11";
private MqttClient client;
private MqttTopic topic11;
private String userName = "zysheep";
private String passWord = "123456";
private MqttMessage message;
/**
* 構造函數
* @throws MqttException
*/
public ServerMQTT() throws MqttException {
// MemoryPersistence設定clientid的儲存形式,預設為以記憶體儲存
client = new MqttClient(HOST, clientid, new MemoryPersistence());
connect();
}
/**
* 用來連接配接伺服器
*/
private void connect() {
MqttConnectOptions options = new MqttConnectOptions();
options.setCleanSession(false);
options.setUserName(userName);
options.setPassword(passWord.toCharArray());
// 設定逾時時間
options.setConnectionTimeout(10);
// 設定會話心跳時間
options.setKeepAliveInterval(20);
try {
client.setCallback(new PushCallback());
client.connect(options);
topic11 = client.getTopic(TOPIC);
} catch (Exception e) {
e.printStackTrace();
}
}
/**
*
* @param topic
* @param message
* @throws MqttPersistenceException
* @throws MqttException
*/
public void publish(MqttTopic topic , MqttMessage message) throws MqttPersistenceException,
MqttException {
MqttDeliveryToken token = topic.publish(message);
token.waitForCompletion();
System.out.println("message is published completely! "
+ token.isComplete());
}
/**
* 啟動入口
* @param args
* @throws MqttException
*/
public static void main(String[] args) throws MqttException, JsonProcessingException, InterruptedException {
ServerMQTT server = new ServerMQTT();
server.message = new MqttMessage();
server.message.setQos(1);
server.message.setRetained(true);
Wares wares = new Wares();
wares.setName("常德牌水表");
wares.setPicAddress("常德");
ObjectMapper objectMapper = new ObjectMapper();
String s = objectMapper.writeValueAsString(wares);
server.message.setPayload(s.getBytes());
for (int i = 0; i < 10; i++) {
if (i%2 == 0){
Thread.sleep(5000);
}
server.publish(server.topic11 , server.message);
}
System.out.println(server.message.isRetained() + "------ratained狀态");
}
}
PushCallback
package cn.zysheep.mybatisspringboot.mqtt;
import cn.zysheep.mybatisspringboot.entity.Wares;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttMessage;
/**
* 釋出消息的回調類
*
* 必須實作MqttCallback的接口并實作對應的相關接口方法CallBack 類将實作 MqttCallBack。
* 每個客戶機辨別都需要一個回調執行個體。在此示例中,構造函數傳遞客戶機辨別以另存為執行個體資料。
* 在回調中,将它用來辨別已經啟動了該回調的哪個執行個體。
* 必須在回調類中實作三個方法:
*
* public void messageArrived(MqttTopic topic, MqttMessage message)接收已經預訂的釋出。
*
* public void connectionLost(Throwable cause)在斷開連接配接時調用。
*
* public void deliveryComplete(MqttDeliveryToken token))
* 接收到已經釋出的 QoS 1 或 QoS 2 消息的傳遞令牌時調用。
* 由 MqttClient.connect 激活此回調。
*/
public class PushCallback implements MqttCallback{
public void connectionLost(Throwable cause) {
// 連接配接丢失後,一般在這裡面進行重連
System.out.println("連接配接斷開,可以做重連");
}
public void deliveryComplete(IMqttDeliveryToken token) {
System.out.println("deliveryComplete---------" + token.isComplete());
}
public void messageArrived(String topic, MqttMessage message) throws Exception {
// subscribe後得到的消息會執行到這裡面
System.out.println("接收消息主題 : " + topic);
System.out.println("接收消息Qos : " + message.getQos());
String s = new String(message.getPayload());
ObjectMapper objectMapper = new ObjectMapper();
Wares wares = objectMapper.readValue(s, Wares.class);
System.out.println(wares);
//System.out.println("接收消息内容 : " + new String(message.getPayload()));
}
}