使用伺服器端RPC來控制模擬的溫度傳感器
效果圖如下:

下面把我的設定和代碼記錄一下:
增加裝置
如果使用伺服器端RPC指令,就不需要再增加規則鏈了
增加控制按鈕
注意這裡的method方法
getValue、setValue要與代碼中對應的接收方法一致
rpc代碼(C#使用mqttnet)
主要的代碼是下面這些
/// <summary>
/// Connects to the ThingsBoard MQTT broker and answers server-side RPC requests
/// ("getValue" / "setValue") for the simulated temperature sensor until Enter is pressed.
/// </summary>
/// <param name="args">Command-line arguments (unused).</param>
public static void Main(string[] args)
{
    const string rpcRequestPrefix = "v1/devices/me/rpc/request/";
    const string rpcResponsePrefix = "v1/devices/me/rpc/response/";

    var factory = new MqttFactory();
    mqttClient = factory.CreateMqttClient();

    // TCP connection options. The device access token is sent as the MQTT user
    // name with an empty password; the broker listens on the default port 1883.
    // (The former WithUserProperty call only duplicated the token as a user
    // property and had no effect on authentication, so it was dropped.)
    var options = new MqttClientOptionsBuilder()
        .WithClientId("client3")
        .WithTcpServer("192.168.137.131", 1883)
        .WithCredentials("FOvUIh5e7sHD4BWKceb2", "")
        .WithCleanSession()
        .Build();

    // Handlers are registered BEFORE connecting; otherwise the connection can
    // complete first and the subscription below would never be issued.
    mqttClient.UseConnectedHandler(async e =>
    {
        Console.WriteLine("連接配接成功");
        Console.WriteLine("目前連接配接線程名稱:" + Thread.CurrentThread.ManagedThreadId.ToString());
        await mqttClient.SubscribeAsync(rpcRequestPrefix + "+");
    });

    mqttClient.UseApplicationMessageReceivedHandler(e =>
    {
        string topic = e.ApplicationMessage.Topic;
        Console.WriteLine("目前接受線程名稱:" + Thread.CurrentThread.ManagedThreadId.ToString());
        if (topic == null || !topic.StartsWith(rpcRequestPrefix, StringComparison.Ordinal))
        {
            return;
        }

        byte[] payload = e.ApplicationMessage.Payload;
        if (payload == null || payload.Length == 0)
        {
            // Nothing to parse; an empty request cannot name a method.
            return;
        }

        // The request id is the last topic segment and must be echoed in the response topic.
        string requestId = topic.Substring(rpcRequestPrefix.Length);
        JObject data = JObject.Parse(Encoding.UTF8.GetString(payload));
        string method = data.GetValue("method")?.ToString();

        switch (method)
        {
            case "getValue":
            {
                var temp = new JObject { { "temperature", temperature } };
                // QoS 1 and no retain flag: the response answers exactly one
                // request, and ThingsBoard documents support for QoS 0/1 only.
                var message = new MqttApplicationMessageBuilder()
                    .WithTopic(rpcResponsePrefix + requestId)
                    .WithPayload(temp.ToString())
                    .WithAtLeastOnceQoS()
                    .Build();
                // Send the response back to ThingsBoard. Intentionally not awaited:
                // waiting for the broker's acknowledgement inside the receive
                // handler can stall the client's incoming packet processing.
                Console.WriteLine(DateTime.Now + "發送資料到tb");
                mqttClient.PublishAsync(message, CancellationToken.None);
                break;
            }
            case "setValue":
            {
                string param = data.GetValue("params")?.ToString();
                if (param != null)
                {
                    setValue(param);
                }
                break;
            }
        }
    });

    // Main must stay synchronous (signature unchanged), so block here until the
    // connection attempt finishes; a failed connect now surfaces as an exception
    // instead of being silently dropped with the un-awaited task.
    mqttClient.ConnectAsync(options, CancellationToken.None).GetAwaiter().GetResult();

    Console.ReadLine();
}
其中下面這些就是解析rpc指令的
// Excerpt of the message-received handler body: handles only messages whose
// topic starts with the RPC request prefix.
if (topic.StartsWith("v1/devices/me/rpc/request/"))
{
// The request id is whatever follows the prefix; it is appended to the response topic below.
string requestId = topic.Substring("v1/devices/me/rpc/request/".Length);
// The payload is decoded as UTF-8 text and parsed as a JSON object.
JObject data = JObject.Parse(Encoding.UTF8.GetString(e.ApplicationMessage.Payload));
// "getValue": reply with the current temperature (a variable defined outside this excerpt).
if (data.GetValue("method").ToString() == "getValue")
{
var temp = new JObject { { "temperature", temperature } };
// NOTE(review): the response is built with QoS 2 and the retain flag; confirm
// the broker accepts QoS 2 and that retaining a per-request response is intended.
var message = new MqttApplicationMessageBuilder()
.WithTopic("v1/devices/me/rpc/response/" + requestId)
.WithPayload(temp.ToString())
.WithExactlyOnceQoS()
.WithRetainFlag()
.Build();
// Send the data to ThingsBoard (tb); the returned task is not awaited.
Console.WriteLine(DateTime.Now + "發送資料到tb");
mqttClient.PublishAsync(message, CancellationToken.None);
}
// "setValue": pass the "params" field, as a string, to setValue (defined outside this excerpt).
else if (data.GetValue("method").ToString() == "setValue")
{
string param = data.GetValue("params").ToString();
setValue(param);
}
}
// Closes the handler lambda whose opening is not part of this excerpt.
});
PS:
這裡沒有實作將溫控器設定的溫度值回傳到tb中,我感覺是線程的問題,因為我測試過增加線程來處理資料回傳就會導致不能擷取到溫控器的資料,這兩個線程不是一個。
這裡的問題留待後面研究。
python 版的mqtt
python版本的mqtt可以實作線程間的通信及實時監控,這裡也分享給大家
# -*- coding: utf-8 -*-
"""
Created on Sat Feb 6 14:10:01 2021
"""
# This Program illustrates the Server Side RPC on ThingsBoard IoT Platform
# Paste your ThingsBoard IoT Platform IP and Device access token
# Temperature_Controller_Server_Side_RPC.py : This program illustrates Server side RPC using a Simulated Temperature Controller
import os
import time
import sys
import json
import random
import paho.mqtt.client as mqtt
from threading import Thread
# ThingsBoard platform credentials.
# Both values can be overridden through environment variables so the script can
# target another server/device without editing the source; the defaults are the
# values originally hard-coded here.
THINGSBOARD_HOST = os.environ.get('THINGSBOARD_HOST', '192.168.137.131')  # Change IP Address
ACCESS_TOKEN = os.environ.get('THINGSBOARD_ACCESS_TOKEN', 'FOvUIh5e7sHD4BWKceb2')
# Shared simulated sensor state: read by the telemetry publisher and the
# getValue RPC, written by the setValue RPC.
sensor_data = {'temperature': 25}
def publishValue(client, interval=2, stop_event=None):
    """Publish the current sensor data as telemetry at a fixed interval.

    Args:
        client: MQTT client object exposing ``publish(topic, payload, qos)``.
        interval: Seconds between two publications (default 2, the value that
            was previously hard-coded).
        stop_event: Optional ``threading.Event``-like object; the loop ends
            once ``stop_event.is_set()`` is true. With the default ``None``
            the loop runs forever, as before.
    """
    print("Thread Started")
    next_reading = time.time()
    while stop_event is None or not stop_event.is_set():
        client.publish('v1/devices/me/telemetry', json.dumps(sensor_data), 1)
        # Schedule against the previous deadline rather than "now" so the
        # publishing period does not drift with the time spent in publish().
        next_reading += interval
        sleep_time = next_reading - time.time()
        if sleep_time > 0:
            time.sleep(sleep_time)
def read_temperature():
    """Return the current value stored under ``sensor_data['temperature']``."""
    return sensor_data['temperature']
# Function will set the temperature value in device
def setValue(params):
    """Store ``params`` as the new temperature in ``sensor_data`` and log it.

    Args:
        params: The new temperature value; stored as received, without
            conversion or validation.
    """
    sensor_data['temperature'] = params
    print("Temperature Set : ",params,"C")
# MQTT on_connect callback function
def on_connect(client, userdata, flags, rc):
    """Subscribe to the RPC request topic once the broker accepts the connection.

    Args:
        client: MQTT client object exposing ``subscribe(topic)``.
        userdata: Unused.
        flags: Unused.
        rc: Connection result code; 0 is treated as success, anything else
            as a refused connection.
    """
    if rc != 0:
        # Do not report success or subscribe on a refused connection
        # (e.g. a wrong access token).
        print("Connection failed, rc =", rc)
        return
    print("連接配接成功")
    client.subscribe('v1/devices/me/rpc/request/+')
# MQTT on_message callback function
def on_message(client, userdata, msg):
    """Handle an incoming message; serve ``getValue`` / ``setValue`` RPC requests.

    Messages whose topic is not under ``v1/devices/me/rpc/request/`` are only
    logged. For ``getValue`` the current temperature is published, JSON-encoded,
    on the matching response topic with QoS 1. For ``setValue`` the request's
    ``params`` field is passed to ``setValue``. Malformed requests are logged
    and ignored instead of raising inside the callback.

    Args:
        client: MQTT client object exposing ``publish(topic, payload, qos)``.
        userdata: Unused.
        msg: Message object with ``topic`` (str) and ``payload`` attributes.
    """
    print('Topic: ' + msg.topic + '\nMessage: ' + str(msg.payload))
    request_prefix = 'v1/devices/me/rpc/request/'
    if not msg.topic.startswith(request_prefix):
        return

    # The request id is the tail of the topic; the response topic must echo it.
    requestId = msg.topic[len(request_prefix):]

    try:
        data = json.loads(msg.payload)
    except ValueError:
        # Covers invalid JSON and undecodable bytes.
        print("Ignoring RPC request with invalid JSON payload")
        return
    if not isinstance(data, dict):
        print("Ignoring RPC request whose payload is not a JSON object")
        return

    method = data.get('method')
    if method == 'getValue':
        client.publish('v1/devices/me/rpc/response/' + requestId,
                       json.dumps(sensor_data['temperature']), 1)
    elif method == 'setValue':
        if 'params' in data:
            setValue(data['params'])
        else:
            print("Ignoring setValue request without params")
# Create the client instance, wire up the callbacks and connect.
# The device access token is passed as the MQTT user name.
client = mqtt.Client()
client.on_connect = on_connect
client.on_message = on_message
client.username_pw_set(ACCESS_TOKEN)
client.connect(THINGSBOARD_HOST, 1883, 60)

# Daemon thread: the publisher loops forever, so a non-daemon thread would keep
# the process alive after Ctrl+C.
t = Thread(target=publishValue, args=(client,), daemon=True)
try:
    client.loop_start()
    t.start()
    while True:
        # Sleep instead of spinning so the main thread does not burn a CPU core
        # while waiting for Ctrl+C.
        time.sleep(1)
except KeyboardInterrupt:
    client.disconnect()
    # Stop the background network loop started by loop_start().
    client.loop_stop()