实现高可用、高伸缩、高性能、易用和安全的企业级面向消息服务的系统
异步消息的消费和处理
控制消息的消费顺序
可以和Spring/springBoot整合简化编码
配置集群容错的MQ集群
下载地址:http://activemq.apache.org/components/classic/download/
这里笔者是下载的linux版的:
因为activeMQ底层是使用java编写的,所以需要安装jdk,这个请移步我之前的博客:
安装activeMq:
前台访问的端口是8161,在查看前台时,要关闭linux和windows的防火墙:
在访问之前,需要修改conf目录下的jetty.xml,将下面的host修改成自己的ip,以及修改用户名和密码。
修改完成之后重启activemq
查看,地址为192.168.189.150:8161
到这里就说明activemq安装成功了。
JMS(java message service)是一个用于提供消息服务的技术规范,他制定了在整个消息服务提供过程中的所有数据结构和交互流程。当两个程序使用jms进行通信时,他们并不是直接相连的,而是通过一个共同的消息收发服务连接起来的,达到解耦的效果。jms为标准消息协议和消息服务提供了一组通用的接口,包括创建、发送、读取消息等。
异步:客户端不用发送请求,JMS自动将消息发送给客户端
可靠:JMS保证消息只传递一次
JMS provider:实现了jms接口和规范的消息中间件
JMS producer:消息生产者,创建和发送JMS消息的客户端应用
JMS consumer:消息消费者,接受和处理JMS消息的客户端应用
JMS message:由消息头、消息属性、消息体组成
消息头(在send方法之前,通过setXXX()设置):
JMSDestination:消息发送的目的地,主要是指Queue(点对点传送模型)和Topic(发布订阅模型)
JMSDeliverMode:消息是否持久
JMSExpiration:设置消息过期时间
JMSPriority:消息优先级,0-4被称为普通消息,5-9是加急消息,默认为4
JMSMessageID:唯一识别每个消息的标识,由MQ产者或者自己设定
消息属性:除消息头以外的值,如识别,去重,重点标注等方法,如textMessage.setStringProperty("c1","VIP");
消息体:
TextMessage:普通字符串
MapMessage:map类型,其中key为String类型,而值为java的基本类型
BytesMessage:二进制数组消息
StreamMessage:java数据流消息,用个标准流来顺序填充和读取
ObjectMessage:对象消息,包含一个可序列化的java对象
点对点消息传送模型:应用程序由消息队列、发送者、接收者组成,每个消息发送给一个特殊的消息队列,该队列保存了所有发送给它的消息,处理消费掉的和已过期的消息
点对点消息传送的特性:
1.每个消息只有一个接收者
2.消息发送者和接收者没有时间依赖性
3.当消息发送者发送消息时,无论接收者程序在不在运行,都能发送消息
4.当接收者收到消息时,会发送确认收到通知
发布订阅消息传递模型:发布者发布一个消息,该消息通过topic传递给所有订阅的客户端,发布者和订阅者彼此不知道对方,是匿名的且可以动态发布和消息订阅。
发布订阅消息传递的特性:
1.一个消息可以传递给多个订阅者
2.发布者和订阅者有时间依赖性
3.为了缓和严格的时间相关性,JMS允许订阅者创建一个可持久化的订阅
1.引入jar包
2.生产者代码
运行代码在浏览器上查看,可以看到queue01里面有5条消息:
Number Of Pending Messages:等待消费的消息 这个是当前未出队列的数量。可以理解为总接收数-总出队列数
Number Of Consumers:消费者的数量
Messages Enqueued:进入队列的消息 进入队列的总数量,包括出队列的。 这个数量只增不减
Messages Dequeued:出了队列的消息 可以理解为是消费这消费掉的数量
运行消费者的代码,应该我上面生产者的代码运行了两次,所以消息有10条。
在这里,笔者使用的基于Zookeeper+levelDb搭建的activeMq集群,为了避免单点故障,使用一主两从的架构。使用Zookeeper集群注册所有的ActiveMQ Broker但只有其中一个Broker可以提供服务,它被视为master,也就是说如果master因为故障而不能提供服务,Zookeeper会从SLave中选举出一个Broker充当master。
我这边的zookeeper集群已经搭建好了,150和151是follower,152是leader。
从上面可以看到,只有00000000020这个几点的elected里面有值,表明它被选举为master节点了。
在浏览器上依次访问:192.168.189.150:8161 , 192.168.189.151:8161,192.168.189.152:8161
只有192.168.189.150:8161可以访问成功,因为只有master节点可以对外提供访问,所以只有一个节点能访问到,那么它就是master节点。
第二种查看的方式:
查看activemq的日志,最后一行,可以看到,MasterLevelDBStore即为master节点,SlaveLevelDBStore即为slave节点。
第三种查看的方式为使用zookeeper的可视化工具。
由于activeMq集群是基于zookeeper集群实现的,所以要注意一下三点:
activeMQ的客户端只能访问master的Broker,其它处于Slave的Broker不能访问,所以客户端连接的Broker应该使用failover协议
当一个activeMQ节点挂掉或者一个Zookeeper节点挂掉,activeMQ服务正常运转,但是如果仅剩一个activeMQ节点,由于不能选举Master,所以activeMQ不能正常运行;(一个就不成集群了)
同理,如果Zookeeper仅剩一个节点是活动的,不管activeMQ是都存活或者说不管activeMQ个节点是否存活,activeMQ不能正常提供服务,必须依赖于Zookeeper集群服务。
集群的代码和上面单机的代码大致是一直的,就只需要修改一个activemq的地址。
1.消息发送方式
默认情况下,非持久化的消息是异步发送的,持久化的消息是同步发送的。但是在开启事务的情况下,消息都是异步发送的,效率会有2个数量级的提升,所以在发送持久化消息时,请开启事务模式。
2.储存机制
在通常情况下,非持久化的消息时存储在内存中的,持久化消息时存储在文件中的,他们的最大限制在配置文件中的
所以尽量不要用非持久化文件,如果非要用的化,可以将临时文件的限制调大。同时,非持久化的消息要及时处理,不要堆积,或者启动事务。启动事务后,commit()会等待服务器的消息返回,也不会导致消息丢失了。
3.死信队列
一条消息在被重发多次后(默认是6次),将会被ActiveMQ移入死信队列;说白了就是异常消息的归并处理的集合,主要是处理失败的消息。可以在activeMQ.DLQ这个队列中查看。
4.重复消息,幂等性调用
在网络延迟的情况洗啊,可能会造成MQ重试,可能会造成重复消费。如果消息是做数据库的插入操作,给这个消息做一个唯一主键,那么就算出现重复消费的情况,因为唯一主键,会造成主键冲突,避免数据库出现脏数据。如果是第三方消费,可以在每条数据里面加一个全局唯一的id,如果消息消费了,就将消息存在redis中,在消费消息之前将id到redis中查询一下,判断是否消费过,如果没有消费过,就处理,如果消费过了,就不处理了。