天天看點

RocketMQ-如何建立消費者一、DefaultMQPushConsumer二、DefaultMQPullConsumer

本文基于RocketMQ 4.7.1版本

rocketmq提供了兩個類用于消費消息,分别是DefaultMQPullConsumer和DefaultMQPushConsumer,下面分别介紹如何使用這兩個類。

文章目錄

  • 一、DefaultMQPushConsumer
  • 二、DefaultMQPullConsumer

一、DefaultMQPushConsumer

public static void main(String[] args) throws MQClientException {
    //建立消費者,消費組為consumer-A
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer-A");
    //指定nameserver位址,可以有多個,使用分号分隔
    consumer.setNamesrvAddr("localhost:9876");
    //設定訂閱的主題,第二個參數表示該消費者可以消費哪些tag的消息,tag是生産者生産消息時标記的
    //*或者null表示接收所有的tag消息,可以使用“tag1||tag2”過濾消息,而且隻支援中間使用||分隔
    consumer.subscribe("topicTest", "*");
    //設定監聽器,當主題中有消息時,調用監聽器消費消息
    consumer.registerMessageListener(new MessageListenerConcurrently() {
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                        ConsumeConcurrentlyContext context) {
            System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs.get(0).getQueueId());
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    });
    //啟動消費者
    consumer.start();
    System.out.printf("Consumer Started.%n");
}
           

DefaultMQPushConsumer提供了多個重載構造函數,例如:

  • public DefaultMQPushConsumer(final String consumerGroup)
  • public DefaultMQPushConsumer()
  • public DefaultMQPushConsumer(final String namespace, final String consumerGroup, RPCHook rpcHook)
  • public DefaultMQPushConsumer(final String namespace, final String consumerGroup, RPCHook rpcHook, AllocateMessageQueueStrategy allocateMessageQueueStrategy)
  • public DefaultMQPushConsumer(final String consumerGroup, RPCHook rpcHook,AllocateMessageQueueStrategy allocateMessageQueueStrategy, boolean enableMsgTrace, final String customizedTraceTopic)

上面的無參構造方法由于沒有指定消費組,是以使用預設的消費組:DEFAULT_CONSUMER。不過在啟動消費者的時候,rocketmq會檢查組名,如果組名等于預設消費組,rocketmq會報錯,是以如果使用預設構造方法建立對象後,還需要調用setConsumerGroup()方法重新設定組名。

下面介紹構造方法裡面每個參數的作用。

  • consumerGroup:指定消費者的消費組,每個消費者都屬于一個消費組,組内的每個消費者消費主題的一個或多個隊列,一個隊列隻能有一個消費者消費,rocketmq提供了再平衡機制保證當有多個消費者時,如果其中一個挂掉,可以将它消費的隊列配置設定給其他消費者;
  • namespace:命名空間,與訂閱的主題名組合在一起,形成最終的主題名;
  • rpcHook:該類提供了doBeforeRequest和doAfterResponse方法,用于在發送請求前和收到broker響應後對請求和響應内容做回調處理。
  • allocateMessageQueueStrategy:再平衡時使用,用于将主題隊列配置設定給各個消費者。
  • enableMsgTrace和customizedTraceTopic:用于消息軌迹,本文不對此介紹。

我們可以根據需求選擇不同的構造方法。

二、DefaultMQPullConsumer

private static final Map<MessageQueue,Long> OFFSE_TABLE = new HashMap<MessageQueue,Long>();

public static void main(String[] args) throws MQClientException {
	//建立消費者
	DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("consumer-A");
	consumer.setNamesrvAddr("localhost:9876");
	consumer.start();//啟動
	//擷取主題下所有的消息隊列
	Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("topicTest");
	for(MessageQueue mq:mqs){
		try {
			//擷取目前隊列的消費位移,第二個參數表示位移是從本地記憶體擷取,還是從broker擷取
			//true表示從broker擷取
			long offset = consumer.fetchConsumeOffset(mq,true);
			while(true){
				//第二個參數表示可以消費哪些tag的消息
				//第三個參數表示從哪個位移開始消費消息
				//第四個參數表示一次最大拉多少個消息
				PullResult pullResult = consumer.pullBlockIfNotFound(mq, null, 
						getMessageQueueOffset(mq), 32);
				putMessageQueueOffset(mq,pullResult.getNextBeginOffset());
				//根據拉取消息的狀态,切換到不同分支
				switch(pullResult.getPullStatus()){
				case FOUND:
					List<MessageExt> messageExtList = pullResult.getMsgFoundList();
					//消費消息
          for (MessageExt m : messageExtList) {
              System.out.println(new String(m.getBody()));
          }
					break;
				case NO_MATCHED_MSG:
					break;
				case NO_NEW_MSG:
					break;
				case OFFSET_ILLEGAL:
					break;
				}
			}
		} catch (Exception e) {
			e.printStackTrace();
		}
	}
	consumer.shutdown();
}

//儲存下次消費消息的位移,這裡将位移儲存到記憶體,也可以使用資料庫
private static void putMessageQueueOffset(MessageQueue mq,
		long nextBeginOffset) {
	OFFSE_TABLE.put(mq, nextBeginOffset);
}

//擷取本次要消費消息的位移
private static Long getMessageQueueOffset(MessageQueue mq) {
	Long offset = OFFSE_TABLE.get(mq);
	if(offset != null){
		return offset;
	}
	return 0l;
}
           

上面的代碼周遊每個隊列,将每個隊列的消息消費完畢之後,切換到下一個隊列消費。如果隊列的消息是源源不斷産生的,那麼可能會導緻後面的隊列長時間無法消費,我将代碼修改如下:

public static void main(String[] args) throws Exception{
        //建立消費者
        DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("consumer-A");
        consumer.setNamesrvAddr("localhost:9876");
        consumer.start();//啟動
        //擷取主題下所有的消息隊列
        Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("topicTest");
        Object[] objs=mqs.toArray();
        for (int i=0;i<objs.length;i++) {
            MessageQueue mq = (MessageQueue) objs[i];
            //可以用于更新本地記錄的消費位移
            long offset=consumer.fetchConsumeOffset(mq, true);
            putMessageQueueOffset(mq, offset);
        }
        while(true) {
        	//一次隻拉取一部分消息,周遊每個隊列完畢後,在重新周遊一遍
        	//這裡假設生産者不斷産生消息,沒有終止
            for (int i=0;i<objs.length;i++) {
                MessageQueue mq=(MessageQueue)objs[i];
                try {
                    //其他代碼同上
                }
            }
        }
   }
           

可以看到,DefaultMQPullConsumer代碼相對DefaultMQPushConsumer來說,要複雜一些,我們需要手工維護消費位移。而DefaultMQPushConsumer是rocketmq自動維護的。

DefaultMQPullConsumer也提供了多個構造方法,如下:

  • public DefaultMQPullConsumer(final String namespace, final String consumerGroup, RPCHook rpcHook)
  • public DefaultMQPullConsumer(final String namespace, final String consumerGroup)

構造方法每個參數的作用可以參考DefaultMQPushConsumer。