天天看點

消息服務實時消費裝置狀态變化和資料

[0.準備工作

0.1 注冊阿裡雲賬号

使用個人淘寶賬号或手機号,開通阿裡雲賬号,并通過實名認證(可以用支付寶認證)

0.2 免費開通IoT物聯網套件

IoT套件産品官網 https://www.aliyun.com/product/iot

消息服務官網 https://mns.console.aliyun.com/#/list/cn-shanghai

0.3 軟體環境

Nodejs安裝 https://nodejs.org/en/download/

Java8 安裝

開發工具:Sublime Text3/ IntelliJ IDEA

https://www.sublimetext.com/3

1.阿裡雲IoT控制台配置服務端訂閱

阿裡雲IoT物聯網套件開通 https://www.aliyun.com/product/iot

1.1 建立産品(基礎版)

我們在阿裡雲IoT控制台,建立産品:空氣檢測,選擇基礎版。

1.2 在産品下添加裝置

我們在阿裡雲IoT控制台,裝置管理裡空氣檢測産品下添加一個具體裝置。

1.2 在産品下配置服務端訂閱

我們在阿裡雲IoT控制台,為空氣檢測産品開通服務端訂閱,勾選裝置上報消息和裝置狀态變化通知,開通後會有MNS的區域:cn-shanghai和隊列資訊:aliyun-iot-a1jnUEKYhw4,如下。

通過閱讀阿裡雲IoT文檔,我們了解到隊列中消息結構體如下:

複制代碼

{

"payload": "Base64 Encode的資料",

"messagetype": "status",

"messageid": 996000000000000001,

"topic": "具體的裝置Topic",

"timestamp": 1526450324

}

messageid:IoT套件生成的消息ID

messagetype:指的是消息類型:裝置狀态status和裝置上報消息upload

topic:表示該消息源自套件中的哪個topic,當messageType=status時,topic為null,當messageType=upload時,topic為具體的裝置Topic

payload:資料為Base64 Encode的資料。當messageType=status時,資料是裝置狀态資料;當messageType=upload時,data即為裝置釋出到Topic中的原始資料。

timestamp:隊列中消息生成時間戳,并非業務的時間戳

2.裝置端開發

我們采用nodejs腳本模拟裝置,與IoT雲端建立連接配接,上報資料。

2.1 擷取nodejs版IoT SDK

package.json中添加npm依賴"aliyun-iot-mqtt": "0.0.4"子產品

"name": "aliyun-iot",

"dependencies": {

"aliyun-iot-mqtt": "^0.0.4"

},

"author": "wongxming",

"license": "MIT"

下載下傳安裝SDK

$npm install

2.2 編寫裝置端應用程式代碼

我們需要在控制台擷取裝置身份三元組:productKey,deviceName,deviceSecret,以及産品分區regionId

/**

  • package.json 添加依賴:"aliyun-iot-mqtt": "0.0.4"

    node iot-mns.js

    /

    const mqtt = require('aliyun-iot-mqtt');

    //裝置三元組

    const options = {

    productKey: "RY8ExdyS6lU",

    deviceName: "officeThermometer",

    deviceSecret: "oauYYavdIV9QOt7d9WcrnIjXSNc2i26A",

    regionId: "cn-shanghai"

    };

    //裝置與雲 建立連接配接,裝置上線

    const client = mqtt.getAliyunIotMqttClient(options);

    //主題topic

    const topic =

    ${options.productKey}/${options.deviceName}/update

    ;

    var data = {

    temperature: Math.floor((Math.random()20)+10),

    humidity: Math.floor((Math.random()*80)+20),

    //指定topic釋出資料到雲端

    client.publish(topic, JSON.stringify(data));

    console.log("===postData topic=" + topic)

    console.log(data)

    //裝置下線

    //client.end()

2.3 啟動模拟裝置腳本

$node iot-mns.js

腳本執行後,我們在IoT雲端控制台産品-日志服務裡檢視裝置行為分析日志,根據時間順序,我們看到

首先在10:53:05時,裝置建立連接配接,上線;

然後在10:53:12時,裝置斷開連接配接,下線。

檢視裝置上行消息分析日志,根據時間順序,我們看到

首先裝置publish message,

然後流轉到RuleEngine規則引擎,

最終Transmit MNS的消息隊列aliyun-iot-a1jnUEKYhw4

我們切換到MNS控制台,選擇華東2區域,可以看到消息隊列aliyun-iot-a1jnUEKYhw4有3條活躍消息MNS控制台:https://mns.console.aliyun.com/#/list/cn-shanghai

3 消費隊列中裝置消息

3.1 消息服務MNS使用

我們以java開發為例,pom中添加依賴aliyun-sdk-mns,httpasyncclient,fastjson,如下:

<dependencies>

<dependency>

<groupId>com.aliyun.mns</groupId>

<artifactId>aliyun-sdk-mns</artifactId>

<version>1.1.8</version>

<classifier>jar-with-dependencies</classifier>

</dependency>

<groupId>org.apache.httpcomponents</groupId>

<artifactId>httpasyncclient</artifactId>

<version>4.0.1</version>

<groupId>com.alibaba</groupId>

<artifactId>fastjson</artifactId>

<version>1.2.42</version>

</dependencies>

我們通過mns的sdk,建立連接配接,輪詢擷取隊列消息。為了友善閱讀,我們對消息的payload資料做base64解碼,完整應用程式代碼如下:

import com.alibaba.fastjson.JSON;

import com.alibaba.fastjson.JSONObject;

import com.aliyun.mns.client.CloudAccount;

import com.aliyun.mns.client.CloudQueue;

import com.aliyun.mns.client.MNSClient;

import com.aliyun.mns.model.Message;

import org.apache.commons.codec.binary.Base64;

public class ComsumerDemo {

public static String accessKeyId = "AK";

public static String accessKeySecret = "AK秘鑰";

public static String endpoint = "mns公網Endpoint";

public static void main(String[] args) {

CloudAccount account = new CloudAccount(accessKeyId,accessKeySecret,endpoint);

MNSClient client = account.getMNSClient();

//擷取消息隊列

CloudQueue queue = client.getQueueRef("aliyun-iot-a1jnUEKYhw4");

while (true) {

Message popMsg = queue.popMessage(10);

if (popMsg != null) {

System.out.println("message id: " + popMsg.getMessageId());

System.out.println("message body: \n" + decodeBase64(popMsg.getMessageBodyAsString()));

//删除消息

queue.deleteMessage(popMsg.getReceiptHandle());

public static String decodeBase64(String jsonString) {

try {

JSONObject json = JSON.parseObject(jsonString);

String payload = new String(Base64.decodeBase64(json.getString("payload")));

json.put("payload", JSON.parseObject(payload));

return json.toJSONString();

} catch (Exception e) {

e.printStackTrace();

return null;

3.2 運作程式

AA