天天看點

c# 基礎技能之-MQTT

作者:工控軟體坑把子

什麼是MQTT?

MQTT(Message Queuing Telemetry Transport,消息隊列遙測傳輸)是 IBM 開發的一個即時通訊協定,有可能成為物聯網的重要組成部分。MQTT 是基于二進制消息的釋出/訂閱程式設計模式的消息協定,如今已經成為 OASIS 規範,由于規範很簡單,非常适合需要低功耗和網絡帶寬有限的 IoT 場景。

目前很多工廠都架設了MQTT伺服器,用于接收裝置上傳上來的實時資料

MQTT(Message Queuing Telemetry Transport,消息隊列遙測傳輸協定),是一種基于釋出/訂閱(publish/subscribe)模式的“輕量級”通訊協定,該協定建構于TCP/IP協定上,由IBM在1999年釋出。

劃重點的地方就是 1.釋出、訂閱模式,2.輕量級,3.TCP/IP 協定;基于上面3個特點,他很友善地運用于物聯網(IOT)。可以通過MQTT 協定将各個裝置連接配接起來;

c# 基礎技能之-MQTT

釋出訂閱模式; 基于topic, 通過MQTT的Broker 的模式

上圖名詞解析:Broker [經紀人]

從上圖可以看到,又3個部分組成: MQTT 伺服器端,釋出端,訂閱端;我們在工廠自動化的項目中,能用到的基本也是 編寫釋出端,訂閱端的代碼;

C#中github排名比較高的第三庫mqttnet;下面用這個來簡單實作一個mqtt的伺服器跟用戶端。

建立服務端代碼:

建立一個控制台應用程式 MqttServerTest,安裝mqttnet包

Install-package mqttnet           
private static async Task RunServer()
{
  var mqttFactory = new MqttFactory();
  //預設mqtt端口為1833
  var mqttServerOptions = new MqttServerOptionsBuilder().WithDefaultEndpoint().Build();

  using (var mqttServer = mqttFactory.CreateMqttServer(mqttServerOptions))
  {
    //監聽用戶端的連接配接,斷開連接配接
    mqttServer.ClientConnectedAsync += MqttServer_ClientConnectedAsync;
    mqttServer.ClientDisconnectedAsync += MqttServer_ClientDisconnectedAsync;
    //開啟伺服器
    await mqttServer.StartAsync();
    await Console.Out.WriteLineAsync("Mqtt伺服器啟動成功:127.0.0.1:1883");
    Console.WriteLine("Press Enter to Exit");
    Console.ReadLine();
    await mqttServer.StopAsync();
  }
}

private static Task MqttServer_ClientDisconnectedAsync(ClientDisconnectedEventArgs arg)
{
  Console.Out.WriteLineAsync("==============");
  return Console.Out.WriteLineAsync("用戶端斷開連接配接:" + arg.ClientId);
}

private static Task MqttServer_ClientConnectedAsync(ClientConnectedEventArgs arg)
{
  Console.Out.WriteLineAsync("==============");
  return Console.Out.WriteLineAsync("用戶端連接配接:" + arg.ClientId);
}

static void Main(string[] args)
{
  _ = RunServer();
}           

建立一個Mqtt 釋出消息用戶端MqttPublishTest,

代碼如下:

//建立釋出消息用戶端
private static async Task CreateMqttClient()
{
  try
  {
    var factory = new MqttFactory();
    var mqttClient = factory.CreateMqttClient();
    var options = new MqttClientOptionsBuilder()
    .WithClientId("Client-publish")
    .WithTcpServer("127.0.0.1", 1883)
    .WithUserProperty("admin", "admin")
    .WithCleanSession().Build();
    mqttClient.ConnectAsync(options).Wait();
    Console.WriteLine("連接配接MqttServer成功...");
    while (true)
    {
      Console.WriteLine("請輸入mqtt top[t1/t2]:");
      string topic = Console.ReadLine();
      Console.WriteLine("請輸入Mqtt-Playload");
      var playload = Console.ReadLine();
      var message = new MqttApplicationMessageBuilder()
      .WithTopic(topic)
      .WithPayload(playload)
      .WithRetainFlag(true)
      .Build();
      await mqttClient.PublishAsync(message, CancellationToken.None);
      Console.WriteLine("==釋出消息成功==");
    }

  }catch(Exception ex)
  {
    Console.WriteLine(ex.Message);
  }
}
/// <summary>
/// 釋出消息用戶端
/// </summary>
/// <param name="args"></param>
static  void Main(string[] args)
{
  Task task = CreateMqttClient();
  Console.ReadLine();
}           

建立一個Mqtt 訂閱消息用戶端MqttSubTest,

代碼如下:

public static async Task CreateClientPublish()
{
  try
  {
    var factory = new MqttFactory();
    var mqttClient = factory.CreateMqttClient();
    var options = new MqttClientOptionsBuilder()
    .WithClientId("Client-Sub")
    .WithTcpServer("127.0.0.1", 1883)
    .WithUserProperty("admin", "admin")
    .WithCleanSession().Build();
    mqttClient.ConnectAsync(options).Wait();
    await Console.Out.WriteLineAsync("連接配接MqttServer成功");
    await mqttClient.SubscribeAsync(new MqttTopicFilterBuilder().WithTopic("t1").Build());
    await mqttClient.SubscribeAsync(new MqttTopicFilterBuilder().WithTopic("t2").Build());
    await Console.Out.WriteLineAsync("訂閱消息成功,訂閱的Topic為:t1,t2");
    mqttClient.ApplicationMessageReceivedAsync += MqttClient_ApplicationMessageReceivedAsync;
  }catch(Exception ex)
  {
    Console.WriteLine(ex.ToString());
  }
}

private static Task MqttClient_ApplicationMessageReceivedAsync(MqttApplicationMessageReceivedEventArgs arg)
{
  Console.WriteLine("收到消息:");
  Console.WriteLine(#34;topic :{arg.ApplicationMessage.Topic}");
  Console.WriteLine(#34;playload:{Encoding.UTF8.GetString(arg.ApplicationMessage.Payload)}");
  Console.WriteLine(#34;Qos:{arg.ApplicationMessage.QualityOfServiceLevel}");
  Console.WriteLine(#34;Retain:{arg.ApplicationMessage.Retain}");
  return Task.CompletedTask;
}

static void Main(string[] args)
{
  Task task = CreateClientPublish();
  Console.ReadKey();
}           

依次運作:Server, Publish,Subscribe3個程式,結果如下:

c# 基礎技能之-MQTT

在釋出用戶端,輸入釋出的topic, playload即可看到訂閱的用戶端收到了釋出的消息;

到此,我們用Mqttnet完成了一個簡單的測試;是不是很簡單呢?