
消息生産者 交換機 隊列 消息消費者
【注意】首先保證在生産環境,RabbitMQ服務能夠正常啟動
1,搭建springboot開發環境 2,确認rabbitmq環境正确
pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.rabbitmq.demo</groupId>
<artifactId>rabbitmqDemo</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>war</packaging>
<!-- springBoot預設元件 -->
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.0.2.RELEASE</version>
</parent>
<!-- springboot web資源引入 -->
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!--amqp -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
</dependencies>
<!-- maven 打包 -->
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
application.yml
#action
logging:
level:
org.springframework: INFO
com.example: INFO
#服務端口
server:
port: 8080
#spring boot
spring:
#RabbitMq配置
application:
name : rabbitmq 名稱
rabbitmq:
publisher-confirms : true
host : localhost 本地ip也可以
port : 5672 預設端口是15276,但是在實際應用中為避免端口沖突,改成5672
username : guest 使用者賬号
password : yyzzpp rabbit使用者密碼
doMain .java
package com.rabbitmqDemo;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
*
* @author 18580
* 啟動中心
*/
@SpringBootApplication
public class doMain {
public static void main(String[] args) throws Exception {
SpringApplication.run(doMain.class, args);
}
}
HelloRabbitConfig.java
package com.rabbitmqDemo;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @author 楊xx
* @date 2018年6月7日
* @version 1.0
* 配置中心
*/
@Configuration
public class HelloRabbitConfig {
//Direct模式
@Bean
public Queue helloQueue() {
return new Queue("hello");
}
}
HelloSender.java
/**
*
*/
package com.rabbitmqDemo;
import java.util.Date;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
*
* @author 楊xx
* @date 2018年6月7日
* @version 1.0
* 發送者
*/
@Component
public class HelloSender {
protected static Logger logger=LoggerFactory.getLogger(HelloSender.class);
@Autowired
private AmqpTemplate rabbitTemplate;
public void send(String name) {
rabbitTemplate.convertAndSend("hello","hello,rabbit~"+name);
}
}
HelloReceiver.java
/**
*
*/
package com.rabbitmqDemo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* @author 楊xx
* @date 2018年6月7日
* @version 1.0 消費者
*/
@Component
@RabbitListener(queues = "hello")
public class HelloReceiver {
protected static Logger logger = LoggerFactory.getLogger(HelloReceiver.class);
@RabbitHandler
public void process(String hello) {
logger.info("消息消費者【hello】 : " + hello);
}
}
RabbitController.java
/**
*
*/
package com.rabbitmqDemo;
/**
* @author 楊xx
* @date 2018年6月7日
* @version 1.0
*
*/
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class RabbitController {
protected static Logger logger = LoggerFactory.getLogger(RabbitController.class);
@Autowired
private HelloSender helloSender;
@RequestMapping("/send/{name}")
public void helloworld(@PathVariable String name) {
//單次請求
//helloSender.send(name);
//請求10次
for (int i=0;i<10;i++) {
helloSender.send(name);
}
}
}
至此,啟動項目,單對單消息隊列發送-接收成功;
單對多就是上邊的請求10次【在創 建一個消費者方法】
HelloReceiver2.java
/**
*
*/
package com.rabbitmqDemo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* @author 楊xx
* @date 2018年6月7日
* @version 1.0 消費者2
*/
@Component
@RabbitListener(queues = "hello")
public class HelloReceiver2 {
protected static Logger logger = LoggerFactory.getLogger(HelloReceiver2.class);
@RabbitHandler
public void processC(String hello) {
logger.info("消息消費者[hello2]:" + hello);
}
}
【基本使用消息單對單和單對多成功】
【說明】在此過程中,遇到幾種錯誤,标記如下,以作參考
問題主要是在配置檔案中port與管理平台産生沖突,是以需要更改【port】,15672--5672;
2018-08-03 20:03:53.945 INFO 10336 --- [cTaskExecutor-1] o.s.a.r.c.CachingConnectionFactory : Attempting to connect to: [127.0.0.1:15672]
2018-08-03 20:03:58.972 ERROR 10336 --- [127.0.0.1:15672] c.r.c.impl.ForgivingExceptionHandler : An unexpected connection driver error occured
java.net.SocketException: Socket Closed
at java.net.SocketInputStream.socketRead0(Native Method) ~[na:1.8.0_144]
該問題是由于在注解@RabbitListener(queues = "aqueues")使用時,會再次申明隊列資訊,在該注解進行申明時,會産生錯誤連結不到服務錯誤
需要去rabbitmq管理平台建立該隊列資訊/或者重新啟動rabbitmq服務,即可正常運作例子
2018-08-03 20:06:56.811 INFO 14016 --- [cTaskExecutor-1] o.s.a.r.c.CachingConnectionFactory : Attempting to connect to: [127.0.0.1:15672]
2018-08-03 20:07:01.856 ERROR 14016 --- [cTaskExecutor-1] o.s.a.r.l.SimpleMessageListenerContainer : Failed to check/redeclare auto-delete queue(s).
org.springframework.amqp.AmqpIOException: java.io.IOException
建立新使用者賬号時,一定不能删除管理者guest賬戶,更改guest管理者密碼時,必須做好備份,否則密碼不便找回;在更改guest密碼後必須重新登陸一次驗證更新資訊是否正常登陸
【建立隊列】
能否正常打開,賬号能否正常登陸,或者配置檔案裡端口注意不要與其他項目産生沖突;基本就這些問題;