天天看點

MQTT(一)C#使用 MQTTnet 快速實作 MQTT 通信(文末有完整Demo下載下傳)

目錄

​​MQTT(一)C#使用 MQTTnet 快速實作 MQTT 通信(文末有完整Demo下載下傳)​​

​​MQTT(二)在windows64位上安裝Python環境​​

​​MQTT(三)Python用戶端+net用戶端+net服務端 簡單通信​​

​​MQTT(四)樹莓派開機自動運作Python用戶端​​

​​MQTT(五)EMQ開源MQTT消息伺服器​​

1 什麼是 MQTT ?

MQTT(Message Queuing Telemetry Transport,消息隊列遙測傳輸)是 IBM 開發的一個即時通訊協定,有可能成為物聯網的重要組成部分。MQTT 是基于二進制消息的釋出/訂閱程式設計模式的消息協定,如今已經成為 OASIS 規範,由于規範很簡單,非常适合需要低功耗和網絡帶寬有限的 IoT 場景。​​MQTT官網​​

2 MQTTnet

​​MQTTnet​​​ 是一個基于 MQTT 通信的高性能 .NET 開源庫,它同時支援 MQTT 伺服器端和用戶端。而且作者也保持更新,目前支援新版的.NET core,這也是選擇 MQTTnet 的原因。 MQTTnet 在 Github 并不是下載下傳最多的 .NET 的 MQTT 開源庫,其他的還 ​​MqttDotNet​​​、​​nMQTT​​​、​​M2MQTT​​ 等

MQTTnet is a high performance .NET library for MQTT based communication. It provides a MQTT client and a MQTT server (broker). The implementation is based on the documentation from ​​http://mqtt.org/​​.

3 建立項目并導入類庫

這裡我們使用 Visual Studio 2017 建立一個空解決方案,并在其中添加兩個項目,即一個服務端和一個用戶端,服務端項目模闆選擇最新的 .NET Core 控制台應用,用戶端項目選擇傳統的 WinForm 窗體應用程式。.NET Core 項目模闆如下圖所示: 

MQTT(一)C#使用 MQTTnet 快速實作 MQTT 通信(文末有完整Demo下載下傳)

在解決方案在右鍵單擊-選擇“管了解決方案的 NuGet 程式包”-在“浏覽”頁籤下面搜尋 MQTTnet,為服務端項目和用戶端項目都安裝上 MQTTnet 庫,目前最新穩定版為 2.4.0。項目結構如下圖所示: 

MQTT(一)C#使用 MQTTnet 快速實作 MQTT 通信(文末有完整Demo下載下傳)

4 服務端

MQTT 服務端主要用于與多個用戶端保持連接配接,并處理用戶端的釋出和訂閱等邏輯。一般很少直接從服務端發送消息給用戶端(可以使用 ​

​mqttServer.Publish(appMsg);​

​ 直接發送消息),多數情況下服務端都是轉發主題比對的用戶端消息,在系統中起到一個中介的作用。

4.1 建立服務端并啟動

建立服務端最簡單的方式是采用 ​

​MqttServerFactory​

​​ 對象的 ​

​CreateMqttServer​

​​ 方法來實作,該方法需要一個​

​MqttServerOptions​

​ 參數。

var options = new MqttServerOptions();
var mqttServer = new MqttServerFactory().CreateMqttServer(options);      

通過上述方式建立了一個 ​

​IMqttServer​

​​ 對象後,調用其 ​

​StartAsync​

​​ 方法即可啟動 MQTT 服務。值得注意的是:之前版本采用的是 ​

​Start​

​ 方法,作者也是緊跟 C# 語言新特性,能使用異步的地方也都改為異步方式。

await mqttServer.StartAsync();      

4.2 驗證用戶端

在 ​

​MqttServerOptions​

​​ 選項中,你可以使用 ​

​ConnectionValidator​

​​ 來對用戶端連接配接進行驗證。比如用戶端ID辨別 ​

​ClientId​

​​,使用者名 ​

​Username​

​​ 和密碼 ​

​Password​

​ 等。

var options = new MqttServerOptions
{
    ConnectionValidator = c =>
    {
        if (c.ClientId.Length < 10)
        {
            return MqttConnectReturnCode.ConnectionRefusedIdentifierRejected;
        }
 
        if (c.Username != "xxx" || c.Password != "xxx")
        {
            return MqttConnectReturnCode.ConnectionRefusedBadUsernameOrPassword;
        }
 
        return MqttConnectReturnCode.ConnectionAccepted;
    }
};      

4.3 相關事件

服務端支援 ​

​ClientConnected​

​​、​

​ClientDisconnected​

​​ 和 ​

​ApplicationMessageReceived​

​ 事件,分别用來檢查用戶端連接配接、用戶端斷開以及接收用戶端發來的消息。

其中 ​

​ClientConnected​

​​ 和 ​

​ClientDisconnected​

​​ 事件的事件參數一個用戶端連接配接對象 ​

​ConnectedMqttClient​

​​,通過該對象可以擷取用戶端ID辨別 ​

​ClientId​

​​ 和 MQTT 版本 ​

​ProtocolVersion​

​。

​ApplicationMessageReceived​

​​ 的事件參數包含了用戶端ID辨別 ​

​ClientId​

​​ 和 MQTT 應用消息 ​

​MqttApplicationMessage​

​​ 對象,通過該對象可以擷取主題 ​

​Topic​

​​、QoS ​

​QualityOfServiceLevel​

​​ 和消息内容 ​

​Payload​

​ 等資訊。

5 用戶端

MQTT 與 HTTP 不同,後者是基于請求/響應方式的,伺服器端無法直接發送資料給用戶端。而 MQTT 是基于釋出/訂閱模式的,所有的用戶端均與服務端保持連接配接狀态。

那麼用戶端之間是如何通信的呢?

具體邏輯是:某些用戶端向服務端訂閱它感興趣(主題)的消息,另一些用戶端向服務端釋出(主題)消息,服務端将訂閱和釋出的主題進行比對,并将消息轉發給比對通過的用戶端。

5.1 建立用戶端并連接配接

使用 MQTTnet 建立 MQTT 也非常簡單,隻需要使用 ​

​MqttClientFactory​

​​ 對象的 ​

​CreateMqttClient​

​ 方法即可。

var mqttClient = new MqttClientFactory().CreateMqttClient();      

建立用戶端對象後,調用其異步方法 ​

​ConnectAsync​

​ 來連接配接到服務端。

await mqttClient.ConnectAsync(options);      

調用該方法時需要傳遞一個 ​

​MqttClientTcpOptions​

​​ 對象(之前的版本是在建立對象時使用該選項),該選項包含了用戶端ID辨別 ​

​ClientId​

​​、服務端位址(可以使用IP位址或域名)​

​Server​

​​、端口号 ​

​Port​

​​、使用者名 ​

​UserName​

​​、密碼 ​

​Password​

​ 等資訊。

var options = new MqttClientTcpOptions
{
    Server = "127.0.0.1",
    ClientId = "c001",
    UserName = "u001",
    Password = "p001",
    CleanSession = true
};      

5.2 相關事件

用戶端支援 ​

​Connected​

​​、​

​Disconnected​

​​ 和 ​

​ApplicationMessageReceived​

​ 事件,用來處理用戶端與服務端連接配接、用戶端從服務端斷開以及用戶端收到消息的事情。

5.2 訂閱消息

用戶端連接配接到服務端之後,可以使用 ​

​SubscribeAsync​

​​ 異步方法訂閱消息,該方法可以傳入一個可枚舉或可變參數的主題過濾器 ​

​TopicFilter​

​ 參數,主題過濾器包含主題名和 QoS 等級。

mqttClient.SubscribeAsync(new List<TopicFilter> {
    new TopicFilter("家/客廳/空調/#", MqttQualityOfServiceLevel.AtMostOnce)
});      

5.3 釋出消息

釋出消息前需要先建構一個消息對象 ​

​MqttApplicationMessage​

​,最直接的方法是使用其實構造函數,傳入主題、内容、Qos 等參數。

mqttClient.SubscribeAsync(new List<TopicFilter> {
    new TopicFilter("家/客廳/空調/#", MqttQualityOfServiceLevel.AtMostOnce)
});      

得到 ​

​MqttApplicationMessage​

​​ 消息對象後,通過用戶端對象調用其 ​

​PublishAsync​

​ 異步方法進行消息釋出。

mqttClient.PublishAsync(appMsg);      

6 跟蹤消息

​MQTTnet​

​​ 提供了一個靜态類 ​

​MqttNetTrace​

​​ 來對消息進行跟蹤,該類可用于服務端和用戶端。​

​MqttNetTrace​

​​ 的事件​

​TraceMessagePublished​

​​ 用于跟蹤服務端和用戶端應用的日志消息,比如啟動、停止、心跳、消息訂閱和釋出等。事件參數​

​MqttNetTraceMessagePublishedEventArgs​

​​ 包含了線程ID ​

​ThreadId​

​​、來源 ​

​Source​

​​、日志級别 ​

​Level​

​​、日志消息 ​

​Message​

​​、異常資訊 ​

​Exception​

​ 等。

MqttNetTrace.TraceMessagePublished += MqttNetTrace_TraceMessagePublished;
 
private static void MqttNetTrace_TraceMessagePublished(object sender, MqttNetTraceMessagePublishedEventArgs e)
{
    Console.WriteLine($">> 線程ID:{e.ThreadId} 來源:{e.Source} 跟蹤級别:{e.Level} 消息: {e.Message}");
 
    if (e.Exception != null)
    {
        Console.WriteLine(e.Exception);
    }
}      

同時 ​

​MqttNetTrace​

​​ 類還提供了4個不同消息等級的靜态方法,​

​Verbose​

​​、​

​Information​

​​、​

​Warning​

​​ 和 ​

​Error​

​​,用于給出不同級别的日志消息,該消息将會在 ​

​TraceMessagePublished​

​​ 事件中輸出,你可以使用 ​

​e.Level​

​ 進行過慮。

7 運作效果

以下分别是服務端、用戶端1和用戶端2的運作效果,其中用戶端1和用戶端2隻是同一個項目運作了兩個執行個體。用戶端1用于訂閱傳感器的“溫度”資料,并模拟上位機(如 APP 等)發送開關控制指令;用戶端2訂閱上位機傳來的“開關”控制指令,并模拟溫度傳感器上報溫度資料。

7.1 服務端

MQTT(一)C#使用 MQTTnet 快速實作 MQTT 通信(文末有完整Demo下載下傳)

7.2 用戶端1

MQTT(一)C#使用 MQTTnet 快速實作 MQTT 通信(文末有完整Demo下載下傳)

7.2 用戶端2

MQTT(一)C#使用 MQTTnet 快速實作 MQTT 通信(文末有完整Demo下載下傳)

8 Demo代碼

8.1 服務端代碼

using MQTTnet;
using MQTTnet.Core.Adapter;
using MQTTnet.Core.Diagnostics;
using MQTTnet.Core.Protocol;
using MQTTnet.Core.Server;
using System;
using System.Text;
using System.Threading;
 
namespace MqttServerTest
{
    class Program
    {
        private static MqttServer mqttServer = null;
 
        static void Main(string[] args)
        {
            MqttNetTrace.TraceMessagePublished += MqttNetTrace_TraceMessagePublished;
            new Thread(StartMqttServer).Start();
 
            while (true)
            {
                var inputString = Console.ReadLine().ToLower().Trim();
 
                if (inputString == "exit")
                {
                    mqttServer?.StopAsync();
                    Console.WriteLine("MQTT服務已停止!");
                    break;
                }
                else if (inputString == "clients")
                {
                    foreach (var item in mqttServer.GetConnectedClients())
                    {
                        Console.WriteLine($"用戶端辨別:{item.ClientId},協定版本:{item.ProtocolVersion}");
                    }
                }
                else
                {
                    Console.WriteLine($"指令[{inputString}]無效!");
                }
            }
        }
 
        private static void StartMqttServer()
        {
            if (mqttServer == null)
            {
                try
                {
                    var options = new MqttServerOptions
                    {
                        ConnectionValidator = p =>
                        {
                            if (p.ClientId == "c001")
                            {
                                if (p.Username != "u001" || p.Password != "p001")
                                {
                                    return MqttConnectReturnCode.ConnectionRefusedBadUsernameOrPassword;
                                }
                            }
 
                            return MqttConnectReturnCode.ConnectionAccepted;
                        }
                    };
 
                    mqttServer = new MqttServerFactory().CreateMqttServer(options) as MqttServer;
                    mqttServer.ApplicationMessageReceived += MqttServer_ApplicationMessageReceived;
                    mqttServer.ClientConnected += MqttServer_ClientConnected;
                    mqttServer.ClientDisconnected += MqttServer_ClientDisconnected;
                }
                catch (Exception ex)
                {
                    Console.WriteLine(ex.Message);
                    return;
                }
            }
 
            mqttServer.StartAsync();
            Console.WriteLine("MQTT服務啟動成功!");
        }
 
        private static void MqttServer_ClientConnected(object sender, MqttClientConnectedEventArgs e)
        {
            Console.WriteLine($"用戶端[{e.Client.ClientId}]已連接配接,協定版本:{e.Client.ProtocolVersion}");
        }
 
        private static void MqttServer_ClientDisconnected(object sender, MqttClientDisconnectedEventArgs e)
        {
            Console.WriteLine($"用戶端[{e.Client.ClientId}]已斷開連接配接!");
        }
 
        private static void MqttServer_ApplicationMessageReceived(object sender, MqttApplicationMessageReceivedEventArgs e)
        {
            Console.WriteLine($"用戶端[{e.ClientId}]>> 主題:{e.ApplicationMessage.Topic} 負荷:{Encoding.UTF8.GetString(e.ApplicationMessage.Payload)} Qos:{e.ApplicationMessage.QualityOfServiceLevel} 保留:{e.ApplicationMessage.Retain}");
        }
 
        private static void MqttNetTrace_TraceMessagePublished(object sender, MqttNetTraceMessagePublishedEventArgs e)
        {
            /*Console.WriteLine($">> 線程ID:{e.ThreadId} 來源:{e.Source} 跟蹤級别:{e.Level} 消息: {e.Message}");
            if (e.Exception != null)
            {
                Console.WriteLine(e.Exception);
            }*/
        }
    }
}      

8.2 用戶端代碼

using MQTTnet;
using MQTTnet.Core;
using MQTTnet.Core.Client;
using MQTTnet.Core.Packets;
using MQTTnet.Core.Protocol;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
using System.Windows.Forms;
 
namespace MqttClientWin
{
    public partial class FmMqttClient : Form
    {
        private MqttClient mqttClient = null;
 
        public FmMqttClient()
        {
            InitializeComponent();
 
            Task.Run(async () => { await ConnectMqttServerAsync(); });
        }
 
        private async Task ConnectMqttServerAsync()
        {
            if (mqttClient == null)
            {
                mqttClient = new MqttClientFactory().CreateMqttClient() as MqttClient;
                mqttClient.ApplicationMessageReceived += MqttClient_ApplicationMessageReceived;
                mqttClient.Connected += MqttClient_Connected;
                mqttClient.Disconnected += MqttClient_Disconnected;
            }
 
            try
            {
                var options = new MqttClientTcpOptions
                {
                    Server = "127.0.0.1",
                    ClientId = Guid.NewGuid().ToString().Substring(0, 5),
                    UserName = "u001",
                    Password = "p001",
                    CleanSession = true
                };
 
                await mqttClient.ConnectAsync(options);
            }
            catch (Exception ex)
            {
                Invoke((new Action(() =>
                {
                    txtReceiveMessage.AppendText($"連接配接到MQTT伺服器失敗!" + Environment.NewLine + ex.Message + Environment.NewLine);
                })));
            }
        }
 
        private void MqttClient_Connected(object sender, EventArgs e)
        {
            Invoke((new Action(() =>
            {
                txtReceiveMessage.AppendText("已連接配接到MQTT伺服器!" + Environment.NewLine);
            })));
        }
 
        private void MqttClient_Disconnected(object sender, EventArgs e)
        {
            Invoke((new Action(() =>
            {
                txtReceiveMessage.AppendText("已斷開MQTT連接配接!" + Environment.NewLine);
            })));
        }
 
        private void MqttClient_ApplicationMessageReceived(object sender, MqttApplicationMessageReceivedEventArgs e)
        {
            Invoke((new Action(() =>
            {
                txtReceiveMessage.AppendText($">> {Encoding.UTF8.GetString(e.ApplicationMessage.Payload)}{Environment.NewLine}");
            })));
        }
 
        private void BtnSubscribe_ClickAsync(object sender, EventArgs e)
        {
            string topic = txtSubTopic.Text.Trim();
 
            if (string.IsNullOrEmpty(topic))
            {
                MessageBox.Show("訂閱主題不能為空!");
                return;
            }
 
            if (!mqttClient.IsConnected)
            {
                MessageBox.Show("MQTT用戶端尚未連接配接!");
                return;
            }
 
            mqttClient.SubscribeAsync(new List<TopicFilter> {
                new TopicFilter(topic, MqttQualityOfServiceLevel.AtMostOnce)
            });
 
            txtReceiveMessage.AppendText($"已訂閱[{topic}]主題" + Environment.NewLine);
            txtSubTopic.Enabled = false;
            btnSubscribe.Enabled = false;
        }
 
        private void BtnPublish_Click(object sender, EventArgs e)
        {
            string topic = txtPubTopic.Text.Trim();
 
            if (string.IsNullOrEmpty(topic))
            {
                MessageBox.Show("釋出主題不能為空!");
                return;
            }
 
            string inputString = txtSendMessage.Text.Trim();
            var appMsg = new MqttApplicationMessage(topic, Encoding.UTF8.GetBytes(inputString), MqttQualityOfServiceLevel.AtMostOnce, false);
            mqttClient.PublishAsync(appMsg);
        }
    }
}      

9 本文的Demo下載下傳位址

​​點選下載下傳 Demo​​

​​javascript:void(0)​​

pw的其他原創文章導航

C#的MQTT系列

​​MQTT(一)C#使用 MQTTnet 快速實作 MQTT 通信(文末有完整Demo下載下傳)​​

​​MQTT(二)在windows64位上安裝Python環境​​

​​MQTT(三)Python用戶端+net用戶端+net服務端 簡單通信​​

​​MQTT(四)樹莓派開機自動運作Python用戶端​​

​​MQTT(五)EMQ開源MQTT消息伺服器​​

C#的阿裡物聯網平台

​​阿裡物聯網平台(一)Windows系統+VS2017 模拟裝置端接入​​

​​阿裡物聯網平台(二).net 實作移動端(WEB、HTML)與裝置端通訊​​

落地項目

​​落地項目-智慧海綿城市​​

​​落地項目-智能焊機,鋼塑管行業物聯網應用​​

​​手持安卓發票列印一體機,發票列印應用​​

​​省城建設計院智慧海綿城市示範工程​​

目錄

​​MQTT(一)C#使用 MQTTnet 快速實作 MQTT 通信(文末有完整Demo下載下傳)​​

​​MQTT(二)在windows64位上安裝Python環境​​

​​MQTT(三)Python用戶端+net用戶端+net服務端 簡單通信​​

​​MQTT(四)樹莓派開機自動運作Python用戶端​​

​​MQTT(五)EMQ開源MQTT消息伺服器​​

1 什麼是 MQTT ?

MQTT(Message Queuing Telemetry Transport,消息隊列遙測傳輸)是 IBM 開發的一個即時通訊協定,有可能成為物聯網的重要組成部分。MQTT 是基于二進制消息的釋出/訂閱程式設計模式的消息協定,如今已經成為 OASIS 規範,由于規範很簡單,非常适合需要低功耗和網絡帶寬有限的 IoT 場景。​​MQTT官網​​

2 MQTTnet

​​MQTTnet​​​ 是一個基于 MQTT 通信的高性能 .NET 開源庫,它同時支援 MQTT 伺服器端和用戶端。而且作者也保持更新,目前支援新版的.NET core,這也是選擇 MQTTnet 的原因。 MQTTnet 在 Github 并不是下載下傳最多的 .NET 的 MQTT 開源庫,其他的還 ​​MqttDotNet​​​、​​nMQTT​​​、​​M2MQTT​​ 等

MQTTnet is a high performance .NET library for MQTT based communication. It provides a MQTT client and a MQTT server (broker). The implementation is based on the documentation from ​​http://mqtt.org/​​.

3 建立項目并導入類庫

這裡我們使用 Visual Studio 2017 建立一個空解決方案,并在其中添加兩個項目,即一個服務端和一個用戶端,服務端項目模闆選擇最新的 .NET Core 控制台應用,用戶端項目選擇傳統的 WinForm 窗體應用程式。.NET Core 項目模闆如下圖所示: 

MQTT(一)C#使用 MQTTnet 快速實作 MQTT 通信(文末有完整Demo下載下傳)

在解決方案在右鍵單擊-選擇“管了解決方案的 NuGet 程式包”-在“浏覽”頁籤下面搜尋 MQTTnet,為服務端項目和用戶端項目都安裝上 MQTTnet 庫,目前最新穩定版為 2.4.0。項目結構如下圖所示: 

MQTT(一)C#使用 MQTTnet 快速實作 MQTT 通信(文末有完整Demo下載下傳)

4 服務端

MQTT 服務端主要用于與多個用戶端保持連接配接,并處理用戶端的釋出和訂閱等邏輯。一般很少直接從服務端發送消息給用戶端(可以使用 ​

​mqttServer.Publish(appMsg);​

​ 直接發送消息),多數情況下服務端都是轉發主題比對的用戶端消息,在系統中起到一個中介的作用。

4.1 建立服務端并啟動