rabbitmq自動及手動ACK
mq的ack 主要是确認消息被消費者消費完成後通知伺服器将隊列裡面的消息清除。
而如果不配置Ack的話呢,我測試過他會自動的忽略,也就是說此時的服務是no_ack=true的模式,就是說隻要我發現你是消費了這個資料,至于異常不異常的,我不管了。通知Ack機制就是這麼來的,更加靈活的,我們需要Ack不自動,而是手動,這樣做的好處,就是使得我們開發人員更加人性化或者靈活的來處理我們的業務羅傑代碼,更加友善的處理異常的問題以及資料的傳回處理等。下面是通話機制的四條原則:
- Basic.Ack 發回給 RabbitMQ 以告知,可以将相應 message 從 RabbitMQ 的消息緩存中移除。
- Basic.Ack 未被 consumer 發回給 RabbitMQ 前出現了異常,RabbitMQ 發現與該 consumer 對應的連接配接被斷開,之後将該 message 以輪詢方式發送給其他 consumer (假設存在多個 consumer 訂閱同一個 queue)。
- 在 no_ack=true 的情況下,RabbitMQ 認為 message 一旦被 deliver 出去了,就已被确認了,是以會立即将緩存中的 message 删除。是以在 consumer 異常時會導緻消息丢失。
- 來自 consumer 側的 Basic.Ack 與 發送給 Producer 側的 Basic.Ack 沒有直接關系。
正題部分(配置手動Ack,實作異常消息復原)
A. 在消費者端的mq配置檔案上添加,配置 關鍵代碼為 acknowledeg = "manual",意為表示該消費者的ack方式為手動(此時的queue已經和生産者的exchange通過某個routeKey綁定了)
B. 建立一個類 MqConsumer ,并實作接口 ChannelAwareMessageListener ,實作onMessage方法,不需要指定方法。
springAMQP中已經實作了一個功能,如果該監聽器已經實作了下面2個接口,則直接調用onMessage方法
C. 關鍵點在實作了ChannelAwareMessageListener的onMessage方法後,會有2個參數。
一個是message(消息實體),一個是channel就是目前的通道,很多地方都沒有說清楚怎麼去手動ack,其實手動ack就是在目前channel裡面調用basicAsk的方法,并傳入目前消息的tagId就可以了。
消息監聽接口實作
1.MessageListener消費者消息監聽(自動進行任務完成确認)
基于實作MessageListener的消費者監聽消息時,如果xml裡沒有配置acknowledge,則是預設如同xml配置acknowledge="auto" ,是自動确認消費者完成任務(消息ack), 如果此時消費者抛出異常 ,消息會傳回該隊列并發送給其他消費者 ,如沒有其他消費者 則會繼續發到該消費者
如果xml配置中acknowledge="manual",則無法收到消息。該消息會停留在伺服器,然後會發給可以收到消息的消費者。
2.ChannelAwareMessageListener消費者消息監聽(手動進行任務完成确認)
基于實作ChannelAwareMessageListener的消費者監聽消息時,xml配置中acknowledge="auto"或不配置acknowledge時,調用方法進行消費者任務完成确認時會報如下異常(com.rabbitmq.client.ShutdownSignalException: channel error;)
是以若要實作手動消費則任務完成确認,xml的監聽标簽中需要配置acknowledge="manual" 手動确認消費者任務完成(消息ack)
消息确認 如未調用如下方法确認,則消息不再發到該消費者(如有其它的消費者,則輪詢到其他的消費者,否則傳回隊列保留在伺服器),multiple 為 false隻确認目前一個消息收到,true确認所有consumer獲得的消息
(1) 消息确認 如未确認則消息不再發到該消費者,multiple 為 false隻确認目前一個消息收到,true确認所有consumer獲得的消息
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
(2) 消息确認并傳回隊列 如未确認則消息不在發到該消費者,multiple 為 false隻确認目前一個消息收到,true确認所有 consumer獲得的消息;requeue 為true該消息重新回到隊列,并發到該隊列的其他消費者,為false則直接丢掉該消息
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
(3) 拒絕消息 requeue 為true該消息重新回到隊列,并發到該隊列的其他消費者,如沒有其他消費者,則會一直發到該消費者,為false則直接丢掉該消息
channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
同樣的,如果要Nack或者拒絕消息(reject)的時候,也是調用channel裡面的basicXXX方法就可以了(當然要制定tagId)。注意如果抛異常或Nack(并且requeue為true),消息會一直重新入隊列,一不小心就會重複一大堆消息不斷出現~。
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); // 消息的辨別,false隻确認目前一個消息收到,true确認所有consumer獲得的消息
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true); // ack傳回false,并重新回到隊列,api裡面解釋得很清楚 channel.basicReject(message.getMessageProperties().getDeliveryTag(), true); // 拒絕消息
D. 針對上面所描述的情況,我們在搭建一個消息隊列時候,我們的思路應該是這樣的,首先,我們要啟動ack的手動方式,緊接着,我們處理代碼邏輯,如果發生了異常資訊,我們首先通知到ack,我已經表示接受到這條資料了,你可以進行删除了,不需要讓他自動的重新進入隊列中,然後,我們啟用一個錯誤處理,手動将其重新插入隊列中,在此之前,有幾個類和Api一起來看一下。
1. SimpleMessageListenerContainer
這個是我們的基礎監聽,他的作用就是隊列的總監聽,可以為其配置ack模式,異常處理類等。。
2. org.springframework.amqp.support.converter.SimpleMessageConverter
這個類和下面的Converter類很容易搞混淆,這個類的作用是可以解析隊列中的 message 轉 obj
3. org.springframework.amqp.rabbit.retry.MessageRecoverer
這個接口,需要我們開發者自定義實作,其中的一個方法recover(Message message, Throwable cause),就可以看出來他是幹嘛的,就是說在監聽出錯,也就是沒有抓取的異常而是抛出的異常會觸發該方法,我們就會在這個接口的實作中,将消息重新入隊列
4. org.springframework.util.ErrorHandler
這個接口也是在出現異常時候,會觸發他的方法
案例。。。。。。。。。。。。。。。。。。。
生産者
import java.io.IOException;
import javax.annotation.Resource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.stereotype.Component;
@Component
public class MessageProducer{
private Logger logger = LoggerFactory.getLogger(MessageProducer.class);
@Resource(name="amqpTemplate")
private AmqpTemplate amqpTemplate;
@Resource(name="amqpTemplate2")
private AmqpTemplate amqpTemplate2;
public void sendMessage(Object message) throws IOException {
logger.info("to send message:{}", message);
amqpTemplate.convertAndSend("queue.Test.admin", message);
// implements ConfirmCallback
// Message re = (Message)amqpTemplate.convertSendAndReceive("queue.Test.admin", message);
// amqpTemplate2.convertAndSend("queue.Test.admin", message);
}
消費者 1 及其配置檔案.xml
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
import org.springframework.stereotype.Component;
@Component
public class MessageConsumer implements MessageListener {
private Logger logger = LoggerFactory.getLogger(MessageConsumer.class);
@Override
public void onMessage(Message message) {
logger.info("admin MessageConsumer consumer receive message------->:{}", message);
// try {
// Thread.sleep(3000);
// } catch (InterruptedException e) {
// // TODO Auto-generated catch block
// e.printStackTrace();
// }
xml配置中acknowledge="auto" 時 是自動确認ack 如果此時消費者抛出異常 消息會發到該隊列其他消費者 如沒有其他消費者 則會一直發到該消費者
// throw new NullPointerException(".....admin.....消費者異常。。。。。。。。");
}
}
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit-1.2.xsd">
<!--配置connection-factory,指定連接配接rabbit server參數 -->
<rabbit:connection-factory id="connectionFactory" virtual-host="/"
username="homy" password="homy" host="120.25.212.10" port="5672" channel-cache-size="5"/>
<!--配置connection-factory,指定連接配接rabbit server參數
<rabbit:connection-factory id="connectionFactory" virtual-host="hymn"
username="hy" password="hy2018627" host="120.25.212.10" port="5672" />
-->
<!--通過指定下面的admin資訊,目前producer中的exchange和queue會在rabbitmq伺服器上自動生成 -->
<rabbit:admin id="connectAdmin" connection-factory="connectionFactory" />
<!--定義queue -->
<rabbit:queue name="queueTest" durable="true" auto-delete="false"
exclusive="false" declared-by="connectAdmin" >
</rabbit:queue>
<!-- 定義direct exchange,綁定queueTest -->
<rabbit:direct-exchange name="exchangeTest"
durable="true" auto-delete="false" declared-by="connectAdmin">
<rabbit:bindings>
<rabbit:binding queue="queueTest" key="queue.Test.admin"></rabbit:binding>
</rabbit:bindings>
</rabbit:direct-exchange>
<!--定義rabbit template用于資料的接收和發送 -->
<rabbit:template id="amqpTemplate" connection-factory="connectionFactory"
exchange="exchangeTest"/>
<!-- queue litener 觀察 監聽模式 當有消息到達時會通知監聽在對應的隊列上的監聽對象 acknowledge="manual" -->
<rabbit:listener-container connection-factory="connectionFactory">
<rabbit:listener queues="queueTest" ref="messageConsumer" method="onMessage"/>
</rabbit:listener-container>
<!--定義rabbit template用于資料的接收和發送 -->
<rabbit:template id="amqpTemplate2" connection-factory="connectionFactory" exchange="exchangeTopic" />
<!--定義queue -->
<rabbit:queue name="queueTest2" durable="true" auto-delete="false"
exclusive="false" declared-by="connectAdmin" />
<!-- 定義direct exchange,綁定queueTest -->
<rabbit:topic-exchange name="exchangeTopic"
durable="true" auto-delete="false" declared-by="connectAdmin">
<rabbit:bindings>
<rabbit:binding queue="queueTest2" pattern="queue.#"></rabbit:binding>
<rabbit:binding queue="queueTest" pattern="queue.Test.*"></rabbit:binding>
</rabbit:bindings>
</rabbit:topic-exchange>
<!-- queue litener 觀察 監聽模式 當有消息到達時會通知監聽在對應的隊列上的監聽對象-->
<rabbit:listener-container connection-factory="connectionFactory" acknowledge="auto">
<rabbit:listener queues="queueTest2" ref="messageConsumer" method="onMessage"/>
</rabbit:listener-container>
</beans>
消費者 2 及其配置檔案.xml(另外一個項目)
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
import org.springframework.stereotype.Component;
import com.rabbitmq.client.Channel;
@Component
public class MessageConsumer implements ChannelAwareMessageListener {
private Logger logger = LoggerFactory.getLogger(MessageConsumer.class);
@Override
public void onMessage(Message message, Channel channel) throws Exception {
// TODO Auto-generated method stub
// 消息監聽接口實作
// 1.MessageListener消費者消息監聽(自動進行任務完成确認)
// 基于實作MessageListener的消費者監聽消息時,如果xml裡沒有配置acknowledge,則是預設如同xml配置acknowledge="auto" ,是自動确認消費者完成任務(消息ack), 如果此時消費者抛出異常 ,消息會傳回該隊列并發送給其他消費者 ,如沒有其他消費者 則會繼續發到該消費者
// 如果xml配置中acknowledge="manual",則無法收到消息。該消息會停留在伺服器,然後會發給可以收到消息的消費者。
//
// 2.ChannelAwareMessageListener消費者消息監聽(手動進行任務完成确認)
// 基于實作ChannelAwareMessageListener的消費者監聽消息時,xml配置中acknowledge="auto"或不配置acknowledge時,調用方法進行消費者任務完成确認時會報如下異常(com.rabbitmq.client.ShutdownSignalException: channel error;)
// 是以若要實作手動消費則任務完成确認,xml的監聽标簽中需要配置acknowledge="manual" 手動确認消費者任務完成(消息ack)
//
// 消息确認 如未調用如下方法确認,則消息不再發到該消費者(如有其它的消費者,則輪詢到其他的消費者),multiple 為 false隻确認目前一個消息收到,true确認所有consumer獲得的消息
// 消息确認 如未确認則消息不在發到該消費者,multiple 為 false隻确認目前一個消息收到,true确認所有consumer獲得的消息
// (1)channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
// 消息确認并傳回隊列 如未确認則消息不在發到該消費者,multiple 為 false隻确認目前一個消息收到,true确認所有consumer獲得的消息;requeue 為true該消息重新回到隊列,并發到該隊列的其他消費者,為false則直接丢掉該消息
// (2)channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
// 拒絕消息 requeue 為true該消息重新回到隊列,并發到該隊列的其他消費者,如沒有其他消費者,則會一直發到該消費者,為false則直接丢掉該消息
// (3)channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
//..........手動消息确認。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。
// try {
// Thread.sleep(3000);
// } catch (InterruptedException e) {
// // TODO Auto-generated catch block
// e.printStackTrace();
// }
xml配置中acknowledge="auto" 時 是自動确認ack 如果此時消費者抛出異常 消息會發到該隊列其他消費者 如沒有其他消費者 則會一直發到該消費者
// if(true){
// throw new NullPointerException(".....admin.....消費者異常。。。。。。。。");
// }
logger.error("收到");
//消息确認 如未确認則消息不在發到該消費者,multiple 為 false隻确認目前一個消息收到,true确認所有consumer獲得的消息
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
logger.info("business-admin MessageConsumer receive message 出現異常 并将該消息重新入隊列------->:{}", message);
logger.info("messageid:"+message.getMessageProperties().getDeliveryTag()+" ...messageBody:"+message.getBody());
//消息确認并傳回隊列 如未确認則消息不在發到該消費者,multiple 為 false隻确認目前一個消息收到,true确認所有consumer獲得的消息;requeue 為true該消息重新回到隊列,并發到該隊列的其他消費者,為false則直接丢掉該消息
// channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
//拒絕消息 requeue 為true該消息重新回到隊列,并發到該隊列的其他消費者,如沒有其他消費者,則會一直發到該消費者,為false則直接丢掉該消息
// channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
//........................手動通知消息生産者。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。
}
}
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit-1.2.xsd">
<!--配置connection-factory,指定連接配接rabbit server參數 -->
<rabbit:connection-factory id="connectionFactory" virtual-host="/"
username="homy" password="homy" host="120.25.212.10" port="5672" channel-cache-size="5"/>
<!--配置connection-factory,指定連接配接rabbit server參數
<rabbit:connection-factory id="connectionFactory2" virtual-host="hymn"
username="hy" password="hy2018627" host="120.25.212.10" port="5672" publisher-confirms="true"/>
-->
<!--通過指定下面的admin資訊,目前producer中的exchange和queue會在rabbitmq伺服器上自動生成 -->
<rabbit:admin id="connectAdmin" connection-factory="connectionFactory" />
<!--定義queue -->
<rabbit:queue name="queueTest" durable="true" auto-delete="false"
exclusive="false" declared-by="connectAdmin" />
<rabbit:queue name="queueTest3" durable="true" auto-delete="false"
exclusive="false" declared-by="connectAdmin" />
<!-- 定義direct exchange,綁定queueTest -->
<rabbit:direct-exchange name="exchangeTest"
durable="true" auto-delete="false" declared-by="connectAdmin">
<rabbit:bindings>
<rabbit:binding queue="queueTest" key="queue.Test.admin"></rabbit:binding>
</rabbit:bindings>
</rabbit:direct-exchange>
<!--定義rabbit template用于資料的接收和發送 -->
<rabbit:template id="amqpTemplate" connection-factory="connectionFactory"
exchange="exchangeTest" />
<!-- queue litener 觀察 監聽模式 當有消息到達時會通知監聽在對應的隊列上的監聽對象 acknowledge="manual"-->
<rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual">
<rabbit:listener queues="queueTest" ref="messageConsumer" method="onMessage" />
</rabbit:listener-container>
<!-- .............................................................................. -->
<!--定義rabbit template用于資料的接收和發送 -->
<rabbit:template id="amqpTemplate2" connection-factory="connectionFactory" exchange="exchangeTopic" />
<!--定義queue -->
<rabbit:queue name="queueTest2" durable="true" auto-delete="false" exclusive="false" declared-by="connectAdmin" />
<!-- 定義direct exchange,綁定queueTest -->
<rabbit:topic-exchange name="exchangeTopic" durable="true" auto-delete="false" declared-by="connectAdmin">
<rabbit:bindings>
<rabbit:binding queue="queueTest2" pattern="queue.#"></rabbit:binding>
<rabbit:binding queue="queueTest" pattern="queue.Test.*"></rabbit:binding>
</rabbit:bindings>
</rabbit:topic-exchange>
<!-- queue litener 觀察 監聽模式 當有消息到達時會通知監聽在對應的隊列上的監聽對象 -->
<rabbit:listener-container connection-factory="connectionFactory" acknowledge="auto">
<rabbit:listener queues="queueTest2" ref="topicConsumer" method="onMessage"/>
</rabbit:listener-container>
</beans>