天天看點

RocketMQ實戰_03 消費者Consumer與Spring整合

RocketMQ實戰_03 消費者Consumer與Spring整合

有些路只能自己一個人去走,有些關口只能自己一個人去闖

一、建立Consumer類

package com.slx.trade.common.rocketmq;

import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
import com.slx.trade.common.exception.SlxMQException;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Configurable wrapper that builds and starts a RocketMQ {@link DefaultMQPushConsumer}.
 *
 * <p>Populate the properties through the setters, then call {@link #init()} to subscribe
 * and start consuming. Each received message is forwarded to the configured
 * {@link IMessageProcessor} through a {@code SlxMessageListener}. Call {@link #destroy()}
 * to shut the consumer down again.
 *
 * @author slx
 * @since 2019/3/28 12:52
 */
public class SlxMQConsumer {
    public static final Logger LOGGER = LoggerFactory.getLogger(SlxMQConsumer.class);
    private String groupName;
    private String topic;
    // Tag expression used for the subscription; multiple tags are separated by "||".
    private String tag = "*";
    private String namesvrAddr;
    // Minimum number of consumer threads.
    private int consumeThreadMin = 10;
    // Maximum number of consumer threads.
    private int consumeThreadMax = 64;

    private IMessageProcessor iMessageProcessor;

    // Consumer started by init(); null until a start succeeds and again after destroy().
    private DefaultMQPushConsumer consumer;

    public String getGroupName() {
        return groupName;
    }

    public void setGroupName(String groupName) {
        this.groupName = groupName;
    }

    public String getTopic() {
        return topic;
    }

    public void setTopic(String topic) {
        this.topic = topic;
    }

    public String getTag() {
        return tag;
    }

    public void setTag(String tag) {
        this.tag = tag;
    }

    public String getNamesvrAddr() {
        return namesvrAddr;
    }

    public void setNamesvrAddr(String namesvrAddr) {
        this.namesvrAddr = namesvrAddr;
    }

    public int getConsumeThreadMin() {
        return consumeThreadMin;
    }

    public void setConsumeThreadMin(int consumeThreadMin) {
        this.consumeThreadMin = consumeThreadMin;
    }

    public int getConsumeThreadMax() {
        return consumeThreadMax;
    }

    public void setConsumeThreadMax(int consumeThreadMax) {
        this.consumeThreadMax = consumeThreadMax;
    }

    public IMessageProcessor getiMessageProcessor() {
        return iMessageProcessor;
    }

    public void setiMessageProcessor(IMessageProcessor iMessageProcessor) {
        this.iMessageProcessor = iMessageProcessor;
    }

    /**
     * Validates the configuration, subscribes to the topic and starts the consumer.
     *
     * @throws SlxMQException if {@code groupName}, {@code topic} or {@code namesvrAddr} is blank,
     *                        if no message processor has been set, or if the RocketMQ client
     *                        fails to subscribe or start
     */
    public void init() throws SlxMQException {
        if (StringUtils.isBlank(groupName)) {
            throw new SlxMQException("groupName is blank");
        }
        if (StringUtils.isBlank(topic)) {
            throw new SlxMQException("topic is blank");
        }
        if (StringUtils.isBlank(namesvrAddr)) {
            throw new SlxMQException("namesvrAddr is blank");
        }
        // Fail fast here: without a processor the listener would only fail later,
        // with a NullPointerException on the first delivered message.
        if (iMessageProcessor == null) {
            throw new SlxMQException("iMessageProcessor is null");
        }
        DefaultMQPushConsumer pushConsumer = new DefaultMQPushConsumer(this.groupName);
        pushConsumer.setNamesrvAddr(this.namesvrAddr);

        try {
            pushConsumer.subscribe(this.topic, this.tag);
            pushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
            pushConsumer.setConsumeThreadMin(this.consumeThreadMin);
            pushConsumer.setConsumeThreadMax(this.consumeThreadMax);
            SlxMessageListener slxMessageListener = new SlxMessageListener();
            slxMessageListener.setiMessageProcessor(this.iMessageProcessor);
            pushConsumer.registerMessageListener(slxMessageListener);
            pushConsumer.start();
            // Keep the reference only after a successful start so destroy() can shut it down.
            this.consumer = pushConsumer;
            LOGGER.info("consumer is start! groupName:{},topic:{},namesrvAddr:{}",groupName,topic,namesvrAddr);

        } catch (MQClientException e) {
            // The trailing throwable makes SLF4J log the stack trace along with the message.
            LOGGER.error("consumer is error! groupName:{},topic:{},namesrvAddr:{}",groupName,topic,namesvrAddr,e);
            throw new SlxMQException(e);
        }

    }

    /**
     * Shuts down the consumer started by {@link #init()}, if any.
     *
     * <p>Does nothing when no consumer has been started. Can be registered as the
     * Spring {@code destroy-method} of this bean.
     */
    public void destroy() {
        if (consumer != null) {
            consumer.shutdown();
            consumer = null;
        }
    }
}
           

處理消息接口

/**
 * Callback contract for handling a single consumed message.
 */
public interface IMessageProcessor {
    /**
     * Handles one message.
     *
     * @param messageExt the message to handle
     * @return result flag; {@code SlxMessageListener} answers {@code CONSUME_SUCCESS} when every
     *         message in a batch yields {@code true}, and {@code RECONSUME_LATER} as soon as one
     *         yields {@code false}
     */
    boolean handleMessage(MessageExt messageExt);
}
           

接口的簡單實作

/**
 * Demo {@link IMessageProcessor} that prints each message to standard output
 * and always reports it as handled.
 *
 * @author slx
 * @since 2019/3/28 13:34
 */
public class TestProcessor implements IMessageProcessor {
    /**
     * Prints the message's string form and reports it as handled.
     *
     * @param messageExt the received message
     * @return always {@code true}
     */
    public boolean handleMessage(MessageExt messageExt) {
        final String description = messageExt.toString();
        System.out.println("收到消息:" + description);
        return true;
    }
}
           

消息監聽類

/**
 * Concurrent message listener that hands every received message to an
 * {@link IMessageProcessor}.
 *
 * @author slx
 * @since 2019/3/28 13:19
 */
public class SlxMessageListener implements MessageListenerConcurrently {

    private IMessageProcessor iMessageProcessor;

    /**
     * Sets the processor that receives each message.
     *
     * @param iMessageProcessor the processor to delegate to
     */
    public void setiMessageProcessor(IMessageProcessor iMessageProcessor) {
        this.iMessageProcessor = iMessageProcessor;
    }

    /**
     * Passes the messages to the processor in list order, stopping at the first one
     * the processor reports as not handled.
     *
     * @param list the batch of messages to consume
     * @param consumeConcurrentlyContext the consume context (not used here)
     * @return {@code RECONSUME_LATER} if the processor returned {@code false} for a message,
     *         otherwise {@code CONSUME_SUCCESS}
     */
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
        boolean allHandled = true;
        for (MessageExt message : list) {
            if (!iMessageProcessor.handleMessage(message)) {
                // Remaining messages in the batch are not offered to the processor.
                allHandled = false;
                break;
            }
        }
        return allHandled
                ? ConsumeConcurrentlyStatus.CONSUME_SUCCESS
                : ConsumeConcurrentlyStatus.RECONSUME_LATER;
    }
}
           

二、配置檔案,整合spring與rocketmq-consumer

<?xml version="1.0" encoding="UTF-8"?>
<!-- Spring bean definitions that wire a SlxMQConsumer to a message processor. -->
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
                        http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
                        http://www.springframework.org/schema/context
                        http://www.springframework.org/schema/context/spring-context-4.0.xsd">

    <!-- Processor that receives each consumed message. -->
    <!-- NOTE(review): the class name has no package; confirm TestProcessor really lives in the default package. -->
    <bean id="iMessageProcessor" class="TestProcessor"></bean>
    <!-- init-method makes Spring call SlxMQConsumer.init() once the properties below are set. -->
    <bean id="slxMQConsumer" class="com.slx.trade.common.rocketmq.SlxMQConsumer" init-method="init">
        <property name="groupName" value="SlxConsumerGroup"></property>
        <property name="topic" value="TestTopic"></property>
        <!-- Two name server addresses, separated by a semicolon. -->
        <property name="namesvrAddr" value="192.168.0.121:9876;192.168.0.128:9876"></property>
        <property name="iMessageProcessor" ref="iMessageProcessor"></property>
    </bean>

</beans>
           

三、測試驗證

1.測試代碼

/**
 * Smoke test that loads {@code xml/spring-rocketmq-consumer.xml} from the classpath
 * and then keeps the test thread alive briefly.
 *
 * @author slx
 * @since 2019/3/28 14:40
 */
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:xml/spring-rocketmq-consumer.xml")
public class TestConsumer {

    /**
     * Sleeps for 100 ms and makes no assertions.
     *
     * <p>NOTE(review): presumably the pause is meant to give the consumer time to receive
     * messages; confirm that 100 ms is long enough in the target environment.
     *
     * @throws InterruptedException if the thread is interrupted while sleeping
     */
    @Test
    public void testProducer() throws InterruptedException {
        final long waitMillis = 100L;
        Thread.sleep(waitMillis);
    }
}
           

2.測試結果

14:50:42.634 [main] INFO com.slx.trade.common.rocketmq.SlxMQConsumer - consumer is start! groupName:SlxConsumerGroup,topic:TestTopic,namesrvAddr:192.168.0.121:9876;192.168.0.128:9876
收到消息:MessageExt [queueId=0, storeSize=154, queueOffset=0, sysFlag=0, bornTimestamp=1553746914330, bornHost=/192.168.0.107:51202, storeTimestamp=1553746874396, storeHost=/192.168.0.121:10911, msgId=C0A8007900002A9F00000000000012BC, commitLogOffset=4796, bodyCRC=1976694453, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TestTopic, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=2, KEYS=123456, WAIT=true, TAGS=order}, body=21]]