上一篇主要講的是使用樹莓派與移遠BC35G模組的初始環境設定及調試而這一篇将要講 Lettuce
IOT架構中的lettuce-Sea裝置端代碼解析。
lettuce-Sea裝置端python代碼講解
上一節課我們雖然讓樹莓派與移遠BC35G模組連接配接起來了,但是他們之間的通信還需要一個“驅動”來銜接。這個“驅動”要有以下特點:
- 能不間斷的接收平台下發的指令。
- 能自動初始化NB-IoT的環境。
- 能對指令的結果進行解析,失敗自動重試
- 能定時上報心跳
- 能內建硬體的驅動,以便于更好的控制硬體
- 能完美退出
這就是我們lettuce-Sea裝置端所提供的能力。
先提供lettuce-Sea的源代碼
https://github.com/lipuqi/lettuce-Sea
首先我們要在樹莓派上搭建一個可以運作lettuce-Sea的python環境
所需環境清單
- 可運作python的環境,我安裝的是3.7.3的版本,現在最新版本是3.7.4
- RPi.GPIO子產品,這是樹莓派使用python對GPIO程式設計的子產品請自行百度安裝。我是用pip安裝的。
- pyserial子產品,這是python的序列槽調試子產品,也是必備的。我也是用pip安裝的。
- 如果你心情好安裝個git也沒問題,畢竟來回拖拽也怪麻煩的。
接下來我們來講一下lettuce-Sea的實作原理
第四篇我已經講過了,AT指令有4個類型
常用的就是查詢指令,執行指令(有參數),執行指令(無參數)
查詢指令一般都是先傳回結果,再傳回OK
例AT+CGATT? -> + CGATT:1 -> OK
執行指令(有參數)一般是直接傳回一個OK
例AT+CGATT=1 -> OK
執行指令(無參數)則跟查詢指令相同
例AT+CSQ -> + CSQ:23,99 -> OK
還有就是接收資料
例 +NNMI:2,0001
我們寫代碼就是要将這種規律寫成可以運作的程式。
lettuce-Sea是如何實作的呢?
首先lettuce-Sea是有4個線程的
主線程:主要是自動執行一些操作,例如開始運作的初始化,和子線程的開始等等。
監聽線程:主要是在開啟序列槽以後,就不斷監聽端口。看有沒有新資料上報,如果有就進行解析分類,主要分兩種。一種是主動執行的指令傳回的值,例如查詢指令,執行指令這些。
還有一種是被動接收的資料,比如平台下發一條指令給裝置。裝置就是從這個線程接收的,然後放到執行序列裡。在lettuce-Sea中,執行序列就是一個清單類型的類屬性。一會看代碼就明白了。
執行線程:就是不斷讀取執行序列,看有沒有資料,如果有資料,經裝置排程分發給相應的裝置驅動去執行。
上報心跳:這個就好了解了,就是定時向平台上報資料,告訴平台我還活着。
代碼目錄
我把AT指令,裝置,和基礎操作分為了不同的區域。
其中AT指令中ATBase是所有指令的父類,其他都是子類。其中NNMI是沒有繼承ATBase的,因為這個指令主要用于存放執行隊列。
裝置中Drive是排程類,Gpio是封裝GPIO子產品的類。led是led裝置驅動,pi裡有初始化方法,退出方法和心跳方法。
基礎操作中SerialPort是封裝pyserial子產品的類,ReceiveMsg負責接收消息,ATBugUtil是主程式。
代碼主要就是兩個流程:一個是自動執行指令,一個是被動接收指令
先講一下自動執行指令的生命周期
主程式發送一條指令給序列槽,這時主程式會阻塞等待結果(status屬性為結果辨別位,0為執行中,1為執行成功,2為失敗)
ATBase.py
class ATBase:
def __init__(self, serialPort, receiveMsg):
self.serialPort = serialPort # 序列槽基礎操作子產品
self.receiveMsg = receiveMsg # 序列槽資料接收子產品
self.at_name = "" # AT指令名稱
self.at_error_result = None # AT指令傳回資訊校驗(錯誤結果)
self.at_suc_result = None # AT指令傳回資訊校驗(正确結果)
self.status = 0 # 執行結果(0未執行1執行成功2執行失敗)
self.at_result_pattern = None # 比對傳回結果的正則類型
self.error = re.compile('ERROR') # 普通消息失敗
self.ok = re.compile('OK') # 普通消息成功
self.result = None # 執行結果
self.retry = 3 # 重試次數
# 發送at基礎方法
def send_at(self, data=None, at_type=None):
retry = self.retry
self.serialPort.write_data(self.at_name, data, at_type) # 調用序列槽基礎操作-寫入
self.receiveMsg.atObj = self # 将本類基本資訊注入序列槽資料接收以便更新執行情況
self.status = 0 # 複位執行結果
time.sleep(1)
# 執行寫入以後,主程式阻塞等待執行結果
while True:
if self.retry == 0:
print("執行失敗,不再重試")
self.retry = retry
self.off_compile_result() # 自動複位
return 2
if self.status == 1:
print("執行成功")
self.retry = retry
self.off_compile_result() # 自動複位
return 1
elif self.status == 2:
print("執行失敗,重試" + str(4 - self.retry))
self.retry -= 1
time.sleep(2)
return self.send_at(data, at_type) # 執行失敗後遞歸調用
序列槽處理後,将結果傳回對監聽線程接收。監聽線程判斷此執行的指令是否需要校驗(比如帶傳回值的可能就需要校驗,直接傳回OK/ERROR的就不需要)。無論是否校驗都會傳回一個結果辨別位status屬性,主程式擷取到屬性以後,如果成功則繼續往下執行,失敗則重試,我設定為3次重試。
ReceiveMsg.py
class ReceiveMsg:
def __init__(self, serialPort, nnmi, qlwevtind):
super(ReceiveMsg, self).__init__()
self.nnmi = nnmi # 接收資料子產品
self.qlwevtind = qlwevtind # 接收平台狀态子產品
self.serialPort = serialPort # 序列槽基礎操作子產品
self.atObj = None # 發送AT指令的子產品
self.quit_sys = 0 # 退出(此參數3個線程同步)
self.is_pause = 0 # 暫停(此參數3個線程同步)
# 處理消息的線程方法
def receive_data(self):
while self.quit_sys == 0:
while self.is_pause == 1:
pass
result = self.serialPort.read_data() # 接收資料
if result:
# 先比對是否為上報資料和平台上報狀态,不是則比對是否為發送指令的回值
if self.nnmi.at_result_pattern.search(result):
at_result = result.split(":")
print("接到資料")
print(at_result[1])
self.nnmi.add_order(at_result[1])
elif self.qlwevtind.at_result_pattern.search(result):
at_result = result.split(":")
print("接到平台狀态")
print(at_result[1])
self.qlwevtind.oc_analysis_msg(at_result[1])
elif self.atObj:
if self.atObj.compile_result(result):
self.atObj = None
else:
print("未設定比對項資料")
print(result)
被動接收指令的生命周期
華為OC平台下發指令給通信模組,通信模組将消息以序列槽發給程式。程式的監聽線程接收到消息後,添加到執行隊列中(NNMI類)。
NNMI.py
class NNMI:
def __init__(self):
self.at_name = "NNMI"
self.at_result_pattern = re.compile(self.at_name)
self.wait_list = [] # 待處理資訊清單
# 添加待處理資訊到清單
def add_order(self, order):
if order in self.wait_list:
return
else:
self.wait_list.append(order)
# 從清單删除待處理資訊
def del_order(self, order):
while order in self.wait_list:
self.wait_list.remove(order)
執行線程檢測到執行隊列有資料,讀取資料後将資料從隊列中删除。
Drive.py
class Drive:
def __init__(self, nnmi, receiveMsg, led, pi):
super(Drive, self).__init__()
self.nnmi = nnmi # 接收資料子產品
self.receiveMsg = receiveMsg # 序列槽資料接收子產品
self.led = led # 燈
self.pi = pi # 樹莓派
# 循環處查詢清單内任務線程方法
def order_monitor(self):
while self.receiveMsg.quit_sys == 0:
time.sleep(2)
while self.receiveMsg.is_pause == 1:
pass
if self.nnmi.wait_list:
for order in self.nnmi.wait_list:
self.nnmi.del_order(order)
self.analysis_msg(order)
執行線程解析消息,并将消息排程給對應的裝置
Drive.py
# 處理任務排程硬體子產品
def analysis_msg(self, data):
# 分割接收消息
msg_result = data.split(",")
msg_len = msg_result[0]
msg_data = msg_result[1]
if msg_len == "4":
message_id = int(msg_data[:2])
mid = msg_data[2:6]
value = int(msg_data[6:])
if message_id == 1:
self.led.set_mid(mid)
self.led.led_on_off(value)
elif message_id == 3:
self.led.set_mid(mid)
self.led.query_status()
elif message_id == 6:
self.pi.set_mid(mid)
self.pi.execute_quit()
else:
print("無解析類型")
else:
print("指令下發長度不符合")
裝置接收到消息後進行相關處理。處理成功以後,發送響應給華為OC平台。
Led.py
class Led:
def __init__(self, ioObj, nmgs):
super(Led, self).__init__()
self.status = 0 # 目前燈的狀态
self.ioObj = ioObj # GPIO裝置基礎操作子產品
self.nmgs = nmgs # 發送消息子產品
self.io_id = 4 # 目前裝置使用的針腳編号
self.ioObj.setup_out_io(self.io_id) # 初始化IO口
self.mid = None # 指令下發的響應碼
# 處理燈的開關指令
def led_on_off(self, data):
suc = "02" + self.mid + "00" + '%02x' % data # 成功的傳回資訊
fail = "02" + self.mid + "01" + '%02x' % data # 失敗的傳回資訊
if data != 0 and data != 1:
print("開關燈隻接受0/1")
self.nmgs.execute_at(fail)
else:
# 如果指令下發狀态與目前燈狀态一緻則直接發送成功
if data == self.status:
self.nmgs.execute_at(suc)
else:
execute_result = self.ioObj.execute_output(self.io_id, data)
# 如果執行成功就響應成功
if execute_result == 1:
self.status = data
self.nmgs.execute_at(suc)
else:
self.nmgs.execute_at(fail)
下面解析部分代碼
初始化:
監聽線程和執行線程啟動後再執行初始化,進入初始化時會先進入快速初始化,如果快速初始化失敗,則會進入預設初始化。預設初始化執行重試3次仍然失敗,則進入退出流程。成功則啟動心跳線程。
Pi.py
# 系統初始化
def sys_init(self, index=0):
# 如果重試3次皆失敗,判定初始化失敗
if index == 3:
return 2
print("開始初始化")
self.cfun.execute_at("1") # 開啟射頻
self.csq.execute_at() # 查詢信号
time.sleep(5)
cg_res = self.cgatt.integration_at("1") # 入網
# 檢視入網是否成功
if cg_res == 2:
print("初始化遇到問題,開始預設初始化")
self.nconfig.execute_at("AUTOCONNECT,FALSE") # 手動入網模式
self.nrb.execute_at() # 重新開機
index += 1
self.sys_init(index) # 遞歸
print("結束初始化")
return 1
校驗結果:
校驗結果分三種
- 普通校驗,如果發生沒有校驗項時則會進入普通校驗,也就是校驗OK/ERROR,OK為成功,ERROR為失敗。
- 校驗正确值,就是如果結果與設定值相同為執行成功,不相同為失敗。這裡會調用父類的校驗正确值開關方法,将設定值入參。
- 校驗錯誤值,就是如果結果與設定值相同為執行失敗,不相同為成功。這裡會調用父類的校驗錯誤值開關方法,将設定值入參。
ATBugUtil.py
# 校驗傳回結果
def compile_result(self, result):
# 有關鍵字正則表示需要結果校驗,沒有則隻判斷成功/失敗
if self.at_result_pattern:
if self.at_result_pattern.search(result):
at_result = result.split(":")
if self.at_error_result:
self.status = self.compile_error_result(at_result[1])
print(result)
elif self.at_suc_result:
self.status = self.compile_suc_result(at_result[1])
print(result)
else:
if self.ok.search(result):
self.status = 1
print(result)
return True
elif self.error.search(result):
self.status = 2
print(result)
return True
return False # 傳回 True/False是因為表示指令執行成功
# 開啟錯誤校驗結果
def on_compile_error_result(self, error):
self.at_result_pattern = re.compile(self.at_name)
self.at_error_result = error
# 校驗錯誤結果
def compile_error_result(self, data):
self.result = data
if data != self.at_error_result:
return 1
return 2
# 開啟正确校驗結果
def on_compile_suc_result(self, suc):
self.at_result_pattern = re.compile(self.at_name)
self.at_suc_result = suc
# 校驗正确結果
def compile_suc_result(self, data):
self.result = data
if data == self.at_suc_result:
return 1
return 2
在發送指令時,會将指令本身發給監聽線程類,監聽線程接收到資料以後會讀取發給他的指令本身裡的校驗值。并根據不同的校驗值執行不同的校驗方法。
核心點就是這些的,其他的還需要開發者自己去專研。