生産者
/**
* 消息被拒的情況
*/
public class Produce0001 {
private static final String NORMAL_EXCHANGE="normal_exchange";
public static void main(String[] args) throws Exception{
Channel channel = untils.getChannel();
channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
//該消息用作隊列的個數限制
for(int i=0;i<10;i++)
{
String message="info"+i;
channel.basicPublish(NORMAL_EXCHANGE,"zhangsan",null,message.getBytes(StandardCharsets.UTF_8));
System.out.println("生産者發送消息"+message);
}
}
}
消費者:
/**
* 消息被拒的情況
*/
public class Consumer0001 {
//普通交換機
private static final String NORMAL_EXCHANGE="normal_exchange";
//死信交換機
private static final String DEAD_EXCHANGE="dead_exchange";
public static void main(String[] args) throws Exception{
Channel channel = untils.getChannel();
//聲明死信交換機,類型為direct
channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
channel.exchangeDeclare(DEAD_EXCHANGE,BuiltinExchangeType.DIRECT);
//聲明死信隊列
String deadQueue="dead_queue";
channel.queueDeclare(deadQueue,false,false,false,null);
//死信隊列綁定交換和routingKey值
channel.queueBind(deadQueue,DEAD_EXCHANGE,"lisi");
//正常隊列綁定死信隊列
Map<String,Object> params=new HashMap<>();
//正常隊列設定死信交換機,參數key是固定值
params.put("x-dead-letter-exchange",DEAD_EXCHANGE);
//正常隊列設定死信routing-key,參數key是固定值
params.put("x-dead-letter-routing-key", "lisi");
//正常隊列設定的最大限制長度
params.put("x-max-length",6);
System.out.println("等待接收消息....");
String normalQueue="normal_queue";
channel.queueDeclare(normalQueue,false,false,false,params);
channel.queueBind(normalQueue,NORMAL_EXCHANGE,"zhangsan");
DeliverCallback deliverCallback=(consumerTag, message) -> {
String s = new String(message.getBody(), StandardCharsets.UTF_8);
if (s.equals("info5"))
{
System.out.println("info5拒接");
channel.basicReject(message.getEnvelope().getDeliveryTag(), false);
}
else
{
System.out.println("01接收到消息"+s);
channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
}
};
channel.basicConsume(normalQueue,false,deliverCallback,consumerTag -> {});
}
}
結果:
