天天看點

Rust語言從入門到精通系列 - 物聯網消息傳輸協定MQTT(入門)

作者:TinyZzh
Rust語言從入門到精通系列 - 物聯網消息傳輸協定MQTT(入門)

MQTT(Message Queuing Telemetry Transport)是一種輕量級的消息傳輸協定,用于在低帶寬和不穩定的網絡環境中傳輸消息。MQTT協定基于釋出/訂閱模式,包含了許多特性,如QoS,保留消息,遺囑消息等,使得它非常适合物聯網裝置之間的通信。

Rust是一種系統級程式設計語言,具有記憶體安全和高性能的特性。Rust語言的主要目标是提供一種安全、并發、實用的程式設計語言,使得開發者可以輕松地編寫高性能的系統級應用程式。本教程将介紹如何使用Rust語言和rumqttc子產品來實作MQTT協定的基礎應用和進階應用。

rumqttc子產品簡介

rumqttc是一個基于Rust語言實作的MQTT用戶端庫,它提供了連接配接MQTT伺服器、訂閱主題、釋出消息等基本功能,并支援TLS加密連接配接。rumqttc的API簡單易用,适合初學者和中級開發者使用。

在Cargo.toml檔案中添加rumqtt子產品依賴, 示例配置如下:

[dependencies]
rumqttc = "0.21.0"           

應用實踐

連接配接MQTT伺服器

使用rumqttc連接配接MQTT伺服器非常簡單,隻需要指定伺服器位址和端口号即可。以下是一個連接配接到本地MQTT伺服器的示例代碼:

use rumqttc::{Client, MqttOptions};

fn main() {
    let mqtt_options = MqttOptions::new("test-1", "localhost", 1883);
    let (mut client, _) = Client::new(mqtt_options, 10);
    client
        .connect()
        .expect("Failed to connect to MQTT server");
    // ...
}           

其中,test-1是用戶端ID,可以自行定義。10是消息隊列的大小,表示可以同時處理的未确認消息數量。

訂閱主題

訂閱MQTT主題可以接收來自其他用戶端的消息。使用rumqttc訂閱主題也非常簡單,隻需要指定主題名稱和消息處理函數即可。以下是一個訂閱主題的示例代碼:

use rumqttc::{Client, MqttOptions, QoS};

fn main() {
    let mqtt_options = MqttOptions::new("test-2", "localhost", 1883);
    let (mut client, mut connection) = Client::new(mqtt_options, 10);
    client
        .subscribe("test/topic", QoS::AtLeastOnce)
        .expect("Failed to subscribe to topic");
    for message in connection.iter() {
        if let Ok(message) = message {
            println!("{}", message.payload_str());
        }
    }
}           

其中,test/topic是要訂閱的主題名稱,QoS::AtLeastOnce表示消息至少被處理一次,即使出現網絡故障或用戶端當機也不會丢失。connection.iter()傳回一個疊代器,可以用來不斷接收來自伺服器的消息。

釋出消息

釋出MQTT消息可以向其他用戶端發送資料。使用rumqttc釋出消息也非常簡單,隻需要指定主題名稱和消息内容即可。以下是一個釋出消息的示例代碼:

use rumqttc::{Client, MqttOptions, QoS, ReconnectOptions, Transport};

fn main() {
    let mqtt_options = MqttOptions::new("test-3", "localhost", 1883);
    let (mut client, mut connection) = Client::new(mqtt_options, 10);
    let reconnection_options = ReconnectOptions::Always(10);
    client
        .publish("test/topic", QoS::AtLeastOnce, false, "Hello, world!")
        .expect("Failed to publish message");
}           

其中,"Hello, world!"是要發送的消息内容,可以是字元串、位元組流或其他資料類型。

保留消息

這個示例示範如何使用rumqttc子產品發送和接收保留消息。

use rumqttc::{Client, MqttOptions, QoS};

fn main() {
    let mqtt_options = MqttOptions::new("test-retain", "localhost", 1883);

    let (mut client, _) = Client::new(mqtt_options, 10);

    client
        .publish("test/topic", QoS::AtLeastOnce, true, "hello world".to_owned())
        .unwrap();

    let message = client.get_retained("test/topic").unwrap();
    println!("Received message: {:?}", message);
}           

這個示例中,我們建立了一個MQTT用戶端,連接配接到本地的MQTT伺服器,然後發送了一條保留消息到test/topic主題。在調用publish方法時,我們指定了消息的QoS為AtLeastOnce,表示消息至少要被傳輸一次,但不保證隻傳輸一次。第三個參數表示消息是否為保留消息。

然後我們使用client.get_retained("test/topic")方法擷取到保留消息,這個方法會傳回最新的保留消息。

斷開連接配接

使用完rumqttc後,需要手動斷開與MQTT伺服器的連接配接。以下是一個斷開連接配接的示例代碼:

use rumqttc::{Client, MqttOptions};

fn main() {
    let mqtt_options = MqttOptions::new("test-4", "localhost", 1883);
    let (mut client, _) = Client::new(mqtt_options, 10);
    client
        .connect()
        .expect("Failed to connect to MQTT server");
    // ...
    client.disconnect().expect("Failed to disconnect from MQTT server");
}           

使用Last Will和Testament

這個示例示範如何使用rumqttc子產品設定Last Will和Testament。

use rumqttc::{Client, LastWill, MqttOptions, QoS};

fn main() {
    let mqtt_options = MqttOptions::new("test-lwt", "localhost", 1883);

    let last_will = LastWill::new("test/topic", QoS::AtLeastOnce, "offline".to_owned());

    let (mut client, _) = Client::new(mqtt_options, 10);

    client.set_last_will(last_will).unwrap();

    // Do something here
}           

這個示例中,我們建立了一個MQTT用戶端,連接配接到本地的MQTT伺服器。然後我們使用LastWill::new方法建立了一個Last Will和Testament,指定了主題、QoS和消息内容。最後我們使用client.set_last_will方法設定了Last Will和Testament。

TLS加密連接配接

為了保護MQTT通信的安全性,可以使用TLS加密連接配接。使用rumqttc實作TLS加密連接配接也非常簡單,隻需要指定證書和私鑰即可。以下是一個使用TLS加密連接配接的示例代碼:

use std::fs::File;
use std::io::BufReader;
use std::path::Path;
use rumqttc::{Client, MqttOptions, SecurityOptions};

fn main() {
    let mqtt_options = MqttOptions::new("test-5", "localhost", 8883);
    let security_options = SecurityOptions::with_ca(File::open(Path::new("ca.crt")).unwrap())
        .with_client_cert(File::open(Path::new("client.crt")).unwrap(), File::open(Path::new("client.key")).unwrap());
    let (mut client, _) = Client::new(mqtt_options, 10);
    client
        .set_security_opts(security_options)
        .connect()
        .expect("Failed to connect to MQTT server");
    // ...
}           

其中,ca.crt是CA憑證,client.crt和client.key是用戶端證書和私鑰。

多線程處理消息

使用多線程可以提高消息處理的效率和并發性。以下是一個使用多線程處理消息的示例代碼:

use std::thread;
use rumqttc::{Client, MqttOptions, QoS};

fn main() {
    let mqtt_options = MqttOptions::new("test-6", "localhost", 1883);
    let (mut client, mut connection) = Client::new(mqtt_options, 10);
    client
        .subscribe("test/topic", QoS::AtLeastOnce)
        .expect("Failed to subscribe to topic");
    for message in connection.iter() {
        if let Ok(message) = message {
            let payload = message.payload.clone();
            let topic = message.topic.clone();
            thread::spawn(move || {
                println!("Received message: {} from topic: {}", payload, topic);
            });
        }
    }
}           

其中,thread::spawn()建立一個新線程來處理消息,可以使用閉包來捕獲消息的内容。

總結

rumqttc子產品是一個非常友善的MQTT用戶端庫,它提供了一系列API,可以友善地實作MQTT協定的功能。本教程提供了幾個常見的基礎應用示例,希望對您有所幫助。