天天看点

.NET Core MQTT DEMO

什么是MQTT

  MQTT(message queuing telemetry transport)是IBM开发的即时通讯协议,是一种发布/订阅极其轻量级的消息传输协议,专门为网络受限设备、低宽带以及高延迟和不可靠的网络而设计的。由于以上轻量级的特点,是实现智能家居的首选传输协议,相比于XMPP,更加轻量级而且占用宽带低。简单来说HQTT是一种通信协议,要实现发布/订阅就必须遵循这个协议。

二、实现MQTT通讯协议.NET开源库有哪些?

  MQTTnet、MqttDotNet、nMQTT、M2MQTT等,这里我们使用MQTTnet(但MQTTnet搜到的教程基本都是2.7及以下版本的,我们使用的是3.0.9版本)

  官网项目网址:https://github.com/chkr1011/MQTTnet

三、展示MQTT实现效果图

  

.NET Core MQTT DEMO

  例:客户端1只要订阅了positon主题,客户端2、客户端3、客户端4.....同样订阅了position主题则他们之间就能共享position主题的所发的内容了。

  如果客户端1订阅了position主题,客户端2订阅了beautiful主题,1发给消息2是收不到的。

四、创建.NETCore项目(Server和Client)

.NET Core MQTT DEMO

五、服务器

  添加Nuget包:安装MQTTnet

.NET Core MQTT DEMO

class Program

{

public static IMqttServer mqttServer;

static void Main(string[] args)

{

StartMqttServer();

}

//启动Mqtt服务器

private static async void StartMqttServer()

try

{

//验证客户端信息

var options = new MqttServerOptions

{

//连接验证

ConnectionValidator = new MqttServerConnectionValidatorDelegate(p =>

{

if (p.ClientId == "SpecialClient")

{

if (p.Username != "USER" || p.Password != "PASS")

{

p.ReasonCode = MqttConnectReasonCode.BadUserNameOrPassword;

}

}

})

};

//设置端口号

options.DefaultEndpointOptions.Port = 8031;

//创建Mqtt服务器

mqttServer = new MqttFactory().CreateMqttServer();

//开启订阅事件

mqttServer.ClientSubscribedTopicHandler = new MqttServerClientSubscribedHandlerDelegate(MqttNetServer_SubscribedTopic);

//取消订阅事件

mqttServer.ClientUnsubscribedTopicHandler = new MqttServerClientUnsubscribedTopicHandlerDelegate(MqttNetServer_UnSubscribedTopic);

//客户端消息事件

mqttServer.UseApplicationMessageReceivedHandler(MqttServe_ApplicationMessageReceived);

//客户端连接事件

mqttServer.UseClientConnectedHandler(MqttNetServer_ClientConnected);

//客户端断开事件

mqttServer.UseClientDisconnectedHandler(MqttNetServer_ClientDisConnected);

//启动服务器

await mqttServer.StartAsync(options);

Console.WriteLine("服务器启动成功!输入任意内容并回车停止服务!");

Console.ReadLine();

await mqttServer.StopAsync();

}

catch (Exception e)

Console.Write($"服务器启动失败 Msg:{e}");

/// <summary>

/// 客户订阅

/// </summary>

private static void MqttNetServer_SubscribedTopic(MqttServerClientSubscribedTopicEventArgs e)

//客户端Id

var ClientId = e.ClientId;

var Topic = e.TopicFilter.Topic;

Console.WriteLine($"客户端[{ClientId}]已订阅主题:{Topic}");

/// 客户取消订阅

private static void MqttNetServer_UnSubscribedTopic(MqttServerClientUnsubscribedTopicEventArgs e)

var Topic = e.TopicFilter;

Console.WriteLine($"客户端[{ClientId}]已取消订阅主题:{Topic}");

/// 接收消息

private static void MqttServe_ApplicationMessageReceived(MqttApplicationMessageReceivedEventArgs e)

var Topic = e.ApplicationMessage.Topic;

var Payload = Encoding.UTF8.GetString(e.ApplicationMessage.Payload);

var Qos = e.ApplicationMessage.QualityOfServiceLevel;

var Retain = e.ApplicationMessage.Retain;

Console.WriteLine($"客户端[{ClientId}]>> 主题:[{Topic}] 负载:[{Payload}] Qos:[{Qos}] 保留:[{Retain}]");

/// 客户连接

private static void MqttNetServer_ClientConnected(MqttServerClientConnectedEventArgs e)

Console.WriteLine($"客户端[{ClientId}]已连接");

/// 客户连接断开

private static void MqttNetServer_ClientDisConnected(MqttServerClientDisconnectedEventArgs e)

Console.WriteLine($"客户端[{ClientId}]已断开连接");

}

六、客户端

public static IMqttClient mqttClient;

ConnectMqttServerAsync();

ImportData();

private static async void ConnectMqttServerAsync()

var factory = new MqttFactory();

mqttClient = factory.CreateMqttClient();

var options = new MqttClientOptionsBuilder()

.WithTcpServer("127.0.0.1", 8031)

.WithCredentials("test", "test")

.WithClientId(Guid.NewGuid().ToString().Substring(0, 5))

.Build();

//消息

mqttClient.UseApplicationMessageReceivedHandler(e =>

Console.WriteLine("### 收到的信息 ###");

Console.WriteLine($"+ Topic = {e.ApplicationMessage.Topic}");//主题

Console.WriteLine($"+ Payload = {Encoding.UTF8.GetString(e.ApplicationMessage.Payload)}");//页面信息

Console.WriteLine($"+ QoS = {e.ApplicationMessage.QualityOfServiceLevel}");//消息等级

Console.WriteLine($"+ Retain = {e.ApplicationMessage.Retain}");//是否保留

Console.WriteLine();

});

//重连机制

mqttClient.UseDisconnectedHandler(async e =>

Console.WriteLine("与服务器断开连接!");

await Task.Delay(TimeSpan.FromSeconds(5));

try

await mqttClient.ConnectAsync(options);

}

catch (Exception exp)

Console.Write($"重新连接服务器失败 Msg:{exp}");

await mqttClient.ConnectAsync(options);

Console.Write("连接服务器成功!输入任意内容并回车进入菜单页面!");

catch (Exception exp)

Console.Write($"连接服务器失败 Msg:{exp}");

private static void ImportData()

Console.ReadLine();

bool isExit = false;

while (!isExit)

Console.WriteLine(@"请输入

1.订阅主题

2.取消订阅

3.发送消息

4.退出");

var input = Console.ReadLine();

switch (input)

case "1":

Console.WriteLine(@"请输入主题名称:");

var topicName = Console.ReadLine();

Subscribe(topicName);

break;

case "2":

Console.WriteLine(@"请输入需要取消订阅主题名称:");

topicName = Console.ReadLine();

Unsubscribe(topicName);

case "3":

Console.WriteLine("请输入需要发送的主题名称");

Console.WriteLine("请输入需要发送的消息");

var message = Console.ReadLine();

Publish(topicName, message);

case "4":

isExit = true;

default:

Console.WriteLine("请输入正确指令!");

}

/// 订阅

/// <param name="topicName"></param>

private static async void Subscribe(string topicName)

string topic = topicName.Trim();

if (string.IsNullOrEmpty(topic))

Console.Write("订阅主题不能为空!");

return;

if (!mqttClient.IsConnected)

Console.Write("MQTT客户端尚未连接!");

await mqttClient.SubscribeAsync(new TopicFilterBuilder().WithTopic(topic).Build());

/// 取消订阅

private static async void Unsubscribe(string topicName)

await mqttClient.UnsubscribeAsync(topic);

/// 发送消息

/// <param name="message"></param>

private static async void Publish(string topicName, string message)

string msg = message.Trim();

Console.Write("主题不能为空!");

var MessageBuilder = new MqttApplicationMessageBuilder()

.WithTopic(topic)

.WithPayload(msg)

.WithExactlyOnceQoS()

.WithRetainFlag()

.Build();

await mqttClient.PublishAsync(MessageBuilder);