天天看點

自建MQTT遷移阿裡雲物聯網平台指南自建MQTT遷移IoT物聯網平台實戰

自建MQTT遷移IoT物聯網平台實戰

前言

随着業務增長,自建MQTT叢集維護成本越來越高,連接配接穩定性,消息時延不斷遇到上層業務挑戰,遷移阿裡雲IoT物聯網平台是非常明智的最佳選擇。

現有業務背景

企業基于自建MQTT叢集的資料互動如下圖:

自建MQTT遷移阿裡雲物聯網平台指南自建MQTT遷移IoT物聯網平台實戰

裝置端publish業務資料到MQTT叢集,流轉到後端業務伺服器處理;業務伺服器把指令釋出到MQTT叢集,觸達到裝置端。具體業務資料定義如下:

業務場景 主題Topic 封包Payload
裝置上報 cdb/data/post DE02,10,17,011101010,am024,1d478f
服務端控制指令 cdb/cmd/push CMD,82923,ad322

架構方案

當我們從自己MQTT叢集遷移到IoT物聯網平台,我們可以采用以下技術方案,實作低成本的快速遷移,減少裝置端和業務系統的改造。

IoT裝置遷移上雲三步走:

  • 裝置做固件更新,修改接入域名為IoT平台Endpoint
  • 調整現有通信topic為IoT物聯網平台産品自定義Topic
  • 配置規則引擎,把裝置資料流轉到服務端訂閱AMQP消費組,業務伺服器實時接收到裝置資料
自建MQTT遷移阿裡雲物聯網平台指南自建MQTT遷移IoT物聯網平台實戰

裝置遷移上雲實戰

建立産品

首先,我們接入物聯網平台控制台,左側欄選擇産品,建立新産品:充電寶機櫃。如下圖:

  • 所屬品類:自定義品類
  • 節點類型:直連裝置
  • 連網方式:WiFi (這裡可以根據實際情況選擇)
  • 資料格式:ICA 标準資料格式 JSON(這裡也可以選 透傳/自定義)
自建MQTT遷移阿裡雲物聯網平台指南自建MQTT遷移IoT物聯網平台實戰

然後,把現有系統業務Topic映射到自定義通信的Topic類,如下表:

自建原有 IoT平台産品自定義Topic 權限 描述
/${productKey}/${deviceName}/user/data/up 釋出 裝置上報資料到雲端
/${productKey}/${deviceName}/user/cmd/down 訂閱 雲端下發控制指令

最後,我們接入産品詳情,選擇Topic類清單,點選自定義Topic,添加Topic如下圖:

自建MQTT遷移阿裡雲物聯網平台指南自建MQTT遷移IoT物聯網平台實戰

至此,我們完成了在IoT物聯網平台的産品建立和通信定義。

注冊裝置

我們基于産品,建立一個具體裝置,擷取到裝置身份認證資訊(三元組),如下圖:

自建MQTT遷移阿裡雲物聯網平台指南自建MQTT遷移IoT物聯網平台實戰

資料流轉配置

當我們建立完成産品,注冊好裝置之後,就需要在IoT平台配置資料流轉方案了。

服務端訂閱AMQP消費組

首先,我們要在控制台的服務端訂閱,建立我們充電寶業務的AMQP消費組,用來消費裝置生成的資料。如下圖:

自建MQTT遷移阿裡雲物聯網平台指南自建MQTT遷移IoT物聯網平台實戰

然後,我們進入消費組詳情,檢視消費組狀态。由于我們裝置端和服務端都沒有連接配接到平台,資料都是為空。如下圖:

自建MQTT遷移阿裡雲物聯網平台指南自建MQTT遷移IoT物聯網平台實戰

規則引擎

接下來我們要做的是配置規則引擎處理資料,并流轉到已建立的消費組上。

自建MQTT遷移阿裡雲物聯網平台指南自建MQTT遷移IoT物聯網平台實戰

我們選擇雲産品流轉,點選建立規則,選擇二進制格式(二進制泛指非JSON結構資料)。如下圖:

自建MQTT遷移阿裡雲物聯網平台指南自建MQTT遷移IoT物聯網平台實戰

然後,我們編寫SQL,讓IoT物聯網平台處理裝置資料,并攜帶裝置資訊。如下圖:

SELECT 
deviceName() as deviceName,
timestamp() as timestamp ,
payload() as payload 
FROM "/a********E/+/user/data/up"           
自建MQTT遷移阿裡雲物聯網平台指南自建MQTT遷移IoT物聯網平台實戰

接下來,我們添加資料轉發操作到指定的AMQP消費組,如下圖:

自建MQTT遷移阿裡雲物聯網平台指南自建MQTT遷移IoT物聯網平台實戰

完整的規則引擎配置如下圖所示:

自建MQTT遷移阿裡雲物聯網平台指南自建MQTT遷移IoT物聯網平台實戰

最後,我們在規則引擎清單,啟動規則。如下圖:

自建MQTT遷移阿裡雲物聯網平台指南自建MQTT遷移IoT物聯網平台實戰

裝置開發

在完成了雲上控制台的配置工作後,我們要做的就是裝置端業務開發。這裡我們在Mac上用nodejs腳本模拟裝置業務行為,裝置MQTT連接配接,資料上報,指令接收。

完整代碼如下:

// 引入依賴mqtt庫,或自己實作
const mqtt = require('aliyun-iot-mqtt');
// 裝置身份
var options = {
    productKey: "裝置pk",
    deviceName: "裝置dn",
    deviceSecret: "裝置ds",
    regionId: "cn-shanghai"
};

// 1.建立連接配接
const client = mqtt.getAliyunIotMqttClient(options);

// 2.裝置接收雲端指令資料
client.on('message', function(topic, message) {
    console.log("topic " + topic)
    console.log("message " + message)
})
// 3. 裝置訂閱指令的Topic
client.subscribe(`/${options.productKey}/${options.deviceName}/user/cmd/down`)

// 4. 模拟裝置 上報資料(原始封包)
setInterval(function() {
    client.publish(`/${options.productKey}/${options.deviceName}/user/data/up`, getPostData(),{qos:1});

}, 1000);


// 模拟 裝置原有封包格式
function getPostData() {
    let payload = "DE02,"+Math.floor((Math.random() * 20) + 10)
    +","+Math.floor((Math.random() * 20) + 10)
    +",011101010,am024,1d478f";

    console.log("payload=[ " + payload+" ]")
    return JSON.stringify(payload);
}
           

至此,我們完成了裝置端業務開發。

服務端開發

服務端我們以Java為例,示範裝置資料接收和控制指令的下發。

業務伺服器接收裝置資料

參考服務端訂閱AMQP文檔 

https://help.aliyun.com/document_detail/143601.html
package com.aliyun.iot;

import org.apache.commons.codec.binary.Base64;
import org.apache.qpid.jms.JmsConnection;
import org.apache.qpid.jms.JmsConnectionListener;
import org.apache.qpid.jms.message.JmsInboundMessageDispatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;
import javax.jms.*;
import javax.naming.Context;
import javax.naming.InitialContext;
import java.net.URI;
import java.util.Hashtable;

public class AMQPClient {

    private final static Logger logger = LoggerFactory.getLogger(AMQPClient.class);

    public static void main(String[] args) throws Exception {
        //參數說明,請參見上一篇文檔:AMQP用戶端接入說明。
        String accessKey = "阿裡雲賬号ak";
        String accessSecret = "阿裡雲賬号as";
        String consumerGroupId = "服務端訂閱消費組ID";
        long timeStamp = System.currentTimeMillis();
        //簽名方法:支援hmacmd5,hmacsha1和hmacsha256
        String signMethod = "hmacsha1";
        //控制台服務端訂閱中消費組狀态頁用戶端ID一欄将顯示clientId參數。
        //建議使用機器UUID、MAC位址、IP等唯一辨別等作為clientId。便于您區分識别不同的用戶端。
        String clientId = "ecs_"+System.currentTimeMillis();

        //UserName組裝方法,請參見上一篇文檔:AMQP用戶端接入說明。
        String userName = clientId + "|authMode=aksign"
                + ",signMethod=" + signMethod
                + ",timestamp=" + timeStamp
                + ",authId=" + accessKey
                + ",consumerGroupId=" + consumerGroupId
                + "|";
        //password組裝方法,請參見上一篇文檔:AMQP用戶端接入說明。
        String signContent = "authId=" + accessKey + "&timestamp=" + timeStamp;
        String password = doSign(signContent,accessSecret, signMethod);
        //按照qpid-jms的規範,組裝連接配接URL。
        String connectionUrl = "failover:(amqps://替換你的阿裡雲賬号UID.iot-amqp.cn-shanghai.aliyuncs.com:5671?amqp.idleTimeout=80000)"
                + "?failover.reconnectDelay=30";

        Hashtable<String, String> hashtable = new Hashtable<>();
        hashtable.put("connectionfactory.SBCF",connectionUrl);
        hashtable.put("queue.QUEUE", "default");
        hashtable.put(Context.INITIAL_CONTEXT_FACTORY, "org.apache.qpid.jms.jndi.JmsInitialContextFactory");
        Context context = new InitialContext(hashtable);
        ConnectionFactory cf = (ConnectionFactory)context.lookup("SBCF");
        Destination queue = (Destination)context.lookup("QUEUE");
        // Create Connection
        Connection connection = cf.createConnection(userName, password);
        ((JmsConnection) connection).addConnectionListener(myJmsConnectionListener);
        // Create Session
        // Session.CLIENT_ACKNOWLEDGE: 收到消息後,需要手動調用message.acknowledge()
        // Session.AUTO_ACKNOWLEDGE: SDK自動ACK(推薦)
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        connection.start();
        // Create Receiver Link
        MessageConsumer consumer = session.createConsumer(queue);
        consumer.setMessageListener(messageListener);
    }

    private static MessageListener messageListener = new MessageListener() {
        @Override
        public void onMessage(Message message) {
            try {
                byte[] body = message.getBody(byte[].class);
                String content = new String(body);
                String topic = message.getStringProperty("topic");
                String messageId = message.getStringProperty("messageId");
                logger.info("receive message"
                        + ", topic = " + topic
                        + ", messageId = " + messageId
                        + ", content = " + content);
                System.out.println();
                //如果建立Session選擇的是Session.CLIENT_ACKNOWLEDGE,這裡需要手動ACK。
                //message.acknowledge();
                //如果要對收到的消息做耗時的處理,請異步處理,確定這裡不要有耗時邏輯。
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    };

    private static JmsConnectionListener myJmsConnectionListener = new JmsConnectionListener() {
        /**
         * 連接配接成功建立。
         */
        @Override
        public void onConnectionEstablished(URI remoteURI) {
            logger.info("onConnectionEstablished, remoteUri:{}", remoteURI);
        }

        /**
         * 嘗試過最大重試次數之後,最終連接配接失敗。
         */
        @Override
        public void onConnectionFailure(Throwable error) {
            logger.error("onConnectionFailure, {}", error.getMessage());
        }

        /**
         * 連接配接中斷。
         */
        @Override
        public void onConnectionInterrupted(URI remoteURI) {
            logger.info("onConnectionInterrupted, remoteUri:{}", remoteURI);
        }

        /**
         * 連接配接中斷後又自動重連上。
         */
        @Override
        public void onConnectionRestored(URI remoteURI) {
            logger.info("onConnectionRestored, remoteUri:{}", remoteURI);
        }

        @Override
        public void onInboundMessage(JmsInboundMessageDispatch envelope) {}

        @Override
        public void onSessionClosed(Session session, Throwable cause) {}

        @Override
        public void onConsumerClosed(MessageConsumer consumer, Throwable cause) {}

        @Override
        public void onProducerClosed(MessageProducer producer, Throwable cause) {}
    };

    /**
     * password簽名計算方法,請參見上一篇文檔:AMQP用戶端接入說明。
     */
    private static String doSign(String toSignString, String secret, String signMethod) throws Exception {
        SecretKeySpec signingKey = new SecretKeySpec(secret.getBytes(), signMethod);
        Mac mac = Mac.getInstance(signMethod);
        mac.init(signingKey);
        byte[] rawHmac = mac.doFinal(toSignString.getBytes());
        return Base64.encodeBase64String(rawHmac);
    }
}
           

業務伺服器下發控制指令

參考雲端API文檔

https://help.aliyun.com/document_detail/69793.html
package com.aliyun.iot;

import com.aliyuncs.DefaultAcsClient;
import com.aliyuncs.IAcsClient;
import com.aliyuncs.exceptions.ClientException;
import com.aliyuncs.iot.model.v20170420.PubRequest;
import com.aliyuncs.iot.model.v20170420.PubResponse;
import com.aliyuncs.profile.DefaultProfile;
import com.google.gson.Gson;

public class PubClient {

    public static void main(String[] args) {

        DefaultProfile profile = DefaultProfile.getProfile(
                "cn-shanghai",
                "替換你的ak",
                "替換你的as");
        IAcsClient client = new DefaultAcsClient(profile);

        PubRequest request = new PubRequest();
        request.setTopicFullName("/替換裝置pk/替換裝置dn/user/cmd/down");
        request.setMessageContent("Q01ELDgyOTIzLGFkMzIyCiA=");//原始封包 : CMD,82923,ad322
        request.setProductKey("替換裝置pk");
        request.setQos(1);

        try {
            PubResponse response = client.getAcsResponse(request);
            System.out.println(new Gson().toJson(response));

        } catch (ClientException e) {
            System.out.println("ErrCode:" + e.getErrCode());
            System.out.println("ErrMsg:" + e.getErrMsg());
            System.out.println("RequestId:" + e.getRequestId());
        }
    }
}
           

至此,我們完成了服務端業務開發。

整體運作

在完成以上業務開發之後,我們先啟動服務端程式和IoT物聯網平台建立連接配接,然後啟動裝置端模拟腳本。

裝置上報資料

裝置端日志:

在裝置端日志,我們看到列印出來的業務封包如下:

自建MQTT遷移阿裡雲物聯網平台指南自建MQTT遷移IoT物聯網平台實戰

服務端日志:

在裝置列印出來日志同時,我們能看到服務端實時在列印從IoT物聯網平台擷取到的消息資料,如下圖:

自建MQTT遷移阿裡雲物聯網平台指南自建MQTT遷移IoT物聯網平台實戰

此時,我們在物聯網平台控制台,消費組詳情裡,也能看到消息處理速率,堆積量,最後一條消息處理時間,以及服務端ecs的清單。如下圖:

自建MQTT遷移阿裡雲物聯網平台指南自建MQTT遷移IoT物聯網平台實戰

日志服務

在阿裡雲IoT的控制台日志服務裡,我們可以檢視裝置行為日志:

自建MQTT遷移阿裡雲物聯網平台指南自建MQTT遷移IoT物聯網平台實戰

在日志服務的上行消息分析裡,可以檢視消息詳情,包括Topic和Payload。

自建MQTT遷移阿裡雲物聯網平台指南自建MQTT遷移IoT物聯網平台實戰

也可以跟蹤上行消息的流轉過程,如下圖:

自建MQTT遷移阿裡雲物聯網平台指南自建MQTT遷移IoT物聯網平台實戰

服務端下發控制指令

我們在服務端執行Pub接口調用,像裝置下發控制指令。如下圖:

自建MQTT遷移阿裡雲物聯網平台指南自建MQTT遷移IoT物聯網平台實戰

此時,裝置端會實時列印出來指令資訊,如下圖:

自建MQTT遷移阿裡雲物聯網平台指南自建MQTT遷移IoT物聯網平台實戰

同樣,我們可以在控制台的日志服務,下行消息分析中,檢視到指令消息流轉完整過程,如下圖:

自建MQTT遷移阿裡雲物聯網平台指南自建MQTT遷移IoT物聯網平台實戰

至此,我們在盡量少改動的前提下,完成了存量裝置上雲的遷移工作。

官方活動

1、挑戰裝置接入量,超低折扣+百萬代金券等你拿

2、物聯網平台企業版(包年包月),執行個體獨享,資料隔離,萬台裝置接入低至9.5元/天

3、物聯網平台公共執行個體,小規格套餐,每年18元,開發調試/小型項目傳遞必備

4、物聯網平台公共執行個體,按用量購買,低至6折,過大年活動正在進行中

5、更多服務