天天看点

之六-呕血制作-Lettuce IOT框架-移远BC35G+树莓派+华为OC+SpringBoot后台+微信小程序lettuce-Sea设备端python代码讲解

上一篇主要讲的是使用树莓派与移远BC35G模组的初始环境设置及调试而这一篇将要讲 Lettuce

IOT框架中的lettuce-Sea设备端代码解析。

lettuce-Sea设备端python代码讲解

上一节课我们虽然让树莓派与移远BC35G模组连接起来了,但是他们之间的通信还需要一个“驱动”来衔接。这个“驱动”要有以下特点:

  1. 能不间断的接收平台下发的指令。
  2. 能自动初始化NB-IoT的环境。
  3. 能对指令的结果进行解析,失败自动重试
  4. 能定时上报心跳
  5. 能集成硬件的驱动,以便于更好的控制硬件
  6. 能完美退出

这就是我们lettuce-Sea设备端所提供的能力。

先提供lettuce-Sea的源代码

https://github.com/lipuqi/lettuce-Sea

首先我们要在树莓派上搭建一个可以运行lettuce-Sea的python环境

所需环境清单

  1. 可运行python的环境,我安装的是3.7.3的版本,现在最新版本是3.7.4
  2. RPi.GPIO模块,这是树莓派使用python对GPIO编程的模块请自行百度安装。我是用pip安装的。
  3. pyserial模块,这是python的串口调试模块,也是必备的。我也是用pip安装的。
  4. 如果你心情好安装个git也没问题,毕竟来回拖拽也怪麻烦的。

接下来我们来讲一下lettuce-Sea的实现原理

第四篇我已经讲过了,AT指令有4个类型

之六-呕血制作-Lettuce IOT框架-移远BC35G+树莓派+华为OC+SpringBoot后台+微信小程序lettuce-Sea设备端python代码讲解

常用的就是查询指令,执行指令(有参数),执行指令(无参数)

查询指令一般都是先返回结果,再返回OK

例AT+CGATT? -> + CGATT:1 -> OK

执行指令(有参数)一般是直接返回一个OK

例AT+CGATT=1 -> OK

执行指令(无参数)则跟查询指令相同

例AT+CSQ -> + CSQ:23,99 -> OK

还有就是接收数据

例 +NNMI:2,0001

我们写代码就是要将这种规律写成可以运行的程序。

lettuce-Sea是如何实现的呢?

首先lettuce-Sea是有4个线程的

之六-呕血制作-Lettuce IOT框架-移远BC35G+树莓派+华为OC+SpringBoot后台+微信小程序lettuce-Sea设备端python代码讲解

主线程:主要是自动执行一些操作,例如开始运行的初始化,和子线程的开始等等。

监听线程:主要是在开启串口以后,就不断监听端口。看有没有新数据上报,如果有就进行解析分类,主要分两种。一种是主动执行的指令返回的值,例如查询指令,执行指令这些。

还有一种是被动接收的数据,比如平台下发一条指令给设备。设备就是从这个线程接收的,然后放到执行序列里。在lettuce-Sea中,执行序列就是一个列表类型的类属性。一会看代码就明白了。

执行线程:就是不断读取执行序列,看有没有数据,如果有数据,经设备调度分发给相应的设备驱动去执行。

上报心跳:这个就好理解了,就是定时向平台上报数据,告诉平台我还活着。

代码目录

之六-呕血制作-Lettuce IOT框架-移远BC35G+树莓派+华为OC+SpringBoot后台+微信小程序lettuce-Sea设备端python代码讲解

我把AT指令,设备,和基础操作分为了不同的区域。

其中AT指令中ATBase是所有指令的父类,其他都是子类。其中NNMI是没有继承ATBase的,因为这个指令主要用于存放执行队列。

设备中Drive是调度类,Gpio是封装GPIO模块的类。led是led设备驱动,pi里有初始化方法,退出方法和心跳方法。

基础操作中SerialPort是封装pyserial模块的类,ReceiveMsg负责接收消息,ATBugUtil是主程序。

代码主要就是两个流程:一个是自动执行指令,一个是被动接收指令

先讲一下自动执行指令的生命周期

之六-呕血制作-Lettuce IOT框架-移远BC35G+树莓派+华为OC+SpringBoot后台+微信小程序lettuce-Sea设备端python代码讲解

主程序发送一条指令给串口,这时主程序会阻塞等待结果(status属性为结果标识位,0为执行中,1为执行成功,2为失败)

ATBase.py

class ATBase:

    def __init__(self, serialPort, receiveMsg):
        self.serialPort = serialPort  # 串口基础操作模块
        self.receiveMsg = receiveMsg  # 串口数据接收模块
        self.at_name = ""  # AT指令名称
        self.at_error_result = None  # AT指令返回信息校验(错误结果)
        self.at_suc_result = None  # AT指令返回信息校验(正确结果)
        self.status = 0  # 执行结果(0未执行1执行成功2执行失败)
        self.at_result_pattern = None  # 匹配返回结果的正则类型
        self.error = re.compile('ERROR')  # 普通消息失败
        self.ok = re.compile('OK')  # 普通消息成功
        self.result = None  # 执行结果
        self.retry = 3  # 重试次数

    # 发送at基础方法
    def send_at(self, data=None, at_type=None):
        retry = self.retry
        self.serialPort.write_data(self.at_name, data, at_type)  # 调用串口基础操作-写入
        self.receiveMsg.atObj = self  # 将本类基本信息注入串口数据接收以便更新执行情况
        self.status = 0  # 复位执行结果
        time.sleep(1)
        # 执行写入以后,主程序阻塞等待执行结果
        while True:
            if self.retry == 0:
                print("执行失败,不再重试")
                self.retry = retry
                self.off_compile_result()  # 自动复位
                return 2
            if self.status == 1:
                print("执行成功")
                self.retry = retry
                self.off_compile_result()  # 自动复位
                return 1
            elif self.status == 2:
                print("执行失败,重试" + str(4 - self.retry))
                self.retry -= 1
                time.sleep(2)
                return self.send_at(data, at_type)  # 执行失败后递归调用
           

串口处理后,将结果返回对监听线程接收。监听线程判断此执行的指令是否需要校验(比如带返回值的可能就需要校验,直接返回OK/ERROR的就不需要)。无论是否校验都会返回一个结果标识位status属性,主程序获取到属性以后,如果成功则继续往下执行,失败则重试,我设定为3次重试。

ReceiveMsg.py

class ReceiveMsg:

    def __init__(self, serialPort, nnmi, qlwevtind):
        super(ReceiveMsg, self).__init__()
        self.nnmi = nnmi   # 接收数据模块
        self.qlwevtind = qlwevtind   # 接收平台状态模块
        self.serialPort = serialPort  # 串口基础操作模块
        self.atObj = None  # 发送AT指令的模块
        self.quit_sys = 0  # 退出(此参数3个线程同步)
        self.is_pause = 0  # 暂停(此参数3个线程同步)

    #  处理消息的线程方法
    def receive_data(self):
        while self.quit_sys == 0:
            while self.is_pause == 1:
                pass
            result = self.serialPort.read_data()  # 接收数据
            if result:
                # 先匹配是否为上报数据和平台上报状态,不是则匹配是否为发送指令的回值
                if self.nnmi.at_result_pattern.search(result):
                    at_result = result.split(":")
                    print("接到数据")
                    print(at_result[1])
                    self.nnmi.add_order(at_result[1])
                elif self.qlwevtind.at_result_pattern.search(result):
                    at_result = result.split(":")
                    print("接到平台状态")
                    print(at_result[1])
                    self.qlwevtind.oc_analysis_msg(at_result[1])
                elif self.atObj:
                    if self.atObj.compile_result(result):
                        self.atObj = None
                else:
                    print("未设置匹配项数据")
                    print(result)
           

被动接收指令的生命周期

之六-呕血制作-Lettuce IOT框架-移远BC35G+树莓派+华为OC+SpringBoot后台+微信小程序lettuce-Sea设备端python代码讲解

华为OC平台下发指令给通信模组,通信模组将消息以串口发给程序。程序的监听线程接收到消息后,添加到执行队列中(NNMI类)。

NNMI.py

class NNMI:

    def __init__(self):
        self.at_name = "NNMI"
        self.at_result_pattern = re.compile(self.at_name)
        self.wait_list = []  # 待处理信息列表

    # 添加待处理信息到列表
    def add_order(self, order):
        if order in self.wait_list:
            return
        else:
            self.wait_list.append(order)

    # 从列表删除待处理信息
    def del_order(self, order):
        while order in self.wait_list:
            self.wait_list.remove(order)

           

执行线程检测到执行队列有数据,读取数据后将数据从队列中删除。

Drive.py

class Drive:

    def __init__(self, nnmi, receiveMsg, led, pi):
        super(Drive, self).__init__()
        self.nnmi = nnmi  # 接收数据模块
        self.receiveMsg = receiveMsg  # 串口数据接收模块
        self.led = led  # 灯
        self.pi = pi  # 树莓派

    # 循环处查询列表内任务线程方法
    def order_monitor(self):
        while self.receiveMsg.quit_sys == 0:
            time.sleep(2)
            while self.receiveMsg.is_pause == 1:
                pass
            if self.nnmi.wait_list:
                for order in self.nnmi.wait_list:
                    self.nnmi.del_order(order)
                    self.analysis_msg(order)
           

执行线程解析消息,并将消息调度给对应的设备

Drive.py

# 处理任务调度硬件模块
    def analysis_msg(self, data):
        # 分割接收消息
        msg_result = data.split(",")
        msg_len = msg_result[0]
        msg_data = msg_result[1]
        if msg_len == "4":
            message_id = int(msg_data[:2])
            mid = msg_data[2:6]
            value = int(msg_data[6:])
            if message_id == 1:
                self.led.set_mid(mid)
                self.led.led_on_off(value)
            elif message_id == 3:
                self.led.set_mid(mid)
                self.led.query_status()
            elif message_id == 6:
                self.pi.set_mid(mid)
                self.pi.execute_quit()
            else:
                print("无解析类型")
        else:
            print("命令下发长度不符合")
           

设备接收到消息后进行相关处理。处理成功以后,发送响应给华为OC平台。

Led.py

class Led:
    def __init__(self, ioObj, nmgs):
        super(Led, self).__init__()
        self.status = 0  # 当前灯的状态
        self.ioObj = ioObj  # GPIO设备基础操作模块
        self.nmgs = nmgs  # 发送消息模块
        self.io_id = 4  # 当前设备使用的针脚编号
        self.ioObj.setup_out_io(self.io_id)  # 初始化IO口
        self.mid = None  # 命令下发的响应码

    # 处理灯的开关命令
    def led_on_off(self, data):
        suc = "02" + self.mid + "00" + '%02x' % data  # 成功的返回信息
        fail = "02" + self.mid + "01" + '%02x' % data  # 失败的返回信息
        if data != 0 and data != 1:
            print("开关灯只接受0/1")
            self.nmgs.execute_at(fail)
        else:
            # 如果命令下发状态与当前灯状态一致则直接发送成功
            if data == self.status:
                self.nmgs.execute_at(suc)
            else:
                execute_result = self.ioObj.execute_output(self.io_id, data)
                # 如果执行成功就响应成功
                if execute_result == 1:
                    self.status = data
                    self.nmgs.execute_at(suc)
                else:
                    self.nmgs.execute_at(fail)
           

下面解析部分代码

初始化:

监听线程和执行线程启动后再执行初始化,进入初始化时会先进入快速初始化,如果快速初始化失败,则会进入默认初始化。默认初始化执行重试3次仍然失败,则进入退出流程。成功则启动心跳线程。

Pi.py

# 系统初始化
    def sys_init(self, index=0):
        # 如果重试3次皆失败,判定初始化失败
        if index == 3:
            return 2
        print("开始初始化")
        self.cfun.execute_at("1")  # 开启射频
        self.csq.execute_at()  # 查询信号
        time.sleep(5)
        cg_res = self.cgatt.integration_at("1")  # 入网
        # 查看入网是否成功
        if cg_res == 2:
            print("初始化遇到问题,开始默认初始化")
            self.nconfig.execute_at("AUTOCONNECT,FALSE")  # 手动入网模式
            self.nrb.execute_at()  # 重启
            index += 1
            self.sys_init(index)  # 递归
        print("结束初始化")
        return 1
           

校验结果:

校验结果分三种

  1. 普通校验,如果发生没有校验项时则会进入普通校验,也就是校验OK/ERROR,OK为成功,ERROR为失败。
  2. 校验正确值,就是如果结果与设定值相同为执行成功,不相同为失败。这里会调用父类的校验正确值开关方法,将设定值入参。
  3. 校验错误值,就是如果结果与设定值相同为执行失败,不相同为成功。这里会调用父类的校验错误值开关方法,将设定值入参。

ATBugUtil.py

# 校验返回结果
    def compile_result(self, result):
        # 有关键字正则表示需要结果校验,没有则只判断成功/失败
        if self.at_result_pattern:
            if self.at_result_pattern.search(result):
                at_result = result.split(":")
                if self.at_error_result:
                    self.status = self.compile_error_result(at_result[1])
                    print(result)
                elif self.at_suc_result:
                    self.status = self.compile_suc_result(at_result[1])
                    print(result)
        else:
            if self.ok.search(result):
                self.status = 1
                print(result)
                return True
            elif self.error.search(result):
                self.status = 2
                print(result)
                return True
        return False  # 返回 True/False是因为表示指令执行成功

    # 开启错误校验结果
    def on_compile_error_result(self, error):
        self.at_result_pattern = re.compile(self.at_name)
        self.at_error_result = error

    # 校验错误结果
    def compile_error_result(self, data):
        self.result = data
        if data != self.at_error_result:
            return 1
        return 2

    # 开启正确校验结果
    def on_compile_suc_result(self, suc):
        self.at_result_pattern = re.compile(self.at_name)
        self.at_suc_result = suc

    # 校验正确结果
    def compile_suc_result(self, data):
        self.result = data
        if data == self.at_suc_result:
            return 1
        return 2
           

在发送指令时,会将指令本身发给监听线程类,监听线程接收到数据以后会读取发给他的指令本身里的校验值。并根据不同的校验值执行不同的校验方法。

核心点就是这些的,其他的还需要开发者自己去专研。

下一章节将讲lettuce-Sea在树莓派上的调试,以及使用华为OC平台完成数据的接收和指令的下发。
欢迎加入我们的QQ群一起讨论IOT的问题。
之六-呕血制作-Lettuce IOT框架-移远BC35G+树莓派+华为OC+SpringBoot后台+微信小程序lettuce-Sea设备端python代码讲解

继续阅读