异步
原来的下单场景只是用户支付即可结束,现在需要发送成功短信,给用户增加积分,订阅物流信息等等,这就使得用户的 下单时间大大加长,这样就可以使用消息队列,把各个动作发到消息队列,每个服务再去拉取消息队列中的东西进行处理.大大减少时间
解耦:增加积分,发送短信这些可以单独拆分出来,需要使用直接发送到知道的消息队列就行,你只需要关注你当前的业务
削峰: 如果使用线程池来解决,一个服务一个线程在高峰期你的mysql或者redis可能撑不住,使用mq就可以限制主机每次只拉取多少条进行处理
可用性降低
引入了mq,一旦mq宕机对业务有影响
复杂度提高
数据链路变得复杂,如何保证顺序性,不重复消费
一致性问题
用户支付了,增加积分出错该怎么处理
nameserver 相当于注册中心,连接从这里取ip
broker 消息仓库,里面有topic与队列
product,consumer生产者消费者
基本的环境<code>yum install java-1.8.0-openjdk-devel.x86_64 wget vim unzip -y</code>
下载mq安装包<code>wget https://archive.apache.org/dist/rocketmq/4.7.1/rocketmq-all-4.7.1-bin-release.zip</code>
解压缩<code>unzip rocketmq-all-4.7.1-bin-release.zip -d /usr/local/</code>
启动nameserver服务
<code>vim bin/runserver.sh</code>
默认堆初始化最大都是4g,新生代2g,测试机没这么内存,不修改无法启动,改为256m,新生代128m<code>JAVA_OPT="${JAVA_OPT} -server -Xms4g -Xmx4g -Xmn2g -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"</code>
后台启动<code>nohup bin/mqnamesrv > n1.out &</code>
启动broker服务
<code>vim bin/runbroker.sh</code>
默认堆初始化最大都是8g,新生代4g,测试机没这么内存,不修改无法启动,改为512m,新生代256m<code>JAVA_OPT="${JAVA_OPT} -server -Xms512m -Xmx512m -Xmn256m"</code>
暴露namserver地址<code>echo 'export NAMESRV_ADDR=localhost:9876' >> ~/.bash_profile</code>
后台启动<code>nohup bin/runbroker.sh >n2.out &</code>
日志验证
n1.out <code>The Name Server boot success. serializeType=JSON</code>
n2.out <code>The broker[localhost.localdomain, 192.168.147.134:10911] boot success. serializeType=JSON and name server is localhost:9876</code>
发送接收测试
发送<code>bin/tools.sh org.apache.rocketmq.example.quickstart.Producer</code>
接收<code>bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer</code>
关闭
关闭nameserver服务<code>bin/mqshutdown namesrv</code>
关闭broker服务<code>bin/mqshutdown broker</code>
4种高可用集群
多master模式
优点:配置简单,性能最高
缺点:单个宕机,这台机器上违背消费的消息不可订阅
多master多salve 异步复制
优点:消息丢失少(异步复制),消息实时性不受到影响,master宕机可以从slave上消费,性能与多master基本一致
缺点:master宕机下会丢失少量消息
多master多salve 同步双写
优点:master宕机,消息无延迟,可用性高
缺点:性能有所丢失
dledger模式:4.5版本之前采用master-slave架构部署但是master挂掉都slave无法自动晋升为master,这种模式可以将多个master-slave组成一个组,当组内master挂了将选举一个master继续服务
修改vim conf/2m-2s-async/broker-a.properties配置文件
将broker-a.properties写入到broker-b-s.properties修改brokerName,brokerId,brokerRole和几个文件存储路径,同一台虚拟机注意端口号也需要修改
克隆当前虚拟机,修改broker-a-s.properties,broker-b.properties文件
修改host文件<code>vim /etc/hosts</code>
启动两台nameserver<code>nohup bin/mqnamesrv >n1.out &</code>
启动broker,使用-c指定配置文件<code>nohup bin/mqbroker -c conf/2m-2s-async/broker-b-s.properties >nb.out &</code>
关闭防火墙或者开放9876,两个broker服务的端口<code>firewall-cmd --zone=public --add-port=9876/tcp --add-port=10911/tcp --add-port=11011/tcp --permanent``firewall-cmd --reload</code>
四个broker服务都启动后验证集群<code>bin/mqadmin clusterList -n work1:9876</code>
这里可能会报错<code>Caused by: java.security.NoSuchAlgorithmException: Algorithm HmacSHA1 not available</code>
解决方法:出处:一篇文章彻底解决RocketMq的疑难杂症之:org.apache.rocketmq.client.exception.MQClientException: No route info of thi<code>cp /usr/lib/jvm/java-1.8.0-openjdk-1.8.0.292.b10-1.el7_9.x86_64/jre/lib/ext/sunjce_provider.jar /usr/local/rocketmq-all-4.7.1-bin-release/lib</code>具体以实际目录为准
成功:两主两从

项目地址rocketmq-dashboard
项目克隆<code>git clone https://github.com/apache/rocketmq-dashboard.git</code>
打开rocketmq-console导入idea,修改application.properties文件<code>rocketmq.config.namesrvAddr=work1:9876;work2:9876</code>以实际情况修改
打包项目上传jar包,启动<code>nohup java -jar rocketmq-console-ng-2.0.0.jar &</code>
打开浏览器访问当前服务器8080端口
快速演示
在<code>bin/dledger/fast-try.sh</code>快速演示的脚本,但脚本给一个broker的内存是1g,虚拟机没有这么大修改一下
这里我修改为256m
启动<code>bin/dledger/fast-try.sh start</code>
查看集群情况<code>bin/mqadmin clusterList -n 127.0.0.1:9876</code>
查询master节点进程号并把它kill,看slave是否能转为master
实际搭建
配置文件增加一下几条
集群搭建成功
直接把135主机关机了
切换成功
producer生产消息,consumer消费消息,broker存储消息,每个broker对于一台服务器,每个broker可以存储多个opic消息,每个topic消息也可以分片存储于不同的broker上,message queue用于存储多个消息的物理地址,每个topic消息存储于多个message queue中
producer负责生产消费,将消费者消息发送到broker上,有多种发送方式:同步发送,异步发送,顺序发送,单向发送,同步与异步需要broker返回确认消息,单向发送不需要。同一类producer组成一个集合为生产组发送同一类消息且逻辑一致,如果有异常,broker服务器会联系同一生产者组提交或回滚
consumer消息者形式分为两种:
拉取式:主动式消费,消费者调用拉取的方法
推动式消费:broker有数据就会推给消费者
消费者组必须订阅同一个topic,消息模式两种:
集群消费模式:平摊消费
广播消费模式:共享消费
每个topic若干个消息,每个消息只能有一个主题,同一个topic下的数据分片保存到不同的broker,每个分片单位是messageQueue
几个模块
remoting module:处理来自clients的请求
client manager:负责管理客户端和维护消费者的topic订阅信息
store service:处理消息的存储查询功能
ha service:高可用服务,负责master与slave的数据同步
index service:索引服务,以提高查询
普通集群
每个节点固定角色,master负责响应客户端请求并存储消息,slave负责同步数据并响应客户端部分读请求
dledger高可用集群
dledger
接管broker的commitlog消息存储
选举leader节点
完成消息同步
多副本消息同步
leader收到消息会将消息标记为uncommited状态,发给follower,follower收到消息后需要给leader返回一个ack,如果有超过半数的follower返回ack就会把消息改为commited状态,发给follower
leader选举机制
每个节点有三个状态,leader,follower,candidate(候选人)
每个时间点叫做term
集群启动时,每个节点都是follower,集群内部发送一个timeout信号,follower转为候选人,发起投票后收到半数以上的投票晋升为leader,
选举过程,集群启动,三个节点都是follower,三个节点都给自己投票,term都是1,三个节点随机休眠,a启动term加一为2,第二个节点醒来,发现a的term比自己大,承认a是leader,c同理
充当路由消息的提供者,broker会在启动时向nameserver注册自己的服务信息,后续通过心跳维护当前服务的可用性,生产者或消费者通过名字服务查找各主题消息相应的broker ip列表
每个消息都必须拥有一个topic,每个消息拥有唯一的message id,且可以携带业务标识key, 可以为消息设置一个tag标签
时间
mq收到消息标记为uncommit状态发给follower,follower收到消息,发给leader一个ack,超过半数follower返回ack,消息改为commit状态,存储,状态同步给follower
mqpush消息给消费者,等待消费者ack响应,标记为已消费,没有标记消息会重复推送
mq会定期删除一些过期的消息
存储介质:磁盘文件(采用顺序读写,保证存储的速度,采用mmap的方式,省去上下文切换,提高速度)
commitlog:存储消息元数据,每个文件1个G
consumerQueue:消息队列,保存commitlog的索引
indexFile:提供通过key或时间来查询消息的方法
同步刷盘:消息写入机器的内存时,通知刷盘线程刷盘,等待刷盘线程写入完成后唤醒线程,返回写入完成
优点:稳定安全
缺点:性能不如异步
异步刷盘:消息写入内存后,返回写入完成,当内存累计到一定程度是统一触发刷盘操作
优点:吞吐量大
缺点:一旦服务器断电丢失部分消息
同步复制:生产者发送消息,只有master与slave(半数slave)写入成功才反馈生产者写入成功
异步复制:生产者发送消息,只要master写入消息成功,就反馈生产者写入成功,再异步将消息同步到slave
生产者负载均衡:
生产者发送消息时,获取当前topic下所有broker集合,采用取模递增算法将消息往不同的broker上发送
消费者负载均衡
集群模式:六种分配算法
AllocateMachineRoomNearby:同机房的消费者与broker分配一起
AllocateMessageQueueAveragely:平均分配,将所有消息队列平均分配给消费者,先算数后分配
AllocateMessageQueueAveragelyByCircle:先轮流给消费者分配一个队列,后面再增加
AllocateMessageQueueByConfig:直接指定所有队列
AllocateMessageQueueByMachineRoom:按逻辑机房进行分配
AllocateMessageQueueConsistentHash:
广播模式:每个消费者分配所有的队列
广播模式下不存在消息重试,会直接消费下一条
如何重试
消息监听器中配置
返回Action.ReconsumeLater
返回null
抛出异常
不重试返回Action.CommitMessage
重试处理
重试的消息会进入“%RETRY%”+ConsumeGroup队列,最多16次,16次后会进入死信队列,可配置例如20次,16次后酶促间隔2h
16次每次间隔10s,30s,1m,2m,3m,4m,5m,6m,7m,8m,9m,10m,20m,30m,1h,2h
messageId
老版本中,无论重试多少次messageId是相同的,4.7.1中每次重试messageId会重建
配置覆盖
最大重试次数对同一个消费组实例有效,最后启动的消费者会覆盖之前的配置
一个死信队列对于一个消费组,而不是一个消费者
一个消费组不需要死信队列是不会创建死信队列的
一个死信队列包含这个消费组所有无法消费的消息,不区分主题
消息无法再被消费者正常消费
默认存储3天,不管是否消费被删除
默认死信队列中的消息无法读取,需要将权限配置为6
当出现消费者对某条消息重复消费的情况时,重复消费的结果与消费一次的结果是相同的,并且多次消费并未对业务系统产生任何负面影响,那么这整个过程就可实现消息幂等。支付时重复提交了多次但最后还是只支付了一次的钱
三种实现语义
at most once:每条消息最多消费一次
at least once:每条消息至少消费一次
exactly one:确定消费一次
rocketmq支持at least once语义
消息重复情况
发送重复:消息发送到服务端并且持久化了,网络断开或者宕机了,生产者判断发送失败了会造次发送
投递重复:消费者收到消息并完成业务处理了,准备发送消息接收时宕机了,服务端在恢复后会再次发送一遍这个消息
负载均衡时消息重复:broker服务重启,扩容,缩容会触发rebalance造成消费者收到重复的消息
解决:
业务唯一标识:例如订单号
利用数据库唯一索引或主键索引
利用redis判断
dledger模式不支持批量发送/升级v4.8+
<code>consumer.setMessageModel(MessageModel.BROADCASTING);</code>
表达式过滤
consumer.subscribe("filter-topic", "TAG1 || TAG2");
sql过滤
需要配置<code>enablePropertyFilter=true</code>
<code>message1.putUserProperty("a", "1");</code>
<code>consumer.subscribe("filter-topic", MessageSelector.bySql("TAGS IN ('TAG1','TAG2') AND a between 0 and 1 "));</code>
基本语法<code>>,<,>=,between,in,and,or,not</code>等
代码
流程
发送消息到服务端,这个消息暂存在服务端,不会被消费者读取到
持久化成功后会返回生产者一个ack,确认消息是否成功
成功回调执行executeLocalTransaction方法,执行本地事务,持久化到数据库类的操作,这块的回滚自行处理,最终返回本地事务的执行结果
根据返回结果进行操作,commit的话会将当前消息移动到实际的topic下,回滚就删除消息
如果本地事务返回unknown,服务端会定时调用checkLocalTransaction方法进行查询,最多15次
根据checkLocalTransaction方法进行执行回滚或者提交
开启权限控制
配置文件
使用RocketMQTemplate进行发送消息,相关属性都以rocketmq_开头
topic:tags
源码地址 源码地址 使用4.7.1版本源码
在项目根目录下创建conf文件夹,复制distribution下broker.conf,logback_broker.xml,logback_nameserv.xml三个文件到conf下
在本机添加环境变量ROCKETMQ_HOME指向项目根目录
启动nameser
修改conf目录下的broker.conf 添加namesrvAddr,storePathRootDir,storePathRootDir,storePathCommitLog,storePathConsumeQueue,storePathIndex,storeCheckpoint,abortFile等参数具体可参考上方配置
启动broker 配置启动参数-c broker.conf文件地址
配置信息:创建nameseverconfig与nettyserverconfig
初始化,启动,监听9876端口,提供给客户端拉取路由信息
创建处理请求的线程与定时扫描的线程(10s扫描一次,判断最后最后更新时间+2分钟,超出会删除这个broker并关闭连接)
启动了很多组件
注册到nameserver,每30s(可以配置修改但最长为60s)发送一次心跳
DefaultMQProducerImpl:org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl
判断组名是否符合规定
启动各种定时任务,缓存nameserver上所有的主题,与broker建立心跳
发送消息采用索引自增取模的方式进行
org.apache.rocketmq.store.DefaultMessageStore#putMessage
使用零拷贝追加到commitlog,同步或异步刷盘,主从同步
定时任务:每10s启动启动一次,
作者: JaminYe
出处:https://www.cnblogs.com/JaminYe/p/15559170.html/
版权声明:本文原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。