天天看點

Rust語言從入門到精通系列 - 物聯網消息傳輸協定MQTT(進階)

作者:TinyZzh
Rust語言從入門到精通系列 - 物聯網消息傳輸協定MQTT(進階)

MQTT(Message Queuing Telemetry Transport)是一種輕量級的消息傳輸協定,用于在低帶寬和不穩定的網絡環境中傳輸消息。MQTT協定基于釋出/訂閱模式,包含了許多特性,如QoS,保留消息,遺囑消息等,使得它非常适合物聯網裝置之間的通信。

Rust是一種系統級程式設計語言,具有記憶體安全和高性能的特性。Rust語言的主要目标是提供一種安全、并發、實用的程式設計語言,使得開發者可以輕松地編寫高性能的系統級應用程式。本教程将介紹如何使用Rust語言和rumqttc子產品來實作MQTT協定的基礎應用和進階應用。

rumqttc子產品簡介

rumqttc是一個基于Rust語言實作的MQTT用戶端庫,它提供了連接配接MQTT伺服器、訂閱主題、釋出消息等基本功能,并支援TLS加密連接配接。rumqttc的API簡單易用,适合初學者和中級開發者使用。

在Cargo.toml檔案中添加rumqtt子產品依賴, 示例配置如下:

[dependencies]
rumqttc = "0.21.0"           

應用實踐進階

使用QoS2傳輸消息

這個示例示範如何使用rumqttc子產品使用QoS2傳輸消息。

use rumqttc::{Client, MqttOptions, QoS};

fn main() {
    let mqtt_options = MqttOptions::new("test-qos2", "localhost", 1883);

    let (mut client, _) = Client::new(mqtt_options, 10);

    client
        .publish("test/topic", QoS::ExactlyOnce, false, "hello world".to_owned())
        .unwrap();
}           

這個示例中,我們建立了一個MQTT用戶端,連接配接到本地的MQTT伺服器,然後釋出了一條消息到test/topic主題。在調用publish方法時,我們指定了消息的QoS為ExactlyOnce,表示消息必須被傳輸一次,且隻能被傳輸一次。

使用連接配接池

在實際應用中,我們通常需要同時處理多個MQTT用戶端連接配接,這時候使用連接配接池可以提高性能和可靠性。rumqttc子產品提供了一個ConnectionPool結構體,可以友善地管理多個MQTT用戶端連接配接。

use rumqttc::{Client, ConnectionPool, MqttOptions};

fn main() {
    let mqtt_options = MqttOptions::new("test-pool", "localhost", 1883);

    let pool = ConnectionPool::new(mqtt_options, 10);

    let mut clients = Vec::new();

    for _ in 0..10 {
        let client = pool.connect().unwrap();
        clients.push(client);
    }

    // Do something here
}           

這個示例中,我們建立了一個MQTT連接配接池,連接配接到本地的MQTT伺服器。然後我們使用循環建立了10個MQTT用戶端連接配接,這些連接配接會自動被管理和回收。

使用多線程

在實際應用中,我們通常需要同時處理多個MQTT消息,這時候使用多線程可以提高性能和可靠性。Rust語言的多線程非常友善,可以使用标準庫中的std::thread子產品來建立線程。

use rumqttc::{Client, MqttOptions, QoS};
use std::thread;

fn main() {
    let mqtt_options = MqttOptions::new("test-thread", "localhost", 1883);

    let (mut client, _) = Client::new(mqtt_options, 10);

    let handle = thread::spawn(move || {
        client
            .publish("test/topic", QoS::AtLeastOnce, false, "hello world".to_owned())
            .unwrap();
    });

    handle.join().unwrap();
}           

這個示例中,我們建立了一個MQTT用戶端,連接配接到本地的MQTT伺服器。然後我們使用std::thread::spawn方法建立了一個新線程,這個線程會在背景釋出一條消息到test/topic主題。

持久化存儲消息

通過持久化存儲可以保證消息不會因為程式崩潰或網絡故障而丢失。以下是一個使用SQLite資料庫持久化存儲消息的示例代碼:

use std::thread;
use rumqttc::{Client, MqttOptions, QoS, Event, Packet, Publish, Subscriptions, Qos};

fn main() {
    let mqtt_options = MqttOptions::new("test-7", "localhost", 1883);
    let (mut client, mut connection) = Client::new(mqtt_options, 10);
    let subscriptions = vec![Subscriptions::new("test/topic", QoS::AtLeastOnce)];
    client.subscribe(subscriptions).unwrap();
    let mut storage = Storage::new("mqtt.db").unwrap();
    for event in connection.iter() {
        match event.unwrap() {
            Event::Incoming(Packet::Publish(publish)) => {
                storage.insert_message(&publish).unwrap();
                println!("Received message: {} from topic: {}", publish.payload, publish.topic_name);
            },
            _ => {},
        }
    }
}

struct Storage {
    conn: rusqlite::Connection,
}

impl Storage {
    fn new(path: &str) -> rusqlite::Result<Self> {
        let conn = rusqlite::Connection::open(path)?;
        conn.execute("CREATE TABLE IF NOT EXISTS messages (id INTEGER PRIMARY KEY, topic TEXT, payload TEXT, qos INTEGER)", [])?;
        Ok(Self { conn })
    }

    fn insert_message(&mut self, publish: &Publish) -> rusqlite::Result<()> {
        let mut stmt = self.conn.prepare("INSERT INTO messages (topic, payload, qos) VALUES (?, ?, ?)")?;
        stmt.execute(&[&publish.topic_name, &publish.payload, &publish.qos as &i32])?;
        Ok(())
    }
}           

其中,Storage結構體使用SQLite資料庫來持久化存儲消息。在Event::Incoming(Packet::Publish(publish))分支中,将接收到的消息插入到資料庫中。

總結

rumqttc子產品是一個非常友善的MQTT用戶端庫,它提供了一系列API,可以友善地實作MQTT協定的功能。本教程作為前一篇的進階補充提供了常見的實際應用場景的應用示例,希望對您進一步深入的了解和掌握物聯網傳輸協定MQTT有所幫助。