基本使用
首先看看consumer的简单使用:
public class Consumer {
public static void main(String[] args) throws InterruptedException, MQClientException {
// 使用consumer group初始化DefaultMQPushConsumer
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");
// 指定namesrv
consumer.setNamesrvAddr("localhost:9876");
// 订阅topic
consumer.subscribe("Topic", "*");
// 注册消费消息的回调
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 启动consumer
consumer.start();
System.out.printf("Consumer Started.%n");
}
}
基本概念
消息获取模式
1.Consumer有两种消费获取模式:
- DefaultMQPushConsumer:类似于Broker Push消息到Consumer方式,但实际仍然是Consumer内部后台从Broker Pull消息,采用长轮询方式拉消息,实时性同push方式一致,且不会无谓的拉消息导致Broker、Consumer压力增大
- DefaultMQPullConsumer:主动拉取模式
2.消费模式
广播消费:Rocketmq会将消息发送给Group中的每一个消费者
集群消费:同一条消息,只能被Group中的任意一个消费者消费
3.Topic
RocketMQ中都是通过Topic来发送和消费消息的,但是Topic也仅仅是逻辑上的一个概念,而1个Topic下又包含了若干个逻辑队列,即消息队列,消息内容实际是存放在队列中的(保存的消息数据实际上不是真正的消息数据,而是指向commit log的消息索引),而队列又存储在Broker中的.
4.OffsetStore管理消息的Offset
- LocalFileOffsetStore:广播模式下,每个消费者都会消费所有的Queue,所以他们只需要保存自己的offset就行了,所以本地就行;
- RemoteBrokerOffsetStore:但是集群模式下,可能会消费者宕机,重启,扩容等情况,所以不能将offset保存到本地,使用RemoteBrokerOffsetStore的话,就可以将offset交给Broker保存,交给Broker保存的方法是persistAll(),这样当消费者正常启动后,从Broker哪里获取这个offset就行了。
DefaultMQPushConsumer启动流程
DefaultMQPushConsumer.start方法里面主要做一些类初始化的工作,先看看DefaultMQPushConsumer启动方法:
public void start() throws MQClientException {
switch (this.serviceState) {
case CREATE_JUST:
this.serviceState = ServiceState.START_FAILED;
this.checkConfig();
// 复制订阅关系
this.copySubscription();
if (this.defaultMQPushConsumer.getMessageModel() == MessageModel.CLUSTERING) {
this.defaultMQPushConsumer.changeInstanceNameToPID();
}
this.mQClientFactory =
MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQPushConsumer,
this.rpcHook); //@1
// 初始化Rebalance变量
this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup()); //@2
this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());
this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer
.getAllocateMessageQueueStrategy());
this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);
this.pullAPIWrapper = ClientApisEnhancerFactory.getDefaultMQClientImpl(PullAPIWrapper.class,
mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup(), isUnitMode()); //@3
// 每次拉消息之后,都会进行一次过滤。
this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);
if (this.defaultMQPushConsumer.getOffsetStore() != null) {
this.offsetStore = this.defaultMQPushConsumer.getOffsetStore();
}
else {
// 广播消费/集群消费
switch (this.defaultMQPushConsumer.getMessageModel()) {
case BROADCASTING:
this.offsetStore =
new LocalFileOffsetStore(this.mQClientFactory,
this.defaultMQPushConsumer.getConsumerGroup());
break;
case CLUSTERING:
this.offsetStore =
new RemoteBrokerOffsetStore(this.mQClientFactory,
this.defaultMQPushConsumer.getConsumerGroup());
break;
default:
break;
}
}
// 加载消费进度
this.offsetStore.load();
// 启动消费消息服务
if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
this.consumeOrderly = true;
this.consumeMessageService =
new ConsumeMessageOrderlyService(this,
(MessageListenerOrderly) this.getMessageListenerInner()); //@4
}
else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {
this.consumeOrderly = false;
this.consumeMessageService =
new ConsumeMessageConcurrentlyService(this,
(MessageListenerConcurrently) this.getMessageListenerInner());
}
this.consumeMessageService.start(); //@5
boolean registerOK =
mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this);
if (!registerOK) {
this.serviceState = ServiceState.CREATE_JUST;
this.consumeMessageService.shutdown();
throw new MQClientException("The consumer group["
+ this.defaultMQPushConsumer.getConsumerGroup()
+ "] has been created before, specify another name please."
+ FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL), null);
}
mQClientFactory.start(); //@6
log.info("the consumer [{}] start OK", this.defaultMQPushConsumer.getConsumerGroup());
this.serviceState = ServiceState.RUNNING;
break;
case RUNNING:
case START_FAILED:
case SHUTDOWN_ALREADY:
throw new MQClientException("The PushConsumer service state not OK, maybe started once, "//
+ this.serviceState//
+ FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK), null);
default:
break;
}
this.updateTopicSubscribeInfoWhenSubscriptionChanged();
this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
this.mQClientFactory.rebalanceImmediately();
}
@1.实例化MQClientInstance
RocketMQ 在同一个 JVM 进程拥有一个 clientConfigId(客户端ID)该JVM进程中不同的消息消费组的消息客户端ID相同,因为在JVM进程中对于每一个 ClientConfig 只会实例化一个 MQClientInstance。
@2.初始化rebalanceImpl
RebalanceService其实是一个线程的封装,RocetMQ 中通过RebalanceService线程实现消费队列负载。RebalanceService在DefaultMQPushConsumerImpl#start()方法中调用了MQClientInstance#start()方法,在MQClientInstance#start()方法中调用了RebalanceService#start()方法。这个线程就是隔一段时间执行以下MQClientInstance#doRebalance()方法,而具体的doRebalance方法实现是由RebalanceImpl去完成的,RebalanceImpl是DefaultMQPushConsumerImpl中的属性
@3.初始化pullAPIWrapper、pullMessageService拉取消息
初始化pullAPIWrapper,pullAPIWrapper对Pull接口进行进一步的封装,而pullMessageService是一个线程的封装,开启长轮询拉消息服务,单线程异步拉取,在MQClientInstance#start()方法中调用了pullMessageService#start()。pullAPIWrapper.pullKernelImpl方法是真正的拉取消息
@4.初始化消息消费服务
if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
this.consumeOrderly = true;
this.consumeMessageService =
new ConsumeMessageOrderlyService(this,
(MessageListenerOrderly) this.getMessageListenerInner());
}
else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {
this.consumeOrderly = false;
this.consumeMessageService =
new ConsumeMessageConcurrentlyService(this,
(MessageListenerConcurrently) this.getMessageListenerInner());
}
Message保存到ProcessQueue后,会使用ConsumeMessageService消费消息,ConsumeMessageService也是一个接口,它有2个实现,分别是ConsumeMessageOrderlyServiceConsumeMessageConcurrentlyService。根据监听器MessageListener的类型,实例化对应的消息消费服务ConsumeMessageService:
-
ConsumeMessageOrderlyService
同一队列的消息同一时刻只能一个线程消费,可保证消息在同一队列严格有序消费
-
ConsumeMessageConcurrentlyService
并发消费消息服务
@5.启动消费消息服务
如果消息消费模式为集群模式,启动定时任务,默认每隔20s执行一次锁定分配给自己的消息消费队列。
1:根据当前负载的消息队列,按照 Broker分类存储在Map。负载的消息队列在RebalanceService时根据当前消费者数量与消息消费队列按照负载算法进行分配,然后尝试对该消息队列加锁,如果申请锁成功,则加入到待拉取任务中。
2:根据Broker获取主节点的地址。
3:向Broker发送锁定消息队列请求,该方法会返回本次成功锁定的消息消费队列,关于Broker端消息队列锁定实现见下文详细分析。
4:遍历本次成功锁定的队列来更新对应的ProcessQueue的locked状态,如果locked为false,则设置成true,并更新锁定时间。
5:遍历mqs,如果消息队列未成功锁定,需要将ProceeQueue的locked状态为false,在该处理队列未被其他消费者锁定之前,该消息队列将暂停拉取消息。
@6.启动MQClientInstance
public void start() throws MQClientException {
synchronized (this) {
switch (this.serviceState) {
case CREATE_JUST:
this.serviceState = ServiceState.START_FAILED;
if (null == this.clientConfig.getNamesrvAddr()) {
this.clientConfig.setNamesrvAddr(this.mQClientAPIImpl.fetchNameServerAddr());
}
this.mQClientAPIImpl.start();
this.startScheduledTask();
this.pullMessageService.start();
this.rebalanceService.start();
this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
log.info("the client factory [{}] start OK", this.clientId);
this.serviceState = ServiceState.RUNNING;
break;
case RUNNING:
break;
case SHUTDOWN_ALREADY:
break;
case START_FAILED:
throw new MQClientException("The Factory object[" + this.getClientId()
+ "] has been created before, and failed.", null);
default:
break;
}
}
}
- this.mQClientAPIImpl.start(); //远程通信,NettyRemotingClient 启动
- this.startScheduledTask(); //1.定时获取Name Server地址 2.定时从Name Server获取Topic路由信息 3.定时清理下线的Broker,向所有Broker发送心跳信息 4.定时持久化Consumer消费进度(广播存储到本地,集群存储到Broker)
-
this.pullMessageService.start(); //轮训拉取消息
pullMessageService是一个线程的封装,开启长轮询拉消息服务,单线程异步拉取,在MQClientInstance#start()方法中调用了pullMessageService#start()。pullAPIWrapper.pullKernelImpl方法是真正的拉取消息
-
this.rebalanceService.start(); //重平衡
RebalanceService其实是一个线程的封装,RocketMQ 中通RebalanceService线程实现消费队列负载。RebalanceService在DefaultMQPushConsumerImpl#start()方法中调用了MQClientInstance#start()方法,在MQClientInstance#start()方法中调用了RebalanceService#start()方法。这个线程就是隔一段时间执行以下MQClientInstance#doRebalance()方法,而具体的doRebalance方法实现是由RebalanceImpl去完成的,RebalanceImpl是DefaultMQPushConsumerImpl中的属性