作者:俏巴
概述
MQTT協定是基于PUB/SUB的異步通信模式,不适用于服務端同步控制裝置端傳回結果的場景。物聯網平台基于MQTT協定制定了一套請求和響應的同步機制,無需改動MQTT協定即可實作同步通信。物聯網平台提供API給服務端,裝置端隻需要按照固定的格式回複PUB消息,服務端使用API,即可同步擷取裝置端的響應結果。RRPC:Revert-RPC。RPC(Remote Procedure Call)采用客戶機/伺服器模式,使用者不需要了解底層技術協定,即可遠端請求服務。RRPC則可以實作由服務端請求裝置端并能夠使裝置端響應的功能。本文主要基于開源Java MQTT Client,分别針對系統Topic和自定義Topic,示範阿裡雲物聯網平台RRPC的實作。
RRPC原理

- 1、物聯網平台收到來自使用者伺服器的RRPC調用,下發一條RRPC請求消息給裝置。消息體為使用者傳入的資料,Topic為物聯網平台定義的Topic,其中含有唯一的RRPC消息ID。
- 2、裝置收到下行消息後,按照指定Topic格式(包含之前雲端下發的唯一的RRPC消息ID)回複一條RRPC響應消息給雲端,雲端提取出Topic中的消息ID,和之前的RRPC請求消息比對上,然後回複給使用者伺服器。
- 3、如果調用時裝置不線上,雲端會給使用者伺服器傳回裝置離線的錯誤;如果裝置沒有在逾時時間内(8秒内)回複RRPC響應消息,雲端會給使用者伺服器傳回逾時錯誤。
更多原理介紹可以參考
阿裡雲官方文檔。
實驗測試
準備工作
1、建立産品和裝置
參考:
阿裡雲物聯網平台Qucik Start建立産品和裝置部分。
2、建立自定義Topic
3、開源SDK的使用
基于開源JAVA MQTT Client連接配接阿裡雲IoT4、服務端RRpc API的調用:調用該接口向指定裝置發送請求消息,并同步傳回響應。
這裡我們使用Open API Explorer快速測試調用,RRPC測試
位址系統Topic測試
參數:
RRPC調用的系統Topic格式如下:
- RRPC請求消息Topic:/sys/${YourProductKey}/${YourDeviceName}/rrpc/request/${messageId}
- RRPC響應消息Topic:/sys/${YourProductKey}/${YourDeviceName}/rrpc/response/${messageId}
- RRPC訂閱Topic:/sys/${YourProductKey}/${YourDeviceName}/rrpc/request/+
1、裝置端Code
import com.alibaba.taro.AliyunIoTSignUtil;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import java.util.HashMap;
import java.util.Map;
public class IoTRRPCForSysTopic {
// 裝置三元組資訊
public static String productKey = "******";
public static String deviceName = "******";
public static String deviceSecret = "********";
public static String regionId = "cn-shanghai";
// RRPC 系統Topic
private static String subTopic = "/sys/" + productKey + "/" + deviceName+ "/rrpc/request/+";
private static MqttClient mqttClient;
public static void main(String [] args) {
initAliyunIoTClient();
// RRPC訂閱Topic
try {
mqttClient.subscribe(subTopic);
} catch (MqttException e) {
e.printStackTrace();
}
mqttClient.setCallback(new MqttCallback() {
@Override
public void connectionLost(Throwable cause) {
System.out.println("connectionLost:" + cause.getMessage());
}
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
System.out.println("message: " + new String(message.getPayload()));
// 根據RRPC請求消息Topic,建構RRPC響應消息Topic
String responseTopic = topic.replace("request","response");
MqttMessage message1 = new MqttMessage("resonse demo".getBytes("utf-8"));
mqttClient.publish(responseTopic,message1);
}
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
// System.out.println("IMqttDeliveryToken: " + token);
}
});
}
/***
* 初始化Client
*/
private static void initAliyunIoTClient() {
try {
String clientId = "java" + System.currentTimeMillis();
Map<String, String> params = new HashMap<>(16);
params.put("productKey", productKey);
params.put("deviceName", deviceName);
params.put("clientId", clientId);
String timestamp = String.valueOf(System.currentTimeMillis());
params.put("timestamp", timestamp);
// cn-shanghai
String targetServer = "tcp://" + productKey + ".iot-as-mqtt." + regionId + ".aliyuncs.com:1883";
String mqttclientId = clientId + "|securemode=3,signmethod=hmacsha1,timestamp=" + timestamp + "|";
String mqttUsername = deviceName + "&" + productKey;
String mqttPassword = AliyunIoTSignUtil.sign(params, deviceSecret, "hmacsha1");
connectMqtt(targetServer, mqttclientId, mqttUsername, mqttPassword);
} catch (Exception e) {
System.out.println("initAliyunIoTClient error " + e.getMessage());
}
}
public static void connectMqtt(String url, String clientId, String mqttUsername, String mqttPassword) throws Exception {
MemoryPersistence persistence = new MemoryPersistence();
mqttClient = new MqttClient(url, clientId, persistence);
MqttConnectOptions connOpts = new MqttConnectOptions();
// MQTT 3.1.1
connOpts.setMqttVersion(4);
connOpts.setAutomaticReconnect(false);
connOpts.setCleanSession(true);
connOpts.setUserName(mqttUsername);
connOpts.setPassword(mqttPassword.toCharArray());
connOpts.setKeepAliveInterval(60);
mqttClient.connect(connOpts);
}
}
2、服務端下發消息
3、裝置端訂閱情況
自定義Topic測試
參數
RRPC調用自定義Topic的格式如下:
- RRPC請求消息Topic:/ext/rrpc/${messageId}/${topic}
- RRPC響應消息Topic:/ext/rrpc/${messageId}/${topic}
- RRPC訂閱Topic:/ext/rrpc/+/${topic}
import com.alibaba.taro.AliyunIoTSignUtil;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import java.util.HashMap;
import java.util.Map;
public class IoTRRPCForPersonalTopic {
// 裝置三元組資訊
public static String productKey = "******";
public static String deviceName = "******";
public static String deviceSecret = "********";
public static String regionId = "cn-shanghai";
// RRPC 自定義Topic
private static String personalTopic = "/" + productKey + "/" + deviceName + "/user/rrpcdemo";
private static String subTopic = "/ext/rrpc/+" + personalTopic;
private static MqttClient mqttClient;
public static void main(String [] args){
initAliyunIoTClient();
try {
mqttClient.subscribe(subTopic);
} catch (MqttException e) {
e.printStackTrace();
System.out.println(e.getMessage());
}
mqttClient.setCallback(new MqttCallback() {
@Override
public void connectionLost(Throwable cause) {
System.out.println("connectionLost");
}
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
System.out.println("topic: " + topic);
System.out.println("message: " + new String(message.getPayload()));
// 設定響應 Topic 及 message
MqttMessage msg = new MqttMessage("demo".getBytes());
mqttClient.publish(topic,msg);
}
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
// System.out.println("IMqttDeliveryToken: " + token);
}
});
}
/***
* 初始化Client
*/
private static void initAliyunIoTClient() {
try {
String clientId = "java" + System.currentTimeMillis();
Map<String, String> params = new HashMap<>(16);
params.put("productKey", productKey);
params.put("deviceName", deviceName);
params.put("clientId", clientId);
String timestamp = String.valueOf(System.currentTimeMillis());
params.put("timestamp", timestamp);
// cn-shanghai
String targetServer = "tcp://" + productKey + ".iot-as-mqtt."+regionId+".aliyuncs.com:1883";
// 從雲端下發自定義格式Topic的RRPC調用指令到裝置端時,裝置端必須在進行MQTT CONNECT協定設定時,在clientId中增加ext=1參數。
String mqttclientId = clientId + "|securemode=3,signmethod=hmacsha1,timestamp=" + timestamp + ",ext=1|";
String mqttUsername = deviceName + "&" + productKey;
String mqttPassword = AliyunIoTSignUtil.sign(params, deviceSecret, "hmacsha1");
connectMqtt(targetServer, mqttclientId, mqttUsername, mqttPassword);
} catch (Exception e) {
System.out.println("initAliyunIoTClient error " + e.getMessage());
}
}
public static void connectMqtt(String url, String clientId, String mqttUsername, String mqttPassword) throws Exception {
MemoryPersistence persistence = new MemoryPersistence();
mqttClient = new MqttClient(url, clientId, persistence);
MqttConnectOptions connOpts = new MqttConnectOptions();
// MQTT 3.1.1
connOpts.setMqttVersion(4);
connOpts.setAutomaticReconnect(false);
connOpts.setCleanSession(true);
connOpts.setUserName(mqttUsername);
connOpts.setPassword(mqttPassword.toCharArray());
connOpts.setKeepAliveInterval(60);
mqttClient.connect(connOpts);
}
}
注意
從雲端下發自定義格式Topic的RRPC調用指令到裝置端時,裝置端必須在進行MQTT CONNECT協定設定時,在clientId中增加ext=1參數。
參考連結
調用系統Topic 調用自定義Topic