1.測試環境:
broker :1master,1slave
producer: 1
consumer:3
2.架構圖:
3.實作功能
Producer 投遞到Broker 的消息,要實作順序消費,而且Consumer 消費組C1,C2,C3要實作負載均衡消費
4.技術原理
Broker 的裡面的Topic(消息邏輯存儲單元)包含多個Queue(假定是4個),該Queue 的Nums 要> Consumer 執行個體數量,
将投遞的所有消息根據ID做索引配置設定到指定的Queue ,每個Queue 又被指定的單個Consumer消費
不同的Queue 之間實作了并發消費,同一個Queue内部實作單線程順序消費,思想類似于ConcurrentHashMap 的分段鎖
5.示例代碼
PayController.java
package net.xdclass.xdclassmq.controller;
import net.xdclass.xdclassmq.domain.ProductOrder;
import net.xdclass.xdclassmq.jms.JmsConfig;
import net.xdclass.xdclassmq.jms.PayProducer;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.HashMap;
import java.util.List;
@RestController
public class PayController {
@Autowired
private PayProducer payProducer;
@RequestMapping("/api/v2/pay_cb")
public Object callback() throws Exception {
List<ProductOrder> list = ProductOrder.getOrderList();
for(int i=0; i< list.size(); i++){
ProductOrder order = list.get(i);
Message message = new Message(JmsConfig.ORDERLY_TOPIC,"",
order.getOrderId()+"",order.toString().getBytes());
SendResult sendResult = payProducer.getProducer().send(message, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
//将消息進行索引處理,配置設定到不同的queue
Long id = (Long) arg;
long index = id % mqs.size();
return mqs.get((int)index);
}
},order.getOrderId());
System.out.printf("發送結果=%s, sendResult=%s ,orderid=%s, type=%s\n", sendResult.getSendStatus(), sendResult.toString(),order.getOrderId(),order.getType());
}
return new HashMap<>();
}
}
PayProduct
package net.xdclass.xdclassmq.domain;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
public class ProductOrder implements Serializable {
//訂單id
private long orderId;
//操作類型
private String type;
public long getOrderId() {
return orderId;
}
public void setOrderId(long orderId) {
this.orderId = orderId;
}
public String getType() {
return type;
}
public void setType(String type) {
this.type = type;
}
public ProductOrder(){}
public ProductOrder(long orderId ,String type){
this.orderId = orderId;
this.type = type;
}
public static List<ProductOrder> getOrderList(){
//模拟産生許多順序的消息
List<ProductOrder> list = new ArrayList<>();
list.add(new ProductOrder(111L,"建立訂單"));
list.add(new ProductOrder(222L,"建立訂單"));
list.add(new ProductOrder(111L,"支付訂單"));
list.add(new ProductOrder(222L,"支付訂單"));
list.add(new ProductOrder(111L,"完成訂單"));
list.add(new ProductOrder(333L,"建立訂單"));
list.add(new ProductOrder(222L,"完成訂單"));
list.add(new ProductOrder(333L,"支付訂單"));
list.add(new ProductOrder(333L,"完成訂單"));
return list;
}
@Override
public String toString() {
return "ProductOrder{" +
"orderId=" + orderId +
", type='" + type + '\'' +
'}';
}
}
JmsConfig.java
package net.xdclass.xdclassmq.jms;
public class JmsConfig {
public static final String NAME_SERVER = "192.168.33.129:9876;192.168.33.130:9876";
public static final String TOPIC = "xdclass_pay_test_topic_888";
public static final String ORDERLY_TOPIC = "xdclass_pay_test_topic_666";
}
PayOrderlyConsumer.java
package net.xdclass.xdclassmq.jms;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.*;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.springframework.stereotype.Component;
import java.util.List;
@Component
public class PayOrderlyConsumer {
private DefaultMQPushConsumer consumer;
private String consumerGroup = "pay_orderly_consumer_group";
public PayOrderlyConsumer() throws MQClientException {
consumer = new DefaultMQPushConsumer(consumerGroup);
consumer.setNamesrvAddr(JmsConfig.NAME_SERVER);
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
//預設是叢集方式,可以更改為廣播,但是廣播方式不支援重試
consumer.setMessageModel(MessageModel.CLUSTERING);
consumer.subscribe(JmsConfig.ORDERLY_TOPIC, "*");
consumer.registerMessageListener( new MessageListenerOrderly() {
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
MessageExt msg = msgs.get(0);
try {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), new String(msgs.get(0).getBody()));
//做業務邏輯操作 TODO
return ConsumeOrderlyStatus.SUCCESS;
} catch (Exception e) {
e.printStackTrace();
return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
}
}
});
consumer.start();
System.out.println("consumer start ...");
}
}
PayProducer.java
package net.xdclass.xdclassmq.jms;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.springframework.stereotype.Component;
@Component
public class PayProducer {
private String producerGroup = "pay_producer_group";
private DefaultMQProducer producer;
public PayProducer(){
producer = new DefaultMQProducer(producerGroup);
//生産者投遞消息重試次數
producer.setRetryTimesWhenSendFailed(3);
//指定NameServer位址,多個位址以 ; 隔開
producer.setNamesrvAddr(JmsConfig.NAME_SERVER);
start();
}
public DefaultMQProducer getProducer(){
return this.producer;
}
/**
* 對象在使用之前必須要調用一次,隻能初始化一次
*/
public void start(){
try {
this.producer.start();
} catch (MQClientException e) {
e.printStackTrace();
}
}
/**
* 一般在應用上下文,使用上下文監聽器,進行關閉
*/
public void shutdown(){
this.producer.shutdown();
}
}
consumer 節點啟動多個:
IDEA 模拟測試勾選去掉 ,根據配置不同的 application.yml 的端口,繼續啟動服務即啟動多個consumer執行個體了