Step By Step
1、建立執行個體,登陸阿裡雲
控制台
2、執行個體下面分别建立Topic和Http
Group
3、pom.xml
<dependency>
<groupId>com.aliyun.mq</groupId>
<artifactId>mq-http-sdk</artifactId>
<version>1.0.2</version>
</dependency>
4、Producer Code Sample
import com.aliyun.mq.http.MQClient;
import com.aliyun.mq.http.MQProducer;
import com.aliyun.mq.http.model.TopicMessage;
import java.util.Date;
public class ProducerDemo {
public static void main(String[] args) {
MQClient mqClient = new MQClient(
// 設定HTTP接入域名(此處以公共雲生産環境為例)
"http://18482178********.mqrest.cn-shanghai.aliyuncs.com",
// AccessKey 阿裡雲身份驗證,在阿裡雲伺服器管理控制台建立
"LTAIOZZg********",
// SecretKey 阿裡雲身份驗證,在阿裡雲伺服器管理控制台建立
"v7CjUJCMk7j9aK****************"
);
// 所屬的 Topic
final String topic = "****";
// Topic所屬執行個體ID,預設執行個體為空
final String instanceId = "MQ_INST_********";
// 擷取Topic的生産者
MQProducer producer;
if (instanceId != null && instanceId != "") {
producer = mqClient.getProducer(instanceId, topic);
} else {
producer = mqClient.getProducer(topic);
}
try {
// 循環發送40條消息
for (int i = 0; i < 40; i++) {
TopicMessage pubMsg;
if (i % 2 == 0) {
// 普通消息
pubMsg = new TopicMessage(
// 消息内容
"hello common mq!".getBytes(),
// 消息标簽
"A"
);
// 設定屬性
pubMsg.getProperties().put("a", String.valueOf(i));
// 設定KEY
pubMsg.setMessageKey("MessageKey");
} else {
pubMsg = new TopicMessage(
// 消息内容
"hello delay mq!".getBytes(),
// 消息标簽
"B"
);
// 設定屬性
pubMsg.getProperties().put("b", String.valueOf(i));
// 定時消息, 定時時間為10s後
pubMsg.setStartDeliverTime(System.currentTimeMillis() + 10 * 1000);
}
// 同步發送消息,隻要不抛異常就是成功
TopicMessage pubResultMsg = producer.publishMessage(pubMsg);
// 同步發送消息,隻要不抛異常就是成功
System.out.println(new Date() + " Send mq message success. Topic is:" + topic + ", msgId is: " + pubResultMsg.getMessageId()
+ ", bodyMD5 is: " + pubResultMsg.getMessageBodyMD5());
}
} catch (Throwable e) {
// 消息發送失敗,需要進行重試處理,可重新發送這條消息或持久化這條資料進行補償處理
System.out.println(new Date() + " Send mq message failed. Topic is:" + topic);
e.printStackTrace();
}
mqClient.close();
}
}
5、Consumer Code Sample
import com.aliyun.mq.http.MQClient;
import com.aliyun.mq.http.MQConsumer;
import com.aliyun.mq.http.model.Message;
import java.util.*;
public class ConsumerDemo {
public static void main(String[] args) {
MQClient mqClient = new MQClient(
// 設定HTTP接入域名(此處以公共雲生産環境為例)
"http://18482178********.mqrest.cn-shanghai.aliyuncs.com",
// AccessKey 阿裡雲身份驗證,在阿裡雲伺服器管理控制台建立
"LTAIOZZg********",
// SecretKey 阿裡雲身份驗證,在阿裡雲伺服器管理控制台建立
"v7CjUJCMk7j9aK****************"
);
String topicName = "****";
String consumer = "GID_****"; //Http Consumer Group Name
String messageTag =""; // Tag,為空表示訂閱全部Tag
String instanceId = "MQ_INST_******";
MQConsumer mqConsumer = mqClient.getConsumer(instanceId,topicName, consumer,messageTag);
while(true) {
try {
// 消費消息,輪訓時間設定為3秒,一次至多拉去三條消息
List<Message> listMessage = mqConsumer.consumeMessage(3, 3);
if (listMessage == null || listMessage.size() == 0) {
System.out.println("Message is not exist!");
} else {
List<String> receiptHandles = new ArrayList<String>();
for (Message message : listMessage
) {
System.out.println("MessageBody" + message.getMessageBodyString());
receiptHandles.add(message.getReceiptHandle());
}
// 回調删除
mqConsumer.ackMessage(receiptHandles);
}
}catch (Exception ex)
{
System.out.println("error:" + ex.getMessage());
mqClient.close();
}
}
}
}