Rabbitmq基礎知識
文章目錄
- Rabbitmq基礎知識
-
- mq的基本概念
- mq的優勢和劣勢
- mq的劣勢
- 常見的mq的産品
- RabbitMq簡介
- 安裝
- 簡單隊列的代碼示範
-
- provider
- consumer
- 常見的幾種消息政策
- 交換機類型
- 工作模式
- 訂閱模式
- Routing(路由)
- Topics工作模式(通配符)
- spring整合rabbitmq(生産者)
- spring整合rabbitmq(消費者)
- spring boot整合rabbitmq生産者
- spring boot整合rabbitmq消費者
- rabbitmq進階特性
-
- 消息的可靠投遞
-
- 确認模式
- 回退模式
- consumer ack
- 消費端限流
- TTL
- 單獨設定消息的過期時間
- 死信隊列
- rabbitmq的應用
- rabbitmq的叢集搭建
黑馬
看到第26集
mq的基本概念
mq的優勢和劣勢
應用解耦
異步提速
消峰填谷
消峰填谷
mq的劣勢
常見的mq的産品
RabbitMq簡介
JMS
安裝
進入到rabbitmq的sbin目錄,輕按兩下rabbitmq-server.bat,如果出現如下頁面則代表啟動成功
進入管理控制台
在浏覽器位址欄中輸入:http://localhost:15672,看到如下頁面則代表啟動成功
初始化:使用者名和密碼均為guest
簡單隊列的代碼示範
provider
結合下面這張圖寫代碼
// 1.建立連接配接工廠
ConnectionFactory factory=new ConnectionFactory();
// 2.設定參數
factory.setHost("127.0.0.1");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("hjx");
factory.setPassword("123456");
// 3.建立連接配接
Connection connection =factory.newConnection();
// 4.建立channel
Channel channel=connection.createChannel();
// 5.建立隊列
/*
durable:持久的
exclusive:1.該隊列是否隻能有一個消費者對其進行監聽 2.當連接配接關閉時是否删除隊列
autoDelete:是否自動删除(當沒有消費者時,是否自動删除)
queueDeclare(String queue, boolean durable, boolean exclusive
, boolean autoDelete, Map<String, Object> arguments)
*/
// 如果沒有hjx_quene的隊列則會建立該隊列,否則不會進行建立
channel.queueDeclare("hjx_queue",true,false,false,null);
// 6.發送消息
/*
1.exchange:交換機名稱 (簡單模式下使用預設的交換機,即設定為空字元串即可。
2.routingKey:路由鍵,隻要routingKey和隊列名稱一樣,該消息就會發送到該隊列中
3.props:配置資訊
4.發送的内容
basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
*/
String message="hello world,歡迎跟着黑馬大神一起學習rabbitmq";
channel.basicPublish("","hjx_quene",null,message.getBytes());
// 釋放資源
channel.close();
connection.close();
consumer
// 1.建立連接配接工廠
ConnectionFactory factory=new ConnectionFactory();
// 2.設定參數
factory.setHost("127.0.0.1");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("hjx");
factory.setPassword("123456");
// 3.建立連接配接
Connection connection =factory.newConnection();
// 4.建立channel
/*
1.quene:隊列名稱
2.autoAck:是否确認(消費者收到消息了後自動給隊列回複收到了)
3.callback:回調函數
basicConsume(String queue, boolean autoAck, Consumer callback)
*/
Channel channel=connection.createChannel();
// 靜态内部類
Consumer consumer=new DefaultConsumer(channel){
// 這是一個回調方法 當收到消息後會自動執行該方法
/**
*
* @param consumerTag 消息唯一辨別
* @param envelope 擷取一些資訊 交換機、路由key
* @param properties 配置資訊
* @param body 擷取到的資料
* @throws IOException
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("consumerTag:"+consumerTag);
System.out.println("envelope:"+envelope.getExchange());
System.out.println("properties:"+properties);
System.out.println("body:"+new String(body));
}
};
channel.basicConsume("hjx_queue",true,consumer);
// 注意:消費者不要關閉資源,如果資源被關閉了,它就無法對隊列中的消息進行監聽了
}
常見的幾種消息政策
交換機類型
type:交換機類型
* DIRECT("direct"):定向
* FANOUT("fanout") 扇形(也就是廣播,發送到每個隊列)
* TOPIC("topic"), 通配符的方式
* HEADERS("headers"); 通過參數比對 (很少用)
工作模式
這裡為了測試,需要建立2個consumer,然後啟動consumer和provider。當consumer監聽到有消息時就會将對應的消息列印出來。代碼如下:
provider
public void sendMs() throws IOException, TimeoutException {
// 1.建立連接配接工廠
ConnectionFactory factory=new ConnectionFactory();
// 2.設定參數
factory.setHost("127.0.0.1");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("hjx");
factory.setPassword("123456");
// 3.建立連接配接
Connection connection =factory.newConnection();
// 4.建立channel
Channel channel=connection.createChannel();
// 5.建立隊列
/*
durable:持久的
exclusive:1.該隊列是否隻能有一個消費者對其進行監聽 2.當連接配接關閉時是否删除隊列
autoDelete:是否自動删除(當沒有消費者時,是否自動删除)
queueDeclare(String queue, boolean durable, boolean exclusive
, boolean autoDelete, Map<String, Object> arguments)
*/
// 如果沒有hjx_quene的隊列則會建立該隊列,否則不會進行建立
channel.queueDeclare("hjx_queue",true,false,false,null);
// 6.發送消息
/*
1.exchange:交換機名稱 (簡單模式下使用預設的交換機,即設定為空字元串即可。
2.routingKey:路由鍵,隻要routingKey和隊列名稱一樣,該消息就會發送到該隊列中
3.props:配置資訊
4.發送的内容
basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
*/
// 這裡為了測試友善,發送10條消息,因為時工作模式下的消息隊列嘛
for(int i=0;i<10;i++){
String message="hello world,歡迎跟着黑馬大神一起學習rabbitmq"+(i+1);
channel.basicPublish("","hjx_quene",null,message.getBytes());
}
// 釋放資源
channel.close();
connection.close();
}
consumer
注意:這裡的consumer,建立2個内容一模一樣的就可以了。隻要檔案名不一樣就行。
public void Test1() throws IOException, TimeoutException {
// 1.建立連接配接工廠
ConnectionFactory factory=new ConnectionFactory();
// 2.設定參數
factory.setHost("127.0.0.1");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("hjx");
factory.setPassword("123456");
// 3.建立連接配接
Connection connection =factory.newConnection();
// 4.建立channel
/*
1.quene:隊列名稱
2.autoAck:是否确認(消費者收到消息了後自動給隊列回複收到了)
3.callback:回調函數
basicConsume(String queue, boolean autoAck, Consumer callback)
*/
Channel channel=connection.createChannel();
// 靜态内部類
Consumer consumer=new DefaultConsumer(channel){
// 這是一個回調方法 當收到消息後會自動執行該方法
/**
*
* @param consumerTag 消息唯一辨別
* @param envelope 擷取一些資訊 交換機、路由key
* @param properties 配置資訊
* @param body 擷取到的資料
* @throws IOException
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
/*System.out.println("consumerTag:"+consumerTag);
System.out.println("envelope:"+envelope.getExchange());
System.out.println("properties:"+properties);*/
System.out.println("body:"+new String(body));
}
};
channel.basicConsume("hjx_queue",true,consumer);
// 注意:消費者不要關閉資源,如果資源被關閉了,它就無法對隊列中的消息進行監聽了
}
訂閱模式
生産者
@Test
public void test() throws IOException, TimeoutException {
// 1.建立連接配接工廠
ConnectionFactory factory=new ConnectionFactory();
// 2.設定參數
factory.setHost("127.0.0.1");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("hjx");
factory.setPassword("123456");
// 3.建立連接配接
Connection connection =factory.newConnection();
// 4.建立channel
/*
1.quene:隊列名稱
2.autoAck:是否确認(消費者收到消息了後自動給隊列回複收到了)
3.callback:回調函數
basicConsume(String queue, boolean autoAck, Consumer callback)
*/
/**
* exchange:交換機名稱
* type:交換機類型
* DIRECT("direct"):定向
* FANOUT("fanout") 扇形(也就是廣播,發送到每個隊列)
* TOPIC("topic"), 通配符的方式
* HEADERS("headers"); 通過參數比對
* durable:是否持久化
* autoDelete:是否自動删除
* internal:内部使用。一般為false
* arguments:參數
*
* exchangeDeclare(String exchange, BuiltinExchangeType type,
* boolean durable, boolean autoDelete, Map<String, Object> arguments)
*
*
*/
Channel channel=connection.createChannel();
String exchangeName="fanout_exchange";
channel.exchangeDeclare(exchangeName,BuiltinExchangeType.FANOUT,true,false,false,null);
String quueName1="quue_1";
String quueName2="quue_2";
/*
durable:持久的
exclusive:1.該隊列是否隻能有一個消費者對其進行監聽 2.當連接配接關閉時是否删除隊列
autoDelete:是否自動删除(當沒有消費者時,是否自動删除)
queueDeclare(String queue, boolean durable, boolean exclusive
boolean autoDelete, Map<String, Object> arguments)
*/
// 聲明隊列,這裡需要聲明多個隊列
channel.queueDeclare(quueName1,true,false,false,null);
channel.queueDeclare(quueName2,true,false,false,null);
// 綁定隊列和交換機的關系
/**
* queue:隊列的名稱
* exchange:交換機的名稱
* routingKey:路由key 如果交換機的類型為fanout,則routingKey=""
*
* queueBind(String queue, String exchange, String routingKey)
*/
channel.queueBind(quueName1,exchangeName,"");
channel.queueBind(quueName2,exchangeName,"");
// 發送消息
/*
1.exchange:交換機名稱 (簡單模式下使用預設的交換機,即設定為空字元串即可。
2.routingKey:路由鍵,隻要routingKey和隊列名稱一樣,該消息就會發送到該隊列中
3.props:配置資訊
4.發送的内容
basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
*/
String message="日志資訊";
//這樣交換機就會将消息轉發給對應的2個隊列
channel.basicPublish(exchangeName,"",null,message.getBytes());
// 釋放資源
channel.close();
connection.close();
}
消費者
consumer1
@Test
public void test() throws IOException, TimeoutException {
// 1.建立連接配接工廠
ConnectionFactory factory=new ConnectionFactory();
// 2.設定參數
factory.setHost("127.0.0.1");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("hjx");
factory.setPassword("123456");
// 3.建立連接配接
Connection connection =factory.newConnection();
// 4.建立channel
/*
1.quene:隊列名稱
2.autoAck:是否确認(消費者收到消息了後自動給隊列回複收到了)
3.callback:回調函數
basicConsume(String queue, boolean autoAck, Consumer callback)
*/
/**
* exchange:交換機名稱
* type:交換機類型
* DIRECT("direct"):定向
* FANOUT("fanout") 扇形(也就是廣播,發送到每個隊列)
* TOPIC("topic"), 通配符的方式
* HEADERS("headers"); 通過參數比對
* durable:是否持久化
* autoDelete:是否自動删除
* internal:内部使用。一般為false
* arguments:參數
*
* exchangeDeclare(String exchange, BuiltinExchangeType type,
* boolean durable, boolean autoDelete, Map<String, Object> arguments)
*
*
*/
Channel channel=connection.createChannel();
String exchangeName="fanout_exchange";
channel.exchangeDeclare(exchangeName,BuiltinExchangeType.FANOUT,true,false,false,null);
String quueName1="quue_1";
String quueName2="quue_2";
/*
durable:持久的
exclusive:1.該隊列是否隻能有一個消費者對其進行監聽 2.當連接配接關閉時是否删除隊列
autoDelete:是否自動删除(當沒有消費者時,是否自動删除)
queueDeclare(String queue, boolean durable, boolean exclusive
boolean autoDelete, Map<String, Object> arguments)
*/
// 聲明隊列,這裡需要聲明多個隊列
channel.queueDeclare(quueName1,true,false,false,null);
channel.queueDeclare(quueName2,true,false,false,null);
// 綁定隊列和交換機的關系
/**
* queue:隊列的名稱
* exchange:交換機的名稱
* routingKey:路由key 如果交換機的類型為fanout,則routingKey=""
*
* queueBind(String queue, String exchange, String routingKey)
*/
channel.queueBind(quueName1,exchangeName,"");
channel.queueBind(quueName2,exchangeName,"");
// 發送消息
/*
1.exchange:交換機名稱 (簡單模式下使用預設的交換機,即設定為空字元串即可。
2.routingKey:路由鍵,隻要routingKey和隊列名稱一樣,該消息就會發送到該隊列中
3.props:配置資訊
4.發送的内容
basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
*/
String message="日志資訊";
//這樣交換機就會将消息轉發給對應的2個隊列
channel.basicPublish(exchangeName,"",null,message.getBytes());
// 釋放資源
channel.close();
connection.close();
}
consumer2
@Test
public void Test() throws IOException, TimeoutException {
// 1.建立連接配接工廠
ConnectionFactory factory=new ConnectionFactory();
// 2.設定參數
factory.setHost("127.0.0.1");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("hjx");
factory.setPassword("123456");
// 3.建立連接配接
Connection connection =factory.newConnection();
// 4.建立channel
/*
1.quene:隊列名稱
2.autoAck:是否确認(消費者收到消息了後自動給隊列回複收到了)
3.callback:回調函數
basicConsume(String queue, boolean autoAck, Consumer callback)
*/
Channel channel=connection.createChannel();
// 隊列的名字
String quueName1="quue_1";
String quueName2="quue_2";
// 靜态内部類
Consumer consumer=new DefaultConsumer(channel){
// 這是一個回調方法 當收到消息後會自動執行該方法
/**
*
* @param consumerTag 消息唯一辨別
* @param envelope 擷取一些資訊 交換機、路由key
* @param properties 配置資訊
* @param body 擷取到的資料
* @throws IOException
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println(new String(body));
System.out.println("将日志資訊儲存到資料庫.....");
}
};
channel.basicConsume(quueName2,true,consumer);
// 注意:消費者不要關閉資源,如果資源被關閉了,它就無法對隊列中的消息進行監聽了
}
Routing(路由)
發送的消息的routingkey如果和指定隊列的routingkey相同,則交換機就把消息轉發給對應的隊列。
provider
public void test() throws IOException, TimeoutException {
// 1.建立連接配接工廠
ConnectionFactory factory=new ConnectionFactory();
// 2.設定參數
factory.setHost("127.0.0.1");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("hjx");
factory.setPassword("123456");
// 3.建立連接配接
Connection connection =factory.newConnection();
// 4.建立channel
/*
1.quene:隊列名稱
2.autoAck:是否确認(消費者收到消息了後自動給隊列回複收到了)
3.callback:回調函數
basicConsume(String queue, boolean autoAck, Consumer callback)
*/
/**
* exchange:交換機名稱
* type:交換機類型
* DIRECT("direct"):定向
* FANOUT("fanout") 扇形(也就是廣播,發送到每個隊列)
* TOPIC("topic"), 通配符的方式
* HEADERS("headers"); 通過參數比對
* durable:是否持久化
* autoDelete:是否自動删除
* internal:内部使用。一般為false
* arguments:參數
*
* exchangeDeclare(String exchange, BuiltinExchangeType type,
* boolean durable, boolean autoDelete, Map<String, Object> arguments)
*
*
*/
Channel channel=connection.createChannel();
String exchangeName="direct_exchange";
channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT,true,false,false,null);
String quueName1="quue_1";
String quueName2="quue_2";
/*
durable:持久的
exclusive:1.該隊列是否隻能有一個消費者對其進行監聽 2.當連接配接關閉時是否删除隊列
autoDelete:是否自動删除(當沒有消費者時,是否自動删除)
queueDeclare(String queue, boolean durable, boolean exclusive
boolean autoDelete, Map<String, Object> arguments)
*/
// 聲明隊列,這裡需要聲明多個隊列
channel.queueDeclare(quueName1,true,false,false,null);
channel.queueDeclare(quueName2,true,false,false,null);
// 綁定隊列和交換機的關系
/**
* queue:隊列的名稱
* exchange:交換機的名稱
* routingKey:路由key 如果交換機的類型為fanout,則routingKey=""
*
* queueBind(String queue, String exchange, String routingKey)
*/
// 隊列1的綁定
channel.queueBind(quueName1,exchangeName,"error");
// 隊列2的綁定
channel.queueBind(quueName2,exchangeName,"info");
channel.queueBind(quueName2,exchangeName,"warning");
channel.queueBind(quueName2,exchangeName,"error");
// 發送消息
/*
1.exchange:交換機名稱 (簡單模式下使用預設的交換機,即設定為空字元串即可。
2.routingKey:路由鍵,隻要routingKey和隊列名稱一樣,該消息就會發送到該隊列中
3.props:配置資訊
4.發送的内容
basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
*/
String message="日志資訊";
//這樣交換機就會将消息轉發給對應的2個隊列 注意:這裡的第2個參數info為消息的路由key,它會和隊列的路由key進行比對
channel.basicPublish(exchangeName,"info",null,message.getBytes());
// 釋放資源
channel.close();
connection.close();
}
consumer1
public void Test() throws IOException, TimeoutException {
// 1.建立連接配接工廠
ConnectionFactory factory=new ConnectionFactory();
// 2.設定參數
factory.setHost("127.0.0.1");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("hjx");
factory.setPassword("123456");
// 3.建立連接配接
Connection connection =factory.newConnection();
// 4.建立channel
/*
1.quene:隊列名稱
2.autoAck:是否确認(消費者收到消息了後自動給隊列回複收到了)
3.callback:回調函數
basicConsume(String queue, boolean autoAck, Consumer callback)
*/
Channel channel=connection.createChannel();
// 隊列的名字
String quueName1="quue_1";
String quueName2="quue_2";
// 靜态内部類
Consumer consumer=new DefaultConsumer(channel){
// 這是一個回調方法 當收到消息後會自動執行該方法
/**
*
* @param consumerTag 消息唯一辨別
* @param envelope 擷取一些資訊 交換機、路由key
* @param properties 配置資訊
* @param body 擷取到的資料
* @throws IOException
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println(new String(body));
System.out.println("将日志消息列印到控制台.....");
}
};
channel.basicConsume(quueName1,true,consumer);
// 注意:消費者不要關閉資源,如果資源被關閉了,它就無法對隊列中的消息進行監聽了
}
注意:另外一個consumer2和上面這裡的consumer1的代碼一樣(隻是将要接收的隊列名字不一樣就行了)
Topics工作模式(通配符)
通過通配符将消息轉發到指定隊列
注意:這裡的通配符: *:比對1個單詞 #:0個或多個單詞
provider
public void test() throws IOException, TimeoutException {
// 1.建立連接配接工廠
ConnectionFactory factory=new ConnectionFactory();
// 2.設定參數
factory.setHost("127.0.0.1");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("hjx");
factory.setPassword("123456");
// 3.建立連接配接
Connection connection =factory.newConnection();
// 4.建立channel
/*
1.quene:隊列名稱
2.autoAck:是否确認(消費者收到消息了後自動給隊列回複收到了)
3.callback:回調函數
basicConsume(String queue, boolean autoAck, Consumer callback)
*/
/**
* exchange:交換機名稱
* type:交換機類型
* DIRECT("direct"):定向
* FANOUT("fanout") 扇形(也就是廣播,發送到每個隊列)
* TOPIC("topic"), 通配符的方式
* HEADERS("headers"); 通過參數比對
* durable:是否持久化
* autoDelete:是否自動删除
* internal:内部使用。一般為false
* arguments:參數
*
* exchangeDeclare(String exchange, BuiltinExchangeType type,
* boolean durable, boolean autoDelete, Map<String, Object> arguments)
*
*
*/
Channel channel=connection.createChannel();
String exchangeName="topic_exchange";
channel.exchangeDeclare(exchangeName, BuiltinExchangeType.TOPIC,true,false,false,null);
String quueName1="quue_1";
String quueName2="quue_2";
/*
durable:持久的
exclusive:1.該隊列是否隻能有一個消費者對其進行監聽 2.當連接配接關閉時是否删除隊列
autoDelete:是否自動删除(當沒有消費者時,是否自動删除)
queueDeclare(String queue, boolean durable, boolean exclusive
boolean autoDelete, Map<String, Object> arguments)
*/
// 聲明隊列,這裡需要聲明多個隊列
channel.queueDeclare(quueName1,true,false,false,null);
channel.queueDeclare(quueName2,true,false,false,null);
// 綁定隊列和交換機的關系
/**
* queue:隊列的名稱
* exchange:交換機的名稱
* routingKey:路由key 如果交換機的類型為fanout,則routingKey=""
*
* queueBind(String queue, String exchange, String routingKey)
*/
// 隊列1的綁定
/**
* routingkey:系統名稱.日志級别
* 需求:将error級别的日志存入資料庫,所有order系統的日志也存入資料庫
*
*/
channel.queueBind(quueName1,exchangeName,"#.error");
// 隊列2的綁定
channel.queueBind(quueName1,exchangeName,"order.*");
channel.queueBind(quueName2,exchangeName,"*.*");
// 發送消息
/*
1.exchange:交換機名稱 (簡單模式下使用預設的交換機,即設定為空字元串即可。
2.routingKey:路由鍵,隻要routingKey和隊列名稱一樣,該消息就會發送到該隊列中
3.props:配置資訊
4.發送的内容
basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
*/
String message="日志資訊";
//這樣交換機就會将消息轉發給對應的2個隊列 注意:這裡的第2個參數info為消息的路由key,它會和隊列的路由key進行比對
channel.basicPublish(exchangeName,"order.info",null,message.getBytes());
// 釋放資源
channel.close();
connection.close();
}
consumer
public void Test() throws IOException, TimeoutException {
// 1.建立連接配接工廠
ConnectionFactory factory=new ConnectionFactory();
// 2.設定參數
factory.setHost("127.0.0.1");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("hjx");
factory.setPassword("123456");
// 3.建立連接配接
Connection connection =factory.newConnection();
// 4.建立channel
/*
1.quene:隊列名稱
2.autoAck:是否确認(消費者收到消息了後自動給隊列回複收到了)
3.callback:回調函數
basicConsume(String queue, boolean autoAck, Consumer callback)
*/
Channel channel=connection.createChannel();
// 隊列的名字
String quueName1="quue_1";
String quueName2="quue_2";
// 靜态内部類
Consumer consumer=new DefaultConsumer(channel){
// 這是一個回調方法 當收到消息後會自動執行該方法
/**
*
* @param consumerTag 消息唯一辨別
* @param envelope 擷取一些資訊 交換機、路由key
* @param properties 配置資訊
* @param body 擷取到的資料
* @throws IOException
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println(new String(body));
System.out.println("将日志消息列印到控制台.....");
}
};
channel.basicConsume(quueName1,true,consumer);
// 注意:消費者不要關閉資源,如果資源被關閉了,它就無法對隊列中的消息進行監聽了
}
注意:另外一個consume和上面這個一樣,隻是擷取消息的隊列的名字不一樣而已
spring整合rabbitmq(生産者)
1.導入依賴
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<version>2.1.8.RELEASE</version>
</dependency>
2.application配置檔案
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=root
spring.rabbitmq.password=123
spring.rabbitmq.virtual-host=/
3.配置xml
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit-1.4.xsd
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-4.1.xsd">
<!-- 配置連接配接工廠 -->
<rabbit:connection-factory id="connectionFactory"
host="${spring.rabbitmq.host}" port="${spring.rabbitmq.port}" username="${spring.rabbitmq.username}" password="${spring.rabbitmq.password}" />
<!-- 定義mq管理 -->
<rabbit:admin connection-factory="connectionFactory" />
<!--
聲明隊列
auto-declare:如果沒有該隊列就會自動建立
-->
<rabbit:queue name="que_cat" auto-declare="true" durable="true" />
<rabbit:queue name="que_pig" auto-declare="true" durable="true" />
<!-- 定義交換機綁定隊列(路由模式) -->
<rabbit:direct-exchange name="IExchange"
id="IExchange">
<rabbit:bindings>
<rabbit:binding queue="que_cat" key="que_cat_key" />
<rabbit:binding queue="que_pig" key="que_pig_key" />
</rabbit:bindings>
</rabbit:direct-exchange>
<!-- 消息對象json轉換類 -->
<bean id="jsonMessageConverter"
class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter" />
<!-- 定義模版 用于發送消息-->
<rabbit:template id="rabbitTemplate"
connection-factory="connectionFactory" exchange="IExchange"
message-converter="jsonMessageConverter" />
</beans>
4.測試
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:applicationContext-rabbitmq-send.xml")
public class Test {
// 注入 rabbitTemplate
@Autowired
private RabbitTemplate rabbitTemplate;
@org.junit.Test
public void rabbitTest(){
// 發送消息 路由模式下的發送
rabbitTemplate.convertAndSend("que_cat_key","hello world");
}
}
spring整合rabbitmq(消費者)
1.依賴
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<version>2.1.8.RELEASE</version>
</dependency>
2.xml配置
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xmlns:context="http://www.springframework.org/schema/context"
xsi:schemaLocation="http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit-1.4.xsd
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-4.1.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd">
<context:property-placeholder location="classpath:rabbitmq.properties"/>
<!-- 配置連接配接工廠 -->
<rabbit:connection-factory id="connectionFactory"
host="${spring.rabbitmq.host}" port="${spring.rabbitmq.port}" username="${spring.rabbitmq.username}" password="${spring.rabbitmq.password}" />
<!-- 定義mq管理 -->
<rabbit:admin connection-factory="connectionFactory" />
<!-- 聲明隊列 -->
<rabbit:queue name="que_cat" auto-declare="true" durable="true" />
<rabbit:queue name="que_pig" auto-declare="true" durable="true" />
<!-- 定義消費者
CatHandler 和 PigHandler 是一個監聽者
-->
<bean name="catHandler" class="com.hjx.monitor.CatHandler" />
<bean name="pigHandler" class="com.hjx.monitor.PigHandler" />
<!-- 定義消費者監聽隊列 -->
<rabbit:listener-container
connection-factory="connectionFactory">
<rabbit:listener ref="catHandler" queues="que_cat" />
<rabbit:listener ref="pigHandler" queues="que_pig" />
</rabbit:listener-container>
</beans>
3.properties配置
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=root
spring.rabbitmq.password=123
spring.rabbitmq.virtual-host=/
4.編寫監聽器實作MessageListener接口實作onMessage方法
package com.hjx.monitor;
import java.io.IOException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
public class CatHandler implements MessageListener {
private static final ObjectMapper MAPPER = new ObjectMapper();
public void onMessage(Message msg) {
try {
//msg就是rabbitmq傳來的消息
// 使用jackson解析
// msg.getBody():消息内容
JsonNode jsonData = MAPPER.readTree(msg.getBody());
System.out.println("我是可愛的小貓,我的id是" + jsonData.get("id").asText()
+ ",我的名字是" + jsonData.get("name").asText());
} catch (IOException e) {
e.printStackTrace();
}
}
}
spring boot整合rabbitmq生産者
1.導入依賴
<dependencies>
<!-- spring boot整合rabbitmq-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
</dependency>
</dependencies>
2.編寫配置檔案
spring:
rabbitmq:
host: 127.0.0.1
port: 5672
username: root
password: 123
virtual-host: /
3.編寫配置類
package com.hjx.config;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitConfig {
//交換機的名稱
public static final String exchangeName="boot_topic";
//隊列的名稱
public static final String queueName="boot_quue";
/**
* 1.交換機
* 2.隊列
* 3.交換機和隊列的綁定關系
*/
// 1.交換機
@Bean("exchange")
public Exchange bootExchange(){
return ExchangeBuilder.topicExchange(exchangeName).durable(true).build();
}
// 2.隊列
@Bean("Queue")
public Queue bootQueue(){
return QueueBuilder.durable(queueName).build();
}
//3.交換機和隊列的綁定關系
@Bean
public Binding bootBind(@Qualifier("exchange") Exchange exchange,@Qualifier("Queue") Queue queue ){
return BindingBuilder.bind(queue).to(exchange).with("boot.#").noargs();
}
}
4.測試
/**
* @Nullable:表示可以傳入空值
*/
@SpringBootTest
public class ProviderTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testSend(){
String ms="hello,spring boot-rabbitmq!";
rabbitTemplate.convertSendAndReceive(RabbitConfig.exchangeName,"boot.hello",ms);
}
}
spring boot整合rabbitmq消費者
1.導入依賴
<!-- spring boot整合rabbitmq-->
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
</dependency>
</dependencies>
2.配置檔案
spring:
rabbitmq:
virtual-host: /
host: 127.0.0.1
port: 5672
username: root
password: 123
3.編寫配置類
/監聽隊列
@Component
public class RabbitListenQue {
// 監聽 名為 boot_quue的隊列
@RabbitListener(queues="boot_quue")
public void ListnerQue(Message message){
System.out.println("message:"+message);
}
}
4.測試
啟動項目測試,看是否列印日志
rabbitmq進階特性
消息的可靠投遞
确認模式
這裡采用spring的方式,這裡的代碼和spring整合rabbitmq差不多,隻需要改變一小部分就可以了!
xml檔案
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xmlns:context="http://www.springframework.org/schema/context"
xsi:schemaLocation="http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit-1.4.xsd
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-4.1.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd">
<context:property-placeholder location="classpath:rabbitmq.properties"/>
<!-- 配置連接配接工廠
publisher-confirms="true":開啟确認模式
-->
<rabbit:connection-factory id="connectionFactory"
host="${spring.rabbitmq.host}"
port="${spring.rabbitmq.port}"
username="${spring.rabbitmq.username}"
password="${spring.rabbitmq.password}"
publisher-confirms="true"
/>
<!-- 定義mq管理 -->
<rabbit:admin connection-factory="connectionFactory" />
<!--
聲明隊列
auto-declare:如果沒有該隊列就會自動建立
-->
<rabbit:queue name="que_cat" auto-declare="true" durable="true" />
<rabbit:queue name="que_pig" auto-declare="true" durable="true" />
<!-- 定義交換機綁定隊列(路由模式) -->
<rabbit:direct-exchange name="IExchange"
id="IExchange">
<rabbit:bindings>
<rabbit:binding queue="que_cat" key="que_cat_key" />
<rabbit:binding queue="que_pig" key="que_pig_key" />
</rabbit:bindings>
</rabbit:direct-exchange>
<!-- 消息對象json轉換類 -->
<bean id="jsonMessageConverter"
class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter" />
<!-- 定義模版 用于發送消息-->
<rabbit:template id="rabbitTemplate"
connection-factory="connectionFactory" exchange="IExchange"
message-converter="jsonMessageConverter" />
<!-- 消息可靠性投遞-->
<rabbit:queue id="test_quue_confirm" name="test_quue_confirm" ></rabbit:queue>
<rabbit:direct-exchange name="test_exchange_confirm">
<rabbit:bindings>
<rabbit:binding queue="test_quue_confirm" key="confirm"></rabbit:binding>
</rabbit:bindings>
</rabbit:direct-exchange>
</beans>
測試
package com.hjx;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.lang.Nullable;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:applicationContext-rabbitmq-send.xml")
public class Test {
// 注入 rabbitTemplate
@Autowired
private RabbitTemplate rabbitTemplate;
@org.junit.Test
public void rabbitTest(){
// 發送消息 路由模式下的發送
rabbitTemplate.convertAndSend("que_cat_key","hello world");
}
@org.junit.Test
public void confirmTest(){
// 定義回調
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback(){
/**
*
* @param correlationData 參數的資訊
* @param ack 交換機是否成功收到資訊 true:代表成功 false:代表失敗
* @param cause 失敗的原因
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause){
if(ack){
System.out.println("接收消息成功");
System.out.println("confirm方法被執行");
}else{
System.out.println("接收消息fail");
}
}
});
// 發送消息
rabbitTemplate.convertAndSend("test_exchange_confirm","confirm","确認消息的日志");
}
}
注意點:
一定要在xml中将确認開啟
回退模式
将消息發送給交換機後,交換機發給隊列失敗,此時消息會丢棄或回退給發送方。
1.xml配置
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xmlns:context="http://www.springframework.org/schema/context"
xsi:schemaLocation="http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit-1.4.xsd
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-4.1.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd">
<context:property-placeholder location="classpath:rabbitmq.properties"/>
<!-- 配置連接配接工廠
publisher-confirms="true":開啟确認模式
publisher-returns="true":開啟回退模式
-->
<rabbit:connection-factory id="connectionFactory"
host="${spring.rabbitmq.host}"
port="${spring.rabbitmq.port}"
username="${spring.rabbitmq.username}"
password="${spring.rabbitmq.password}"
publisher-confirms="true"
publisher-returns="true"
/>
<!-- 定義mq管理 -->
<rabbit:admin connection-factory="connectionFactory" />
<!--
聲明隊列
auto-declare:如果沒有該隊列就會自動建立
-->
<rabbit:queue name="que_cat" auto-declare="true" durable="true" />
<rabbit:queue name="que_pig" auto-declare="true" durable="true" />
<!-- 定義交換機綁定隊列(路由模式) -->
<rabbit:direct-exchange name="IExchange"
id="IExchange">
<rabbit:bindings>
<rabbit:binding queue="que_cat" key="que_cat_key" />
<rabbit:binding queue="que_pig" key="que_pig_key" />
</rabbit:bindings>
</rabbit:direct-exchange>
<!-- 消息對象json轉換類 -->
<bean id="jsonMessageConverter"
class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter" />
<!-- 定義模版 用于發送消息-->
<rabbit:template id="rabbitTemplate"
connection-factory="connectionFactory" exchange="IExchange"
message-converter="jsonMessageConverter" />
<!-- 消息可靠性投遞-->
<rabbit:queue id="test_quue_confirm" name="test_quue_confirm" ></rabbit:queue>
<rabbit:direct-exchange name="test_exchange_confirm">
<rabbit:bindings>
<rabbit:binding queue="test_quue_confirm" key="confirm"></rabbit:binding>
</rabbit:bindings>
</rabbit:direct-exchange>
</beans>
2.測試
// 回退模式的測試
@org.junit.Test
public void BackTest(){
// 設定交換機處理失敗消息的模式
rabbitTemplate.setMandatory(true); // 這樣交換機就會将發送失敗的消息傳回給發送方
rabbitTemplate.setReturnCallback(
new RabbitTemplate.ReturnCallback() {
@Override
public void returnedMessage(Message message, int replycode, String replyText, String exchange, String routingKey) {
System.out.println("回退的消息為:"+message);
System.out.println("replycode:"+replycode);
System.out.println("replyText:"+replyText);
System.out.println("exchange:"+exchange);
System.out.println("routingKey:"+routingKey);
System.out.println("消息回退執行了");
}
}
);
}
注意點
xml中一定要開啟回退模式
consumer ack
注意:系統預設采用自動簽收的方式,下面這裡是手動簽收。
xml檔案
關鍵部分
<!--
定義消費者監聽隊列
acknowledge="manual":代表手動簽收
-->
<rabbit:listener-container
connection-factory="connectionFactory" acknowledge="manual" >
<rabbit:listener ref="catHandler" queues="que_cat" />
<rabbit:listener ref="pigHandler" queues="que_pig" />
</rabbit:listener-container>
監聽器
package com.hjx.monitor;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
/**
* 預設為自動簽收
* 1.這裡需要設定為手動簽收 acknowledge="manual"
* 2.讓監聽器實作ChannelAwareMessageListener接口(是MessageListener的子接口)
* 3.如果消息處理成功,則調用channel的basicAck()簽收,否者調用channel的basicNack()拒絕簽收,讓broker重新發送消息
*
*
*
*/
public class CatHandler implements ChannelAwareMessageListener {
private static final ObjectMapper MAPPER = new ObjectMapper();
@Override
public void onMessage(Message message, Channel channel) throws Exception {
//擷取消息的tag(标簽)
long deliveryTag=message.getMessageProperties().getDeliveryTag();
System.out.println(new String(message.getBody()));
// 處理業務邏輯
System.out.println("處理業務邏輯");
try {
channel.basicAck(deliveryTag,true);
}catch (Exception e){
//拒絕簽收 第3個參數:true代表消息重新發送
channel.basicNack(deliveryTag,true,true);
}
}
}
注意點:
1.xml檔案中的rabbit:listener-container一定要設定acknowledge=“manual”:代表手動簽收。
2.監聽器需要實作ChannelAwareMessageListener接口(MessageListener接口的子接口)onMessage方法
消費端限流
注意:限流與消費端有關
需滿足以下兩個條件
1.手動确認 ( 隻有當手動确認後才能拉去隊列中的下一條消息)
2.配置屬性(xml中進行配置)rabbit:listener-container節點中進行配置 perfetch屬性
prefetch=n:表示消費端每次從隊列中拉取n條消息
操作步驟
/**
*
* 限流
* 需要滿足以下幾個條件:
* 1.手動确認 ( 隻有當手動确認後才能拉去隊列中的下一條消息)
* 2.配置屬性(xml中進行配置)rabbit:listener-container節點中進行配置 perfetch屬性
* prefetch=n:表示消費端每次從隊列中拉取n條消息
*
*/
public class CurrentLimit implements ChannelAwareMessageListener {
private static final ObjectMapper MAPPER = new ObjectMapper();
@Override
public void onMessage(Message message, Channel channel) throws Exception {
// 擷取消息
System.out.println(new String(message.getBody()));
// 處理業務邏輯
// 手動簽收
channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);
}
}
監聽器
TTL
圖形化界面操作步驟
1.使用圖形化控制台添加一個隊列并設定隊列存活時間
2.建立一個交換機
3.交換機綁定隊列
4.釋出消息
代碼操作步驟
1.聲明一個隊列并設定隊列的過期時間
<!-- 用于TTL
聲明隊列
-->
<rabbit:queue name="test_quue_ttl" auto-declare="true" durable="true">
<rabbit:queue-arguments>
<!-- 設定隊列的過期時間
注意:這裡需要使用value-type設定參數的類型,因為預設是字元串,而我們想要的是數值類型
-->
<entry key="x-message-ttl" value="10000" value-type="java.lang.Integer"/>
</rabbit:queue-arguments>
</rabbit:queue>
2.聲明交換機并綁定隊列
<!-- 聲明交換機,用于TTL-->
<rabbit:topic-exchange name="test_exchange_ttl">
<rabbit:bindings>
<rabbit:binding pattern="ttl.#" queue="test_quue_ttl"></rabbit:binding>
</rabbit:bindings>
</rabbit:topic-exchange>
3.測試
// TTL測試
@org.junit.Test
public void TTL_test(){
rabbitTemplate.convertAndSend("test_exchange_ttl","ttl.hh","TTL的測試-helloworld!!");
}
單獨設定消息的過期時間
注意:如果隊列的時間和消息的時間都設定話,則預設以時間短的為準!!
消息過期後,隻有消息在隊列頂端,才會判斷其是否過期(如果過期,則将該消息移除掉),而不是說消息的過期時間到了,該消息就會被移除。是以一般都設定隊列的過期時間
// 單獨設定消息的過期時間
@org.junit.Test
public void setMessageDiedtime(){
// 消息後處理器 可以設定一些參數
MessagePostProcessor postProcessor=new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
// 設定5秒後過期
message.getMessageProperties().setExpiration("5000");
return message;
}
};
rabbitTemplate.convertAndSend("test_exchange_ttl","ttl.hh","TTL的測試-helloworld!!", postProcessor);
}
死信隊列
如果你的消息和死信作了一個綁定,那麼當你的消息被銷毀了後,該消息會被發送給死信交換機,進而死xingh