什麼是MQTT?
MQTT(Message Queuing Telemetry Transport,消息隊列遙測傳輸)是 IBM 開發的一個即時通訊協定,有可能成為物聯網的重要組成部分。MQTT 是基于二進制消息的釋出/訂閱程式設計模式的消息協定,如今已經成為 OASIS 規範,由于規範很簡單,非常适合需要低功耗和網絡帶寬有限的 IoT 場景。
目前很多工廠都架設了MQTT伺服器,用于接收裝置上傳上來的實時資料
MQTT(Message Queuing Telemetry Transport,消息隊列遙測傳輸協定),是一種基于釋出/訂閱(publish/subscribe)模式的“輕量級”通訊協定,該協定建構于TCP/IP協定上,由IBM在1999年釋出。
劃重點的地方就是 1.釋出、訂閱模式,2.輕量級,3.TCP/IP 協定;基于上面3個特點,他很友善地運用于物聯網(IOT)。可以通過MQTT 協定将各個裝置連接配接起來;
釋出訂閱模式; 基于topic, 通過MQTT的Broker 的模式
上圖名詞解析:Broker [經紀人]
從上圖可以看到,又3個部分組成: MQTT 伺服器端,釋出端,訂閱端;我們在工廠自動化的項目中,能用到的基本也是 編寫釋出端,訂閱端的代碼;
C#中github排名比較高的第三庫mqttnet;下面用這個來簡單實作一個mqtt的伺服器跟用戶端。
建立服務端代碼:
建立一個控制台應用程式 MqttServerTest,安裝mqttnet包
Install-package mqttnet
private static async Task RunServer()
{
var mqttFactory = new MqttFactory();
//預設mqtt端口為1833
var mqttServerOptions = new MqttServerOptionsBuilder().WithDefaultEndpoint().Build();
using (var mqttServer = mqttFactory.CreateMqttServer(mqttServerOptions))
{
//監聽用戶端的連接配接,斷開連接配接
mqttServer.ClientConnectedAsync += MqttServer_ClientConnectedAsync;
mqttServer.ClientDisconnectedAsync += MqttServer_ClientDisconnectedAsync;
//開啟伺服器
await mqttServer.StartAsync();
await Console.Out.WriteLineAsync("Mqtt伺服器啟動成功:127.0.0.1:1883");
Console.WriteLine("Press Enter to Exit");
Console.ReadLine();
await mqttServer.StopAsync();
}
}
private static Task MqttServer_ClientDisconnectedAsync(ClientDisconnectedEventArgs arg)
{
Console.Out.WriteLineAsync("==============");
return Console.Out.WriteLineAsync("用戶端斷開連接配接:" + arg.ClientId);
}
private static Task MqttServer_ClientConnectedAsync(ClientConnectedEventArgs arg)
{
Console.Out.WriteLineAsync("==============");
return Console.Out.WriteLineAsync("用戶端連接配接:" + arg.ClientId);
}
static void Main(string[] args)
{
_ = RunServer();
}
建立一個Mqtt 釋出消息用戶端MqttPublishTest,
代碼如下:
//建立釋出消息用戶端
private static async Task CreateMqttClient()
{
try
{
var factory = new MqttFactory();
var mqttClient = factory.CreateMqttClient();
var options = new MqttClientOptionsBuilder()
.WithClientId("Client-publish")
.WithTcpServer("127.0.0.1", 1883)
.WithUserProperty("admin", "admin")
.WithCleanSession().Build();
mqttClient.ConnectAsync(options).Wait();
Console.WriteLine("連接配接MqttServer成功...");
while (true)
{
Console.WriteLine("請輸入mqtt top[t1/t2]:");
string topic = Console.ReadLine();
Console.WriteLine("請輸入Mqtt-Playload");
var playload = Console.ReadLine();
var message = new MqttApplicationMessageBuilder()
.WithTopic(topic)
.WithPayload(playload)
.WithRetainFlag(true)
.Build();
await mqttClient.PublishAsync(message, CancellationToken.None);
Console.WriteLine("==釋出消息成功==");
}
}catch(Exception ex)
{
Console.WriteLine(ex.Message);
}
}
/// <summary>
/// 釋出消息用戶端
/// </summary>
/// <param name="args"></param>
static void Main(string[] args)
{
Task task = CreateMqttClient();
Console.ReadLine();
}
建立一個Mqtt 訂閱消息用戶端MqttSubTest,
代碼如下:
public static async Task CreateClientPublish()
{
try
{
var factory = new MqttFactory();
var mqttClient = factory.CreateMqttClient();
var options = new MqttClientOptionsBuilder()
.WithClientId("Client-Sub")
.WithTcpServer("127.0.0.1", 1883)
.WithUserProperty("admin", "admin")
.WithCleanSession().Build();
mqttClient.ConnectAsync(options).Wait();
await Console.Out.WriteLineAsync("連接配接MqttServer成功");
await mqttClient.SubscribeAsync(new MqttTopicFilterBuilder().WithTopic("t1").Build());
await mqttClient.SubscribeAsync(new MqttTopicFilterBuilder().WithTopic("t2").Build());
await Console.Out.WriteLineAsync("訂閱消息成功,訂閱的Topic為:t1,t2");
mqttClient.ApplicationMessageReceivedAsync += MqttClient_ApplicationMessageReceivedAsync;
}catch(Exception ex)
{
Console.WriteLine(ex.ToString());
}
}
private static Task MqttClient_ApplicationMessageReceivedAsync(MqttApplicationMessageReceivedEventArgs arg)
{
Console.WriteLine("收到消息:");
Console.WriteLine(#34;topic :{arg.ApplicationMessage.Topic}");
Console.WriteLine(#34;playload:{Encoding.UTF8.GetString(arg.ApplicationMessage.Payload)}");
Console.WriteLine(#34;Qos:{arg.ApplicationMessage.QualityOfServiceLevel}");
Console.WriteLine(#34;Retain:{arg.ApplicationMessage.Retain}");
return Task.CompletedTask;
}
static void Main(string[] args)
{
Task task = CreateClientPublish();
Console.ReadKey();
}
依次運作:Server, Publish,Subscribe3個程式,結果如下:
在釋出用戶端,輸入釋出的topic, playload即可看到訂閱的用戶端收到了釋出的消息;
到此,我們用Mqttnet完成了一個簡單的測試;是不是很簡單呢?