概述
服務端可以直接訂閱産品下所有類型的消息:裝置上報消息、裝置狀态變化通知、網關發現子裝置上報、裝置生命周期變更、裝置拓撲關系變更。配置服務端訂閱後,物聯網平台會将産品下所有裝置的已訂閱類型的消息轉發至您的服務端。
AMQP(Advanced Message Queuing Protocol)即進階消息隊列協定。您配置AMQP服務端訂閱後,物聯網平台會将産品下所有已訂閱類型的消息,通過AMQP通道推送至您的服務端。
本文通過MQTT.fx模拟裝置接入,
MQTT.fx接入物聯網平台。通過Java SDK進行服務端訂閱示範,
SDK下載下傳位址消息流轉示意圖

AMQP服務端訂閱優勢
- 支援多消費組。同一個賬号,可以在開發環境下使消費組A訂閱産品A,同時在正式環境下使消費組B訂閱産品B。
- 友善排查問題。支援檢視用戶端狀态、檢視堆積和消費速率。
- 線性擴充。在消費者能力足夠,即用戶端機器足夠的情況下,可輕松線性擴充推送能力。
- 實時消息優先推送,消息堆積不會影響服務。裝置實時消息直接推送,推送限流或失敗時進入堆積隊列,堆積态消息采用降級模式,不會影響實時推送能力。即使消費者的用戶端當機,或因消費能力不足堆積了消息,消費端恢複後,裝置生成的消息也可以和堆積消息并行發送,使裝置優先恢複可用态。
操作流程
1.在物聯網平台的控制台上建立對應産品的AMQP服務端訂閱,并勾選需要推送的消息類型
2.填寫相關參數,并啟動消費端。
參數說明- 添加依賴如下:
<dependencies>
<!-- amqp 1.0 qpid client -->
<dependency>
<groupId>org.apache.qpid</groupId>
<artifactId>qpid-jms-client</artifactId>
<version>0.47.0</version>
</dependency>
<!-- util for base64-->
<dependency>
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
<version>1.10</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.22</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-log4j12 -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.22</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-nop</artifactId>
<version>1.7.2</version>
</dependency>
</dependencies>
- 代碼示例如下:
import java.net.URI;
import java.util.Hashtable;
import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.naming.Context;
import javax.naming.InitialContext;
import org.apache.commons.codec.binary.Base64;
import org.apache.qpid.jms.JmsConnection;
import org.apache.qpid.jms.JmsConnectionListener;
import org.apache.qpid.jms.message.JmsInboundMessageDispatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class AmqpJavaClientDemo {
private final static Logger logger = LoggerFactory.getLogger(AmqpJavaClientDemo.class);
public static void main(String[] args) throws Exception {
//參數說明,請參見上一篇文檔:AMQP用戶端接入說明。
String accessKey = "******";
String accessSecret = "******";
String consumerGroupId = "******";
long timeStamp = System.currentTimeMillis();
//簽名方法:支援hmacmd5,hmacsha1和hmacsha256
String signMethod = "hmacsha1";
//控制台服務端訂閱中消費組狀态頁用戶端ID一欄将顯示clientId參數。
//建議使用機器UUID、MAC位址、IP等唯一辨別等作為clientId。便于您區分識别不同的用戶端。
String clientId = "******";
//UserName組裝方法,請參見上一篇文檔:AMQP用戶端接入說明。
String userName = clientId + "|authMode=aksign"
+ ",signMethod=" + signMethod
+ ",timestamp=" + timeStamp
+ ",authId=" + accessKey
+ ",consumerGroupId=" + consumerGroupId
+ "|";
//password組裝方法,請參見上一篇文檔:AMQP用戶端接入說明。
String signContent = "authId=" + accessKey + "×tamp=" + timeStamp;
String password = doSign(signContent,accessSecret, signMethod);
//按照qpid-jms的規範,組裝連接配接URL。
String connectionUrl = "failover:(amqps://${UID}.iot-amqp.cn-shanghai.aliyuncs.com:5671?amqp.idleTimeout=80000)"
+ "?failover.maxReconnectAttempts=10&failover.reconnectDelay=30";
Hashtable<String, String> hashtable = new Hashtable<>();
hashtable.put("connectionfactory.SBCF",connectionUrl);
hashtable.put("queue.QUEUE", "default");
hashtable.put(Context.INITIAL_CONTEXT_FACTORY, "org.apache.qpid.jms.jndi.JmsInitialContextFactory");
Context context = new InitialContext(hashtable);
ConnectionFactory cf = (ConnectionFactory)context.lookup("SBCF");
Destination queue = (Destination)context.lookup("QUEUE");
// Create Connection
Connection connection = cf.createConnection(userName, password);
((JmsConnection) connection).addConnectionListener(myJmsConnectionListener);
// Create Session
// Session.CLIENT_ACKNOWLEDGE: 收到消息後,需要手動調用message.acknowledge()
// Session.AUTO_ACKNOWLEDGE: SDK自動ACK(推薦)
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
connection.start();
// Create Receiver Link
MessageConsumer consumer = session.createConsumer(queue);
consumer.setMessageListener(messageListener);
}
private static MessageListener messageListener = new MessageListener() {
@Override
public void onMessage(Message message) {
try {
byte[] body = message.getBody(byte[].class);
String content = new String(body);
String topic = message.getStringProperty("topic");
String messageId = message.getStringProperty("messageId");
logger.info("receive message"
+ ", topic = " + topic
+ ", messageId = " + messageId
+ ", content = " + content);
System.out.println("topic:" + topic);
System.out.println("payload:" + content);
System.out.println(" ");
//如果建立Session選擇的是Session.CLIENT_ACKNOWLEDGE,這裡需要手動ACK。
message.acknowledge();
//如果要對收到的消息做耗時的處理,請異步處理,確定這裡不要有耗時邏輯。
} catch (Exception e) {
e.printStackTrace();
}
}
};
private static JmsConnectionListener myJmsConnectionListener = new JmsConnectionListener() {
/**
* 連接配接成功建立。
*/
@Override
public void onConnectionEstablished(URI remoteURI) {
logger.info("onConnectionEstablished, remoteUri:{}", remoteURI);
}
/**
* 嘗試過最大重試次數之後,最終連接配接失敗。
*/
@Override
public void onConnectionFailure(Throwable error) {
logger.error("onConnectionFailure, {}", error.getMessage());
}
/**
* 連接配接中斷。
*/
@Override
public void onConnectionInterrupted(URI remoteURI) {
logger.info("onConnectionInterrupted, remoteUri:{}", remoteURI);
}
/**
* 連接配接中斷後又自動重連上。
*/
@Override
public void onConnectionRestored(URI remoteURI) {
logger.info("onConnectionRestored, remoteUri:{}", remoteURI);
}
@Override
public void onInboundMessage(JmsInboundMessageDispatch envelope) {}
@Override
public void onSessionClosed(Session session, Throwable cause) {}
@Override
public void onConsumerClosed(MessageConsumer consumer, Throwable cause) {}
@Override
public void onProducerClosed(MessageProducer producer, Throwable cause) {}
};
/**
* password簽名計算方法,請參見上一篇文檔:AMQP用戶端接入說明。
*/
private static String doSign(String toSignString, String secret, String signMethod) throws Exception {
SecretKeySpec signingKey = new SecretKeySpec(secret.getBytes(), signMethod);
Mac mac = Mac.getInstance(signMethod);
mac.init(signingKey);
byte[] rawHmac = mac.doFinal(toSignString.getBytes());
return Base64.encodeBase64String(rawHmac);
}
}
3.啟動消費端後,在控制台進行檢視
4.裝置上線并上報一條消息
5.服務端檢視訂閱的消息
說明
- 消息不保序。您可以使用時間戳來實作業務的相對有序。
- 消息可能重複發送。為了確定消息送達,在某些情況下,同一條消息可能重複發送,直到用戶端傳回ACK或消息過期。您可以根據消息ID做唯一性處理。
- 堆積清理。可在用戶端設定cleanSession,每次上線清空堆積消息。