天天看點

ActiveMQ詳細介紹

1. 簡單示例

  • 建立連接配接
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.94.151:61616");
Connection connection = connectionFactory.createConnection();
connection.start();      
  • 建立queue生産者,發送消息
session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
            destination = session.createQueue("Q-NUMBER");
            messageProducer = session.createProducer(destination);
            TextMessage textMessage = session.createTextMessage("ActiveMQ發送消息+eeee");
messageProducer.send(textMessage);
session.commit();      
  • 建立Topic生産者,發送消息
= connection.createSession(Boolean.TRUE,Session.AUTO_ACKNOWLEDGE);
            destination = session.createTopic("myFirstTopic");
            messageProducer = session.createProducer(destination);
            TextMessage textMessage = session.createTextMessage("ActimeMQ發送主題");
            System.out.println(textMessage.getText());
            messageProducer.send(textMessage);

            //送出
            session.commit();      
  • 建立隊列消費者,阻塞模式
session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
destination = session.createQueue("Q-NUMBER");
messageConsumer = session.createConsumer(destination);

while (true){
    TextMessage textMessage = (TextMessage) messageConsumer.receive(100000);
    if(textMessage!=null){
        System.out.println("收到消息:"+textMessage.getText());
    }else {
        break;
    }
}      
  • 建立隊列消費者,監聽模式(連接配接不要關了)
session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
destination = session.createQueue("Q-NUMBER");
messageConsumer = session.createConsumer(destination);
messageConsumer.setMessageListener(new JMSListener());      
public class JMSListener implements MessageListener {
    public void onMessage(Message message) {
        try {
            System.out.println("收到消息: "+((TextMessage)message).getText());
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}      
  • 建立主題消費者,阻塞模式
session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
destination = session.createTopic("myFirstTopic");
messageConsumer = session.createConsumer(destination);

while (true){
    TextMessage textMessage = (TextMessage) messageConsumer.receive(100000);
    if(textMessage!=null){
        System.out.println("收到消息:"+textMessage.getText());
    }else {
        break;
    }
}      
  • 建立主題消費者,監聽模式,JMSListener類與上面一樣
session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
destination = session.createTopic("myFirstTopic");
messageConsumer = session.createConsumer(destination);
messageConsumer.setMessageListener(new JMSListener());      

2. 發送消息設定

發送消息要保證消息的可靠性,解決方案是在事物中發送持久性的消息。事物能保證程式出錯時不會誤發消息,持久性能保證消息沒被消費的情況下,mq消息不會丢失。

2.1 事物

建立session的時候指定事物​

​session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);​

​​,第二個參數可以忽略,當有事物時第二個參數預設為​

​Session.SESSION_TRANSACTED​

​​。當執行​

​session.commit()​

​​時,消息才真正發送送出,可以在一個事物裡發送多條消息。復原​

​session.rollback()​

2.2 持久化與非持久化

public interface DeliveryMode {
    static final int NON_PERSISTENT = 1;

    static final int PERSISTENT = 2;
}      
  • 持久化後可以保證消息服務由于某種原因不會丢失消息(如:mq伺服器重新開機)
  • 使用messageProducer.setDeliveryMode方法,所有的消息都采用此傳送模式
  • 使用messageProducer.send方法為每一條消息設定傳送模式

2.3 逾時

  • 使用messageProducer.setTimeToLive方法,所有的消息都采用此過期時間
  • 使用messageProducer.send方法為每一條消息設定過期時間

2.4 設定優先級

消息傳送者将會首先嘗試傳送優先級較高的消息。消息優先級有0-9十個級别,0-4是普通消息,5-9是加急消息,預設為4

  • 使用messageProducer.setPriority方法,所有的消息都采用此優先級
  • 使用messageProducer.send方法為每一條消息設定優先級

2.5 異步發送消息

  • 使用Connection URI配置異步發送:

    ​​

    ​cf = new ActiveMQConnectionFactory("tcp://locahost:61616?jms.useAsyncSend=true");​

  • 在ConnectionFactory層面配置異步發送:

    ​​

    ​((ActiveMQConnectionFactory)connectionFactory).setUseAsyncSend(true);​

  • 在Connection層面配置異步發送,此層面的設定将覆寫ConnectionFactory層面的設定:

    ​​

    ​((ActiveMQConnection)connection).setUseAsyncSend(true);​

3. 接收消息設定

接受消息也要保證可靠性,在事物中接收消息。

3.1 控制消息簽收(AcknowledgeMode)

用戶端成功接收一條消息的标志是這條消息被簽收,有如下三個階段

  1. 用戶端接收消息
  2. 用戶端處理消息
  3. 消息被簽收(可以由用戶端發起,也可以由ActiveMQ發起,取決于Session簽收模式的設定)
  • 在帶事物的session中,簽收發生在session.commit()時,如果事物發生了復原,消息會再次被傳送。
  • 在不帶事物的session中,簽收取決于session的設定
  • Session.AUTO_ACKNOWLEDGE。當客戶成功的從receive方法傳回的時候,或者從MessageListener.onMessage方法成功傳回的時候,會話自動确認客戶收到的消息。
  • Session.CLIENT_ACKNOWLEDGE。 用戶端通過消息的 acknowledge 方法确認消息。需要注意的是,在這種模式中,确認簽收是在會話層上進行:确認一個被消費的消息将自動确認所有已被會話消費的消息。例如,如果一個消息消費者消費了 10 個消息,然後确認第 5 個消息,那麼所有 10 個消息都被确認。
  • Session.DUPS_ACKNOWLEDGE。 該選擇隻是會話遲鈍的确認消息的送出。如果 JMS provider 失敗,那麼可能會導緻一些重複的消息。如果是重複的消息,那麼 JMS provider 必須把消息頭的 JMSRedelivered 字段設定為 true。
  • Session.SESSION_TRANSACTED:事務

例如:

​session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);​

​​ 設定在事物中接收消息,第一個參數為true時,第二個參數預設為​

​Session.SESSION_TRANSACTED​

​​。當執行​

​session.commit()​

​時才認為消息消費成功

對一個隊列來說,如果sessionz終止時,它接受了消息,但是沒有簽收,那麼ActiveMQ會認為消息沒有被消費成功,會再次傳送給消費者

對主題來說,對與非持久訂閱,session終止時會删除消息,對于持久訂閱,已消費但未簽收,會再次傳送給消費者。

開啟事物若不送出,消息相當于沒有消費,因為連接配接沒斷,會一直占用着此消息,也不會被其它消費者消費

3.2 持久訂閱

持久訂閱可以增加消息的可靠性,用戶端向ActiveMQ注冊一個識别自己身份的ID,當這個用戶端處于離線時,ActiveMQ會為這個ID儲存所有發送到主題的消息,當用戶端再次連接配接到ActiveMQ時,會得到離線這段時間發送的消息主題。

  • 為連接配接設定一個客戶ID
  • 為訂閱的主題指定一個訂閱名稱
connection = connectionFactory.createConnection();
String clientId = "8fc38c5d-a4a9-43bb-b617-03f25cce2da5";
connection.setClientID(clientId);
connection.start();
session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
topic = session.createTopic("myFirstTopic");
TopicSubscriber durableSubscriber = session.createDurableSubscriber(topic, "sub-durable-"+clientId);
durableSubscriber.setMessageListener(new JMSListener());      

3.3 消費者異步分派

ActiveMQ可以以同步或者異步的模式向消費者分派消息。可以以異步模式向處理慢的消費者分派消息,以同步模式向處理消息快的消費者分派消息

  • 在ConnectionFactory層面配置同步分派:​

    ​((ActiveMQConnectionFactory)connectionFactory).setDispatchAsync(false);​

  • 在Connection層面配置同步分派,此層面的設定将覆寫ConnectionFactory層面的設定:

    ​​

    ​((ActiveMQConnection)connection).setDispatchAsync(false);​

  • 在消費者層面以Destination URI配置同步分派,此層面的設定将覆寫ConnectionFactory和Connection層面的設定:

    ​​

    ​queue = new ActiveMQQueue("TEST.QUEUE?consumer.dispatchAsync=false"); consumer = session.createConsumer(queue);​

3.4 消費者優先級

queue = new ActiveMQQueue("TEST.QUEUE?consumer.prority=10");

consumer = session.createConsumer(queue);      

3.5 獨占的消費者

如果有多個消費者,那麼同一時刻從隊列中接收消息時就不能保證處理時是有序的,AMQ4支援獨占的消費,會挑選一個consumer,并把隊列中所有的消息按順序分派給它。如果消費者發生故障,會選擇另一個消費者。

queue = new ActiveMQQueue("TEST.QUEUE?consumer.exclusive=true");

consumer = session.createConsumer(queue);      

3.6 消息預取

預取限制來控制有多少消息能及時的傳送給消費者,一旦達到預取數量,那就不會有消息被分派給這個消費者知道它發送簽收消息。不是指同時處理的數量

如果有大量的消息并且希望高性能,可以為這個消費者增大預取值,如果有少量的消息并且花費時間都很長,可以設定預取值為1。這樣同一時間AMQ隻會為這個消費者分派一條消息

  • 在ConnectionFactory層面為所有消費者配置預取值:

    ​​

    ​tcp://localhost:61616?jms.prefetchPolicy.all=50​

  • 在ConnectionFactory層面為隊列消費者配置預取值:

    ​​

    ​tcp://localhost:61616?jms.prefetchPolicy.queuePrefetch=1​

  • 使用“目标選項”為一個消費者配置預取值:

    ​​

    ​queue = new ActiveMQQueue("TEST.QUEUE?consumer.prefetchSize=10");​

    ​​

    ​consumer = session.createConsumer(queue);​

3.7 消息重試

//重發政策
RedeliveryPolicy queuePolicy = new RedeliveryPolicy();
//是否采用碰撞因數做判斷
queuePolicy.setUseCollisionAvoidance(false);
//碰撞躲避因數,會和重發延遲做計算,得出最終延遲時間,會在+-15%之間随機選擇時間
queuePolicy.setCollisionAvoidancePercent((short) 15);
//重發延遲初始值
queuePolicy.setInitialRedeliveryDelay(1000);
//如果initialRedeliveryDelay為0則使用這個值
queuePolicy.setRedeliveryDelay(1000);
//是否成倍增加延遲
queuePolicy.setUseExponentialBackOff(false);
//成倍延遲倍率,上次延遲時間*此值為要計算時的延遲時間
queuePolicy.setBackOffMultiplier(5);
//UseCollisionAvoidance 為true時生效
queuePolicy.setMaximumRedeliveryDelay(200000);

//最大重發次數,從0開始
queuePolicy.setMaximumRedeliveries(2);

destination = session.createQueue("Q-NUMBER");

RedeliveryPolicyMap map = connection.getRedeliveryPolicyMap();
//為目前目的地設定重試政策
map.put((ActiveMQDestination) destination, queuePolicy);
      

若消息達到嘗試次數消費失敗或者逾時等,會進入死信隊列​

​ActiveMQ.DLQ​

隻會在目前消費者進行重試,不會切換到其他的消費者

繼續閱讀