天天看點

Rocketmq 消費者組 順序、平均消費實作原理

1.測試環境:

  broker :1master,1slave

  producer: 1

  consumer:3

2.架構圖:

Rocketmq 消費者組 順序、平均消費實作原理

3.實作功能

Producer 投遞到Broker 的消息,要實作順序消費,而且Consumer 消費組C1,C2,C3要實作負載均衡消費

4.技術原理

Broker 的裡面的Topic(消息邏輯存儲單元)包含多個Queue(假定是4個),該Queue 的Nums 要> Consumer 執行個體數量,

将投遞的所有消息根據ID做索引配置設定到指定的Queue ,每個Queue 又被指定的單個Consumer消費

不同的Queue 之間實作了并發消費,同一個Queue内部實作單線程順序消費,思想類似于ConcurrentHashMap 的分段鎖

5.示例代碼

PayController.java

package net.xdclass.xdclassmq.controller;

import net.xdclass.xdclassmq.domain.ProductOrder;
import net.xdclass.xdclassmq.jms.JmsConfig;
import net.xdclass.xdclassmq.jms.PayProducer;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.HashMap;
import java.util.List;

@RestController
public class PayController {

    @Autowired
    private PayProducer payProducer;


    @RequestMapping("/api/v2/pay_cb")
    public Object callback() throws Exception {

        List<ProductOrder> list  = ProductOrder.getOrderList();

        for(int i=0; i< list.size(); i++){
            ProductOrder order = list.get(i);
            Message message = new Message(JmsConfig.ORDERLY_TOPIC,"",
                    order.getOrderId()+"",order.toString().getBytes());

         SendResult sendResult =  payProducer.getProducer().send(message, new MessageQueueSelector() {
                @Override
                public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {              
                    //将消息進行索引處理,配置設定到不同的queue
                    Long id = (Long) arg;
                    long index = id % mqs.size();
                    return mqs.get((int)index);
                }
            },order.getOrderId());


         System.out.printf("發送結果=%s, sendResult=%s ,orderid=%s, type=%s\n", sendResult.getSendStatus(), sendResult.toString(),order.getOrderId(),order.getType());

        }



        return new HashMap<>();
    }


}
           

PayProduct

package net.xdclass.xdclassmq.domain;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

public class ProductOrder implements Serializable {

    //訂單id
    private long orderId;

    //操作類型
    private String type;


    public long getOrderId() {
        return orderId;
    }

    public void setOrderId(long orderId) {
        this.orderId = orderId;
    }

    public String getType() {
        return type;
    }

    public void setType(String type) {
        this.type = type;
    }

    public  ProductOrder(){}

    public  ProductOrder(long orderId ,String type){
        this.orderId = orderId;
        this.type = type;

    }


    public static List<ProductOrder> getOrderList(){
         //模拟産生許多順序的消息
        List<ProductOrder> list = new ArrayList<>();
        list.add(new ProductOrder(111L,"建立訂單"));
        list.add(new ProductOrder(222L,"建立訂單"));
        list.add(new ProductOrder(111L,"支付訂單"));
        list.add(new ProductOrder(222L,"支付訂單"));
        list.add(new ProductOrder(111L,"完成訂單"));
        list.add(new ProductOrder(333L,"建立訂單"));
        list.add(new ProductOrder(222L,"完成訂單"));
        list.add(new ProductOrder(333L,"支付訂單"));
        list.add(new ProductOrder(333L,"完成訂單"));

        return list;

    }

    @Override
    public String toString() {
        return "ProductOrder{" +
                "orderId=" + orderId +
                ", type='" + type + '\'' +
                '}';
    }
}
           

JmsConfig.java

package net.xdclass.xdclassmq.jms;

public class JmsConfig {


    public static final String NAME_SERVER = "192.168.33.129:9876;192.168.33.130:9876";

    public static final String TOPIC = "xdclass_pay_test_topic_888";

    public static final String ORDERLY_TOPIC = "xdclass_pay_test_topic_666";

}
           
PayOrderlyConsumer.java
           
package net.xdclass.xdclassmq.jms;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.*;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.springframework.stereotype.Component;

import java.util.List;

@Component
public class PayOrderlyConsumer {


    private DefaultMQPushConsumer consumer;

    private String consumerGroup = "pay_orderly_consumer_group";

    public PayOrderlyConsumer() throws MQClientException {

        consumer = new DefaultMQPushConsumer(consumerGroup);
        consumer.setNamesrvAddr(JmsConfig.NAME_SERVER);
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);

        //預設是叢集方式,可以更改為廣播,但是廣播方式不支援重試
        consumer.setMessageModel(MessageModel.CLUSTERING);
        consumer.subscribe(JmsConfig.ORDERLY_TOPIC, "*");


        consumer.registerMessageListener( new MessageListenerOrderly() {
            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
                MessageExt msg = msgs.get(0);
                try {
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), new String(msgs.get(0).getBody()));

                //做業務邏輯操作 TODO

                return ConsumeOrderlyStatus.SUCCESS;

            } catch (Exception e) {

                e.printStackTrace();
                return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
            }
            }

        });

        consumer.start();
        System.out.println("consumer start ...");
    }

}
           
PayProducer.java
           
package net.xdclass.xdclassmq.jms;

import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.springframework.stereotype.Component;

@Component
public class PayProducer {

    private String producerGroup = "pay_producer_group";

    private DefaultMQProducer producer;


    public  PayProducer(){
        producer = new DefaultMQProducer(producerGroup);

        //生産者投遞消息重試次數
        producer.setRetryTimesWhenSendFailed(3);

        //指定NameServer位址,多個位址以 ; 隔開
        producer.setNamesrvAddr(JmsConfig.NAME_SERVER);

        start();
    }

    public DefaultMQProducer getProducer(){
        return this.producer;
    }

    /**
     * 對象在使用之前必須要調用一次,隻能初始化一次
     */
    public void start(){
        try {
            this.producer.start();
        } catch (MQClientException e) {
            e.printStackTrace();
        }
    }


    /**
     * 一般在應用上下文,使用上下文監聽器,進行關閉
     */
    public void shutdown(){
        this.producer.shutdown();
    }


}
           

consumer 節點啟動多個:

IDEA 模拟測試勾選去掉 ,根據配置不同的 application.yml 的端口,繼續啟動服務即啟動多個consumer執行個體了

Rocketmq 消費者組 順序、平均消費實作原理

繼續閱讀