天天看點

關于ActiveMQ的問題分析

      目前常用的消息隊列組建無非就是MSMQ和ActiveMQ,至于他們的異同,這裡不想做過多的比較。簡單來說,MSMQ内置于微軟作業系統之中,在部署上包含一個隐性條件:Server需要是微軟作業系統。(對于這點我并去調研過MSMQ是否可以部署在非微軟系統,比如:Linux,隻是拍腦袋想了想,感覺上是不可以)。對于ActiveMQ,微軟系統和Linux都是可以部署的。從功能方面來說,一般最常用的就是:消息的收/發,感覺差異不大。從性能上來說,一般的說法是ActiveMQ略高。在穩定性上,個人感覺MSMQ更好。如果這兩種常用隊列都用過的同學,應該來說最大的差異在于:MSMQ如果要通路遠端隊列(比如機器A上的程式通路機器B上的隊列),會比較惡心。在資料量比較大的情況之下,一般來說隊列伺服器會專門的一台或者多台(多台的話,用程式去做熱備+負載比較友善,也不需要額外的硬體成本。簡單來說做法可以這樣:消息發送的時候随機向着多台隊列伺服器上發送消息;接受的時候開多個線程去分别監聽;熱備方面,可以維護一個帶狀态的隊列連接配接池,如果消息收發失敗那麼将狀态置為不可用,然後起一個線程去定時監測壞的連接配接是否可用,這個過程一般情況下可以不用加鎖,為什麼,大家根據各自需要去取舍吧)。最近搞完了短彩信的網關連接配接服務,這兩種隊列我均使用了。大緻的過程是這樣的:上層應用如果要發端彩信,那麼将消息直接發送至ActiveMQ(目前用的就是上面說的多台熱備+負載,因為實際中下行量非常大5千萬條/天以上),然後端彩信網關連接配接服務部署多套,每套均依賴本機的MSMQ。為什麼呢?用ActiveMQ的原因是:上層應用程式和網關連接配接服務彼此獨立,消息需要跨機通路。用MSMQ的原因是:ActiveMQ中的資料是一條不分省的大隊列,網關連接配接服務需要按省流控,是以端彩信網關連接配接服務:首先把消息從ActiveMQ取出來,然後存至本機上的分省MSMQ,這樣做另外的一個好處就是:ActiveMQ不至于過多擠壓,他的資料會分攤到N台短彩信網關連接配接服務所部署的機器上的MSMQ之中,也就說MSMQ可以起到分攤資料和緩沖的作用。

     在之前的随筆中,已經介紹過MSMQ,現在先介紹一下ActiveMQ一些配置,目前好像ActiveMQ配置上的介紹還比較少。以下是自己總結一些相關資料,貼出來給大家共享

一)問題分析和解決

1)KahaDb和AMQ Message Store兩種持久方式如何選擇?

官方:

From 5.3 onwards - we recommend you use KahaDB - which offers improved scalability and recoverability over the AMQ Message Store.

The AMQ Message Store which although faster than KahaDB - does not scales as well as KahaDB and recovery times take longer.

非官方:

kaha檔案系統實際上上是一個檔案索引系統,有兩部分組成,一個是資料檔案系統,由一個個獨立的檔案組成,預設檔案大小是32M大(可配置),另外一個是索引檔案系統,記錄消息在資料檔案中的位置資訊以及資料檔案中的空閑塊資訊。資料檔案是存儲到硬碟上的,索引檔案是緩存在記憶體中的。是以這個存儲系統對大消息存儲有利,象我們的memberId之類的文本消息,實際上是浪費,索引比消息還大,哈。

我方分析:

推薦: Amq持久方式

理由:雖然官方推薦使用KahaDB持久方式,但其提到的優勢:可伸縮性和恢複性較好,對于我們實際的應用意義不大。從我們自己的使用經驗來看,KahaDB持久方式,Data檔案是一個大檔案(感覺檔案過大後,造成隊列服務癱死的可能性會增大),從官網的相關配置(附錄1)也找不到哪裡可以設定資料的檔案的最大Size。)而Amq持久方式可以設定Data檔案最大Size,這樣可以保證即時消息積壓很多,Data檔案也不至于過大。

2)錯誤:Channel was inactive for too long

解決方法:

在建立連接配接的Uri中加入: wireFormat.maxInactivityDuration=0

參考資源:

http://jinguo.iteye.com/blog/243153

You can do the following to fix the issues:

1) Append max inactivity duration to your Uri in the format below: wireFormat.maxInactivityDuration=0

2) Use the same Uri at the client side as well as at the server side

Regards,

如果不這樣設定,對應的錯誤會出現:

2008-05-07 09:22:56,343 [org.apache.activemq.ActiveMQConnection]-[WARN] Async exception with no exception listener: org.apache.activemq.transport.InactivityIOException: Channel was inactive for too long: localhost/127.0.0.1:61616

org.apache.activemq.transport.InactivityIOException: Channel was inactive for too long: localhost/127.0.0.1:61616

ActiveMQ的tcp url:tcp://localhost:61616後面要加入?wireFormat.maxInactivityDuration=0 這樣的參數,否則當一段時間沒有消息發送時會抛出 "Channel was inactive for too long"異常

3)錯誤:Wire format negotiation timeout: peer did not send his wire format.

1)關閉ActiveMqLog4j

打開:conf/log4j.properties

将:log4j.rootLogger=INFO, console, logfile

修改為:log4j.rootLogger=OFF

2)在建立連接配接的Uri中加入: maxInactivityDurationInitalDelay=30000

例如北京的測試環境連接配接Uri:

tcp://192.168.199.80:61616?wireFormat.maxInactivityDuration=0&maxInactivityDurationInitalDelay=30000&connection.AsyncSend=true

http://activemq.apache.org/javaxjmsjmsexception-wire-format-negociation-timeout-peer-did-not-send-his-wire-format.html

If you get exception like this,it can mean one of three things:

1. You're connecting to the port not used by ActiveMQ TCP transport

Make sure to check that you're connecting to the appropriate host:port

2. You're using log4j JMS appender and doesn't filter out ActiveMQ log messages

Be sure to read How do I use log4j JMS appender with ActiveMQ and more importantly to never send ActiveMQ log messages to JMS appender

3. Your broker is probably under heavy load (or network connection is unreliable), so connection setup cannot be completed in a reasonable time

If you experience sporadic exceptions like this, the best solution is to use failover transport, so that your clients can try connecting again if the first attempt fails. If you're getting these kind of exceptions more frequently you can also try extending wire format negotiation period (default 10 sec). You can do that by using wireFormat.maxInactivityDurationInitalDelay property on the connection URL in your client.

For example

tcp://localhost:61616?wireFormat.maxInactivityDurationInitalDelay=30000

will use 30 sec timeout.(貌似有問題!!!)

4)錯誤:Out of memory

1)  設定Java最大記憶體限制為合适大小:

Bin/activemq.bat 中ACTIVEMQ_OPTS=-Xmx512M(預設是512)

2)Activemq.xml配置節:systemUsage/ systemUsage配置大小合适,并且特别注意:大于所有durable desitination設定的memoryUsage之和。

備注:

1)尖括号:“>”代表通配符

2)ACTIVEMQ_OPTS的配置〉=memoryUsage中配置〉=所有durable desitination設定之和

3)SystemUsage配置設定了一些系統記憶體和硬碟容量,當系統消耗超過這些容量設定時,amq會“slow down producer”,還是很重要的。

參考資料:

http://m.oschina.net/blog/26216

參考-- http://activemq.apache.org/javalangoutofmemory.html

       對于MQ的内容實用是可管理和可配置的。首先需要判斷的是MQ的哪部分系統因記憶體不足而導緻洩漏,是JVM,broker還是消費者、生産者?

       一、記憶體管理

       JVM記憶體管理:

       1. 用bin/activemq指令在獨立JVM中運作broker。用-Xmx和-Xss指令即可(activemq.bat檔案中修改ACTIVEMQ_OPTS選項參數即可);

       2. 預設情況下,MQ用512M的JVM;

       broker記憶體管理:

       1. broker使用的記憶體并不是由JVM的記憶體決定的。雖然受到JVM的限制,但broker确實獨立管理器記憶體;

       2. systemUsage和destination的記憶體限制與broker記憶體息息相關;

       3. MQ中記憶體的關系是:JVM->Broker->broker features;

       4. 所有destination的記憶體總量不能超過broker的總記憶體;

       消費者:

       1. 由于消息大小可以配置,prefetch limit往往是導緻記憶體溢出的主要原因;

       2. 減少prefetch limit的大小,會減少消費者記憶體中存儲的消息數量;

       生産者:

       1. 除非消息數量超過了broker資源的限制,否則生産者不會導緻記憶體溢出;

       2. 當記憶體溢出後,生産者會收到broker的阻塞資訊提示;

       二、其他

       将消息緩沖之硬碟:

       1. 隻有當消息在記憶體中存儲時,才允許消息的快速比對與分發,而當消費者很慢或者離開時,記憶體可能會耗盡;

       2. 當destination到達它的記憶體臨界值時,broker會用消息遊标來緩存非持久化的消息到硬碟。

       3. 臨界值在broker中通過memoryUsage和systemUsage兩個屬性配置,請參考activemq.xml;

       4. 對于緩慢的消費者,當尚未耗盡記憶體或者轉變為生産者并發控制模式前,這個特性允許生産者繼續發送消息到broker;

       5. 當有多個destination的時候,預設的記憶體臨界值可能被打破,而這種情況将消息緩存到硬碟就顯得很有意義;

       6. precentUsage配置:使用百分比來控制記憶體使用情況;

       多個線程:

       1. 預設情況下,MQ每個destination都對應唯一的線程;

       2. -Dorg.apache.activema.UseDedicatedTaskRunner=false(activemq.bat檔案中修改ACTIVEMQ_OPTS選項參數即可),用線程池來限制線程的數量,進而減少記憶體消耗;

       大資料傳輸:

       1. destination policies--maxPageSize:控制進入記憶體中的消息數量;lazyDispatch:增加控制使用目前消費者清單的預取值;

       2. 使用blogMessage或者streamsMessage類型來進行大量檔案的傳輸;

       洩漏JMS資源:

       1. 當session或者producer或者consumer大量存在而沒有關閉的時候;

       2. 使用PooledConnectionFactory;

5)采用failover方式連接配接導緻卡死

不采用failover連接配接

分析:

采用failover方式連接配接,如果所要連接配接的伺服器或者Activemq服務宕了,那麼程式會一直處于等待狀态,不逾時,不報錯。

二)附錄

1)KahaDB持久配置

property name default value Comments
directory activemq-data the path to the directory to use to store the message store data and log files
indexWriteBatchSize 1000 number of indexes written in a batch
indexCacheSize 10000 number of index pages cached in memory
enableIndexWriteAsync false if set, will asynchronously write indexes
journalMaxFileLength 32mb a hint to set the maximum size of the message data logs
enableJournalDiskSyncs true ensure every non transactional journal write is followed by a disk sync (JMS durability requirement)
cleanupInterval 30000 time (ms) before checking for a discarding/moving message data logs that are no longer used
checkpointInterval 5000 time (ms) before checkpointing the journal
ignoreMissingJournalfiles If enabled, will ignore a missing message log file
checkForCorruptJournalFiles If enabled, will check for corrupted Journal files on startup and try and recover them
checksumJournalFiles create a checksum for a journal file - to enable checking for corrupted journals
Available since version 5.4:
archiveDataLogs If enabled, will move a message data log to the archive directory instead of deleting it.
directoryArchive null Define the directory to move data logs to when they all the messages they contain have been consumed.
databaseLockedWaitDelay time (ms) before trying to get acquire a the database lock (used by shared master/slave)
maxAsyncJobs the maximum number of asynchronous messages that will be queued awaiting storage (should be the same as the number of concurrent MessageProducers)
concurrentStoreAndDispatchTopics enable the dispatching of Topic messages to interested clients to happen concurrently with message storage
concurrentStoreAndDispatchQueues enable the dispatching of Queue messages to interested clients to happen concurrently with message storage

2)AMQ持久配置

useNIO use NIO to write messages to the data logs
syncOnWrite sync every write to disk
maxFileLength
persistentIndex use a persistent index for the message logs. If this is false, an in-memory structure is maintained
maxCheckpointMessageAddSize 4kb the maximum number of messages to keep in a transaction before automatically committing
indexBinSize 1024 default number of bins used by the index. The bigger the bin size - the better the relative performance of the index
indexKeySize 96 the size of the index key - the key is the message id
indexPageSize 16kb the size of the index page - the bigger the page - the better the write performance of the index
archive the path to the directory to use to store discarded data logs
if true data logs are moved to the archive directory instead of being deleted

3)systemUsage配置

memoryUsage 20M

amq使用記憶體大小,照amq論壇上說,這個值應該大于所有durable desitination設定的

memoryUsage之和,否則會導緻硬碟swap,影響性能。

storeUsage 1G kaha資料存儲大小,如果設定不足,性能會下降到1個1個發
tempUsage 100M 非persistent的消息存儲在temp區域

4)其他配置

http://activemq.apache.org/nms/configuring.html

4.1)Failover Transport Options

Option Name Default Description
transport.timeout -1 Time that a send operation blocks before failing.
transport.initialReconnectDelay 10 Time in Milliseconds that the transport waits before attempting to reconnect the first time.
transport.maxReconnectDelay The max time in Milliseconds that the transport will wait before attempting to reconnect.
transport.backOffMultiplier 2 The amount by which the reconnect delay will be multiplied by if useExponentialBackOff is enabled.
transport.useExponentialBackOff Should the delay between connection attempt grow on each try up to the max reconnect delay.
transport.randomize Should the Uri to connect to be chosen at random from the list of available Uris.
transport.maxReconnectAttempts Maximum number of time the transport will attempt to reconnect before failing (0 means infinite retries)
transport.startupMaxReconnectAttempts Maximum number of time the transport will attempt to reconnect before failing when there has never been a connection made. (0 means infinite retries) (included in NMS.ActiveMQ v1.5.0+)
transport.reconnectDelay The delay in milliseconds that the transport waits before attempting a reconnection.
transport.backup Should the Failover transport maintain hot backups.
transport.backupPoolSize 1 If enabled, how many hot backup connections are made.
transport.trackMessages keep a cache of in-flight messages that will flushed to a broker on reconnect
transport.maxCacheSize 256 Number of messages that are cached if trackMessages is enabled.
transport.updateURIsSupported Update the list of known brokers based on BrokerInfo messages sent to the client.

4.2)Connection Options

connection.AsyncSend Are message sent Asynchronously.
connection.AsyncClose Should the close command be sent Asynchronously
connection.AlwaysSyncSend Causes all messages a Producer sends to be sent Asynchronously.
connection.CopyMessageOnSend Copies the Message objects a Producer sends so that the client can reuse Message objects without affecting an in-flight message.
connection.ProducerWindowSize The ProducerWindowSize is the maximum number of bytes in memory that a producer will transmit to a broker before waiting for acknowledgement messages from the broker that it has accepted the previously sent messages. In other words, this how you configure the producer flow control window that is used for async sends where the client is responsible for managing memory usage. The default value of 0 means no flow control at the client. See also Producer Flow Control
connection.useCompression Should message bodies be compressed before being sent.
connection.sendAcksAsync Should message acks be sent asynchronously
connection.messagePrioritySupported Should messages be delivered to the client based on the value of the Message Priority header.
connection.dispatchAsync Should the broker dispatch messages asynchronously to the connection's consumers.

4.3)OpenWire Options

wireFormat.stackTraceEnabled Should the stack trace of exception that occur on the broker be sent to the client? Only used by openwire protocol.
wireFormat.cacheEnabled Should commonly repeated values be cached so that less marshalling occurs? Only used by openwire protocol.
wireFormat.tcpNoDelayEnabled Does not affect the wire format, but provides a hint to the peer that TCP nodelay should be enabled on the communications Socket. Only used by openwire protocol.
wireFormat.sizePrefixDisabled Should serialized messages include a payload length prefix? Only used by openwire protocol.
wireFormat.tightEncodingEnabled Should wire size be optimized over CPU usage? Only used by the openwire protocol.
wireFormat.maxInactivityDuration The maximum inactivity duration (before which the socket is considered dead) in milliseconds. On some platforms it can take a long time for a socket to appear to die, so we allow the broker to kill connections if they are inactive for a period of time. Use by some transports to enable a keep alive heart beat feature. Set to a value <= 0 to disable inactivity monitoring.
maxInactivityDurationInitalDelay The initial delay in starting the maximum inactivity checks (and, yes, the word 'Inital' is supposed to be misspelled like that)

5)安全配置

1、控制台安全配置,打開conf/jetty.xml檔案,找到

    <bean id="securityConstraint" class="org.eclipse.jetty.http.security.Constraint">

        <property name="name" value="BASIC" />

        <property name="roles" value="admin" />

        <property name="authenticate" value="false" />

    </bean>

   将“false”改為“true”即可。使用者名和密碼存放在conf/jetty-realm.properties檔案中。

2、生産者和消費者連接配接MQ需要密碼

   打開conf/activemq.xml檔案,在<broker>标簽裡的<systemUsage>标簽前加入:

   <plugins>  

<simpleAuthenticationPlugin>  

<users>  

<authenticationUser username="${activemq.username}" password="${activemq.password}" groups="users,admins"/>  

</users>  

</simpleAuthenticationPlugin>  

   </plugins>

 注意必須在<systemUsage>标簽前,否則啟動ActiveMQ會報錯。

使用者名和密碼存放在conf/credentials.properties檔案中

轉載:http://www.cnblogs.com/CopyPaster/archive/2012/04/27/2473179.html

JMS