RabbitMQ简介
RabbitMQ是目前非常热门的一款消息中间件,不管是互联网行业还是传统行业都在大量地使用。RabbitMQ凭借其高可靠、易扩展、高可用及丰富的功能特性受到越来越多企业的青睐。RabbitMQ服务器是用Erlang语言编写的,而集群和故障转移是构建在开放电信平台框架上的。所有主要的编程语言均有与代理接口通讯的客户端库。RabbitMQ整体上是一个生产者与消费者模型,主要负责接收、存储和转发消息。
RabbitMQ安装
官网下载地址:https://www.rabbitmq.com/install-windows.html

不过安装RabbitMQ之前需要先安装对应支持版本的Erlang。
https://www.rabbitmq.com/which-erlang.html#supported-version-policy
Erlang下载地址:http://www.erlang.org/downloads
RabbitMQ安装好后再环境变量中添加系统变量ERLANG_HOMT,变量值为安装Erlang的路径(路径中不要包含bin目录)。
在系统变量path中添加%ERLANG_HOME%\bin
在浏览器中输入http://localhost:15672,默认用户名和密码都是guest,进入主页面后就是这样
生产者与消费者
Producer:生产者,就是投递消息的一方。
生产者创建消息,然后发布到RabbitMQ中。消息一般可以包含2个部分:消息体和标签。消息体也可以称之为payload,在实际应用中,消息体一般是一个带有业务逻辑结构的数据,比如一个JSON字符串。当然可以进一步对这个消息体进行序列化操作。消息的标签用来表述这条消息,比如一个交换器的名称和一个路由键。生产者把消息交由RabbitMQ,RabbitMQ之后会根据标签把消息发送给感兴趣的消费者.
Consumer:消费者,就是接受消息的一方。
消费者连接到RabbitMQ服务器,并订阅到队列上。当消费者消费一条消息时,只是消费消息的消息体。在消息路由的过程中,消息的标签会丢弃,存入到队列中的消息只有消息体,消费者也只会消费到消息体,也就是不知道消息的生产者是谁,当然消费者也不需要知道。
队列、交换器、路由键、绑定
Queue:队列,是RabbitMQ的内部对象,用于存储消息。
RabbitMQ中消息都只能存储在队列中。多个消费者可以订约同一个队列,这时队列中的消息会被平均分摊又称轮询分发。
Exchange:交换器。生产者将消息发送到Exchange,由交换器将消息路由到一个或者多个队列中。如果路由不到,或许会返回给生产者,或许直接丢弃。RabbitMQ中的交换器可以看作一个简单的实体。
RoutingKey:路由键。生产者将消息发给交换器的时候,一般会指定一个RoutingKey,用来指定这个消息的路由规则,而这个Routing Key需要与交换器类型和绑定联合使用才能最终生效。
Binding:绑定。RabbitMQ中通过绑定将交换器与队列关联起来,在绑定的时候一般会指定一个绑定键BindingKey,这样RabbitMQ就知道如何正确地将消息路由到队列了。RoutingKey相当于邮寄包裹的地址,BindingKey相当于包裹的目的地。在某些情况下RoutingKey和BindingKey可以看作同一个东西。
交换器类型
RabbitMQ常用的交换器类型有fanout、direct、topic、headers这四种。
fanout
它会把发送到该交换器的消息路由到所有与该交换器绑定的队列中。
direct
direct类型的交换器路由规则也很简单,它会把消息路由到BindingKey和RoutingKey完全匹配的队列中,如下图发送消息的时候设置路由键为"warning",消息就是同时路由到Queue1和Queue2,如果发送消息的时候设置路由键为"info"或者"debug",消息就只会路由到Queue2。如果发送消息的时候设置路由键为其它的,那么消息就不会发送到这两个队列中。
topic
topic交换器与direct相似也是将消息路由到BindingKey和RountingKey相匹配的队列中,但这里的匹配规则有些不同,它有以下一些约定:
- RoutingKey为一个点号"·“分隔的字符串(被点号”·“分隔开的每一段独立的字符串称为一个单词,)如“com.rabbitmq.client”。
- BindingKey和RoutingKey一样也是"·"分隔的字符串。
- BindingKey中可以存在两种特殊的字符串"*“和”#",用于做模糊匹配,其中"**“用于匹配一个单词,”#"用于匹配多规格单词。
在下图中路由键为“com.rabbitmq.client"会同时路由到Queue1和Queue2;“com.hidden.demo"会路由到Queue2中;“java.rabbitmq.demo"会路由到Queue1中。
headers
headers类型的交换器不依赖路由键的匹配规则来路由消息,而是根据发送消息内容中的headers属性进行匹配。在绑定队列和交换器时指定一组键值对,当发送消息到交换器时,RabbitMQ会获取到该消息的headers,对比其中的键值对是否完全匹配队列和交换器绑定时指定的键值对,如果完全匹配则消息会路由到该队列。
RabbitMq运行流程
生产者生产消息
- 连接到RabbitMq Broker,建立一个连接,开启一个Channel
- 生产者声明一个交换器
- 生产者声明一个队列并设置相关属性
- 生产者通过路由键将交换器和队列绑定起来
- 生产者发送消息至RabbitMq Broker
- 相应的交换器根据接收到的路由键查找相匹配的队列
- 如果找到则将生产者发送过来的消息存入应用中
- 如果没有则丢弃或者返回给生产者
- 关闭Channel
- 关闭连接
消费者接收消息
- 连接到RabbitMq Broker,建立一个连接,开启一个Channel
- 消费者向RabbitMq Broker请求消费队列中的消息
- 等待RabbitMq Broker回应并投递相应队列中的消息,消费者接收消息
- 消费者确认消息
- RabbitMq从队列中删除相应已被确认的消息
- 关闭Channel
- 关闭连接
代码事例
- 获取MQ连接
public class ConnectionUtils {
public static Connection getConnection() throws IOException, TimeoutException{
//定义一个连接工厂
ConnectionFactory factory =new ConnectionFactory();
//设置服务地址
factory.setHost("127.0.0.1");
//AMQP 5672
factory.setPort(5672);
//vhost
factory.setVirtualHost("vhost");
//用户名
factory.setUsername("admin");
//密码
factory.setPassword("123456");
return factory.newConnection();
}
}
这里的vhost可以在Virtual Hosts中通过Add virtual hosts自己添加
- 生产者,采用的交换机模式为direct
public class Send {
private static final String EXCHANGE_NAME="test_exchange_direct";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
//exchange
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
String msg="hello direct!";
String routingKey="info";
channel.basicPublish(EXCHANGE_NAME, routingKey, null, msg.getBytes());
System.out.println("send "+msg);
channel.close();
connection.close();
}
}
- 消费者1
public class Recv1 {
private static final String EXCHANGE_NAME = "test_exchange_direct";
private static final String QUEUE_NAME = "test_queue_direct_1";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtils.getConnection();
final Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "error");
channel.basicQos(1);
//定义一个消费者
Consumer consumer=new DefaultConsumer(channel){
//消息到达 触发这个方法
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
BasicProperties properties, byte[] body) throws IOException {
String msg=new String(body,"utf-8");
System.out.println("[1] Recv msg:"+msg);
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}finally{
System.out.println("[1] done ");
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
};
boolean autoAck=false;//自动应答 false
channel.basicConsume(QUEUE_NAME,autoAck , consumer);
}
}
- 消费者2
public class Recv2 {
private static final String EXCHANGE_NAME = "test_exchange_direct";
private static final String QUEUE_NAME = "test_queue_direct_2";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtils.getConnection();
final Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "error");
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "info");
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "warning");
channel.basicQos(1);
//定义一个消费者
Consumer consumer=new DefaultConsumer(channel){
//消息到达 触发这个方法
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
BasicProperties properties, byte[] body) throws IOException {
String msg=new String(body,"utf-8");
System.out.println("[2] Recv msg:"+msg);
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}finally{
System.out.println("[2] done ");
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
};
boolean autoAck=false;//自动应答 false
channel.basicConsume(QUEUE_NAME,autoAck , consumer);
}
}
queueDeclare在RabbitMq源码如下:
public com.rabbitmq.client.AMQP.Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments) throws IOException , Object> arguments) throws IOException ;
它有五个参数:
- queue:队列名称
- durable:设置是否持久化
- exclusive:设置是否排他
- autoDelete:设置是否自动删除
- arguments:设置队列一些其他参数
queueBind在RabbitMq源码如下:
public com.rabbitmq.client.AMQP.Queue.BindOk queueBind(String queue, String exchange, String routingKey) throws IOException ;
参数解析:
- queue:队列名称
- exchange:交换机名称
- routingKey:用来绑定队列和交换器的路由键
在上述代码中交换机是direct类型,消费者1和消费者2的routingKey:
这是生产者的routingKey(bindingKey):
依次运行消费者和生产者,在消费者2的控制台会打印输出而消费者1的控制台不会打印:
同时RabbitMq客户端会自动生成Queue1和Queue2
如果之前没有创建或生成消费者绑定的交换机,则会报错:
此时要先创建交换机或者先运行生产者一次让其自动生成交换机。