1、導包
org.springframework.boot spring-boot-starter-web org.projectlombok lombok trueorg.springframework.boot spring-boot-starter-amqp
2、編寫properties檔案
#給應用取名字spring.application.name=springboot-rabbitmq#ip位址spring.rabbitmq.host=47.96.119.26#端口spring.rabbitmq.port=5672#使用者名spring.rabbitmq.username=xiaobobo#密碼spring.rabbitmq.password=12345678#開啟消息确認機制spring.rabbitmq.publisher-confirms=true#開始return機制spring.rabbitmq.publisher-returns=true#整個虛拟機spring.rabbitmq.virtual-host=/#消息采用手動确認spring.rabbitmq.listener.direct.acknowledge-mode=manual#消費者最小數量spring.rabbitmq.listener.simple.concurrency=1#消費之最大數量spring.rabbitmq.listener.simple.max-concurrency=10#在單個請求中處理的消息個數,(unack的最大數量)spring.rabbitmq.listener.simple.prefetch=2#消費者自動啟動#spring.rabbitmq.listener.simple.auto-startup=true#消費者消費失敗,自動重新入隊#spring.rabbitmq.listener.simple.default-requeue-rejected=true#啟用發送重試 隊列滿了發不進去時啟動重試#spring.rabbitmq.template.retry.enabled=true#1秒鐘後重試一次#spring.rabbitmq.template.retry.initial-interval=1000ms#最大重試次數 3次#spring.rabbitmq.template.retry.max-attempts=3#最大間隔 10秒鐘#spring.rabbitmq.template.retry.max-interval=10000ms#等待間隔 的倍數。如果為2 第一次 乘以2 等1秒, 第二次 乘以2 等2秒 ,第三次 乘以2 等4秒#spring.rabbitmq.template.retry.multiplier=1.0
3、編寫配置類
@SpringBootConfigurationpublic class RabbitMQConfig { @Bean public Queue queueWork(){ return new Queue("queueWork1"); } /*釋出訂閱模式*/ //首先得申明兩個隊列 @Bean public Queue queueFanout1(){ return new Queue("queueFanout1"); } @Bean public Queue queueFanout2(){ return new Queue("queueFanout2"); } //準備一個交換機 @Bean public FanoutExchange fanoutExchange1(){ return new FanoutExchange("fanoutExchange1"); } //将隊列綁定到交換機 @Bean public Binding bindingExchange1(Queue queueFanout1,FanoutExchange fanoutExchange1){ return BindingBuilder.bind(queueFanout1).to(fanoutExchange1); } @Bean public Binding bindingExchange2(Queue queueFanout2,FanoutExchange fanoutExchange1){ return BindingBuilder.bind(queueFanout2).to(fanoutExchange1); } //路由模型 //首先要準備兩個隊列 @Bean public Queue queueRouting1(){ return new Queue("queueRouting1"); } @Bean public Queue queueRouting2(){ return new Queue("queueRouting2"); } //準備一個交換機 @Bean public DirectExchange directExchange1(){ return new DirectExchange("directExchange1"); } //接下來進行綁定 @Bean public Binding bindingDirectExchange1(Queue queueRouting1,DirectExchange directExchange1){ return BindingBuilder.bind(queueRouting1).to(directExchange1).with("xiaobobo"); } @Bean public Binding bindingDirectExchange2(Queue queueRouting2,DirectExchange directExchange1){ return BindingBuilder.bind(queueRouting2).to(directExchange1).with("xiaowangzi"); }}
4、編寫manager
@Componentpublic class RabbitMQManager { @Autowired private RabbitTemplate rabbitTemplate; /** * 發送消息到工作模型中 */ public void sendWork(){ for (int i = 0; i <100 ; i++) { rabbitTemplate.convertAndSend("queueWork",new User(i,"張三","123")); } } /** * 向釋出訂閱模式裡面發送消息 */ public void sendPublishWork(){ rabbitTemplate.convertSendAndReceive("fanoutExchange1","","釋出訂閱模式的值"); } /** * 向路由模型裡面發送請求 */ public void sendDirectExchange(){ rabbitTemplate.convertSendAndReceive("directExchange1","xiaowangzi","路由模型的值"); } /** * Confirm機制 */ final RabbitTemplate.ConfirmCallback confirmCallback=new RabbitTemplate.ConfirmCallback(){ @Override public void confirm(CorrelationData correlationData, boolean b, String s) { System.out.println("确認機制........"); } }; /** * return機制 */ RabbitTemplate.ReturnCallback returnCallback=new RabbitTemplate.ReturnCallback(){ @Override public void returnedMessage(Message message, int i, String s, String s1, String s2) { System.out.println("return機制收到消息了....."); } }; /** * 發送資料的第二種方式 */ public void sendWork1(Object message, Map properties){// MessageProperties messageProperties = new MessageProperties();// //添加屬性...// Message message1 = new Message("内容".getBytes(), messageProperties);// //現在用下 return 機制 confirm機制.... rabbitTemplate.setMandatory(true); rabbitTemplate.setConfirmCallback(confirmCallback);// //用下return機制// rabbitTemplate.setReturnCallback(returnCallback); //這裡需要時間戳 每一條消息 都有一個時間戳 // 作業: // 把rabbitmq中的其他功能 // 手動确認 重回隊列 限流 dlx死信隊列玩一把 //rabbitTemplate.convertAndSend("queueWork","我是工作模型測試的值小波波"); }}
5、編寫service
@Servicepublic class RabbitMQService implements IRabbitMQService { @Autowired private RabbitMQManager rabbitMQManager; @Override public void sendMsg() { rabbitMQManager.sendWork(); } @Override public void sendPublishWork() { rabbitMQManager.sendPublishWork(); } @Override public void sendDirectExchange() { rabbitMQManager.sendDirectExchange(); }}
6、編寫controller
@RestControllerpublic class RabbitMQController { @Autowired private IRabbitMQService rabbitMQService; /** * 測試工作模型的發消息 * @return */ @RequestMapping("sendData") public Object sendMsg(){ rabbitMQService.sendMsg(); return "發送成功...."; } /** * 向釋出訂閱模式 發送值 * @return */ @RequestMapping("sendPublishData") public Object sendPublishData(){ rabbitMQService.sendPublishWork(); return "發送成功...."; } /** * 向釋出訂閱模式 發送值 * @return */ @RequestMapping("sendDirectData") public Object sendDirectData(){ rabbitMQService.sendDirectExchange(); return "發送成功...."; } }
7、編寫消費者
@Componentpublic class WorkReciveListener { @RabbitListener(queues = {"queueWork"}) public void reciveMessage(User user, Channel channel, Message message2){ System.out.println("111收到消息了:"+user); System.out.println("111通道是:"+channel); System.out.println("111傳輸資料封裝:"+message2);// message2.getMessageProperties().getDeliveryTag();// channel.basicAck(message2.getMessageProperties().getDeliveryTag(),false);// channel.basicNack(); } @RabbitListener(queues = {"queueWork"}) public void reciveMessage1(User user, Channel channel, Message message2){ System.out.println("222收到消息了:"+user); System.out.println("222通道是:"+channel); System.out.println("222傳輸資料封裝:"+message2);// message2.getMessageProperties().getDeliveryTag();// channel.basicAck(message2.getMessageProperties().getDeliveryTag(),false);// channel.basicNack(); }}
8、編寫釋出訂閱模式的消費者
@Componentpublic class PublishReciveListener { @RabbitListener(queues = {"queueFanout1"}) public void reciveMessage(String str, Channel channel, Message message2){ System.out.println("111收到消息了:"+str); } @RabbitListener(queues = {"queueFanout2"}) public void reciveMessage1(String str, Channel channel, Message message2){ System.out.println("222收到消息了:"+str); }}
9、編寫路由模型的消費者
@Componentpublic class DirectReciveListener { @RabbitListener(queues = {"queueRouting1"}) public void reciveMessage(String str, Channel channel, Message message2){ System.out.println("111收到消息了:"+str); } @RabbitListener(queues = {"queueRouting2"}) public voi reciveMessage1(String str, Channel channel, Message message2){ System.out.println("222收到消息了:"+str); }}
rabbitmq工作模式_SpringBoot 整合RabbitMQ1、導包2、編寫properties檔案3、編寫配置類4、編寫manager5、編寫service6、編寫controller7、編寫消費者8、編寫釋出訂閱模式的消費者9、編寫路由模型的消費者