本文基于RocketMQ 4.7.1版本
rocketmq提供了兩個類用于消費消息,分别是DefaultMQPullConsumer和DefaultMQPushConsumer,下面分别介紹如何使用這兩個類。
文章目錄
- 一、DefaultMQPushConsumer
- 二、DefaultMQPullConsumer
一、DefaultMQPushConsumer
public static void main(String[] args) throws MQClientException {
//建立消費者,消費組為consumer-A
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer-A");
//指定nameserver位址,可以有多個,使用分号分隔
consumer.setNamesrvAddr("localhost:9876");
//設定訂閱的主題,第二個參數表示該消費者可以消費哪些tag的消息,tag是生産者生産消息時标記的
//*或者null表示接收所有的tag消息,可以使用“tag1||tag2”過濾消息,而且隻支援中間使用||分隔
consumer.subscribe("topicTest", "*");
//設定監聽器,當主題中有消息時,調用監聽器消費消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs.get(0).getQueueId());
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//啟動消費者
consumer.start();
System.out.printf("Consumer Started.%n");
}
DefaultMQPushConsumer提供了多個重載構造函數,例如:
- public DefaultMQPushConsumer(final String consumerGroup)
- public DefaultMQPushConsumer()
- public DefaultMQPushConsumer(final String namespace, final String consumerGroup, RPCHook rpcHook)
- public DefaultMQPushConsumer(final String namespace, final String consumerGroup, RPCHook rpcHook, AllocateMessageQueueStrategy allocateMessageQueueStrategy)
- public DefaultMQPushConsumer(final String consumerGroup, RPCHook rpcHook,AllocateMessageQueueStrategy allocateMessageQueueStrategy, boolean enableMsgTrace, final String customizedTraceTopic)
上面的無參構造方法由于沒有指定消費組,是以使用預設的消費組:DEFAULT_CONSUMER。不過在啟動消費者的時候,rocketmq會檢查組名,如果組名等于預設消費組,rocketmq會報錯,是以如果使用預設構造方法建立對象後,還需要調用setConsumerGroup()方法重新設定組名。
下面介紹構造方法裡面每個參數的作用。
- consumerGroup:指定消費者的消費組,每個消費者都屬于一個消費組,組内的每個消費者消費主題的一個或多個隊列,一個隊列隻能有一個消費者消費,rocketmq提供了再平衡機制保證當有多個消費者時,如果其中一個挂掉,可以将它消費的隊列配置設定給其他消費者;
- namespace:命名空間,與訂閱的主題名組合在一起,形成最終的主題名;
- rpcHook:該類提供了doBeforeRequest和doAfterResponse方法,用于在發送請求前和收到broker響應後對請求和響應内容做回調處理。
- allocateMessageQueueStrategy:再平衡時使用,用于将主題隊列配置設定給各個消費者。
- enableMsgTrace和customizedTraceTopic:用于消息軌迹,本文不對此介紹。
我們可以根據需求選擇不同的構造方法。
二、DefaultMQPullConsumer
private static final Map<MessageQueue,Long> OFFSE_TABLE = new HashMap<MessageQueue,Long>();
public static void main(String[] args) throws MQClientException {
//建立消費者
DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("consumer-A");
consumer.setNamesrvAddr("localhost:9876");
consumer.start();//啟動
//擷取主題下所有的消息隊列
Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("topicTest");
for(MessageQueue mq:mqs){
try {
//擷取目前隊列的消費位移,第二個參數表示位移是從本地記憶體擷取,還是從broker擷取
//true表示從broker擷取
long offset = consumer.fetchConsumeOffset(mq,true);
while(true){
//第二個參數表示可以消費哪些tag的消息
//第三個參數表示從哪個位移開始消費消息
//第四個參數表示一次最大拉多少個消息
PullResult pullResult = consumer.pullBlockIfNotFound(mq, null,
getMessageQueueOffset(mq), 32);
putMessageQueueOffset(mq,pullResult.getNextBeginOffset());
//根據拉取消息的狀态,切換到不同分支
switch(pullResult.getPullStatus()){
case FOUND:
List<MessageExt> messageExtList = pullResult.getMsgFoundList();
//消費消息
for (MessageExt m : messageExtList) {
System.out.println(new String(m.getBody()));
}
break;
case NO_MATCHED_MSG:
break;
case NO_NEW_MSG:
break;
case OFFSET_ILLEGAL:
break;
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
consumer.shutdown();
}
//儲存下次消費消息的位移,這裡将位移儲存到記憶體,也可以使用資料庫
private static void putMessageQueueOffset(MessageQueue mq,
long nextBeginOffset) {
OFFSE_TABLE.put(mq, nextBeginOffset);
}
//擷取本次要消費消息的位移
private static Long getMessageQueueOffset(MessageQueue mq) {
Long offset = OFFSE_TABLE.get(mq);
if(offset != null){
return offset;
}
return 0l;
}
上面的代碼周遊每個隊列,将每個隊列的消息消費完畢之後,切換到下一個隊列消費。如果隊列的消息是源源不斷産生的,那麼可能會導緻後面的隊列長時間無法消費,我将代碼修改如下:
public static void main(String[] args) throws Exception{
//建立消費者
DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("consumer-A");
consumer.setNamesrvAddr("localhost:9876");
consumer.start();//啟動
//擷取主題下所有的消息隊列
Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("topicTest");
Object[] objs=mqs.toArray();
for (int i=0;i<objs.length;i++) {
MessageQueue mq = (MessageQueue) objs[i];
//可以用于更新本地記錄的消費位移
long offset=consumer.fetchConsumeOffset(mq, true);
putMessageQueueOffset(mq, offset);
}
while(true) {
//一次隻拉取一部分消息,周遊每個隊列完畢後,在重新周遊一遍
//這裡假設生産者不斷産生消息,沒有終止
for (int i=0;i<objs.length;i++) {
MessageQueue mq=(MessageQueue)objs[i];
try {
//其他代碼同上
}
}
}
}
可以看到,DefaultMQPullConsumer代碼相對DefaultMQPushConsumer來說,要複雜一些,我們需要手工維護消費位移。而DefaultMQPushConsumer是rocketmq自動維護的。
DefaultMQPullConsumer也提供了多個構造方法,如下:
- public DefaultMQPullConsumer(final String namespace, final String consumerGroup, RPCHook rpcHook)
- public DefaultMQPullConsumer(final String namespace, final String consumerGroup)
構造方法每個參數的作用可以參考DefaultMQPushConsumer。