天天看點

ActiveMQ的幾種消息持久化機制

為了避免意外當機以後丢失資訊,需要做到重新開機後可以恢複消息隊列,消息系統一般都會采用持久化機制。

activemq的消息持久化機制有jdbc,amq,kahadb和leveldb,無論使用哪種持久化方式,消息的存儲邏輯都是一緻的。

就是在發送者将消息發送出去後,消息中心首先将消息存儲到本地資料檔案、記憶體資料庫或者遠端資料庫等,然後試圖将消息發送給接收者,發送成功則将消息從存儲中删除,失敗則繼續嘗試。

消息中心啟動以後首先要檢查指定的存儲位置,如果有未發送成功的消息,則需要把消息發送出去。

使用jdbc持久化方式,資料庫會建立3個表:activemq_msgs,activemq_acks和activemq_lock。

activemq_msgs用于存儲消息,queue和topic都存儲在這個表中。

(1)配置方式

配置持久化的方式,都是修改安裝目錄下conf/acticvemq.xml檔案,

首先定義一個mysql-ds的mysql資料源,然後在persistenceadapter節點中配置jdbcpersistenceadapter并且引用剛才定義的資料源。

1

2

3

<code>&lt;</code><code>persistenceadapter</code><code>&gt; </code>

<code>    </code><code>&lt;</code><code>jdbcpersistenceadapter</code> <code>datasource="#mysql-ds" createtablesonstartup="false" /&gt; </code>

<code>&lt;/</code><code>persistenceadapter</code><code>&gt;</code>

datasource指定持久化資料庫的bean,createtablesonstartup是否在啟動的時候建立資料表,預設值是true,這樣每次啟動都會去建立資料表了,一般是第一次啟動的時候設定為true,之後改成false。

使用mysql配置jdbc持久化:

4

5

6

7

8

9

10

11

12

13

14

15

<code>&lt;</code><code>beans</code><code>&gt;</code>

<code>        </code><code>&lt;</code><code>persistenceadapter</code><code>&gt;</code>

<code>            </code><code>&lt;</code><code>jdbcpersistenceadapter</code> <code>datasource="#mysql-ds" createtablesonstartup="false"/&gt;</code>

<code>        </code><code>&lt;/</code><code>persistenceadapter</code><code>&gt;</code>

<code>    </code><code>&lt;/</code><code>broker</code><code>&gt;</code>

<code>    </code><code>&lt;</code><code>bean</code> <code>id="mysql-ds" class="org.apache.commons.dbcp.basicdatasource" destroy-method="close"&gt;</code>

<code>        </code><code>&lt;</code><code>property</code> <code>name="driverclassname" value="com.mysql.jdbc.driver"/&gt;</code>

<code>        </code><code>&lt;</code><code>property</code> <code>name="username" value="activemq"/&gt;</code>

<code>        </code><code>&lt;</code><code>property</code> <code>name="password" value="activemq"/&gt;</code>

<code>        </code><code>&lt;</code><code>property</code> <code>name="maxactive" value="200"/&gt;</code>

<code>        </code><code>&lt;</code><code>property</code> <code>name="poolpreparedstatements" value="true"/&gt;</code>

<code>    </code><code>&lt;/</code><code>bean</code><code>&gt;</code>

<code>&lt;/</code><code>beans</code><code>&gt;</code>

 

(2)資料庫表資訊

activemq_msgs用于存儲消息,queue和topic都存儲在這個表中:

id:自增的資料庫主鍵

container:消息的destination

msgid_prod:消息發送者用戶端的主鍵

msg_seq:是發送消息的順序,msgid_prod+msg_seq可以組成jms的messageid

expiration:消息的過期時間,存儲的是從1970-01-01到現在的毫秒數

msg:消息本體的java序列化對象的二進制資料

priority:優先級,從0-9,數值越大優先級越高

activemq_acks用于存儲訂閱關系。如果是持久化topic,訂閱者和伺服器的訂閱關系在這個表儲存:

主要的資料庫字段如下:

sub_dest:如果是使用static叢集,這個字段會有叢集其他系統的資訊

client_id:每個訂閱者都必須有一個唯一的用戶端id用以區分

sub_name:訂閱者名稱

selector:選擇器,可以選擇隻消費滿足條件的消息。條件可以用自定義屬性實作,可支援多屬性and和or操作

last_acked_id:記錄消費過的消息的id。

表activemq_lock在叢集環境中才有用,隻有一個broker可以獲得消息,稱為master broker,

其他的隻能作為備份等待master broker不可用,才可能成為下一個master broker。

這個表用于記錄哪個broker是目前的master broker。

性能高于jdbc,寫入消息時,會将消息寫入日志檔案,由于是順序追加寫,性能很高。為了提升性能,建立消息主鍵索引,并且提供緩存機制,進一步提升性能。每個日志檔案的大小都是有限制的(預設32m,可自行配置)。

當超過這個大小,系統會重建立立一個檔案。當所有的消息都消費完成,系統會删除這個檔案或者歸檔(取決于配置)。

主要的缺點是amq message會為每一個destination建立一個索引,如果使用了大量的queue,索引檔案的大小會占用很多磁盤空間。

而且由于索引巨大,一旦broker崩潰,重建索引的速度會非常慢。

配置片段如下:

<code>&lt;</code><code>persistenceadapter</code><code>&gt;</code>

<code>     </code><code>&lt;</code><code>amqpersistenceadapter</code> <code>directory="${activemq.data}/activemq-data" maxfilelength="32mb"/&gt;</code>

雖然amq性能略高于下面的kaha db方式,但是由于其重建索引時間過長,而且索引檔案占用磁盤空間過大,是以已經不推薦使用。

kahadb是從activemq 5.4開始預設的持久化插件,也是我們項目現在使用的持久化方式。

kahadb恢複時間遠遠小于其前身amq并且使用更少的資料檔案,是以可以完全代替amq。

kahadb的持久化機制同樣是基于日志檔案,索引和緩存。

配置方式:

<code>    </code><code>&lt;</code><code>kahadb</code> <code>directory="${activemq.data}/activemq-data" journalmaxfilelength="16mb"/&gt;</code>

<code>directory : 指定持久化消息的存儲目錄</code>

<code>journalmaxfilelength : 指定儲存消息的日志檔案大小,具體根據你的實際應用配置  </code>

(1)kahadb主要特性

1、日志形式存儲消息;

2、消息索引以b-tree結構存儲,可以快速更新;

3、完全支援jms事務;

4、支援多種恢複機制;

(2)kahadb的結構

消息存儲在基于檔案的資料日志中。如果消息發送成功,變标記為可删除的。系統會周期性的清除或者歸檔日志檔案。

消息檔案的位置索引存儲在記憶體中,這樣能快速定位到。定期将記憶體中的消息索引儲存到metadata store中,避免大量消息未發送時,消息索引占用過多記憶體空間。

ActiveMQ的幾種消息持久化機制

data logs:

data logs用于存儲消息日志,消息的全部内容都在data logs中。

同amq一樣,一個data logs檔案大小超過規定的最大值,會建立一個檔案。同樣是檔案尾部追加,寫入性能很快。

每個消息在data logs中有計數引用,是以當一個檔案裡所有的消息都不需要了,系統會自動删除檔案或放入歸檔檔案夾。

metadata cache :

緩存用于存放線上消費者的消息。如果消費者已經快速的消費完成,那麼這些消息就不需要再寫入磁盤了。

btree索引會根據messageid建立索引,用于快速的查找消息。這個索引同樣維護持久化訂閱者與destination的關系,以及每個消費者消費消息的指針。

metadata store 

在db.data檔案中儲存消息日志中消息的中繼資料,也是以b-tree結構存儲的,定時從metadata cache更新資料。metadata store中也會備份一些在消息日志中存在的資訊,這樣可以讓broker執行個體快速啟動。

即便metadata store檔案被破壞或者誤删除了。broker可以讀取data logs恢複過來,隻是速度會相對較慢些。

從activemq 5.6版本之後,又推出了leveldb的持久化引擎。

目前預設的持久化方式仍然是kahadb,不過leveldb持久化性能高于kahadb,可能是以後的趨勢。

在activemq 5.9版本提供了基于leveldb和zookeeper的資料複制方式,用于master-slave方式的首選資料複制方案。