天天看点

【RocketMQ源码分析】消费者

基本使用

首先看看consumer的简单使用:

public class Consumer {

    public static void main(String[] args) throws InterruptedException, MQClientException {

        // 使用consumer group初始化DefaultMQPushConsumer
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");

        // 指定namesrv
        consumer.setNamesrvAddr("localhost:9876");

        // 订阅topic
        consumer.subscribe("Topic", "*");
        // 注册消费消息的回调
        consumer.registerMessageListener(new MessageListenerConcurrently() {

            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                            ConsumeConcurrentlyContext context) {
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        // 启动consumer
        consumer.start();

        System.out.printf("Consumer Started.%n");
    }
}
           

基本概念

消息获取模式

1.Consumer有两种消费获取模式:

  • DefaultMQPushConsumer:类似于Broker Push消息到Consumer方式,但实际仍然是Consumer内部后台从Broker Pull消息,采用长轮询方式拉消息,实时性同push方式一致,且不会无谓的拉消息导致Broker、Consumer压力增大
  • DefaultMQPullConsumer:主动拉取模式

2.消费模式

广播消费:Rocketmq会将消息发送给Group中的每一个消费者

集群消费:同一条消息,只能被Group中的任意一个消费者消费

3.Topic

RocketMQ中都是通过Topic来发送和消费消息的,但是Topic也仅仅是逻辑上的一个概念,而1个Topic下又包含了若干个逻辑队列,即消息队列,消息内容实际是存放在队列中的(保存的消息数据实际上不是真正的消息数据,而是指向commit log的消息索引),而队列又存储在Broker中的.

【RocketMQ源码分析】消费者

4.OffsetStore管理消息的Offset

  • LocalFileOffsetStore:广播模式下,每个消费者都会消费所有的Queue,所以他们只需要保存自己的offset就行了,所以本地就行;
  • RemoteBrokerOffsetStore:但是集群模式下,可能会消费者宕机,重启,扩容等情况,所以不能将offset保存到本地,使用RemoteBrokerOffsetStore的话,就可以将offset交给Broker保存,交给Broker保存的方法是persistAll(),这样当消费者正常启动后,从Broker哪里获取这个offset就行了。

DefaultMQPushConsumer启动流程

DefaultMQPushConsumer.start方法里面主要做一些类初始化的工作,先看看DefaultMQPushConsumer启动方法:

public void start() throws MQClientException {
        switch (this.serviceState) {
        case CREATE_JUST:
            this.serviceState = ServiceState.START_FAILED;

            this.checkConfig();

            // 复制订阅关系
            this.copySubscription();

            if (this.defaultMQPushConsumer.getMessageModel() == MessageModel.CLUSTERING) {
                this.defaultMQPushConsumer.changeInstanceNameToPID();
            }

            this.mQClientFactory =
                    MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQPushConsumer,
                        this.rpcHook);  //@1

            // 初始化Rebalance变量
            this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup());   //@2
            this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());
            this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer
                .getAllocateMessageQueueStrategy());
            this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);

            this.pullAPIWrapper = ClientApisEnhancerFactory.getDefaultMQClientImpl(PullAPIWrapper.class, 
            		mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup(), isUnitMode());  //@3
            // 每次拉消息之后,都会进行一次过滤。
            this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);

            if (this.defaultMQPushConsumer.getOffsetStore() != null) {
                this.offsetStore = this.defaultMQPushConsumer.getOffsetStore();
            }
            else {
                // 广播消费/集群消费
                switch (this.defaultMQPushConsumer.getMessageModel()) {
                case BROADCASTING:
                    this.offsetStore =
                            new LocalFileOffsetStore(this.mQClientFactory,
                                this.defaultMQPushConsumer.getConsumerGroup());
                    break;
                case CLUSTERING:
                    this.offsetStore =
                            new RemoteBrokerOffsetStore(this.mQClientFactory,
                                this.defaultMQPushConsumer.getConsumerGroup());
                    break;
                default:
                    break;
                }
            }
            // 加载消费进度
            this.offsetStore.load();

            // 启动消费消息服务
            if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
                this.consumeOrderly = true;
                this.consumeMessageService =
                        new ConsumeMessageOrderlyService(this,
                            (MessageListenerOrderly) this.getMessageListenerInner());   //@4
            }
            else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {
                this.consumeOrderly = false;
                this.consumeMessageService =
                        new ConsumeMessageConcurrentlyService(this,
                            (MessageListenerConcurrently) this.getMessageListenerInner());
            }

            this.consumeMessageService.start();   //@5

            boolean registerOK =
                    mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this);
            if (!registerOK) {
                this.serviceState = ServiceState.CREATE_JUST;
                this.consumeMessageService.shutdown();
                throw new MQClientException("The consumer group["
                        + this.defaultMQPushConsumer.getConsumerGroup()
                        + "] has been created before, specify another name please."
                        + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL), null);
            }

            mQClientFactory.start();   //@6
            log.info("the consumer [{}] start OK", this.defaultMQPushConsumer.getConsumerGroup());
            this.serviceState = ServiceState.RUNNING;
            break;
        case RUNNING:
        case START_FAILED:
        case SHUTDOWN_ALREADY:
            throw new MQClientException("The PushConsumer service state not OK, maybe started once, "//
                    + this.serviceState//
                    + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK), null);
        default:
            break;
        }

        this.updateTopicSubscribeInfoWhenSubscriptionChanged();

        this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();

        this.mQClientFactory.rebalanceImmediately();
    }
           

@1.实例化MQClientInstance

RocketMQ 在同一个 JVM 进程拥有一个 clientConfigId(客户端ID)该JVM进程中不同的消息消费组的消息客户端ID相同,因为在JVM进程中对于每一个 ClientConfig 只会实例化一个 MQClientInstance。

@2.初始化rebalanceImpl

RebalanceService其实是一个线程的封装,RocetMQ 中通过RebalanceService线程实现消费队列负载。RebalanceService在DefaultMQPushConsumerImpl#start()方法中调用了MQClientInstance#start()方法,在MQClientInstance#start()方法中调用了RebalanceService#start()方法。这个线程就是隔一段时间执行以下MQClientInstance#doRebalance()方法,而具体的doRebalance方法实现是由RebalanceImpl去完成的,RebalanceImpl是DefaultMQPushConsumerImpl中的属性

@3.初始化pullAPIWrapper、pullMessageService拉取消息

初始化pullAPIWrapper,pullAPIWrapper对Pull接口进行进一步的封装,而pullMessageService是一个线程的封装,开启长轮询拉消息服务,单线程异步拉取,在MQClientInstance#start()方法中调用了pullMessageService#start()。pullAPIWrapper.pullKernelImpl方法是真正的拉取消息

@4.初始化消息消费服务

if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
    this.consumeOrderly = true;
    this.consumeMessageService =
            new ConsumeMessageOrderlyService(this,
                (MessageListenerOrderly) this.getMessageListenerInner());
}
else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {
    this.consumeOrderly = false;
    this.consumeMessageService =
            new ConsumeMessageConcurrentlyService(this,
                (MessageListenerConcurrently) this.getMessageListenerInner());
}
           

Message保存到ProcessQueue后,会使用ConsumeMessageService消费消息,ConsumeMessageService也是一个接口,它有2个实现,分别是ConsumeMessageOrderlyServiceConsumeMessageConcurrentlyService。根据监听器MessageListener的类型,实例化对应的消息消费服务ConsumeMessageService:

  • ConsumeMessageOrderlyService

    同一队列的消息同一时刻只能一个线程消费,可保证消息在同一队列严格有序消费

  • ConsumeMessageConcurrentlyService

    并发消费消息服务

@5.启动消费消息服务

如果消息消费模式为集群模式,启动定时任务,默认每隔20s执行一次锁定分配给自己的消息消费队列。

1:根据当前负载的消息队列,按照 Broker分类存储在Map。负载的消息队列在RebalanceService时根据当前消费者数量与消息消费队列按照负载算法进行分配,然后尝试对该消息队列加锁,如果申请锁成功,则加入到待拉取任务中。

2:根据Broker获取主节点的地址。

3:向Broker发送锁定消息队列请求,该方法会返回本次成功锁定的消息消费队列,关于Broker端消息队列锁定实现见下文详细分析。

4:遍历本次成功锁定的队列来更新对应的ProcessQueue的locked状态,如果locked为false,则设置成true,并更新锁定时间。

5:遍历mqs,如果消息队列未成功锁定,需要将ProceeQueue的locked状态为false,在该处理队列未被其他消费者锁定之前,该消息队列将暂停拉取消息。

@6.启动MQClientInstance

public void start() throws MQClientException {
        synchronized (this) {
            switch (this.serviceState) {
            case CREATE_JUST:
                this.serviceState = ServiceState.START_FAILED;
                if (null == this.clientConfig.getNamesrvAddr()) {
                    this.clientConfig.setNamesrvAddr(this.mQClientAPIImpl.fetchNameServerAddr());
                }

                this.mQClientAPIImpl.start();
                this.startScheduledTask();
                this.pullMessageService.start();
                this.rebalanceService.start();

                this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
                log.info("the client factory [{}] start OK", this.clientId);
                this.serviceState = ServiceState.RUNNING;
                break;
            case RUNNING:
                break;
            case SHUTDOWN_ALREADY:
                break;
            case START_FAILED:
                throw new MQClientException("The Factory object[" + this.getClientId()
                        + "] has been created before, and failed.", null);
            default:
                break;
            }
        }
    }
           
  • this.mQClientAPIImpl.start(); //远程通信,NettyRemotingClient 启动
  • this.startScheduledTask(); //1.定时获取Name Server地址 2.定时从Name Server获取Topic路由信息 3.定时清理下线的Broker,向所有Broker发送心跳信息 4.定时持久化Consumer消费进度(广播存储到本地,集群存储到Broker)
  • this.pullMessageService.start(); //轮训拉取消息

    pullMessageService是一个线程的封装,开启长轮询拉消息服务,单线程异步拉取,在MQClientInstance#start()方法中调用了pullMessageService#start()。pullAPIWrapper.pullKernelImpl方法是真正的拉取消息

  • this.rebalanceService.start(); //重平衡

    RebalanceService其实是一个线程的封装,RocketMQ 中通RebalanceService线程实现消费队列负载。RebalanceService在DefaultMQPushConsumerImpl#start()方法中调用了MQClientInstance#start()方法,在MQClientInstance#start()方法中调用了RebalanceService#start()方法。这个线程就是隔一段时间执行以下MQClientInstance#doRebalance()方法,而具体的doRebalance方法实现是由RebalanceImpl去完成的,RebalanceImpl是DefaultMQPushConsumerImpl中的属性

继续阅读