1. RocketMQ has three kinds of messages
1) Normal messages
2) Ordered messages
3) Transactional messages
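2. Ordered messages
Concept: a message type provided by the MQ that is published and consumed in strict order. It follows that an ordered message has two parts: ordered publishing and ordered consumption.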
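3. How ordered consumption is guaranteed in the MQ
(1) Messages are sent in order. (2) Messages are stored in the same order in which they were sent. (3) Messages are consumed in the same order in which they were stored.
Sending in order means that, for messages that must be ordered, the user has to send them synchronously from a single thread. Storing in sending order means that if message A and message B arrive from the same thread, A must be stored ahead of B. Consuming in storage order means that once A and B reach the Consumer, A must be consumed before B.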
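Take the raw messages of two orders, a1, b1, b2, a2, a3, b3 (listed in the order they occurred in absolute time):
1) When sending, order a's messages must go out in the sequence a1, a2, a3, and the same holds for order b, but there is no ordering requirement between the messages of a and b. This means the messages of a and b can be sent from different threads.
2) When storing, the order of each order's own messages must be preserved, but no order needs to be guaranteed between the messages of a and b.
3) When consuming, it is enough to ensure that each partition is processed by only one thread. If a and b share a partition, their messages can also be split across different threads after they are received, though whether the gain is worth it should be weighed first.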
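The a/b example can be sketched in plain Java, without RocketMQ. The class `OrderRoutingSketch`, its methods, and the choice of the first character as the order key are assumptions made for illustration only. The sketch shows that routing by order key keeps each order's messages in send order inside one queue, even though the two orders are interleaved on the way in.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: routes messages to queues by order key so that
// each order's messages stay in send order inside a single queue.
class OrderRoutingSketch {

    // Assumption: the order key is the first character of the message
    // name ("a1" -> 'a'); a real system would use the order ID.
    static int queueIndex(String message, int queueCount) {
        return Math.floorMod(message.charAt(0), queueCount);
    }

    // Appends each message to the queue chosen by its order key.
    static Map<Integer, List<String>> route(List<String> messages, int queueCount) {
        Map<Integer, List<String>> queues = new LinkedHashMap<Integer, List<String>>();
        for (int i = 0; i < queueCount; i++) {
            queues.put(i, new ArrayList<String>());
        }
        for (String message : messages) {
            queues.get(queueIndex(message, queueCount)).add(message);
        }
        return queues;
    }

    public static void main(String[] args) {
        List<String> sent = Arrays.asList("a1", "b1", "b2", "a2", "a3", "b3");
        // prints {0=[b1, b2, b3], 1=[a1, a2, a3]}
        System.out.println(route(sent, 2));
    }
}
```

With a single queue, both orders land in the same list in their interleaved send order, which still keeps a1, a2, a3 and b1, b2, b3 in sequence relative to each other.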
4. Producer code
1) Producer
package com.roger.order.producer;

import com.roger.order.entity.OrderMsgDTO;
import com.roger.utils.SnowflakeIdWorker;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.common.RemotingHelper;

import java.util.ArrayList;
import java.util.List;

public class OrderMqProducer {

    public static void main(String[] args) throws Exception {
        DefaultMQProducer defaultMQProducer =
                new DefaultMQProducer("orderMQProducerGroup");
        defaultMQProducer.setNamesrvAddr("172.20.10.60:9876");
        defaultMQProducer.start();
        // Create the topic with 3 queues
        defaultMQProducer.createTopic("OrderTopic", "OrderTopic", 3);

        String[] tags = new String[]{"TagC", "TagP", "TagF"};
        List<OrderMsgDTO> orderMsgList = new ArrayList<>();
        int orderCount = 5;
        // Build three messages (Create, PayOff, Finish) for each order
        for (int i = 0; i < orderCount; i++) {
            long orderId = SnowflakeIdWorker.getInstance().nextId();
            OrderMqProducer.builderOrderMsgList(orderMsgList, orderId);
        }

        // Send synchronously from a single thread, one message at a time
        for (int i = 0; i < orderMsgList.size(); i++) {
            OrderMsgDTO orderMsgDTO = orderMsgList.get(i);
            String body = orderMsgDTO.toString();
            Message msg = new Message("OrderTopic",
                    tags[i % tags.length],
                    "OrderKey" + i,
                    body.getBytes(RemotingHelper.DEFAULT_CHARSET));
            SendResult sendResult = defaultMQProducer.send(msg, new MessageQueueSelector() {
                // List<MessageQueue> msgQueList: all queues (partitions) under the topic the message is sent to
                // Message message: the message object
                // Object args: an extra argument supplied by the caller.
                // For example, to send all messages of one order to the same queue,
                // pass the order ID as this argument and take it modulo the queue count;
                // messages of the same order then always land in the same queue.
                @Override
                public MessageQueue select(List<MessageQueue> msgQueList, Message message, Object args) {
                    long orderId = (long) args;
                    long index = orderId % msgQueList.size();
                    return msgQueList.get((int) index);
                }
            }, orderMsgDTO.getOrderId());
            System.out.println(sendResult +
                    String.format("message [%s] send success.",
                            new String(msg.getBody())));
        }
        //defaultMQProducer.shutdown();
    }

    private static void builderOrderMsgList(List<OrderMsgDTO> orderMsgList, long orderId) {
        orderMsgList.add(new OrderMsgDTO(orderId, "Create"));
        orderMsgList.add(new OrderMsgDTO(orderId, "PayOff"));
        orderMsgList.add(new OrderMsgDTO(orderId, "Finish"));
    }
}
2) Topic configuration
3) Run result: the messages of the 5 orders were distributed across three queues (queueId=0, 1, 2)
SendResult [sendStatus=SEND_OK,
msgId=AC140A051DC818B4AAC2938813480000,
offsetMsgId=AC140A3C00002A9F00000000000297A4,
messageQueue=MessageQueue [topic=OrderTopic, brokerName=broker-a, queueId=0],
queueOffset=3
]
message [OrderMsgDTO(orderId=517724620921503744, msgType=Create)] send success.
SendResult [sendStatus=SEND_OK,
msgId=AC140A051DC818B4AAC2938813510001,
offsetMsgId=AC140A3C00002A9F000000000002988D,
messageQueue=MessageQueue [topic=OrderTopic, brokerName=broker-a, queueId=0],
queueOffset=4
]
message [OrderMsgDTO(orderId=517724620921503744, msgType=PayOff)] send success.
SendResult [sendStatus=SEND_OK,
msgId=AC140A051DC818B4AAC2938813C90002,
offsetMsgId=AC140A3C00002A9F0000000000029976,
messageQueue=MessageQueue [topic=OrderTopic, brokerName=broker-a, queueId=0],
queueOffset=5
]
message [OrderMsgDTO(orderId=517724620921503744, msgType=Finish)] send success.
SendResult [sendStatus=SEND_OK, msgId=AC140A051DC818B4AAC2938813EA0003, offsetMsgId=AC140A3C00002A9F0000000000029A5F, messageQueue=MessageQueue [topic=OrderTopic, brokerName=broker-a, queueId=1], queueOffset=3]message [OrderMsgDTO(orderId=517724620925698048, msgType=Create)] send success.
SendResult [sendStatus=SEND_OK, msgId=AC140A051DC818B4AAC2938813F00004, offsetMsgId=AC140A3C00002A9F0000000000029B48, messageQueue=MessageQueue [topic=OrderTopic, brokerName=broker-a, queueId=1], queueOffset=4]message [OrderMsgDTO(orderId=517724620925698048, msgType=PayOff)] send success.
SendResult [sendStatus=SEND_OK, msgId=AC140A051DC818B4AAC2938814000005, offsetMsgId=AC140A3C00002A9F0000000000029C31, messageQueue=MessageQueue [topic=OrderTopic, brokerName=broker-a, queueId=1], queueOffset=5]message [OrderMsgDTO(orderId=517724620925698048, msgType=Finish)] send success.
SendResult [sendStatus=SEND_OK, msgId=AC140A051DC818B4AAC2938814A20006, offsetMsgId=AC140A3C00002A9F0000000000029D1A, messageQueue=MessageQueue [topic=OrderTopic, brokerName=broker-a, queueId=2], queueOffset=12]message [OrderMsgDTO(orderId=517724620925698049, msgType=Create)] send success.
SendResult [sendStatus=SEND_OK, msgId=AC140A051DC818B4AAC2938815060007, offsetMsgId=AC140A3C00002A9F0000000000029E03, messageQueue=MessageQueue [topic=OrderTopic, brokerName=broker-a, queueId=2], queueOffset=13]message [OrderMsgDTO(orderId=517724620925698049, msgType=PayOff)] send success.
SendResult [sendStatus=SEND_OK, msgId=AC140A051DC818B4AAC2938815110008, offsetMsgId=AC140A3C00002A9F0000000000029EEC, messageQueue=MessageQueue [topic=OrderTopic, brokerName=broker-a, queueId=2], queueOffset=14]message [OrderMsgDTO(orderId=517724620925698049, msgType=Finish)] send success.
SendResult [sendStatus=SEND_OK, msgId=AC140A051DC818B4AAC2938815150009, offsetMsgId=AC140A3C00002A9F0000000000029FD5, messageQueue=MessageQueue [topic=OrderTopic, brokerName=broker-a, queueId=0], queueOffset=6]message [OrderMsgDTO(orderId=517724620925698050, msgType=Create)] send success.
SendResult [sendStatus=SEND_OK, msgId=AC140A051DC818B4AAC293881546000A, offsetMsgId=AC140A3C00002A9F000000000002A0BE, messageQueue=MessageQueue [topic=OrderTopic, brokerName=broker-a, queueId=0], queueOffset=7]message [OrderMsgDTO(orderId=517724620925698050, msgType=PayOff)] send success.
SendResult [sendStatus=SEND_OK, msgId=AC140A051DC818B4AAC293881549000B, offsetMsgId=AC140A3C00002A9F000000000002A1A8, messageQueue=MessageQueue [topic=OrderTopic, brokerName=broker-a, queueId=0], queueOffset=8]message [OrderMsgDTO(orderId=517724620925698050, msgType=Finish)] send success.
SendResult [sendStatus=SEND_OK, msgId=AC140A051DC818B4AAC2938819F0000C, offsetMsgId=AC140A3C00002A9F000000000002A292, messageQueue=MessageQueue [topic=OrderTopic, brokerName=broker-a, queueId=1], queueOffset=6]message [OrderMsgDTO(orderId=517724620925698051, msgType=Create)] send success.
SendResult [sendStatus=SEND_OK, msgId=AC140A051DC818B4AAC293881A30000D, offsetMsgId=AC140A3C00002A9F000000000002A37C, messageQueue=MessageQueue [topic=OrderTopic, brokerName=broker-a, queueId=1], queueOffset=7]message [OrderMsgDTO(orderId=517724620925698051, msgType=PayOff)] send success.
SendResult [sendStatus=SEND_OK, msgId=AC140A051DC818B4AAC2938821A0000E, offsetMsgId=AC140A3C00002A9F000000000002A466, messageQueue=MessageQueue [topic=OrderTopic, brokerName=broker-a, queueId=1], queueOffset=8]message [OrderMsgDTO(orderId=517724620925698051, msgType=Finish)] send success.
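The queue assignment in the log can be checked by hand, since the selector takes the order ID modulo the queue count. The following is a minimal sketch of that arithmetic using the five order IDs from the log. The class `QueueIndexSketch` and its method are hypothetical names, and `Math.floorMod` is swapped in for the `%` operator used in the producer: for the positive IDs here the result is the same, and it would additionally keep the index non-negative if a key were ever negative.

```java
// Hypothetical helper mirroring the selector's arithmetic:
// order ID modulo the number of queues.
class QueueIndexSketch {

    // Math.floorMod never returns a negative value for a positive
    // queueCount, so the result is always a valid list index.
    static int queueIndex(long orderId, int queueCount) {
        return (int) Math.floorMod(orderId, (long) queueCount);
    }

    public static void main(String[] args) {
        // Order IDs copied from the run result above
        long[] orderIds = {
            517724620921503744L,
            517724620925698048L,
            517724620925698049L,
            517724620925698050L,
            517724620925698051L
        };
        // prints queueId 0, 1, 2, 0, 1 for the five IDs, matching the log
        for (long orderId : orderIds) {
            System.out.println(orderId + " -> queueId=" + queueIndex(orderId, 3));
        }
    }
}
```

Because the index depends only on the order ID and the queue count, all three messages of an order map to the same queue as long as the number of queues does not change between sends.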
5. Consumer code: the two consumers use the same code with only minor differences
1) Code
package com.roger.order.consumer;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;
import java.util.Random;
import java.util.concurrent.TimeUnit;

public class OrderMqConsumer1 {

    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer defaultMQPushConsumer =
                new DefaultMQPushConsumer("orderMQPushConsumerGroup");
        defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        defaultMQPushConsumer.setNamesrvAddr("172.20.10.60:9876");
        defaultMQPushConsumer.subscribe("OrderTopic", "*");
        // MessageListenerOrderly is the listener type for ordered consumption
        defaultMQPushConsumer.registerMessageListener(new MessageListenerOrderly() {
            Random r = new Random();

            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgExtList, ConsumeOrderlyContext context) {
                context.setAutoCommit(true);
                // The literal "目前線程名" means "current thread name"; it is kept so the output matches the logs
                System.out.println("目前線程名:" + Thread.currentThread().getName() + ";Receive new message:");
                for (MessageExt msgExt : msgExtList) {
                    System.out.println(String.format("Consume message [%s],TagName [%s]",
                            new String(msgExt.getBody()),
                            msgExt.getTags()));
                    try {
                        // Simulated business logic: sleep for 0 to 9 seconds
                        TimeUnit.SECONDS.sleep(r.nextInt(10));
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                return ConsumeOrderlyStatus.SUCCESS;
            }
        });
        defaultMQPushConsumer.start();
        System.out.println("OrderMqConsumer1 Started...");
    }
}
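2) OrderMqConsumer1 run result:
(a) It consumed the messages from queue 0 and queue 1.
(b) Each order's messages were consumed in order, and in this run messages with the same order ID were handled by the same thread.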
OrderMqConsumer1 Started...
目前線程名:ConsumeMessageThread_1;Receive new message:
Consume message [OrderMsgDTO(orderId=517724620925698048, msgType=Create)],TagName [TagC]
目前線程名:ConsumeMessageThread_1;Receive new message:
Consume message [OrderMsgDTO(orderId=517724620925698048, msgType=PayOff)],TagName [TagP]
目前線程名:ConsumeMessageThread_2;Receive new message:
Consume message [OrderMsgDTO(orderId=517724620921503744, msgType=Create)],TagName [TagC]
目前線程名:ConsumeMessageThread_1;Receive new message:
Consume message [OrderMsgDTO(orderId=517724620925698048, msgType=Finish)],TagName [TagF]
目前線程名:ConsumeMessageThread_2;Receive new message:
Consume message [OrderMsgDTO(orderId=517724620921503744, msgType=PayOff)],TagName [TagP]
目前線程名:ConsumeMessageThread_1;Receive new message:
Consume message [OrderMsgDTO(orderId=517724620925698051, msgType=Create)],TagName [TagC]
目前線程名:ConsumeMessageThread_1;Receive new message:
Consume message [OrderMsgDTO(orderId=517724620925698051, msgType=PayOff)],TagName [TagP]
目前線程名:ConsumeMessageThread_2;Receive new message:
Consume message [OrderMsgDTO(orderId=517724620921503744, msgType=Finish)],TagName [TagF]
目前線程名:ConsumeMessageThread_1;Receive new message:
Consume message [OrderMsgDTO(orderId=517724620925698051, msgType=Finish)],TagName [TagF]
目前線程名:ConsumeMessageThread_2;Receive new message:
Consume message [OrderMsgDTO(orderId=517724620925698050, msgType=Create)],TagName [TagC]
目前線程名:ConsumeMessageThread_2;Receive new message:
Consume message [OrderMsgDTO(orderId=517724620925698050, msgType=PayOff)],TagName [TagP]
目前線程名:ConsumeMessageThread_2;Receive new message:
Consume message [OrderMsgDTO(orderId=517724620925698050, msgType=Finish)],TagName [TagF]
3) OrderMqConsumer2 run result: it consumed the messages from queue 2
OrderMqConsumer2 Started...
目前線程名:ConsumeMessageThread_1;Receive new message:
Consume message [OrderMsgDTO(orderId=517724620925698049, msgType=Create)],TagName [TagC]
目前線程名:ConsumeMessageThread_1;Receive new message:
Consume message [OrderMsgDTO(orderId=517724620925698049, msgType=PayOff)],TagName [TagP]
目前線程名:ConsumeMessageThread_1;Receive new message:
Consume message [OrderMsgDTO(orderId=517724620925698049, msgType=Finish)],TagName [TagF]
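The consumption rule from section 3 (each queue is processed by only one thread at a time) can be illustrated with a small simulation in plain Java. This is a sketch under stated assumptions, not RocketMQ's implementation: the class `PerQueueConsumerSketch`, its methods, and the use of one single-threaded executor per queue are all hypothetical choices made to show the idea.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical simulation: every queue gets its own single-threaded
// worker, so messages inside a queue are handled one at a time in order,
// while different queues are handled concurrently.
class PerQueueConsumerSketch {

    // Processes all queues and returns the messages in completion order.
    static List<String> consume(List<List<String>> queues) {
        final List<String> processed =
                Collections.synchronizedList(new ArrayList<String>());
        List<ExecutorService> workers = new ArrayList<ExecutorService>();
        for (List<String> queue : queues) {
            ExecutorService worker = Executors.newSingleThreadExecutor();
            workers.add(worker);
            for (String message : queue) {
                // "Processing" here only records the message
                worker.execute(() -> processed.add(message));
            }
        }
        try {
            for (ExecutorService worker : workers) {
                worker.shutdown();
                worker.awaitTermination(10, TimeUnit.SECONDS);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        return processed;
    }

    // True if the messages of one queue appear in their original order.
    static boolean inOrder(List<String> processed, List<String> expected) {
        List<String> seen = new ArrayList<String>();
        for (String message : processed) {
            if (expected.contains(message)) {
                seen.add(message);
            }
        }
        return seen.equals(expected);
    }

    public static void main(String[] args) {
        List<String> queue0 = Arrays.asList("a1", "a2", "a3");
        List<String> queue1 = Arrays.asList("b1", "b2", "b3");
        List<String> processed = consume(Arrays.asList(queue0, queue1));
        // Interleaving between the two queues may differ from run to run
        System.out.println(processed);
        System.out.println("queue0 in order: " + inOrder(processed, queue0));
        System.out.println("queue1 in order: " + inOrder(processed, queue1));
    }
}
```

As in the consumer logs, the relative order between different queues is not fixed, but the messages of any one queue always complete in the order they were stored.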