天天看點

ActiveMQ的最簡單應用-隊列消息

有一段時間不使用JMS了。現在的項目又有可能需要應用JMS,來提高服務品質和提高系統資源的使用率。

提高服務品質,主要是保證不間斷的服務。用JMS伺服器接收任務,排成隊列。應用服務可以暫停做維護,不影響接收的任務。應用服務運作後,再從隊列中擷取任務。

提高系統資源的使用率,主要是任務的派發不是24小時平均的,而是高峰時期任務量很多,比如1秒1000多個,有的時候很低,比如十幾秒鐘才來一個。應用服務通過JMS隊列一個一個的取任務,做完一個再領一個,使系統資源的運用趨于平均。而JMS,比如JMS接收消息的效率是很高的,比如ActiveMQ,在賽揚(2.40GHz)機器上能夠達到2000/s,消息大小為1-2k。好一些的伺服器可以達到2萬以上/秒。

ActiveMQ是開源免費的JMS實作,并且有很多擴充功能。位址:

​​http://activemq.apache.org​​

安裝和運作很簡單,下載下傳後解壓縮,執行:

activemq.bat

将啟動預設配置的activemq。可以通過:

​​http://localhost:8161/admin​​

通路activemq的管理界面。檢視隊列(queue)和主題(topic)下的消息内容。

編寫ActiveMQ程式前需要準備類庫,如果使用maven就簡單了:

<
   dependency
   >
   
  
   <
   groupId
   >
   org.apache.activemq
   </
   groupId
   >
   
  
   <
   artifactId
   >
   activemq-core
   </
   artifactId
   >
   
  
   <
   version
   >
   5.2.0
   </
   version
   >
   

   </
   dependency
   >      

編寫最簡單的向JMS隊列發送文本消息的程式:

ActiveMQConnectionFactory connectionFactory 
   =
    
   new
    ActiveMQConnectionFactory(
        
   "
   tcp://localhost:61616
   "
   );
Connection connection 
   =
    connectionFactory.createConnection();
connection.start();
System.out.println(
   "
   start...
   "
   );

Session session 
   =
    connection.createSession(
   true
   ,
        Session.AUTO_ACKNOWLEDGE);
Queue destination 
   =
    session.createQueue(
   "
   example.C
   "
   );

MessageProducer producer 
   =
    session.createProducer(destination);
producer.setDeliveryMode(DeliveryMode.PERSISTENT);

TextMessage message 
   =
    session.createTextMessage(
   "
   中文
   "
   );
producer.send(message);
session.commit();

connection.close();
System.out.println(
   "
   send text ok.
   "
   );      
ActiveMQConnectionFactory connectionFactory 
   =
    
   new
    ActiveMQConnectionFactory(
        
   "
   tcp://localhost:61616
   "
   );
Connection connection 
   =
    connectionFactory.createConnection();
connection.start();

   final
    Session session 
   =
    connection.createSession(
   true
   ,
        Session.AUTO_ACKNOWLEDGE);
Queue destination 
   =
    session.createQueue(
   "
   example.C
   "
   );

MessageConsumer consumer 
   =
    session.createConsumer(destination);
consumer.setMessageListener(
   new
    MessageListener(){
    @Override
    
   public
    
   void
    onMessage(Message message) {
        System.out.println(message);
        
   try
    {
            session.commit();
        } 
   catch
    (JMSException e) {
            e.printStackTrace();
        }
    }
});      

繼續閱讀