学习工作模式前,先看一下rabbitmq 给的helloworld案例

这是传统的一对一,,,, 也就是一台机器生产,一台机器接收....
为了更好的了解代码....我这里演示的话用底层的代码来演示....不整合框架了
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>4.0.3</version>
</dependency>
这是rabbitmq 提供的依赖......导入一下就可以测试了
下面是我写的生产类...
测试是没问题的
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Producer01 {
//队列名称
private static final String QUEUE = "helloworld";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = null; //与
Channel channel = null;
try {
ConnectionFactory factory = new ConnectionFactory();
//设置ip
factory.setHost("localhost");
//设置端口
factory.setPort(5672);
//设置账号密码
factory.setUsername("guest"); //默认账号密码都是guest
factory.setPassword("guest");
//设置虚拟空间
factory.setVirtualHost("/");//虚拟机默认的虚拟名称为/ , 虚拟机相当于一个独立的服务器
//创建与RabbitMQ服务的TCP连接
connection = factory.newConnection();
//创建连接通道 ,每个连接可以创建多个通道,每个通道只有一个会话
channel = connection.createChannel();
//这有五个参数
/***
* 声明队列,如果Rabbit中没有此队列将自动创建*
* param1:队列名称*
* param2:是否持久化* rabbit 关闭了该队列是否存在..
* param3:队列是否独占此连接* 如果参数是true,那么一个连接connection 只能存在这一个channel,除非关闭程序
* param4:队列不再使用时是否自动删除此队列* 该队列不使用了就会删除该队列
*
* param5:队列参数*/
channel.queueDeclare(QUEUE, true, false, false,null );
String message = "你爱到极致的人,不会爱你";
//发布消息
/**
* String exchange, String routingKey, BasicProperties props, byte[] body
*
* param1 : 交换机 后面我会讲这里是指定交换机,使用默认的交换机
* param2 : 路由key,这也是先不写,,后面讲 大概作用是用于Exchange(交换机)将消息转发到指定的消息队列
* param3 :消息包含的属性
* 消息体
*
*/
channel.basicPublish("", QUEUE,null ,message.getBytes() );
System.out.println("Send Message is:'" + message + "'");
}catch (Exception e){
}finally {
if (channel != null){
channel.close();
}
if (connection != null){
connection.close();
}
}
}
}
然后下面是我写的消费类 ,, 连接mq代码都一样来着...关注发送消息和接收消息的方法就行 ...
同样测试过,代码是可运行的
import com.rabbitmq.client.*;
import java.io.IOException;
public class Consumer01 {
//队列名称
private static final String QUEUE = "helloworld";
public static void main(String[] args){
Connection connection = null;
Channel channel = null;
try {
ConnectionFactory factory = new ConnectionFactory();
//设置ip
factory.setHost("localhost");
//设置端口
factory.setPort(5672);
//设置账号密码
factory.setUsername("guest"); //默认账号密码都是guest
factory.setPassword("guest");
//设置虚拟空间
factory.setVirtualHost("/");//虚拟机默认的虚拟名称为/ , 虚拟机相当于一个独立的服务器
//创建与RabbitMQ服务的TCP连接
connection = factory.newConnection();
//创建连接通道 ,每个连接可以创建多个通道,每个通道只有一个会话
channel = connection.createChannel();
//这有五个参数
/***
* 声明队列,如果Rabbit中没有此队列将自动创建*
* param1:队列名称*
* param2:是否持久化* rabbit 关闭了该队列是否存在..
* param3:队列是否独占此连接* 如果参数是true,那么一个连接connection 只能存在这一个channel,除非关闭程序
* param4:队列不再使用时是否自动删除此队列* 该队列不使用了就会删除该队列
*
* param5:队列参数*/
channel.queueDeclare(QUEUE, true, false, false,null ); //这里其实可以不用声明队列的,因为 生产者已经声明过了,但是如果生产者后发布服务,队列没有声明,消费者去监听队列..会报错
// 创建默认消费方法
DefaultConsumer consumer = new DefaultConsumer(channel) {
// 重写监听方法
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("receive message.."+new String(body,"utf-8"));
}
};
// 监听队列
/**
* String queue, boolean autoAck, Consumer callback
*
* param1 : 队列名称
* param2 : 是否自动回复,接收到消息会自动恢复mq收到了,mq会删除消息,如果拒绝的话需要手动回复,不回复的话会导致mq不删除被消费过的消息,一直存在
* param3 : 消费对象,,包含消费方法
*
*/
channel.basicConsume(QUEUE,true , consumer);
}catch (Exception e){
}
}
}
以上的话就是rabbitmq提供的案例
一台生产者 , 一台消费者
emmm,,,
这里讲的工作的模式是 workqueues
WorkQueues
对比helloword案例,这里多了个消费者..
应用场景:对于 任务过重或任务较多情况使用工作队列可以提高任务处理的速度。
测试
我们启动俩次消费者
然后用刚刚写的生产者,发送五条信息
然后我们看消费者打印的信息
结果 :
mq workqueues 使用的是轮询方式讲信息平均发给消费者 ,
消费者会在处理完消息后 接收下一条消息
2.Publish/subscribe 发布订阅模式
特点
生产者将消息发送给broker.由交换机将消息发给每个跟绑定了交换机绑定的消息队列,每个队列都能收到生产者发送的每一条消息
生产者 :
声明Exchange_fanout_inform交换机。
声明两个队列并且绑定到此交换机,
绑定时不需要指定routingkey发送消息时不需要指定routingkey
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
public class Producer02 {
//消息队列名称
public static final String QUEUE_INFORM_Test1 = "queue_inform_1";
public static final String QUEUE_INFORM_Test2 = "queue_inform_2";
//交换机名称
public static final String EXCHANGE_FANOUT_INFORM="exchange_fanout_inform";
public static void main(String[] args){
Connection connection = null;
Channel channel = null;
try {
ConnectionFactory factory = new ConnectionFactory();
//设置ip
factory.setHost("localhost");
//设置端口
factory.setPort(5672);
//设置账号密码
factory.setUsername("guest"); //默认账号密码都是guest
factory.setPassword("guest");
//设置虚拟空间
factory.setVirtualHost("/");//虚拟机默认的虚拟名称为/ , 虚拟机相当于一个独立的服务器
//创建与RabbitMQ服务的TCP连接
connection = factory.newConnection();
//创建连接通道 ,每个连接可以创建多个通道,每个通道只有一个会话
channel = connection.createChannel();
//声明交换机
/*
String exchange, String type
param1 : 交换机
param2 : 交换机 类型 fanout 、 topic、direct、headers
FANOUT 对应的模式是 发布订阅模式 publish/subscribe 模式
其他的工作模式以后会将
DIRECT 对应的是路由的工作模式
TOPIC 对应的是通配符工作模式
HEADERS 对应了 headers 的工作模式
*/
channel.exchangeDeclare(EXCHANGE_FANOUT_INFORM,BuiltinExchangeType.FANOUT);
//声明队列
/**
* String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
*
* param1: 队列名称
* param2: 是否持久化
* param3 : 是否独占此队列
* param4 : 队列不用是否自动删除
* param5 : 参数
*/
channel.queueDeclare(QUEUE_INFORM_Test1,true ,false ,false ,null );
channel.queueDeclare(QUEUE_INFORM_Test2,true ,false ,false ,null );
//交换机和队列绑定
/**
* String queue, String exchange, String routingKey
* param1 : 队列名称
* exchange : 交换机
* routingKey : 路由key 后面讲,先 用 ""代替
*/
channel.queueBind(QUEUE_INFORM_Test1,EXCHANGE_FANOUT_INFORM ,"" );
channel.queueBind(QUEUE_INFORM_Test2,EXCHANGE_FANOUT_INFORM ,"" );
// 发送消息
String message = "";
for (int i = 0; i < 9; i++) {
message = "故事的开头总是这样,适逢其会,猝不及防。故事的结局总是这样,花开两朵,天各一方。"+ i;
/**
* String exchange, String routingKey, BasicProperties props, byte[] body
*
* param1 交换机名称
* param2 路由key,后面讲,先用 "" 代替 ,
* param3 参数
* param4 传递的字符串
*
*
*/
channel.basicPublish(EXCHANGE_FANOUT_INFORM,"" , null, message.getBytes());
System.out.println("Send Message is:'" + message + "'");
}
}catch (Exception e){
}finally{
if(channel!=null){
try {
channel.close();
} catch (Exception e) {
e.printStackTrace();
}
}
if(connection!=null){
try {
connection.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
下面是我写的俩个消费者
import com.rabbitmq.client.*;
import java.io.IOException;
public class ConsumerTest1 {
//队列名称
private static final String QUEUE_INFORM_Test1 = "queue_inform_1";
//交换机名称
public static final String EXCHANGE_FANOUT_INFORM="exchange_fanout_inform";
public static void main(String[] args){
Connection connection = null;
Channel channel = null;
try {
ConnectionFactory factory = new ConnectionFactory();
//设置ip
factory.setHost("localhost");
//设置端口
factory.setPort(5672);
//设置账号密码
factory.setUsername("guest"); //默认账号密码都是guest
factory.setPassword("guest");
//设置虚拟空间
factory.setVirtualHost("/");//虚拟机默认的虚拟名称为/ , 虚拟机相当于一个独立的服务器
//创建与RabbitMQ服务的TCP连接
connection = factory.newConnection();
//创建连接通道 ,每个连接可以创建多个通道,每个通道只有一个会话
channel = connection.createChannel();
//这有五个参数
/***
* 声明队列,如果Rabbit中没有此队列将自动创建*
* param1:队列名称*
* param2:是否持久化* rabbit 关闭了该队列是否存在..
* param3:队列是否独占此连接* 如果参数是true,那么一个连接connection 只能存在这一个channel,除非关闭程序
* param4:队列不再使用时是否自动删除此队列* 该队列不使用了就会删除该队列
*
* param5:队列参数*/
channel.queueDeclare(QUEUE_INFORM_Test1, true, false, false,null ); //这里其实可以不用声明队列的,因为 生产者已经声明过了,但是如果生产者后发布服务,队列没有声明,消费者去监听队列..会报错
//声明交换机
/*
String exchange, String type
param1 : 交换机
param2 : 交换机 类型 fanout 、 topic、direct、headers
FANOUT 对应的模式是 发布订阅模式 publish/subscribe 模式
其他的工作模式以后会将
DIRECT 对应的是路由的工作模式
TOPIC 对应的是通配符工作模式
HEADERS 对应了 headers 的工作模式
*/
channel.exchangeDeclare(EXCHANGE_FANOUT_INFORM,BuiltinExchangeType.FANOUT);
//交换机和队列绑定
/**
* String queue, String exchange, String routingKey
* param1 : 队列名称
* exchange : 交换机
* routingKey : 路由key 后面讲,先 用 ""代替
*/
channel.queueBind(QUEUE_INFORM_Test1,EXCHANGE_FANOUT_INFORM ,"" );
// 创建默认消费方法
DefaultConsumer consumer = new DefaultConsumer(channel) {
// 重写监听方法
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("receive message.."+new String(body,"utf-8"));
}
};
// 监听队列
/**
* String queue, boolean autoAck, Consumer callback
*
* param1 : 队列名称
* param2 : 是否自动回复,接收到消息会自动恢复mq收到了,mq会删除消息,如果拒绝的话需要手动回复,不回复的话会导致mq不删除被消费过的消息,一直存在
* param3 : 消费对象,,包含消费方法
*
*/
channel.basicConsume(QUEUE_INFORM_Test1,true , consumer);
}catch (Exception e){
}
}
}
消费者二
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.*;
import java.io.IOException;
public class ConsumerTest2 {
//队列名称
public static final String QUEUE_INFORM_Test2 = "queue_inform_2";
//交换机名称
public static final String EXCHANGE_FANOUT_INFORM="exchange_fanout_inform";
public static void main(String[] args){
Connection connection = null;
Channel channel = null;
try {
ConnectionFactory factory = new ConnectionFactory();
//设置ip
factory.setHost("localhost");
//设置端口
factory.setPort(5672);
//设置账号密码
factory.setUsername("guest"); //默认账号密码都是guest
factory.setPassword("guest");
//设置虚拟空间
factory.setVirtualHost("/");//虚拟机默认的虚拟名称为/ , 虚拟机相当于一个独立的服务器
//创建与RabbitMQ服务的TCP连接
connection = factory.newConnection();
//创建连接通道 ,每个连接可以创建多个通道,每个通道只有一个会话
channel = connection.createChannel();
//这有五个参数
/***
* 声明队列,如果Rabbit中没有此队列将自动创建*
* param1:队列名称*
* param2:是否持久化* rabbit 关闭了该队列是否存在..
* param3:队列是否独占此连接* 如果参数是true,那么一个连接connection 只能存在这一个channel,除非关闭程序
* param4:队列不再使用时是否自动删除此队列* 该队列不使用了就会删除该队列
*
* param5:队列参数*/
channel.queueDeclare(QUEUE_INFORM_Test2, true, false, false,null ); //这里其实可以不用声明队列的,因为 生产者已经声明过了,但是如果生产者后发布服务,队列没有声明,消费者去监听队列..会报错
//声明交换机
/*
String exchange, String type
param1 : 交换机
param2 : 交换机 类型 fanout 、 topic、direct、headers
FANOUT 对应的模式是 发布订阅模式 publish/subscribe 模式
其他的工作模式以后会将
DIRECT 对应的是路由的工作模式
TOPIC 对应的是通配符工作模式
HEADERS 对应了 headers 的工作模式
*/
channel.exchangeDeclare(EXCHANGE_FANOUT_INFORM,BuiltinExchangeType.FANOUT);
//交换机和队列绑定
/**
* String queue, String exchange, String routingKey
* param1 : 队列名称
* exchange : 交换机
* routingKey : 路由key 后面讲,先 用 ""代替
*/
channel.queueBind(QUEUE_INFORM_Test2,EXCHANGE_FANOUT_INFORM ,"" );
// 创建默认消费方法
DefaultConsumer consumer = new DefaultConsumer(channel) {
// 重写监听方法
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("receive message.."+new String(body,"utf-8"));
}
};
// 监听队列
/**
* String queue, boolean autoAck, Consumer callback
*
* param1 : 队列名称
* param2 : 是否自动回复,接收到消息会自动恢复mq收到了,mq会删除消息,如果拒绝的话需要手动回复,不回复的话会导致mq不删除被消费过的消息,一直存在
* param3 : 消费对象,,包含消费方法
*
*/
channel.basicConsume(QUEUE_INFORM_Test2,true , consumer);
}catch (Exception e){
}
}
}
细心的同学可能发现了,,生产者跟消费者的不同代码其实就是发送消息,接收消息的方法而已 ....
生产者声明队列跟交换机,消费者也声明各自的队列跟交换机... 其实就是为了怕先启动消费者没有发现队列跟交换机报错而已...
核心代码的话 其实 就是 生成交换机,生成队列 绑定交换机跟队列
测试的话
我们先启动俩个消费者
然后我们启动生产者
我们看打印结果,,俩台消费者各自都处理了9条信息
其实这种方法比WORKQUEUES 工作模式强,因为多台机器可以监听一个队列,也就是下图所示,我们可以要俩个队列,当然也可以创建一个队列...创建多少队列跟一个队列多少消费者完全取决与我们
因为时间问题,,,,还有4种工作模式下次写- -