天天看點

之六-嘔血制作-Lettuce IOT架構-移遠BC35G+樹莓派+華為OC+SpringBoot背景+微信小程式lettuce-Sea裝置端python代碼講解

上一篇主要講的是使用樹莓派與移遠BC35G模組的初始環境設定及調試而這一篇将要講 Lettuce

IOT架構中的lettuce-Sea裝置端代碼解析。

lettuce-Sea裝置端python代碼講解

上一節課我們雖然讓樹莓派與移遠BC35G模組連接配接起來了,但是他們之間的通信還需要一個“驅動”來銜接。這個“驅動”要有以下特點:

  1. 能不間斷的接收平台下發的指令。
  2. 能自動初始化NB-IoT的環境。
  3. 能對指令的結果進行解析,失敗自動重試
  4. 能定時上報心跳
  5. 能內建硬體的驅動,以便于更好的控制硬體
  6. 能完美退出

這就是我們lettuce-Sea裝置端所提供的能力。

先提供lettuce-Sea的源代碼

https://github.com/lipuqi/lettuce-Sea

首先我們要在樹莓派上搭建一個可以運作lettuce-Sea的python環境

所需環境清單

  1. 可運作python的環境,我安裝的是3.7.3的版本,現在最新版本是3.7.4
  2. RPi.GPIO子產品,這是樹莓派使用python對GPIO程式設計的子產品請自行百度安裝。我是用pip安裝的。
  3. pyserial子產品,這是python的序列槽調試子產品,也是必備的。我也是用pip安裝的。
  4. 如果你心情好安裝個git也沒問題,畢竟來回拖拽也怪麻煩的。

接下來我們來講一下lettuce-Sea的實作原理

第四篇我已經講過了,AT指令有4個類型

之六-嘔血制作-Lettuce IOT架構-移遠BC35G+樹莓派+華為OC+SpringBoot背景+微信小程式lettuce-Sea裝置端python代碼講解

常用的就是查詢指令,執行指令(有參數),執行指令(無參數)

查詢指令一般都是先傳回結果,再傳回OK

例AT+CGATT? -> + CGATT:1 -> OK

執行指令(有參數)一般是直接傳回一個OK

例AT+CGATT=1 -> OK

執行指令(無參數)則跟查詢指令相同

例AT+CSQ -> + CSQ:23,99 -> OK

還有就是接收資料

例 +NNMI:2,0001

我們寫代碼就是要将這種規律寫成可以運作的程式。

lettuce-Sea是如何實作的呢?

首先lettuce-Sea是有4個線程的

之六-嘔血制作-Lettuce IOT架構-移遠BC35G+樹莓派+華為OC+SpringBoot背景+微信小程式lettuce-Sea裝置端python代碼講解

主線程:主要是自動執行一些操作,例如開始運作的初始化,和子線程的開始等等。

監聽線程:主要是在開啟序列槽以後,就不斷監聽端口。看有沒有新資料上報,如果有就進行解析分類,主要分兩種。一種是主動執行的指令傳回的值,例如查詢指令,執行指令這些。

還有一種是被動接收的資料,比如平台下發一條指令給裝置。裝置就是從這個線程接收的,然後放到執行序列裡。在lettuce-Sea中,執行序列就是一個清單類型的類屬性。一會看代碼就明白了。

執行線程:就是不斷讀取執行序列,看有沒有資料,如果有資料,經裝置排程分發給相應的裝置驅動去執行。

上報心跳:這個就好了解了,就是定時向平台上報資料,告訴平台我還活着。

代碼目錄

之六-嘔血制作-Lettuce IOT架構-移遠BC35G+樹莓派+華為OC+SpringBoot背景+微信小程式lettuce-Sea裝置端python代碼講解

我把AT指令,裝置,和基礎操作分為了不同的區域。

其中AT指令中ATBase是所有指令的父類,其他都是子類。其中NNMI是沒有繼承ATBase的,因為這個指令主要用于存放執行隊列。

裝置中Drive是排程類,Gpio是封裝GPIO子產品的類。led是led裝置驅動,pi裡有初始化方法,退出方法和心跳方法。

基礎操作中SerialPort是封裝pyserial子產品的類,ReceiveMsg負責接收消息,ATBugUtil是主程式。

代碼主要就是兩個流程:一個是自動執行指令,一個是被動接收指令

先講一下自動執行指令的生命周期

之六-嘔血制作-Lettuce IOT架構-移遠BC35G+樹莓派+華為OC+SpringBoot背景+微信小程式lettuce-Sea裝置端python代碼講解

主程式發送一條指令給序列槽,這時主程式會阻塞等待結果(status屬性為結果辨別位,0為執行中,1為執行成功,2為失敗)

ATBase.py

class ATBase:

    def __init__(self, serialPort, receiveMsg):
        self.serialPort = serialPort  # 序列槽基礎操作子產品
        self.receiveMsg = receiveMsg  # 序列槽資料接收子產品
        self.at_name = ""  # AT指令名稱
        self.at_error_result = None  # AT指令傳回資訊校驗(錯誤結果)
        self.at_suc_result = None  # AT指令傳回資訊校驗(正确結果)
        self.status = 0  # 執行結果(0未執行1執行成功2執行失敗)
        self.at_result_pattern = None  # 比對傳回結果的正則類型
        self.error = re.compile('ERROR')  # 普通消息失敗
        self.ok = re.compile('OK')  # 普通消息成功
        self.result = None  # 執行結果
        self.retry = 3  # 重試次數

    # 發送at基礎方法
    def send_at(self, data=None, at_type=None):
        retry = self.retry
        self.serialPort.write_data(self.at_name, data, at_type)  # 調用序列槽基礎操作-寫入
        self.receiveMsg.atObj = self  # 将本類基本資訊注入序列槽資料接收以便更新執行情況
        self.status = 0  # 複位執行結果
        time.sleep(1)
        # 執行寫入以後,主程式阻塞等待執行結果
        while True:
            if self.retry == 0:
                print("執行失敗,不再重試")
                self.retry = retry
                self.off_compile_result()  # 自動複位
                return 2
            if self.status == 1:
                print("執行成功")
                self.retry = retry
                self.off_compile_result()  # 自動複位
                return 1
            elif self.status == 2:
                print("執行失敗,重試" + str(4 - self.retry))
                self.retry -= 1
                time.sleep(2)
                return self.send_at(data, at_type)  # 執行失敗後遞歸調用
           

序列槽處理後,将結果傳回對監聽線程接收。監聽線程判斷此執行的指令是否需要校驗(比如帶傳回值的可能就需要校驗,直接傳回OK/ERROR的就不需要)。無論是否校驗都會傳回一個結果辨別位status屬性,主程式擷取到屬性以後,如果成功則繼續往下執行,失敗則重試,我設定為3次重試。

ReceiveMsg.py

class ReceiveMsg:

    def __init__(self, serialPort, nnmi, qlwevtind):
        super(ReceiveMsg, self).__init__()
        self.nnmi = nnmi   # 接收資料子產品
        self.qlwevtind = qlwevtind   # 接收平台狀态子產品
        self.serialPort = serialPort  # 序列槽基礎操作子產品
        self.atObj = None  # 發送AT指令的子產品
        self.quit_sys = 0  # 退出(此參數3個線程同步)
        self.is_pause = 0  # 暫停(此參數3個線程同步)

    #  處理消息的線程方法
    def receive_data(self):
        while self.quit_sys == 0:
            while self.is_pause == 1:
                pass
            result = self.serialPort.read_data()  # 接收資料
            if result:
                # 先比對是否為上報資料和平台上報狀态,不是則比對是否為發送指令的回值
                if self.nnmi.at_result_pattern.search(result):
                    at_result = result.split(":")
                    print("接到資料")
                    print(at_result[1])
                    self.nnmi.add_order(at_result[1])
                elif self.qlwevtind.at_result_pattern.search(result):
                    at_result = result.split(":")
                    print("接到平台狀态")
                    print(at_result[1])
                    self.qlwevtind.oc_analysis_msg(at_result[1])
                elif self.atObj:
                    if self.atObj.compile_result(result):
                        self.atObj = None
                else:
                    print("未設定比對項資料")
                    print(result)
           

被動接收指令的生命周期

之六-嘔血制作-Lettuce IOT架構-移遠BC35G+樹莓派+華為OC+SpringBoot背景+微信小程式lettuce-Sea裝置端python代碼講解

華為OC平台下發指令給通信模組,通信模組将消息以序列槽發給程式。程式的監聽線程接收到消息後,添加到執行隊列中(NNMI類)。

NNMI.py

class NNMI:

    def __init__(self):
        self.at_name = "NNMI"
        self.at_result_pattern = re.compile(self.at_name)
        self.wait_list = []  # 待處理資訊清單

    # 添加待處理資訊到清單
    def add_order(self, order):
        if order in self.wait_list:
            return
        else:
            self.wait_list.append(order)

    # 從清單删除待處理資訊
    def del_order(self, order):
        while order in self.wait_list:
            self.wait_list.remove(order)

           

執行線程檢測到執行隊列有資料,讀取資料後将資料從隊列中删除。

Drive.py

class Drive:

    def __init__(self, nnmi, receiveMsg, led, pi):
        super(Drive, self).__init__()
        self.nnmi = nnmi  # 接收資料子產品
        self.receiveMsg = receiveMsg  # 序列槽資料接收子產品
        self.led = led  # 燈
        self.pi = pi  # 樹莓派

    # 循環處查詢清單内任務線程方法
    def order_monitor(self):
        while self.receiveMsg.quit_sys == 0:
            time.sleep(2)
            while self.receiveMsg.is_pause == 1:
                pass
            if self.nnmi.wait_list:
                for order in self.nnmi.wait_list:
                    self.nnmi.del_order(order)
                    self.analysis_msg(order)
           

執行線程解析消息,并将消息排程給對應的裝置

Drive.py

# 處理任務排程硬體子產品
    def analysis_msg(self, data):
        # 分割接收消息
        msg_result = data.split(",")
        msg_len = msg_result[0]
        msg_data = msg_result[1]
        if msg_len == "4":
            message_id = int(msg_data[:2])
            mid = msg_data[2:6]
            value = int(msg_data[6:])
            if message_id == 1:
                self.led.set_mid(mid)
                self.led.led_on_off(value)
            elif message_id == 3:
                self.led.set_mid(mid)
                self.led.query_status()
            elif message_id == 6:
                self.pi.set_mid(mid)
                self.pi.execute_quit()
            else:
                print("無解析類型")
        else:
            print("指令下發長度不符合")
           

裝置接收到消息後進行相關處理。處理成功以後,發送響應給華為OC平台。

Led.py

class Led:
    def __init__(self, ioObj, nmgs):
        super(Led, self).__init__()
        self.status = 0  # 目前燈的狀态
        self.ioObj = ioObj  # GPIO裝置基礎操作子產品
        self.nmgs = nmgs  # 發送消息子產品
        self.io_id = 4  # 目前裝置使用的針腳編号
        self.ioObj.setup_out_io(self.io_id)  # 初始化IO口
        self.mid = None  # 指令下發的響應碼

    # 處理燈的開關指令
    def led_on_off(self, data):
        suc = "02" + self.mid + "00" + '%02x' % data  # 成功的傳回資訊
        fail = "02" + self.mid + "01" + '%02x' % data  # 失敗的傳回資訊
        if data != 0 and data != 1:
            print("開關燈隻接受0/1")
            self.nmgs.execute_at(fail)
        else:
            # 如果指令下發狀态與目前燈狀态一緻則直接發送成功
            if data == self.status:
                self.nmgs.execute_at(suc)
            else:
                execute_result = self.ioObj.execute_output(self.io_id, data)
                # 如果執行成功就響應成功
                if execute_result == 1:
                    self.status = data
                    self.nmgs.execute_at(suc)
                else:
                    self.nmgs.execute_at(fail)
           

下面解析部分代碼

初始化:

監聽線程和執行線程啟動後再執行初始化,進入初始化時會先進入快速初始化,如果快速初始化失敗,則會進入預設初始化。預設初始化執行重試3次仍然失敗,則進入退出流程。成功則啟動心跳線程。

Pi.py

# 系統初始化
    def sys_init(self, index=0):
        # 如果重試3次皆失敗,判定初始化失敗
        if index == 3:
            return 2
        print("開始初始化")
        self.cfun.execute_at("1")  # 開啟射頻
        self.csq.execute_at()  # 查詢信号
        time.sleep(5)
        cg_res = self.cgatt.integration_at("1")  # 入網
        # 檢視入網是否成功
        if cg_res == 2:
            print("初始化遇到問題,開始預設初始化")
            self.nconfig.execute_at("AUTOCONNECT,FALSE")  # 手動入網模式
            self.nrb.execute_at()  # 重新開機
            index += 1
            self.sys_init(index)  # 遞歸
        print("結束初始化")
        return 1
           

校驗結果:

校驗結果分三種

  1. 普通校驗,如果發生沒有校驗項時則會進入普通校驗,也就是校驗OK/ERROR,OK為成功,ERROR為失敗。
  2. 校驗正确值,就是如果結果與設定值相同為執行成功,不相同為失敗。這裡會調用父類的校驗正确值開關方法,将設定值入參。
  3. 校驗錯誤值,就是如果結果與設定值相同為執行失敗,不相同為成功。這裡會調用父類的校驗錯誤值開關方法,将設定值入參。

ATBugUtil.py

# 校驗傳回結果
    def compile_result(self, result):
        # 有關鍵字正則表示需要結果校驗,沒有則隻判斷成功/失敗
        if self.at_result_pattern:
            if self.at_result_pattern.search(result):
                at_result = result.split(":")
                if self.at_error_result:
                    self.status = self.compile_error_result(at_result[1])
                    print(result)
                elif self.at_suc_result:
                    self.status = self.compile_suc_result(at_result[1])
                    print(result)
        else:
            if self.ok.search(result):
                self.status = 1
                print(result)
                return True
            elif self.error.search(result):
                self.status = 2
                print(result)
                return True
        return False  # 傳回 True/False是因為表示指令執行成功

    # 開啟錯誤校驗結果
    def on_compile_error_result(self, error):
        self.at_result_pattern = re.compile(self.at_name)
        self.at_error_result = error

    # 校驗錯誤結果
    def compile_error_result(self, data):
        self.result = data
        if data != self.at_error_result:
            return 1
        return 2

    # 開啟正确校驗結果
    def on_compile_suc_result(self, suc):
        self.at_result_pattern = re.compile(self.at_name)
        self.at_suc_result = suc

    # 校驗正确結果
    def compile_suc_result(self, data):
        self.result = data
        if data == self.at_suc_result:
            return 1
        return 2
           

在發送指令時,會将指令本身發給監聽線程類,監聽線程接收到資料以後會讀取發給他的指令本身裡的校驗值。并根據不同的校驗值執行不同的校驗方法。

核心點就是這些的,其他的還需要開發者自己去專研。

下一章節将講lettuce-Sea在樹莓派上的調試,以及使用華為OC平台完成資料的接收和指令的下發。
歡迎加入我們的QQ群一起讨論IOT的問題。
之六-嘔血制作-Lettuce IOT架構-移遠BC35G+樹莓派+華為OC+SpringBoot背景+微信小程式lettuce-Sea裝置端python代碼講解

繼續閱讀