天天看點

阿裡雲Rocket MQ Java Http SDK發送消費消息示例Demo

Step By Step

1、建立執行個體,登陸阿裡雲

控制台
阿裡雲Rocket MQ Java Http SDK發送消費消息示例Demo
阿裡雲Rocket MQ Java Http SDK發送消費消息示例Demo

2、執行個體下面分别建立Topic和Http

Group

阿裡雲Rocket MQ Java Http SDK發送消費消息示例Demo
阿裡雲Rocket MQ Java Http SDK發送消費消息示例Demo

3、pom.xml

<dependency>
            <groupId>com.aliyun.mq</groupId>
            <artifactId>mq-http-sdk</artifactId>
            <version>1.0.2</version>
        </dependency>           

4、Producer Code Sample

import com.aliyun.mq.http.MQClient;
import com.aliyun.mq.http.MQProducer;
import com.aliyun.mq.http.model.TopicMessage;
import java.util.Date;

public class ProducerDemo {

    public static void main(String[] args) {
        MQClient mqClient = new MQClient(
                // 設定HTTP接入域名(此處以公共雲生産環境為例)
                "http://18482178********.mqrest.cn-shanghai.aliyuncs.com",
                // AccessKey 阿裡雲身份驗證,在阿裡雲伺服器管理控制台建立
                "LTAIOZZg********",
                // SecretKey 阿裡雲身份驗證,在阿裡雲伺服器管理控制台建立
                "v7CjUJCMk7j9aK****************"
        );

        // 所屬的 Topic
        final String topic = "****";
        // Topic所屬執行個體ID,預設執行個體為空
        final String instanceId = "MQ_INST_********";

        // 擷取Topic的生産者
        MQProducer producer;
        if (instanceId != null && instanceId != "") {
            producer = mqClient.getProducer(instanceId, topic);
        } else {
            producer = mqClient.getProducer(topic);
        }

        try {
            // 循環發送40條消息
            for (int i = 0; i < 40; i++) {
                TopicMessage pubMsg;
                if (i % 2 == 0) {
                    // 普通消息
                    pubMsg = new TopicMessage(
                            // 消息内容
                            "hello common mq!".getBytes(),
                            // 消息标簽
                            "A"
                    );
                    // 設定屬性
                    pubMsg.getProperties().put("a", String.valueOf(i));
                    // 設定KEY
                    pubMsg.setMessageKey("MessageKey");
                } else {
                    pubMsg = new TopicMessage(
                            // 消息内容
                            "hello delay mq!".getBytes(),
                            // 消息标簽
                            "B"
                    );
                    // 設定屬性
                    pubMsg.getProperties().put("b", String.valueOf(i));
                    // 定時消息, 定時時間為10s後
                    pubMsg.setStartDeliverTime(System.currentTimeMillis() + 10 * 1000);
                }
                // 同步發送消息,隻要不抛異常就是成功
                TopicMessage pubResultMsg = producer.publishMessage(pubMsg);

                // 同步發送消息,隻要不抛異常就是成功
                System.out.println(new Date() + " Send mq message success. Topic is:" + topic + ", msgId is: " + pubResultMsg.getMessageId()
                        + ", bodyMD5 is: " + pubResultMsg.getMessageBodyMD5());
            }
        } catch (Throwable e) {
            // 消息發送失敗,需要進行重試處理,可重新發送這條消息或持久化這條資料進行補償處理
            System.out.println(new Date() + " Send mq message failed. Topic is:" + topic);
            e.printStackTrace();
        }

        mqClient.close();
    }
}           

5、Consumer Code Sample

import com.aliyun.mq.http.MQClient;
import com.aliyun.mq.http.MQConsumer;
import com.aliyun.mq.http.model.Message;
import java.util.*;

public class ConsumerDemo {

    public static void main(String[] args) {

        MQClient mqClient = new MQClient(
                // 設定HTTP接入域名(此處以公共雲生産環境為例)
                "http://18482178********.mqrest.cn-shanghai.aliyuncs.com",
                // AccessKey 阿裡雲身份驗證,在阿裡雲伺服器管理控制台建立
                "LTAIOZZg********",
                // SecretKey 阿裡雲身份驗證,在阿裡雲伺服器管理控制台建立
                "v7CjUJCMk7j9aK****************"
        );

        String topicName = "****";
        String consumer = "GID_****"; //Http Consumer Group Name
        String messageTag =""; // Tag,為空表示訂閱全部Tag
        String instanceId = "MQ_INST_******";

        MQConsumer mqConsumer = mqClient.getConsumer(instanceId,topicName, consumer,messageTag);

        while(true) {
            try {

                // 消費消息,輪訓時間設定為3秒,一次至多拉去三條消息
                List<Message> listMessage = mqConsumer.consumeMessage(3, 3);

                if (listMessage == null || listMessage.size() == 0) {
                    System.out.println("Message is not exist!");
                } else {
                    List<String> receiptHandles = new ArrayList<String>();
                    for (Message message : listMessage
                    ) {
                        System.out.println("MessageBody" + message.getMessageBodyString());
                        receiptHandles.add(message.getReceiptHandle());
                    }
                    // 回調删除
                    mqConsumer.ackMessage(receiptHandles);
                }
            }catch (Exception ex)
            {
                System.out.println("error:" + ex.getMessage());
                mqClient.close();
            }
        }
    }
}           

參考連結

RocketMQ HTTP 協定 SDK