ActiveMQ可以将数据持久化到数据库中,用于容灾备份恢复,可以使用任意的数据库。
1、找到activemq.xml文件
2、在文件中添加数据库的bean
<bean id="activemq-mysql" class="org.apache.commons.dbcp2.BasicDataSource" destroy-method="close">
<property name="driverClassName" value="com.mysql.jdbc.Driver"/>
<property name="url" value="jdbc:mysql://127.0.0.1:3306/activemq?useUnicode=true&characterEncoding=UTF-8&serverTimezone=UTC&relaxAutoCommit=true"/>
<property name="username" value="root"/>
<property name="password" value="root"/>
<property name="poolPreparedStatements" value="true"/>
</bean>
3、修改persistenceAdapter
找到<persistenceAdapter>标签,把原来的<kahaDB directory="${activemq.data}/kahadb"/>标签注释掉,使用jdbcPersistenceAdapter标签,引用上一步创建的bean,等JDBC持久化完成并启动activeMQ后,回来把createTablesOnStartup改为false(createTablesOnStartup是否在启动的时候创建数据表,默认值是true,这样每次启动都会去创建数据表了,一般是第一次启动的时候设置为true,之后改成false)
<persistenceAdapter>
<!-- <kahaDB directory="${activemq.data}/kahadb"/> -->
<jdbcPersistenceAdapter dataDirectory="${activemq.base}/data" dataSource="#activemq-mysql" createTablesOnStartup="true"/>
</persistenceAdapter>
4、创建第二步所对应的数据库,这里即“activemq”
5、向activeMQ的lib目录下添加所需要的jar包,点击下载所需要的jar包。
6、启动activeMQ
7、查看mysql,是否有表生成,有表生成代表成功
8、发送一条消息,查看数据库中是否有消息记录,有记录表示消息的持久化完成
9、进行消息的消费,消费之后,数据库中的记录删除
10、表说明:
activemq_msgs用于存储消息,Queue和Topic都存储在这个表中:
ID:自增的数据库主键
CONTAINER:消息的Destination
MSGID_PROD:消息发送者客户端的主键
MSG_SEQ:是发送消息的顺序,MSGID_PROD+MSG_SEQ可以组成JMS的MessageID
EXPIRATION:消息的过期时间,存储的是从1970-01-01到现在的毫秒数
MSG:消息本体的Java序列化对象的二进制数据
PRIORITY:优先级,从0-9,数值越大优先级越高
activemq_acks用于存储订阅关系。如果是持久化Topic,订阅者和服务器的订阅关系在这个表保存:
主要的数据库字段如下:
CONTAINER:消息的Destination
SUB_DEST:如果是使用Static集群,这个字段会有集群其他系统的信息
CLIENT_ID:每个订阅者都必须有一个唯一的客户端ID用以区分
SUB_NAME:订阅者名称
SELECTOR:选择器,可以选择只消费满足条件的消息。条件可以用自定义属性实现,可支持多属性AND和OR操作
LAST_ACKED_ID:记录消费过的消息的ID。
表activemq_lock在集群环境中才有用,只有一个Broker可以获得消息,称为Master Broker,
其他的只能作为备份等待Master Broker不可用,才可能成为下一个Master Broker。
这个表用于记录哪个Broker是当前的Master Broker。
11、必要性:
只有在消息必须保证有效,且绝对不能丢失的时候。使用JDBC存储策略