天天看點

ActiveMQ 從零到最佳實踐

ActiveMQ 簡介:ActiveMQ 是Apache出品,最流行的,能力強勁的開源消息總線。ActiveMQ 是一個完全支援JMS1.1和J2EE 1.4規範的 JMS Provider實作,盡管JMS規範出台已經是很久的事情了,但是JMS在當今的J2EE應用中間仍然扮演着特殊的地位。
在學習之前我們先要知道一下幾個問題

ActiveMQ特性

⒈ 多種語言和協定編寫用戶端。語言: Java,C,C++,C#,Ruby,Perl,Python,PHP。應用協定: OpenWire,Stomp REST,WS Notification,XMPP,AMQP

⒉ 完全支援JMS1.1和J2EE 1.4規範 (持久化,XA消息,事務)

⒊ 對Spring的支援,ActiveMQ可以很容易内嵌到使用Spring的系統裡面去,而且也支援Spring2.0的特性

⒋ 通過了常見J2EE伺服器(如 Geronimo,JBoss 4,GlassFish,WebLogic)的測試,其中通過JCA 1.5 resource adaptors的配置,可以讓ActiveMQ可以自動的部署到任何相容J2EE 1.4 商業伺服器上

⒌ 支援多種傳送協定:in-VM,TCP,SSL,NIO,UDP,JGroups,JXTA

⒍ 支援通過JDBC和journal提供高速的消息持久化

⒎ 從設計上保證了高性能的叢集,用戶端-伺服器,點對點

⒏ 支援Ajax

⒐ 支援與Axis的整合

⒑ 可以很容易的調用内嵌JMS provider,進行測試

為什麼要用ActiveMQ?

高性能的資料分發:ActiveMQ的這個特性主要關注的是消息的吞吐率以及高效的消息投遞路由,中心思想就是在一個大的網絡中盡可能快的傳遞大量的并且快速改變的消息資料。

鑒于大量的資料和頻繁的資料資料交換負荷很高,是以這種情況下很少使用資料持久化,在失敗時丢失幾條資料也是可以接受的因為老的資料通常都不再被需要了,最新的資料才是真正我們關心的。

叢集和通用的異步消息模型:這種特性重點在網絡延遲和速度,當實作一個web或者EJB叢集的時候,目的是維護一個node叢集,典型的是使用多點廣播來discovery&keep-alive然後使用socket直接連接配接這些node來進行高效的通信。

這和使用JMS provider在EJB-Style或者WS-style的服務中作為RMI層是很相似的,都能使用多點廣播來discovery&keep-alive并且使用socket直接連接配接通信以減少延遲。是以與其使用不同的伺服器來協調client之間的通信,不如讓client直接和彼此通信來減少延遲。

Ps: 此段主要講的是activeMQ的node之間會有高效的異步通信機制,網絡延遲小并且高效

網絡資料流:這種特性關注點是activeMQ的ajax支援,越來越多的人希望資料流能實時的傳遞到網絡浏覽器中,例如金融行業的股票價格資料,實時的在展示IM會話,實時拍賣并且動态更新内容和消息。

鑒于這種情況,我們把ActiveMQ內建到了web容器中來提供封閉的網絡內建,使用HTTP POSTS來釋出消息并且在js中通過HTTP GET來接受并展示消息。

簡易的使用HTTP來傳遞消息的API:ActiveMQ這種特性主要關注跨語言跨技術的連接配接能力,我們為message broker提供了一個HTTP接口允許跨語言或者技術來進行簡單的發送和接受消息。使用HTTP POST将消息發送到message broker,使用HTTP GET從message broker擷取消息,使用URI并且指定參數來決定接受/發送的目的地。

什麼場景下使用了ActiveMQ?

在分布式應用的環境中,多個獨立運作的系統之間,需要互相調用,互相通訊的場景下使用。

在我們的商城中有很多地方需要用到ActiveMQ通訊的,我簡單列舉幾個吧:

商品系統在确認訂單後,發送一個“生産訂單”的主題消息到ActiveMQ,訂單系統收到該主題消息後會做相應的生産訂單及相關的所有業務操作。

在支付界面中,點選确認支付,調用第三方支付接口,支付成功後回調我們的支付系統,然後支付系統會發送一個“支付成功”的主題消息到ActiveMQ,倉儲系統收到該主題消息後會做庫存的相關操作及其相應的業務。

在網頁靜态化中,我們修改了某些資料,那麼會發送一個“生成靜态化”的隊列消息到ActiveMQ,負責生成靜态化頁面的方法接收到該消息後,從新生成靜态化頁面,進而達到更新頁面資料的目的。

JMS

JMS(Java Messaging Service)應用程式接口,是Java平台上有關面向消息中間件(MOM)的技術規範API,它便于消息系統中的Java應用程式進行消息交換,并且通過提供标準的産生、發送、接收消息的接口簡化企業應用的開發,用于在兩個應用程式之間,或分布式系統中發送消息,進行異步通信。翻譯為Java消息服務。

JMS規範:

JMS定義了Java中通路消息中間件的接口,并沒有給予實作,實作JSM接口的消息中間件稱為JMS Provider,例如ActiveMQ

幾個關于JMS的概念:

1、JMS Provider:實作JMS接口和規範的消息中間件

2、JMS Message:JMS消息,分3個部分:

(1)消息頭:每個消息頭字段都有相應的getter和setter方法

(2)消息屬性:如果需要除消息頭字段以外的值,那麼可以使用消息屬性

(3)消息體:封裝具體的消息資料

3、JMS Producer:消息生産者,建立和發送JMS消息的用戶端應用

4、JMS Consumer:消息消費者,接收和處理JMS消息的用戶端應用

5、JMS Domain:消息傳遞域,JMS規範中定義了兩種消息傳遞域:

(1)點對點(point-to-point,簡寫PTP或P2P)消息傳遞域,該消息傳遞域發送的消息目的地稱為隊列(queue)

特點:

a、每個消息隻能有一個消費者

b、消息的生産者和消費者之間沒有時間上的相關性,無論消息消費者在提取消息的時候,消息生産者是否處于運作狀态,消息消費者還是可以提取消息

(2)釋出/訂閱(publish/subscribe,簡寫pub/sub)消息傳遞域,該消息傳遞域發送的消息目的地稱為主題(topic)

a、每個消息可以有多個消費者

b、生産者和消費者之間有時間上的相關性,訂閱一個主題的消費者隻能消費自它訂閱之後釋出的消息。

ActiveMQ和JMS是什麼關系?

JMS和ActiveMQ是接口和實作的關系,ActiveMQ完全支援JMS1.1規範的JMS Provider實作的消息中間件(MOM)

JMS的消息傳遞域有什麼?各自有什麼特點?簡單列舉一下各自的一些使用場景

JMS的消息傳遞域有點對點(point-to-point,簡寫PTP或P2P)消息傳遞域和釋出/訂閱(publish/subscribe,簡寫pub/sub)消息傳遞域。

點對點消息傳遞域的特點:每個消息隻能有一個消費者并且消息的生産者和消費者之間沒有時間上的相關性,無論消息消費者在提取消息的時候,消息生産者是否處于運作狀态,消息消費者還是可以提取消息

釋出/訂閱消息傳遞域的特點:每個消息可以有多個消費者并且生産者和消費者之間有時間上的相關性,訂閱一個主題的消費者隻能消費自它訂閱之後釋出的消息。

ActiveMQ的原理是什麼?

首先需要建立連接配接工廠(ConnectionFactory ),用于連接配接ActiveMQ的服務端,其中ConnectionFactory 又有兩個子類分别是隊列連接配接工廠(QueueConnectionFactory)和主題連接配接工廠(TopicConnectionFactory),在我們發送消息的時候具體使用哪個連接配接工廠是由消息發送的目的地(Destination)來決定的,Destination是消息生産者的消息發送目标或者說消息消費者的消息來源。對于消息生産者來說,它的Destination是某個隊列(Queue)或某個主題(Topic),對于消息消費者來說,它的Destination也是某個隊列或主題(即消息來源)

然後在确定了使用哪種連接配接工廠後,就通過該連接配接工廠建立出連接配接對象(Connection),該連接配接對象是對TCP/IP socket的包裝。連接配接對象可以産生多個會話(session),通過會話來發送消息(Message),這就是我們發送和接受的消息。

消息中間件

消息中間件MOM(Message Oriented Middleware)是在分布式環境中,兩個或多個獨立運作的系統之間,提供消息通訊作用的中介。

ActiveMQ 從零到最佳實踐

如圖所示,假設當我們用戶端現在需要确認商品的訂單,首先需要調用商品服務的方法,商品系統服務不能直接與訂單系統通信.這時商品系統隻能通過發送消息msg1(id=xxx)到消息中間件,當訂單系統接收到消息中間件的消息msg1時就會立刻執行生成訂單的方法.這就是我們消息中間件的角色

消息中間件的作用:

把各個系統之間服務的調用以消息通訊的方式互動

消息中間件的特點:

1、消息異步接收,消息發送者不需要等待消息接收着的響應,提高整個應用程式的性能

2、消息可靠接收:消息發送出去後儲存在一個中間容器内,隻有消息接收者收到消息後才删除消息

消息中間件的主要應用場景:

在多個系統間進行通訊的時候,通常會要求:

(1)可靠傳輸,資料不能丢,有的時候,也會要求不能重複傳輸

(2)異步傳輸,否則各個系統同步發送接收資料,互相等待,造成系統性能低下

比較流行的消息中間件有哪些:

收費:IBM MQSeries,BEA WebLogic JMS Server,Oracle AQ

開源:ActiveMQ,RocketMQ,Kafka

ActiveMQ安裝使用

下載下傳ActiveMQ運作程式:

1、從官網上下載下傳運作程式:

http://activemq.apache.org/download.html

2、拷貝到你要安裝的位置直接解壓就好了

啟動ActiveMQ

Windows版:輕按兩下bin目錄下activemq.bat

Unix/Linux/Cygwin版本:1.需要使用終端指令(cd +apache-activemq-5.10.0的根目錄 )

cd /Users/zhangshuai/Library/apache-activemq-5.10.0

  1. bin/activemq start
ActiveMQ 從零到最佳實踐

然後通路位址為localhost:8161/admin

如果需要輸入賬号與密碼可以直接都輸入admin

ActiveMQ 從零到最佳實踐

在conf目錄中有個activemq.xml檔案,該檔案有3個重要設定:

1、設定ActiveMQ的消息監聽端口,預設為61616

2、設定ActiveMQ的管理界面的端口,預設為8161。ActiveMQ使用的是内嵌的jetty伺服器來運作它提供的管理界面的,該管理界面的通路位址為localhost:8161/admin

3、設定ActiveMQ的通路使用者名和密碼,預設使用者名和密碼都為admin。設定方法,添加如下配置:

<!-- 添加通路ActiveMQ的賬号密碼 -->  
<plugins>  
 <simpleAuthenticationPlugin>  
   <users>  
    <authenticationUser username="zhangshuai" password="123456" groups="users,admins"/>  
   </users>  
 </simpleAuthenticationPlugin>  
</plugins> 
           

Springboot整合ActiveMQ

在這裡我以一個簡單商城為案例

1.向商品的服務端shop-server-goods和shop-server-order的pom.xml中添加springboot-ActiveMQ依賴

<!--spring-boot的activeMQ依賴-->
  <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-activemq</artifactId>
  </dependency>
           

2.向商品的服務端shop-server-goods和shop-server-order的application.properties添加屬性配置

在springboot應用程式中的主配置檔案application.properties檔案中添加如下ActiveMQ的屬性配置
#activeMQ部署位址
spring.activemq.broker-url=tcp://127.0.0.1:61616
#activeMQ通路使用者名
spring.activemq.user=admin
#activeMQ通路密碼
spring.activemq.password=admin
           

主要我們的兩個服務端都連接配接到同一個ActiveMQ上了

接下來我們先示範發送Queue模型的消息(以建立訂單為例子)

發送queue模型的消息的關鍵是建立一個ActiveMQQueue對象

建立一個消息生産者類

CreateOrderInfoProducer.java

/**
 * 建立訂單的消息生産類
 */
@Component//交給spring容器管理
public class CreateOrderInfoProducer {

    //确定消息發送到哪個地點
    Destination destination = new ActiveMQQueue("shop.queue.createOrderInfo");

    //注入spirng幫我們封裝好的JMS模闆類
    @Autowired
    private JmsTemplate jmsTemplate;

    /**
     * 發送建立訂單消息的方法
     * userId 哪個使用者下的訂單
     */
    public void sendMessage(Long userId){
        //實用JMS模闆類發送消息
        jmsTemplate.convertAndSend(destination,userId);
    }
}
           
在需要發送該消息的業務調用:

 //調用發送建立訂單消息的方法
 createOrderInfoProducer.sendMessage(1L);
           

在預設情況下消息消費者監聽的是Queue消息域的模型

建立訂單消息的消費對象

/**
 * 建立訂單消息的消費對象
 */
//把消息接收者交給spring容器管理
@Component
public class CreateOrderInfoConsumer {

    @Autowired
    OrderInfoMapper dao;

    /**
     * 使用spring給我們提供的JMS監聽注解,監聽一個消息地點
     * 注意,監聽的消息地點要和消息發送到的地點一緻
     * @param userId 消息生産者發送過來的userId
     */
    @JmsListener(destination = "shop.queue.createOrderInfo")
    public void receiveMessage(Long userId) {
        
        //做具體的業務
        OrderInfo orderInfo = new OrderInfo();
        orderInfo.setUserId(userId);
        orderInfo.setOrderSn("222222");
        orderInfo.setOrderStatus(Byte.valueOf("0"));
        orderInfo.setPayStatus(Byte.valueOf("0"));
        orderInfo.setShippingStatus(Byte.valueOf("0"));
        dao.insert(orderInfo);
    }
}
           

發送Topic模型的消息

消息生産者:發送Topic模型的消息的關鍵是建立一個ActiveMQTopic對象,還有把消息發送到的地方修改一下,其餘的代碼跟Queue模型是一樣的

消息消費者:因為在預設情況下消息消費者監聽的是Queue消息域的模型,是以想要springboot應用監聽Topic消息域模型的消息,就必須修改springboot的預設配置

修改方法,在消息消費者應用的application.properties檔案添加一個配置屬性:

是否使用訂閱和釋出的模型監聽器,預設為false,也就是說預設使用的是點對點的模型監聽器

spring.jms.pub-sub-domain=true
           

還有把監聽的消費接收地方修改一下,其餘的代碼跟Queue消息域的模型一樣

Queue和Topic兩種模型同時存在

建立一個配置類:springboot配置類,使ActiveMQ同時支援監聽Queue和Topic模型的消息

自定義模型的監聽容器bean:

在建立的配置類中添加兩個bean,分别用于監聽Queue和Topic兩個模型的消息,并且注入到spring容器中

/**
     * 用于監聽Topic模型消息的容器bean
     * @param connectionFactory
     * @return
     */
    @Bean
    public JmsListenerContainerFactory<?> jmsListenerContainerTopic(ActiveMQConnectionFactory connectionFactory) {
        DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory();
        //是否使用訂閱和釋出的模型監聽器,預設為false,也就是說預設使用的是點對點的模型監聽器
        bean.setPubSubDomain(true);
        bean.setConnectionFactory(connectionFactory);
        return bean;
    }

    /**
     * 用于監聽Queue模型消息的容器bean
     * @param connectionFactory
     * @return
     */
    @Bean
    public JmsListenerContainerFactory<?> jmsListenerContainerQueue(ActiveMQConnectionFactory connectionFactory) {
        DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory();
        bean.setConnectionFactory(connectionFactory);
        return bean;
    }
           

按需選擇監聽器容器

現在我們已經定義出了兩個監聽器的容器,分别用于監聽Queue和Topic模型的消息了,現在要做的就是在消息消費者中選擇自己需要監聽的是哪一種模型

@JmsListener(destination = "shop.topic.createOrderInfo",containerFactory="jmsListenerContainerTopic")
在原本的@JmsListener基礎上加一個屬性:containerFactory,該屬性值就是我們自定義監聽器的Bean的方法名