1. 簡單示例
- 建立連接配接
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.94.151:61616");
Connection connection = connectionFactory.createConnection();
connection.start();
- 建立queue生産者,發送消息
session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
destination = session.createQueue("Q-NUMBER");
messageProducer = session.createProducer(destination);
TextMessage textMessage = session.createTextMessage("ActiveMQ發送消息+eeee");
messageProducer.send(textMessage);
session.commit();
- 建立Topic生産者,發送消息
= connection.createSession(Boolean.TRUE,Session.AUTO_ACKNOWLEDGE);
destination = session.createTopic("myFirstTopic");
messageProducer = session.createProducer(destination);
TextMessage textMessage = session.createTextMessage("ActimeMQ發送主題");
System.out.println(textMessage.getText());
messageProducer.send(textMessage);
//送出
session.commit();
- 建立隊列消費者,阻塞模式
session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
destination = session.createQueue("Q-NUMBER");
messageConsumer = session.createConsumer(destination);
while (true){
TextMessage textMessage = (TextMessage) messageConsumer.receive(100000);
if(textMessage!=null){
System.out.println("收到消息:"+textMessage.getText());
}else {
break;
}
}
- 建立隊列消費者,監聽模式(連接配接不要關了)
session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
destination = session.createQueue("Q-NUMBER");
messageConsumer = session.createConsumer(destination);
messageConsumer.setMessageListener(new JMSListener());
public class JMSListener implements MessageListener {
public void onMessage(Message message) {
try {
System.out.println("收到消息: "+((TextMessage)message).getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
}
- 建立主題消費者,阻塞模式
session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
destination = session.createTopic("myFirstTopic");
messageConsumer = session.createConsumer(destination);
while (true){
TextMessage textMessage = (TextMessage) messageConsumer.receive(100000);
if(textMessage!=null){
System.out.println("收到消息:"+textMessage.getText());
}else {
break;
}
}
- 建立主題消費者,監聽模式,JMSListener類與上面一樣
session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
destination = session.createTopic("myFirstTopic");
messageConsumer = session.createConsumer(destination);
messageConsumer.setMessageListener(new JMSListener());
2. 發送消息設定
發送消息要保證消息的可靠性,解決方案是在事物中發送持久性的消息。事物能保證程式出錯時不會誤發消息,持久性能保證消息沒被消費的情況下,mq消息不會丢失。
2.1 事物
建立session的時候指定事物
session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
,第二個參數可以忽略,當有事物時第二個參數預設為
Session.SESSION_TRANSACTED
。當執行
session.commit()
時,消息才真正發送送出,可以在一個事物裡發送多條消息。復原
session.rollback()
2.2 持久化與非持久化
public interface DeliveryMode {
static final int NON_PERSISTENT = 1;
static final int PERSISTENT = 2;
}
- 持久化後可以保證消息服務由于某種原因不會丢失消息(如:mq伺服器重新開機)
- 使用messageProducer.setDeliveryMode方法,所有的消息都采用此傳送模式
- 使用messageProducer.send方法為每一條消息設定傳送模式
2.3 逾時
- 使用messageProducer.setTimeToLive方法,所有的消息都采用此過期時間
- 使用messageProducer.send方法為每一條消息設定過期時間
2.4 設定優先級
消息傳送者将會首先嘗試傳送優先級較高的消息。消息優先級有0-9十個級别,0-4是普通消息,5-9是加急消息,預設為4
- 使用messageProducer.setPriority方法,所有的消息都采用此優先級
- 使用messageProducer.send方法為每一條消息設定優先級
2.5 異步發送消息
-
使用Connection URI配置異步發送:
cf = new ActiveMQConnectionFactory("tcp://locahost:61616?jms.useAsyncSend=true");
-
在ConnectionFactory層面配置異步發送:
((ActiveMQConnectionFactory)connectionFactory).setUseAsyncSend(true);
-
在Connection層面配置異步發送,此層面的設定将覆寫ConnectionFactory層面的設定:
((ActiveMQConnection)connection).setUseAsyncSend(true);
3. 接收消息設定
接受消息也要保證可靠性,在事物中接收消息。
3.1 控制消息簽收(AcknowledgeMode)
用戶端成功接收一條消息的标志是這條消息被簽收,有如下三個階段
- 用戶端接收消息
- 用戶端處理消息
- 消息被簽收(可以由用戶端發起,也可以由ActiveMQ發起,取決于Session簽收模式的設定)
- 在帶事物的session中,簽收發生在session.commit()時,如果事物發生了復原,消息會再次被傳送。
- 在不帶事物的session中,簽收取決于session的設定
- Session.AUTO_ACKNOWLEDGE。當客戶成功的從receive方法傳回的時候,或者從MessageListener.onMessage方法成功傳回的時候,會話自動确認客戶收到的消息。
- Session.CLIENT_ACKNOWLEDGE。 用戶端通過消息的 acknowledge 方法确認消息。需要注意的是,在這種模式中,确認簽收是在會話層上進行:确認一個被消費的消息将自動确認所有已被會話消費的消息。例如,如果一個消息消費者消費了 10 個消息,然後确認第 5 個消息,那麼所有 10 個消息都被确認。
- Session.DUPS_ACKNOWLEDGE。 該選擇隻是會話遲鈍的确認消息的送出。如果 JMS provider 失敗,那麼可能會導緻一些重複的消息。如果是重複的消息,那麼 JMS provider 必須把消息頭的 JMSRedelivered 字段設定為 true。
- Session.SESSION_TRANSACTED:事務
例如:
session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
設定在事物中接收消息,第一個參數為true時,第二個參數預設為
Session.SESSION_TRANSACTED
。當執行
session.commit()
時才認為消息消費成功
對一個隊列來說,如果sessionz終止時,它接受了消息,但是沒有簽收,那麼ActiveMQ會認為消息沒有被消費成功,會再次傳送給消費者
對主題來說,對與非持久訂閱,session終止時會删除消息,對于持久訂閱,已消費但未簽收,會再次傳送給消費者。
開啟事物若不送出,消息相當于沒有消費,因為連接配接沒斷,會一直占用着此消息,也不會被其它消費者消費
3.2 持久訂閱
持久訂閱可以增加消息的可靠性,用戶端向ActiveMQ注冊一個識别自己身份的ID,當這個用戶端處于離線時,ActiveMQ會為這個ID儲存所有發送到主題的消息,當用戶端再次連接配接到ActiveMQ時,會得到離線這段時間發送的消息主題。
- 為連接配接設定一個客戶ID
- 為訂閱的主題指定一個訂閱名稱
connection = connectionFactory.createConnection();
String clientId = "8fc38c5d-a4a9-43bb-b617-03f25cce2da5";
connection.setClientID(clientId);
connection.start();
session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
topic = session.createTopic("myFirstTopic");
TopicSubscriber durableSubscriber = session.createDurableSubscriber(topic, "sub-durable-"+clientId);
durableSubscriber.setMessageListener(new JMSListener());
3.3 消費者異步分派
ActiveMQ可以以同步或者異步的模式向消費者分派消息。可以以異步模式向處理慢的消費者分派消息,以同步模式向處理消息快的消費者分派消息
- 在ConnectionFactory層面配置同步分派:
((ActiveMQConnectionFactory)connectionFactory).setDispatchAsync(false);
-
在Connection層面配置同步分派,此層面的設定将覆寫ConnectionFactory層面的設定:
((ActiveMQConnection)connection).setDispatchAsync(false);
-
在消費者層面以Destination URI配置同步分派,此層面的設定将覆寫ConnectionFactory和Connection層面的設定:
queue = new ActiveMQQueue("TEST.QUEUE?consumer.dispatchAsync=false"); consumer = session.createConsumer(queue);
3.4 消費者優先級
queue = new ActiveMQQueue("TEST.QUEUE?consumer.prority=10");
consumer = session.createConsumer(queue);
3.5 獨占的消費者
如果有多個消費者,那麼同一時刻從隊列中接收消息時就不能保證處理時是有序的,AMQ4支援獨占的消費,會挑選一個consumer,并把隊列中所有的消息按順序分派給它。如果消費者發生故障,會選擇另一個消費者。
queue = new ActiveMQQueue("TEST.QUEUE?consumer.exclusive=true");
consumer = session.createConsumer(queue);
3.6 消息預取
預取限制來控制有多少消息能及時的傳送給消費者,一旦達到預取數量,那就不會有消息被分派給這個消費者知道它發送簽收消息。不是指同時處理的數量
如果有大量的消息并且希望高性能,可以為這個消費者增大預取值,如果有少量的消息并且花費時間都很長,可以設定預取值為1。這樣同一時間AMQ隻會為這個消費者分派一條消息
-
在ConnectionFactory層面為所有消費者配置預取值:
tcp://localhost:61616?jms.prefetchPolicy.all=50
-
在ConnectionFactory層面為隊列消費者配置預取值:
tcp://localhost:61616?jms.prefetchPolicy.queuePrefetch=1
-
使用“目标選項”為一個消費者配置預取值:
queue = new ActiveMQQueue("TEST.QUEUE?consumer.prefetchSize=10");
consumer = session.createConsumer(queue);
3.7 消息重試
//重發政策
RedeliveryPolicy queuePolicy = new RedeliveryPolicy();
//是否采用碰撞因數做判斷
queuePolicy.setUseCollisionAvoidance(false);
//碰撞躲避因數,會和重發延遲做計算,得出最終延遲時間,會在+-15%之間随機選擇時間
queuePolicy.setCollisionAvoidancePercent((short) 15);
//重發延遲初始值
queuePolicy.setInitialRedeliveryDelay(1000);
//如果initialRedeliveryDelay為0則使用這個值
queuePolicy.setRedeliveryDelay(1000);
//是否成倍增加延遲
queuePolicy.setUseExponentialBackOff(false);
//成倍延遲倍率,上次延遲時間*此值為要計算時的延遲時間
queuePolicy.setBackOffMultiplier(5);
//UseCollisionAvoidance 為true時生效
queuePolicy.setMaximumRedeliveryDelay(200000);
//最大重發次數,從0開始
queuePolicy.setMaximumRedeliveries(2);
destination = session.createQueue("Q-NUMBER");
RedeliveryPolicyMap map = connection.getRedeliveryPolicyMap();
//為目前目的地設定重試政策
map.put((ActiveMQDestination) destination, queuePolicy);
若消息達到嘗試次數消費失敗或者逾時等,會進入死信隊列
ActiveMQ.DLQ
隻會在目前消費者進行重試,不會切換到其他的消費者