前言: MQTT廣泛應用于工業物聯網、智能家居、各類智能制造或各類自動化場景等。MQTT是一個基于用戶端-伺服器的消息釋出/訂閱傳輸協定,在很多受限的環境下,比如說機器與機器通信、機器與物聯網通信等。好了,科普的廢話不多說,下面直接通過.NET環境來實作一套MQTT通信demo,實作服務端與用戶端的雙邊消息釋出與訂閱的功能和示範。
開發環境:
VS2022 + .NET 6 + Webapi / 控制台
1、建立一個webapi項目,用來後面做測試使用
2、建立一個繼承自IHostedService的服務,用于随着webapi程式的啟動而自動執行。(最終代碼在文末)
3、引入 MQTTNet 包,該項目提供了.net環境下的MQTT通信協定支援,這款架構很優秀,此處直接引用它來進行使用。
4、在上面的MqttHostService類裡面,開始方法裡面新增初始化MQTT服務端的一些功能,例如 IP、端口号、事件等等。
5、mqtt服務端支援的一系列功能很多,大佬們可以自行去嘗試一些新發現,此處隻使用若幹個簡單功能。
6、添加用戶端連接配接事件、連接配接關閉事件
7、由于事件要用的可能有點多,此處就不一一例舉了,可以直接看以下的代碼,以及有關注釋來了解。
8、事件觸發時候,列印輸出
9、輸出之前,記錄一個目前事件名稱标記一下,用于可以更加清楚看出是哪個事件輸出的。
10、對MqttHostService類進行注冊,用于程式啟動時候跟随啟動。
11、上面貌似設計的不是很友好,是以把mqtt服務執行個體單獨弄出來,寫入到單獨的類裡面做成屬性,供友善調用。
12、把先前的一些東西改一下,換成使用上面步驟的屬性來直接調用使用。
13、運作一下,看看是否可以成功,顯示服務已啟動,說明服務啟動時OK的了.
14、新增一個控制台程式 MqttClient,用于模拟用戶端。
15、建立用戶端啟動以及有關配置資訊和有關事件,如圖。具體使用可以看代碼注釋,就不過多解釋了。
16、在program類裡面,調用用戶端啟動方法,用于測試使用。
17、上面用戶端對應的三個事件的實作如圖,同時進行有關資訊的列印輸出。
18、啟動服務端,然後啟動用戶端,可以看到服務端有一個連接配接失敗的消息,這個是因為上面配置的用戶端使用者名是admin,密碼是1234567,而服務端配置的規則是,使用者名是admin 密碼是123456
19、密碼改回正常比對項以後,再重新運作試試看,可以看到用戶端與服務端連接配接上了。
20、如果關閉用戶端,也可以看到服務端會進入用戶端關閉事件内。
21、把上面主題訂閱的内容寫到連接配接成功以後的事件裡面,不然用戶端連接配接期間,可能就執行了主題訂閱,會存在訂閱失敗的情況。改為寫入到連接配接成功以後的事件裡面,可以保證主題訂閱肯定是在用戶端連接配接成功以後才執行的。
22、接下來測試服務端消息推送,在MqttService服務裡面,新增一個方法,用來執行mqtt服務端釋出主題消息使用。有關配置資訊和消息格式,如圖所示。
23、新增一個API控制器,用來測試使用。API參數直接拿來進行消息的推送使用。
24、運作服務端和用戶端,并通路剛剛新增的api接口,手動随意輸入一條消息,可以看到用戶端訂閱的主題消息已經被實時接收到了。
25、接下來對用戶端新增一個消息推送的方法,用來測試用戶端消息釋出的功能。有關消息格式和調用,如圖所示,以及注釋部分的說明。
26、用戶端program類裡面,用戶端連接配接以後,通過手動回車,來執行用戶端釋出消息。
27、再次啟動服務端和用戶端
28、然後用戶端内按一下回車,執行消息釋出功能。可以看到,服務端成功接收到了用戶端發過來的主題消息。
29、接下來測試用戶端與用戶端之間的消息釋出與訂閱,為了模拟多用戶端效果,把上面用戶端已經編譯好的檔案拷貝一份出來。
30、然後本地的代碼進行一些修改,用來當做第二個用戶端程式。是以用戶端id也進行變更為 testclient02
31、對用戶端訂閱的主題,也改成 topic_02
32、啟動服務端,以及拷貝出來的用戶端1,和上面修改了部分代碼的用戶端2,保證都已經連接配接上服務端。
33、調用服務端的api接口,由于服務端釋出的消息是釋出給topic_01的,是以隻有用戶端1可以接收到消息。
34、用戶端1執行回車,用于釋出一段消息給主題 topic_02,可以看到用戶端01釋出的消息,同時被服務端和用戶端02接收到了。因為服務端是總指揮,是以用戶端釋出的消息都會經過服務端,進而服務端都可以接收到連接配接的用戶端釋出的所有消息。
35、測試資料保持,下面先對用戶端1進行斷開,然後再重新連接配接用戶端1,可以看到用戶端1直接接收到了它訂閱的主題的上一次最新的消息内容,這個就是消息裡面,Retain屬性設為True的結果,用于讓服務端記憶該主題消息使用的。如果設為false,就沒有這個效果了,大佬們也可以自己嘗試。
36、最終的服務端代碼:
MqttHostService:
public class MqttHostService : IHostedService, IDisposable
{
public void Dispose()
{
}
const string ServerClientId = "SERVER";
public Task StartAsync(CancellationToken cancellationToken)
{
MqttServerOptionsBuilder optionsBuilder = new MqttServerOptionsBuilder();
optionsBuilder.WithDefaultEndpoint();
optionsBuilder.WithDefaultEndpointPort(10086); // 設定 服務端 端口号
optionsBuilder.WithConnectionBacklog(1000); // 最大連接配接數
MqttServerOptions options = optionsBuilder.Build();
MqttService._mqttServer = new MqttFactory().CreateMqttServer(options);
MqttService._mqttServer.ClientConnectedAsync += _mqttServer_ClientConnectedAsync; //用戶端連接配接事件
MqttService._mqttServer.ClientDisconnectedAsync += _mqttServer_ClientDisconnectedAsync; // 用戶端關閉事件
MqttService._mqttServer.ApplicationMessageNotConsumedAsync += _mqttServer_ApplicationMessageNotConsumedAsync; // 消息接收事件
MqttService._mqttServer.ClientSubscribedTopicAsync += _mqttServer_ClientSubscribedTopicAsync; // 用戶端訂閱主題事件
MqttService._mqttServer.ClientUnsubscribedTopicAsync += _mqttServer_ClientUnsubscribedTopicAsync; // 用戶端取消訂閱事件
MqttService._mqttServer.StartedAsync += _mqttServer_StartedAsync; // 啟動後事件
MqttService._mqttServer.StoppedAsync += _mqttServer_StoppedAsync; // 關閉後事件
MqttService._mqttServer.InterceptingPublishAsync += _mqttServer_InterceptingPublishAsync; // 消息接收事件
MqttService._mqttServer.ValidatingConnectionAsync += _mqttServer_ValidatingConnectionAsync; // 使用者名和密碼驗證有關
MqttService._mqttServer.StartAsync();
return Task.CompletedTask;
}
/// <summary>
/// 用戶端訂閱主題事件
/// </summary>
/// <param name="arg"></param>
/// <returns></returns>
private Task _mqttServer_ClientSubscribedTopicAsync(ClientSubscribedTopicEventArgs arg)
{
Console.WriteLine($"ClientSubscribedTopicAsync:用戶端ID=【{arg.ClientId}】訂閱的主題=【{arg.TopicFilter}】 ");
return Task.CompletedTask;
}
/// <summary>
/// 關閉後事件
/// </summary>
/// <param name="arg"></param>
/// <returns></returns>
private Task _mqttServer_StoppedAsync(EventArgs arg)
{
Console.WriteLine($"StoppedAsync:MQTT服務已關閉……");
return Task.CompletedTask;
}
/// <summary>
/// 使用者名和密碼驗證有關
/// </summary>
/// <param name="arg"></param>
/// <returns></returns>
private Task _mqttServer_ValidatingConnectionAsync(ValidatingConnectionEventArgs arg)
{
arg.ReasonCode = MqttConnectReasonCode.Success;
if ((arg.Username ?? string.Empty)!="admin" || (arg.Password??String.Empty)!="123456")
{
arg.ReasonCode = MqttConnectReasonCode.Banned;
Console.WriteLine($"ValidatingConnectionAsync:用戶端ID=【{arg.ClientId}】使用者名或密碼驗證錯誤 ");
}
return Task.CompletedTask;
}
/// <summary>
/// 消息接收事件
/// </summary>
/// <param name="arg"></param>
/// <returns></returns>
private Task _mqttServer_InterceptingPublishAsync(InterceptingPublishEventArgs arg)
{
if (string.Equals(arg.ClientId, ServerClientId))
{
return Task.CompletedTask;
}
Console.WriteLine($"InterceptingPublishAsync:用戶端ID=【{arg.ClientId}】 Topic主題=【{arg.ApplicationMessage.Topic}】 消息=【{Encoding.UTF8.GetString(arg.ApplicationMessage.Payload)}】 qos等級=【{arg.ApplicationMessage.QualityOfServiceLevel}】");
return Task.CompletedTask;
}
/// <summary>
/// 啟動後事件
/// </summary>
/// <param name="arg"></param>
/// <returns></returns>
private Task _mqttServer_StartedAsync(EventArgs arg)
{
Console.WriteLine($"StartedAsync:MQTT服務已啟動……");
return Task.CompletedTask;
}
/// <summary>
/// 用戶端取消訂閱事件
/// </summary>
/// <param name="arg"></param>
/// <returns></returns>
private Task _mqttServer_ClientUnsubscribedTopicAsync(ClientUnsubscribedTopicEventArgs arg)
{
Console.WriteLine($"ClientUnsubscribedTopicAsync:用戶端ID=【{arg.ClientId}】已取消訂閱的主題=【{arg.TopicFilter}】 ");
return Task.CompletedTask;
}
private Task _mqttServer_ApplicationMessageNotConsumedAsync(ApplicationMessageNotConsumedEventArgs arg)
{
Console.WriteLine($"ApplicationMessageNotConsumedAsync:發送端ID=【{arg.SenderId}】 Topic主題=【{arg.ApplicationMessage.Topic}】 消息=【{Encoding.UTF8.GetString(arg.ApplicationMessage.Payload)}】 qos等級=【{arg.ApplicationMessage.QualityOfServiceLevel}】");
return Task.CompletedTask;
}
/// <summary>
/// 用戶端斷開時候觸發
/// </summary>
/// <param name="arg"></param>
/// <returns></returns>
/// <exception cref="NotImplementedException"></exception>
private Task _mqttServer_ClientDisconnectedAsync(ClientDisconnectedEventArgs arg)
{
Console.WriteLine($"ClientDisconnectedAsync:用戶端ID=【{arg.ClientId}】已斷開, 位址=【{arg.Endpoint}】 ");
return Task.CompletedTask;
}
/// <summary>
/// 用戶端連接配接時候觸發
/// </summary>
/// <param name="arg"></param>
/// <returns></returns>
private Task _mqttServer_ClientConnectedAsync(ClientConnectedEventArgs arg)
{
Console.WriteLine($"ClientConnectedAsync:用戶端ID=【{arg.ClientId}】已連接配接, 使用者名=【{arg.UserName}】位址=【{arg.Endpoint}】 ");
return Task.CompletedTask;
}
public Task StopAsync(CancellationToken cancellationToken)
{
return Task.CompletedTask;
}
}
MqttService:
public class MqttService
{
public static MqttServer _mqttServer { get; set; }
public static void PublishData(string data)
{
var message = new MqttApplicationMessage
{
Topic = "topic_01",
Payload = Encoding.Default.GetBytes(data),
QualityOfServiceLevel = MqttQualityOfServiceLevel.AtLeastOnce,
Retain = true // 服務端是否保留消息。true為保留,如果有新的訂閱者連接配接,就會立馬收到該消息。
};
_mqttServer.InjectApplicationMessage(new InjectedMqttApplicationMessage(message) // 發送消息給有訂閱 topic_01的用戶端
{
SenderClientId = "Server_01"
}).GetAwaiter().GetResult();
}
}
37、最終的用戶端代碼:
MqttClientService:
public class MqttClientService
{
public static IMqttClient _mqttClient;
public void MqttClientStart()
{
var optionsBuilder = new MqttClientOptionsBuilder()
.WithTcpServer("127.0.0.1", 10086) // 要通路的mqtt服務端的 ip 和 端口号
.WithCredentials("admin", "123456") // 要通路的mqtt服務端的使用者名和密碼
.WithClientId("testclient02") // 設定用戶端id
.WithCleanSession()
.WithTls(new MqttClientOptionsBuilderTlsParameters
{
UseTls = false // 是否使用 tls加密
});
var clientOptions = optionsBuilder.Build();
_mqttClient = new MqttFactory().CreateMqttClient();
_mqttClient.ConnectedAsync += _mqttClient_ConnectedAsync; // 用戶端連接配接成功事件
_mqttClient.DisconnectedAsync += _mqttClient_DisconnectedAsync; // 用戶端連接配接關閉事件
_mqttClient.ApplicationMessageReceivedAsync += _mqttClient_ApplicationMessageReceivedAsync; // 收到消息事件
_mqttClient.ConnectAsync(clientOptions);
}
/// <summary>
/// 用戶端連接配接關閉事件
/// </summary>
/// <param name="arg"></param>
/// <returns></returns>
private Task _mqttClient_DisconnectedAsync(MqttClientDisconnectedEventArgs arg)
{
Console.WriteLine($"用戶端已斷開與服務端的連接配接……");
return Task.CompletedTask;
}
/// <summary>
/// 用戶端連接配接成功事件
/// </summary>
/// <param name="arg"></param>
/// <returns></returns>
private Task _mqttClient_ConnectedAsync(MqttClientConnectedEventArgs arg)
{
Console.WriteLine($"用戶端已連接配接服務端……");
// 訂閱消息主題
// MqttQualityOfServiceLevel: (QoS): 0 最多一次,接收者不确認收到消息,并且消息不被發送者存儲和重新發送提供與底層 TCP 協定相同的保證。
// 1: 保證一條消息至少有一次會傳遞給接收方。發送方存儲消息,直到它從接收方收到确認收到消息的資料包。一條消息可以多次發送或傳遞。
// 2: 保證每條消息僅由預期的收件人接收一次。級别2是最安全和最慢的服務品質級别,保證由發送方和接收方之間的至少兩個請求/響應(四次握手)。
_mqttClient.SubscribeAsync("topic_02", MqttQualityOfServiceLevel.AtLeastOnce);
return Task.CompletedTask;
}
/// <summary>
/// 收到消息事件
/// </summary>
/// <param name="arg"></param>
/// <returns></returns>
private Task _mqttClient_ApplicationMessageReceivedAsync(MqttApplicationMessageReceivedEventArgs arg)
{
Console.WriteLine($"ApplicationMessageReceivedAsync:用戶端ID=【{arg.ClientId}】接收到消息。 Topic主題=【{arg.ApplicationMessage.Topic}】 消息=【{Encoding.UTF8.GetString(arg.ApplicationMessage.Payload)}】 qos等級=【{arg.ApplicationMessage.QualityOfServiceLevel}】");
return Task.CompletedTask;
}
public void Publish(string data)
{
var message = new MqttApplicationMessage
{
Topic = "topic_02",
Payload = Encoding.Default.GetBytes(data),
QualityOfServiceLevel = MqttQualityOfServiceLevel.AtLeastOnce,
Retain = true // 服務端是否保留消息。true為保留,如果有新的訂閱者連接配接,就會立馬收到該消息。
};
_mqttClient.PublishAsync(message);
}
}
38、後記:MQTT以上示範已經完畢,可以看到它的一些特性,跟websocket很接近,但是又比websocket通信更加靈活。其實,實際上MQTT的用戶端在現實生産環境場景下,并不需要咱們開發者進行開發,很多硬體裝置都支援提供MQTT協定的通信用戶端,是以隻需要自己搭建一個服務端,就可以實作實時監控各種裝置推送過來的各種信号資料。同時用戶端支援釋出消息給其他用戶端,是以就實作了裝置與裝置之間的一對一信号通信的效果了。如果需要下發信号給硬體裝置,MQTT服務端也可以直接下發給某個指定裝置來進行實作即可。上面案例隻提供入門方案,如果有感興趣的大佬,可以自己去拓展一下,來達到更好的效果。
39、以上就是該篇文章的所有内容。如果覺得有幫助,歡迎轉發、點贊、推薦和評論留言。大佬們的鼓勵,是我不斷繼續創作部落格的動力之一。如果有興趣一起探索更多.net 技術,歡迎點選下方qq群,加入一起吹牛談人生。或者掃描下面我個人微信名片二維碼加我好友,我也可以拉你到微信.net交流群。如果沒有找到二維碼和QQ群連結,可能是你現在進入的文章是爬蟲爬走的文章,可以點選該文章原始位址[部落格園]的連結來跳轉回最初原文:https://www.cnblogs.com/weskynet/p/16441219.html
歡迎加入QQ群:
群号:1079830632