Basic usage
public class SyncProducer {
public static void main(String[] args) throws Exception {
// Initialize the producer with a producer group name
DefaultMQProducer producer = new DefaultMQProducer("group");
// Specify the name server address
producer.setNamesrvAddr("localhost:9876");
producer.start();
for (int i = 0; i < 100; i++) {
// Create the message
Message msg = new Message("Topic" /* Topic */,
"TagA" /* Tag */,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
);
// Send the message synchronously
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
}
// Shut down the producer
producer.shutdown();
}
}
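The example above sends messages synchronously: it initializes a DefaultMQProducer, then calls start() and send(). The rest of this article walks through what the producer actually does when it sends a message.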
DefaultMQProducer
DefaultMQProducer is the client-side message producer. It is defined as follows:
public class DefaultMQProducer extends ClientConfig implements MQProducer {
...
}
As shown, DefaultMQProducer implements the MQProducer interface and extends the ClientConfig class.
-
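The MQProducer interface
The MQProducer interface defines the producer's operations and extends MQAdmin, which mainly contains administrative methods. MQProducer declares the various ways to send a message: synchronous, asynchronous, one-way, batch, and transactional.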
public interface MQProducer extends MQAdmin {
void start() throws MQClientException;
void shutdown();
List<MessageQueue> fetchPublishMessageQueues(final String topic) throws MQClientException;
SendResult send(final Message msg) throws MQClientException, RemotingException, MQBrokerException,
InterruptedException;
// A series of send overloads omitted
// Transaction-related sends
TransactionSendResult sendMessageInTransaction(final Message msg,
final LocalTransactionExecuter tranExecuter, final Object arg) throws MQClientException;
TransactionSendResult sendMessageInTransaction(final Message msg,
final Object arg) throws MQClientException;
//for batch
SendResult send(final Collection<Message> msgs) throws MQClientException, RemotingException, MQBrokerException,
InterruptedException;
// A series of batch send overloads omitted
}
public interface MQAdmin {
void createTopic(final String key, final String newTopic, final int queueNum)
throws MQClientException;
void createTopic(String key, String newTopic, int queueNum, int topicSysFlag)
throws MQClientException;
long searchOffset(final MessageQueue mq, final long timestamp) throws MQClientException;
long maxOffset(final MessageQueue mq) throws MQClientException;
long minOffset(final MessageQueue mq) throws MQClientException;
long earliestMsgStoreTime(final MessageQueue mq) throws MQClientException;
MessageExt viewMessage(final String offsetMsgId) throws RemotingException, MQBrokerException,
InterruptedException, MQClientException;
QueryResult queryMessage(final String topic, final String key, final int maxNum, final long begin,
final long end) throws MQClientException, InterruptedException;
MessageExt viewMessage(String topic,
String msgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException;
}
-
The ClientConfig class
As the name suggests, ClientConfig mainly holds client configuration.
public class ClientConfig {
public static final String SEND_MESSAGE_WITH_VIP_CHANNEL_PROPERTY = "com.rocketmq.sendMessageWithVIPChannel";
private String namesrvAddr = NameServerAddressUtils.getNameServerAddresses();
private String clientIP = RemotingUtil.getLocalAddress();
private String instanceName = System.getProperty("rocketmq.client.name", "DEFAULT");
private int clientCallbackExecutorThreads = Runtime.getRuntime().availableProcessors();
protected String namespace;
protected AccessChannel accessChannel = AccessChannel.LOCAL;
private int pollNameServerInterval = 1000 * 30;
private int heartbeatBrokerInterval = 1000 * 30;
private int persistConsumerOffsetInterval = 1000 * 5;
private boolean unitMode = false;
private String unitName;
private boolean vipChannelEnabled = Boolean.parseBoolean(System.getProperty(SEND_MESSAGE_WITH_VIP_CHANNEL_PROPERTY, "false"));
private boolean useTLS = TlsSystemConfig.tlsEnable;
private LanguageCode language = LanguageCode.JAVA;
}
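Several ClientConfig fields take their defaults from JVM system properties. The sketch below isolates that pattern with plain JDK calls. ClientDefaultsSketch is a hypothetical class written for illustration; only the two property keys are taken from the excerpt above.

```java
// Hypothetical illustration class; not part of RocketMQ.
final class ClientDefaultsSketch {
    // Same property key as SEND_MESSAGE_WITH_VIP_CHANNEL_PROPERTY in the excerpt.
    static final String VIP_PROPERTY = "com.rocketmq.sendMessageWithVIPChannel";

    // Falls back to "DEFAULT" when -Drocketmq.client.name is not set.
    static String instanceName() {
        return System.getProperty("rocketmq.client.name", "DEFAULT");
    }

    // Falls back to false when the property is not set.
    static boolean vipChannelEnabled() {
        return Boolean.parseBoolean(System.getProperty(VIP_PROPERTY, "false"));
    }

    public static void main(String[] args) {
        System.out.println(instanceName() + " vip=" + vipChannelEnabled());
    }
}
```

Starting the JVM with -Drocketmq.client.name=orders would change what instanceName() returns, with no code change.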
With the overall structure of DefaultMQProducer in view, its two core methods are start() and send().
start()
After a DefaultMQProducer is constructed, the caller invokes start(), which sets up network communication and initializes configuration. The actual work is delegated to DefaultMQProducerImpl#start().
public void start(final boolean startFactory) throws MQClientException {
// Branch on serviceState; a newly created producer is always in CREATE_JUST
switch (this.serviceState) {
case CREATE_JUST:
this.serviceState = ServiceState.START_FAILED;
// Validate the configuration, mainly the producer group name
this.checkConfig();
// Unless the group is CLIENT_INNER_PRODUCER_GROUP (we will meet it later), change ClientConfig#instanceName to the process PID. Since we always specify our own group, this branch is taken
if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {
this.defaultMQProducer.changeInstanceNameToPID();
}
// Create an MQClientInstance. This is the key object: it handles communication with the broker
// Once created, MQClientManager keeps the clientId -> MQClientInstance mapping
this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQProducer, rpcHook);
// Register the producer, i.e. store the ProducerGroup -> DefaultMQProducerImpl mapping in the MQClientInstance
boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);
if (!registerOK) {
// If registration fails, throw an exception and abort startup
}
// Seed topicPublishInfoTable with an empty route entry for createTopicKey, which is MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC. This topic works together with the broker's autoCreateTopicEnable setting, covered in detail later
this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());
// startFactory is true on this path, so MQClientInstance#start() is called
if (startFactory) {
mQClientFactory.start();
}
log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(),
this.defaultMQProducer.isSendMessageWithVIPChannel());
this.serviceState = ServiceState.RUNNING;
break;
// The other cases are omitted; all the important logic is in the CREATE_JUST case
}
// Send heartbeats to all brokers; covered later
this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
}
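The start() method above marks the state as START_FAILED before doing any work and only flips it to RUNNING at the very end, so a failure partway through leaves the object permanently failed. A minimal sketch of that guard follows. StartGuardSketch is a hypothetical class, and throwing for every state other than CREATE_JUST is an assumption made here, since the excerpt omits those cases.

```java
// Hypothetical illustration class; not part of RocketMQ.
final class StartGuardSketch {
    enum ServiceState { CREATE_JUST, RUNNING, SHUTDOWN_ALREADY, START_FAILED }

    private ServiceState state = ServiceState.CREATE_JUST;

    // init stands in for the real startup work (config check, registration, ...).
    synchronized void start(Runnable init) {
        switch (state) {
            case CREATE_JUST:
                // Pessimistically mark as failed; if init throws, the state stays START_FAILED.
                state = ServiceState.START_FAILED;
                init.run();
                state = ServiceState.RUNNING;
                break;
            default:
                // Assumption: any other state rejects a second start.
                throw new IllegalStateException("cannot start in state " + state);
        }
    }

    synchronized ServiceState state() {
        return state;
    }

    public static void main(String[] args) {
        StartGuardSketch guard = new StartGuardSketch();
        guard.start(() -> System.out.println("initializing"));
        System.out.println(guard.state());
    }
}
```

Setting the failed state first means no separate catch block is needed just to record the failure.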
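This instantiates a core class, MQClientInstance. Within one JVM process RocketMQ uses a single client ID (clientId), so the clients of different groups in that JVM share the same ID, and only one MQClientInstance is created per ClientConfig. Once created, the instance is started. Here is its start() method: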
public void start() throws MQClientException {
synchronized (this) {
switch (this.serviceState) {
case CREATE_JUST:
this.serviceState = ServiceState.START_FAILED;
// If not specified,looking address from name server
if (null == this.clientConfig.getNamesrvAddr()) {
this.mQClientAPIImpl.fetchNameServerAddr();
}
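// Start remote communication (NettyRemotingClient)
this.mQClientAPIImpl.start();
// Scheduled tasks: 1. fetch the name server address; 2. fetch topic route info from the name server; 3. remove offline brokers and send heartbeats to all brokers; 4. persist consumer offsets (stored locally in broadcasting mode, on the broker in clustering mode)
this.startScheduledTask();
// Start the message pulling service
this.pullMessageService.start();
// Start the rebalance service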
this.rebalanceService.start();
// Start push service
this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
log.info("the client factory [{}] start OK", this.clientId);
this.serviceState = ServiceState.RUNNING;
break;
case RUNNING:
break;
case SHUTDOWN_ALREADY:
break;
case START_FAILED:
throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
default:
break;
}
}
}
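The "one MQClientInstance per client ID" behaviour comes down to a concurrent map keyed by the ID. The sketch below shows the idea with JDK types only. ClientManagerSketch and ClientInstance are hypothetical names, and the ip@instanceName ID format is an assumption for illustration.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical illustration class; not part of RocketMQ.
final class ClientManagerSketch {
    static final class ClientInstance {
        final String clientId;

        ClientInstance(String clientId) {
            this.clientId = clientId;
        }
    }

    private final ConcurrentMap<String, ClientInstance> table = new ConcurrentHashMap<>();

    // Assumed ID format: ip@instanceName (instanceName is the PID after start()).
    static String buildClientId(String ip, String instanceName) {
        return ip + "@" + instanceName;
    }

    // Returns the existing instance for this ID, creating it at most once.
    ClientInstance getOrCreate(String clientId) {
        return table.computeIfAbsent(clientId, ClientInstance::new);
    }

    public static void main(String[] args) {
        ClientManagerSketch manager = new ClientManagerSketch();
        String id = buildClientId("10.0.0.1", "4321");
        System.out.println(manager.getOrCreate(id) == manager.getOrCreate(id));
    }
}
```

Two producers in the same process that end up with the same ID would therefore share one instance and its network connections.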
- this.mQClientAPIImpl.start(); // start remote communication (NettyRemotingClient)
// Get the name server address list
public List<String> getNameServerAddressList() {
return this.remotingClient.getNameServerAddressList();
}
public RemotingClient getRemotingClient() {
return remotingClient;
}
public String fetchNameServerAddr() {
try {
String addrs = this.topAddressing.fetchNSAddr();
if (addrs != null) {
if (!addrs.equals(this.nameSrvAddr)) {
log.info("name server address changed, old=" + this.nameSrvAddr + ", new=" + addrs);
this.updateNameServerAddressList(addrs);
this.nameSrvAddr = addrs;
return nameSrvAddr;
}
}
} catch (Exception e) {
log.error("fetchNameServerAddr Exception", e);
}
return nameSrvAddr;
}
// Update the name server address list
public void updateNameServerAddressList(final String addrs) {
String[] addrArray = addrs.split(";");
List<String> list = Arrays.asList(addrArray);
this.remotingClient.updateNameServerAddressList(list);
}
public void start() {
this.remotingClient.start();
}
public void shutdown() {
this.remotingClient.shutdown();
}
// Create a topic
public void createTopic(final String addr, final String defaultTopic, final TopicConfig topicConfig,
final long timeoutMillis)
throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
}
// Send a message
public SendResult sendMessage(
final String addr,
final String brokerName,
final Message msg,
final SendMessageRequestHeader requestHeader,
final long timeoutMillis,
final CommunicationMode communicationMode,
final SendCallback sendCallback,
final TopicPublishInfo topicPublishInfo,
final MQClientInstance instance,
final int retryTimesWhenSendFailed,
final SendMessageContext context,
final DefaultMQProducerImpl producer
) throws RemotingException, MQBrokerException, InterruptedException {
long beginStartTime = System.currentTimeMillis();
RemotingCommand request = null;
if (sendSmartMsg || msg instanceof MessageBatch) {
SendMessageRequestHeaderV2 requestHeaderV2 = SendMessageRequestHeaderV2.createSendMessageRequestHeaderV2(requestHeader);
request = RemotingCommand.createRequestCommand(msg instanceof MessageBatch ? RequestCode.SEND_BATCH_MESSAGE : RequestCode.SEND_MESSAGE_V2, requestHeaderV2);
} else {
request = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, requestHeader);
}
request.setBody(msg.getBody());
switch (communicationMode) {
case ONEWAY:
this.remotingClient.invokeOneway(addr, request, timeoutMillis);
return null;
case ASYNC:
final AtomicInteger times = new AtomicInteger();
long costTimeAsync = System.currentTimeMillis() - beginStartTime;
if (timeoutMillis < costTimeAsync) {
throw new RemotingTooMuchRequestException("sendMessage call timeout");
}
this.sendMessageAsync(addr, brokerName, msg, timeoutMillis - costTimeAsync, request, sendCallback, topicPublishInfo, instance,
retryTimesWhenSendFailed, times, context, producer);
return null;
case SYNC:
long costTimeSync = System.currentTimeMillis() - beginStartTime;
if (timeoutMillis < costTimeSync) {
throw new RemotingTooMuchRequestException("sendMessage call timeout");
}
return this.sendMessageSync(addr, brokerName, msg, timeoutMillis - costTimeSync, request);
default:
assert false;
break;
}
return null;
}
// Send a message asynchronously
private void sendMessageAsync(
final String addr,
final String brokerName,
final Message msg,
final long timeoutMillis,
final RemotingCommand request,
final SendCallback sendCallback,
final TopicPublishInfo topicPublishInfo,
final MQClientInstance instance,
final int retryTimesWhenSendFailed,
final AtomicInteger times,
final SendMessageContext context,
final DefaultMQProducerImpl producer
) throws InterruptedException, RemotingException {
}
// Pull messages
public PullResult pullMessage(
final String addr,
final PullMessageRequestHeader requestHeader,
final long timeoutMillis,
final CommunicationMode communicationMode,
final PullCallback pullCallback
) throws RemotingException, MQBrokerException, InterruptedException {
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, requestHeader);
switch (communicationMode) {
case ONEWAY:
assert false;
return null;
case ASYNC:
this.pullMessageAsync(addr, request, timeoutMillis, pullCallback);
return null;
case SYNC:
return this.pullMessageSync(addr, request, timeoutMillis);
default:
assert false;
break;
}
return null;
}
// Pull messages asynchronously
private void pullMessageAsync(
final String addr,
final RemotingCommand request,
final long timeoutMillis,
final PullCallback pullCallback
) throws RemotingException, InterruptedException {
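this.remotingClient.invokeAsync(addr, request, timeoutMillis, new InvokeCallback() {
// callback body omitted
});
}
// Check the client against the broker
public void checkClientInBroker(final String brokerAddr, final String consumerGroup /* remaining parameters omitted */) {
// method body omitted
}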
MQClientAPIImpl wraps every API that talks to the servers. Pick any request method and the pattern is the same: build a RemotingCommand, send it through NettyRemotingClient, then process the response.
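The sendMessage method shown earlier dispatches on the communication mode and, for SYNC and ASYNC, first checks that the time already spent has not used up the timeout. The sketch below models that shape with JDK types only. SendDispatchSketch, its transport function and the exception type are hypothetical stand-ins, and the elapsed time is passed in as a parameter to keep the example deterministic.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical illustration class; not part of RocketMQ.
final class SendDispatchSketch {
    enum CommunicationMode { SYNC, ASYNC, ONEWAY }

    // Rejects the call when the time already spent exceeds the timeout.
    private static void checkBudget(long timeoutMillis, long costMillis) {
        if (timeoutMillis < costMillis) {
            throw new IllegalStateException("sendMessage call timeout");
        }
    }

    // transport stands in for the remoting client: request in, response out.
    static String send(CommunicationMode mode, String request, long timeoutMillis, long costMillis,
                       Function<String, String> transport, Consumer<String> callback) {
        switch (mode) {
            case ONEWAY:
                transport.apply(request); // fire and forget: the response is ignored
                return null;
            case ASYNC:
                checkBudget(timeoutMillis, costMillis);
                // The result is delivered to the callback on another thread.
                CompletableFuture.supplyAsync(() -> transport.apply(request)).thenAccept(callback);
                return null;
            case SYNC:
                checkBudget(timeoutMillis, costMillis);
                return transport.apply(request);
            default:
                throw new IllegalArgumentException("unknown mode " + mode);
        }
    }

    public static void main(String[] args) {
        Function<String, String> echo = r -> "ack:" + r;
        System.out.println(send(CommunicationMode.SYNC, "hello", 3000, 5, echo, null));
    }
}
```

Only the SYNC path returns a result to the caller; ASYNC hands it to the callback and ONEWAY discards it, which mirrors the three return statements in the original method.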
- this.startScheduledTask(); // scheduled tasks: 1. fetch the name server address; 2. fetch topic route info from the name server; 3. remove offline brokers and send heartbeats to all brokers; 4. persist consumer offsets (stored locally in broadcasting mode, on the broker in clustering mode)
-
this.pullMessageService.start(); // poll for messages
pullMessageService wraps a thread that runs the long-polling pull service: a single thread pulls messages asynchronously. It is started from MQClientInstance#start(). The actual pull happens in pullAPIWrapper.pullKernelImpl.
-
this.rebalanceService.start(); // rebalance
RebalanceService also wraps a thread; RocketMQ uses it to balance consume queues across consumers. DefaultMQPushConsumerImpl#start() calls MQClientInstance#start(), which in turn calls RebalanceService#start(). The thread periodically runs MQClientInstance#doRebalance(); the actual work is done by RebalanceImpl, which is a field of DefaultMQPushConsumerImpl.
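The periodic jobs described in this list (route refresh, heartbeats, offset persistence) all follow the same fixed-rate scheduling pattern. Here is a minimal sketch of that pattern using the JDK's ScheduledExecutorService. ScheduledTaskSketch and its method are hypothetical; the real periods come from ClientConfig fields such as pollNameServerInterval and heartbeatBrokerInterval.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration class; not part of RocketMQ.
final class ScheduledTaskSketch {
    // Runs task every periodMillis; returns true once it has run `runs` times within maxWaitMillis.
    static boolean runPeriodically(Runnable task, long periodMillis, int runs, long maxWaitMillis) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        CountDownLatch latch = new CountDownLatch(runs);
        try {
            scheduler.scheduleAtFixedRate(() -> {
                try {
                    task.run();
                } catch (RuntimeException e) {
                    // Swallow the error: an uncaught exception would cancel all later runs.
                } finally {
                    latch.countDown();
                }
            }, 0, periodMillis, TimeUnit.MILLISECONDS);
            return latch.await(maxWaitMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            scheduler.shutdownNow();
        }
    }

    public static void main(String[] args) {
        boolean done = runPeriodically(() -> System.out.println("heartbeat"), 50, 3, 2000);
        System.out.println("completed=" + done);
    }
}
```

Catching exceptions inside the task matters because scheduleAtFixedRate suppresses every subsequent execution once one run throws.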
send()
send() is likewise delegated, ultimately to DefaultMQProducerImpl.sendDefaultImpl. Here is the implementation:
private SendResult sendDefaultImpl(
Message msg,
final CommunicationMode communicationMode,
final SendCallback sendCallback,
final long timeout
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
// Check the service state and validate the message
this.makeSureStateOK();
Validators.checkMessage(msg, this.defaultMQProducer);
final long invokeID = random.nextLong();
long beginTimestampFirst = System.currentTimeMillis();
long beginTimestampPrev = beginTimestampFirst;
long endTimestamp = beginTimestampFirst;
// Get the route data for the topic. This is the key step, and it answers how MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC works together with the broker's autoCreateTopicEnable setting
TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic()); //@1
if (topicPublishInfo != null && topicPublishInfo.ok()) {
boolean callTimeout = false;
MessageQueue mq = null;
Exception exception = null;
SendResult sendResult = null;
// Only synchronous sends are retried here
int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
int times = 0;
String[] brokersSent = new String[timesTotal];
for (; times < timesTotal; times++) {
String lastBrokerName = null == mq ? null : mq.getBrokerName();
// This is where send-side load balancing happens
MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName); //@2
if (mqSelected != null) {
mq = mqSelected;
brokersSent[times] = mq.getBrokerName();
try {
beginTimestampPrev = System.currentTimeMillis();
if (times > 0) {
//Reset topic with namespace during resend.
msg.setTopic(this.defaultMQProducer.withNamespace(msg.getTopic()));
}
long costTime = beginTimestampPrev - beginTimestampFirst;
if (timeout < costTime) {
callTimeout = true;
break;
}
// This is where the real work is done
sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime); //@3
endTimestamp = System.currentTimeMillis();
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false); //@4
switch (communicationMode) {
case ASYNC:
return null;
case ONEWAY:
return null;
case SYNC:
if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
continue;
}
}
return sendResult;
default:
break;
}
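} catch (Exception e) {
// The original has a series of catch blocks here (RemotingException, MQClientException, MQBrokerException, InterruptedException); omitted
}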
} else {
break;
}
}
// If the send succeeded, return the result
if (sendResult != null) {
return sendResult;
}
// More exception-handling logic follows here; also omitted
throw mqClientException;
}
// Error handling for the case where no route info was found
}
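First, the parameters:
- Message: the message to send.
- CommunicationMode: how the message is sent: synchronous, asynchronous, or one-way.
- SendCallback: the callback used for asynchronous sends.
- timeout: the send timeout.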
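The retry rule in sendDefaultImpl is compact enough to isolate: a synchronous send gets 1 + retryTimesWhenSendFailed attempts, and every other mode gets exactly one. The sketch below reproduces only that rule. RetrySketch and the attempt predicate are hypothetical, and the retry count of 2 used in main is an illustrative value rather than something stated in this article.

```java
import java.util.function.IntPredicate;

// Hypothetical illustration class; not part of RocketMQ.
final class RetrySketch {
    enum CommunicationMode { SYNC, ASYNC, ONEWAY }

    // Same expression as timesTotal in sendDefaultImpl.
    static int timesTotal(CommunicationMode mode, int retryTimesWhenSendFailed) {
        return mode == CommunicationMode.SYNC ? 1 + retryTimesWhenSendFailed : 1;
    }

    // attempt receives the zero-based attempt index and returns true on success.
    // Returns the number of attempts used, or -1 if every attempt failed.
    static int sendWithRetry(CommunicationMode mode, int retryTimes, IntPredicate attempt) {
        int total = timesTotal(mode, retryTimes);
        for (int times = 0; times < total; times++) {
            if (attempt.test(times)) {
                return times + 1;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        // Succeeds on the third attempt (index 2).
        System.out.println(sendWithRetry(CommunicationMode.SYNC, 2, i -> i == 2));
    }
}
```

Asynchronous sends are not retried by this loop; their retry count is passed down to MQClientAPIImpl.sendMessage as retryTimesWhenSendFailed instead.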
@1. tryToFindTopicPublishInfo
private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
// Look up the route data in the map by topic; right after the producer starts, nothing is cached for the topic
TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
if (null == topicPublishInfo || !topicPublishInfo.ok()) {
// Put in an empty TopicPublishInfo as a placeholder
this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
// Fetch the route info for this topic from the name server
this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
topicPublishInfo = this.topicPublishInfoTable.get(topic);
}
// If the name server returned route data, return it directly
if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
return topicPublishInfo;
} else { // The name server had nothing for this topic, so why query it again?
// This overload of updateTopicRouteInfoFromNameServer follows different logic from the call above; it is not a retry
this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
topicPublishInfo = this.topicPublishInfoTable.get(topic);
return topicPublishInfo;
}
}
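This method looks up route info by topic. The work is delegated to MQClientInstance.updateTopicRouteInfoFromNameServer, which in turn calls MQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer to query the name server for the topic's route. As noted earlier, MQClientAPIImpl wraps all the network calls, and this is one place where that shows.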
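The two-step lookup can be pictured as: local cache, then the name server by topic, then a fallback that borrows the route of the default (auto-create) topic. The sketch below models that flow with in-memory maps. RouteLookupSketch is hypothetical, the name server is a plain Map, and treating the second call as "use the default topic's route" is an assumption about the real behaviour, as is the topic name TBW102 used in main.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical illustration class; not part of RocketMQ.
final class RouteLookupSketch {
    private final Map<String, List<String>> cache = new ConcurrentHashMap<>();
    private final Map<String, List<String>> nameServer; // stand-in for the remote name server
    private final String defaultTopic;

    RouteLookupSketch(Map<String, List<String>> nameServer, String defaultTopic) {
        this.nameServer = nameServer;
        this.defaultTopic = defaultTopic;
    }

    List<String> find(String topic) {
        // Step 0: local cache.
        List<String> queues = cache.get(topic);
        if (queues != null) {
            return queues;
        }
        // Step 1: ask the name server for this topic.
        queues = nameServer.get(topic);
        if (queues == null || queues.isEmpty()) {
            // Step 2 (assumed behaviour): fall back to the default topic's route.
            queues = nameServer.get(defaultTopic);
        }
        if (queues == null || queues.isEmpty()) {
            return Collections.emptyList();
        }
        cache.put(topic, queues);
        return queues;
    }

    public static void main(String[] args) {
        Map<String, List<String>> ns = new HashMap<>();
        ns.put("TBW102", Arrays.asList("broker-a:0", "broker-a:1"));
        System.out.println(new RouteLookupSketch(ns, "TBW102").find("NewTopic"));
    }
}
```

Under this reading, a topic that does not exist yet can still be sent to, provided some broker advertises the default topic.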
@2.selectOneMessageQueue
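Once the route info is available, selectOneMessageQueue() is called to pick a MessageQueue. In a cluster there are multiple brokers, each holding several queues for the topic, so one queue has to be chosen from many. This is where load balancing comes in.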
public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
// Broker fault-latency mechanism; disabled (false) by default, analyzed later
if (this.sendLatencyFaultEnable) {
try {
int index = tpInfo.getSendWhichQueue().getAndIncrement();
for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
if (pos < 0)
pos = 0;
MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) {
if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName))
return mq;
}
}
final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
if (writeQueueNums > 0) {
final MessageQueue mq = tpInfo.selectOneMessageQueue();
if (notBestBroker != null) {
mq.setBrokerName(notBestBroker);
mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums);
}
return mq;
} else {
latencyFaultTolerance.remove(notBestBroker);
}
} catch (Exception e) {
log.error("Error occurred when selecting message queue", e);
}
return tpInfo.selectOneMessageQueue();
}
// Taken when the broker fault-latency mechanism is disabled
return tpInfo.selectOneMessageQueue(lastBrokerName);
}
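In the normal case load balancing is simple: an auto-incrementing index is taken modulo messageQueueList.size(). Because synchronous sends are retried, a retry checks whether the candidate queue belongs to the broker that just failed and, if so, skips it.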
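The round-robin selection with "skip the broker that just failed" can be sketched in a few lines. QueueSelectorSketch and MsgQueue below are hypothetical types, the counter starts at 0 for predictability, and Math.floorMod replaces the Math.abs-then-modulo idiom so that counter overflow cannot produce a negative position.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration class; not part of RocketMQ.
final class QueueSelectorSketch {
    static final class MsgQueue {
        final String brokerName;
        final int queueId;

        MsgQueue(String brokerName, int queueId) {
            this.brokerName = brokerName;
            this.queueId = queueId;
        }
    }

    private final List<MsgQueue> queues;
    private final AtomicInteger sendWhichQueue = new AtomicInteger(0);

    QueueSelectorSketch(List<MsgQueue> queues) {
        this.queues = queues;
    }

    // Plain round-robin over all queues.
    MsgQueue selectOne() {
        int pos = Math.floorMod(sendWhichQueue.getAndIncrement(), queues.size());
        return queues.get(pos);
    }

    // Round-robin that skips queues on the broker used by the previous failed attempt.
    MsgQueue selectOne(String lastBrokerName) {
        if (lastBrokerName == null) {
            return selectOne();
        }
        for (int i = 0; i < queues.size(); i++) {
            MsgQueue candidate = selectOne();
            if (!candidate.brokerName.equals(lastBrokerName)) {
                return candidate;
            }
        }
        // Every queue lives on the failed broker; fall back to plain round-robin.
        return selectOne();
    }

    public static void main(String[] args) {
        QueueSelectorSketch selector = new QueueSelectorSketch(Arrays.asList(
                new MsgQueue("broker-a", 0), new MsgQueue("broker-b", 0)));
        MsgQueue first = selector.selectOne(null);
        MsgQueue retry = selector.selectOne(first.brokerName);
        System.out.println(first.brokerName + " -> " + retry.brokerName);
    }
}
```

With a single broker the loop finds no alternative, so the retry goes back to the same broker rather than failing outright.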
@3.sendKernelImpl
sendKernelImpl contains the actual send logic:
private SendResult sendKernelImpl(final Message msg,
final MessageQueue mq,
final CommunicationMode communicationMode,
final SendCallback sendCallback,
final TopicPublishInfo topicPublishInfo,
final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
long beginStartTime = System.currentTimeMillis();
// Get the master's address by broker name; when publishing, the broker address looked up is always the master's
String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
if (null == brokerAddr) {
// If the master's address is not found, refresh the route info
tryToFindTopicPublishInfo(mq.getTopic());
brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
}
SendMessageContext context = null;
if (brokerAddr != null) {
// Check whether the VIP channel is enabled; if so, the message is sent to a different port on the broker
// VIP traffic goes through its own dedicated channel
brokerAddr = MixAll.brokerVIPChannel(this.defaultMQProducer.isSendMessageWithVIPChannel(), brokerAddr);
byte[] prevBody = msg.getBody();
try {
// Parts not relevant here are omitted
SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
requestHeader.setTopic(msg.getTopic());
requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey());
requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums());
requestHeader.setQueueId(mq.getQueueId());
requestHeader.setSysFlag(sysFlag);
requestHeader.setBornTimestamp(System.currentTimeMillis());
requestHeader.setFlag(msg.getFlag());
requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));
requestHeader.setReconsumeTimes(0);
requestHeader.setUnitMode(this.isUnitMode());
requestHeader.setBatch(msg instanceof MessageBatch);
// Parts not relevant here are omitted
SendResult sendResult = null;
switch (communicationMode) {
case ASYNC:
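Message tmpMessage = msg;
// Time already spent in this call, deducted from the timeout below
long costTimeAsync = System.currentTimeMillis() - beginStartTime;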
sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
brokerAddr,
mq.getBrokerName(),
tmpMessage,
requestHeader,
timeout - costTimeAsync,
communicationMode,
sendCallback,
topicPublishInfo,
this.mQClientFactory,
this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(),
context,
this);
break;
case ONEWAY:
case SYNC:
long costTimeSync = System.currentTimeMillis() - beginStartTime;
sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
brokerAddr,
mq.getBrokerName(),
msg,
requestHeader,
timeout - costTimeSync,
communicationMode,
context,
this);
break;
default:
assert false;
break;
}
return sendResult;
} catch (Exception e) {
// Exception handling omitted as not relevant here
} finally {
msg.setBody(prevBody);
msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQProducer.getNamespace()));
}
}
throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
}
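In the end the message is handed to MQClientAPIImpl.sendMessage. Note that messages are always sent to the master, which means a producer mainly keeps connections to the master of each broker.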
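The call to MixAll.brokerVIPChannel in sendKernelImpl rewrites the broker address when the VIP channel is enabled. The sketch below shows one way such a rewrite can work. VipChannelSketch is hypothetical, and the "port minus 2" rule is an assumption based on my reading of the 4.x client, not something this article states; check MixAll in your version before relying on it.

```java
// Hypothetical illustration class; not part of RocketMQ.
final class VipChannelSketch {
    // Assumption: the VIP channel listens on (normal port - 2).
    static String vipAddress(boolean useVip, String brokerAddr) {
        if (!useVip) {
            return brokerAddr;
        }
        int sep = brokerAddr.lastIndexOf(':');
        String host = brokerAddr.substring(0, sep);
        int port = Integer.parseInt(brokerAddr.substring(sep + 1));
        return host + ":" + (port - 2);
    }

    public static void main(String[] args) {
        System.out.println(vipAddress(true, "10.0.0.1:10911"));
        System.out.println(vipAddress(false, "10.0.0.1:10911"));
    }
}
```

Because the VIP address is derived rather than configured, the derived port has to be reachable through any firewall between producer and broker when the option is turned on.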
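@4. updateFaultItem: the broker fault-latency mechanism
Let's see how RocketMQ's broker fault-latency mechanism cuts down on unnecessary calls, such as repeated attempts against a broker that has just failed.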
if (this.sendLatencyFaultEnable) {
try {
int index = tpInfo.getSendWhichQueue().getAndIncrement();
for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
if (pos < 0)
pos = 0;
MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
// latencyFaultTolerance is the key to the broker fault-latency mechanism
if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) {
if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName))
return mq;
}
}
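// No suitable MessageQueue has been found so far, so fall back to picking one anyway
// pickOneAtLeast() is not covered in detail here; roughly, it picks the best of the brokers currently marked as faulty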
final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
if (writeQueueNums > 0) {
final MessageQueue mq = tpInfo.selectOneMessageQueue();
if (notBestBroker != null) {
mq.setBrokerName(notBestBroker);
mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums);
}
return mq;
} else {
latencyFaultTolerance.remove(notBestBroker);
}
} catch (Exception e) {
log.error("Error occurred when selecting message queue", e);
}
return tpInfo.selectOneMessageQueue();
}
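Inside the if branch the flow is similar to the normal one: a MessageQueue is selected, but LatencyFaultTolerance is consulted about that queue's broker. The queue is used only if the broker is available; otherwise the loop moves on to the next queue.
So how does LatencyFaultTolerance#isAvailable() work?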
public boolean isAvailable(final String name) {
final FaultItem faultItem = this.faultItemTable.get(name);
if (faultItem != null) {
return faultItem.isAvailable();
}
return true;
}
The logic is simple: look up the FaultItem for the broker name in faultItemTable. If there is no FaultItem, the broker is considered available; otherwise FaultItem#isAvailable() decides. faultItemTable is updated after a successful send:
sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
endTimestamp = System.currentTimeMillis();
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
It is also updated when a send fails:
} catch (RemotingException e) {
endTimestamp = System.currentTimeMillis();
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
...
} catch (MQClientException e) {
endTimestamp = System.currentTimeMillis();
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
...
} catch (MQBrokerException e) {
endTimestamp = System.currentTimeMillis();
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
...
} catch (InterruptedException e) {
endTimestamp = System.currentTimeMillis();
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
...
}
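Putting updateFaultItem and isAvailable together: each send records a latency for the broker, the latency is mapped to a "not available" duration, and the broker is skipped until that duration has passed. The sketch below models this with a map from broker name to the time it becomes available again. FaultToleranceSketch is hypothetical; the two threshold tables and the 30000 ms latency substituted when the isolation flag is true are assumptions based on my recollection of the 4.x defaults, so treat the numbers as illustrative.

```java
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical illustration class; not part of RocketMQ.
final class FaultToleranceSketch {
    // Assumed thresholds: latency of at least LATENCY_MAX[i] ms maps to NOT_AVAILABLE_DURATION[i] ms.
    private static final long[] LATENCY_MAX = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};
    private static final long[] NOT_AVAILABLE_DURATION = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};

    // broker name -> timestamp (ms) from which the broker counts as available again
    private final ConcurrentHashMap<String, Long> availableFrom = new ConcurrentHashMap<>();

    // Finds the highest threshold the latency reaches and returns its penalty.
    static long notAvailableDuration(long latencyMillis) {
        for (int i = LATENCY_MAX.length - 1; i >= 0; i--) {
            if (latencyMillis >= LATENCY_MAX[i]) {
                return NOT_AVAILABLE_DURATION[i];
            }
        }
        return 0L;
    }

    // isolation=true corresponds to the failure paths, which pass true to updateFaultItem.
    void update(String broker, long latencyMillis, boolean isolation, long nowMillis) {
        long duration = notAvailableDuration(isolation ? 30000L : latencyMillis);
        availableFrom.put(broker, nowMillis + duration);
    }

    // A broker with no record is available, matching isAvailable() in the article.
    boolean isAvailable(String broker, long nowMillis) {
        Long from = availableFrom.get(broker);
        return from == null || nowMillis >= from;
    }

    public static void main(String[] args) {
        FaultToleranceSketch faults = new FaultToleranceSketch();
        long now = System.currentTimeMillis();
        faults.update("broker-a", 20, true, now);
        System.out.println(faults.isAvailable("broker-a", now + 1000));
    }
}
```

A fast successful send records a zero penalty, which is how a broker returns to normal rotation after it recovers.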
Reference: https://www.jianshu.com/p/4e63d2143351