天天看點

ActiveMQ 消息存儲持久化

有效的消息存儲

ActiveMQ提供了一個插件式的消息存儲,類似于消息的多點傳播,主要實作了如下幾種:

1:AMQ消息存儲-基于檔案的存儲方式,是以前的預設消息存儲

2:KahaDB消息存儲-提供了容量的提升和恢複能力,是現在的預設存儲方式

3:JDBC消息存儲-消息基于JDBC存儲的

4:Memory 消息存儲-基于記憶體的消息存儲

KahaDB Message Store概述

KahaDB是目前預設的存儲方式,可用于任何場景,提高了性能和恢複能力。消息存儲使用一個事務日志和僅僅用一個索引檔案來存儲它所有的位址。

KahaDB是一個專門針對消息持久化的解決方案,它對典型的消息使用模式進行了優化。在Kaha中,資料被追加到data logs中。當不再需要log檔案中的資料的時候,log檔案會被丢棄。

 KahaDB基本配置例子

<persistenceAdapter>

<kahaDB directory="${activemq.data}/kahadb"/>

</persistenceAdapter>

可用的屬性有:

1:director:KahaDB存放的路徑,預設值activemq-data

2:indexWriteBatchSize: 批量寫入磁盤的索引page數量,預設值1000

3:indexCacheSize:記憶體中緩存索引page的數量,預設值10000

4:enableIndexWriteAsync:是否異步寫出索引,預設false

5:journalMaxFileLength:設定每個消息data log的大小,預設是32MB

6:enableJournalDiskSyncs:設定是否保證每個沒有事務的内容,被同步寫入磁盤,JMS持久化的時候需要,預設為true

7:cleanupInterval:在檢查到不再使用的消息後,在具體删除消息前的時間,預設30000

8:checkpointInterval:checkpoint的間隔時間,預設5000

9:ignoreMissingJournalfiles:是否忽略丢失的消息日志檔案,預設false

10:checkForCorruptJournalFiles:在啟動的時候,将會驗證消息檔案是否損壞,預設false

11:checksumJournalFiles:是否為每個消息日志檔案提供checksum,預設false

12:archiveDataLogs: 是否移動檔案到特定的路徑,而不是删除它們,預設false

13:directoryArchive:定義消息已經被消費過後,移動data log到的路徑,預設null

14:databaseLockedWaitDelay:獲得資料庫鎖的等待時間 (used by shared master/slave),預設10000

15:maxAsyncJobs:設定最大的可以存儲的異步消息隊列,預設值10000,可以和concurrentMessageProducers 設定成一樣的值

16:concurrentStoreAndDispatchTransactions:是否分發消息到用戶端,同時事務存儲消息,預設true

17:concurrentStoreAndDispatchTopics:是否分發Topic消息到用戶端,同時進行存儲,預設true

18:concurrentStoreAndDispatchQueues:是否分發queue消息到用戶端,同時進行存儲,預設true

AMQ Message Store概述

AMQ Message Store是ActiveMQ5.0預設的持久化存儲,它是一個基于檔案、事務存儲設計為快速消息存儲的一個結構,該結構是以流的形式來進行消息互動的。這種方式中,Messages被儲存到data logs中,同時被reference store進行索引以提高存取速度。Date logs由一些單獨的data log檔案組成,預設的檔案大小是32M,如果某個消息的大小超過了data log檔案的大小,那麼可以修改配置以增加data log檔案的大小。如果某個data log檔案中所有的消息都被成功消費了,那麼這個data log檔案将會被标記,以便在下一輪的清理中被删除或者歸檔。

AMQ Message Store配置示例

<broker brokerName="broker" persistent="true" useShutdownHook="false">

 <persistenceAdapter>

   <amqPersistenceAdapter directory="${activemq.base}/data" maxFileLength="32mb"/>

 </persistenceAdapter>

</broker>

使用JDBC來持久化消息

ActiveMQ支援使用JDBC來持久化消息,預定義的表如下:

1:消息表,預設表名為ACTIVEMQ_MSGS,queue和topic都存在裡面,結構如下:

ActiveMQ 消息存儲持久化

2:ACTIVEMQ_ACKS表存儲持久訂閱的資訊和最後一個持久訂閱接收的消息ID,結構如下:

ActiveMQ 消息存儲持久化

3:鎖定表,預設表名為ACTIVEMQ_LOCK,用來確定在某一時刻,隻能有一個ActiveMQ broker執行個體來通路資料庫 ,結構如下:

ActiveMQ 消息存儲持久化

使用JDBC來持久化消息的配置示例

<beans>

<broker brokerName="test-broker" persistent=true xmlns="http://activemq.apache.org/schema/core">

 <persistenceAdapter>

  <jdbcPersistenceAdapter dataSource=“#mysql-ds"/>

 </persistenceAdapter>

</broker>

<bean name="mysql-ds" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">

 <property name="driverClassName"><value>org.gjt.mm.mysql.Driver</value></property>

 <property name="url"><value>jdbc:mysql://127.0.0.1:3306/test?useUnicode=true&amp;characterEncodi

  ng=UTF-8</value></property>

 <property name="username"> <value>root</value> </property>

 <property name="password" value="cc"/>

</bean>

JDBC Message Store with ActiveMQ Journal

這種方式克服了JDBC Store的不足,使用快速的緩存寫入技術,大大提高了性能。 配置示例如下:

<beans>

<broker brokerName="test-broker" xmlns="http://activemq.apache.org/schema/core">

<persistenceFactory>

 <journalPersistenceAdapterFactory

  journalLogFiles="4"

  journalLogFileSize="32768"

  useJournal="true"

  useQuickJournal="true"

  dataSource="#mysql-ds"

  dataDirectory="activemq-data" />

</persistenceFactory>

</broker>

</beans>

JDBC Store和JDBC Message Store with ActiveMQ Journal的差別

1:Jdbc with journal的性能優于jdbc

2:Jdbc用于master/slave模式的資料庫分享

3:Jdbc with journal不能用于master/slave模式

4:一般情況下,推薦使用jdbc with journal

Memory Message Store

記憶體消息存儲主要是存儲所有的持久化的消息在記憶體中。這裡沒有動态的緩存存在,是以你必須注意設定你的broker所在的JVM和記憶體限制

Memory Message Store配置示例

<beans>

<broker brokerName="test-broker" persistent="false" xmlns="http://activemq.apache.org/schema/core">

<transportConnectors>

 <transportConnector uri="tcp://localhost:61635"/>

</transportConnectors>

</broker>

</beans>