天天看點

c++ mqtt用戶端_【開源】一個基于socket API之上的跨平台MQTT用戶端

c++ mqtt用戶端_【開源】一個基于socket API之上的跨平台MQTT用戶端

mqttclient

一個基于socket API之上的跨平台MQTT用戶端

基于socket API的MQTT用戶端,擁有非常簡潔的API接口,以極少的資源實作QOS2的服務品質,并且無縫銜接了mbedtls加密庫。

優勢:

  • 基于标準BSD socket之上開發 ,隻要是相容BSD socket的系統均可使用。
  • 穩定 :無論是

    掉線重連

    丢包重發

    ,都是嚴格

    遵循MQTT協定标準

    執行,除此之外對 大資料量 的測試無論是收是發,都是非常穩定(一次發送

    135K

    資料,3秒一次),高頻測試也是非常穩定(7個主題同時收發,每秒一次,也就是1秒14個mqtt封包,服務品質QoS0、QoS1、QoS2都有)。因為作者以極少的資源設計了

    記錄機制

    ,對采用QoS1服務品質的封包必須保證到達一次,對QoS2服務品質的封包有且隻有收到一次(如果不相信它穩定性的同學可以自己去修改源碼,專門為QoS2服務品質去測試,故意不回複

    PUBREC

    ,讓伺服器重發QoS2封包,看看用戶端是否有且隻有處理一次),而對于掉線重連的穩定性,則是 基本操作 了,沒啥好說的,是以在測試中穩定性極好。
  • 輕量級 :整個代碼工程極其簡單,不使用mbedtls情況下,占用資源極少,作者曾使用esp8266模組與雲端通信,整個工程代碼消耗的RAM不足15k(包括系統占用的開銷,對資料的處理開銷,而此次還是未優化的情況下,還依舊完美保留了掉線重連的穩定性,但是對應qos1 qos2服務品質的封包則未做測試,因為STM32F103C8T6晶片資源實在是太少了,折騰不起)。
  • 無縫銜接mbedtls加密傳輸 ,讓網絡傳輸更加安全。
  • 擁有極簡的API接口 ,随意配置,使用起來非常簡單。
  • 有非常好的代碼風格與思想 :整個代碼采用分層式設計,代碼實作采用異步處理的思想,降低耦合,提高性能。
  • MQTT協定支援主題通配符`“#”、“+”`
  • 訂閱的主題與消息處理完全分離 ,讓程式設計邏輯更加簡單易用,使用者無需理會錯綜複雜的邏輯關系。
  • 不對外産生依賴。
  • mqttclient内部已實作保活處理機制 ,無需使用者過多關心理會,使用者隻需專心處理應用功能即可。

整體架構

擁有非常明确的分層架構。

c++ mqtt用戶端_【開源】一個基于socket API之上的跨平台MQTT用戶端
目前已實作了Linux、TencentOS tiny、RT-Thread平台(已做成軟體包),除此之外TencentOS tiny的AT架構亦可以使用,并且穩定性極好! 平台代碼位置 Linux:

https://github.com/jiejieTop/mqttclient

TencentOS tiny:

https://github.com/Tencent/TencentOS-tiny/tree/master/board/Fire_STM32F429

RT-Thread:

https://github.com/jiejieTop/mqttclient_rtpkgs

linux平台下測試使用

安裝cmake:

sudo apt-get install cmake
           

配置

mqttclient/test/test.c

檔案中修改以下内容:

init_params.connect_params.network_params.network_ssl_params.ca_crt = test_ca_get();    /* CA憑證 */
    init_params.connect_params.network_params.addr = "xxxxxxx";                             /* 伺服器域名 */
    init_params.connect_params.network_params.port = "8883";                                /* 伺服器端口号 */
    init_params.connect_params.user_name = "xxxxxxx";                                       /* 使用者名 */
    init_params.connect_params.password = "xxxxxxx";                                        /* 密碼 */
    init_params.connect_params.client_id = "xxxxxxx";                                       /* 用戶端id */
           

mbedtls

預設不打開mbedtls。

salof 全稱是:

Synchronous Asynchronous Log Output Framework

(同步異步日志輸出架構),它是一個異步日志輸出庫,在空閑時候輸出對應的日志資訊,并且該庫與mqttclient無縫銜接。

配置對應的日志輸出級别:
#define BASE_LEVEL      (0)
#define ASSERT_LEVEL    (BASE_LEVEL + 1)            /* 日志輸出級别:斷言級别(非常高優先級) */
#define ERR_LEVEL       (ASSERT_LEVEL + 1)          /* 日志輸出級别:錯誤級别(高優先級) */
#define WARN_LEVEL      (ERR_LEVEL + 1)             /* 日志輸出級别:警告級别(中優先級) */
#define INFO_LEVEL      (WARN_LEVEL + 1)            /* 日志輸出級别:資訊級别(低優先級) */
#define DEBUG_LEVEL     (INFO_LEVEL + 1)            /* 日志輸出級别:調試級别(更低優先級) */

#define         LOG_LEVEL                   WARN_LEVEL      /* 日志輸出級别 */
           
日志其他選項:
  • 終端帶顔色
  • 時間戳
  • 标簽

mqttclient的配置

配置mqtt等待應答清單的最大值,對于qos1 qos2服務品質有要求的可以将其設定大一點,當然也必須資源跟得上,它主要是保證qos1 qos2的mqtt封包能準确到達伺服器。

#define     MQTT_ACK_HANDLER_NUM_MAX            64
           

選擇MQTT協定的版本,預設為4,表示使用MQTT 3.1.1版本,而3則表示為MQTT 3.1版本。

#define     MQTT_VERSION                        4           // 4 is mqtt 3.1.1
           

設定預設的保活時間,它主要是保證MQTT用戶端與伺服器的保持活性連接配接,機關為 秒 ,比如MQTT用戶端與伺服器100S沒有發送資料了,有沒有接收到資料,此時MQTT用戶端會發送一個ping包,确認一下這個會話是否存在,如果收到伺服器的應答,那麼說明這個會話還是存在的,可以随時收發資料,而如果不存在了,就清除會話。

#define     MQTT_KEEP_ALIVE_INTERVAL            100         // unit: second
           

預設的指令逾時,它主要是用于socket讀寫逾時,在MQTT初始化時可以指定:

#define     MQTT_DEFAULT_CMD_TIMEOUT            4000
           

預設主題的長度,主題是支援通配符的,如果主題太長則會被截斷:

#define     MQTT_TOPIC_LEN_MAX                  64
           

預設的算法資料緩沖區的大小,如果要發送大量資料則修改大一些,在MQTT初始化時可以指定:

#define     MQTT_DEFAULT_BUF_SIZE               1024
           

線程相關的配置,如線程棧,線程優先級,線程時間片等:

在linux環境下可以是不需要理會這些參數的,而在RTOS平台則需要配置,如果不使用mbedtls,線程棧2048位元組已足夠,而使用mbedtls加密後,需要配置4096位元組以上。

#define     MQTT_THREAD_STACK_SIZE              2048    // 線程棧
#define     MQTT_THREAD_PRIO                    5       // 線程優先級
#define     MQTT_THREAD_TICK                    50      // 線程時間片
           

預設的重連時間間隔,當發生掉線時,會以這個時間間隔嘗試重連:

#define     MQTT_RECONNECT_DEFAULT_DURATION     1000
           

其他不需要怎麼配置的東西:

#define     MQTT_MAX_PACKET_ID                  (0xFFFF - 1)    // mqtt封包id
#define     MQTT_MAX_CMD_TIMEOUT                20000           //最大的指令逾時參數
#define     MQTT_MIN_CMD_TIMEOUT                1000            //最小的指令逾時參數
           
ps:以上參數基本不需要怎麼配置的,直接用即可~

編譯 & 運作

./build.sh
           

運作

build.sh

腳本後會在

./build/bin/

目錄下生成可執行檔案

mqtt-client

,直接運作即可。

設計思想

  • 整體采用分層式設計,代碼實作采用異步設計方式,降低耦合。
  • 消息的處理使用回調的方式處理:使用者指定

    [訂閱的主題]

    與指定

    [消息的處理函數]

  • 不對外産生依賴

API

mqttclient

擁有非常簡潔的

api

接口

int mqtt_keep_alive(mqtt_client_t* c);
int mqtt_init(mqtt_client_t* c, client_init_params_t* init);
int mqtt_release(mqtt_client_t* c);
int mqtt_connect(mqtt_client_t* c);
int mqtt_disconnect(mqtt_client_t* c);
int mqtt_subscribe(mqtt_client_t* c, const char* topic_filter, mqtt_qos_t qos, message_handler_t msg_handler);
int mqtt_unsubscribe(mqtt_client_t* c, const char* topic_filter);
int mqtt_publish(mqtt_client_t* c, const char* topic_filter, mqtt_message_t* msg);
int mqtt_yield(mqtt_client_t* c, int timeout_ms);
           

核心

mqtt_client_t 結構
typedef struct mqtt_client {
    unsigned short              packet_id;
    unsigned char               ping_outstanding;
    unsigned char               ack_handler_number;
    unsigned char               *read_buf;
    unsigned char               *write_buf;
    unsigned int                cmd_timeout;
    unsigned int                read_buf_size;
    unsigned int                write_buf_size;
    unsigned int                reconnect_try_duration;
    void                        *reconnect_date;
    reconnect_handler_t         reconnect_handler;
    client_state_t              client_state;
    platform_mutex_t            write_lock;
    platform_mutex_t            global_lock;
    list_t                      msg_handler_list;
    list_t                      ack_handler_list;
    network_t                   *network;
    platform_thread_t           *thread;
    platform_timer_t            reconnect_timer;
    platform_timer_t            last_sent;
    platform_timer_t            last_received;
    connect_params_t            *connect_params;
} mqtt_client_t;
           

該結構主要維護以下内容:

  1. 讀寫資料緩沖區

    read_buf、write_buf

  2. 指令逾時時間

    cmd_timeout

    (主要是讀寫阻塞時間、等待響應的時間、重連等待時間)
  3. 維護

    ack

    連結清單

    ack_handler_list

    ,這是異步實作的核心,所有等待響應的封包都會被挂載到這個連結清單上
  4. 維護消息處理清單

    msg_handler_list

    ,這是

    mqtt

    協定必須實作的内容,所有來自伺服器的

    publish

    封包都會被處理(前提是訂閱了對應的消息)
  5. 維護一個網卡接口

    network

  6. 維護一個内部線程

    thread

    ,所有來自伺服器的mqtt包都會在這裡被處理!
  7. 兩個定時器,分别是掉線重連定時器與保活定時器

    reconnect_timer、last_sent、last_received

  8. 一些連接配接的參數

    connect_params

mqttclient實作

以下是整個架構的實作方式,友善大家更容易了解mqttclient的代碼與設計思想,讓大家能夠修改源碼與使用,還可以送出pr或者issues,開源的世界期待各位大神的參與,感謝!

除此之外以下代碼的

記錄機制

與其

逾時處理機制

是非常好的程式設計思想,大家有興趣一定要看源代碼!

初始化

int mqtt_init(mqtt_client_t* c, client_init_params_t* init)
           

主要是配置

mqtt_client_t

結構的相關資訊,如果沒有指定初始化參數,則系統會提供預設的參數。

但連接配接部分的參數則必須指定:

init_params.connect_params.network_params.addr = "[你的mqtt伺服器IP位址或者是域名]";
    init_params.connect_params.network_params.port = 1883;    //端口号
    init_params.connect_params.user_name = "jiejietop";
    init_params.connect_params.password = "123456";
    init_params.connect_params.client_id = "clientid";

    mqtt_init(&client, &init_params);
           

連接配接伺服器

int mqtt_connect(mqtt_client_t* c);
           

參數隻有

mqtt_client_t

類型的指針,字元串類型的

主題

(支援通配符"#" "+"),主題的

服務品質

,以及收到封包的

處理函數

,如不指定則有預設處理函數。連接配接伺服器則是使用非異步的方式設計,因為必須等待連接配接上伺服器才能進行下一步操作。

過程如下:

  1. 調用底層的連接配接函數連接配接上伺服器:
c->network->connect(c->network);
           
  1. 序列化

    mqtt

    CONNECT

    封包并且發送
MQTTSerialize_connect(c->write_buf, c->write_buf_size, &connect_data)
mqtt_send_packet(c, len, &connect_timer)
           
  1. 等待來自伺服器的

    CONNACK

    封包
mqtt_wait_packet(c, CONNACK, &connect_timer)
           
  1. 連接配接成功後建立一個内部線程

    mqtt_yield_thread

platform_thread_init("mqtt_yield_thread", mqtt_yield_thread, c, MQTT_THREAD_STACK_SIZE, MQTT_THREAD_PRIO, MQTT_THREAD_TICK)
           

訂閱封包

int mqtt_subscribe(mqtt_client_t* c, const char* topic_filter, mqtt_qos_t qos, message_handler_t handler)
           

訂閱封包使用異步設計來實作的:

過程如下:

  1. 序列化訂閱封包并且發送給伺服器
MQTTSerialize_subscribe(c->write_buf, c->write_buf_size, 0, mqtt_get_next_packet_id(c), 1, &topic, (int*)&qos)
mqtt_send_packet(c, len, &timer)
           
  1. 建立對應的消息處理節點,這個消息節點在收到伺服器的

    SUBACK

    訂閱應答封包後會挂載到消息處理清單

    msg_handler_list

mqtt_msg_handler_create(topic_filter, qos, handler)
           
  1. 在發送了封包給伺服器那就要等待伺服器的響應了,記錄這個等待

    SUBACK

mqtt_ack_list_record(c, SUBACK, mqtt_get_next_packet_id(c), len, msg_handler)
           

取消訂閱

與訂閱封包的邏輯基本差不多的~

  1. 序列化訂閱封包并且發送給伺服器
MQTTSerialize_unsubscribe(c->write_buf, c->write_buf_size, 0, packet_id, 1, &topic)
mqtt_send_packet(c, len, &timer)
           
  1. 建立對應的消息處理節點,這個消息節點在收到伺服器的

    UNSUBACK

    取消訂閱應答封包後将消息處理清單

    msg_handler_list

    上的已經訂閱的主題消息節點銷毀
mqtt_msg_handler_create((const char*)topic_filter, QOS0, NULL)
           
  1. 在發送了封包給伺服器那就要等待伺服器的響應了,先記錄這個等待

    UNSUBACK

mqtt_ack_list_record(c, UNSUBACK, packet_id, len, msg_handler)
           

釋出封包

int mqtt_publish(mqtt_client_t* c, const char* topic_filter, mqtt_message_t* msg)
           

參數隻有

mqtt_client_t

類型的指針,字元串類型的

主題

(支援通配符),要釋出的消息(包括

服務品質

消息主體

)。

mqtt_message_t msg;

    msg.qos = 2;
    msg.payload = (void *) buf;

    mqtt_publish(&client, "testtopic1", &msg);
           

核心思想都差不多,過程如下:

  1. 先序列化釋出封包,然後發送到伺服器
MQTTSerialize_publish(c->write_buf, c->write_buf_size, 0, msg->qos, msg->retained, msg->id,
              topic, (unsigned char*)msg->payload, msg->payloadlen);
mqtt_send_packet(c, len, &timer)
           
  1. 對于QOS0的邏輯,不做任何處理,對于QOS1和QOS2的封包則需要記錄下來,在沒收到伺服器應答的時候進行重發
if (QOS1 == msg->qos) {
        rc = mqtt_ack_list_record(c, PUBACK, mqtt_get_next_packet_id(c), len, NULL);
    } else if (QOS2 == msg->qos) {
        rc = mqtt_ack_list_record(c, PUBREC, mqtt_get_next_packet_id(c), len, NULL);
    }
           
  1. 還有非常重要的一點,重發封包的MQTT封包頭部需要設定DUP标志位,這是MQTT協定的标準,是以,在重發的時候作者直接操作了封包的DUP标志位,因為修改DUP标志位的函數我沒有從MQTT庫中找到,是以我封裝了一個函數,這與LwIP中的交叉存取思想是一個道理,它假設我知道MQTT封包的所有操作,是以我可以操作它,這樣子可以提高很多效率:
mqtt_set_publish_dup(c,1);  /* may resend this data, set the udp flag in advance */
           

内部線程

static void mqtt_yield_thread(void *arg)
           

主要是對

mqtt_yield

函數的傳回值做處理,比如在

disconnect

的時候銷毀這個線程。

核心的處理函數`mqtt_yield`

  1. 資料包的處理

    mqtt_packet_handle

static int mqtt_packet_handle(mqtt_client_t* c, platform_timer_t* timer)
           

對不同的包使用不一樣的處理:

switch (packet_type) {
        case 0: /* timed out reading packet */
            break;

        case CONNACK:
            break;

        case PUBACK:
        case PUBCOMP:
            rc = mqtt_puback_and_pubcomp_packet_handle(c, timer);
            break;

        case SUBACK:
            rc = mqtt_suback_packet_handle(c, timer);
            break;

        case UNSUBACK:
            rc = mqtt_unsuback_packet_handle(c, timer);
            break;

        case PUBLISH:
            rc = mqtt_publish_packet_handle(c, timer);
            break;

        case PUBREC:
        case PUBREL:
            rc = mqtt_pubrec_and_pubrel_packet_handle(c, timer);
            break;

        case PINGRESP:
            c->ping_outstanding = 0;
            break;

        default:
            goto exit;
    }
           

并且做保活的處理:

mqtt_keep_alive(c)
           

當發生逾時後,

if (platform_timer_is_expired(&c->last_sent) || platform_timer_is_expired(&c->last_received)) 
           

序列号一個心跳包并且發送給伺服器

MQTTSerialize_pingreq(c->write_buf, c->write_buf_size);
mqtt_send_packet(c, len, &timer);
           

當再次發生逾時後,表示與伺服器的連接配接已斷開,需要重連的操作,設定用戶端狀态為斷開連接配接

mqtt_set_client_state(c, CLIENT_STATE_DISCONNECTED);
           
  1. ack

    連結清單的掃描,當收到伺服器的封包時,對ack清單進行掃描操作
mqtt_ack_list_scan(c);
           

當逾時後就銷毀ack連結清單節點:

mqtt_ack_handler_destroy(ack_handler);
           

當然下面這幾種封包則需要重發操作:(

PUBACK 、PUBREC、 PUBREL 、PUBCOMP

,保證QOS1 QOS2的服務品質)

if ((ack_handler->type ==  PUBACK) || (ack_handler->type ==  PUBREC) || (ack_handler->type ==  PUBREL) || (ack_handler->type ==  PUBCOMP))
    mqtt_ack_handler_resend(c, ack_handler);
           
  1. 保持活性的時間過去了,可能掉線了,需要重連操作
mqtt_try_reconnect(c);
           

重連成功後嘗試重新訂閱封包,保證恢複原始狀态~

mqtt_try_resubscribe(c)
           

釋出應答與釋出完成封包的處理

static int mqtt_puback_and_pubcomp_packet_handle(mqtt_client_t *c, platform_timer_t *timer)
           
  1. 反序列化封包
MQTTDeserialize_ack(&packet_type, &dup, &packet_id, c->read_buf, c->read_buf_size)
           
  1. 取消對應的ack記錄
mqtt_ack_list_unrecord(c, packet_type, packet_id, NULL);
           

訂閱應答封包的處理

static int mqtt_suback_packet_handle(mqtt_client_t *c, platform_timer_t *timer)
           
  1. 反序列化封包
MQTTDeserialize_suback(&packet_id, 1, &count, (int*)&granted_qos, c->read_buf, c->read_buf_size)
           
  1. 取消對應的ack記錄
mqtt_ack_list_unrecord(c, packet_type, packet_id, NULL);
           
  1. 安裝對應的訂閱消息處理函數,如果是已存在的則不會安裝
mqtt_msg_handlers_install(c, msg_handler);
           

取消訂閱應答封包的處理

static int mqtt_unsuback_packet_handle(mqtt_client_t *c, platform_timer_t *timer)
           
  1. 反序列化封包
MQTTDeserialize_unsuback(&packet_id, c->read_buf, c->read_buf_size)
           
  1. 取消對應的ack記錄
mqtt_ack_list_unrecord(c, UNSUBACK, packet_id, &msg_handler)
           
  1. 銷毀對應的訂閱消息處理函數
mqtt_msg_handler_destory(msg_handler);
           

來自伺服器的釋出封包的處理

static int mqtt_publish_packet_handle(mqtt_client_t *c, platform_timer_t *timer)
           
  1. 反序列化封包
MQTTDeserialize_publish(&msg.dup, &qos, &msg.retained, &msg.id, &topic_name,
        (unsigned char**)&msg.payload, (int*)&msg.payloadlen, c->read_buf, c->read_buf_size)
           
  1. 對于QOS0、QOS1的封包,直接去處理消息
mqtt_deliver_message(c, &topic_name, &msg);
           
  1. 對于QOS1的封包,還需要發送一個

    PUBACK

    應答封包給伺服器
MQTTSerialize_ack(c->write_buf, c->write_buf_size, PUBACK, 0, msg.id);
           
  1. 而對于QOS2的封包則需要發送

    PUBREC

    封包給伺服器,除此之外還需要記錄

    PUBREL

    到ack連結清單上,等待伺服器的釋出釋放封包,最後再去處理這個消息
MQTTSerialize_ack(c->write_buf, c->write_buf_size, PUBREC, 0, msg.id);
mqtt_ack_list_record(c, PUBREL, msg.id + 1, len, NULL)
mqtt_deliver_message(c, &topic_name, &msg);
           
說明:一旦注冊到ack清單上的封包,當具有重複的封包是不會重新被注冊的,它會通過

mqtt_ack_list_node_is_exist

函數判斷這個節點是否存在,主要是依賴等待響應的消息類型與msgid。

釋出收到與釋出釋放`封包的處理

static int mqtt_pubrec_and_pubrel_packet_handle(mqtt_client_t *c, platform_timer_t *timer)
           
  1. 反序列化封包
MQTTDeserialize_ack(&packet_type, &dup, &packet_id, c->read_buf, c->read_buf_size)
           
  1. 産生一個對應的應答封包
mqtt_publish_ack_packet(c, packet_id, packet_type);
           
  1. 取消對應的ack記錄
mqtt_ack_list_unrecord(c, UNSUBACK, packet_id, &msg_handler)
           

在背景測試

nohup ./mqtt-client > log.out 2>&1 &

繼續閱讀