天天看点

消息队列之activeMQ

实现高可用、高伸缩、高性能、易用和安全的企业级面向消息服务的系统

异步消息的消费和处理

控制消息的消费顺序

可以和Spring/springBoot整合简化编码

配置集群容错的MQ集群

下载地址:http://activemq.apache.org/components/classic/download/

这里笔者是下载的linux版的:

消息队列之activeMQ

因为activeMQ底层是使用java编写的,所以需要安装jdk,这个请移步我之前的博客:

安装activeMq:

前台访问的端口是8161,在查看前台时,要关闭linux和windows的防火墙:

在访问之前,需要修改conf目录下的jetty.xml,将下面的host修改成自己的ip,以及修改用户名和密码。

修改完成之后重启activemq

查看,地址为192.168.189.150:8161

消息队列之activeMQ

到这里就说明activemq安装成功了。

JMS(java message service)是一个用于提供消息服务的技术规范,他制定了在整个消息服务提供过程中的所有数据结构和交互流程。当两个程序使用jms进行通信时,他们并不是直接相连的,而是通过一个共同的消息收发服务连接起来的,达到解耦的效果。jms为标准消息协议和消息服务提供了一组通用的接口,包括创建、发送、读取消息等。

消息队列之activeMQ

异步:客户端不用发送请求,JMS自动将消息发送给客户端

可靠:JMS保证消息只传递一次

JMS provider:实现了jms接口和规范的消息中间件

JMS producer:消息生产者,创建和发送JMS消息的客户端应用

JMS consumer:消息消费者,接受和处理JMS消息的客户端应用

JMS message:由消息头、消息属性、消息体组成

消息头(在send方法之前,通过setXXX()设置):

JMSDestination:消息发送的目的地,主要是指Queue(点对点传送模型)和Topic(发布订阅模型)

JMSDeliverMode:消息是否持久

JMSExpiration:设置消息过期时间

JMSPriority:消息优先级,0-4被称为普通消息,5-9是加急消息,默认为4

JMSMessageID:唯一识别每个消息的标识,由MQ产者或者自己设定

消息属性:除消息头以外的值,如识别,去重,重点标注等方法,如textMessage.setStringProperty("c1","VIP");

消息体:

TextMessage:普通字符串

MapMessage:map类型,其中key为String类型,而值为java的基本类型

BytesMessage:二进制数组消息

StreamMessage:java数据流消息,用个标准流来顺序填充和读取

ObjectMessage:对象消息,包含一个可序列化的java对象

点对点消息传送模型:应用程序由消息队列、发送者、接收者组成,每个消息发送给一个特殊的消息队列,该队列保存了所有发送给它的消息,处理消费掉的和已过期的消息

点对点消息传送的特性:

1.每个消息只有一个接收者

2.消息发送者和接收者没有时间依赖性

3.当消息发送者发送消息时,无论接收者程序在不在运行,都能发送消息

4.当接收者收到消息时,会发送确认收到通知

发布订阅消息传递模型:发布者发布一个消息,该消息通过topic传递给所有订阅的客户端,发布者和订阅者彼此不知道对方,是匿名的且可以动态发布和消息订阅。

发布订阅消息传递的特性:

1.一个消息可以传递给多个订阅者

2.发布者和订阅者有时间依赖性

3.为了缓和严格的时间相关性,JMS允许订阅者创建一个可持久化的订阅

1.引入jar包

2.生产者代码

运行代码在浏览器上查看,可以看到queue01里面有5条消息:

消息队列之activeMQ

Number Of Pending Messages:等待消费的消息 这个是当前未出队列的数量。可以理解为总接收数-总出队列数

Number Of Consumers:消费者的数量

Messages Enqueued:进入队列的消息 进入队列的总数量,包括出队列的。 这个数量只增不减

Messages Dequeued:出了队列的消息 可以理解为是消费这消费掉的数量

运行消费者的代码,应该我上面生产者的代码运行了两次,所以消息有10条。

消息队列之activeMQ

在这里,笔者使用的基于Zookeeper+levelDb搭建的activeMq集群,为了避免单点故障,使用一主两从的架构。使用Zookeeper集群注册所有的ActiveMQ Broker但只有其中一个Broker可以提供服务,它被视为master,也就是说如果master因为故障而不能提供服务,Zookeeper会从SLave中选举出一个Broker充当master。

我这边的zookeeper集群已经搭建好了,150和151是follower,152是leader。

从上面可以看到,只有00000000020这个几点的elected里面有值,表明它被选举为master节点了。

在浏览器上依次访问:192.168.189.150:8161 , 192.168.189.151:8161,192.168.189.152:8161

只有192.168.189.150:8161可以访问成功,因为只有master节点可以对外提供访问,所以只有一个节点能访问到,那么它就是master节点。

第二种查看的方式:

查看activemq的日志,最后一行,可以看到,MasterLevelDBStore即为master节点,SlaveLevelDBStore即为slave节点。

消息队列之activeMQ
消息队列之activeMQ

第三种查看的方式为使用zookeeper的可视化工具。

由于activeMq集群是基于zookeeper集群实现的,所以要注意一下三点:

activeMQ的客户端只能访问master的Broker,其它处于Slave的Broker不能访问,所以客户端连接的Broker应该使用failover协议

当一个activeMQ节点挂掉或者一个Zookeeper节点挂掉,activeMQ服务正常运转,但是如果仅剩一个activeMQ节点,由于不能选举Master,所以activeMQ不能正常运行;(一个就不成集群了)

同理,如果Zookeeper仅剩一个节点是活动的,不管activeMQ是都存活或者说不管activeMQ个节点是否存活,activeMQ不能正常提供服务,必须依赖于Zookeeper集群服务。

集群的代码和上面单机的代码大致是一直的,就只需要修改一个activemq的地址。

1.消息发送方式

默认情况下,非持久化的消息是异步发送的,持久化的消息是同步发送的。但是在开启事务的情况下,消息都是异步发送的,效率会有2个数量级的提升,所以在发送持久化消息时,请开启事务模式。

2.储存机制

在通常情况下,非持久化的消息时存储在内存中的,持久化消息时存储在文件中的,他们的最大限制在配置文件中的

所以尽量不要用非持久化文件,如果非要用的化,可以将临时文件的限制调大。同时,非持久化的消息要及时处理,不要堆积,或者启动事务。启动事务后,commit()会等待服务器的消息返回,也不会导致消息丢失了。

3.死信队列

一条消息在被重发多次后(默认是6次),将会被ActiveMQ移入死信队列;说白了就是异常消息的归并处理的集合,主要是处理失败的消息。可以在activeMQ.DLQ这个队列中查看。

4.重复消息,幂等性调用

在网络延迟的情况洗啊,可能会造成MQ重试,可能会造成重复消费。如果消息是做数据库的插入操作,给这个消息做一个唯一主键,那么就算出现重复消费的情况,因为唯一主键,会造成主键冲突,避免数据库出现脏数据。如果是第三方消费,可以在每条数据里面加一个全局唯一的id,如果消息消费了,就将消息存在redis中,在消费消息之前将id到redis中查询一下,判断是否消费过,如果没有消费过,就处理,如果消费过了,就不处理了。