天天看點

阿裡雲微服務消息隊列(MQTT For IoT)使用Demo

Step By Step

服務使用大圖

阿裡雲微服務消息隊列(MQTT For IoT)使用Demo
  • 1、不同裝置通過SDK和平台側建立連接配接,實作裝置與平台側的互動通信;
  • 2、通過規則流轉功能,将裝置上報的消息流轉到MQ Topic,也可以通過MQ Topic向MQTT Topic下發消息;
  • 3、基于Server端管控API,實作消息的直接下發、裝置線上狀态查詢以及Group的建立。

一、MQTT Server建立(公網區域)

1、建立執行個體

阿裡雲微服務消息隊列(MQTT For IoT)使用Demo
阿裡雲微服務消息隊列(MQTT For IoT)使用Demo

2、Topic和Group建立

阿裡雲微服務消息隊列(MQTT For IoT)使用Demo
阿裡雲微服務消息隊列(MQTT For IoT)使用Demo
阿裡雲微服務消息隊列(MQTT For IoT)使用Demo

二、裝置端Java Code Sample

1、pom.xml

<dependencies>
            <dependency>
                <groupId>commons-codec</groupId>
                <artifactId>commons-codec</artifactId>
                <version>1.10</version>
            </dependency>
            <dependency>
                <groupId>org.eclipse.paho</groupId>
                <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
                <version>1.2.2</version>
            </dependency>
            <dependency>
                <groupId>org.apache.httpcomponents</groupId>
                <artifactId>httpclient</artifactId>
                <version>4.5.2</version>
            </dependency>
            <dependency>
                <groupId>com.alibaba</groupId>
                <artifactId>fastjson</artifactId>
                <version>1.2.48</version>
            </dependency>
            <dependency>
                <groupId>com.aliyun</groupId>
                <artifactId>aliyun-java-sdk-onsmqtt</artifactId>
                <version>1.0.3</version>
            </dependency>
            <dependency>
                <groupId>com.aliyun</groupId>
                <artifactId>aliyun-java-sdk-core</artifactId>
                <version>4.5.0</version>
            </dependency>
        </dependencies>           

2、Device Code Sample

import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

public class MQ4IoTSendMessageToMQ4IoTUseSignatureMode {

    public static void main(String[] args) throws Exception {
        /**
         * MQ4IOT 執行個體 ID,購買後控制台擷取
         */
        String instanceId = "post-cn-n6w*********";
        /**
         * 接入點位址,購買 MQ4IOT 執行個體,且配置完成後即可擷取,接入點位址必須填寫配置設定的域名,不得使用 IP 位址直接連接配接,否則可能會導緻用戶端異常。
         */
        String endPoint = "post-cn-n6w********.mqtt.aliyuncs.com";
        /**
         * 賬号 accesskey,從賬号系統控制台擷取
         */
        String accessKey = "LTAIOZZg********";
        /**
         * 賬号 secretKey,從賬号系統控制台擷取,僅在Signature鑒權模式下需要設定
         */
        String secretKey = "v7CjUJCMk7j9aK****************";
        /**
         * MQ4IOT clientId,由業務系統配置設定,需要保證每個 tcp 連接配接都不一樣,保證全局唯一,如果不同的用戶端對象(tcp 連接配接)使用了相同的 clientId 會導緻連接配接異常斷開。
         * clientId 由兩部分組成,格式為 GroupID@@@DeviceId,其中 groupId 在 MQ4IOT 控制台申請,DeviceId 由業務方自己設定,clientId 總長度不得超過64個字元。
         */
        String clientId = "GID_MQTT_Client1@@@device1";
        /**
         * MQ4IOT 消息的一級 topic,需要在控制台申請才能使用。
         * 如果使用了沒有申請或者沒有被授權的 topic 會導緻鑒權失敗,服務端會斷開用戶端連接配接。
         */
        final String parentTopic = "MQTT_Topic";
        /**
         * MQ4IOT支援子級 topic,用來做自定義的過濾,此處為示意,可以填寫任何字元串,具體參考https://help.aliyun.com/document_detail/42420.html?spm=a2c4g.11186623.6.544.1ea529cfAO5zV3
         * 需要注意的是,完整的 topic 長度不得超過128個字元。
         */
        final String mq4IotTopic = parentTopic + "/" + "testMq4Iot";
        /**
         * QoS參數代表傳輸品質,可選0,1,2,根據實際需求合理設定,具體參考 https://help.aliyun.com/document_detail/42420.html?spm=a2c4g.11186623.6.544.1ea529cfAO5zV3
         */
        final int qosLevel = 0;
        ConnectionOptionWrapper connectionOptionWrapper = new ConnectionOptionWrapper(instanceId, accessKey, secretKey, clientId);
        final MemoryPersistence memoryPersistence = new MemoryPersistence();
        /**
         * 用戶端使用的協定和端口必須比對,具體參考文檔 https://help.aliyun.com/document_detail/44866.html?spm=a2c4g.11186623.6.552.25302386RcuYFB
         * 如果是 SSL 加密則設定ssl://endpoint:8883
         */
        final MqttClient mqttClient = new MqttClient("tcp://" + endPoint + ":1883", clientId, memoryPersistence);
        /**
         * 用戶端設定好發送逾時時間,防止無限阻塞
         */
        mqttClient.setTimeToWait(5000);
        final ExecutorService executorService = new ThreadPoolExecutor(1, 1, 0, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>());
        mqttClient.setCallback(new MqttCallbackExtended() {
            @Override
            public void connectComplete(boolean reconnect, String serverURI) {
                /**
                 * 用戶端連接配接成功後就需要盡快訂閱需要的 topic
                 */
                System.out.println("connect success");
                executorService.submit(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            final String topicFilter[] = {mq4IotTopic};
                            final int[] qos = {qosLevel};
                            mqttClient.subscribe(topicFilter, qos);
                        } catch (MqttException e) {
                            e.printStackTrace();
                        }
                    }
                });
            }

            @Override
            public void connectionLost(Throwable throwable) {
                throwable.printStackTrace();
            }

            @Override
            public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
                /**
                 * 消費消息的回調接口,需要確定該接口不抛異常,該接口運作傳回即代表消息消費成功。
                 * 消費消息需要保證在規定時間内完成,如果消費耗時超過服務端約定的逾時時間,對于可靠傳輸的模式,服務端可能會重試推送,業務需要做好幂等去重處理。逾時時間約定參考限制
                 * https://help.aliyun.com/document_detail/63620.html?spm=a2c4g.11186623.6.546.229f1f6ago55Fj
                 */
                System.out.println(
                        "receive msg from topic " + s + " , body is " + new String(mqttMessage.getPayload()));
            }

            @Override
            public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
                System.out.println("send msg succeed topic is : " + iMqttDeliveryToken.getTopics()[0]);
            }
        });
        mqttClient.connect(connectionOptionWrapper.getMqttConnectOptions());
        for (int i = 0; i < 1; i++) {
            MqttMessage message = new MqttMessage("hello mq4Iot pub sub msg".getBytes());
            message.setQos(qosLevel);
            /**
             *  發送普通消息時,topic 必須和接收方訂閱的 topic 一緻,或者符合通配符比對規則
             */
            mqttClient.publish(mq4IotTopic, message);
            /**
             * MQ4IoT支援點對點消息,即如果發送方明确知道該消息隻需要給特定的一個裝置接收,且知道對端的 clientId,則可以直接發送點對點消息。
             * 點對點消息不需要經過訂閱關系比對,可以簡化訂閱方的邏輯。點對點消息的 topic 格式規範是  {{parentTopic}}/p2p/{{targetClientId}}
             */
            final String p2pSendTopic = parentTopic + "/p2p/" + clientId;
            message = new MqttMessage("hello mq4Iot p2p msg".getBytes());
            message.setQos(qosLevel);
            mqttClient.publish(p2pSendTopic, message);
        }
        Thread.sleep(Long.MAX_VALUE);
    }
}           

工具類:

util

3、測試效果

阿裡雲微服務消息隊列(MQTT For IoT)使用Demo

4、消息流轉軌迹查詢

阿裡雲微服務消息隊列(MQTT For IoT)使用Demo

三、規則流轉測試

1、MQ建立三個不同類型的Topic

阿裡雲微服務消息隊列(MQTT For IoT)使用Demo

2、建立Group,用于MQ側消費消息

阿裡雲微服務消息隊列(MQTT For IoT)使用Demo

3、MQTT側配置流轉規則

阿裡雲微服務消息隊列(MQTT For IoT)使用Demo
阿裡雲微服務消息隊列(MQTT For IoT)使用Demo
阿裡雲微服務消息隊列(MQTT For IoT)使用Demo
阿裡雲微服務消息隊列(MQTT For IoT)使用Demo

4、MQ側代碼測試

4.1 pom.xml
<dependency>
            <groupId>com.aliyun.openservices</groupId>
            <artifactId>ons-client</artifactId>
            <version>1.8.7.1.Final</version>
        </dependency>           
4.2 Code Sample
import com.aliyun.openservices.ons.api.Action;
import com.aliyun.openservices.ons.api.ConsumeContext;
import com.aliyun.openservices.ons.api.Consumer;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.MessageListener;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import java.util.Properties;

public class ConsumerTest {
    public static void main(String[] args) {
        Properties properties = new Properties();
        // 您在控制台建立的 Group ID。
        properties.put(PropertyKeyConst.GROUP_ID, "GID_MessageConsumer");
        // AccessKey ID 阿裡雲身份驗證,在阿裡雲 RAM 控制台建立。
        properties.put(PropertyKeyConst.AccessKey, "LTAIOZZg********");
        // Accesskey Secret 阿裡雲身份驗證,在阿裡雲服 RAM 控制台建立。
        properties.put(PropertyKeyConst.SecretKey, "v7CjUJCMk7j9aK****************");
        // 設定 TCP 接入域名,進入控制台的執行個體詳情頁面的 TCP 協定用戶端接入點區域檢視。
        properties.put(PropertyKeyConst.NAMESRV_ADDR, "http://MQ_INST_***************_BcLPQ2p0.mq-internet-access.mq-internet.aliyuncs.com:80");
        // 叢集訂閱方式 (預設)。
        // properties.put(PropertyKeyConst.MessageModel, PropertyValueConst.CLUSTERING);
        // 廣播訂閱方式。
        // properties.put(PropertyKeyConst.MessageModel, PropertyValueConst.BROADCASTING);

        Consumer consumer = ONSFactory.createConsumer(properties);
        //1、訂閱裝置上行消息
        consumer.subscribe("MessageFromMQTT", "*", new MessageListener() { //訂閱多個 Tag。
            public Action consume(Message message, ConsumeContext context) {
                System.out.println("Receive: " + message);
                return Action.CommitMessage;
            }
        });

        //2、訂閱裝置上下線消息
        consumer.subscribe("DevcieOnlineAndOffline", "*", new MessageListener() { //訂閱全部 Tag。
            public Action consume(Message message, ConsumeContext context) {
                System.out.println("Receive: " + message);
                return Action.CommitMessage;
            }
        });

        consumer.start();
        System.out.println("Consumer Started");
    }
}
           
4.3 測試效果(先啟動消費端,然後裝置端上行消息)
阿裡雲微服務消息隊列(MQTT For IoT)使用Demo
4.4 通過MQ發送消息到MQTT
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.Producer;
import com.aliyun.openservices.ons.api.SendResult;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import java.util.Date;
import java.util.Properties;

public class SendMQMessageToMQTT {

    public static void main(String[] args) {

            Properties properties = new Properties();
            // AccessKeyId 阿裡雲身份驗證,在阿裡雲使用者資訊管理控制台擷取。
            properties.put(PropertyKeyConst.AccessKey,"LTAIOZZg**********");
            // AccessKeySecret 阿裡雲身份驗證,在阿裡雲使用者資訊管理控制台擷取。
            properties.put(PropertyKeyConst.SecretKey, "v7CjUJCMk7j9aK****************");
            //設定發送逾時時間,機關毫秒。
            properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis, "3000");
            // 設定 TCP 接入域名,進入控制台的執行個體詳情頁面的 TCP 協定用戶端接入點區域檢視。
            properties.put(PropertyKeyConst.NAMESRV_ADDR, "http://MQ_INST_********_BcLPQ2p0.mq-internet-access.mq-internet.aliyuncs.com:80");
            Producer producer = ONSFactory.createProducer(properties);

            // mqttSecondTopic:https://help.aliyun.com/document_detail/112971.html?spm=a2c4g.11186623.6.579.403242ca4pOcpC
            properties.put("mqttSecondTopic","testMq4Iot");

            // 在發送消息前,必須調用 start 方法來啟動 Producer,隻需調用一次即可。
            producer.start();

            //循環發送消息。
            for (int i = 0; i < 1; i++){
                Message msg = new Message("MessageToMQTT","","MQ Message To MQTT".getBytes());
                msg.setKey("ORDERID_" + i);
                msg.setUserProperties(properties);

                try {
                    SendResult sendResult = producer.send(msg);
                    // 同步發送消息,隻要不抛異常就是成功。
                    if (sendResult != null) {
                        System.out.println(new Date() + " Send mq message success. Topic is:" + msg.getTopic() + " msgId is: " + sendResult.getMessageId());
                    }
                }
                catch (Exception e) {
                    // 消息發送失敗,需要進行重試處理,可重新發送這條消息或持久化這條資料進行補償處理。
                    System.out.println(new Date() + " Send mq message failed. Topic is:" + msg.getTopic());
                    e.printStackTrace();
                }
            }

            // 在應用退出前,銷毀 Producer 對象。
            // 注意:如果不銷毀也沒有問題。
            producer.shutdown();
        }

    }           
4.5 The Result
阿裡雲微服務消息隊列(MQTT For IoT)使用Demo
阿裡雲微服務消息隊列(MQTT For IoT)使用Demo
4.6 消息軌迹查詢
阿裡雲微服務消息隊列(MQTT For IoT)使用Demo

四、MQTT雲端API測試

<dependency>
            <groupId>com.aliyun</groupId>
            <artifactId>aliyun-java-sdk-core</artifactId>
            <version>4.5.6</version>
        </dependency>
        <dependency>
            <groupId>com.aliyun</groupId>
            <artifactId>aliyun-java-sdk-onsmqtt</artifactId>
            <version>1.0.4</version>
        </dependency>           

2、發送消息Code Sample

import com.aliyuncs.DefaultAcsClient;
import com.aliyuncs.IAcsClient;
import com.aliyuncs.exceptions.ClientException;
import com.aliyuncs.exceptions.ServerException;
import com.aliyuncs.profile.DefaultProfile;
import com.google.gson.Gson;
import com.aliyuncs.onsmqtt.model.v20200420.*;

public class SendMessage {

    public static void main(String[] args) {
        DefaultProfile profile = DefaultProfile.getProfile("mq-internet-access", "LTAIOZZg********", "v7CjUJCMk7j9aK****************");
        IAcsClient client = new DefaultAcsClient(profile);

        SendMessageRequest request = new SendMessageRequest();
        request.setRegionId("mq-internet-access");
        request.setInstanceId("post-cn-n6w********");
        request.setPayload("message from manager api!");
        request.setMqttTopic("MQTT_Topic/testMq4Iot");

        try {
            SendMessageResponse response = client.getAcsResponse(request);
            System.out.println(new Gson().toJson(response));
        } catch (ServerException e) {
            e.printStackTrace();
        } catch (ClientException e) {
            System.out.println("ErrCode:" + e.getErrCode());
            System.out.println("ErrMsg:" + e.getErrMsg());
            System.out.println("RequestId:" + e.getRequestId());
        }
    }
}           
阿裡雲微服務消息隊列(MQTT For IoT)使用Demo
阿裡雲微服務消息隊列(MQTT For IoT)使用Demo

4、查詢裝置狀态Code Sample

import com.aliyuncs.DefaultAcsClient;
import com.aliyuncs.IAcsClient;
import com.aliyuncs.exceptions.ClientException;
import com.aliyuncs.exceptions.ServerException;
import com.aliyuncs.profile.DefaultProfile;
import com.google.gson.Gson;
import com.aliyuncs.onsmqtt.model.v20200420.*;

public class QuerySessionByClientId {

    public static void main(String[] args) {
        DefaultProfile profile = DefaultProfile.getProfile("mq-internet-access", "LTAIOZZg********", "v7CjUJCMk7j9aK****************");
        IAcsClient client = new DefaultAcsClient(profile);

        QuerySessionByClientIdRequest request = new QuerySessionByClientIdRequest();
        request.setRegionId("mq-internet-access");
        request.setInstanceId("post-cn-n6w********");
        request.setClientId("GID_MQTT_Client1@@@device1");

        try {
            QuerySessionByClientIdResponse response = client.getAcsResponse(request);
            System.out.println(new Gson().toJson(response));
        } catch (ServerException e) {
            e.printStackTrace();
        } catch (ClientException e) {
            System.out.println("ErrCode:" + e.getErrCode());
            System.out.println("ErrMsg:" + e.getErrMsg());
            System.out.println("RequestId:" + e.getRequestId());
        }

    }
}           

5、測試效果

阿裡雲微服務消息隊列(MQTT For IoT)使用Demo

更多參考

QuerySessionByClientId 快速使用 MQTT 的 Java SDK 收發消息(跨産品資料流入) MQ發送普通消息 MQ訂閱消息 阿裡雲微服務消息隊列Token C# 裝置端示例Demo 阿裡雲微服務消息隊列Token Java Code Sample 阿裡雲微服務消息隊列Token C# Code Sample