天天看點

基于開源Java MQTT Client的阿裡雲物聯網平台RRPC功能測試

作者:俏巴

概述

MQTT協定是基于PUB/SUB的異步通信模式,不适用于服務端同步控制裝置端傳回結果的場景。物聯網平台基于MQTT協定制定了一套請求和響應的同步機制,無需改動MQTT協定即可實作同步通信。物聯網平台提供API給服務端,裝置端隻需要按照固定的格式回複PUB消息,服務端使用API,即可同步擷取裝置端的響應結果。RRPC:Revert-RPC。RPC(Remote Procedure Call)采用客戶機/伺服器模式,使用者不需要了解底層技術協定,即可遠端請求服務。RRPC則可以實作由服務端請求裝置端并能夠使裝置端響應的功能。本文主要基于開源Java MQTT Client,分别針對系統Topic和自定義Topic,示範阿裡雲物聯網平台RRPC的實作。

RRPC原理

基于開源Java MQTT Client的阿裡雲物聯網平台RRPC功能測試
  • 1、物聯網平台收到來自使用者伺服器的RRPC調用,下發一條RRPC請求消息給裝置。消息體為使用者傳入的資料,Topic為物聯網平台定義的Topic,其中含有唯一的RRPC消息ID。
  • 2、裝置收到下行消息後,按照指定Topic格式(包含之前雲端下發的唯一的RRPC消息ID)回複一條RRPC響應消息給雲端,雲端提取出Topic中的消息ID,和之前的RRPC請求消息比對上,然後回複給使用者伺服器。
  • 3、如果調用時裝置不線上,雲端會給使用者伺服器傳回裝置離線的錯誤;如果裝置沒有在逾時時間内(8秒内)回複RRPC響應消息,雲端會給使用者伺服器傳回逾時錯誤。

更多原理介紹可以參考

阿裡雲官方文檔

實驗測試

準備工作

1、建立産品和裝置

參考:

阿裡雲物聯網平台Qucik Start

建立産品和裝置部分。

2、建立自定義Topic

基于開源Java MQTT Client的阿裡雲物聯網平台RRPC功能測試

3、開源SDK的使用

基于開源JAVA MQTT Client連接配接阿裡雲IoT

4、服務端RRpc API的調用:調用該接口向指定裝置發送請求消息,并同步傳回響應。

這裡我們使用Open API Explorer快速測試調用,RRPC測試

位址
基于開源Java MQTT Client的阿裡雲物聯網平台RRPC功能測試

系統Topic測試

參數:

RRPC調用的系統Topic格式如下:

  • RRPC請求消息Topic:/sys/${YourProductKey}/${YourDeviceName}/rrpc/request/${messageId}
  • RRPC響應消息Topic:/sys/${YourProductKey}/${YourDeviceName}/rrpc/response/${messageId}
  • RRPC訂閱Topic:/sys/${YourProductKey}/${YourDeviceName}/rrpc/request/+

1、裝置端Code

import com.alibaba.taro.AliyunIoTSignUtil;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import java.util.HashMap;
import java.util.Map;

public class IoTRRPCForSysTopic {

    // 裝置三元組資訊
    public static String productKey = "******";
    public static String deviceName = "******";
    public static String deviceSecret = "********";
    public static String regionId = "cn-shanghai";

    // RRPC 系統Topic
    private static String subTopic = "/sys/" + productKey + "/" + deviceName+ "/rrpc/request/+";
    private static MqttClient mqttClient;

    public static void main(String [] args) {

        initAliyunIoTClient();

        // RRPC訂閱Topic
        try {
            mqttClient.subscribe(subTopic);
        } catch (MqttException e) {
            e.printStackTrace();
        }

        mqttClient.setCallback(new MqttCallback() {
            @Override
            public void connectionLost(Throwable cause) {
                System.out.println("connectionLost:" + cause.getMessage());
            }

            @Override
            public void messageArrived(String topic, MqttMessage message) throws Exception {
                System.out.println("message: " + new String(message.getPayload()));

                // 根據RRPC請求消息Topic,建構RRPC響應消息Topic
                String responseTopic = topic.replace("request","response");
                MqttMessage message1 = new MqttMessage("resonse demo".getBytes("utf-8"));
                mqttClient.publish(responseTopic,message1);
            }

            @Override
            public void deliveryComplete(IMqttDeliveryToken token) {
//                System.out.println("IMqttDeliveryToken: " + token);
            }
        });
    }

    /***
     * 初始化Client
     */
    private static void initAliyunIoTClient() {

        try {
            String clientId = "java" + System.currentTimeMillis();

            Map<String, String> params = new HashMap<>(16);
            params.put("productKey", productKey);
            params.put("deviceName", deviceName);
            params.put("clientId", clientId);
            String timestamp = String.valueOf(System.currentTimeMillis());
            params.put("timestamp", timestamp);

            // cn-shanghai
            String targetServer = "tcp://" + productKey + ".iot-as-mqtt." + regionId + ".aliyuncs.com:1883";

            String mqttclientId = clientId + "|securemode=3,signmethod=hmacsha1,timestamp=" + timestamp + "|";
            String mqttUsername = deviceName + "&" + productKey;
            String mqttPassword = AliyunIoTSignUtil.sign(params, deviceSecret, "hmacsha1");

            connectMqtt(targetServer, mqttclientId, mqttUsername, mqttPassword);

        } catch (Exception e) {
            System.out.println("initAliyunIoTClient error " + e.getMessage());
        }
    }

    public static void connectMqtt(String url, String clientId, String mqttUsername, String mqttPassword) throws Exception {

        MemoryPersistence persistence = new MemoryPersistence();
        mqttClient = new MqttClient(url, clientId, persistence);
        MqttConnectOptions connOpts = new MqttConnectOptions();
        // MQTT 3.1.1
        connOpts.setMqttVersion(4);
        connOpts.setAutomaticReconnect(false);
        connOpts.setCleanSession(true);

        connOpts.setUserName(mqttUsername);
        connOpts.setPassword(mqttPassword.toCharArray());
        connOpts.setKeepAliveInterval(60);

        mqttClient.connect(connOpts);
    }
}
           

2、服務端下發消息

基于開源Java MQTT Client的阿裡雲物聯網平台RRPC功能測試

3、裝置端訂閱情況

基于開源Java MQTT Client的阿裡雲物聯網平台RRPC功能測試

自定義Topic測試

參數

RRPC調用自定義Topic的格式如下:

  • RRPC請求消息Topic:/ext/rrpc/${messageId}/${topic}
  • RRPC響應消息Topic:/ext/rrpc/${messageId}/${topic}
  • RRPC訂閱Topic:/ext/rrpc/+/${topic}
import com.alibaba.taro.AliyunIoTSignUtil;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import java.util.HashMap;
import java.util.Map;

public class IoTRRPCForPersonalTopic {

    // 裝置三元組資訊
    public static String productKey = "******";
    public static String deviceName = "******";
    public static String deviceSecret = "********";
    public static String regionId = "cn-shanghai";

    // RRPC 自定義Topic
    private static String personalTopic = "/" + productKey + "/" + deviceName + "/user/rrpcdemo";
    private static String subTopic = "/ext/rrpc/+" + personalTopic;
    private static MqttClient mqttClient;

    public static void main(String [] args){

        initAliyunIoTClient();

        try {
            mqttClient.subscribe(subTopic);
        } catch (MqttException e) {
            e.printStackTrace();

            System.out.println(e.getMessage());
        }

        mqttClient.setCallback(new MqttCallback() {
            @Override
            public void connectionLost(Throwable cause) {
                System.out.println("connectionLost");
            }

            @Override
            public void messageArrived(String topic, MqttMessage message) throws Exception {
                System.out.println("topic: " + topic);
                System.out.println("message: " + new String(message.getPayload()));

                // 設定響應 Topic 及 message
                MqttMessage msg = new MqttMessage("demo".getBytes());
                mqttClient.publish(topic,msg);
            }

            @Override
            public void deliveryComplete(IMqttDeliveryToken token) {
//                System.out.println("IMqttDeliveryToken: " + token);
            }
        });
    }

    /***
     * 初始化Client
     */
    private static void initAliyunIoTClient() {

        try {
            String clientId = "java" + System.currentTimeMillis();

            Map<String, String> params = new HashMap<>(16);
            params.put("productKey", productKey);
            params.put("deviceName", deviceName);
            params.put("clientId", clientId);
            String timestamp = String.valueOf(System.currentTimeMillis());
            params.put("timestamp", timestamp);

            // cn-shanghai
            String targetServer = "tcp://" + productKey + ".iot-as-mqtt."+regionId+".aliyuncs.com:1883";

            // 從雲端下發自定義格式Topic的RRPC調用指令到裝置端時,裝置端必須在進行MQTT CONNECT協定設定時,在clientId中增加ext=1參數。
            String mqttclientId = clientId + "|securemode=3,signmethod=hmacsha1,timestamp=" + timestamp + ",ext=1|";
            String mqttUsername = deviceName + "&" + productKey;
            String mqttPassword = AliyunIoTSignUtil.sign(params, deviceSecret, "hmacsha1");

            connectMqtt(targetServer, mqttclientId, mqttUsername, mqttPassword);

        } catch (Exception e) {
            System.out.println("initAliyunIoTClient error " + e.getMessage());
        }
    }

    public static void connectMqtt(String url, String clientId, String mqttUsername, String mqttPassword) throws Exception {

        MemoryPersistence persistence = new MemoryPersistence();
        mqttClient = new MqttClient(url, clientId, persistence);
        MqttConnectOptions connOpts = new MqttConnectOptions();
        // MQTT 3.1.1
        connOpts.setMqttVersion(4);
        connOpts.setAutomaticReconnect(false);
        connOpts.setCleanSession(true);

        connOpts.setUserName(mqttUsername);
        connOpts.setPassword(mqttPassword.toCharArray());
        connOpts.setKeepAliveInterval(60);

        mqttClient.connect(connOpts);
    }
}
           

注意

從雲端下發自定義格式Topic的RRPC調用指令到裝置端時,裝置端必須在進行MQTT CONNECT協定設定時,在clientId中增加ext=1參數。

基于開源Java MQTT Client的阿裡雲物聯網平台RRPC功能測試
基于開源Java MQTT Client的阿裡雲物聯網平台RRPC功能測試

參考連結

調用系統Topic 調用自定義Topic