目錄
MQTT(一)C#使用 MQTTnet 快速實作 MQTT 通信(文末有完整Demo下載下傳)
MQTT(二)在windows64位上安裝Python環境
MQTT(三)Python用戶端+net用戶端+net服務端 簡單通信
MQTT(四)樹莓派開機自動運作Python用戶端
MQTT(五)EMQ開源MQTT消息伺服器
1 什麼是 MQTT ?
MQTT(Message Queuing Telemetry Transport,消息隊列遙測傳輸)是 IBM 開發的一個即時通訊協定,有可能成為物聯網的重要組成部分。MQTT 是基于二進制消息的釋出/訂閱程式設計模式的消息協定,如今已經成為 OASIS 規範,由于規範很簡單,非常适合需要低功耗和網絡帶寬有限的 IoT 場景。MQTT官網
2 MQTTnet
MQTTnet 是一個基于 MQTT 通信的高性能 .NET 開源庫,它同時支援 MQTT 伺服器端和用戶端。而且作者也保持更新,目前支援新版的.NET core,這也是選擇 MQTTnet 的原因。 MQTTnet 在 Github 并不是下載下傳最多的 .NET 的 MQTT 開源庫,其他的還 MqttDotNet、nMQTT、M2MQTT 等
MQTTnet is a high performance .NET library for MQTT based communication. It provides a MQTT client and a MQTT server (broker). The implementation is based on the documentation from http://mqtt.org/.
3 建立項目并導入類庫
這裡我們使用 Visual Studio 2017 建立一個空解決方案,并在其中添加兩個項目,即一個服務端和一個用戶端,服務端項目模闆選擇最新的 .NET Core 控制台應用,用戶端項目選擇傳統的 WinForm 窗體應用程式。.NET Core 項目模闆如下圖所示:
在解決方案在右鍵單擊-選擇“管了解決方案的 NuGet 程式包”-在“浏覽”頁籤下面搜尋 MQTTnet,為服務端項目和用戶端項目都安裝上 MQTTnet 庫,目前最新穩定版為 2.4.0。項目結構如下圖所示:
4 服務端
MQTT 服務端主要用于與多個用戶端保持連接配接,并處理用戶端的釋出和訂閱等邏輯。一般很少直接從服務端發送消息給用戶端(可以使用
mqttServer.Publish(appMsg);
直接發送消息),多數情況下服務端都是轉發主題比對的用戶端消息,在系統中起到一個中介的作用。
4.1 建立服務端并啟動
建立服務端最簡單的方式是采用
MqttServerFactory
對象的
CreateMqttServer
方法來實作,該方法需要一個
MqttServerOptions
參數。
var options = new MqttServerOptions();
var mqttServer = new MqttServerFactory().CreateMqttServer(options);
通過上述方式建立了一個
IMqttServer
對象後,調用其
StartAsync
方法即可啟動 MQTT 服務。值得注意的是:之前版本采用的是
Start
方法,作者也是緊跟 C# 語言新特性,能使用異步的地方也都改為異步方式。
await mqttServer.StartAsync();
4.2 驗證用戶端
在
MqttServerOptions
選項中,你可以使用
ConnectionValidator
來對用戶端連接配接進行驗證。比如用戶端ID辨別
ClientId
,使用者名
Username
和密碼
Password
等。
var options = new MqttServerOptions
{
ConnectionValidator = c =>
{
if (c.ClientId.Length < 10)
{
return MqttConnectReturnCode.ConnectionRefusedIdentifierRejected;
}
if (c.Username != "xxx" || c.Password != "xxx")
{
return MqttConnectReturnCode.ConnectionRefusedBadUsernameOrPassword;
}
return MqttConnectReturnCode.ConnectionAccepted;
}
};
4.3 相關事件
服務端支援
ClientConnected
、
ClientDisconnected
和
ApplicationMessageReceived
事件,分别用來檢查用戶端連接配接、用戶端斷開以及接收用戶端發來的消息。
其中
ClientConnected
和
ClientDisconnected
事件的事件參數一個用戶端連接配接對象
ConnectedMqttClient
,通過該對象可以擷取用戶端ID辨別
ClientId
和 MQTT 版本
ProtocolVersion
。
ApplicationMessageReceived
的事件參數包含了用戶端ID辨別
ClientId
和 MQTT 應用消息
MqttApplicationMessage
對象,通過該對象可以擷取主題
Topic
、QoS
QualityOfServiceLevel
和消息内容
Payload
等資訊。
5 用戶端
MQTT 與 HTTP 不同,後者是基于請求/響應方式的,伺服器端無法直接發送資料給用戶端。而 MQTT 是基于釋出/訂閱模式的,所有的用戶端均與服務端保持連接配接狀态。
那麼用戶端之間是如何通信的呢?
具體邏輯是:某些用戶端向服務端訂閱它感興趣(主題)的消息,另一些用戶端向服務端釋出(主題)消息,服務端将訂閱和釋出的主題進行比對,并将消息轉發給比對通過的用戶端。
5.1 建立用戶端并連接配接
使用 MQTTnet 建立 MQTT 也非常簡單,隻需要使用
MqttClientFactory
對象的
CreateMqttClient
方法即可。
var mqttClient = new MqttClientFactory().CreateMqttClient();
建立用戶端對象後,調用其異步方法
ConnectAsync
來連接配接到服務端。
await mqttClient.ConnectAsync(options);
調用該方法時需要傳遞一個
MqttClientTcpOptions
對象(之前的版本是在建立對象時使用該選項),該選項包含了用戶端ID辨別
ClientId
、服務端位址(可以使用IP位址或域名)
Server
、端口号
Port
、使用者名
UserName
、密碼
Password
等資訊。
var options = new MqttClientTcpOptions
{
Server = "127.0.0.1",
ClientId = "c001",
UserName = "u001",
Password = "p001",
CleanSession = true
};
5.2 相關事件
用戶端支援
Connected
、
Disconnected
和
ApplicationMessageReceived
事件,用來處理用戶端與服務端連接配接、用戶端從服務端斷開以及用戶端收到消息的事情。
5.2 訂閱消息
用戶端連接配接到服務端之後,可以使用
SubscribeAsync
異步方法訂閱消息,該方法可以傳入一個可枚舉或可變參數的主題過濾器
TopicFilter
參數,主題過濾器包含主題名和 QoS 等級。
mqttClient.SubscribeAsync(new List<TopicFilter> {
new TopicFilter("家/客廳/空調/#", MqttQualityOfServiceLevel.AtMostOnce)
});
5.3 釋出消息
釋出消息前需要先建構一個消息對象
MqttApplicationMessage
,最直接的方法是使用其實構造函數,傳入主題、内容、Qos 等參數。
mqttClient.SubscribeAsync(new List<TopicFilter> {
new TopicFilter("家/客廳/空調/#", MqttQualityOfServiceLevel.AtMostOnce)
});
得到
MqttApplicationMessage
消息對象後,通過用戶端對象調用其
PublishAsync
異步方法進行消息釋出。
mqttClient.PublishAsync(appMsg);
6 跟蹤消息
MQTTnet
提供了一個靜态類
MqttNetTrace
來對消息進行跟蹤,該類可用于服務端和用戶端。
MqttNetTrace
的事件
TraceMessagePublished
用于跟蹤服務端和用戶端應用的日志消息,比如啟動、停止、心跳、消息訂閱和釋出等。事件參數
MqttNetTraceMessagePublishedEventArgs
包含了線程ID
ThreadId
、來源
Source
、日志級别
Level
、日志消息
Message
、異常資訊
Exception
等。
MqttNetTrace.TraceMessagePublished += MqttNetTrace_TraceMessagePublished;
private static void MqttNetTrace_TraceMessagePublished(object sender, MqttNetTraceMessagePublishedEventArgs e)
{
Console.WriteLine($">> 線程ID:{e.ThreadId} 來源:{e.Source} 跟蹤級别:{e.Level} 消息: {e.Message}");
if (e.Exception != null)
{
Console.WriteLine(e.Exception);
}
}
同時
MqttNetTrace
類還提供了4個不同消息等級的靜态方法,
Verbose
、
Information
、
Warning
和
Error
,用于給出不同級别的日志消息,該消息将會在
TraceMessagePublished
事件中輸出,你可以使用
e.Level
進行過慮。
7 運作效果
以下分别是服務端、用戶端1和用戶端2的運作效果,其中用戶端1和用戶端2隻是同一個項目運作了兩個執行個體。用戶端1用于訂閱傳感器的“溫度”資料,并模拟上位機(如 APP 等)發送開關控制指令;用戶端2訂閱上位機傳來的“開關”控制指令,并模拟溫度傳感器上報溫度資料。
7.1 服務端
7.2 用戶端1
7.2 用戶端2
8 Demo代碼
8.1 服務端代碼
using MQTTnet;
using MQTTnet.Core.Adapter;
using MQTTnet.Core.Diagnostics;
using MQTTnet.Core.Protocol;
using MQTTnet.Core.Server;
using System;
using System.Text;
using System.Threading;
namespace MqttServerTest
{
class Program
{
private static MqttServer mqttServer = null;
static void Main(string[] args)
{
MqttNetTrace.TraceMessagePublished += MqttNetTrace_TraceMessagePublished;
new Thread(StartMqttServer).Start();
while (true)
{
var inputString = Console.ReadLine().ToLower().Trim();
if (inputString == "exit")
{
mqttServer?.StopAsync();
Console.WriteLine("MQTT服務已停止!");
break;
}
else if (inputString == "clients")
{
foreach (var item in mqttServer.GetConnectedClients())
{
Console.WriteLine($"用戶端辨別:{item.ClientId},協定版本:{item.ProtocolVersion}");
}
}
else
{
Console.WriteLine($"指令[{inputString}]無效!");
}
}
}
private static void StartMqttServer()
{
if (mqttServer == null)
{
try
{
var options = new MqttServerOptions
{
ConnectionValidator = p =>
{
if (p.ClientId == "c001")
{
if (p.Username != "u001" || p.Password != "p001")
{
return MqttConnectReturnCode.ConnectionRefusedBadUsernameOrPassword;
}
}
return MqttConnectReturnCode.ConnectionAccepted;
}
};
mqttServer = new MqttServerFactory().CreateMqttServer(options) as MqttServer;
mqttServer.ApplicationMessageReceived += MqttServer_ApplicationMessageReceived;
mqttServer.ClientConnected += MqttServer_ClientConnected;
mqttServer.ClientDisconnected += MqttServer_ClientDisconnected;
}
catch (Exception ex)
{
Console.WriteLine(ex.Message);
return;
}
}
mqttServer.StartAsync();
Console.WriteLine("MQTT服務啟動成功!");
}
private static void MqttServer_ClientConnected(object sender, MqttClientConnectedEventArgs e)
{
Console.WriteLine($"用戶端[{e.Client.ClientId}]已連接配接,協定版本:{e.Client.ProtocolVersion}");
}
private static void MqttServer_ClientDisconnected(object sender, MqttClientDisconnectedEventArgs e)
{
Console.WriteLine($"用戶端[{e.Client.ClientId}]已斷開連接配接!");
}
private static void MqttServer_ApplicationMessageReceived(object sender, MqttApplicationMessageReceivedEventArgs e)
{
Console.WriteLine($"用戶端[{e.ClientId}]>> 主題:{e.ApplicationMessage.Topic} 負荷:{Encoding.UTF8.GetString(e.ApplicationMessage.Payload)} Qos:{e.ApplicationMessage.QualityOfServiceLevel} 保留:{e.ApplicationMessage.Retain}");
}
private static void MqttNetTrace_TraceMessagePublished(object sender, MqttNetTraceMessagePublishedEventArgs e)
{
/*Console.WriteLine($">> 線程ID:{e.ThreadId} 來源:{e.Source} 跟蹤級别:{e.Level} 消息: {e.Message}");
if (e.Exception != null)
{
Console.WriteLine(e.Exception);
}*/
}
}
}
8.2 用戶端代碼
using MQTTnet;
using MQTTnet.Core;
using MQTTnet.Core.Client;
using MQTTnet.Core.Packets;
using MQTTnet.Core.Protocol;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
using System.Windows.Forms;
namespace MqttClientWin
{
public partial class FmMqttClient : Form
{
private MqttClient mqttClient = null;
public FmMqttClient()
{
InitializeComponent();
Task.Run(async () => { await ConnectMqttServerAsync(); });
}
private async Task ConnectMqttServerAsync()
{
if (mqttClient == null)
{
mqttClient = new MqttClientFactory().CreateMqttClient() as MqttClient;
mqttClient.ApplicationMessageReceived += MqttClient_ApplicationMessageReceived;
mqttClient.Connected += MqttClient_Connected;
mqttClient.Disconnected += MqttClient_Disconnected;
}
try
{
var options = new MqttClientTcpOptions
{
Server = "127.0.0.1",
ClientId = Guid.NewGuid().ToString().Substring(0, 5),
UserName = "u001",
Password = "p001",
CleanSession = true
};
await mqttClient.ConnectAsync(options);
}
catch (Exception ex)
{
Invoke((new Action(() =>
{
txtReceiveMessage.AppendText($"連接配接到MQTT伺服器失敗!" + Environment.NewLine + ex.Message + Environment.NewLine);
})));
}
}
private void MqttClient_Connected(object sender, EventArgs e)
{
Invoke((new Action(() =>
{
txtReceiveMessage.AppendText("已連接配接到MQTT伺服器!" + Environment.NewLine);
})));
}
private void MqttClient_Disconnected(object sender, EventArgs e)
{
Invoke((new Action(() =>
{
txtReceiveMessage.AppendText("已斷開MQTT連接配接!" + Environment.NewLine);
})));
}
private void MqttClient_ApplicationMessageReceived(object sender, MqttApplicationMessageReceivedEventArgs e)
{
Invoke((new Action(() =>
{
txtReceiveMessage.AppendText($">> {Encoding.UTF8.GetString(e.ApplicationMessage.Payload)}{Environment.NewLine}");
})));
}
private void BtnSubscribe_ClickAsync(object sender, EventArgs e)
{
string topic = txtSubTopic.Text.Trim();
if (string.IsNullOrEmpty(topic))
{
MessageBox.Show("訂閱主題不能為空!");
return;
}
if (!mqttClient.IsConnected)
{
MessageBox.Show("MQTT用戶端尚未連接配接!");
return;
}
mqttClient.SubscribeAsync(new List<TopicFilter> {
new TopicFilter(topic, MqttQualityOfServiceLevel.AtMostOnce)
});
txtReceiveMessage.AppendText($"已訂閱[{topic}]主題" + Environment.NewLine);
txtSubTopic.Enabled = false;
btnSubscribe.Enabled = false;
}
private void BtnPublish_Click(object sender, EventArgs e)
{
string topic = txtPubTopic.Text.Trim();
if (string.IsNullOrEmpty(topic))
{
MessageBox.Show("釋出主題不能為空!");
return;
}
string inputString = txtSendMessage.Text.Trim();
var appMsg = new MqttApplicationMessage(topic, Encoding.UTF8.GetBytes(inputString), MqttQualityOfServiceLevel.AtMostOnce, false);
mqttClient.PublishAsync(appMsg);
}
}
}
9 本文的Demo下載下傳位址
點選下載下傳 Demo
javascript:void(0)
pw的其他原創文章導航
C#的MQTT系列
MQTT(一)C#使用 MQTTnet 快速實作 MQTT 通信(文末有完整Demo下載下傳)
MQTT(二)在windows64位上安裝Python環境
MQTT(三)Python用戶端+net用戶端+net服務端 簡單通信
MQTT(四)樹莓派開機自動運作Python用戶端
MQTT(五)EMQ開源MQTT消息伺服器
C#的阿裡物聯網平台
阿裡物聯網平台(一)Windows系統+VS2017 模拟裝置端接入
阿裡物聯網平台(二).net 實作移動端(WEB、HTML)與裝置端通訊
落地項目
落地項目-智慧海綿城市
落地項目-智能焊機,鋼塑管行業物聯網應用
手持安卓發票列印一體機,發票列印應用
省城建設計院智慧海綿城市示範工程
目錄
MQTT(一)C#使用 MQTTnet 快速實作 MQTT 通信(文末有完整Demo下載下傳)
MQTT(二)在windows64位上安裝Python環境
MQTT(三)Python用戶端+net用戶端+net服務端 簡單通信
MQTT(四)樹莓派開機自動運作Python用戶端
MQTT(五)EMQ開源MQTT消息伺服器
1 什麼是 MQTT ?
MQTT(Message Queuing Telemetry Transport,消息隊列遙測傳輸)是 IBM 開發的一個即時通訊協定,有可能成為物聯網的重要組成部分。MQTT 是基于二進制消息的釋出/訂閱程式設計模式的消息協定,如今已經成為 OASIS 規範,由于規範很簡單,非常适合需要低功耗和網絡帶寬有限的 IoT 場景。MQTT官網
2 MQTTnet
MQTTnet 是一個基于 MQTT 通信的高性能 .NET 開源庫,它同時支援 MQTT 伺服器端和用戶端。而且作者也保持更新,目前支援新版的.NET core,這也是選擇 MQTTnet 的原因。 MQTTnet 在 Github 并不是下載下傳最多的 .NET 的 MQTT 開源庫,其他的還 MqttDotNet、nMQTT、M2MQTT 等
MQTTnet is a high performance .NET library for MQTT based communication. It provides a MQTT client and a MQTT server (broker). The implementation is based on the documentation from http://mqtt.org/.
3 建立項目并導入類庫
這裡我們使用 Visual Studio 2017 建立一個空解決方案,并在其中添加兩個項目,即一個服務端和一個用戶端,服務端項目模闆選擇最新的 .NET Core 控制台應用,用戶端項目選擇傳統的 WinForm 窗體應用程式。.NET Core 項目模闆如下圖所示:
在解決方案在右鍵單擊-選擇“管了解決方案的 NuGet 程式包”-在“浏覽”頁籤下面搜尋 MQTTnet,為服務端項目和用戶端項目都安裝上 MQTTnet 庫,目前最新穩定版為 2.4.0。項目結構如下圖所示:
4 服務端
MQTT 服務端主要用于與多個用戶端保持連接配接,并處理用戶端的釋出和訂閱等邏輯。一般很少直接從服務端發送消息給用戶端(可以使用
mqttServer.Publish(appMsg);
直接發送消息),多數情況下服務端都是轉發主題比對的用戶端消息,在系統中起到一個中介的作用。