個人部落格網:https://wushaopei.github.io/ (你想要這裡多有)
一、持久化機制
1、Activemq持久化
1.1 什麼是持久化:
持久化就是高可用的機制,即使伺服器當機了,消息也不會丢失
1.2 持久化的作用
将MQ 收到的消息存儲到檔案、硬碟、資料庫 等、 則叫MQ 的持久化,這樣即使伺服器當機,消息在本地還是有,仍就可以通路到。
詳情——官網 : http://activemq.apache.org/persistence
1.3 ActiveMQ 支援的消息持久化機制:
- 為了避免意外當機以後丢失資訊,需要做到重新開機後可以恢複消息隊列,消息系統一半都會采用持久化機制。
- ActiveMQ的消息持久化機制有JDBC,AMQ,KahaDB和LevelDB,無論使用哪種持久化方式,消息的存儲邏輯都是一緻的。
- 就是在發送者将消息發送出去後,消息中心首先将消息存儲到本地資料檔案、記憶體資料庫或者遠端資料庫等。再試圖将消息發給接收者,成功則将消息從存儲中删除,失敗則繼續嘗試嘗試發送。
- 消息中心啟動以後,要先檢查指定的存儲位置是否有未成功發送的消息,如果有,則會先把存儲位置中的消息發出去。
注意:一句話:ActiveMQ當機了,消息不會丢失的機制。
二、持久化方式
1、Activemq的持久化方式有幾種
1.1 AMQ Mesage Store(了解)
AMQ是一種檔案存儲形式,它具有寫入速度快和容易恢複的特點。消息存儲再一個個檔案中檔案的預設大小為32M,當一個檔案中的消息已經
全部被消費,那麼這個檔案将被辨別為可删除,在下一個清除階段,這個檔案被删除。AMQ适用于ActiveMQ5.3之前的版本
注意:基于檔案的存儲方式,是以前的預設消息存儲,現在不用了
1.2 KahaDB消息存儲(預設)
5.4 之後基于日志檔案的持久化插件,預設持久化插件,提高了性能和恢複能力
KahaDB 的屬性配置 : http://activemq.apache.org/kahadb
說明:它使用一個事務日志和 索引檔案來存儲所有的位址
db-<數字>.log 存儲資料,一個存滿會再次建立 db-2 db-3 …… ,當不會有引用到資料檔案的内容時,檔案會被删除或歸檔
db.data 是一個BTree 索引,索引了消息資料記錄的消息,是消息索引檔案,它作為索引指向了 db-<x>.log 裡的消息
一點題外話:就像mysql 資料庫,建立一張表,就有這個表對應的 .MYD 檔案,作為它的資料檔案,就有一個 .MYI 作為索引檔案。
db.free 存儲空閑頁 ID 有時會被清除
db.redo 當 KahaDB 消息存儲在強制退出後啟動,用于恢複 BTree 索引
lock 顧名思義就是鎖
四類檔案+一把鎖 ==》 KahaDB
1.3 JDBC消息存儲
消息基于JDBC存儲的
1.4 LevelDB消息存儲(了解)
希望作為以後的存儲引擎,5.8 以後引進,也是基于檔案的本地資料存儲形式,但是比 KahaDB 更快
它比KahaDB 更快的原因是她不使用BTree 索引,而是使用本身自帶的 LeavelDB 索引
這種檔案系統是從ActiveMQ5.8之後引進的,它和KahaDB非常相似,也是基于檔案的本地資料庫存儲形式,但是它提供比KahaDB更快的持久性。
但它不使用自定義B-Tree實作來索引獨寫日志,而是使用基于LevelDB的索引
題外話:為什麼LeavelDB 更快,并且5.8 以後就支援,為什麼還是預設 KahaDB 引擎,因為 activemq 官網本身沒有定論,LeavelDB 之後又出了可複制的LeavelDB 比LeavelDB 更性能更優越,但需要基于 Zookeeper 是以這些官方還沒有定論,任就使用 KahaDB
預設配置如下:
<persistenceAdapter>
<levelDB directory="activemq-data"/>
</persistenceAdapter>
1.5 JDBC Message Store with ActiveMQ Journal
2、JDBC存儲消息(重點)
JDBC : 有一部分資料會真實的存儲到資料庫中
使用JDBC 的持久化,
①修改配置檔案,預設 kahaDB
修改之前:
<persistenceAdapter>
<kahaDB directory="${activemq.data}/kahadb"/>
</persistenceAdapter>
修改之後:
<persistenceAdapter>
<jdbcPersistenceAdapter dataSource="#mysql-ds"/>
</persistenceAdapter>
②在activemq 的lib 目錄下添加 jdbc 的jar 包 (connector.jar 我使用5.1.41 版本)
③ 修改配置檔案 : activemq.xml 使其連接配接自己windows 上的資料庫,并在本地建立名為activemq 的資料庫
④ 讓linux 上activemq 可以通路到 mysql ,之後産生消息。
ActiveMQ 啟動後會自動在 mysql 的activemq 資料庫下建立三張表:activemq_msgs 、activemq_acks、activemq_lock
activemq_acks:用于存儲訂閱關系。如果是持久化Topic,訂閱者和伺服器的訂閱關系在這個表儲存
activemq_lock:在叢集環境中才有用,隻有一個Broker可以獲得消息,稱為Master Broker
activemq_msgs:用于存儲消息,Queue和Topic都存儲在這個表中
點對點會在資料庫的資料表 ACTIVEMQ_MSGS 中加入消息的資料,且在點對點時,消息被消費就會從資料庫中删除
但是對于主題,訂閱方式接受到的消息,會在 ACTIVEMQ_MSGS 存儲消息,即使MQ 伺服器下線,并在 ACTIVEMQ_ACKS 中存儲消費者資訊 。 并且存儲以 activemq 為主,當activemq 中的消息被删除後,資料庫中的也會自動被删除。
如果表沒生成,可能需要自己建立
-- auto-generated definition
create table ACTIVEMQ_ACKS
(
CONTAINER varchar(250) not null comment '消息的Destination',
SUB_DEST varchar(250) null comment '如果使用的是Static叢集,這個字段會有叢集其他系統的資訊',
CLIENT_ID varchar(250) not null comment '每個訂閱者都必須有一個唯一的用戶端ID用以區分',
SUB_NAME varchar(250) not null comment '訂閱者名稱',
SELECTOR varchar(250) null comment '選擇器,可以選擇隻消費滿足條件的消息,條件可以用自定義屬性實作,可支援多屬性AND和OR操作',
LAST_ACKED_ID bigint null comment '記錄消費過消息的ID',
PRIORITY bigint default 5 not null comment '優先級,預設5',
XID varchar(250) null,
primary key (CONTAINER, CLIENT_ID, SUB_NAME, PRIORITY)
)
comment '用于存儲訂閱關系。如果是持久化Topic,訂閱者和伺服器的訂閱關系在這個表儲存';
create index ACTIVEMQ_ACKS_XIDX
on ACTIVEMQ_ACKS (XID);
-- auto-generated definition
create table ACTIVEMQ_LOCK
(
ID bigint not null
primary key,
TIME bigint null,
BROKER_NAME varchar(250) null
);
-- auto-generated definition
create table ACTIVEMQ_MSGS
(
ID bigint not null
primary key,
CONTAINER varchar(250) not null,
MSGID_PROD varchar(250) null,
MSGID_SEQ bigint null,
EXPIRATION bigint null,
MSG blob null,
PRIORITY bigint null,
XID varchar(250) null
);
create index ACTIVEMQ_MSGS_CIDX
on ACTIVEMQ_MSGS (CONTAINER);
create index ACTIVEMQ_MSGS_EIDX
on ACTIVEMQ_MSGS (EXPIRATION);
create index ACTIVEMQ_MSGS_MIDX
on ACTIVEMQ_MSGS (MSGID_PROD, MSGID_SEQ);
create index ACTIVEMQ_MSGS_PIDX
on ACTIVEMQ_MSGS (PRIORITY);
create index ACTIVEMQ_MSGS_XIDX
on ACTIVEMQ_MSGS (XID);
坑:
JDBC 改進: 加入高速緩存機制 Journal
高速緩存在 activemq.xml 中的配置:
⑤代碼運作驗證
一定要開啟持久化 : messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT);
(1)隊列Queue:
生産者:
。。。。。
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory();
activeMQConnectionFactory.setBrokerURL(ACTIVEMQ_URL);
Connection connection = activeMQConnectionFactory.createConnection();
Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(ACTIVEMQ_QUEUE_NAME);
MessageProducer messageProducer = session.createProducer(queue);
messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT);
connection.start();
for (int i = 0; i < 3; i++) {
。。。。。。。。。。。
運作結果:
在點對點類型中
- 當DeliveryMode設定為NON_PERSISTENCE時,消息被儲存在記憶體中
- 當DeliveryMode設定為PERSISTENCE時,消息儲存在broker的相應的檔案或者資料庫中。
而且點對點類型中消息一旦被Consumer消費,就從資料中删除
消費前的消息,會被存放到資料庫
上面的消息被消費後被MQ自動删除

(2)主題Topic
。。。。
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory();
activeMQConnectionFactory.setBrokerURL(ACTIVEMQ_URL);
Connection connection = activeMQConnectionFactory.createConnection();
connection.setClientID("我是生産者張三");
Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic(ACTIVEMQ_TOPIC_NAME);
MessageProducer messageProducer = session.createProducer(topic);
messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT);
connection.start();
for (int i = 0; i < 3; i++) {
。。。。。。
在點對點類型中
3、JDBC Message store with ActiveMQ Journal(重點)
3.1 定義:
3.2 說明
這種方式克服了JDBC Store的不足,JDBC每次消息過來,都需要去寫庫讀庫。
ActiveMQ Journal,使用高速緩存寫入技術,大大提高了性能。
當消費者的速度能夠及時跟上生産者消息的生産速度時,journal檔案能夠大大減少需要寫入到DB中的消息。
舉個例子:
生産者生産了1000條消息,這1000條消息會儲存到journal檔案,如果消費者的消費速度很快的情況下,在journal檔案還沒有同步到DB之前,消費者已經消費了90%的以上消息,那麼這個時候隻需要同步剩餘的10%的消息到DB。如果消費者的速度很慢,這個時候journal檔案可以使消息以批量方式寫到DB。
3.3 配置方式:
原來的配置:
<persistenceAdapter>
<jdbcPersistenceAdapter dataSource="#mysql-ds" />
</persistenceAdapter>
修改的結果:
<persistenceFactory>
<journalPersistenceAdapterFactory
journalLogFiles="5"
journalLogFileSize="32768"
useJournal="true"
useQuickJournal="true"
dataSource="#mysql-ds"
dataDirectory="../activemq-data" />
</persistenceFactory>
以前是實時寫入mysql,在使用了journal後,資料會被journal處理,如果在一定時間内journal處理(消費)完了,就不寫入mysql,如果沒消費完,就寫入mysql,起到一個緩存的作用
小總結:
- 持久化消息是指:
- MQ 所在的伺服器down 了消息也不會丢失
2.持久化機制演化過程:
- 從最初的AMQ Message Store 方案到 ActiveMQ V4版本推出的High performance journal (高性能事務)附件,并且同步推出了關系型資料庫的存儲方案, ActiveMQ 5.3 版本有推出了KahaDB 的支援,(也是5.4之後的預設持久化方案),後來ActiveMQ 從5.8開始支援LevelDB ,現在5.9 提供了 Zookeeper + LevelDB 的叢集化方案。
3. ActiveMQ 消息持久化機制有:
AMQ | 基于日志檔案 |
KahaDB | 基于日志檔案,5.4 之後的預設持久化 |
JDBC | 基于第三方資料庫 |
LevelDB | 基于檔案的本地資料庫存儲,從5.8 之後推出了LevelDB 性能高于 KahaDB |
ReplicatedLevelDB Store | 從5.8之後提供了基于LevelDB 和Zookeeper 的資料複制方式,用于Master-slave方式的首資料複制選方案 但是無論使用哪種持久化方式,消息的存儲邏輯都一樣 |