【1】MQTT協定介紹
MQTT是一種輕量級的通信協定,适用于物聯網(IoT)和低帶寬網絡環境。它基于一種“釋出/訂閱”模式,其中裝置發送資料(也稱為 “釋出”)到經紀人(稱為MQTT代理),這些資料被存儲,并在需要時被轉發給訂閱者。這種方式簡化了網絡管理,允許多個裝置在不同的網絡條件下進行通信(包括延遲和帶寬限制),并支援實時資料更新。它是開放的,可免費使用并易于實施。
【2】MQTT協定封包字段介紹
MQTT協定封包由兩部分組成:固定報頭和可變報頭。
固定報頭的格式是統一的,其中包括了封包類型和剩餘長度兩個字段。
可變報頭的格式取決于封包類型。
下面是MQTT協定中各個封包類型的可變報頭字段說明。
(1)CONNECT:MQTT連接配接請求封包
CONNECT封包包括固定報頭和可變報頭兩部分。其中,固定報頭的第一個位元組(即封包類型和标志位的組合)為0x10,表示這是一個CONNECT封包。
可變報頭包括了以下字段:
- 協定名(Protocol Name):用于辨別MQTT協定的名稱,固定為字元串"MQTT";
- 協定級别(Protocol Level):用于辨別所使用的MQTT協定的版本号,一般情況下為4;
- 連接配接标志(Connect Flags):用于設定各種連接配接選項,其中包括:
- 使用者名/密碼(Username/Password):用于對連接配接進行身份驗證;
- 清理會話(Clean Session):表示用戶端需要清除伺服器上舊的Session資訊;
- 遺囑标志(Will Flag):表示用戶端是否需要在與伺服器的連接配接意外斷開時發送遺囑資訊;
- 遺囑QoS(Will QoS):用于設定遺囑消息的服務品質等級;
- 遺囑保留(Will Retain):表示遺囑消息是否需要被伺服器保留;
- 使用者名标志(Username Flag):表示用戶端是否需要發送使用者名字段;
- 密碼标志(Password Flag):表示用戶端是否需要發送密碼字段。
- 保持連接配接(Keep Alive):用于設定心跳包的發送間隔時間,以便用戶端和伺服器之間保持連接配接。
(2)CONNACK:MQTT連接配接響應封包
CONNACK封包包括固定報頭和可變報頭兩部分。其中,固定報頭的第一個位元組為0x20,表示這是一個CONNACK封包。
可變報頭包括了以下字段:
- 連接配接應答(Connect Acknowledgment):用于表示連接配接是否成功,一般為0表示成功,其他值表示失敗;
- 保留标志(Reserved Flag):保留字段,必須為0。
(3)PUBLISH:MQTT釋出消息封包
PUBLISH封包包括固定報頭和可變報頭兩部分,以及消息體。其中,固定報頭的第一個位元組由封包類型和QoS級别組合而成,QoS級别可以為0、1或2。
可變報頭包括了以下字段:
- 主題名(Topic Name):用于辨別消息的主題;
- 封包辨別符(Packet Identifier):用于在QoS級别為1或2時确認消息分發的情況,如果為0則表示QoS級别為0。
消息體包括了要釋出的消息内容。
(4)PUBACK:MQTT釋出确認封包
PUBACK封包包括固定報頭和可變報頭兩部分。其中,固定報頭的第一個位元組為0x40,表示這是一個PUBACK封包。
可變報頭僅包括一個封包辨別符(Packet Identifier)字段,用于确認QoS級别為1的釋出消息。
(5)PUBREC:MQTT釋出接收封包
PUBREC封包包括固定報頭和可變報頭兩部分。其中,固定報頭的第一個位元組為0x50,表示這是一個PUBREC封包。
可變報頭僅包括一個封包辨別符(Packet Identifier)字段,用于确認QoS級别為2的釋出消息。
(6)PUBREL:MQTT釋出釋放封包
PUBREL封包包括固定報頭和可變報頭兩部分。其中,固定報頭的第一個位元組為0x62,表示這是一個PUBREL封包。
可變報頭僅包括一個封包辨別符(Packet Identifier)字段,用于确認QoS級别為2的釋出消息。
(7)PUBCOMP:MQTT釋出完成封包
PUBCOMP封包包括固定報頭和可變報頭兩部分。其中,固定報頭的第一個位元組為0x70,表示這是一個PUBCOMP封包。
可變報頭僅包括一個封包辨別符(Packet Identifier)字段,用于确認QoS級别為2的釋出消息。
(8)SUBSCRIBE:MQTT訂閱請求封包
SUBSCRIBE封包包括固定報頭和可變報頭兩部分。其中,固定報頭的第一個位元組為0x82,表示這是一個SUBSCRIBE封包。
可變報頭包括了以下字段:
- 封包辨別符(Packet Identifier):用于确認訂閱請求的情況;
- 訂閱主題(Subscription Topic):用于設定訂閱的主題;
- 服務品質等級(QoS Level):用于設定訂閱請求使用的服務品質等級,可以為0、1或2。
(9)SUBACK:MQTT訂閱确認封包
SUBACK封包包括固定報頭和可變報頭兩部分。其中,固定報頭的第一個位元組為0x90,表示這是一個SUBACK封包。
可變報頭包括了以下字段:
- 封包辨別符(Packet Identifier):用于确認訂閱請求的情況;
- 訂閱确認等級(Subscription Acknowledgment):用于确認訂閱請求的服務品質等級,可以為0、1或2。
(10)UNSUBSCRIBE:MQTT取消訂閱封包
UNSUBSCRIBE封包包括固定報頭和可變報頭兩部分。其中,固定報頭的第一個位元組為0xA2,表示這是一個UNSUBSCRIBE封包。
可變報頭包括了以下字段:
- 封包辨別符(Packet Identifier):用于确認取消訂閱請求的情況;
- 訂閱主題(Subscription Topic):用于設定要取消訂閱的主題。
(11)UNSUBACK:MQTT取消訂閱确認封包
UNSUBACK封包包括固定報頭和可變報頭兩部分。其中,固定報頭的第一個位元組為0xB0,表示這是一個UNSUBACK封包。
可變報頭僅包含封包辨別符(Packet Identifier)字段,用于确認取消訂閱請求。
(12)PINGREQ:MQTT心跳請求封包
PINGREQ封包包括固定報頭和可變報頭兩部分。其中,固定報頭的第一個位元組為0xC0,表示這是一個PINGREQ封包。
PINGREQ封包不包含可變報頭字段。
(13)PINGRESP:MQTT心跳響應封包
PINGRESP封包包括固定報頭和可變報頭兩部分。其中,固定報頭的第一個位元組為0xD0,表示這是一個PINGRESP封包。
PINGRESP封包不包含可變報頭字段。
(14)DISCONNECT:MQTT斷開連接配接封包
DISCONNECT封包包括固定報頭和可變報頭兩部分。其中,固定報頭的第一個位元組為0xE0,表示這是一個DISCONNECT封包。
DISCONNECT封包不包含可變報頭字段。
【3】封裝MQTT協定
這是一個使用C語言在Linux下建立TCP通信并發送MQTT封包的例子。 根據MQTT封包自己封裝協定。
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <unistd.h>
// 定義MQTT封包類型
#define MQTT_CONNECT 0x10
#define MQTT_CONNACK 0x20
#define MQTT_PUBLISH 0x30
#define MQTT_PUBACK 0x40
#define MQTT_SUBSCRIBE 0x80
#define MQTT_SUBACK 0x90
#define MQTT_UNSUBSCRIBE 0xA0
#define MQTT_UNSUBACK 0xB0
#define MQTT_PINGREQ 0xC0
#define MQTT_PINGRESP 0xD0
#define MQTT_DISCONNECT 0xE0
// 定義MQTT連接配接标志
#define MQTT_CONNECT_FLAG_CLEAN 0x02
#define MQTT_CONNECT_FLAG_WILL 0x04
#define MQTT_CONNECT_FLAG_WILL_QOS0 0x00
#define MQTT_CONNECT_FLAG_WILL_QOS1 0x08
#define MQTT_CONNECT_FLAG_WILL_QOS2 0x10
#define MQTT_CONNECT_FLAG_WILL_RETAIN 0x20
#define MQTT_CONNECT_FLAG_PASSWORD 0x40
#define MQTT_CONNECT_FLAG_USERNAME 0x80
// 定義MQTT封包結構體
typedef struct mqtt_packet
{
unsigned char *data;
unsigned int length;
}
mqtt_packet_t;
// 建立socket連接配接并傳回socket檔案描述符
int socket_connect(char *address, int port)
{
struct sockaddr_in server_address;
int socket_fd = socket(AF_INET, SOCK_STREAM, 0);
if (socket_fd == -1)
{
printf("Failed to create socket!\n");
return -1;
}
server_address.sin_family = AF_INET;
server_address.sin_port = htons(port);
if ((inet_pton(AF_INET, address, &server_address.sin_addr)) <= 0)
{
printf("Invalid address/ Address not supported\n");
return -1;
}
if (connect(socket_fd, (struct sockaddr *)&server_address, sizeof(server_address)) < 0)
{
printf("Connection Failed!\n");
return -1;
}
return socket_fd;
}
// 打包MQTT連接配接封包
mqtt_packet_t *mqtt_connect(char *client_id, char *username, char *password)
{
mqtt_packet_t *packet = (mqtt_packet_t *)malloc(sizeof(mqtt_packet_t));
unsigned char *data = (unsigned char *)malloc(256);
unsigned int length = 0;
// 固定報頭
data[length++] = MQTT_CONNECT;
// 可變報頭
data[length++] = 0x0C;
// 清理會話标志和協定版本号
data[length++] = 'M';
data[length++] = 'Q';
data[length++] = 'T';
data[length++] = 'T';
data[length++] = 0x04;
// 協定版本号 // 連接配接标志
unsigned char flags = MQTT_CONNECT_FLAG_CLEAN;
if (username != NULL)
{
flags |= MQTT_CONNECT_FLAG_USERNAME;
}
if (password != NULL)
{
flags |= MQTT_CONNECT_FLAG_PASSWORD;
}
data[length++] = flags;
data[length++] = 0xFF;
// 保持連接配接時間低8位
data[length++] = 0xFF;
// 保持連接配接時間高8位 // 剩餘長度
unsigned char remaining_length = length - 1;
data[remaining_length++] = (unsigned char)(length - 2);
packet->data = data;
packet->length = length;
return packet;
}
// 發送MQTT封包
void mqtt_send(int socket_fd, mqtt_packet_t *packet)
{
if (send(socket_fd, packet->data, packet->length, 0) < 0)
{
printf("Failed to send message!\n");
}
}
// 接收MQTT封包
int mqtt_recv(int socket_fd, mqtt_packet_t *packet)
{
unsigned char header[2];
if (recv(socket_fd, header, 2, 0) != 2)
{
printf("Failed to receive message header!\n");
return -1
}
unsigned int remaining_length = 0;
unsigned int multiplier = 1;
int i = 1;
do
{
if (recv(socket_fd, &header[i], 1, 0) != 1)
{
printf("Failed to receive remaining_length byte %d!\n", i);
return -1;
}
remaining_length += (header[i] & 127) * multiplier;
multiplier *= 128;
i++;
}
while ((header[i - 1] & 128) != 0);
packet->length = remaining_length + i;
packet->data = (unsigned char *)malloc(packet->length);
memcpy(packet->data, header, 2);
if (recv(socket_fd, packet->data + 2, packet->length - 2, 0) != packet->length - 2)
{
printf("Failed to receive full message!\n");
return -1;
}
return 0;
}
int main(int argc, char *argv[])
{
// 建立 TCP 連接配接
int socket_fd = socket_connect("test.mosquitto.org", 1883);
if (socket_fd == -1)
{
printf("Failed to connect to MQTT server!\n");
return -1;
}
printf("Connected to MQTT server!\n");
// 打包并發送 MQTT 連接配接封包
mqtt_packet_t *connect_packet = mqtt_connect("test_client", NULL, NULL);
mqtt_send(socket_fd, connect_packet);
printf("Sent MQTT CONNECT packet!\n");
free(connect_packet->data);
free(connect_packet);
// 接收 MQTT CONNACK 封包
mqtt_packet_t *connack_packet = (mqtt_packet_t *)malloc(sizeof(mqtt_packet_t));
if (mqtt_recv(socket_fd, connack_packet) != 0)
{
printf("Failed to receive MQTT CONNACK packet!\n");
return -1;
}
if (connack_packet->data[1] != 0x00)
{
printf("MQTT server rejected connection!\n");
return -1;
}
printf("Received MQTT CONNACK packet!\n");
free(connack_packet->data);
free(connack_packet);
// 斷開 TCP 連接配接 close(socket_fd); return 0;
}