Step By Step
服務使用大圖

- 1、不同裝置通過SDK和平台側建立連接配接,實作裝置與平台側的互動通信;
- 2、通過規則流轉功能,将裝置上報的消息流轉到MQ Topic,也可以通過MQ Topic向MQTT Topic下發消息;
- 3、基于Server端管控API,實作消息的直接下發、裝置線上狀态查詢以及Group的建立。
一、MQTT Server建立(公網區域)
1、建立執行個體
2、Topic和Group建立
二、裝置端Java Code Sample
1、pom.xml
<dependencies>
<dependency>
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
<version>1.10</version>
</dependency>
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.2.2</version>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>4.5.2</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.48</version>
</dependency>
<dependency>
<groupId>com.aliyun</groupId>
<artifactId>aliyun-java-sdk-onsmqtt</artifactId>
<version>1.0.3</version>
</dependency>
<dependency>
<groupId>com.aliyun</groupId>
<artifactId>aliyun-java-sdk-core</artifactId>
<version>4.5.0</version>
</dependency>
</dependencies>
2、Device Code Sample
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
public class MQ4IoTSendMessageToMQ4IoTUseSignatureMode {
public static void main(String[] args) throws Exception {
/**
* MQ4IOT 執行個體 ID,購買後控制台擷取
*/
String instanceId = "post-cn-n6w*********";
/**
* 接入點位址,購買 MQ4IOT 執行個體,且配置完成後即可擷取,接入點位址必須填寫配置設定的域名,不得使用 IP 位址直接連接配接,否則可能會導緻用戶端異常。
*/
String endPoint = "post-cn-n6w********.mqtt.aliyuncs.com";
/**
* 賬号 accesskey,從賬号系統控制台擷取
*/
String accessKey = "LTAIOZZg********";
/**
* 賬号 secretKey,從賬号系統控制台擷取,僅在Signature鑒權模式下需要設定
*/
String secretKey = "v7CjUJCMk7j9aK****************";
/**
* MQ4IOT clientId,由業務系統配置設定,需要保證每個 tcp 連接配接都不一樣,保證全局唯一,如果不同的用戶端對象(tcp 連接配接)使用了相同的 clientId 會導緻連接配接異常斷開。
* clientId 由兩部分組成,格式為 GroupID@@@DeviceId,其中 groupId 在 MQ4IOT 控制台申請,DeviceId 由業務方自己設定,clientId 總長度不得超過64個字元。
*/
String clientId = "GID_MQTT_Client1@@@device1";
/**
* MQ4IOT 消息的一級 topic,需要在控制台申請才能使用。
* 如果使用了沒有申請或者沒有被授權的 topic 會導緻鑒權失敗,服務端會斷開用戶端連接配接。
*/
final String parentTopic = "MQTT_Topic";
/**
* MQ4IOT支援子級 topic,用來做自定義的過濾,此處為示意,可以填寫任何字元串,具體參考https://help.aliyun.com/document_detail/42420.html?spm=a2c4g.11186623.6.544.1ea529cfAO5zV3
* 需要注意的是,完整的 topic 長度不得超過128個字元。
*/
final String mq4IotTopic = parentTopic + "/" + "testMq4Iot";
/**
* QoS參數代表傳輸品質,可選0,1,2,根據實際需求合理設定,具體參考 https://help.aliyun.com/document_detail/42420.html?spm=a2c4g.11186623.6.544.1ea529cfAO5zV3
*/
final int qosLevel = 0;
ConnectionOptionWrapper connectionOptionWrapper = new ConnectionOptionWrapper(instanceId, accessKey, secretKey, clientId);
final MemoryPersistence memoryPersistence = new MemoryPersistence();
/**
* 用戶端使用的協定和端口必須比對,具體參考文檔 https://help.aliyun.com/document_detail/44866.html?spm=a2c4g.11186623.6.552.25302386RcuYFB
* 如果是 SSL 加密則設定ssl://endpoint:8883
*/
final MqttClient mqttClient = new MqttClient("tcp://" + endPoint + ":1883", clientId, memoryPersistence);
/**
* 用戶端設定好發送逾時時間,防止無限阻塞
*/
mqttClient.setTimeToWait(5000);
final ExecutorService executorService = new ThreadPoolExecutor(1, 1, 0, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
mqttClient.setCallback(new MqttCallbackExtended() {
@Override
public void connectComplete(boolean reconnect, String serverURI) {
/**
* 用戶端連接配接成功後就需要盡快訂閱需要的 topic
*/
System.out.println("connect success");
executorService.submit(new Runnable() {
@Override
public void run() {
try {
final String topicFilter[] = {mq4IotTopic};
final int[] qos = {qosLevel};
mqttClient.subscribe(topicFilter, qos);
} catch (MqttException e) {
e.printStackTrace();
}
}
});
}
@Override
public void connectionLost(Throwable throwable) {
throwable.printStackTrace();
}
@Override
public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
/**
* 消費消息的回調接口,需要確定該接口不抛異常,該接口運作傳回即代表消息消費成功。
* 消費消息需要保證在規定時間内完成,如果消費耗時超過服務端約定的逾時時間,對于可靠傳輸的模式,服務端可能會重試推送,業務需要做好幂等去重處理。逾時時間約定參考限制
* https://help.aliyun.com/document_detail/63620.html?spm=a2c4g.11186623.6.546.229f1f6ago55Fj
*/
System.out.println(
"receive msg from topic " + s + " , body is " + new String(mqttMessage.getPayload()));
}
@Override
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
System.out.println("send msg succeed topic is : " + iMqttDeliveryToken.getTopics()[0]);
}
});
mqttClient.connect(connectionOptionWrapper.getMqttConnectOptions());
for (int i = 0; i < 1; i++) {
MqttMessage message = new MqttMessage("hello mq4Iot pub sub msg".getBytes());
message.setQos(qosLevel);
/**
* 發送普通消息時,topic 必須和接收方訂閱的 topic 一緻,或者符合通配符比對規則
*/
mqttClient.publish(mq4IotTopic, message);
/**
* MQ4IoT支援點對點消息,即如果發送方明确知道該消息隻需要給特定的一個裝置接收,且知道對端的 clientId,則可以直接發送點對點消息。
* 點對點消息不需要經過訂閱關系比對,可以簡化訂閱方的邏輯。點對點消息的 topic 格式規範是 {{parentTopic}}/p2p/{{targetClientId}}
*/
final String p2pSendTopic = parentTopic + "/p2p/" + clientId;
message = new MqttMessage("hello mq4Iot p2p msg".getBytes());
message.setQos(qosLevel);
mqttClient.publish(p2pSendTopic, message);
}
Thread.sleep(Long.MAX_VALUE);
}
}
工具類:
util3、測試效果
4、消息流轉軌迹查詢
三、規則流轉測試
1、MQ建立三個不同類型的Topic
2、建立Group,用于MQ側消費消息
3、MQTT側配置流轉規則
4、MQ側代碼測試
4.1 pom.xml
<dependency>
<groupId>com.aliyun.openservices</groupId>
<artifactId>ons-client</artifactId>
<version>1.8.7.1.Final</version>
</dependency>
4.2 Code Sample
import com.aliyun.openservices.ons.api.Action;
import com.aliyun.openservices.ons.api.ConsumeContext;
import com.aliyun.openservices.ons.api.Consumer;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.MessageListener;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import java.util.Properties;
public class ConsumerTest {
public static void main(String[] args) {
Properties properties = new Properties();
// 您在控制台建立的 Group ID。
properties.put(PropertyKeyConst.GROUP_ID, "GID_MessageConsumer");
// AccessKey ID 阿裡雲身份驗證,在阿裡雲 RAM 控制台建立。
properties.put(PropertyKeyConst.AccessKey, "LTAIOZZg********");
// Accesskey Secret 阿裡雲身份驗證,在阿裡雲服 RAM 控制台建立。
properties.put(PropertyKeyConst.SecretKey, "v7CjUJCMk7j9aK****************");
// 設定 TCP 接入域名,進入控制台的執行個體詳情頁面的 TCP 協定用戶端接入點區域檢視。
properties.put(PropertyKeyConst.NAMESRV_ADDR, "http://MQ_INST_***************_BcLPQ2p0.mq-internet-access.mq-internet.aliyuncs.com:80");
// 叢集訂閱方式 (預設)。
// properties.put(PropertyKeyConst.MessageModel, PropertyValueConst.CLUSTERING);
// 廣播訂閱方式。
// properties.put(PropertyKeyConst.MessageModel, PropertyValueConst.BROADCASTING);
Consumer consumer = ONSFactory.createConsumer(properties);
//1、訂閱裝置上行消息
consumer.subscribe("MessageFromMQTT", "*", new MessageListener() { //訂閱多個 Tag。
public Action consume(Message message, ConsumeContext context) {
System.out.println("Receive: " + message);
return Action.CommitMessage;
}
});
//2、訂閱裝置上下線消息
consumer.subscribe("DevcieOnlineAndOffline", "*", new MessageListener() { //訂閱全部 Tag。
public Action consume(Message message, ConsumeContext context) {
System.out.println("Receive: " + message);
return Action.CommitMessage;
}
});
consumer.start();
System.out.println("Consumer Started");
}
}
4.3 測試效果(先啟動消費端,然後裝置端上行消息)
4.4 通過MQ發送消息到MQTT
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.Producer;
import com.aliyun.openservices.ons.api.SendResult;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import java.util.Date;
import java.util.Properties;
public class SendMQMessageToMQTT {
public static void main(String[] args) {
Properties properties = new Properties();
// AccessKeyId 阿裡雲身份驗證,在阿裡雲使用者資訊管理控制台擷取。
properties.put(PropertyKeyConst.AccessKey,"LTAIOZZg**********");
// AccessKeySecret 阿裡雲身份驗證,在阿裡雲使用者資訊管理控制台擷取。
properties.put(PropertyKeyConst.SecretKey, "v7CjUJCMk7j9aK****************");
//設定發送逾時時間,機關毫秒。
properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis, "3000");
// 設定 TCP 接入域名,進入控制台的執行個體詳情頁面的 TCP 協定用戶端接入點區域檢視。
properties.put(PropertyKeyConst.NAMESRV_ADDR, "http://MQ_INST_********_BcLPQ2p0.mq-internet-access.mq-internet.aliyuncs.com:80");
Producer producer = ONSFactory.createProducer(properties);
// mqttSecondTopic:https://help.aliyun.com/document_detail/112971.html?spm=a2c4g.11186623.6.579.403242ca4pOcpC
properties.put("mqttSecondTopic","testMq4Iot");
// 在發送消息前,必須調用 start 方法來啟動 Producer,隻需調用一次即可。
producer.start();
//循環發送消息。
for (int i = 0; i < 1; i++){
Message msg = new Message("MessageToMQTT","","MQ Message To MQTT".getBytes());
msg.setKey("ORDERID_" + i);
msg.setUserProperties(properties);
try {
SendResult sendResult = producer.send(msg);
// 同步發送消息,隻要不抛異常就是成功。
if (sendResult != null) {
System.out.println(new Date() + " Send mq message success. Topic is:" + msg.getTopic() + " msgId is: " + sendResult.getMessageId());
}
}
catch (Exception e) {
// 消息發送失敗,需要進行重試處理,可重新發送這條消息或持久化這條資料進行補償處理。
System.out.println(new Date() + " Send mq message failed. Topic is:" + msg.getTopic());
e.printStackTrace();
}
}
// 在應用退出前,銷毀 Producer 對象。
// 注意:如果不銷毀也沒有問題。
producer.shutdown();
}
}
4.5 The Result
4.6 消息軌迹查詢
四、MQTT雲端API測試
<dependency>
<groupId>com.aliyun</groupId>
<artifactId>aliyun-java-sdk-core</artifactId>
<version>4.5.6</version>
</dependency>
<dependency>
<groupId>com.aliyun</groupId>
<artifactId>aliyun-java-sdk-onsmqtt</artifactId>
<version>1.0.4</version>
</dependency>
2、發送消息Code Sample
import com.aliyuncs.DefaultAcsClient;
import com.aliyuncs.IAcsClient;
import com.aliyuncs.exceptions.ClientException;
import com.aliyuncs.exceptions.ServerException;
import com.aliyuncs.profile.DefaultProfile;
import com.google.gson.Gson;
import com.aliyuncs.onsmqtt.model.v20200420.*;
public class SendMessage {
public static void main(String[] args) {
DefaultProfile profile = DefaultProfile.getProfile("mq-internet-access", "LTAIOZZg********", "v7CjUJCMk7j9aK****************");
IAcsClient client = new DefaultAcsClient(profile);
SendMessageRequest request = new SendMessageRequest();
request.setRegionId("mq-internet-access");
request.setInstanceId("post-cn-n6w********");
request.setPayload("message from manager api!");
request.setMqttTopic("MQTT_Topic/testMq4Iot");
try {
SendMessageResponse response = client.getAcsResponse(request);
System.out.println(new Gson().toJson(response));
} catch (ServerException e) {
e.printStackTrace();
} catch (ClientException e) {
System.out.println("ErrCode:" + e.getErrCode());
System.out.println("ErrMsg:" + e.getErrMsg());
System.out.println("RequestId:" + e.getRequestId());
}
}
}
4、查詢裝置狀态Code Sample
import com.aliyuncs.DefaultAcsClient;
import com.aliyuncs.IAcsClient;
import com.aliyuncs.exceptions.ClientException;
import com.aliyuncs.exceptions.ServerException;
import com.aliyuncs.profile.DefaultProfile;
import com.google.gson.Gson;
import com.aliyuncs.onsmqtt.model.v20200420.*;
public class QuerySessionByClientId {
public static void main(String[] args) {
DefaultProfile profile = DefaultProfile.getProfile("mq-internet-access", "LTAIOZZg********", "v7CjUJCMk7j9aK****************");
IAcsClient client = new DefaultAcsClient(profile);
QuerySessionByClientIdRequest request = new QuerySessionByClientIdRequest();
request.setRegionId("mq-internet-access");
request.setInstanceId("post-cn-n6w********");
request.setClientId("GID_MQTT_Client1@@@device1");
try {
QuerySessionByClientIdResponse response = client.getAcsResponse(request);
System.out.println(new Gson().toJson(response));
} catch (ServerException e) {
e.printStackTrace();
} catch (ClientException e) {
System.out.println("ErrCode:" + e.getErrCode());
System.out.println("ErrMsg:" + e.getErrMsg());
System.out.println("RequestId:" + e.getRequestId());
}
}
}
5、測試效果