一、springboot整合rabbitmq
- 我們需要建立兩個工程,一個作為生産者,另一個作為消費者。在pom.xml中添加amqp依賴:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
- 在application.yml檔案中添加rabbitmq的相關資訊:
spring:
rabbitmq:
# 連接配接位址
host: 127.0.0.1
# 端口
port: 5672
# 登入賬号
username: guest
# 登入密碼
password: guest
# 虛拟主機
virtual-host: /
- 在生産者工程中建立配置項rabbitmqConfig.java,申明名稱為”byte-zb“直連交換機和隊列,使用”byte-zb“的routing-key将隊列和交換機綁定,代碼如下:
@Configuration
public class RabbitConfig {
public static final String QUEUE_NAME = "byte-zb";
public static final String EXCHANGE_NAME = "byte-zb";
public static final String ROUTING_KEY = "byte-zb";
// 隊列申明
@Bean
public Queue queue(){
return new Queue(QUEUE_NAME);
}
// 申明交換機
@Bean
public DirectExchange directExchange(){
return new DirectExchange(EXCHANGE_NAME);
}
// 資料綁定申明
@Bean
public Binding directBinding(){
return BindingBuilder.bind(queue()).to(directExchange()).with(ROUTING_KEY);
}
}
- 建立生産者發送一條消息,代碼如下:
@RestController
public class Producer {
public static final String QUEUE_NAME = "byte-zb";
public static final String EXCHANGE_NAME = "byte-zb";
@Autowired
private AmqpTemplate amqpTemplate;
@RequestMapping("/send")
public void sendMessage(){
JSONObject jsonObject = new JSONObject();
jsonObject.put("email","11111111111");
jsonObject.put("timestamp",System.currentTimeMillis());
String json = jsonObject.toJSONString();
System.out.println(json);
amqpTemplate.convertAndSend(EXCHANGE_NAME,QUEUE_NAME,json);
}
}
- 在消費者工程裡建立消費者消費消息,代碼如下:
@Component
public class Consumer throws Exception{
public static final String QUEUE_NAME = "byte-zb";
@RabbitListener(queues = QUEUE_NAME)
public void receiveMessage(String message){
System.out.println("接收到的消息為" message);
}
}
我們啟動生産者,然後請求send接口,然後打開rabbitmq控制台發現多了一個名為”byte-zb“的交換機和隊列,并且隊列中出現了一個未消費的消息,然後啟動消費者,我們會在控制台上發現列印了一條消息,同時rabbitmq控制台中”byte-zb“的隊列中消息沒有了。
二、自動補償機制
如果消費者消息消費不成功的話,會出現什麼情況呢?我們修改一下消費者代碼,然後看看。
@Component
public class Consumer {
public static final String QUEUE_NAME = "byte-zb";
@RabbitListener(queues = QUEUE_NAME)
public void receiveMessage(String message) throws Exception {
System.out.println("接收到的消息為" message);
int i = 1 / 0;
}
}
我們會看到消費者工程控制台一直在重新整理報錯,當消費者配出異常,也就是說當消息消費不成功的話,該消息會存放在rabbitmq的服務端,一直進行重試,直到不抛出異常為止。
如果一直抛異常,我們的服務很容易挂掉,那有沒有辦法控制重試幾次不成功就不再重試了呢?答案是有的。我們在消費者application.yml中增加一段配置。
spring:
rabbitmq:
# 連接配接位址
host: 127.0.0.1
# 端口
port: 5672
# 登入賬号
username: guest
# 登入密碼
password: guest
# 虛拟主機
virtual-host: /
listener:
simple:
retry:
enabled: true # 開啟消費者進行重試
max-attempts: 5 # 最大重試次數
initial-interval: 3000 # 重試時間間隔
上面配置的意思是消費異常後,重試五次,每次隔3s。繼續啟動消費者看看效果,我們發現重試五次以後,就不再重試了。
三、結合實際案例來使用消息補償機制
像上面那種情況出現的異常其實不管怎麼重試都不會成功,實際上用到消息補償的就是調用第三方接口的這種。
案例:生者往隊列中扔一條消息,包含郵箱和發送内容。消費者拿到消息後将調用郵件接口發送郵件。有時候可能郵件接口由于網絡等原因不通,這時候就需要去重試了。
在調用接口的工具類中,如果出現異常我們直接傳回null,工具類具體代碼就不貼了,如果傳回null之後怎麼處理呢?我們隻需要抛出異常,rabbitListener捕獲到異常後就會自動重試。
我們改造一下消費者代碼:
@Component
public class Consumer {
public static final String QUEUE_NAME = "byte-zb";
@RabbitListener(queues = QUEUE_NAME)
public void receiveMessage(String message) throws Exception {
System.out.println("接收到的消息為" message);
JSONObject jsonObject = JSONObject.parseObject(message);
String email = jsonObject.getString("email");
String content = jsonObject.getString("timestamp");
String httpUrl = "http://127.0.0.1:8080/email?email" email "&content=" content;
// 如果發生異常則傳回null
String body = HttpUtils.httpGet(httpUrl, "utf-8");
//
if(body == null){
throw new Exception();
}
}
}
當然我們可以自定義異常抛出。具體怎麼試驗呢,第一步啟動生産者和消費者,這時候我們發現消費者在重試,第二步我們啟動郵件服務,這時候我們會發現郵件發送成功了,消費者不再重試了。
四、解決消息幂等性問題
一些剛接觸java的同學可能對幂等性不太清楚。幂等性就是重複消費造成結果不一緻。為了保證幂等性,是以消費者消費消息隻能消費一次消息。我麼可以是用全局的消息id來控制幂等性。當消息被消費了之後我們可以選擇緩存儲存這個消息id,然後當再次消費的時候,我們可以查詢緩存,如果存在這個消息id,我們就不錯處理直接return即可。先改造生産者代碼,在消息中添加消息id:
@RequestMapping("/send")
public void sendMessage(){
JSONObject jsonObject = new JSONObject();
jsonObject.put("email","11111111111");
jsonObject.put("timestamp",System.currentTimeMillis());
String json = jsonObject.toJSONString();
System.out.println(json);
Message message = MessageBuilder.withBody(json.getBytes()).setContentType(MessageProperties.CONTENT_TYPE_JSON)
.setContentEncoding("UTF-8").setMessageId(UUID.randomUUID() "").build();
amqpTemplate.convertAndSend(EXCHANGE_NAME,QUEUE_NAME,message);
}
消費者代碼改造:
@Component
public class Consumer {
public static final String QUEUE_NAME = "byte-zb";
@RabbitListener(queues = QUEUE_NAME)
public void receiveMessage(Message message) throws Exception {
Jedis jedis = new Jedis("localhost", 6379);
String messageId = message.getMessageProperties().getMessageId();
String msg = new String(message.getBody(),"UTF-8");
System.out.println("接收導的消息為:" msg "==消息id為:" messageId);
String messageIdRedis = jedis.get("messageId");
if(messageId == messageIdRedis){
return;
}
JSONObject jsonObject = JSONObject.parseObject(msg);
String email = jsonObject.getString("email");
String content = jsonObject.getString("timestamp");
String httpUrl = "http://127.0.0.1:8080/email?email" email "&content=" content;
// 如果發生異常則傳回null
String body = HttpUtils.httpGet(httpUrl, "utf-8");
//
if(body == null){
throw new Exception();
}
jedis.set("messageId",messageId);
}
}
我們在消費者端使用redis存儲消息id,隻做示範,具體項目請根據實際情況選擇相應的工具進行存儲。