天天看点

rocketmq入门笔记

异步

原来的下单场景只是用户支付即可结束,现在需要发送成功短信,给用户增加积分,订阅物流信息等等,这就使得用户的 下单时间大大加长,这样就可以使用消息队列,把各个动作发到消息队列,每个服务再去拉取消息队列中的东西进行处理.大大减少时间

解耦:增加积分,发送短信这些可以单独拆分出来,需要使用直接发送到知道的消息队列就行,你只需要关注你当前的业务

削峰: 如果使用线程池来解决,一个服务一个线程在高峰期你的mysql或者redis可能撑不住,使用mq就可以限制主机每次只拉取多少条进行处理

可用性降低

引入了mq,一旦mq宕机对业务有影响

复杂度提高

数据链路变得复杂,如何保证顺序性,不重复消费

一致性问题

用户支付了,增加积分出错该怎么处理

nameserver 相当于注册中心,连接从这里取ip

broker 消息仓库,里面有topic与队列

product,consumer生产者消费者

基本的环境<code>yum install java-1.8.0-openjdk-devel.x86_64 wget vim unzip -y</code>

下载mq安装包<code>wget https://archive.apache.org/dist/rocketmq/4.7.1/rocketmq-all-4.7.1-bin-release.zip</code>

解压缩<code>unzip rocketmq-all-4.7.1-bin-release.zip -d /usr/local/</code>

启动nameserver服务

<code>vim bin/runserver.sh</code>

默认堆初始化最大都是4g,新生代2g,测试机没这么内存,不修改无法启动,改为256m,新生代128m<code>JAVA_OPT="${JAVA_OPT} -server -Xms4g -Xmx4g -Xmn2g -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"</code>

后台启动<code>nohup bin/mqnamesrv &gt; n1.out &amp;</code>

启动broker服务

<code>vim bin/runbroker.sh</code>

默认堆初始化最大都是8g,新生代4g,测试机没这么内存,不修改无法启动,改为512m,新生代256m<code>JAVA_OPT="${JAVA_OPT} -server -Xms512m -Xmx512m -Xmn256m"</code>

暴露namserver地址<code>echo 'export NAMESRV_ADDR=localhost:9876' &gt;&gt; ~/.bash_profile</code>

后台启动<code>nohup bin/runbroker.sh &gt;n2.out &amp;</code>

日志验证

n1.out <code>The Name Server boot success. serializeType=JSON</code>

n2.out <code>The broker[localhost.localdomain, 192.168.147.134:10911] boot success. serializeType=JSON and name server is localhost:9876</code>

发送接收测试

发送<code>bin/tools.sh org.apache.rocketmq.example.quickstart.Producer</code>

接收<code>bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer</code>

关闭

关闭nameserver服务<code>bin/mqshutdown namesrv</code>

关闭broker服务<code>bin/mqshutdown broker</code>

4种高可用集群

多master模式

优点:配置简单,性能最高

缺点:单个宕机,这台机器上违背消费的消息不可订阅

多master多salve 异步复制

优点:消息丢失少(异步复制),消息实时性不受到影响,master宕机可以从slave上消费,性能与多master基本一致

缺点:master宕机下会丢失少量消息

多master多salve 同步双写

优点:master宕机,消息无延迟,可用性高

缺点:性能有所丢失

dledger模式:4.5版本之前采用master-slave架构部署但是master挂掉都slave无法自动晋升为master,这种模式可以将多个master-slave组成一个组,当组内master挂了将选举一个master继续服务

修改vim conf/2m-2s-async/broker-a.properties配置文件

将broker-a.properties写入到broker-b-s.properties修改brokerName,brokerId,brokerRole和几个文件存储路径,同一台虚拟机注意端口号也需要修改

克隆当前虚拟机,修改broker-a-s.properties,broker-b.properties文件

修改host文件<code>vim /etc/hosts</code>

启动两台nameserver<code>nohup bin/mqnamesrv &gt;n1.out &amp;</code>

启动broker,使用-c指定配置文件<code>nohup bin/mqbroker -c conf/2m-2s-async/broker-b-s.properties &gt;nb.out &amp;</code>

关闭防火墙或者开放9876,两个broker服务的端口<code>firewall-cmd --zone=public --add-port=9876/tcp --add-port=10911/tcp --add-port=11011/tcp --permanent``firewall-cmd --reload</code>

四个broker服务都启动后验证集群<code>bin/mqadmin clusterList -n work1:9876</code>

这里可能会报错<code>Caused by: java.security.NoSuchAlgorithmException: Algorithm HmacSHA1 not available</code>

解决方法:出处:一篇文章彻底解决RocketMq的疑难杂症之:org.apache.rocketmq.client.exception.MQClientException: No route info of thi<code>cp /usr/lib/jvm/java-1.8.0-openjdk-1.8.0.292.b10-1.el7_9.x86_64/jre/lib/ext/sunjce_provider.jar /usr/local/rocketmq-all-4.7.1-bin-release/lib</code>具体以实际目录为准

成功:两主两从

rocketmq入门笔记

项目地址rocketmq-dashboard

项目克隆<code>git clone https://github.com/apache/rocketmq-dashboard.git</code>

打开rocketmq-console导入idea,修改application.properties文件<code>rocketmq.config.namesrvAddr=work1:9876;work2:9876</code>以实际情况修改

打包项目上传jar包,启动<code>nohup java -jar rocketmq-console-ng-2.0.0.jar &amp;</code>

打开浏览器访问当前服务器8080端口

rocketmq入门笔记

快速演示

在<code>bin/dledger/fast-try.sh</code>快速演示的脚本,但脚本给一个broker的内存是1g,虚拟机没有这么大修改一下

这里我修改为256m

启动<code>bin/dledger/fast-try.sh start</code>

查看集群情况<code>bin/mqadmin clusterList -n 127.0.0.1:9876</code>

rocketmq入门笔记

查询master节点进程号并把它kill,看slave是否能转为master

rocketmq入门笔记

实际搭建

配置文件增加一下几条

集群搭建成功

rocketmq入门笔记

直接把135主机关机了

切换成功

rocketmq入门笔记

producer生产消息,consumer消费消息,broker存储消息,每个broker对于一台服务器,每个broker可以存储多个opic消息,每个topic消息也可以分片存储于不同的broker上,message queue用于存储多个消息的物理地址,每个topic消息存储于多个message queue中

producer负责生产消费,将消费者消息发送到broker上,有多种发送方式:同步发送,异步发送,顺序发送,单向发送,同步与异步需要broker返回确认消息,单向发送不需要。同一类producer组成一个集合为生产组发送同一类消息且逻辑一致,如果有异常,broker服务器会联系同一生产者组提交或回滚

consumer消息者形式分为两种:

拉取式:主动式消费,消费者调用拉取的方法

推动式消费:broker有数据就会推给消费者

消费者组必须订阅同一个topic,消息模式两种:

集群消费模式:平摊消费

广播消费模式:共享消费

每个topic若干个消息,每个消息只能有一个主题,同一个topic下的数据分片保存到不同的broker,每个分片单位是messageQueue

几个模块

rocketmq入门笔记

remoting module:处理来自clients的请求

client manager:负责管理客户端和维护消费者的topic订阅信息

store service:处理消息的存储查询功能

ha service:高可用服务,负责master与slave的数据同步

index service:索引服务,以提高查询

普通集群

每个节点固定角色,master负责响应客户端请求并存储消息,slave负责同步数据并响应客户端部分读请求

dledger高可用集群

dledger

接管broker的commitlog消息存储

选举leader节点

完成消息同步

多副本消息同步

leader收到消息会将消息标记为uncommited状态,发给follower,follower收到消息后需要给leader返回一个ack,如果有超过半数的follower返回ack就会把消息改为commited状态,发给follower

leader选举机制

每个节点有三个状态,leader,follower,candidate(候选人)

每个时间点叫做term

集群启动时,每个节点都是follower,集群内部发送一个timeout信号,follower转为候选人,发起投票后收到半数以上的投票晋升为leader,

选举过程,集群启动,三个节点都是follower,三个节点都给自己投票,term都是1,三个节点随机休眠,a启动term加一为2,第二个节点醒来,发现a的term比自己大,承认a是leader,c同理

充当路由消息的提供者,broker会在启动时向nameserver注册自己的服务信息,后续通过心跳维护当前服务的可用性,生产者或消费者通过名字服务查找各主题消息相应的broker ip列表

每个消息都必须拥有一个topic,每个消息拥有唯一的message id,且可以携带业务标识key, 可以为消息设置一个tag标签

时间

mq收到消息标记为uncommit状态发给follower,follower收到消息,发给leader一个ack,超过半数follower返回ack,消息改为commit状态,存储,状态同步给follower

mqpush消息给消费者,等待消费者ack响应,标记为已消费,没有标记消息会重复推送

mq会定期删除一些过期的消息

存储介质:磁盘文件(采用顺序读写,保证存储的速度,采用mmap的方式,省去上下文切换,提高速度)

commitlog:存储消息元数据,每个文件1个G

consumerQueue:消息队列,保存commitlog的索引

indexFile:提供通过key或时间来查询消息的方法

同步刷盘:消息写入机器的内存时,通知刷盘线程刷盘,等待刷盘线程写入完成后唤醒线程,返回写入完成

优点:稳定安全

缺点:性能不如异步

异步刷盘:消息写入内存后,返回写入完成,当内存累计到一定程度是统一触发刷盘操作

优点:吞吐量大

缺点:一旦服务器断电丢失部分消息

同步复制:生产者发送消息,只有master与slave(半数slave)写入成功才反馈生产者写入成功

异步复制:生产者发送消息,只要master写入消息成功,就反馈生产者写入成功,再异步将消息同步到slave

生产者负载均衡:

生产者发送消息时,获取当前topic下所有broker集合,采用取模递增算法将消息往不同的broker上发送

消费者负载均衡

集群模式:六种分配算法

AllocateMachineRoomNearby:同机房的消费者与broker分配一起

AllocateMessageQueueAveragely:平均分配,将所有消息队列平均分配给消费者,先算数后分配

AllocateMessageQueueAveragelyByCircle:先轮流给消费者分配一个队列,后面再增加

AllocateMessageQueueByConfig:直接指定所有队列

AllocateMessageQueueByMachineRoom:按逻辑机房进行分配

AllocateMessageQueueConsistentHash:

广播模式:每个消费者分配所有的队列

广播模式下不存在消息重试,会直接消费下一条

如何重试

消息监听器中配置

返回Action.ReconsumeLater

返回null

抛出异常

不重试返回Action.CommitMessage

重试处理

重试的消息会进入“%RETRY%”+ConsumeGroup队列,最多16次,16次后会进入死信队列,可配置例如20次,16次后酶促间隔2h

16次每次间隔10s,30s,1m,2m,3m,4m,5m,6m,7m,8m,9m,10m,20m,30m,1h,2h

messageId

老版本中,无论重试多少次messageId是相同的,4.7.1中每次重试messageId会重建

配置覆盖

最大重试次数对同一个消费组实例有效,最后启动的消费者会覆盖之前的配置

一个死信队列对于一个消费组,而不是一个消费者

一个消费组不需要死信队列是不会创建死信队列的

一个死信队列包含这个消费组所有无法消费的消息,不区分主题

消息无法再被消费者正常消费

默认存储3天,不管是否消费被删除

默认死信队列中的消息无法读取,需要将权限配置为6

当出现消费者对某条消息重复消费的情况时,重复消费的结果与消费一次的结果是相同的,并且多次消费并未对业务系统产生任何负面影响,那么这整个过程就可实现消息幂等。支付时重复提交了多次但最后还是只支付了一次的钱

三种实现语义

at most once:每条消息最多消费一次

at least once:每条消息至少消费一次

exactly one:确定消费一次

rocketmq支持at least once语义

消息重复情况

发送重复:消息发送到服务端并且持久化了,网络断开或者宕机了,生产者判断发送失败了会造次发送

投递重复:消费者收到消息并完成业务处理了,准备发送消息接收时宕机了,服务端在恢复后会再次发送一遍这个消息

负载均衡时消息重复:broker服务重启,扩容,缩容会触发rebalance造成消费者收到重复的消息

解决:

业务唯一标识:例如订单号

利用数据库唯一索引或主键索引

利用redis判断

dledger模式不支持批量发送/升级v4.8+

<code>consumer.setMessageModel(MessageModel.BROADCASTING);</code>

表达式过滤

consumer.subscribe("filter-topic", "TAG1 || TAG2");

sql过滤

需要配置<code>enablePropertyFilter=true</code>

<code>message1.putUserProperty("a", "1");</code>

<code>consumer.subscribe("filter-topic", MessageSelector.bySql("TAGS IN ('TAG1','TAG2') AND a between 0 and 1 "));</code>

基本语法<code>&gt;,&lt;,&gt;=,between,in,and,or,not</code>等

代码

流程

发送消息到服务端,这个消息暂存在服务端,不会被消费者读取到

持久化成功后会返回生产者一个ack,确认消息是否成功

成功回调执行executeLocalTransaction方法,执行本地事务,持久化到数据库类的操作,这块的回滚自行处理,最终返回本地事务的执行结果

根据返回结果进行操作,commit的话会将当前消息移动到实际的topic下,回滚就删除消息

如果本地事务返回unknown,服务端会定时调用checkLocalTransaction方法进行查询,最多15次

根据checkLocalTransaction方法进行执行回滚或者提交

开启权限控制

配置文件

使用RocketMQTemplate进行发送消息,相关属性都以rocketmq_开头

topic:tags

源码地址 源码地址 使用4.7.1版本源码

在项目根目录下创建conf文件夹,复制distribution下broker.conf,logback_broker.xml,logback_nameserv.xml三个文件到conf下

在本机添加环境变量ROCKETMQ_HOME指向项目根目录

启动nameser

修改conf目录下的broker.conf 添加namesrvAddr,storePathRootDir,storePathRootDir,storePathCommitLog,storePathConsumeQueue,storePathIndex,storeCheckpoint,abortFile等参数具体可参考上方配置

启动broker 配置启动参数-c broker.conf文件地址

配置信息:创建nameseverconfig与nettyserverconfig

初始化,启动,监听9876端口,提供给客户端拉取路由信息

创建处理请求的线程与定时扫描的线程(10s扫描一次,判断最后最后更新时间+2分钟,超出会删除这个broker并关闭连接)

启动了很多组件

注册到nameserver,每30s(可以配置修改但最长为60s)发送一次心跳

DefaultMQProducerImpl:org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl

判断组名是否符合规定

启动各种定时任务,缓存nameserver上所有的主题,与broker建立心跳

发送消息采用索引自增取模的方式进行

org.apache.rocketmq.store.DefaultMessageStore#putMessage

使用零拷贝追加到commitlog,同步或异步刷盘,主从同步

定时任务:每10s启动启动一次,

作者: JaminYe

出处:https://www.cnblogs.com/JaminYe/p/15559170.html/

版权声明:本文原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。