
消息生产者 交换机 队列 消息消费者
【注意】首先保证在生产环境,RabbitMQ服务能够正常启动
1,搭建springboot开发环境 2,确认rabbitmq环境正确
pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.rabbitmq.demo</groupId>
<artifactId>rabbitmqDemo</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>war</packaging>
<!-- springBoot默认组件 -->
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.0.2.RELEASE</version>
</parent>
<!-- springboot web资源引入 -->
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!--amqp -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
</dependencies>
<!-- maven 打包 -->
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
application.yml
#action
logging:
level:
org.springframework: INFO
com.example: INFO
#服务端口
server:
port: 8080
#spring boot
spring:
#RabbitMq配置
application:
name : rabbitmq 名称
rabbitmq:
publisher-confirms : true
host : localhost 本地ip也可以
port : 5672 默认端口是15276,但是在实际应用中为避免端口冲突,改成5672
username : guest 用户账号
password : yyzzpp rabbit用户密码
doMain .java
package com.rabbitmqDemo;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
*
* @author 18580
* 启动中心
*/
@SpringBootApplication
public class doMain {
public static void main(String[] args) throws Exception {
SpringApplication.run(doMain.class, args);
}
}
HelloRabbitConfig.java
package com.rabbitmqDemo;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @author 杨xx
* @date 2018年6月7日
* @version 1.0
* 配置中心
*/
@Configuration
public class HelloRabbitConfig {
//Direct模式
@Bean
public Queue helloQueue() {
return new Queue("hello");
}
}
HelloSender.java
/**
*
*/
package com.rabbitmqDemo;
import java.util.Date;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
*
* @author 杨xx
* @date 2018年6月7日
* @version 1.0
* 发送者
*/
@Component
public class HelloSender {
protected static Logger logger=LoggerFactory.getLogger(HelloSender.class);
@Autowired
private AmqpTemplate rabbitTemplate;
public void send(String name) {
rabbitTemplate.convertAndSend("hello","hello,rabbit~"+name);
}
}
HelloReceiver.java
/**
*
*/
package com.rabbitmqDemo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* @author 杨xx
* @date 2018年6月7日
* @version 1.0 消费者
*/
@Component
@RabbitListener(queues = "hello")
public class HelloReceiver {
protected static Logger logger = LoggerFactory.getLogger(HelloReceiver.class);
@RabbitHandler
public void process(String hello) {
logger.info("消息消费者【hello】 : " + hello);
}
}
RabbitController.java
/**
*
*/
package com.rabbitmqDemo;
/**
* @author 杨xx
* @date 2018年6月7日
* @version 1.0
*
*/
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class RabbitController {
protected static Logger logger = LoggerFactory.getLogger(RabbitController.class);
@Autowired
private HelloSender helloSender;
@RequestMapping("/send/{name}")
public void helloworld(@PathVariable String name) {
//单次请求
//helloSender.send(name);
//请求10次
for (int i=0;i<10;i++) {
helloSender.send(name);
}
}
}
至此,启动项目,单对单消息队列发送-接收成功;
单对多就是上边的请求10次【在创 建一个消费者方法】
HelloReceiver2.java
/**
*
*/
package com.rabbitmqDemo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* @author 杨xx
* @date 2018年6月7日
* @version 1.0 消费者2
*/
@Component
@RabbitListener(queues = "hello")
public class HelloReceiver2 {
protected static Logger logger = LoggerFactory.getLogger(HelloReceiver2.class);
@RabbitHandler
public void processC(String hello) {
logger.info("消息消费者[hello2]:" + hello);
}
}
【基本使用消息单对单和单对多成功】
【说明】在此过程中,遇到几种错误,标记如下,以作参考
问题主要是在配置文件中port与管理平台产生冲突,所以需要更改【port】,15672--5672;
2018-08-03 20:03:53.945 INFO 10336 --- [cTaskExecutor-1] o.s.a.r.c.CachingConnectionFactory : Attempting to connect to: [127.0.0.1:15672]
2018-08-03 20:03:58.972 ERROR 10336 --- [127.0.0.1:15672] c.r.c.impl.ForgivingExceptionHandler : An unexpected connection driver error occured
java.net.SocketException: Socket Closed
at java.net.SocketInputStream.socketRead0(Native Method) ~[na:1.8.0_144]
该问题是由于在注解@RabbitListener(queues = "aqueues")使用时,会再次申明队列信息,在该注解进行申明时,会产生错误链接不到服务错误
需要去rabbitmq管理平台创建该队列信息/或者重新启动rabbitmq服务,即可正常运行例子
2018-08-03 20:06:56.811 INFO 14016 --- [cTaskExecutor-1] o.s.a.r.c.CachingConnectionFactory : Attempting to connect to: [127.0.0.1:15672]
2018-08-03 20:07:01.856 ERROR 14016 --- [cTaskExecutor-1] o.s.a.r.l.SimpleMessageListenerContainer : Failed to check/redeclare auto-delete queue(s).
org.springframework.amqp.AmqpIOException: java.io.IOException
创建新用户账号时,一定不能删除管理员guest账户,更改guest管理员密码时,必须做好备份,否则密码不便找回;在更改guest密码后必须重新登陆一次验证更新信息是否正常登陆
【创建队列】
能否正常打开,账号能否正常登陆,或者配置文件里端口注意不要与其他项目产生冲突;基本就这些问题;