天天看點

阿里雲物聯網平台之獨享執行個體下的 AMQP 用戶端連接

1、首先找到控制台獨享執行個體
阿里雲物聯網平台之獨享執行個體下的 AMQP 用戶端連接
2、找到接入域名
阿里雲物聯網平台之獨享執行個體下的 AMQP 用戶端連接
3、Java Sample

import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.Hashtable;

import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.naming.Context;
import javax.naming.InitialContext;

import org.apache.commons.codec.binary.Base64;
import org.apache.qpid.jms.JmsConnection;
import org.apache.qpid.jms.JmsConnectionListener;
import org.apache.qpid.jms.message.JmsInboundMessageDispatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Sample AMQP consumer that connects to an Alibaba Cloud IoT Platform instance endpoint
 * through the Apache Qpid JMS client and logs every message it receives.
 *
 * <p>The access key, access secret, consumer group ID, IoT instance ID and the endpoint
 * host in the connection URL are placeholders and must be filled in before running.
 */
public class AmqpJavaClientDemo {

    private static final Logger logger = LoggerFactory.getLogger(AmqpJavaClientDemo.class);

    /**
     * Listener invoked for each inbound message; logs the topic, message ID and payload.
     */
    private static final MessageListener messageListener = new MessageListener() {
        @Override
        public void onMessage(Message message) {
            try {
                byte[] body = message.getBody(byte[].class);
                // Decode with an explicit charset so the result does not depend on the
                // platform default; a message without a body is logged as an empty string.
                String content = body == null ? "" : new String(body, StandardCharsets.UTF_8);
                String topic = message.getStringProperty("topic");
                String messageId = message.getStringProperty("messageId");
                logger.info("receive message"
                        + ", topic = " + topic
                        + ", messageId = " + messageId
                        + ", content = " + content);
                // If the session was created with Session.CLIENT_ACKNOWLEDGE,
                // the message must be acknowledged manually here:
                // message.acknowledge();
                // Do not perform time-consuming work in this callback; hand any heavy
                // processing off to be handled asynchronously.
            } catch (Exception e) {
                // Route the failure through the logger (with stack trace) instead of stderr.
                logger.error("failed to process received message", e);
            }
        }
    };

    /**
     * Connection lifecycle listener that logs connection state changes.
     */
    private static final JmsConnectionListener myJmsConnectionListener = new JmsConnectionListener() {
        /**
         * The connection was established successfully.
         */
        @Override
        public void onConnectionEstablished(URI remoteURI) {
            logger.info("onConnectionEstablished, remoteUri:{}", remoteURI);
        }

        /**
         * The connection finally failed after the maximum number of retries was attempted.
         */
        @Override
        public void onConnectionFailure(Throwable error) {
            // Pass the throwable as the trailing argument so the stack trace is kept.
            logger.error("onConnectionFailure, {}", error.getMessage(), error);
        }

        /**
         * The connection was interrupted.
         */
        @Override
        public void onConnectionInterrupted(URI remoteURI) {
            logger.info("onConnectionInterrupted, remoteUri:{}", remoteURI);
        }

        /**
         * The connection was automatically restored after an interruption.
         */
        @Override
        public void onConnectionRestored(URI remoteURI) {
            logger.info("onConnectionRestored, remoteUri:{}", remoteURI);
        }

        @Override
        public void onInboundMessage(JmsInboundMessageDispatch envelope) {}

        @Override
        public void onSessionClosed(Session session, Throwable cause) {}

        @Override
        public void onConsumerClosed(MessageConsumer consumer, Throwable cause) {}

        @Override
        public void onProducerClosed(MessageProducer producer, Throwable cause) {}
    };

    /**
     * Builds the credentials, opens the AMQP connection and registers the message listener.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if signing, the JNDI lookup, or the JMS connection setup fails
     */
    public static void main(String[] args) throws Exception {
        // For parameter details, see the previous document: AMQP client access instructions.
        String accessKey = "";
        String accessSecret = "";
        String consumerGroupId = "";
        String iotInstanceId = "";
        long timeStamp = System.currentTimeMillis();
        // Signature method: hmacmd5, hmacsha1 and hmacsha256 are supported.
        String signMethod = "hmacsha1";
        // The clientId is shown in the "Client ID" column of the consumer group status page
        // under server-side subscription in the console.
        // Using a unique identifier such as the machine UUID, MAC address or IP is recommended
        // so that different clients can be told apart.
        String clientId = "112233";

        // For how the user name is assembled, see the AMQP client access instructions.
        String userName = clientId + "|authMode=aksign"
                + ",signMethod=" + signMethod
                + ",timestamp=" + timeStamp
                + ",authId=" + accessKey
                + ",iotInstanceId=" + iotInstanceId
                + ",consumerGroupId=" + consumerGroupId
                + "|";
        // For how the password is assembled, see the AMQP client access instructions.
        String signContent = "authId=" + accessKey + "&timestamp=" + timeStamp;
        String password = doSign(signContent, accessSecret, signMethod);
        // Assemble the connection URL following the qpid-jms conventions.
        String connectionUrl = "failover:(amqps://iot-cn-******.amqp.iothub.aliyuncs.com:5671?amqp.idleTimeout=80000)"
                + "?failover.reconnectDelay=30";

        // InitialContext takes a Hashtable environment, hence Hashtable rather than a Map.
        Hashtable<String, String> hashtable = new Hashtable<String, String>();
        hashtable.put("connectionfactory.SBCF", connectionUrl);
        hashtable.put("queue.QUEUE", "default");
        hashtable.put(Context.INITIAL_CONTEXT_FACTORY, "org.apache.qpid.jms.jndi.JmsInitialContextFactory");
        Context context = new InitialContext(hashtable);
        ConnectionFactory cf = (ConnectionFactory) context.lookup("SBCF");
        Destination queue = (Destination) context.lookup("QUEUE");
        // Create the connection.
        Connection connection = cf.createConnection(userName, password);
        ((JmsConnection) connection).addConnectionListener(myJmsConnectionListener);
        // Create the session.
        // Session.CLIENT_ACKNOWLEDGE: message.acknowledge() must be called manually on receipt.
        // Session.AUTO_ACKNOWLEDGE: the SDK acknowledges automatically (recommended).
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        connection.start();
        // Create the receiver link.
        MessageConsumer consumer = session.createConsumer(queue);
        consumer.setMessageListener(messageListener);
    }

    /**
     * Computes the password signature: the HMAC of {@code toSignString} keyed with
     * {@code secret}, Base64-encoded. See the AMQP client access instructions for the
     * signing rules.
     *
     * @param toSignString the content to sign
     * @param secret the signing key; must not be null or empty
     * @param signMethod the MAC algorithm name passed to {@link Mac#getInstance(String)}
     * @return the Base64-encoded HMAC
     * @throws IllegalArgumentException if {@code secret} is null or empty
     * @throws Exception if the algorithm is unavailable or the key is rejected
     */
    private static String doSign(String toSignString, String secret, String signMethod) throws Exception {
        // SecretKeySpec rejects an empty key with an opaque "Empty key" message; fail with
        // a hint that the placeholder credential has not been filled in.
        if (secret == null || secret.isEmpty()) {
            throw new IllegalArgumentException("accessSecret must not be empty");
        }
        // Use an explicit charset so the signature is identical on every platform.
        SecretKeySpec signingKey = new SecretKeySpec(secret.getBytes(StandardCharsets.UTF_8), signMethod);
        Mac mac = Mac.getInstance(signMethod);
        mac.init(signingKey);
        byte[] rawHmac = mac.doFinal(toSignString.getBytes(StandardCharsets.UTF_8));
        return Base64.encodeBase64String(rawHmac);
    }

}

4、確認用戶端線上
阿里雲物聯網平台之獨享執行個體下的 AMQP 用戶端連接