自建MQTT遷移IoT物聯網平台實戰
前言
随着業務增長,自建MQTT叢集維護成本越來越高,連接配接穩定性,消息時延不斷遇到上層業務挑戰,遷移阿裡雲IoT物聯網平台是非常明智的最佳選擇。
現有業務背景
企業基于自建MQTT叢集的資料互動如下圖:

裝置端publish業務資料到MQTT叢集,流轉到後端業務伺服器處理;業務伺服器把指令釋出到MQTT叢集,觸達到裝置端。具體業務資料定義如下:
業務場景 | 主題Topic | 封包Payload |
---|---|---|
裝置上報 | cdb/data/post | DE02,10,17,011101010,am024,1d478f |
服務端控制指令 | cdb/cmd/push | CMD,82923,ad322 |
架構方案
當我們從自己MQTT叢集遷移到IoT物聯網平台,我們可以采用以下技術方案,實作低成本的快速遷移,減少裝置端和業務系統的改造。
IoT裝置遷移上雲三步走:
- 裝置做固件更新,修改接入域名為IoT平台Endpoint
- 調整現有通信topic為IoT物聯網平台産品自定義Topic
- 配置規則引擎,把裝置資料流轉到服務端訂閱AMQP消費組,業務伺服器實時接收到裝置資料
裝置遷移上雲實戰
建立産品
首先,我們接入物聯網平台控制台,左側欄選擇産品,建立新産品:充電寶機櫃。如下圖:
- 所屬品類:自定義品類
- 節點類型:直連裝置
- 連網方式:WiFi (這裡可以根據實際情況選擇)
- 資料格式:ICA 标準資料格式 JSON(這裡也可以選 透傳/自定義)
然後,把現有系統業務Topic映射到自定義通信的Topic類,如下表:
自建原有 | IoT平台産品自定義Topic | 權限 | 描述 |
---|---|---|---|
/${productKey}/${deviceName}/user/data/up | 釋出 | 裝置上報資料到雲端 | |
/${productKey}/${deviceName}/user/cmd/down | 訂閱 | 雲端下發控制指令 |
最後,我們接入産品詳情,選擇Topic類清單,點選自定義Topic,添加Topic如下圖:
至此,我們完成了在IoT物聯網平台的産品建立和通信定義。
注冊裝置
我們基于産品,建立一個具體裝置,擷取到裝置身份認證資訊(三元組),如下圖:
資料流轉配置
當我們建立完成産品,注冊好裝置之後,就需要在IoT平台配置資料流轉方案了。
服務端訂閱AMQP消費組
首先,我們要在控制台的服務端訂閱,建立我們充電寶業務的AMQP消費組,用來消費裝置生成的資料。如下圖:
然後,我們進入消費組詳情,檢視消費組狀态。由于我們裝置端和服務端都沒有連接配接到平台,資料都是為空。如下圖:
規則引擎
接下來我們要做的是配置規則引擎處理資料,并流轉到已建立的消費組上。
我們選擇雲産品流轉,點選建立規則,選擇二進制格式(二進制泛指非JSON結構資料)。如下圖:
然後,我們編寫SQL,讓IoT物聯網平台處理裝置資料,并攜帶裝置資訊。如下圖:
SELECT
deviceName() as deviceName,
timestamp() as timestamp ,
payload() as payload
FROM "/a********E/+/user/data/up"
接下來,我們添加資料轉發操作到指定的AMQP消費組,如下圖:
完整的規則引擎配置如下圖所示:
最後,我們在規則引擎清單,啟動規則。如下圖:
裝置開發
在完成了雲上控制台的配置工作後,我們要做的就是裝置端業務開發。這裡我們在Mac上用nodejs腳本模拟裝置業務行為,裝置MQTT連接配接,資料上報,指令接收。
完整代碼如下:
// 引入依賴mqtt庫,或自己實作
const mqtt = require('aliyun-iot-mqtt');
// 裝置身份
var options = {
productKey: "裝置pk",
deviceName: "裝置dn",
deviceSecret: "裝置ds",
regionId: "cn-shanghai"
};
// 1.建立連接配接
const client = mqtt.getAliyunIotMqttClient(options);
// 2.裝置接收雲端指令資料
client.on('message', function(topic, message) {
console.log("topic " + topic)
console.log("message " + message)
})
// 3. 裝置訂閱指令的Topic
client.subscribe(`/${options.productKey}/${options.deviceName}/user/cmd/down`)
// 4. 模拟裝置 上報資料(原始封包)
setInterval(function() {
client.publish(`/${options.productKey}/${options.deviceName}/user/data/up`, getPostData(),{qos:1});
}, 1000);
// 模拟 裝置原有封包格式
function getPostData() {
let payload = "DE02,"+Math.floor((Math.random() * 20) + 10)
+","+Math.floor((Math.random() * 20) + 10)
+",011101010,am024,1d478f";
console.log("payload=[ " + payload+" ]")
return JSON.stringify(payload);
}
至此,我們完成了裝置端業務開發。
服務端開發
服務端我們以Java為例,示範裝置資料接收和控制指令的下發。
業務伺服器接收裝置資料
參考服務端訂閱AMQP文檔
https://help.aliyun.com/document_detail/143601.htmlpackage com.aliyun.iot;
import org.apache.commons.codec.binary.Base64;
import org.apache.qpid.jms.JmsConnection;
import org.apache.qpid.jms.JmsConnectionListener;
import org.apache.qpid.jms.message.JmsInboundMessageDispatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;
import javax.jms.*;
import javax.naming.Context;
import javax.naming.InitialContext;
import java.net.URI;
import java.util.Hashtable;
public class AMQPClient {
private final static Logger logger = LoggerFactory.getLogger(AMQPClient.class);
public static void main(String[] args) throws Exception {
//參數說明,請參見上一篇文檔:AMQP用戶端接入說明。
String accessKey = "阿裡雲賬号ak";
String accessSecret = "阿裡雲賬号as";
String consumerGroupId = "服務端訂閱消費組ID";
long timeStamp = System.currentTimeMillis();
//簽名方法:支援hmacmd5,hmacsha1和hmacsha256
String signMethod = "hmacsha1";
//控制台服務端訂閱中消費組狀态頁用戶端ID一欄将顯示clientId參數。
//建議使用機器UUID、MAC位址、IP等唯一辨別等作為clientId。便于您區分識别不同的用戶端。
String clientId = "ecs_"+System.currentTimeMillis();
//UserName組裝方法,請參見上一篇文檔:AMQP用戶端接入說明。
String userName = clientId + "|authMode=aksign"
+ ",signMethod=" + signMethod
+ ",timestamp=" + timeStamp
+ ",authId=" + accessKey
+ ",consumerGroupId=" + consumerGroupId
+ "|";
//password組裝方法,請參見上一篇文檔:AMQP用戶端接入說明。
String signContent = "authId=" + accessKey + "×tamp=" + timeStamp;
String password = doSign(signContent,accessSecret, signMethod);
//按照qpid-jms的規範,組裝連接配接URL。
String connectionUrl = "failover:(amqps://替換你的阿裡雲賬号UID.iot-amqp.cn-shanghai.aliyuncs.com:5671?amqp.idleTimeout=80000)"
+ "?failover.reconnectDelay=30";
Hashtable<String, String> hashtable = new Hashtable<>();
hashtable.put("connectionfactory.SBCF",connectionUrl);
hashtable.put("queue.QUEUE", "default");
hashtable.put(Context.INITIAL_CONTEXT_FACTORY, "org.apache.qpid.jms.jndi.JmsInitialContextFactory");
Context context = new InitialContext(hashtable);
ConnectionFactory cf = (ConnectionFactory)context.lookup("SBCF");
Destination queue = (Destination)context.lookup("QUEUE");
// Create Connection
Connection connection = cf.createConnection(userName, password);
((JmsConnection) connection).addConnectionListener(myJmsConnectionListener);
// Create Session
// Session.CLIENT_ACKNOWLEDGE: 收到消息後,需要手動調用message.acknowledge()
// Session.AUTO_ACKNOWLEDGE: SDK自動ACK(推薦)
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
connection.start();
// Create Receiver Link
MessageConsumer consumer = session.createConsumer(queue);
consumer.setMessageListener(messageListener);
}
private static MessageListener messageListener = new MessageListener() {
@Override
public void onMessage(Message message) {
try {
byte[] body = message.getBody(byte[].class);
String content = new String(body);
String topic = message.getStringProperty("topic");
String messageId = message.getStringProperty("messageId");
logger.info("receive message"
+ ", topic = " + topic
+ ", messageId = " + messageId
+ ", content = " + content);
System.out.println();
//如果建立Session選擇的是Session.CLIENT_ACKNOWLEDGE,這裡需要手動ACK。
//message.acknowledge();
//如果要對收到的消息做耗時的處理,請異步處理,確定這裡不要有耗時邏輯。
} catch (Exception e) {
e.printStackTrace();
}
}
};
private static JmsConnectionListener myJmsConnectionListener = new JmsConnectionListener() {
/**
* 連接配接成功建立。
*/
@Override
public void onConnectionEstablished(URI remoteURI) {
logger.info("onConnectionEstablished, remoteUri:{}", remoteURI);
}
/**
* 嘗試過最大重試次數之後,最終連接配接失敗。
*/
@Override
public void onConnectionFailure(Throwable error) {
logger.error("onConnectionFailure, {}", error.getMessage());
}
/**
* 連接配接中斷。
*/
@Override
public void onConnectionInterrupted(URI remoteURI) {
logger.info("onConnectionInterrupted, remoteUri:{}", remoteURI);
}
/**
* 連接配接中斷後又自動重連上。
*/
@Override
public void onConnectionRestored(URI remoteURI) {
logger.info("onConnectionRestored, remoteUri:{}", remoteURI);
}
@Override
public void onInboundMessage(JmsInboundMessageDispatch envelope) {}
@Override
public void onSessionClosed(Session session, Throwable cause) {}
@Override
public void onConsumerClosed(MessageConsumer consumer, Throwable cause) {}
@Override
public void onProducerClosed(MessageProducer producer, Throwable cause) {}
};
/**
* password簽名計算方法,請參見上一篇文檔:AMQP用戶端接入說明。
*/
private static String doSign(String toSignString, String secret, String signMethod) throws Exception {
SecretKeySpec signingKey = new SecretKeySpec(secret.getBytes(), signMethod);
Mac mac = Mac.getInstance(signMethod);
mac.init(signingKey);
byte[] rawHmac = mac.doFinal(toSignString.getBytes());
return Base64.encodeBase64String(rawHmac);
}
}
業務伺服器下發控制指令
參考雲端API文檔
https://help.aliyun.com/document_detail/69793.htmlpackage com.aliyun.iot;
import com.aliyuncs.DefaultAcsClient;
import com.aliyuncs.IAcsClient;
import com.aliyuncs.exceptions.ClientException;
import com.aliyuncs.iot.model.v20170420.PubRequest;
import com.aliyuncs.iot.model.v20170420.PubResponse;
import com.aliyuncs.profile.DefaultProfile;
import com.google.gson.Gson;
public class PubClient {
public static void main(String[] args) {
DefaultProfile profile = DefaultProfile.getProfile(
"cn-shanghai",
"替換你的ak",
"替換你的as");
IAcsClient client = new DefaultAcsClient(profile);
PubRequest request = new PubRequest();
request.setTopicFullName("/替換裝置pk/替換裝置dn/user/cmd/down");
request.setMessageContent("Q01ELDgyOTIzLGFkMzIyCiA=");//原始封包 : CMD,82923,ad322
request.setProductKey("替換裝置pk");
request.setQos(1);
try {
PubResponse response = client.getAcsResponse(request);
System.out.println(new Gson().toJson(response));
} catch (ClientException e) {
System.out.println("ErrCode:" + e.getErrCode());
System.out.println("ErrMsg:" + e.getErrMsg());
System.out.println("RequestId:" + e.getRequestId());
}
}
}
至此,我們完成了服務端業務開發。
整體運作
在完成以上業務開發之後,我們先啟動服務端程式和IoT物聯網平台建立連接配接,然後啟動裝置端模拟腳本。
裝置上報資料
裝置端日志:
在裝置端日志,我們看到列印出來的業務封包如下:
服務端日志:
在裝置列印出來日志同時,我們能看到服務端實時在列印從IoT物聯網平台擷取到的消息資料,如下圖:
此時,我們在物聯網平台控制台,消費組詳情裡,也能看到消息處理速率,堆積量,最後一條消息處理時間,以及服務端ecs的清單。如下圖:
日志服務
在阿裡雲IoT的控制台日志服務裡,我們可以檢視裝置行為日志:
在日志服務的上行消息分析裡,可以檢視消息詳情,包括Topic和Payload。
也可以跟蹤上行消息的流轉過程,如下圖:
服務端下發控制指令
我們在服務端執行Pub接口調用,像裝置下發控制指令。如下圖:
此時,裝置端會實時列印出來指令資訊,如下圖:
同樣,我們可以在控制台的日志服務,下行消息分析中,檢視到指令消息流轉完整過程,如下圖:
至此,我們在盡量少改動的前提下,完成了存量裝置上雲的遷移工作。