天天看點

MQTT協定MQTT簡介特性MQTT協定格式實作方式MQTT的搭建(ubuntu)MQTT權限配置MQTT實作(Java語言)

MQTT協定

  • MQTT簡介
  • 特性
  • MQTT協定格式
  • 實作方式
  • MQTT的搭建(ubuntu)
  • MQTT權限配置
    • 修改配置檔案
    • 添加使用者資訊
    • 添加Topic和使用者的關系
    • 使用者認證測試
  • MQTT實作(Java語言)
    • Pom
    • ClientMQTT
    • ServerMQTT
    • PushCallback

MQTT簡介

MQTT(Message Queuing Telemetry Transport,消息隊列遙測傳輸協定),是一種基于釋出/訂閱(publish/subscribe)模式的輕量級協定,該協定建構于TCP/IP協定之上,MQTT最大優點在于,可以以極少的代碼和有限的帶寬,為連接配接遠端裝置提供實時可靠的消息服務。作為一種低開銷、低帶寬占用的即時通訊協定,使其在物聯網、小型裝置、移動應用等方面有較廣泛的應用。

MQTT是一個基于用戶端-伺服器的消息釋出/訂閱傳輸協定。MQTT協定是輕量、簡單、開放和易于實作的,這些特點使它适用範圍非常廣泛。在很多情況下,包括受限的環境中,如:機器與機器(M2M)通信和物聯網(IoT)。其在,通過衛星鍊路通信傳感器、偶爾撥号的醫療裝置、智能家居、及一些小型化裝置中已廣泛使用。

特性

MQTT協定工作在低帶寬、不可靠的網絡的遠端傳感器和控制裝置通訊而設計的協定,它具有以下主要的幾項特性:

  1. 使用釋出/訂閱消息模式,提供一對多的消息釋出,解除應用程式耦合。
  2. 對負載内容屏蔽的消息傳輸。
  3. 使用TCP/IP提供網絡連接配接。

    主流的MQTT是基于TCP連接配接進行資料推送的,但是同樣有基于UDP的版本,叫做MQTT-SN。這兩種版本由于基于不同的連接配接方式,優缺點自然也就各有不同了。

  4. 有三種消息釋出服務品質(QoS):
    • QoS0:“至多一次”,消息釋出完全依賴底層TCP/IP網絡。會發生消息丢失或重複。這一級别可用于如下情況,環境傳感器資料,丢失一次讀記錄無所謂,因為不久後還會有第二次發送。這一種方式主要普通APP的推送,倘若你的智能裝置在消息推送時未聯網,推送過去沒收到,再次聯網也就收不到了。
    • QoS1:“至少一次”,確定消息到達,但消息重複可能會發生。
    • QoS2:“隻有一次”,確定消息到達一次。在一些要求比較嚴格的計費系統中,可以使用此級别。在計費系統中,消息重複或丢失會導緻不正确的結果。這種最高品質的消息釋出服務還可以用于即時通訊類的APP的推送,確定使用者收到且隻會收到一次。
  5. 小型傳輸,開銷很小(固定長度的頭部是2位元組),協定交換最小化,以降低網絡流量。

這就是為什麼在介紹裡說它非常适合“在物聯網領域,傳感器與伺服器的通信,資訊的收集”,要知道嵌入式裝置的運算能力和帶寬都相對薄弱,使用這種協定來傳遞消息再适合不過了。

MQTT協定格式

MQTT協定使用二進制資料包,包含三個部分,分别是固定頭,可變頭、消息體;

  • 固定頭:存在于所有的MQTT資料包中,長度是2-5位元組,包括3部分内容,資料包類型(4bit),辨別位(4bit),資料包剩餘長度大小(1~4Byte),具體含義參考MQTT協定
  • 可變頭:部分MQTT資料包包含
  • 消息體:部分MQTT資料包包含

MQTT協定中,固定頭中的資料包剩餘長度大小包括可變頭和消息體長度,剩餘長度可用1~4Byte表示,4Byte最大可表示256MB(0xFFFFFF7F)。有些資料包沒有可變頭和消息體,比如PINGREQ資料包隻有2Byte,由此可以計算MQTT資料包的最大值和最小值

  • 最大值:256MB+5Byte,其中256MB是剩餘長度最大值,包括可變頭和消息體;5Byte是固定頭,資料包類型(4bit),辨別位(4bit),資料包剩餘長度大小(4Byte);
  • 最小值:2Byte,比如PINGREQ資料包,隻有固定頭,沒有可變頭,消息體;

實作方式

實作MQTT協定需要用戶端和伺服器端通訊完成,在通訊過程中,MQTT協定中有三種身份:釋出者(Publish)、代理(Broker)(伺服器)、訂閱者(Subscribe)。其中,消息的釋出者和訂閱者都是用戶端,消息代理是伺服器,消息釋出者可以同時是訂閱者。

MQTT Client: 隻要裝置基于MQTT協定連接配接了MQTT Broker ,就認為這個裝置是MQTT Client, MQTT Client 可以單獨作為釋出者和訂閱者,也可以同時是釋出者和訂閱者。

MQTT Broker: MQTT Broker 是MQTT協定的核心,主要作用是接收釋出者的消息,然後轉發給對應的訂閱者。Broker可以對Clinet接入進行授權,并對Client進行權限控制。常用的C語言編寫的MQTT Broker 開源庫有Mosquitto。如果學習MQTT需要Broker環境,可以通過這些開源庫自建,也可以用各大雲平台提供的Broker服務,如阿裡雲、騰訊雲。

MQTT傳輸的消息分為:主題(Topic)和負載(payload)兩部分:

  • Topic,可以了解為消息的類型,訂閱者訂閱(Subscribe)後,就會收到該主題的消息内容(payload);
  • payload,可以了解為消息的内容,是指訂閱者具體要使用的内容。

MQTT的搭建(ubuntu)

1.apt-get安裝mqtt相關包

apt-get install mosquitto
           
MQTT協定MQTT簡介特性MQTT協定格式實作方式MQTT的搭建(ubuntu)MQTT權限配置MQTT實作(Java語言)

安裝用戶端

apt-get install mosquitto-clients
           
MQTT協定MQTT簡介特性MQTT協定格式實作方式MQTT的搭建(ubuntu)MQTT權限配置MQTT實作(Java語言)

2.測試mosquitto是否正确運作

service mosquitto status
           
MQTT協定MQTT簡介特性MQTT協定格式實作方式MQTT的搭建(ubuntu)MQTT權限配置MQTT實作(Java語言)

3.本機終端測試mqtt

打開一個終端,訂閱主題
mosquitto_sub -h 192.168.158.128 -t "mqtt" -v
           
  • -h 指定要連接配接的MQTT伺服器
  • -t 訂閱主題,此處為mqtt
  • -v 列印更多的調試資訊
MQTT協定MQTT簡介特性MQTT協定格式實作方式MQTT的搭建(ubuntu)MQTT權限配置MQTT實作(Java語言)
再打開一個終端,釋出主題
mosquitto_pub -h 192.168.158.128 -t "mqtt" -m "hello world"
           
  • -h 指定要連接配接的MQTT伺服器
  • -t 向指定主題推送消息
  • -m 指定消息内容
MQTT協定MQTT簡介特性MQTT協定格式實作方式MQTT的搭建(ubuntu)MQTT權限配置MQTT實作(Java語言)

MQTT權限配置

前面我們基于Mosquitto伺服器已經搭建成功了,但是預設是允許匿名使用者登入,對于正式上線的項目則是需要進行使用者認證(當然,使用者一般都會與資料庫映射,不過在這裡我們就會直接将使用者寫入配置檔案中)

1.Mosquitto伺服器的配置檔案為/etc/mosquitto/mosquitto.conf,關于使用者認證的方式和讀取的配置都在這個檔案中進行

配置檔案參數說明:

ID allow_anonymous password_file acl_file result
1 True(預設) 允許匿名方式登入
2 False password_file 開啟使用者驗證機制
3 False password_file acl_file 開啟使用者驗證機制,但通路控制不起作用
4 True password_file acl_file 使用者名及密碼不為空,将自動進行使用者驗證且受到通路控制的限制;使用者名及密碼為空,将不進行使用者驗證且受到通路控制的限制
5 False 無法啟動服務

allow_anonymous:允許匿名

password-file:密碼檔案

acl_file:通路控制清單

修改配置檔案

vi /etc/mosquitto/mosquitto.conf
           
MQTT協定MQTT簡介特性MQTT協定格式實作方式MQTT的搭建(ubuntu)MQTT權限配置MQTT實作(Java語言)
allow_anonymous false
password_file /etc/mosquitto/pwfile
acl_file /etc/mosquitto/acl
           

添加使用者資訊

mosquitto_passwd -c /etc/mosquitto/pwfile zysheep
           
MQTT協定MQTT簡介特性MQTT協定格式實作方式MQTT的搭建(ubuntu)MQTT權限配置MQTT實作(Java語言)

添加Topic和使用者的關系

user zysheep
topic write  mtopic/#

user zysheep
topic read mtopic/#
           

主題層級分隔符

/

: 用于分割主題層級,

/

分割後的主題,這是消息主題層級設計中很重要的符号。 比方說: aaa/bbb和 aaa/bbb/ccc 和aaa/bbb/ccc/ddd ,這樣的消息主題格式,是一個層層遞進的關系,可通過多層通配符同時比對兩者,或者單層通配符隻比對一個。 這在現實場景中,可以應用到:公司的部門層級推送、國家城市層級推送等包含層級關系的場景。

單層通配符

+

: 單層通配符隻能比對一層主題。比如: aaa/+ 可以比對 aaa/bbb ,但是不能比對aaa/bbb/ccc。單獨的

+

号可以比對單層的所有推送

多層通配符

#

: 多層通配符可以比對于多層主題。比如: aaa/# ,不但可以比對aaa/bbb,還可以比對aaa/bbb/ccc/ddd。也就是說,多層通配符可以比對符合通配符之前主題層級的所有子集主題。單獨的

#

比對所有的消息主題.

MQTT協定MQTT簡介特性MQTT協定格式實作方式MQTT的搭建(ubuntu)MQTT權限配置MQTT實作(Java語言)

使用者認證測試

1.重新開機Mosquitto步驟,檢視mosquitto的程序

ps -aux|grep mosquitto
           
MQTT協定MQTT簡介特性MQTT協定格式實作方式MQTT的搭建(ubuntu)MQTT權限配置MQTT實作(Java語言)

2. 殺死程序

sudo kill -9 25372
           
MQTT協定MQTT簡介特性MQTT協定格式實作方式MQTT的搭建(ubuntu)MQTT權限配置MQTT實作(Java語言)

3.重新啟動mosquitto

mosquitto -c /etc/mosquitto/mosquitto.conf 
           
MQTT協定MQTT簡介特性MQTT協定格式實作方式MQTT的搭建(ubuntu)MQTT權限配置MQTT實作(Java語言)

4.訂閱端啟動(不加使用者)

mosquitto_sub -h 192.168.158.128 -t "mtopic" -v
           
MQTT協定MQTT簡介特性MQTT協定格式實作方式MQTT的搭建(ubuntu)MQTT權限配置MQTT實作(Java語言)
拒絕連接配接:未授權

5.訂閱端啟動(加使用者)

mosquitto_sub -h 192.168.158.128 -t "mtopic" -v -u zysheep -P 123456
           
  • -h 指定要連接配接的MQTT伺服器
  • -t 訂閱主題,此處為mqtt
  • -v 列印更多的調試資訊
  • -u 設定的使用者
  • -P 使用者密碼
MQTT協定MQTT簡介特性MQTT協定格式實作方式MQTT的搭建(ubuntu)MQTT權限配置MQTT實作(Java語言)

6.釋出端啟動

mosquitto_pub -h 192.168.158.128 -t mtopic -u zysheep  -p 123456 -m "hello world"
           
MQTT協定MQTT簡介特性MQTT協定格式實作方式MQTT的搭建(ubuntu)MQTT權限配置MQTT實作(Java語言)

MQTT實作(Java語言)

Pom

<dependency>
  <groupId>org.eclipse.paho</groupId>
  <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
  <version>1.1.1</version>
</dependency>
           

ClientMQTT

package cn.zysheep.mybatisspringboot.mqtt;

import java.util.concurrent.ScheduledExecutorService;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttTopic;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
/**
 * Subscribe 用戶端訂閱主題
 */
public class ClientMQTT {
    public static final String HOST = "tcp://192.168.158.128:1883";
    public static final String TOPIC = "mtopic/zysheep/123";
    private static final String clientid = "client11";
    private MqttClient client;
    private MqttConnectOptions options;
    private String userName = "zysheep";
    private String passWord = "123456";

    private ScheduledExecutorService scheduler;

    private void start() {
        try {
            // host為主機名,clientid即連接配接MQTT的用戶端ID,一般以唯一辨別符表示,MemoryPersistence設定clientid的儲存形式,預設為以記憶體儲存
            client = new MqttClient(HOST, clientid, new MemoryPersistence());
            // MQTT的連接配接設定
            options = new MqttConnectOptions();
            // 設定是否清空session,這裡如果設定為false表示伺服器會保留用戶端的連接配接記錄,這裡設定為true表示每次連接配接到伺服器都以新的身份連接配接
            options.setCleanSession(true);
            // 設定連接配接的使用者名
            options.setUserName(userName);
            // 設定連接配接的密碼
            options.setPassword(passWord.toCharArray());
            // 設定逾時時間 機關為秒
            options.setConnectionTimeout(10);
            // 設定會話心跳時間 機關為秒 伺服器會每隔1.5*20秒的時間向用戶端發送個消息判斷用戶端是否線上,但這個方法并沒有重連的機制
            options.setKeepAliveInterval(20);
            // 設定回調
            client.setCallback(new PushCallback());
            MqttTopic topic = client.getTopic(TOPIC);
            //setWill方法,如果項目中需要知道用戶端是否掉線可以調用該方法。設定最終端口的通知消息
            options.setWill(topic, "close".getBytes(), 2, true);

            client.connect(options);
            //訂閱消息
            int[] Qos  = {1};
            // qos 對消息處理的幾種機制。
            // 0 表示的是訂閱者沒收到消息不會再次發送,消息會丢失。   最多一次,即:<=1
            // 1 表示的是會嘗試重試,一直到接收到消息,但這種情況可能導緻訂閱者收到多次重複消息。  至少一次,即:>=1
            // 2 多了一次去重的動作,確定訂閱者收到的消息有一次。   一次,即:=1
            String[] topic1 = {TOPIC};
            client.subscribe(topic1, Qos);

        } catch (Exception e) {
            e.printStackTrace();
        }
    }




    public static void main(String[] args) throws MqttException {
        ClientMQTT client = new ClientMQTT();
        client.start();
    }
}
           

ServerMQTT

package cn.zysheep.mybatisspringboot.mqtt;
import cn.zysheep.mybatisspringboot.entity.Wares;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.MqttPersistenceException;
import org.eclipse.paho.client.mqttv3.MqttTopic;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

/**
 * 伺服器向多個用戶端推送主題,即不同用戶端可向服務端訂閱相同的主題
 *
 * Publish 服務端向主題釋出消息
 */

public class ServerMQTT {
    //tcp://MQTT安裝的伺服器位址:MQTT定義的端口号
    public static final String HOST = "tcp://192.168.158.128:1883";
    //定義一個主題
    public static final String TOPIC = "mtopic/zysheep/123";
    //定義MQTT的ID,可以在MQTT服務配置中指定
    private static final String clientid = "server11";

    private MqttClient client;
    private MqttTopic topic11;
    private String userName = "zysheep";
    private String passWord = "123456";

    private MqttMessage message;

    /**
     * 構造函數
     * @throws MqttException
     */
    public ServerMQTT() throws MqttException {
        // MemoryPersistence設定clientid的儲存形式,預設為以記憶體儲存
        client = new MqttClient(HOST, clientid, new MemoryPersistence());
        connect();
    }

    /**
     *  用來連接配接伺服器
     */
    private void connect() {
        MqttConnectOptions options = new MqttConnectOptions();
        options.setCleanSession(false);
        options.setUserName(userName);
        options.setPassword(passWord.toCharArray());
        // 設定逾時時間
        options.setConnectionTimeout(10);
        // 設定會話心跳時間
        options.setKeepAliveInterval(20);
        try {
            client.setCallback(new PushCallback());
            client.connect(options);

            topic11 = client.getTopic(TOPIC);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     *
     * @param topic
     * @param message
     * @throws MqttPersistenceException
     * @throws MqttException
     */
    public void publish(MqttTopic topic , MqttMessage message) throws MqttPersistenceException,
            MqttException {
        MqttDeliveryToken token = topic.publish(message);
        token.waitForCompletion();
        System.out.println("message is published completely! "
                + token.isComplete());
    }

    /**
     *  啟動入口
     * @param args
     * @throws MqttException
     */
    public static void main(String[] args) throws MqttException, JsonProcessingException, InterruptedException {
        ServerMQTT server = new ServerMQTT();

        server.message = new MqttMessage();
        server.message.setQos(1);
        server.message.setRetained(true);
        Wares wares = new Wares();
        wares.setName("常德牌水表");
        wares.setPicAddress("常德");
        ObjectMapper objectMapper = new ObjectMapper();
        String s = objectMapper.writeValueAsString(wares);
        server.message.setPayload(s.getBytes());
        for (int i = 0; i < 10; i++) {
            if (i%2 == 0){
                Thread.sleep(5000);
            }
            server.publish(server.topic11 , server.message);
        }


        System.out.println(server.message.isRetained() + "------ratained狀态");
    }
}
           

PushCallback

package cn.zysheep.mybatisspringboot.mqtt;

import cn.zysheep.mybatisspringboot.entity.Wares;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttMessage;

/**
 * 釋出消息的回調類
 *
 * 必須實作MqttCallback的接口并實作對應的相關接口方法CallBack 類将實作 MqttCallBack。
 * 每個客戶機辨別都需要一個回調執行個體。在此示例中,構造函數傳遞客戶機辨別以另存為執行個體資料。
 * 在回調中,将它用來辨別已經啟動了該回調的哪個執行個體。
 * 必須在回調類中實作三個方法:
 *
 *  public void messageArrived(MqttTopic topic, MqttMessage message)接收已經預訂的釋出。
 *
 *  public void connectionLost(Throwable cause)在斷開連接配接時調用。
 *
 *  public void deliveryComplete(MqttDeliveryToken token))
 *  接收到已經釋出的 QoS 1 或 QoS 2 消息的傳遞令牌時調用。
 *  由 MqttClient.connect 激活此回調。
 */
public class PushCallback implements MqttCallback{
    public void connectionLost(Throwable cause) {
        // 連接配接丢失後,一般在這裡面進行重連
        System.out.println("連接配接斷開,可以做重連");
    }

    public void deliveryComplete(IMqttDeliveryToken token) {
        System.out.println("deliveryComplete---------" + token.isComplete());
    }

    public void messageArrived(String topic, MqttMessage message) throws Exception {
        // subscribe後得到的消息會執行到這裡面
        System.out.println("接收消息主題 : " + topic);
        System.out.println("接收消息Qos : " + message.getQos());
        String s = new String(message.getPayload());
        ObjectMapper objectMapper = new ObjectMapper();
        Wares wares = objectMapper.readValue(s, Wares.class);
        System.out.println(wares);


        //System.out.println("接收消息内容 : " + new String(message.getPayload()));
    }
}
           

繼續閱讀