RocketMQ 普通消息發送
普通消息同步發送
生産者向 RocketMQ 發送一條消息,RocketMQ 傳回生産者其發送結果,可用于判斷是否發送成功。
使用場景
對消息可靠程度要求比較高、需要有是否發送成功的應答的場景。比如:重要的消息通知、通信通知等。
代碼實作
以下是核心代碼片段,詳情可以檢視 GitHub 上的源碼: rocketmq-learning ,如果覺得對你有幫助,希望可以給我個小星星鼓勵鼓勵噢~
- 生産者定義
@Bean
public DefaultMQProducer defaultMQProducer() throws MQClientException {
// 建立消息生産者
DefaultMQProducer defaultMQProducer = new DefaultMQProducer();
// 設定生産者 NameServer 位址,用于尋找 Broker
defaultMQProducer.setNamesrvAddr(rocketMQProducerProperties.getNameServerAddr());
// 設定生産者組
defaultMQProducer.setProducerGroup((RocketMQConstant.PRODUCER_GROUP_PREFIX + "client"));
// 啟動生産者組
defaultMQProducer.start();
// 把建立的生産者放到一個集合,當程式結束時統一銷毀
mqProducers.add(defaultMQProducer);
return defaultMQProducer;
}
- 生産者發送消息
@ApiOperation("同步發送普通消息")
@GetMapping("/sync-ordinary")
public SendResult sendOrdinaryMessageSynchronously() throws MQBrokerException, RemotingException, InterruptedException, MQClientException {
Message message = new Message((RocketMQConstant.TOPIC_PREFIX + "client"), "sync", "send ordinary message synchronously".getBytes(StandardCharsets.UTF_8));
SendResult sendResult = this.defaultMQProducer.send(message);
log.info("消息發送狀态:{}", sendResult);
return sendResult;
}
- 消費者定義
@Bean
public DefaultMQPushConsumer defaultMQPushConsumer(MessageListenerConcurrently defaultListener) throws MQClientException {
// 建立消息消費者
DefaultMQPushConsumer defaultMQPushConsumer = new DefaultMQPushConsumer();
// 設定消費者 NameServer 位址,用于尋找 Broker
defaultMQPushConsumer.setNamesrvAddr(rocketMQConsumerProperties.getNameServerAddr());
// 設定消費者組
defaultMQPushConsumer.setConsumerGroup((RocketMQConstant.CONSUMER_GROUP_PREFIX + "client"));
// 設定消費者組訂閱的 Topic 等資訊
defaultMQPushConsumer.subscribe((RocketMQConstant.TOPIC_PREFIX + "client"), "*");
// 設定消費者消息監聽器
defaultMQPushConsumer.setMessageListener(defaultListener);
// 啟動消費者
defaultMQPushConsumer.start();
// 把建立的消費者放到一個集合中,當程式結束時統一銷毀
mqConsumers.add(defaultMQPushConsumer);
return defaultMQPushConsumer;
}
- 消費者監聽消息
@Slf4j
@Component
public class DefaultListener implements MessageListenerConcurrently {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messageExtList, ConsumeConcurrentlyContext context) {
if (CollectionUtils.isEmpty(messageExtList)) {
log.info("本次消息為空");
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
for (MessageExt messageExt : messageExtList) {
String topic = messageExt.getTopic();
String tags = messageExt.getTags();
String body = new String(messageExt.getBody(), StandardCharsets.UTF_8);
log.info("消息 topic: {}, tags: {}, 消息内容:{}", topic, tags, body);
if (messageExt.getDelayTimeLevel() != 0) {
log.info("本次消息延時等級:{}, 延時時長為:{}", messageExt.getDelayTimeLevel(), messageExt.getProperty("delayTime"));
}
try {
// 線程休眠模拟消費者業務執行
TimeUnit.MILLISECONDS.sleep(1500);
} catch (InterruptedException e) {
log.info("消費者業務邏輯發生異常", e);
log.info("本次消息将放入重試隊列");
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
}
普通消息異步發送
RocketMQ 将會在成功接收到消息後或接收異常時開啟一個異步線程回調生産者的接口,通知生産者本次消息的發送狀态。
使用場景
一般對響應時間敏感的業務場景都合适。适合發送的消息太大或者業務對等待發送結果的時間較為敏感。
代碼實作
和普通消息同步發送的差別在于發送時調用的方法,其他代碼都一緻。
@ApiOperation("異步發送普通消息")
@GetMapping("/async-ordinary")
public String sendOrdinaryMessageAsynchronously() throws RemotingException, InterruptedException, MQClientException {
Message message = new Message((RocketMQConstant.TOPIC_PREFIX + "client"), "async", "send ordinary message asynchronously".getBytes(StandardCharsets.UTF_8));
this.defaultMQProducer.send(message, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
log.info("消息發送成功:{}", sendResult.toString());
}
@Override
public void onException(Throwable e) {
log.info("消息發送失敗,原因:", e);
}
});
return "send complete";
}
普通消息單向發送
把消息發送後就完成本次操作,性能較高。
使用場景
适合不需要關心消息發送的的到達狀态的場景,如日志采集等。
代碼實作
和普通消息同步發送的差別在于發送時調用的方法,其他代碼都一緻。
@ApiOperation("發送單向普通消息")
@GetMapping("/one-way")
public String sendOneWayMessage() throws RemotingException, InterruptedException, MQClientException {
Message message = new Message((RocketMQConstant.TOPIC_PREFIX + "client"), "one-way", "send one-way message".getBytes(StandardCharsets.UTF_8));
this.defaultMQProducer.sendOneway(message);
return "send complete";
}
RocketMQ 消息消費模式
叢集消費模式
如果一個消費者組内有多個消費者,它們訂閱同一個 Topic 的消息,當隊列中有消息到來時,RocketMQ 會「雨露均沾」地分發這些消息給各個消費者, 消費者均攤這些消息 ,這些消息隻會被投放到具體一個消費者執行個體,消息隻會被消費一次。
預設的模式,消費進度存儲在 Broker 中,可靠性更高。
代碼實作
- 定義兩個叢集模式的消費者
/**
* 叢集消費的消費者 1
*/
@Bean
public DefaultMQPushConsumer clusteringMQPushConsumerOne(MessageListenerConcurrently clusteringListenerOne) throws MQClientException {
DefaultMQPushConsumer defaultMQPushConsumer = new DefaultMQPushConsumer();
defaultMQPushConsumer.setNamesrvAddr(rocketMQConsumerProperties.getNameServerAddr());
defaultMQPushConsumer.setInstanceName("clustering-consumer-one");
defaultMQPushConsumer.setConsumerGroup((RocketMQConstant.CONSUMER_GROUP_PREFIX + "client-clustering"));
// 設定消費模式,預設是叢集消費模式
defaultMQPushConsumer.setMessageModel(MessageModel.CLUSTERING);
defaultMQPushConsumer.subscribe((RocketMQConstant.TOPIC_PREFIX + "client-clustering"), "*");
defaultMQPushConsumer.setMessageListener(clusteringListenerOne);
defaultMQPushConsumer.start();
mqConsumers.add(defaultMQPushConsumer);
return defaultMQPushConsumer;
}
/**
* 叢集消費的消費者 2
*/
@Bean
public DefaultMQPushConsumer clusteringMQPushConsumerTwo(MessageListenerConcurrently clusteringListenerTwo) throws MQClientException {
DefaultMQPushConsumer defaultMQPushConsumer = new DefaultMQPushConsumer();
defaultMQPushConsumer.setNamesrvAddr(this.rocketMQConsumerProperties.getNameServerAddr());
defaultMQPushConsumer.setInstanceName("clustering-consumer-two");
defaultMQPushConsumer.setConsumerGroup((RocketMQConstant.CONSUMER_GROUP_PREFIX + "client-clustering"));
// 設定消費模式,預設是叢集消費模式
defaultMQPushConsumer.setMessageModel(MessageModel.CLUSTERING);
defaultMQPushConsumer.subscribe((RocketMQConstant.TOPIC_PREFIX + "client-clustering"), "*");
defaultMQPushConsumer.setMessageListener(clusteringListenerTwo);
defaultMQPushConsumer.start();
mqConsumers.add(defaultMQPushConsumer);
return defaultMQPushConsumer;
}
由于需要同一個消費者組定義多個消費者,RocketMQ 不能自動區分這些消費者,是以我們需要手動為消費者設定一個用于區分的名字,使用 setInstanceName() 方法。
- 消費結果
可以看到兩個消費者是共同平分了這些消息的。
廣播消費模式
如果一個消費者組内有多個消費者,它們訂閱同一個 Topic 的消息,當隊列中有消息到來時,這些消息都會被 投放到每一個消費者執行個體上 。
這種消費模式下,消費進度不會儲存到 Broker 中,而是持久化到消費者執行個體中,因為消息被複制成多分給多個消費者進行消費了,消費進度隻和消費者執行個體相關。
消息重複消費的風險會變大,不支援順序消費,無法重置消費位點,當消費者用戶端重新開機,會丢失重新開機時間段内傳到 RocketMQ 的消息, 一般情況不推薦使用 。
代碼實作
- 定義兩個廣播模式的消費者,和叢集模式的定義唯一的差別就是消費模式的差別。
/**
* 廣播消費的消費者 1
*/
@Bean
public DefaultMQPushConsumer broadcastMQPushConsumerOne(MessageListenerConcurrently broadcastListenerOne) throws MQClientException {
DefaultMQPushConsumer defaultMQPushConsumer = new DefaultMQPushConsumer();
defaultMQPushConsumer.setNamesrvAddr(this.rocketMQConsumerProperties.getNameServerAddr());
defaultMQPushConsumer.setInstanceName("broadcast-consumer-one");
defaultMQPushConsumer.setConsumerGroup((RocketMQConstant.CONSUMER_GROUP_PREFIX + "client-broadcast"));
// 設定消費模式,預設是叢集消費模式
defaultMQPushConsumer.setMessageModel(MessageModel.BROADCASTING);
defaultMQPushConsumer.subscribe((RocketMQConstant.TOPIC_PREFIX + "client-broadcast"), "*");
defaultMQPushConsumer.setMessageListener(broadcastListenerOne);
defaultMQPushConsumer.start();
mqConsumers.add(defaultMQPushConsumer);
return defaultMQPushConsumer;
}
/**
* 廣播消費的消費者 2
*/
@Bean
public DefaultMQPushConsumer broadcastMQPushConsumerTwo(MessageListenerConcurrently broadcastListenerTwo) throws MQClientException {
DefaultMQPushConsumer defaultMQPushConsumer = new DefaultMQPushConsumer();
defaultMQPushConsumer.setNamesrvAddr(this.rocketMQConsumerProperties.getNameServerAddr());
defaultMQPushConsumer.setInstanceName("broadcast-consumer-two");
defaultMQPushConsumer.setConsumerGroup((RocketMQConstant.CONSUMER_GROUP_PREFIX + "client-broadcast"));
// 設定消費模式,預設是叢集消費模式
defaultMQPushConsumer.setMessageModel(MessageModel.BROADCASTING);
defaultMQPushConsumer.subscribe((RocketMQConstant.TOPIC_PREFIX + "client-broadcast"), "*");
defaultMQPushConsumer.setMessageListener(broadcastListenerTwo);
defaultMQPushConsumer.start();
mqConsumers.add(defaultMQPushConsumer);
return defaultMQPushConsumer;
}
- 消費結果
可以看到盡管消息消費的順序不盡相同,但是兩個消費者都消費了每一個消息。
RocketMQ 順序消息
生産者按照順序把消息發送到 RocketMQ,然後 RocketMQ 按照投遞消息的順序把消息投遞給消費者消費。
使用場景
适合邏輯上具有先後次序的業務場景。比如:先下單後支付等。
順序消費消息
一般消費者消費消息時會實作 MessageListenerConcurrently 接口,消費者可以并發地消費消息,提高消費效率。
但是當消費者需要按順序消費消息則需要實作 MessageListenerOrderly 接口。并且當消息消費異常時,傳回的狀态是 SUSPEND_CURRENT_QUEUE_A_MOMENT 代表等待一會之後再消費,不能放到重試隊列,因為會導緻順序性被破壞。
代碼實作,以全局有序消費者為例:
@Slf4j
@Component
public class GlobalOrderListener implements MessageListenerOrderly {
private final Lock lock = new ReentrantLock();
// 随機消費失敗 3 次示範順序消息遇到消費不到的消息的處理方式
private int times = 0;
// 記錄上一次消費失敗消息的 number 屬性值,下一次消費時不再失敗
private int lastNumber = -1;
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> messageExtList, ConsumeOrderlyContext context) {
// 能保證每次隻有一條消息
MessageExt messageExt = messageExtList.get(0);
String body = new String(messageExt.getBody(), StandardCharsets.UTF_8);
if (times < 3) {
int number = Integer.parseInt(messageExt.getProperty("number"));
// 如果是 3 的倍數且失敗次數還沒達到,那麼手動讓本次消息消費失敗
if (lastNumber != number && number % 3 == 0) {
log.info("GlobalOrderListener 消費消息失敗,稍後再消費");
try {
lock.lock();
times++;
lastNumber = number;
} finally {
lock.unlock();
}
return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
} else {
log.info("GlobalOrderListener 成功消費消息:{}", body);
return ConsumeOrderlyStatus.SUCCESS;
}
} else {
log.info("GlobalOrderListener 成功消費消息:{}", body);
return ConsumeOrderlyStatus.SUCCESS;
}
}
}
生産全局順序消息
隻建立一個 Queue,生産者把所有消息都發送到這個 Queue 上,此時所有消息都隻能按照先進先出的特點消費。而且一個Queue隻能由一個消費者來訂閱,是以也隻能有一個消費者來消費消息,此時消息中間件的存在意義很低。
這種方式導緻整個業務變得不靈活,而且效率也不高, 不推薦使用 。
代碼實作
- 生産者定義
@Bean
public DefaultMQProducer globalMQProducer() throws MQClientException {
DefaultMQProducer defaultMQProducer = new DefaultMQProducer();
defaultMQProducer.setNamesrvAddr(rocketMQProducerProperties.getNameServerAddr());
defaultMQProducer.setProducerGroup((RocketMQConstant.PRODUCER_GROUP_PREFIX + "client-global-order"));
// 全局有序消息,生産者隻定義一個隊列
defaultMQProducer.setDefaultTopicQueueNums(1);
defaultMQProducer.start();
mqProducers.add(defaultMQProducer);
return defaultMQProducer;
}
- 發送消息
@ApiOperation("測試全局有序消息")
@GetMapping("/global-order")
public String sendGlobalOrderMessage() throws RemotingException, InterruptedException, MQClientException, MQBrokerException {
for (int i = 1; i <= 20; i++) {
String messageBody = "測試全局有序第" + i + "條消息";
Message message = new Message((RocketMQConstant.TOPIC_PREFIX + "client-global-order"), messageBody.getBytes(StandardCharsets.UTF_8));
message.putUserProperty("number", String.valueOf(i));
this.globalMQProducer.send(message);
}
return "send complete";
}
- 消費結果
生産局部順序消息
對消息指定發送到一個具體的 Queue,這些消息在局部上是有序的,正如購買手機、衣服時,兩種商品都需要經過下訂單、扣庫存、付款的流程,商品的這些流程是有順序要求的,但是兩種商品之間的流程是沒有關聯的,是以可以處理成局部有序的。
推薦使用這種方式,分區有序的消費方式不會降低太多消費性能。
代碼實作
- 生産者定義
@Bean
public DefaultMQProducer partitionedMQProducer() throws MQClientException {
DefaultMQProducer defaultMQProducer = new DefaultMQProducer();
defaultMQProducer.setNamesrvAddr(rocketMQProducerProperties.getNameServerAddr());
defaultMQProducer.setProducerGroup((RocketMQConstant.PRODUCER_GROUP_PREFIX + "client-partitioned-order"));
// 由于消費者方定義了兩個消費者來示範此功能,是以定義兩個隊列來對應兩個消費者
defaultMQProducer.setDefaultTopicQueueNums(2);
defaultMQProducer.start();
mqProducers.add(defaultMQProducer);
return defaultMQProducer;
}
- 發送消息,在發送消息時,多加兩個參數:第一個參數類型是 MessageQueueSelector 的匿名内部類,用于定義消息隊列選擇算法,計算這個消息将被投遞到哪一個消息隊列上。第二參數是選擇算法中使用到的,比如我這裡的實作就是分别用 1-10 和 2 進行模運算(因為一開始隻定義了兩個隊列),計算的結果就是隊列的序号。
@ApiOperation("測試分區有序消息")
@GetMapping("/partitioned-order")
public String sendPartitionedOrderMessage() throws RemotingException, InterruptedException, MQClientException, MQBrokerException {
for (int i = 1; i <= 10; i++) {
if (i % 2 == 0) {
String messageBody = "手機訂單建立-" + i;
Message message = new Message((RocketMQConstant.TOPIC_PREFIX + "client-partitioned-order"), "phone-order", messageBody.getBytes(StandardCharsets.UTF_8));
message.putUserProperty("number", String.valueOf(i));
this.partitionedMQProducer.send(message, (messageQueueList, msg, arg) -> {
Integer id = (Integer) arg;
//使用取模算法确定 id 存放到哪個隊列
//index 就是要存放的隊列的索引
int index = id % 2;
return messageQueueList.get(index);
}, i);
messageBody = "手機訂單支付-" + i;
message = new Message((RocketMQConstant.TOPIC_PREFIX + "client-partitioned-order"), "phone-pay", messageBody.getBytes(StandardCharsets.UTF_8));
message.putUserProperty("number", String.valueOf(i));
this.partitionedMQProducer.send(message, (messageQueueList, msg, arg) -> {
Integer id = (Integer) arg;
int index = id % 2;
return messageQueueList.get(index);
}, i);
messageBody = "手機訂單發貨-" + i;
message = new Message((RocketMQConstant.TOPIC_PREFIX + "client-partitioned-order"), "phone-deliver", messageBody.getBytes(StandardCharsets.UTF_8));
message.putUserProperty("number", String.valueOf(i));
this.partitionedMQProducer.send(message, (messageQueueList, msg, arg) -> {
Integer id = (Integer) arg;
int index = id % 2;
return messageQueueList.get(index);
}, i);
} else {
String messageBody = "衣服訂單建立-" + i;
Message message = new Message((RocketMQConstant.TOPIC_PREFIX + "client-partitioned-order"), "clothes-order", messageBody.getBytes(StandardCharsets.UTF_8));
message.putUserProperty("number", String.valueOf(i));
this.partitionedMQProducer.send(message, (messageQueueList, msg, arg) -> {
Integer id = (Integer) arg;
int index = id % 2;
return messageQueueList.get(index);
}, i);
messageBody = "衣服訂單支付-" + i;
message = new Message((RocketMQConstant.TOPIC_PREFIX + "client-partitioned-order"), "clothes-pay", messageBody.getBytes(StandardCharsets.UTF_8));
message.putUserProperty("number", String.valueOf(i));
this.partitionedMQProducer.send(message, (messageQueueList, msg, arg) -> {
Integer id = (Integer) arg;
int index = id % 2;
return messageQueueList.get(index);
}, i);
messageBody = "衣服訂單發貨-" + i;
message = new Message((RocketMQConstant.TOPIC_PREFIX + "client-partitioned-order"), "clothes-deliver", messageBody.getBytes(StandardCharsets.UTF_8));
message.putUserProperty("number", String.valueOf(i));
this.partitionedMQProducer.send(message, (messageQueueList, msg, arg) -> {
Integer id = (Integer) arg;
int index = id % 2;
return messageQueueList.get(index);
}, i);
}
}
return "send complete";
}
RocketMQ 延時消息
生産者把消息發送給 RocketMQ 時,不希望 RocketMQ 立馬把消息投遞到消費者,而是延遲一定的時間,再投遞,這種消息就是延時消息。
社群版的 RocketMQ 目前是支援了 18 個固定的延時間隔。
延時等級定義在 RocketMQ 服務端的 MessageStoreConfig 類中的如下變量中。
private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
使用場景
電商交易系統的訂單逾時未支付,自動取消訂單。下訂單時鎖定庫存,如果 30 分鐘後這個消息投遞給了下遊的消費服務,消費者服務會去檢查這個訂單的狀态,如果支付成功,則忽略不處理;如果訂單依然是未支付,那麼取消訂單,釋放庫存等。
代碼實作
生産者、消費者定義和發送普通消息一緻,隻是調用的方法有差別
- 發送消息
@ApiOperation("發送延時消息")
@GetMapping("/delay-message")
public String sendDelayMessage() throws RemotingException, InterruptedException, MQClientException, MQBrokerException {
Message message = new Message((RocketMQConstant.TOPIC_PREFIX + "client"), "delay", "send third delay level message".getBytes(StandardCharsets.UTF_8));
message.setDelayTimeLevel(3);
message.putUserProperty("delayTime", "10 秒");
this.defaultMQProducer.send(message);
return "send complete";
}
- 消費結果,當消費者進入一個穩定消費的狀态後,可以看到當生産者發送消息後隔 10 秒左右消費者才有消息消費的日志出現
RocketMQ 批量消息
當有大批量的消息需要發送時,生産者還是一條一條地發,會出現系統瓶頸,可以把這些消息放到一個集合裡面,一次性發送一個集合所有消息。
但是批量消息也有大小上的限制,一次發送的組裝後的消息不能超過消息最大限制(預設是 4MB),是以組裝消息時需要注意,當超出限制時需要把消息清單分割後再發送。
代碼實作
生産者、消費者定義和發送普通消息一緻,隻是調用的方法有差別
- 定義消息分隔器
public class MessagesSplitter implements Iterator<List<Message>> {
private final int MAX_SIZE = 1024 * 1024 * 4;
private final int LOG_SIZE = 20;
private final List<Message> messages;
private int currentIndex = 0;
public MessagesSplitter(List<Message> messages) {
this.messages = messages;
}
@Override
public boolean hasNext() {
return currentIndex < messages.size();
}
@Override
public List<Message> next() {
int startIndex = getStartIndex();
int nextIndex = startIndex;
int totalSize = 0;
while (nextIndex < messages.size()) {
Message message = messages.get(nextIndex);
// 計算目前消息的長度
int singleMessageSize = calcMessageTotalSize(message);
// 隻要消息還沒超出長度限制就一直往後累計直到達到消息長度限制
if (singleMessageSize + totalSize > MAX_SIZE) {
break;
} else {
totalSize += singleMessageSize;
}
nextIndex++;
}
// 提取子集合
List<Message> subList = messages.subList(startIndex, nextIndex);
currentIndex = nextIndex;
return subList;
}
// 計算一個消息的尺寸
private int calcMessageTotalSize(Message message) {
int size = message.getBody().length;
Map<String, String> properties = message.getProperties();
for (Map.Entry<String, String> entry : properties.entrySet()) {
size += entry.getKey().length();
size += entry.getValue().length();
}
size += LOG_SIZE;
return size;
}
// 擷取下一個應該取的索引
private int getStartIndex() {
// 先擷取目前集合第一個消息的長度
Message currentMessage = messages.get(currentIndex);
int currentMessageSize = calcMessageTotalSize(currentMessage);
while (currentMessageSize > MAX_SIZE) {
// 如果這個消息的長度本就大于消息長度限制
// 那麼就取下一個消息,直到消息長度小于長度限制
currentIndex += 1;
currentMessage = messages.get(currentIndex);
currentMessageSize = calcMessageTotalSize(currentMessage);
}
return currentIndex;
}
}
- 發送消息,使用分割器每次擷取一批大小合适的消息
@ApiOperation("批量發送消息")
@GetMapping("/batch-message")
public String sendBatchMessage() throws MQBrokerException, RemotingException, InterruptedException, MQClientException {
List<Message> messages = new ArrayList<>();
for (int i = 1; i <= 20; i++) {
String messageBody = "測試批量發送消息第" + i + "條消息";
Message message = new Message((RocketMQConstant.TOPIC_PREFIX + "client"), "batch", messageBody.getBytes(StandardCharsets.UTF_8));
messages.add(message);
}
// 每次擷取一批不超出消息大小限制的消息來發送
MessagesSplitter messagesSplitter = new MessagesSplitter(messages);
while (messagesSplitter.hasNext()) {
List<Message> subMessageList = messagesSplitter.next();
SendResult sendResult = this.defaultMQProducer.send(subMessageList);
log.info("消息發送狀态: {}", sendResult);
}
return "send complete";
}
RocketMQ 過濾消息
RocketMQ 過濾消息是指消費者通過一定的方式篩選自己需要的消息,過濾消息有 Tag 過濾和 SQL 過濾兩種方式。
Tag 過濾
生産者發送消息時傳入 Tag,消費者訂閱消息時,指定訂閱某些 Tag。這種方式使用起來比較容易,效率高,适用于簡單過濾的場景。比如隻訂閱手機類型、衣服類型的訂單消息。
代碼實作
- 消費者定義,監聽器邏輯和普通消息的監聽器大同小異,不羅列出來了
/**
* 使用 Tag 過濾的消費者
*/
@Bean
public DefaultMQPushConsumer tagFilterConsumer(MessageListenerConcurrently tagListenerOne) throws MQClientException {
DefaultMQPushConsumer defaultMQPushConsumer = new DefaultMQPushConsumer();
defaultMQPushConsumer.setNamesrvAddr(rocketMQConsumerProperties.getNameServerAddr());
defaultMQPushConsumer.setConsumerGroup((RocketMQConstant.CONSUMER_GROUP_PREFIX + "client-tag-filter"));
defaultMQPushConsumer.subscribe((RocketMQConstant.TOPIC_PREFIX + "client-tag-filter"),
MessageSelector.byTag("phone || shoes"));
defaultMQPushConsumer.setMessageListener(tagListenerOne);
defaultMQPushConsumer.start();
mqConsumers.add(defaultMQPushConsumer);
return defaultMQPushConsumer;
}
- 發送消息
@ApiOperation("測試 tag 過濾消息")
@GetMapping("/tag-filter-message")
public String tagFilterMessage() throws MQBrokerException, RemotingException, InterruptedException, MQClientException {
// 消費者方設定如下
// 消費者 1 隻接受 tag 為 phone 或 shoes 的消息
// 消費者 2 隻接受 tag 為 phone 或 clothes,并且 price 位于 [10,20] 區間的消息
Message message1 = new Message((RocketMQConstant.TOPIC_PREFIX + "client-tag-filter"), "phone", "手機訂單消息:17 元".getBytes(StandardCharsets.UTF_8));
message1.putUserProperty("price", "17");
this.defaultMQProducer.send(message1);
log.info("生産者發送消息:{}", message1);
Message message2 = new Message((RocketMQConstant.TOPIC_PREFIX + "client-tag-filter"), "phone", "手機訂單消息:26 元".getBytes(StandardCharsets.UTF_8));
message2.putUserProperty("price", "26");
this.defaultMQProducer.send(message2);
log.info("生産者發送消息:{}", message2);
Message message3 = new Message((RocketMQConstant.TOPIC_PREFIX + "client-tag-filter"), "clothes", "衣服訂單消息:19 元".getBytes(StandardCharsets.UTF_8));
message3.putUserProperty("price", "19");
this.defaultMQProducer.send(message3);
log.info("生産者發送消息:{}", message3);
Message message4 = new Message((RocketMQConstant.TOPIC_PREFIX + "client-tag-filter"), "shoes", "鞋子訂單消息:null".getBytes(StandardCharsets.UTF_8));
this.defaultMQProducer.send(message4);
log.info("生産者發送消息:{}", message4);
return "send complete";
}
- 消費結果,最終隻有 tag 為 phone 和 clothes 的消息能被消費者消費
SQL 過濾
SQL 過濾是指使用一些類似 SQL 語句的文法進行過濾 ,如 is null、between 等關鍵詞。生産者在發送消息時,給消息自定義某些屬性;消費者訂閱消息時使用 SQL 語句來對這些屬性進行過濾,這種方式實作起來有難度,但是靈活。
但是要使用這個 SQL 過濾的特性,有一個前提就是:Broker 需要開啟屬性過濾。要開啟這個功能,需要在 broker.conf 檔案中加入 enablePropertyFilter=true 。否則消費者啟動時會提示:
Caused by: org.apache.rocketmq.client.exception.MQClientException: CODE: 1 DESC: The broker does not support consumer to filter message by SQL92
For more information, please visit the url, http://rocketmq.apache.org/docs/faq/
at org.apache.rocketmq.client.impl.MQClientAPIImpl.checkClientInBroker(MQClientAPIImpl.java:2242) ~[rocketmq-client-4.8.0.jar:4.8.0]
at org.apache.rocketmq.client.impl.factory.MQClientInstance.checkClientInBroker(MQClientInstance.java:449) ~[rocketmq-client-4.8.0.jar:4.8.0]
at org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl.start(DefaultMQPushConsumerImpl.java:648) ~[rocketmq-client-4.8.0.jar:4.8.0]
...
- 消費者定義
@Bean
public DefaultMQPushConsumer sqlFilterConsumer(MessageListenerConcurrently defaultListener) throws MQClientException {
DefaultMQPushConsumer defaultMQPushConsumer = new DefaultMQPushConsumer();
defaultMQPushConsumer.setNamesrvAddr(rocketMQConsumerProperties.getNameServerAddr());
defaultMQPushConsumer.setConsumerGroup((RocketMQConstant.CONSUMER_GROUP_PREFIX + "client-sql-filter"));
defaultMQPushConsumer.subscribe((RocketMQConstant.TOPIC_PREFIX + "client-sql-filter"),
MessageSelector.bySql("price is not null and price between 10 and 30"));
defaultMQPushConsumer.setMessageListener(defaultListener);
defaultMQPushConsumer.start();
mqConsumers.add(defaultMQPushConsumer);
return defaultMQPushConsumer;
}
- 發送消息
@ApiOperation("測試 sql 過濾消息")
@GetMapping("/sql-filter-message")
public String sqlFilterMessage() throws MQBrokerException, RemotingException, InterruptedException, MQClientException {
// 消費者方設定如下
// 隻有 price 在 [10-30] 區間才能接收并消費
Message message1 = new Message((RocketMQConstant.TOPIC_PREFIX + "client-sql-filter"), "phone", "手機訂單消息:18 元".getBytes(StandardCharsets.UTF_8));
message1.putUserProperty("price", "18");
this.defaultMQProducer.send(message1);
log.info("生産者發送消息:{}", message1);
Message message2 = new Message((RocketMQConstant.TOPIC_PREFIX + "client-sql-filter"), "clothes", "衣服訂單消息:7 元".getBytes(StandardCharsets.UTF_8));
message2.putUserProperty("price", "7");
this.defaultMQProducer.send(message2);
log.info("生産者發送消息:{}", message2);
Message message3 = new Message((RocketMQConstant.TOPIC_PREFIX + "client-sql-filter"), "clothes", "衣服訂單消息:20 元".getBytes(StandardCharsets.UTF_8));
message3.putUserProperty("price", "20");
this.defaultMQProducer.send(message3);
log.info("生産者發送消息:{}", message3);
return "send complete";
}
- 消費結果
可以看到隻有價格位于 [10, 30] 的兩條消息能成功被消費
RocketMQ 事務消息
基于可以發送事務消息這一特性,RocketMQ 成為了分布式事務的解決方案之一,RocketMQ 的事務消息适用于所有對資料最終一緻性有強需求的場景。
unknown
使用場景
RocketMQ 的事務消息适用于所有對資料最終一緻性有強需求的場景。
代碼實作
由于消費者及其監聽器邏輯與普通消息差別不大,是以代碼重點展示生産者代碼及其結果
- 生産者定義
@Slf4j
@Configuration
@ConditionalOnProperty(prefix = "learning.rocketmq.producer.producer-switch", name = "transaction", havingValue = "true")
public class RocketMQTransactionProducerConfiguration extends RocketMQBaseProducerConfiguration {
@Bean
public TransactionMQProducer transactionMQProducer(TransactionListener bizTransactionListener) throws MQClientException {
// 定義事務型生産者
TransactionMQProducer transactionMQProducer = new TransactionMQProducer();
transactionMQProducer.setNamesrvAddr(rocketMQProducerProperties.getNameServerAddr());
transactionMQProducer.setProducerGroup((RocketMQConstant.PRODUCER_GROUP_PREFIX + "client-transactional"));
// 定義事務監聽器
transactionMQProducer.setTransactionListener(bizTransactionListener);
transactionMQProducer.start();
mqProducers.add(transactionMQProducer);
return transactionMQProducer;
}
@Bean
public TransactionListener bizTransactionListener() {
return new TransactionListener() {
// 執行生産者方本地事務
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
log.info("接收到 RocketMQ 的 Half 消息的響應,現在執行本地事務。..");
int number = (Integer) arg;
try {
// 事務執行邏輯執行一個除法運算,可以示範執行失敗的情況
Integer result = 100 / number;
log.info("事務執行結果:{}", result);
// 線程睡眠 500 毫秒模拟本地事務執行
TimeUnit.MILLISECONDS.sleep(500);
log.info("本地事務執行成功,給 RocketMQ 發送 ACK 響應");
return LocalTransactionState.COMMIT_MESSAGE;
} catch (Exception e) {
log.info("本地事務執行發生異常,需要復原事務");
return LocalTransactionState.ROLLBACK_MESSAGE;
}
}
// 回查本地事務執行情況
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
log.info("由于 RocketMQ 長時間無法收到消息的狀态或本地執行事務狀态為 UNKNOW,現在執行補償事務/回查本地事務。..");
return LocalTransactionState.COMMIT_MESSAGE;
}
};
}
}
當使用事務型生産者時,就能展現出生産者組的作用:當生産者發生當機時,Broker 可以向同一個組内其他生産者調用回查本地事務執行情況。
- 消息發送
@ApiOperation("發送事務消息")
@GetMapping("/{number}")
public String sendTransactionMessage(@PathVariable Integer number) throws MQClientException {
log.info("接收到事務請求,準備執行生産者本地事務。..");
Message message = new Message((RocketMQConstant.TOPIC_PREFIX + "client-transaction"), "通知消費者執行本地事務的事務消息".getBytes(StandardCharsets.UTF_8));
// 把 number 傳入,在執行本地事務時使用
this.transactionMQProducer.sendMessageInTransaction(message, number);
return "事務消息發送成功";
}
- 生産者本地事務執行成功
生産者事務執行成功後,會發送 ACK 到 RocketMQ 通知本次事務成功送出了,然後消費者能收到消息進行消費。
- 生産者本地事務執行失敗
number 參數傳入 0 導緻除 0 異常。
復原事務後,消費者無法收到此消息。
這篇文章使用 spring-boot 內建 rocketmq-client 的方式示範了 RocketMQ 大部分的使用場景,希望能給有需要的你有幫助。