1、导包
org.springframework.boot spring-boot-starter-web org.projectlombok lombok trueorg.springframework.boot spring-boot-starter-amqp
2、编写properties文件
#给应用取名字spring.application.name=springboot-rabbitmq#ip地址spring.rabbitmq.host=47.96.119.26#端口spring.rabbitmq.port=5672#用户名spring.rabbitmq.username=xiaobobo#密码spring.rabbitmq.password=12345678#开启消息确认机制spring.rabbitmq.publisher-confirms=true#开始return机制spring.rabbitmq.publisher-returns=true#整个虚拟机spring.rabbitmq.virtual-host=/#消息采用手动确认spring.rabbitmq.listener.direct.acknowledge-mode=manual#消费者最小数量spring.rabbitmq.listener.simple.concurrency=1#消费之最大数量spring.rabbitmq.listener.simple.max-concurrency=10#在单个请求中处理的消息个数,(unack的最大数量)spring.rabbitmq.listener.simple.prefetch=2#消费者自动启动#spring.rabbitmq.listener.simple.auto-startup=true#消费者消费失败,自动重新入队#spring.rabbitmq.listener.simple.default-requeue-rejected=true#启用发送重试 队列满了发不进去时启动重试#spring.rabbitmq.template.retry.enabled=true#1秒钟后重试一次#spring.rabbitmq.template.retry.initial-interval=1000ms#最大重试次数 3次#spring.rabbitmq.template.retry.max-attempts=3#最大间隔 10秒钟#spring.rabbitmq.template.retry.max-interval=10000ms#等待间隔 的倍数。如果为2 第一次 乘以2 等1秒, 第二次 乘以2 等2秒 ,第三次 乘以2 等4秒#spring.rabbitmq.template.retry.multiplier=1.0
3、编写配置类
@SpringBootConfigurationpublic class RabbitMQConfig { @Bean public Queue queueWork(){ return new Queue("queueWork1"); } /*发布订阅模式*/ //首先得申明两个队列 @Bean public Queue queueFanout1(){ return new Queue("queueFanout1"); } @Bean public Queue queueFanout2(){ return new Queue("queueFanout2"); } //准备一个交换机 @Bean public FanoutExchange fanoutExchange1(){ return new FanoutExchange("fanoutExchange1"); } //将队列绑定到交换机 @Bean public Binding bindingExchange1(Queue queueFanout1,FanoutExchange fanoutExchange1){ return BindingBuilder.bind(queueFanout1).to(fanoutExchange1); } @Bean public Binding bindingExchange2(Queue queueFanout2,FanoutExchange fanoutExchange1){ return BindingBuilder.bind(queueFanout2).to(fanoutExchange1); } //路由模型 //首先要准备两个队列 @Bean public Queue queueRouting1(){ return new Queue("queueRouting1"); } @Bean public Queue queueRouting2(){ return new Queue("queueRouting2"); } //准备一个交换机 @Bean public DirectExchange directExchange1(){ return new DirectExchange("directExchange1"); } //接下来进行绑定 @Bean public Binding bindingDirectExchange1(Queue queueRouting1,DirectExchange directExchange1){ return BindingBuilder.bind(queueRouting1).to(directExchange1).with("xiaobobo"); } @Bean public Binding bindingDirectExchange2(Queue queueRouting2,DirectExchange directExchange1){ return BindingBuilder.bind(queueRouting2).to(directExchange1).with("xiaowangzi"); }}
4、编写manager
@Componentpublic class RabbitMQManager { @Autowired private RabbitTemplate rabbitTemplate; /** * 发送消息到工作模型中 */ public void sendWork(){ for (int i = 0; i <100 ; i++) { rabbitTemplate.convertAndSend("queueWork",new User(i,"张三","123")); } } /** * 向发布订阅模式里面发送消息 */ public void sendPublishWork(){ rabbitTemplate.convertSendAndReceive("fanoutExchange1","","发布订阅模式的值"); } /** * 向路由模型里面发送请求 */ public void sendDirectExchange(){ rabbitTemplate.convertSendAndReceive("directExchange1","xiaowangzi","路由模型的值"); } /** * Confirm机制 */ final RabbitTemplate.ConfirmCallback confirmCallback=new RabbitTemplate.ConfirmCallback(){ @Override public void confirm(CorrelationData correlationData, boolean b, String s) { System.out.println("确认机制........"); } }; /** * return机制 */ RabbitTemplate.ReturnCallback returnCallback=new RabbitTemplate.ReturnCallback(){ @Override public void returnedMessage(Message message, int i, String s, String s1, String s2) { System.out.println("return机制收到消息了....."); } }; /** * 发送数据的第二种方式 */ public void sendWork1(Object message, Map properties){// MessageProperties messageProperties = new MessageProperties();// //添加属性...// Message message1 = new Message("内容".getBytes(), messageProperties);// //现在用下 return 机制 confirm机制.... rabbitTemplate.setMandatory(true); rabbitTemplate.setConfirmCallback(confirmCallback);// //用下return机制// rabbitTemplate.setReturnCallback(returnCallback); //这里需要时间戳 每一条消息 都有一个时间戳 // 作业: // 把rabbitmq中的其他功能 // 手动确认 重回队列 限流 dlx死信队列玩一把 //rabbitTemplate.convertAndSend("queueWork","我是工作模型测试的值小波波"); }}
5、编写service
@Servicepublic class RabbitMQService implements IRabbitMQService { @Autowired private RabbitMQManager rabbitMQManager; @Override public void sendMsg() { rabbitMQManager.sendWork(); } @Override public void sendPublishWork() { rabbitMQManager.sendPublishWork(); } @Override public void sendDirectExchange() { rabbitMQManager.sendDirectExchange(); }}
6、编写controller
@RestControllerpublic class RabbitMQController { @Autowired private IRabbitMQService rabbitMQService; /** * 测试工作模型的发消息 * @return */ @RequestMapping("sendData") public Object sendMsg(){ rabbitMQService.sendMsg(); return "发送成功...."; } /** * 向发布订阅模式 发送值 * @return */ @RequestMapping("sendPublishData") public Object sendPublishData(){ rabbitMQService.sendPublishWork(); return "发送成功...."; } /** * 向发布订阅模式 发送值 * @return */ @RequestMapping("sendDirectData") public Object sendDirectData(){ rabbitMQService.sendDirectExchange(); return "发送成功...."; } }
7、编写消费者
@Componentpublic class WorkReciveListener { @RabbitListener(queues = {"queueWork"}) public void reciveMessage(User user, Channel channel, Message message2){ System.out.println("111收到消息了:"+user); System.out.println("111通道是:"+channel); System.out.println("111传输数据封装:"+message2);// message2.getMessageProperties().getDeliveryTag();// channel.basicAck(message2.getMessageProperties().getDeliveryTag(),false);// channel.basicNack(); } @RabbitListener(queues = {"queueWork"}) public void reciveMessage1(User user, Channel channel, Message message2){ System.out.println("222收到消息了:"+user); System.out.println("222通道是:"+channel); System.out.println("222传输数据封装:"+message2);// message2.getMessageProperties().getDeliveryTag();// channel.basicAck(message2.getMessageProperties().getDeliveryTag(),false);// channel.basicNack(); }}
8、编写发布订阅模式的消费者
@Componentpublic class PublishReciveListener { @RabbitListener(queues = {"queueFanout1"}) public void reciveMessage(String str, Channel channel, Message message2){ System.out.println("111收到消息了:"+str); } @RabbitListener(queues = {"queueFanout2"}) public void reciveMessage1(String str, Channel channel, Message message2){ System.out.println("222收到消息了:"+str); }}
9、编写路由模型的消费者
@Componentpublic class DirectReciveListener { @RabbitListener(queues = {"queueRouting1"}) public void reciveMessage(String str, Channel channel, Message message2){ System.out.println("111收到消息了:"+str); } @RabbitListener(queues = {"queueRouting2"}) public voi reciveMessage1(String str, Channel channel, Message message2){ System.out.println("222收到消息了:"+str); }}
rabbitmq工作模式_SpringBoot 整合RabbitMQ1、导包2、编写properties文件3、编写配置类4、编写manager5、编写service6、编写controller7、编写消费者8、编写发布订阅模式的消费者9、编写路由模型的消费者