天天看点

RabbitMQ进阶学习复习笔记

一、RabbitMQ概述

1.1 概念

RabbitMQ是一个开源的消息代理和队列服务器,用来通过普通协议在不同的应用之间共享数据(跨平台跨语言)。RabbitMQ是使用Erlang语言编写,并且基于AMQP协议实现。

1.2 优势

**可靠性(Reliablity):**使用了一些机制来保证可靠性,比如持久化、传输确认、发布确认。

**灵活的路由(Flexible Routing):**在消息进入队列之前,通过Exchange来路由消息。对于典型的路由功能,Rabbit已经提供了一些内置的Exchange来实现。针对更复杂的路由功能,可以将多个Exchange绑定在一起,也通过插件机制实现自己的Exchange。

**消息集群(Clustering):**多个RabbitMQ服务器可以组成一个集群,形成一个逻辑Broker。

**高可用(Highly Avaliable Queues):**队列可以在集群中的机器上进行镜像,使得在部分节点出问题的情况下队列仍然可用。

**多种协议(Multi-protocol):**支持多种消息队列协议,如STOMP、MQTT等。

**多种语言客户端(Many Clients):**几乎支持所有常用语言,比如Java、.NET、Ruby等。

**管理界面(Management UI):**提供了易用的用户界面,使得用户可以监控和管理消息Broker的许多方面。

**跟踪机制(Tracing):**如果消息异常,RabbitMQ提供了消息的跟踪机制,使用者可以找出发生了什么。

**插件机制(Plugin System):**提供了许多插件,来从多方面进行扩展,也可以编辑自己的插件。

二、RabbitMQ组件

RabbitMQ进阶学习复习笔记

**Broker:**标识消息队列服务器实体.

**Virtual Host:**虚拟主机。标识一批交换机、消息队列和相关对象。虚拟主机是共享相同的身份认证和加密环境的独立服务器域。每个vhost本质上就是一个mini版的RabbitMQ服务器,拥有自己的队列、交换器、绑定和权限机制。vhost是AMQP概念的基础,必须在链接时指定,RabbitMQ默认的vhost是 /。

**Exchange:**交换器,用来接收生产者发送的消息并将这些消息路由给服务器中的队列。

direct交换机:完全匹配,才会路由过去,点对点通信模式。

fanout交换机:广播类型

topic交换机:发布-订阅模式,消息交于所有订阅的交换机。

**Queue:**消息队列,用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点。一个消息可投入一个或多个队列。消息一直在队列里面,等待消费者连接到这个队列将其取走。

**Banding:**绑定,用于消息队列和交换机之间的关联。一个绑定就是基于路由键将交换机和消息队列连接起来的路由规则,所以可以将交换器理解成一个由绑定构成的路由表。

**Channel:**信道,多路复用连接中的一条独立的双向数据流通道。信道是建立在真实的TCP连接内地虚拟链接,AMQP命令都是通过信道发出去的,不管是发布消息、订阅队列还是接收消息,这些动作都是通过信道完成。因为对于操作系统来说,建立和销毁TCP都是非常昂贵的开销,所以引入了信道的概念,以复用一条TCP连接。

**Connection:**网络连接,比如一个TCP连接。

**Publisher:**消息的生产者,也是一个向交换器发布消息的客户端应用程序。

**Consumer:**消息的消费者,表示一个从一个消息队列中取得消息的客户端应用程序。

**Message:**消息,消息是不具名的,它是由消息头和消息体组成。消息体是不透明的,而消息头则是由一系列的可选属性组成,这些属性包括routing-key(路由键)、priority(优先级)、delivery-mode(消息可能需要持久性存储[消息的路由模式])等。

三、安装RabbitMQ

3.1 docker 安装

docker run -d --name rabbitmq -p 5671:5671 -p 5672:5672 -p 4369:4369 -p 25672:25672 -p 15671:15671 -p 15672:15672 rabbitmq:management      

4369 25672 【Erlang发现 集群端口】

5672 5671 【AMQP端口】

15672 【web管理控制台端口】

61613 61614 【STOMP协议端口】

1883 8883 【mqtt协议端口】

开机自启动

docker update rabbitmq --restart=always      

访问:http://ip:15672

账号密码均为 guest

3.2 控制台体验Direct Exchange交换机

1.系统默认的交换机

RabbitMQ进阶学习复习笔记

2.创建一个交换机

RabbitMQ进阶学习复习笔记

3.为交换机绑定队列—创建队列

RabbitMQ进阶学习复习笔记

4.绑定交换机与队列

RabbitMQ进阶学习复习笔记

绑定交换机与队列,添加好队列的名字与 路由键的名字

5.发消息,需要现发消息给交换机,交换机才能路由给消息队列

RabbitMQ进阶学习复习笔记

刷新队列,既可以看到指定的队列中用准备好了一条消息,南无如何消费这条消息呢?

需要点击这条消息队列中去,获取该条消息。

RabbitMQ进阶学习复习笔记

3.3 控制台体验fanout Exchange交换机

1.创建fanout exchange 交换机

RabbitMQ进阶学习复习笔记

这种交换机会发送至所有的队列-----

2.为其绑定多个消息队列…

创建几个消息队列,绑定

3.向其中一个消息队列的发送消息,指定其中一个路由键【广播,可以不设置路由键】。

RabbitMQ进阶学习复习笔记

4.查看消息队列,发现所有的队列全部收到消息。

3.3 控制台体验Topic Exchange交换机

1.创建主题交换机

RabbitMQ进阶学习复习笔记

2.绑定消息队列,路由键如何设置???

xxx.# 代表xxx以后匹配0个或者多个

*.xxx 代表匹配以.xxx结尾的消息队列消息路由键

3.测试【略】

四、Springboot整合RabbitMQ

4.1 引入依赖

<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>      

4.2 配置文件

spring.rabbitmq.host=192.168.52.10
spring.rabbitmq.port=5672
spring.rabbitmq.virtual-host=/      

4.3 启动类开启注解

4.4 AmqpAdmin创建Exchange/Queue/Binding

@Autowired
AmqpAdmin amqpAdmin;

/*
* 创建交换机
*/
@Test
public void createExchange(){
    // 名字,是否持久化,自动删除
    DirectExchage directExchange = new DirectExchage("hello-java-exchage",true,false);
    amqp.declareExchange(directExchange);
}

/*
* 创建队列
*/
@Test
public void createQueue(){
    //名字 ,持久化,排他,自动删除
    Queue queue = new Queue("hello-java-queue",true,false,false);
    amqp.declareQueue();
}

/*
* 绑定交换机与队列
*/
@Test
public void createBinding(){
    //目的地,目的地类型,交换机,路由键,参数
    Binding binding = new Binding("hello-java-queue",Binding.DestinationType.QUEUE,"hello-java-exchange","hello.java",null);
    amqp.declareBinding(binding);
}      

4.5 RabbitTemplate发送消息

@Autowired
RabbitTemplate rabbitTemplate;

/*
* 发送消息
*/
@Test
public void sendMessage(){
    // 交换机,路由键,消息内容【Object类型可以发送对象,但是java序列化了,对象必须序列化】
    rabbitTemplate.converAndSend("hello-java-exchage","hello.java","消息体");
}


为了能够用json类型的展示,需要配以一个序列化器放入IOC容器中
@Configuration
public class MyRabbitConfig{
    
    @Bean
    public MessageConverter messageConverter(){
        return new Jackson2JsonMessageConverter();
    }
}      

4.6 @RabbitListener监听接受消息

// 必须放到组建中 方法中的类
@RabbitListener(queues = {"hello-java-queue"})
public void receiveMessage(Message message,实体类 xxx){
    // 1.第一种接受消息 【麻烦】
    byte[] body = message.getBody();
    JSON.parse(body); 
    // 2.第二种接收消息 直接传入实体类
    System.out.println(message);
}      

继续阅读