后台开发的rabbitmq官网拾遗
- 常使用rabbitmq的常用功能,想着拓展一下,就到rabbitmq官网的文档里做一些总结和摘取,查看文档的视角是一位后台开发,所以运维工具方面省略了,比如 安装、CLI工具、配置、权限、部分网络、命令等。
-
- 网络和连接(部分)
- 监控
- 客户端连接
- 分布式mq 与 clustering
- Quorum Queues
- Classic Mirrored Queues
- Time-To-Live and Expiration
- Lazy Queues
- 普通的Queues
- Exclusive Queues
- Priority Queue Support
- Exchange
- Dead Letter Exchanges
- Consumers
- Publishers
- Java Client 总结
- channel特性
- Channel Prefetch Setting (QoS)
- firehose 消防水带特性
-
- 申明
常使用rabbitmq的常用功能,想着拓展一下,就到rabbitmq官网的文档里做一些总结和摘取,查看文档的视角是一位后台开发,所以运维工具方面省略了,比如 安装、CLI工具、配置、权限、部分网络、命令等。
顺便贴上官网地址https://www.rabbitmq.com/documentation.html
网络和连接(部分)
-
各种客户端和RabbitMQ协议种类包括如下:
AMQP 0-9-1 with extensions :一个连接多个通道,可以使用pki以及认证证书。
AMQP 1.0 :有个连接多种会话,比上一个功能弱一点。
MQTT 3.1.1
STOMP 1.0 through 1.2
- 所有协议都是建立在tcp上的,使用长连接。一个client使用一个tcp连接。这里注意,使用完连接后,应用需要关闭它,减少机器资源耗尽的风险。有些操作系统会限制一个进程可以使用的连接数目。
- 连接泄露问题:连接打开后一直不关闭,或者只是关闭一小部分,造成资源耗尽。使用admin后台监控连接数量, RabbitMQ 3.7.9.之后还能监控连接打开速率。
- 高连接切换问题:应用频繁的打开以及关闭连接,应该尽量使用长连接。切换速率超过 100/second 就要注意了。
- 流控:应用在推送连接上
- 心跳监测时间:对于集群,小于5秒可能导致错误判断,不推荐。
监控
- mq的插件可以提供http的接口,查询mq的监控指标,默认的http接口是 http://server-name:15672/api/**
-
主要指标:
cpu状态
内存利用率
虚拟内存使用
I/O 使用情况
节点的可用空间
TCP连接数目
网络流量
网络延迟
- 监控频率推荐是30-60秒
客户端连接
-
AMQP 0-9-1方式的客户端的连接使用同一个Tcp连接,后面一个是更底层的连接,每个channel以一个id。
channel是建立在成功的连接之上。关闭后会是防止相应的资源。关闭的channel不能再被使用。
-
异常场景:
重复申明queue、exchange时候使用不同的属性值,导致406错误,注意有些场景,属性是可以覆盖的,这个异常只是说一种情况。
访问不被允许的资源,403错误。
绑定不存在的queue、或者exchange,404错误
从不存在的queue消费、推消息到不存在的exchange,404错误
访问一个被排除的queue,405异常
但是客户端可以自己处理,如Java客户端,可以注册错误处理器(error handle)
- 一个连接可以打开的channel数目,是可以配置的。channel数目增加,会导致客户端以及节点上资源消耗。因此需要合理控制。
-
rabbitmq-event-exchange 插件
这个插件可以监听mq事件,包括连接、channel、queue等系统部分产生的事件。这个插件把事件推送到exchange里面,名字是 amq.rabbitmq.event 应用自己声明队列并且监听。
分布式mq 与 clustering
-
分布式有三种方式:with clustering, with Federation, and using the Shovel plugin.
clustering: 多台机器联系到一起,所有节点mq以及erlang版本一致。上面的虚拟主机、exchange、用户等自动复制。queue可以复制内容、镜像、可以只在一个节点上、可以使用Quorum queues 类型。
Federation:允许exchange以及queue接收其他broker上对应exchange以及queue的消息。 exchange或者queue使用点对点的链接,默认情况是消息在链接上被传递一次。传递出去的消息如果不能分配到queue上,那么消息来源的位置,也不会被发出去。常见场景是组合连个mq的broker。 可以使用不同版本的mq以及erlang
Shovels插件:雷类似于Federation,但是更低级层次。他是消费一个消息,然后,传递到另一个exchange上。
- 节点通过名字做唯一识别,例如[email protected]。可以动态变化节点数目。
-
被复制的东西:
所有的数据、状态都会被复制到所有节点。除了默认的消息队列,可以复制的队列类型在后面谈及。
-
集群的所有节点是地位相同的。节点间通过cookies验证身份。
节点数目推荐使用奇数,1、3、5、7等。因为需要集体决策。
- 节点的操作会被复制到不同节点(原文大概是队列的主从位置)
-
防止单个节点失败
客户端连接使用一个list指明节点即可。另外推荐的方法有:dns动态解析变换,或者tcp连接的负载均衡方案。
-
节点失败
mq的broker能够接受节点失败。镜像队列可以使得内容复制到不同的节点;非镜像队列也可以用在集群上。
Quorum Queues
quorum 是指决策多数。例如n/2+1 中的n。RabbitMQ 3.8.0.之后支持。
-
动机:
传统的镜像队列由于技术原因,有潜在风险丢失信息。quorum queue保证消息安全、方便实用。
-
和传统队列比较,有很多差异点,简单列举部分;
只能是持久化的
不支持排除特性
不支持ttl、不支持死信
不支持优先级
支持Poison Message Handling
消息一直不能被消费,导致不断从新入队,成为Poison Message。消息head里面用x-delivery-count记录发送次数,然后队列设置最大次数,大于最大次数的丢弃或者成为死信。
-
性能严重取决于集群大小以及硬盘速度。
复制因子默认是5,例如集群是3节点,每个节点上有一个复制;集群是7节点,只有5个节点上有一个复制;
- 使用publish confirm机制保证消息安全,消息一致性大于可用性。
Classic Mirrored Queues
官网开头就说:建议读者使用quorum queue。并评价为下一代高可用队列。
-
介绍
每一个queue有主从,主的节点负责消息接收、消息发送。主节点挂掉,最老的从节点变成主节点。无论客户端连接到集群哪个节点,执行操作的是主节点。
主节点掉线后,如果上面的数据没有同步,那么消息就会丢失。消费者是自动应答模式的话,消息也可能会丢失。从节点转发的消息没有被消费,从节点就掉线的话,也可能会导致消息丢失。
具体同步、选主过程、失败处理等略。
-
可靠性
涉及点如下,详情略:连接失败处理、ack以及confirms机制、信条坚持以及数据指标监控、生产者数据安全、消费者数据安全、消费者监听取消通知
- 高可用实现略
-
脑裂问题:
网络问题造成一个分区的两个部分断开连接,变成两个分区;网络再次连接时,分区不会自动合并。
解决方法:
mq版本使用3.4.0和3.4.1及以上,避免错误检测到网络分区。
RabbitMQ提供了三种方法自动的解决网络分区:
pause-minority mode: 检测自身处于少数派,关闭此集群。
pause-if-all-down mode:不能和任意指定节点连接,关闭此集群。
autoheal mode。(默认的是ignore模式):选择节点多的作为主分区,其他重启。节点数一样的话,随机选择。
Time-To-Live and Expiration
- 在每一个队列上设置ttl:定义队列时候使用参数(“x-message-ttl”, 60000),单位毫秒
-
在每一个设置消息的ttl:消息设置属性expiration,单位毫秒
两个属性都有的话,最小值。
- 队列自身的ttl:设置x-expires,定义队列多久未使用后被删除。未使用是指没有消费者。
-
队列长度限制:
可以设置消息总数量或者消息的总内存大小限制;
处置超过限制的消息:定义队列使用参数overflow(“overflow”:“reject-publish”),可以指定消息不能进入队列或者从头部丢弃消息,消息被抛弃或者成为死信。
Lazy Queues
-
介绍
RabbitMQ 3.6.0之后支持懒加载队列。适应于数百万消息、或者内存较小的场景。这个是尽可能多的把消息放到硬盘上。少量消息放在内存中,默认是16384个消息。
可以再运行的时候修改队列的模式,但是需要重启。
普通的Queues
-
注意
以"amq.开头的队列名字,是mq的保留字段。
-
队列使用前需要申明。对于已经存在的队列,使用相同的参数声明,没有影响;参数不同的话,会导致406异常。
队列的多数可选参数可以动态变化。
Exclusive Queues
只是被声明他的连接使用。其他连接想要使用的话,会抛出异常。声明他的连接关闭后,这个队列被删除。
Priority Queue Support
-
介绍
声明队列的时候使用参数x-max-priority标识优先级。 1-255,推荐是1-10的数字。
然后就可以推送具有优先级的消息,使用属性priority定义。没有值的话默认是0,必定义的最大值还要大的话,就相当于最大值。
- 高优先级的消息会被先处理。但是需要配合消息的prefetch属性。因为消息进入消费者的缓存池,就不会再排序了。
-
队列里面的消息是从头部扫描过期的,因此低优先级的过期消息可能会被高优先级的消息卡在后面。
如果队列有最大限制,按照通常丢弃头部消息的策略,可能会造成高优先级的消息会被丢弃。
Exchange
- exchange 和 exchange 可以绑定,就像队列绑定一样使用。
- Alternate Exchanges: 一个exchange不能把消息分配给queue,那么消息就会进入AE里面,这个路径可以一直走下去。
Dead Letter Exchanges
一个消息成为死信有三种情况
消息被消费者拒绝
消息过期
队列长度限制导致消息被丢弃
- 关于dead letter routing key:消息发送的时候自身有routing key = foo,默认的dead letter routing key 就是foo, 如果exchange上设置 dead letter routing key = bar,那么死信的dead letter routing key 变成bar。
- 死信消息在原始队列上的删除时机:死信队列ack回复后,原始队列才会把消息删除,如果死信队列在回复前挂掉,死信消息会存在两个位置。
- 如果死信消息发生了循环,这个消息会被丢弃。
- 死信消息的head有一个x-death属性,存有死信消息的经历的事件。
Consumers
- 每一个消费者也有一个唯一标识。
-
消费者连接丢失的话,Java .Net client 会自动恢复。
恢复的顺序是:
恢复connection
恢复channel
恢复queue
恢复exchange
恢复binding
恢复消费者
-
Consumer Prefetch
限制处于未返回ack的消息数目,只有一个消息返回ack才能腾出空间给下一个消息。
-
Consumer Priorities
消费者设置优先级,高优先级的消费者的会先消费,除非高优先级堵塞,低优先级的消费者才会消费消息。
同一个级别的是轮询消费。
-
设置exclusive 为true,保证某个时刻只有一个customer消费queue。排他消费。
设置x-single-active-consumer为true,某个时刻只有一个customer消费queue直到这个消费者挂掉,然后其他消费者自动补上。单活消费。
单活消费 与 排他消费区别在于,前者更大程度保证消息被一个消费者消费。这俩个属性不能共存。
-
消费者并发:Java .net 等客户端使用线程池控制异步消费操作,这个池子也可以设置并发度。客户端保证了从同一个channel发送来的消息,会被按照原有顺序消费,不必考虑池子的并发度。这里需要注意,消费和等待ack是异步的,后消费的可能速度快导致先返回ack。
有些客户端会限制并发度,设置并发因子为1,以满足严格的顺序消费场景。
- 对于消费者被异常取消的场景,java 客户单可以使用覆盖handleCancel方法自定义业务。
Publishers
- AMQP 0-9-1协议上:消息发送到exchange上,如果没有queue接收,默认是消息丢弃。如果消息设置了mandatory属性,消息会被返回给exchange。
- message可以设置Content type(例如 application/json)、Content encoding(例如 gzip);但是mq不会使用这个字段,自己的application或者plugins 使用这些字段。
-
Publisher Confirms的策略:
流模式:持续推送消息,监听消息ack,
批模式:一次推送一批消息,等待消息ack
单个模式:一次推送后,等待ack,然后再次推送。是批模式的特例。对流量影响最大。
当出现资源报警时候,所有推送都是阻塞的直到警告清除。
- 生产者publish confirm的时候,不共享channel,使用channel pool ;消费者使用线程池,来保证消息安全。
- mq支持Java nio,但是nio不是为了提高流量的,是为了可控性。控制有多少线程在工作。
- 除了常见的exchange类型,还有其他的类型:Headers,下面Consistent hashing exchange, random routing exchange, internal event exchange and delayed message exchange,这些可以通过插件实现。
- 分发消息的时候,消息会被mq默认带上一些属性:Delivery tag、Redelivered、Exchange、Routing key、Consumer tag,还有一些可选的属性:Message ID、Headers等。
- 推送安全有两种模式:transaction和confirm,前者是重量级的,减少很多吞吐量。
Java Client 总结
-
并发安全:
不同的线程使用同一channel应该避免,因该是给每一个线程提供不同的channel,可以使用channel pool 达到上述目的,作为一种同步机制解决方法。
消费在一个线程上,推送在另一个线程上,也是安全的。
当时使用手动ack模式,使用线程回复ack是很重要的,否则可能造成重复ack。
- 对于消费者,线程池里面的线程是依据channel独立开的,因此不同线程可以执行阻塞的方法。 每一个channel有自己的线程,
- 消息给了exchange,但是无法路由到queue上:如果消息代了mandatory的标识,那么消息会被返回给客户端。客户端对于这种消息的处理,需要实现ReturnListener,不实现的话,默默丢弃消息。
-
其他自定义特性:
1 设置消费者的线程池
ExecutorService es = Executors.newFixedThreadPool(20);
Connection conn = factory.newConnection(es);
需要注意确保自定义的线程池执行shutdown()方法,否者它会阻止jvm关闭
当有明显证据显示,默认的配置是处理消息的瓶颈的时候,才考虑自定义的线程池。
2:设置hosts列表
客户端会按顺序逐个列表的host,返回第一个成功的连接。
3 支持Java的nio
4 支持网络失败后自动恢复,默认开启
触发的时机包括:io异常、socket readTimeOut、心跳监测异常
5 可以注册一个恢复监听器,Recovery Listeners
-
自动恢复:
mq的entities (queues, exchanges, bindings, consumers)会在缓存里面存数据。连接中断的监测需要时间,library and the application会有一段时间意识不到连接断开。 mq的publisher confirms机制保证消息不会丢失。
当i/o操作异常的时候,application恢复连接会在延迟5秒后开始,这是因为假设大多数网络失败是临时的。这个时间可以通过RecoveryDelayHandler动态修改,但是不应该小于2秒。
恢复状态的connection会拒绝所有的推送请求,publisher confirms不能完全避免消息丢失。开发者应该注意当连接恢复的时候,重新推送消息。
- 监控mq的各项指标可以使用MicrometerMetricsCollector。它的角色只是读取数据。
channel特性
- 每个channel在客户端以及节点上都占用少量内存,
-
客户端和mq都可以配置单个connection支持的最大channel数目
channel对应一个deliver tag,消息的属性里面也有这个字段,所以消息和channel是绑定的。
-
acknowledge modes
自动ack:消息写到tcp socket就认为成功了,容易丢消息。
手动ack:手动返回ack
返回的结果
basic.ack:消息处理成功,可以删除了。
Java客户端,可以设置multiple,批量返回ack
basic.nack:息处理不成功
设置requeue,指定是否重新入队,不同于reject,可以设置multiple,批量返回结果
basic.reject:息处理不成功,可以删除了
设置requeue,指定是否重新入队
重新入队的消息,若果可能,回到原来的位置;如果不行,放到更靠近header的一个位置。
- 自动模式ack下:牺牲安全换取吞吐量,可以认为这个模式是不安全的。而且会造成服务器过载,因为它不会限制channel prefetch,因此只是用在那些消费速度很快的场景。
- 手动ack模式下:因为channel关闭或者其他异常,消息会自动入队
Channel Prefetch Setting (QoS)
-
设置prefetch count,指定消费者一次接受多少消息处于处理中,只有一个消息处理结束,才会再放入下一个消息。
prefetch count会影响吞吐量,通常100~300能提供较好吞吐量。
-
持久的消息,到达硬盘的时候,basic.ack就会返回个publisher
和消费消息一样,发送消息和等待ack异步的,两者顺序不完全相同。
channel设置global标识,这里是rabbitmq给的含义
true:basic.qos的值,应用给一个channel上所有的consumer。表示大家均摊这个数值。
false:basic.qos的值,应用给独立的consumer
firehose 消防水带特性
开启的话会影响性能,因为有额外的消息被产生以及路由
从exchange进入的消息,都会进入amq.rabbitmq.trace 这个默认exchange,需要自己取消费这个exchange。同时这个消息的头会放入消息的来龙去脉。可以做备份吧。
消息头如下:
headers: channel: 1
connection: [email protected]
exchange_name: priority.ex
node: [email protected]
properties: delivery_mode: 1
headers:
routed_queues: priority.q
routing_keys:
user: guest
vhost: /
申明
如需转载,需要申明来源