天天看點

ActiveMQ 筆記(六)ActiveMQ的消息存儲和持久化

個人部落格網:https://wushaopei.github.io/    (你想要這裡多有)

一、持久化機制

1、Activemq持久化

1.1 什麼是持久化:

持久化就是高可用的機制,即使伺服器當機了,消息也不會丢失           

1.2 持久化的作用

将MQ 收到的消息存儲到檔案、硬碟、資料庫 等、 則叫MQ 的持久化,這樣即使伺服器當機,消息在本地還是有,仍就可以通路到。           

詳情——官網 : http://activemq.apache.org/persistence

1.3 ActiveMQ 支援的消息持久化機制:

  • 為了避免意外當機以後丢失資訊,需要做到重新開機後可以恢複消息隊列,消息系統一半都會采用持久化機制。
  • ActiveMQ的消息持久化機制有JDBC,AMQ,KahaDB和LevelDB,無論使用哪種持久化方式,消息的存儲邏輯都是一緻的。
  • 就是在發送者将消息發送出去後,消息中心首先将消息存儲到本地資料檔案、記憶體資料庫或者遠端資料庫等。再試圖将消息發給接收者,成功則将消息從存儲中删除,失敗則繼續嘗試嘗試發送。
  • 消息中心啟動以後,要先檢查指定的存儲位置是否有未成功發送的消息,如果有,則會先把存儲位置中的消息發出去。
注意:一句話:ActiveMQ當機了,消息不會丢失的機制。

二、持久化方式

1、Activemq的持久化方式有幾種

1.1 AMQ Mesage Store(了解)

AMQ是一種檔案存儲形式,它具有寫入速度快和容易恢複的特點。消息存儲再一個個檔案中檔案的預設大小為32M,當一個檔案中的消息已經
全部被消費,那麼這個檔案将被辨別為可删除,在下一個清除階段,這個檔案被删除。AMQ适用于ActiveMQ5.3之前的版本           

注意:基于檔案的存儲方式,是以前的預設消息存儲,現在不用了

1.2 KahaDB消息存儲(預設)

5.4 之後基于日志檔案的持久化插件,預設持久化插件,提高了性能和恢複能力           

KahaDB 的屬性配置 : http://activemq.apache.org/kahadb

說明:它使用一個事務日志和 索引檔案來存儲所有的位址

db-<數字>.log 存儲資料,一個存滿會再次建立 db-2 db-3 …… ,當不會有引用到資料檔案的内容時,檔案會被删除或歸檔

db.data 是一個BTree 索引,索引了消息資料記錄的消息,是消息索引檔案,它作為索引指向了 db-<x>.log 裡的消息

一點題外話:就像mysql 資料庫,建立一張表,就有這個表對應的 .MYD 檔案,作為它的資料檔案,就有一個 .MYI 作為索引檔案。

db.free 存儲空閑頁 ID 有時會被清除

db.redo 當 KahaDB 消息存儲在強制退出後啟動,用于恢複 BTree 索引

lock 顧名思義就是鎖

四類檔案+一把鎖 ==》 KahaDB           

1.3 JDBC消息存儲

消息基于JDBC存儲的           

1.4 LevelDB消息存儲(了解)

希望作為以後的存儲引擎,5.8 以後引進,也是基于檔案的本地資料存儲形式,但是比 KahaDB 更快           

它比KahaDB 更快的原因是她不使用BTree 索引,而是使用本身自帶的 LeavelDB 索引

這種檔案系統是從ActiveMQ5.8之後引進的,它和KahaDB非常相似,也是基于檔案的本地資料庫存儲形式,但是它提供比KahaDB更快的持久性。

但它不使用自定義B-Tree實作來索引獨寫日志,而是使用基于LevelDB的索引

題外話:為什麼LeavelDB 更快,并且5.8 以後就支援,為什麼還是預設 KahaDB 引擎,因為 activemq 官網本身沒有定論,LeavelDB 之後又出了可複制的LeavelDB 比LeavelDB 更性能更優越,但需要基于 Zookeeper 是以這些官方還沒有定論,任就使用 KahaDB

預設配置如下:

<persistenceAdapter>
      <levelDB directory="activemq-data"/>
</persistenceAdapter>           

1.5 JDBC Message Store with ActiveMQ Journal

2、JDBC存儲消息(重點)

JDBC : 有一部分資料會真實的存儲到資料庫中 
       使用JDBC 的持久化,           

①修改配置檔案,預設 kahaDB

修改之前:

<persistenceAdapter>

       <kahaDB directory="${activemq.data}/kahadb"/>  

 </persistenceAdapter>           

修改之後:

<persistenceAdapter>

      <jdbcPersistenceAdapter dataSource="#mysql-ds"/>

 </persistenceAdapter>           

②在activemq 的lib 目錄下添加 jdbc 的jar 包 (connector.jar 我使用5.1.41 版本)

③ 修改配置檔案 : activemq.xml 使其連接配接自己windows 上的資料庫,并在本地建立名為activemq 的資料庫

④ 讓linux 上activemq 可以通路到 mysql ,之後産生消息。

ActiveMQ 啟動後會自動在 mysql 的activemq 資料庫下建立三張表:activemq_msgs 、activemq_acks、activemq_lock

activemq_acks:用于存儲訂閱關系。如果是持久化Topic,訂閱者和伺服器的訂閱關系在這個表儲存

activemq_lock:在叢集環境中才有用,隻有一個Broker可以獲得消息,稱為Master Broker

activemq_msgs:用于存儲消息,Queue和Topic都存儲在這個表中           

點對點會在資料庫的資料表 ACTIVEMQ_MSGS 中加入消息的資料,且在點對點時,消息被消費就會從資料庫中删除

但是對于主題,訂閱方式接受到的消息,會在 ACTIVEMQ_MSGS 存儲消息,即使MQ 伺服器下線,并在 ACTIVEMQ_ACKS 中存儲消費者資訊 。 并且存儲以 activemq 為主,當activemq 中的消息被删除後,資料庫中的也會自動被删除。

如果表沒生成,可能需要自己建立

-- auto-generated definition
create table ACTIVEMQ_ACKS
(
    CONTAINER     varchar(250)     not null comment '消息的Destination',
    SUB_DEST      varchar(250)     null comment '如果使用的是Static叢集,這個字段會有叢集其他系統的資訊',
    CLIENT_ID     varchar(250)     not null comment '每個訂閱者都必須有一個唯一的用戶端ID用以區分',
    SUB_NAME      varchar(250)     not null comment '訂閱者名稱',
    SELECTOR      varchar(250)     null comment '選擇器,可以選擇隻消費滿足條件的消息,條件可以用自定義屬性實作,可支援多屬性AND和OR操作',
    LAST_ACKED_ID bigint           null comment '記錄消費過消息的ID',
    PRIORITY      bigint default 5 not null comment '優先級,預設5',
    XID           varchar(250)     null,
    primary key (CONTAINER, CLIENT_ID, SUB_NAME, PRIORITY)
)
    comment '用于存儲訂閱關系。如果是持久化Topic,訂閱者和伺服器的訂閱關系在這個表儲存';

create index ACTIVEMQ_ACKS_XIDX
    on ACTIVEMQ_ACKS (XID);

 
-- auto-generated definition
create table ACTIVEMQ_LOCK
(
    ID          bigint       not null
        primary key,
    TIME        bigint       null,
    BROKER_NAME varchar(250) null
);

 
-- auto-generated definition
create table ACTIVEMQ_MSGS
(
    ID         bigint       not null
        primary key,
    CONTAINER  varchar(250) not null,
    MSGID_PROD varchar(250) null,
    MSGID_SEQ  bigint       null,
    EXPIRATION bigint       null,
    MSG        blob         null,
    PRIORITY   bigint       null,
    XID        varchar(250) null
);

create index ACTIVEMQ_MSGS_CIDX
    on ACTIVEMQ_MSGS (CONTAINER);
create index ACTIVEMQ_MSGS_EIDX
    on ACTIVEMQ_MSGS (EXPIRATION);
create index ACTIVEMQ_MSGS_MIDX
    on ACTIVEMQ_MSGS (MSGID_PROD, MSGID_SEQ);
create index ACTIVEMQ_MSGS_PIDX
    on ACTIVEMQ_MSGS (PRIORITY);
create index ACTIVEMQ_MSGS_XIDX
    on ACTIVEMQ_MSGS (XID);
           

坑:

JDBC 改進: 加入高速緩存機制 Journal

高速緩存在 activemq.xml 中的配置:

⑤代碼運作驗證

一定要開啟持久化 :  messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT);

(1)隊列Queue:

生産者:

。。。。。
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory();
        activeMQConnectionFactory.setBrokerURL(ACTIVEMQ_URL);
        Connection connection = activeMQConnectionFactory.createConnection();
        Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
        Queue queue = session.createQueue(ACTIVEMQ_QUEUE_NAME);
        MessageProducer messageProducer = session.createProducer(queue);
        messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT);
        connection.start();
        for (int i = 0; i < 3; i++) {

       。。。。。。。。。。。           

運作結果:

在點對點類型中

  • 當DeliveryMode設定為NON_PERSISTENCE時,消息被儲存在記憶體中
  • 當DeliveryMode設定為PERSISTENCE時,消息儲存在broker的相應的檔案或者資料庫中。

 而且點對點類型中消息一旦被Consumer消費,就從資料中删除

消費前的消息,會被存放到資料庫

上面的消息被消費後被MQ自動删除

ActiveMQ 筆記(六)ActiveMQ的消息存儲和持久化

(2)主題Topic

。。。。
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory();
        activeMQConnectionFactory.setBrokerURL(ACTIVEMQ_URL);
        Connection connection = activeMQConnectionFactory.createConnection();
        connection.setClientID("我是生産者張三");
        Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
        Topic topic = session.createTopic(ACTIVEMQ_TOPIC_NAME);
        MessageProducer messageProducer = session.createProducer(topic);
        messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT);
        connection.start();
        for (int i = 0; i < 3; i++) {
        。。。。。。           

 在點對點類型中

ActiveMQ 筆記(六)ActiveMQ的消息存儲和持久化
ActiveMQ 筆記(六)ActiveMQ的消息存儲和持久化

3、JDBC Message store with ActiveMQ Journal(重點)

3.1 定義:

ActiveMQ 筆記(六)ActiveMQ的消息存儲和持久化

3.2 說明

這種方式克服了JDBC Store的不足,JDBC每次消息過來,都需要去寫庫讀庫。

ActiveMQ Journal,使用高速緩存寫入技術,大大提高了性能。

當消費者的速度能夠及時跟上生産者消息的生産速度時,journal檔案能夠大大減少需要寫入到DB中的消息。

舉個例子:

生産者生産了1000條消息,這1000條消息會儲存到journal檔案,如果消費者的消費速度很快的情況下,在journal檔案還沒有同步到DB之前,消費者已經消費了90%的以上消息,那麼這個時候隻需要同步剩餘的10%的消息到DB。如果消費者的速度很慢,這個時候journal檔案可以使消息以批量方式寫到DB。

3.3 配置方式:

原來的配置:

<persistenceAdapter> 
        <jdbcPersistenceAdapter dataSource="#mysql-ds" /> 
</persistenceAdapter>           

修改的結果:

<persistenceFactory>        
         <journalPersistenceAdapterFactory 
                     journalLogFiles="5" 
                     journalLogFileSize="32768" 
                     useJournal="true" 
                     useQuickJournal="true" 
                     dataSource="#mysql-ds" 
                     dataDirectory="../activemq-data" /> 
</persistenceFactory>           

以前是實時寫入mysql,在使用了journal後,資料會被journal處理,如果在一定時間内journal處理(消費)完了,就不寫入mysql,如果沒消費完,就寫入mysql,起到一個緩存的作用

小總結:

  1. 持久化消息是指:
  • MQ 所在的伺服器down 了消息也不會丢失

     2.持久化機制演化過程:

  • 從最初的AMQ Message Store 方案到 ActiveMQ V4版本推出的High performance journal (高性能事務)附件,并且同步推出了關系型資料庫的存儲方案, ActiveMQ 5.3 版本有推出了KahaDB 的支援,(也是5.4之後的預設持久化方案),後來ActiveMQ 從5.8開始支援LevelDB ,現在5.9 提供了 Zookeeper + LevelDB 的叢集化方案。

     3. ActiveMQ 消息持久化機制有:

AMQ 基于日志檔案
KahaDB 基于日志檔案,5.4 之後的預設持久化
JDBC 基于第三方資料庫
LevelDB 基于檔案的本地資料庫存儲,從5.8 之後推出了LevelDB 性能高于 KahaDB
ReplicatedLevelDB Store

從5.8之後提供了基于LevelDB 和Zookeeper 的資料複制方式,用于Master-slave方式的首資料複制選方案

但是無論使用哪種持久化方式,消息的存儲邏輯都一樣