天天看点

ESP32 基于4G模块透传 模式 实现MQTT通信

这里选择的是SIM7600CE 和EC20 4G通信模块,工作在透传模式

  1. 创建GPRS 拨号的通信接口源文件 GprsUtils.c ,实现4G模块 拨号及连接服务器功能
#include <string.h>
#include <stdint.h>
#include "freertos/FreeRTOS.h"
#include "freertos/task.h"
#include "GprsUtils.h"
#include "UartUtils.h"
#include "driver/gpio.h"
    
static uint8_t find_string(const char * recvBuff,uint16_t recvBuffLen,const char* p)
{ 
    if(recvBuffLen>0&&strstr(recvBuff,p)!=NULL)
    {
        return 1;
    }
    else
    {
        return 0;
    }
}
static void delay_ms(int waitTimeMs){
    vTaskDelay(waitTimeMs/portTICK_PERIOD_MS);
    
}

void gprs_send_string(char* s){
    uartSendBytes(GPRSPORT,(uint8_t *)s,strlen(s));
}

void gprs_recv_ack(uint8_t *buf,int *len,int wait_time){
    uartRecvBytes(GPRSPORT, buf, len,MAX_UART_RECV_BUFF_SIZE,wait_time);
}


uint8_t gprs_send_cmd(char *b,char *a,uint8_t times,uint16_t wait_time)         
{
    uint8_t i;
    i = 0;
    
    while(i < times)                    
    {
        char gprsBuf[MAX_UART_RECV_BUFF_SIZE]={0};
        int gprsBufLen=0;
        gprs_send_string(b);
        gprs_send_string("\r\n");             // 回车换行
        gprs_recv_ack((uint8_t *)gprsBuf,&gprsBufLen,wait_time);
        if(find_string(gprsBuf,gprsBufLen,a))
            return 1;
        i++;
    }
    return 0;
}


/******************************************************************************
* 函数名称:str_delim
* 描    述:字符串分割函数
* 输入参数: num   0---取出分隔字符串(delim)前面的字符串  1---取出分隔字符串(delim)后面的字符串
*           temp  要分割的字符串
*           delim 分隔符字符串
* 输    出: 
* 返    回: 
* 说    明:          
*******************************************************************************/
char *str_delim(uint8_t num,char *temp,char *delim)
{
    int i;
    char *str[2]={0};
    char *tok=temp;
    char *restr;
    for(i=0;i<2;i++)
    {
        tok=strtok(tok,delim);
        str[i]=tok;
        tok = NULL;
    }
    restr=str[num];
  return restr;
}



uint8_t sim7600CheckStatus(void);
uint8_t sim7600SelectNet(void);
uint8_t sim7600NetConfig(void);
uint8_t sim7600TcpConnect(const char *serverIp,int port);
uint8_t sim7600ConnectServer(const char *serverIp,int port);
void     sim7600ExitDataMode(void);
uint8_t sim7600CloseTcpConnect(void);
uint8_t sim7600CloseNet(void);

/******************************************************************************
* 函数名称: check_status
* 描    述: 核心板基本状态测试
* 输入参数: 无
* 输    出: 无
* 返    回: 0 --- 出错  1 --- 正确
* 说    明: 核心板开机后,先判断AT命令是否正常、能否读到卡、能否注册网络。确认无误后
*           再进行其他功能测试
*******************************************************************************/
uint8_t sim7600CheckStatus(void)
{
     sim7600ExitDataMode();                                               // 退出透传模式
    // 同步波特率       
    if(!gprs_send_cmd("AT","OK",5,200)) 
    {
        printf("handshake failed\r\n");
        return 0;
    }
    // 关闭网络,避免后面因为网络已经开启,导致打开网络失败
    gprsCloseNet();
    // 取消回显             
    if(!gprs_send_cmd("ATE0","OK",1,200)){
        printf("echo cancelled failed\r\n");
        return 0;
    }
    
    // 查询核心板能否读到SIM卡            
    if(!gprs_send_cmd("AT+CPIN?","+CPIN: READY",2,500)){
       printf("no sim card detected\r\n");
       return 0;
    }
    
    if(!gprs_send_cmd("AT+CSQ","+CSQ",2,100))
    {                
        printf("serching CSQ failed\r\n");
        return 0;
    }
    if(!gprs_send_cmd("AT+COPS?","OK",2,100))
    {                
        printf("get Operator failed\r\n");
        return 0;
    }
    
    // 选择网络
    if(!sim7600SelectNet())                                              // 第一次选择好网路后,可直接去掉该函数
      return 0;    
    return 1;
}

/******************************************************************************
* 函数名称: select_net
* 描    述: 核心板选择网络
* 输入参数: 
* 输    出: 
* 返    回: 0 --- 出错  1 --- 正确
* 说    明: SIM卡首次插入核心板测试时,需要选择网络,选择好网络后,如果不换卡,则不需要再执行该函数
*******************************************************************************/

uint8_t sim7600SelectNet(void)
{
    
    // 设置APN
    if(!gprs_send_cmd("AT+CGSOCKCONT=1,\"IP\",\"CMNET\"","OK",1,500)){
        printf("set APN failed\r\n");
        return 0;
    }
    
    // 选择网络,可在常量声明中修改
    if(!gprs_send_cmd("AT+CNMP=38","OK",1,200)){
        printf("select network failed\r\n");
        return 0;
    }    
    //查询核心板所处网络
    if(!gprs_send_cmd("AT+CNMP?","+CNMP: 38",3,500)){
        printf("register network failed\r\n");
        return 0;
    }


    
    // 查询核心板是否注册成功             
    if(!gprs_send_cmd("AT+CREG?","+CREG: 0,1",5,1000)){
         printf("register network failed\r\n");
         return 0;
    }
    return 1;
}

/******************************************************************************
* 函数名称: net_config
* 描    述: 核心板进行连接前的网络配置
* 输入参数: 无
* 输    出: 无
* 返    回: 0 --- 出错  1 --- 正确
* 说    明: 判断核心板是否可以进行通信连接          
*******************************************************************************/
uint8_t sim7600NetConfig(void)
{
    // 激活启动场景
    if(!gprs_send_cmd("AT+CSOCKSETPN=1","OK",1,200) ){
         printf("activate mobile scene failed\r\n");
         return 0;
    }    

  // 设置为非透传模式
    if(!gprs_send_cmd("AT+CIPMODE?","+CIPMODE: 1",1,200)){   // 这里不能直接设置非透传模式,避免因为重复设置导致ERROR
    
         if(!gprs_send_cmd("AT+CIPMODE=1","OK",1,200)){
             printf("Transparent transmission mode failed\r\n");
             return 0;
         }
             
    }

    // 打开网络
    if(!gprs_send_cmd("AT+NETOPEN","+NETOPEN: 0",1,5000)){  // 不能重复操作,否则会ERROR
        printf("open network failed\r\n");
        return 0;
    }

    // 获取本地IP地址
    if(gprs_send_cmd("AT+IPADDR","ERROR",1,2000))
    {
         printf("get local Ipaddress failed\r\n"); 
         return 0;
    }

    /************
    else
    {
       printf("11.获取本地IP成功\r\n");
       printf("%s",str_delim(1,gprsBuf,":"));            // 将获取到的本地IP地址打印到串口调试助手
    }
    ************/
    return 1;
}

/******************************************************************************
* 函数名称: tcp_connect
* 描    述: 核心板与云服务器进行TCP透传模式通信
* 输入参数: 无
* 输    出: 无
* 返    回: 0 --- 出错  1 --- 正确
* 说    明: 无       
*******************************************************************************/
uint8_t sim7600TcpConnect(const char *serverIp,int port)
{
    char serverIpCmd[512]={0};
    sprintf(serverIpCmd,"AT+CIPOPEN=0,\"TCP\",\"%s\",%d\r\n",serverIp,port);    // TCP服务器IP地址,可自行修改
    // 建立TCP连接
    if(!gprs_send_cmd((char*)serverIpCmd,"CONNECT",2,3000)){
         printf("set Tcp Transparent transmission mode failed \r\n");
         return 0;    
    }
    return 1;
}


uint8_t sim7600ConnectServer(const char *serverIp,int port){
    if(!sim7600CheckStatus())                                   // 判断核心板状态是否正常
    {
       printf("*** check_status failed ***\r\n");
       return 0;
    }else
       printf("*** check_status ok ***\r\n");
    if(!sim7600NetConfig()){                                     // 判断通信连接是否正常
        printf("*** net_config failed  ***\r\n");
        return 0; 
    }else
        printf("*** net_config OK  ***\r\n");
    
    return sim7600TcpConnect(serverIp,port);
}


/********************************************************************************
* 函数名称:  exit_data_mode 
* 描    述:  退出数据模式
* 输入参数:  无
* 输    出:  无
* 返    回:  无
* 说    明:  发送“+++”,确保核心板退出透传数据模式,前后1s延时
*******************************************************************************/
void sim7600ExitDataMode(void)
{
  char gprsBuf[MAX_UART_RECV_BUFF_SIZE]={0};
  int gprsBufLen=0;
  delay_ms(1000);
  gprs_send_string("+++");
  gprs_recv_ack((uint8_t *)gprsBuf,&gprsBufLen,1000);
}

/********************************************************************************
* 函数名称:  close_tcp_connect 
* 描    述:  关闭TCP连接
* 输入参数:  无
* 输    出:  无
* 返    回:  无
* 说    明: 关闭TCP连接
*******************************************************************************/
uint8_t sim7600CloseTcpConnect(void)
{
    // 退出透传数据模式
    sim7600ExitDataMode();
    delay_ms(1500);
     // 关闭TCP连接    
    if(!gprs_send_cmd("AT+CIPCLOSE=0","OK",2,2000)){       // 关闭连接耗时较长(+CIPCLOSE: 0,0),可根据实际情况调整
        printf("close tcp port failed\n");
        return 0;
    }
    return 1;
}



uint8_t sim7600CloseNet(void){
    // 关闭网络
    sim7600CloseTcpConnect();
    if(!gprs_send_cmd("AT+NETCLOSE","+NETCLOSE:",5,3000))
    {
        printf("close network failed\r\n");    
        return 0;        
    }    
    return 1;
}




/EC20  Start/


uint8_t ec20ConnectServer(const char *serverIp,int port);
uint8_t ec20CloseNet(void);

uint8_t ec20ConnectServer(const char *serverIp,int port){
    printf("ec20ConnectServer:%s %d\n",serverIp,port);
    if(!gprs_send_cmd("AT","OK",2,1000)){
        printf("handshake failed\r\n");
        return 0;
    }
    // 查询核心板能否读到SIM卡            
    if(!gprs_send_cmd("AT+CPIN?","+CPIN: READY",2,1000)){
       printf("no sim card detected\r\n");
       return 0;
    }
    if(!gprs_send_cmd("AT+CSQ","+CSQ",2,1000))
    {                
        printf("serching CSQ failed\r\n");
        return 0;
    }
    // 查询核心板是否注册成功             
    if(!gprs_send_cmd("AT+CREG?","+CREG: 0,1",5,1000)){
         printf("register GSM network failed\r\n");
         return 0;
    }
    if(!gprs_send_cmd("AT+CGREG?","OK",5,1000)){
         printf("register GPRS network failed\r\n");
         return 0;
    }
    // 设置APN
    if(!gprs_send_cmd("AT+QICSGP=1,1,\"CMNET\"","OK",2,1000)){
        printf("set APN failed\r\n");
        return 0;
    }

    if(!gprs_send_cmd("AT+QIDEACT=1","OK",2,1000)){
        printf("Deactivates a PDP context\r\n");
        return 0;
    }
    if(!gprs_send_cmd("AT+QIACT=1","OK",2,1000)){
        printf("activates a PDP context\r\n");
        return 0;
    }

    char serverIpCmd[512]={0};
    sprintf(serverIpCmd,"AT+QIOPEN=1,0,\"TCP\",\"%s\",%d,0,2\r\n",serverIp,port);    // TCP服务器IP地址,可自行修改
    // 建立TCP连接
    if(!gprs_send_cmd((char*)serverIpCmd,"CONNECT",2,3000)){
         printf("set Tcp Transparent transmission mode failed \r\n");
         return 0;    
    }
    return 1;


}

uint8_t ec20CloseNet(void){

    sim7600ExitDataMode();
    delay_ms(2000);
     // 关闭TCP连接    
    if(!gprs_send_cmd("AT+QICLOSE=0","OK",2,2000)){       // 关闭连接耗时较长(+CIPCLOSE: 0,0),可根据实际情况调整
        printf("close tcp port failed\n");
        return 0;
    }
    return 1;
}

/EC20  End/




//#define SIM7600CE_4G        1
#define EC20_4G                1





void gprsInit(int uartPort,uint32_t bound){
    uart_init(uartPort,bound);
    gprsPower(1);
    gprsReset(0);
    delay_ms(2000);
}

uint8_t gprsTryConnectServer(const char *serverIp,int port){
    #if SIM7600CE_4G
        return sim7600ConnectServer(serverIp,port);
    #else
        return ec20ConnectServer(serverIp,port);
    #endif
    
}

uint8_t gprsCloseNet(void)
{
    #if SIM7600CE_4G
        return sim7600CloseNet();
    #else
        return ec20CloseNet();
    #endif
}

int gprsTcpWrite(uint8_t *data,int len){
    return uartSendBytes(GPRSPORT,data,len);
}

int gprsTcpRead(uint8_t *recvbuf,int maxRecvLen,int waitTime){
    int readLen=0;
    uartRecvBytes(GPRSPORT, recvbuf, &readLen,maxRecvLen,waitTime);
    return readLen;
}
int gprsTcpAvailable(int waitTime){
    return uartHasDataReceived(GPRSPORT, waitTime);
}

void gprsReboot(void){
    gpio_set_level(SIM_RST_Pin,1);
    delay_ms(2000);
    gpio_set_level(SIM_RST_Pin, 0);
    delay_ms(20000);
}

void gprsPower(uint8_t isOn){
    gpio_set_level(SIM_PEN_Pin,isOn);
}
void gprsReset(uint8_t isOn){
    gpio_set_level(SIM_RST_Pin, isOn);
}
void gprsRepowerOn(void) {
    gpio_set_level(SIM_PEN_Pin,1);
    delay_ms(4000);
}
void gprsRepowerOff(void) {
    gpio_set_level(SIM_PEN_Pin,0);
    delay_ms(2000);
}
           
  1. 移植MQTT 通信协议

从https://github.com/eclipse/paho.mqtt.embedded-c/tree/master/MQTTPacket 下载MQTT通信源码包, 将src目录下所有文件拷贝到自己的工程目录下,然后移植samples目录下的transport.c

#include "transport.h"
#include "lwip/opt.h"
#include "lwip/arch.h"
#include "lwip/api.h"
#include "lwip/inet.h"
#include "lwip/sockets.h"
#include "string.h"
#include "../GprsUtils/GprsUtils.h"

static int mysock;

/************************************************************************
** 函数名称: transport_sendPacketBuffer                                    
** 函数功能: 以TCP方式发送数据
** 入口参数: unsigned char* buf:数据缓冲区
**           int buflen:数据长度
** 出口参数: <0发送数据失败                            
************************************************************************/
int transport_sendPacketBuffer( uint8_t* buf, int buflen)
{
    return gprsTcpWrite(buf,buflen);
}

/************************************************************************
** 函数名称: transport_getdata                                    
** 函数功能: 以阻塞的方式接收TCP数据
** 入口参数: unsigned char* buf:数据缓冲区
**           int count:数据长度
** 出口参数: <=0接收数据失败                                    
************************************************************************/
int transport_getdata(uint8_t* buf, int count)
{
    return gprsTcpRead(buf,count,500);
}


/************************************************************************
** 函数名称: transport_open                                    
** 函数功能: 打开一个接口,并且和服务器 建立连接
** 入口参数: char* servip:服务器域名
**           int   port:端口号
** 出口参数: <0打开连接失败                                        
************************************************************************/
int transport_open(const char *servip, int port)
{
    return gprsTryConnectServer(servip,port);
}


/************************************************************************
** 函数名称: transport_close                                    
** 函数功能: 关闭套接字
** 入口参数: unsigned char* buf:数据缓冲区
**           int buflen:数据长度
** 出口参数: <0发送数据失败                            
************************************************************************/
int transport_close(void)
{
    return gprsCloseNet();
}
           
  1. 创建MQTT通信接口源文件,包含连接MQTT服务器、订阅、发布,接收
#include <string.h>
#include "freertos/FreeRTOS.h"
#include "freertos/task.h"
#include "MqttUtils.h"
#include "../GprsUtils/GprsUtils.h"





/************************************************************************
** 函数名称: MQTTClientInit                                
** 函数功能: 初始化客户端并登录服务器
** 入口参数: int sock:网络描述符
** 出口参数: >=0:发送成功 <0:发送失败
** 备    注: 
************************************************************************/
static int MQTTClientInit(const char *username,const char *password,int keepalive){
        MQTTPacket_connectData connectData = MQTTPacket_connectData_initializer;
        uint8_t buf[100];
        int buflen = sizeof(buf);
        int len;
       uint8_t sessionPresent,connack_rc;
    
    
         //创建MQTT客户端连接参数
      connectData.willFlag = 0;
      //MQTT版本
        connectData.MQTTVersion = 4;
        //客户端ID--必须唯一
        connectData.clientID.cstring = "MIMITOTO";
        //保活间隔
        connectData.keepAliveInterval = keepalive;
        if(username&&password){
            //用户名
            connectData.username.cstring = username;
            //用户密码
            connectData.password.cstring = password;
        }
        //清除会话
        connectData.cleansession = 1;
        
      //串行化连接消息
        len = MQTTSerialize_connect(buf, buflen, &connectData);
        //发送TCP数据
    if(transport_sendPacketBuffer(buf, len) < 0)
        return -1;
      if(gprsTcpAvailable(2000)<0)
          return -2;
    if(MQTTPacket_read(buf, buflen, transport_getdata) != CONNACK)
        return -3;    
        //拆解连接回应包
    if (MQTTDeserialize_connack(&sessionPresent, &connack_rc, buf, buflen) != 1 || connack_rc != 0)
        return -4;
        
    if(sessionPresent == 1)
        return 1;//不需要重新订阅--服务器已经记住了客户端的状态
    else 
        return 0;//需要重新订阅
}


int mqttConnnectServer(const char *servip, int port,const char *username,const char *password,int keepalive){
    int sessionPresent=-1;
    if(!transport_open(servip,port))
        return -1;
    if((sessionPresent=MQTTClientInit(username,password,keepalive))<0){
        transport_close();
        return -1;
    }
    return sessionPresent;
}


/************************************************************************
** 函数名称: ReadPacketTimeout                    
** 函数功能: 阻塞读取MQTT数据
** 入口参数: int sock:网络描述符
**           uint8_t *buf:数据缓存区
**           int buflen:缓冲区大小
**           uint32_t timeout:超时时间--0-表示直接查询,没有数据立即返回
** 出口参数: -1:错误,其他--包类型
** 备    注: 
************************************************************************/
int ReadPacketTimeout(uint8_t *buf,int buflen,uint32_t timeout)
{
      if(timeout != 0)
      {
        if(gprsTcpAvailable(timeout)<0)
            return -1;            
      }
        //读取TCP/IP事件
        return MQTTPacket_read(buf, buflen, transport_getdata);
}

/************************************************************************
** 函数名称: MQTTSubscribe                                
** 函数功能: 订阅消息
** 入口参数: int sock:套接字
**           char *topic:主题
**           enum QoS pos:消息质量
** 出口参数: >=0:发送成功 <0:发送失败
** 备    注: 
************************************************************************/
int MQTTSubscribe(char *topic,enum QoS pos){
      static uint32_t PacketID = 0;
      uint16_t packetidbk = 0;
      int conutbk = 0;
        uint8_t buf[100];
        int buflen = sizeof(buf);
      MQTTString topicString = MQTTString_initializer;  
        int len;
      int req_qos,qosbk;
    
      //复制主题
    topicString.cstring = (char *)topic;
        //订阅质量
    req_qos = pos;
    
      //串行化订阅消息
    len = MQTTSerialize_subscribe(buf, buflen, 0, PacketID++, 1, &topicString, &req_qos);
    //发送TCP数据
    if(transport_sendPacketBuffer(buf, len) < 0)
        return -1;
    if(gprsTcpAvailable(2000) <0 ){
        return -2;
    }
    //等待订阅返回--未收到订阅返回
    if(MQTTPacket_read(buf, buflen, transport_getdata) != SUBACK)
        return -4;    
        
        //拆订阅回应包
    if(MQTTDeserialize_suback(&packetidbk,1, &conutbk, &qosbk, buf, buflen) != 1)
        return -5;
        
        //检测返回数据的正确性
    if((qosbk == 0x80)||(packetidbk != (PacketID-1)))
        return -6;
    //订阅成功
        return 0;
}




/************************************************************************
** 函数名称: GetNextPackID                        
** 函数功能: 产生下一个数据包ID
** 入口参数: 无
** 出口参数: uint16_t packetid:产生的ID
** 备    注: 
************************************************************************/
static uint16_t GetNextPackID(void)
{
     static uint16_t pubpacketid = 0;
     return pubpacketid++;
}

/************************************************************************
** 函数名称: WaitForPacket                    
** 函数功能: 等待特定的数据包
** 入口参数: int sock:网络描述符
**           uint8_t packettype:包类型
**           uint8_t times:等待次数
** 出口参数: >=0:等到了特定的包 <0:没有等到特定的包
** 备    注: 
************************************************************************/
static int WaitForPacket(uint8_t packettype,uint8_t times)
{
      int type;
        uint8_t buf[MSG_MAX_LEN];
      uint8_t n = 0;
        int buflen = sizeof(buf);
        do
        {
                //读取数据包
            type = ReadPacketTimeout(buf,buflen,2000);
            if(type != -1)
                mqtt_pktype_ctl(type,buf,buflen);
            n++;
        }while((type != packettype)&&(n < times));
        //收到期望的包
        if(type == packettype)
             return 0;
        else 
             return -1;        
}


int MQTTMsgPublish(char *topic, char qos, char retained,uint8_t* msg,uint32_t msg_len){
        uint8_t buf[MSG_MAX_LEN];
        int buflen = sizeof(buf),len;
        MQTTString topicString = MQTTString_initializer;
      uint16_t packid = 0,packetidbk=0;
    
        //填充主题
      topicString.cstring = (char *)topic;
      //填充数据包ID
      if((qos == QOS1)||(qos == QOS2))
        { 
            packid = GetNextPackID();
        }
        else
        {
              qos = QOS0;
              retained = 0;
              packid = 0;
        }
        
        //推送消息
        len = MQTTSerialize_publish(buf, buflen, 0, qos, retained, packid, topicString, (unsigned char*)msg, msg_len);
        if(len <= 0)
                return -1;
        if(transport_sendPacketBuffer(buf, len) < 0)    
                return -2;    
        
        //质量等级0,不需要返回
        if(qos == QOS0)
        {
                return 0;
        }
        
        //等级1
        if(qos == QOS1)
        {
                //等待PUBACK
              if(WaitForPacket(PUBACK,5) < 0)
                     return -3;
                return 1;
              
        }
        //等级2
        if(qos == QOS2)    
        {
              //等待PUBREC
              if(WaitForPacket(PUBREC,5) < 0)
                     return -3;
              //发送PUBREL
              len = MQTTSerialize_pubrel(buf, buflen,0, packetidbk);
                if(len == 0)
                    return -4;
                if(transport_sendPacketBuffer(buf, len) < 0)    
                    return -6;            
              //等待PUBCOMP
              if(WaitForPacket(PUBREC,5) < 0)
                     return -7;
                return 2;
        }
        //等级错误
        return -8;
}


int my_mqtt_send_pingreq(){
    int len;
    uint8_t buf[200];
    int buflen = sizeof(buf);     
    len = MQTTSerialize_pingreq(buf, buflen);
    transport_sendPacketBuffer(buf, len);
    if(gprsTcpAvailable(5000)<0)
            return -1;
    if(MQTTPacket_read(buf, buflen, transport_getdata) != PINGRESP)
            return -2;
    return 0;
}
int mqttHasDataIn(int waitTime){
    return gprsTcpAvailable(waitTime);
}




消息处理/


/************************************************************************
** 函数名称: deliverMessage                        
** 函数功能: 接受服务器发来的消息
** 入口参数: MQTTMessage *msg:MQTT消息结构体
**           MQTT_USER_MSG *mqtt_user_msg:用户接受结构体
**           MQTTString  *TopicName:主题
** 出口参数: 无
** 备    注: 
************************************************************************/
static void deliverMessage(MQTTString  *TopicName,MQTTMessage *msg,MQTT_USER_MSG *mqtt_user_msg)
{
        //消息质量
        mqtt_user_msg->msgqos = msg->qos;
        //保存消息
        memcpy(mqtt_user_msg->msg,msg->payload,msg->payloadlen);
        mqtt_user_msg->msg[msg->payloadlen] = '\0';
        //保存消息长度
        mqtt_user_msg->msglenth = msg->payloadlen;
        //消息主题
        memcpy((char *)mqtt_user_msg->topic,TopicName->lenstring.data,TopicName->lenstring.len);
        mqtt_user_msg->topic[TopicName->lenstring.len] = 0;
        //消息ID
        mqtt_user_msg->packetid = msg->id;
        //标明消息合法
        mqtt_user_msg->valid = 1;        
}

/************************************************************************
** 函数名称: UserMsgCtl                        
** 函数功能: 用户消息处理函数
** 入口参数: MQTT_USER_MSG  *msg:消息结构体指针
** 出口参数: 无
** 备    注: 
************************************************************************/
void UserMsgCtl(MQTT_USER_MSG  *msg)
{
      

    //这里处理数据只是打印,用户可以在这里添加自己的处理方式
      printf("MQTT>>****收到客户端自己订阅的消息!!****\n");
        //返回后处理消息
      switch(msg->msgqos)
        {
            case 0:
                    printf("MQTT>>消息质量:QoS0\n");
                    break;
            case 1:
                    printf("MQTT>>消息质量:QoS1\n");
                    break;
            case 2:
                    printf("MQTT>>消息质量:QoS2\n");
                    break;
            default:
                    printf("MQTT>>错误的消息质量\n");
                    break;
        }
      printf("MQTT>>消息主题:%s\n",msg->topic);    
      printf("MQTT>>消息类容:%s\n",msg->msg);    
      printf("MQTT>>消息长度:%d\n",msg->msglenth);
      //处理完后销毁数据
      msg->valid  = 0;

    char MqttSendbuf[512]={0};
    static int timeCount=0;
    sprintf(MqttSendbuf,"Hello server %d",timeCount++);
    MQTTMsgPublish("mqttSendTopic",QOS0, 0,(uint8_t *)MqttSendbuf,strlen(MqttSendbuf));
}



/************************************************************************
** 函数名称: mqtt_pktype_ctl                        
** 函数功能: 根据包类型进行处理
** 入口参数: uint8_t packtype:包类型
** 出口参数: 无
** 备    注: 
************************************************************************/
void mqtt_pktype_ctl(uint8_t packtype,uint8_t *buf,uint32_t buflen)
{
        MQTTMessage msg;
        int rc;
        MQTTString receivedTopic;
        uint32_t len;
        MQTT_USER_MSG  mqtt_user_msg;
        switch(packtype)
        {
            case PUBLISH:
                                        //拆析PUBLISH消息
                                        if(MQTTDeserialize_publish(&msg.dup,(int*)&msg.qos, &msg.retained, &msg.id, &receivedTopic,(unsigned char **)&msg.payload, &msg.payloadlen, buf, buflen) != 1)
                                                return;    
                                        //接受消息
                                        deliverMessage(&receivedTopic,&msg,&mqtt_user_msg);
                                        
                                        //消息质量不同,处理不同
                                        if(msg.qos == QOS0)
                                        {
                                             //QOS0-不需要ACK
                                             //直接处理数据
                                             UserMsgCtl(&mqtt_user_msg);
                                             return;
                                        }
                                        //发送PUBACK消息
                                        if(msg.qos == QOS1)
                                        {
                                                len =MQTTSerialize_puback(buf,buflen,mqtt_user_msg.packetid);
                                                if(len == 0)
                                                    return;
                                                //发送返回
                                                if(transport_sendPacketBuffer(buf,len)<0)
                                                     return;    
                                                //返回后处理消息
                                                UserMsgCtl(&mqtt_user_msg); 
                                                return;                                                
                                        }
                
                                        //对于质量2,只需要发送PUBREC就可以了
                                        if(msg.qos == QOS2)
                                        {
                                             len = MQTTSerialize_ack(buf, buflen, PUBREC, 0, mqtt_user_msg.packetid);                            
                                             if(len == 0)
                                                 return;
                                             //发送返回
                                             transport_sendPacketBuffer(buf,len);    
                                        }        
                            break;
            case  PUBREL:                           
                                        //解析包数据,必须包ID相同才可以
                                        rc = MQTTDeserialize_ack(&msg.type,&msg.dup, &msg.id, buf,buflen);
                                        if((rc != 1)||(msg.type != PUBREL)||(msg.id != mqtt_user_msg.packetid))
                                            return ;
                                        //收到PUBREL,需要处理并抛弃数据
                                        if(mqtt_user_msg.valid == 1)
                                        {
                                             //返回后处理消息
                                             UserMsgCtl(&mqtt_user_msg);
                                        }      
                                        //串行化PUBCMP消息
                                        len = MQTTSerialize_pubcomp(buf,buflen,msg.id);                           
                                        if(len == 0)
                                            return;                                    
                                        //发送返回--PUBCOMP
                                        transport_sendPacketBuffer(buf,len);                                        
                        break;
            case   PUBACK://等级1客户端推送数据后,服务器返回
                            break;
            case   PUBREC://等级2客户端推送数据后,服务器返回
                            break;
            case   PUBCOMP://等级2客户端推送PUBREL后,服务器返回
                            break;
            default:
                           break;
        }
}
           
  1. 创建MQTT 任务
#include <stdint.h>
#include "freertos/FreeRTOS.h"
#include "freertos/task.h"
#include "string.h"
#include "freertos/queue.h"
#include "malloc.h"

#include "taskMqttClient.h"
#include "../MQTT/transport.h"
#include "../MQTT/MQTTPacket.h"
#include "../MqttUtils/MqttUtils.h"
#include "../GprsUtils/GprsUtils.h"


#define HOST_NAME "183.220.123.35"
#define HOST_PORT 17788
#define USER_NAME     "jdtf1"
#define USER_PASSWD  "jdtf1"



#define MQTT_RECV_TOPIC         "mqttRecvTopic"
#define MQTT_SEND_TOPIC            "mqttSendTopic"








#define MQTT_RECV_TASK_STACKSIZE                    (configMINIMAL_STACK_SIZE*8)    
#define MQTT_RECV_TASK_PRIO                            4
static TaskHandle_t                                 MqttRecvTask_Handler;
static void mqttRecvThread(void *pvParameters);

#define MQTT_SEND_TASK_STACKSIZE                    (configMINIMAL_STACK_SIZE*8)    
#define MQTT_SEND_TASK_PRIO                            5
static TaskHandle_t                                 MqttSendTask_Handler;
static void mqttSendThread(void *pvParameters);


QueueHandle_t MQTT_Data_Queue =NULL;
#define  MQTT_QUEUE_LEN    10   /* 队列的长度,最大可包含多少个消息 */
#define  MQTT_QUEUE_SIZE   sizeof(MQTT_MSG_ITEM )   /* 队列中每个消息大小(字节) */
void mqtt_thread_init(void){
    /* 创建Test_Queue */
    MQTT_Data_Queue = xQueueCreate((UBaseType_t ) MQTT_QUEUE_LEN,(UBaseType_t ) MQTT_QUEUE_SIZE);/* 消息的大小 */
    if(NULL != MQTT_Data_Queue)
        printf("The MQTT_Data_Queue was created successfully!\r\n");
    xTaskCreate(mqttRecvThread, (const char* ) "MQTTRecvTask", MQTT_RECV_TASK_STACKSIZE, NULL, MQTT_RECV_TASK_PRIO, (TaskHandle_t*)&MqttRecvTask_Handler);
    xTaskCreate(mqttSendThread, (const char* ) "MQTTSendTask", MQTT_SEND_TASK_STACKSIZE, NULL, MQTT_SEND_TASK_PRIO, (TaskHandle_t*)&MqttSendTask_Handler);
}


static void mqttSendThread(void *pvParameters){
    /* 定义一个创建信息返回值,默认为pdTRUE */
    BaseType_t xReturn = pdTRUE;
    /* 定义一个接收消息的变量 */
    MQTT_MSG_ITEM recvMqttMsg;
    while(1){
      xReturn = xQueueReceive(MQTT_Data_Queue, &recvMqttMsg,portMAX_DELAY); /* 等待时间 3000ms */
      if(xReturn == pdTRUE){
          printf("mqttSendThread: msg=%s len=%d\r\n",recvMqttMsg.msgStr,recvMqttMsg.msgLength);
          MQTTMsgPublish((char*)MQTT_SEND_TOPIC,QOS0, 0,(uint8_t *)recvMqttMsg.msgStr,recvMqttMsg.msgLength);
          free(recvMqttMsg.msgStr);
      }
    }
}


#define MAX_TRY_CONNECT_TIMES    15
#define MAX_PING_FAILED_TIMES    3
static void mqttRecvThread(void *pvParameters){
      
        uint32_t curtick=0;
        uint8_t no_mqtt_msg_exchange = 1;
        int sessionPresent = 0;
        uint8_t buf[MSG_MAX_LEN];
        int buflen = sizeof(buf),type;    
        int isMqttServerConnected=0;
        int tryConnectServerFailedTime=0;
        int tryPingServerFFailedTimes=0;
        gprsInit(GPRSPORT,115200);
        gprsReboot();
        //获取当前滴答,作为心跳包起始时间
        curtick = xTaskGetTickCount();
        //无线循环
        printf("MQTT>>4.开始循环接收订阅的消息...\n");
        while(1)
        {

                if(!isMqttServerConnected){
                    printf("MQTT>>1.开始创建网络连接...\n");
                    if((sessionPresent=mqttConnnectServer(HOST_NAME,HOST_PORT,USER_NAME,USER_PASSWD,KEEPLIVE_TIME))<0){
                        vTaskDelay(3000/portTICK_RATE_MS);
                        if(tryConnectServerFailedTime++>MAX_TRY_CONNECT_TIMES){
                            tryConnectServerFailedTime=0;
                            gprsReboot();
                            printf("Try max %d time to connect server:%s port:%d failed,then reboot 4G model\n",MAX_TRY_CONNECT_TIMES,HOST_NAME,HOST_PORT);
                        }
                        continue;
                    }
                    printf("连接MQTT 服务器成功 ...\n");
                    isMqttServerConnected=1;
                    tryConnectServerFailedTime=0;
                    if(sessionPresent!=1){    //订阅消息
                        if(MQTTSubscribe((char*)MQTT_RECV_TOPIC,QOS0) < 0)
                        {
                                //重连服务器
                                printf("MQTT>>客户端订阅消息失败...\n");
                                transport_close();
                                isMqttServerConnected=0;
                                continue;
                        }else
                                printf("订阅主题 %s 成功 \n",MQTT_RECV_TOPIC);
                    }
                    curtick = xTaskGetTickCount();    
                }
                //表明无数据交换
                no_mqtt_msg_exchange = 1;
                //判断MQTT服务器是否有数据
                if((type=ReadPacketTimeout(buf,buflen,1000))!=-1)
                {    
                    mqtt_pktype_ctl(type,buf,buflen);
                    //表明有数据交换
                    no_mqtt_msg_exchange = 0;
                    //获取当前滴答,作为心跳包起始时间
                    curtick = xTaskGetTickCount();
                        
                }
                
            //这里主要目的是定时向服务器发送PING保活命令
             if((xTaskGetTickCount() - curtick) >(pdMS_TO_TICKS(KEEPLIVE_TIME)))
            {
                curtick = xTaskGetTickCount();
                //判断是否有数据交换
                if(no_mqtt_msg_exchange == 0)
                {
                    //如果有数据交换,这次就不需要发送PING消息
                    continue;
                }
                    
                if(my_mqtt_send_pingreq() < 0){
                         //重连服务器
                         printf("MQTT>>发送ping失败....\n");
                         if(tryPingServerFFailedTimes++>=MAX_PING_FAILED_TIMES){
                            tryPingServerFFailedTimes=0;
                             transport_close();
                             isMqttServerConnected=0;
                         }
                         continue;     
                }
                tryPingServerFFailedTimes=0;
                printf("MQTT>>发送ping作为心跳成功....\n");
                //表明有数据交换
                no_mqtt_msg_exchange = 0;
          }                      
        }
}
           

工程源码地址:https://download.csdn.net/download/du2005023029/87505817

继续阅读