天天看點

Rabbitmq基礎知識Rabbitmq基礎知識

Rabbitmq基礎知識

文章目錄

  • Rabbitmq基礎知識
    • mq的基本概念
    • mq的優勢和劣勢
    • mq的劣勢
    • 常見的mq的産品
    • RabbitMq簡介
    • 安裝
    • 簡單隊列的代碼示範
      • provider
      • consumer
    • 常見的幾種消息政策
    • 交換機類型
    • 工作模式
    • 訂閱模式
    • Routing(路由)
    • Topics工作模式(通配符)
    • spring整合rabbitmq(生産者)
    • spring整合rabbitmq(消費者)
    • spring boot整合rabbitmq生産者
    • spring boot整合rabbitmq消費者
    • rabbitmq進階特性
      • 消息的可靠投遞
        • 确認模式
        • 回退模式
      • consumer ack
      • 消費端限流
      • TTL
      • 單獨設定消息的過期時間
      • 死信隊列
    • rabbitmq的應用
    • rabbitmq的叢集搭建

黑馬

看到第26集

mq的基本概念

Rabbitmq基礎知識Rabbitmq基礎知識

mq的優勢和劣勢

Rabbitmq基礎知識Rabbitmq基礎知識

應用解耦

Rabbitmq基礎知識Rabbitmq基礎知識
Rabbitmq基礎知識Rabbitmq基礎知識

異步提速

Rabbitmq基礎知識Rabbitmq基礎知識
Rabbitmq基礎知識Rabbitmq基礎知識

消峰填谷

Rabbitmq基礎知識Rabbitmq基礎知識

消峰填谷

Rabbitmq基礎知識Rabbitmq基礎知識

mq的劣勢

Rabbitmq基礎知識Rabbitmq基礎知識
Rabbitmq基礎知識Rabbitmq基礎知識

常見的mq的産品

Rabbitmq基礎知識Rabbitmq基礎知識

RabbitMq簡介

Rabbitmq基礎知識Rabbitmq基礎知識
Rabbitmq基礎知識Rabbitmq基礎知識
Rabbitmq基礎知識Rabbitmq基礎知識
Rabbitmq基礎知識Rabbitmq基礎知識
Rabbitmq基礎知識Rabbitmq基礎知識

JMS

Rabbitmq基礎知識Rabbitmq基礎知識

安裝

進入到rabbitmq的sbin目錄,輕按兩下rabbitmq-server.bat,如果出現如下頁面則代表啟動成功

Rabbitmq基礎知識Rabbitmq基礎知識

進入管理控制台

在浏覽器位址欄中輸入:http://localhost:15672,看到如下頁面則代表啟動成功

初始化:使用者名和密碼均為guest

Rabbitmq基礎知識Rabbitmq基礎知識

簡單隊列的代碼示範

provider

結合下面這張圖寫代碼

Rabbitmq基礎知識Rabbitmq基礎知識
//		1.建立連接配接工廠
		ConnectionFactory factory=new ConnectionFactory();
//		2.設定參數
		factory.setHost("127.0.0.1");
		factory.setPort(5672);
		factory.setVirtualHost("/");
		factory.setUsername("hjx");
		factory.setPassword("123456");
//		3.建立連接配接
		Connection connection =factory.newConnection();
//		4.建立channel
		Channel channel=connection.createChannel();
//		5.建立隊列
		/*
			durable:持久的
			exclusive:1.該隊列是否隻能有一個消費者對其進行監聽 2.當連接配接關閉時是否删除隊列
			autoDelete:是否自動删除(當沒有消費者時,是否自動删除)

			queueDeclare(String queue, boolean durable, boolean exclusive
			, boolean autoDelete, Map<String, Object> arguments)
		 */
//		如果沒有hjx_quene的隊列則會建立該隊列,否則不會進行建立
		channel.queueDeclare("hjx_queue",true,false,false,null);
//		6.發送消息
		/*
		1.exchange:交換機名稱 (簡單模式下使用預設的交換機,即設定為空字元串即可。
		2.routingKey:路由鍵,隻要routingKey和隊列名稱一樣,該消息就會發送到該隊列中
		3.props:配置資訊
		4.發送的内容

		basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
		 */
		String message="hello world,歡迎跟着黑馬大神一起學習rabbitmq";
		channel.basicPublish("","hjx_quene",null,message.getBytes());
//		釋放資源
		channel.close();
		connection.close();
           

consumer

//		1.建立連接配接工廠
		ConnectionFactory factory=new ConnectionFactory();
//		2.設定參數
		factory.setHost("127.0.0.1");
		factory.setPort(5672);
		factory.setVirtualHost("/");
		factory.setUsername("hjx");
		factory.setPassword("123456");
//		3.建立連接配接
		Connection connection =factory.newConnection();
//		4.建立channel
		/*
		1.quene:隊列名稱
		2.autoAck:是否确認(消費者收到消息了後自動給隊列回複收到了)
		3.callback:回調函數

		basicConsume(String queue, boolean autoAck, Consumer callback)
		 */
		Channel channel=connection.createChannel();
//		靜态内部類
		Consumer consumer=new DefaultConsumer(channel){
//			這是一個回調方法 當收到消息後會自動執行該方法

			/**
			 *
			 * @param consumerTag  消息唯一辨別
			 * @param envelope     擷取一些資訊 交換機、路由key
			 * @param properties   配置資訊
			 * @param body         擷取到的資料
			 * @throws IOException
			 */
			@Override
			public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
				System.out.println("consumerTag:"+consumerTag);
				System.out.println("envelope:"+envelope.getExchange());
				System.out.println("properties:"+properties);
				System.out.println("body:"+new String(body));
			}
		};
		channel.basicConsume("hjx_queue",true,consumer);
//		注意:消費者不要關閉資源,如果資源被關閉了,它就無法對隊列中的消息進行監聽了
	}
           

常見的幾種消息政策

Rabbitmq基礎知識Rabbitmq基礎知識
Rabbitmq基礎知識Rabbitmq基礎知識

交換機類型

type:交換機類型
		 *        DIRECT("direct"):定向
		 *        FANOUT("fanout")  扇形(也就是廣播,發送到每個隊列)
		 *        TOPIC("topic"),   通配符的方式
		 *        HEADERS("headers");  通過參數比對 (很少用)
           

工作模式

Rabbitmq基礎知識Rabbitmq基礎知識
Rabbitmq基礎知識Rabbitmq基礎知識
這裡為了測試,需要建立2個consumer,然後啟動consumer和provider。當consumer監聽到有消息時就會将對應的消息列印出來。代碼如下:

provider

public void sendMs() throws IOException, TimeoutException {
//		1.建立連接配接工廠
		ConnectionFactory factory=new ConnectionFactory();
//		2.設定參數
		factory.setHost("127.0.0.1");
		factory.setPort(5672);
		factory.setVirtualHost("/");
		factory.setUsername("hjx");
		factory.setPassword("123456");
//		3.建立連接配接
		Connection connection =factory.newConnection();
//		4.建立channel
		Channel channel=connection.createChannel();
//		5.建立隊列
		/*
			durable:持久的
			exclusive:1.該隊列是否隻能有一個消費者對其進行監聽 2.當連接配接關閉時是否删除隊列
			autoDelete:是否自動删除(當沒有消費者時,是否自動删除)

			queueDeclare(String queue, boolean durable, boolean exclusive
			, boolean autoDelete, Map<String, Object> arguments)
		 */
//		如果沒有hjx_quene的隊列則會建立該隊列,否則不會進行建立
		channel.queueDeclare("hjx_queue",true,false,false,null);
//		6.發送消息
		/*
		1.exchange:交換機名稱 (簡單模式下使用預設的交換機,即設定為空字元串即可。
		2.routingKey:路由鍵,隻要routingKey和隊列名稱一樣,該消息就會發送到該隊列中
		3.props:配置資訊
		4.發送的内容

		basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
		 */
//		這裡為了測試友善,發送10條消息,因為時工作模式下的消息隊列嘛
		for(int i=0;i<10;i++){
			String message="hello world,歡迎跟着黑馬大神一起學習rabbitmq"+(i+1);
			channel.basicPublish("","hjx_quene",null,message.getBytes());
		}

//		釋放資源
		channel.close();
		connection.close();
	}
           

consumer

注意:這裡的consumer,建立2個内容一模一樣的就可以了。隻要檔案名不一樣就行。
public void Test1() throws IOException, TimeoutException {
		//		1.建立連接配接工廠
		ConnectionFactory factory=new ConnectionFactory();
//		2.設定參數
		factory.setHost("127.0.0.1");
		factory.setPort(5672);
		factory.setVirtualHost("/");
		factory.setUsername("hjx");
		factory.setPassword("123456");
//		3.建立連接配接
		Connection connection =factory.newConnection();
//		4.建立channel
		/*
		1.quene:隊列名稱
		2.autoAck:是否确認(消費者收到消息了後自動給隊列回複收到了)
		3.callback:回調函數

		basicConsume(String queue, boolean autoAck, Consumer callback)
		 */
		Channel channel=connection.createChannel();
//		靜态内部類
		Consumer consumer=new DefaultConsumer(channel){
//			這是一個回調方法 當收到消息後會自動執行該方法

			/**
			 *
			 * @param consumerTag  消息唯一辨別
			 * @param envelope     擷取一些資訊 交換機、路由key
			 * @param properties   配置資訊
			 * @param body         擷取到的資料
			 * @throws IOException
			 */
			@Override
			public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
				/*System.out.println("consumerTag:"+consumerTag);
				System.out.println("envelope:"+envelope.getExchange());
				System.out.println("properties:"+properties);*/
				System.out.println("body:"+new String(body));
			}
		};
		channel.basicConsume("hjx_queue",true,consumer);
//		注意:消費者不要關閉資源,如果資源被關閉了,它就無法對隊列中的消息進行監聽了
	}
           
Rabbitmq基礎知識Rabbitmq基礎知識

訂閱模式

Rabbitmq基礎知識Rabbitmq基礎知識

生産者

@Test
	public void test() throws IOException, TimeoutException {
		//		1.建立連接配接工廠
		ConnectionFactory factory=new ConnectionFactory();
//		2.設定參數
		factory.setHost("127.0.0.1");
		factory.setPort(5672);
		factory.setVirtualHost("/");
		factory.setUsername("hjx");
		factory.setPassword("123456");
//		3.建立連接配接
		Connection connection =factory.newConnection();
//		4.建立channel
		/*
		1.quene:隊列名稱
		2.autoAck:是否确認(消費者收到消息了後自動給隊列回複收到了)
		3.callback:回調函數

		basicConsume(String queue, boolean autoAck, Consumer callback)
		 */
		/**
		 * exchange:交換機名稱
		 * type:交換機類型
		 *        DIRECT("direct"):定向
		 *        FANOUT("fanout")  扇形(也就是廣播,發送到每個隊列)
		 *        TOPIC("topic"),   通配符的方式
		 *        HEADERS("headers");  通過參數比對
		 * durable:是否持久化
		 * autoDelete:是否自動删除
		 * internal:内部使用。一般為false
		 * arguments:參數
		 *
		 * exchangeDeclare(String exchange, BuiltinExchangeType type,
		 * boolean durable, boolean autoDelete, Map<String, Object> arguments)
		 *
		 *
		 */
		Channel channel=connection.createChannel();
		String exchangeName="fanout_exchange";
		channel.exchangeDeclare(exchangeName,BuiltinExchangeType.FANOUT,true,false,false,null);

		String quueName1="quue_1";
		String quueName2="quue_2";
		/*
			durable:持久的
			exclusive:1.該隊列是否隻能有一個消費者對其進行監聽 2.當連接配接關閉時是否删除隊列
			autoDelete:是否自動删除(當沒有消費者時,是否自動删除)

			queueDeclare(String queue, boolean durable, boolean exclusive
			boolean autoDelete, Map<String, Object> arguments)
		 */
		//		聲明隊列,這裡需要聲明多個隊列
		channel.queueDeclare(quueName1,true,false,false,null);
		channel.queueDeclare(quueName2,true,false,false,null);
		//		綁定隊列和交換機的關系
		/**
		 *  queue:隊列的名稱
		 *  exchange:交換機的名稱
		 *  routingKey:路由key  如果交換機的類型為fanout,則routingKey=""
		 *
		 * queueBind(String queue, String exchange, String routingKey)
		 */
		channel.queueBind(quueName1,exchangeName,"");
		channel.queueBind(quueName2,exchangeName,"");
		//		發送消息
		/*
		1.exchange:交換機名稱 (簡單模式下使用預設的交換機,即設定為空字元串即可。
		2.routingKey:路由鍵,隻要routingKey和隊列名稱一樣,該消息就會發送到該隊列中
		3.props:配置資訊
		4.發送的内容

		basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
		 */
		String message="日志資訊";
		//這樣交換機就會将消息轉發給對應的2個隊列
		channel.basicPublish(exchangeName,"",null,message.getBytes());
		//		釋放資源
		channel.close();
		connection.close();

	}
           

消費者

consumer1

@Test
	public void test() throws IOException, TimeoutException {
		//		1.建立連接配接工廠
		ConnectionFactory factory=new ConnectionFactory();
//		2.設定參數
		factory.setHost("127.0.0.1");
		factory.setPort(5672);
		factory.setVirtualHost("/");
		factory.setUsername("hjx");
		factory.setPassword("123456");
//		3.建立連接配接
		Connection connection =factory.newConnection();
//		4.建立channel
		/*
		1.quene:隊列名稱
		2.autoAck:是否确認(消費者收到消息了後自動給隊列回複收到了)
		3.callback:回調函數

		basicConsume(String queue, boolean autoAck, Consumer callback)
		 */
		/**
		 * exchange:交換機名稱
		 * type:交換機類型
		 *        DIRECT("direct"):定向
		 *        FANOUT("fanout")  扇形(也就是廣播,發送到每個隊列)
		 *        TOPIC("topic"),   通配符的方式
		 *        HEADERS("headers");  通過參數比對
		 * durable:是否持久化
		 * autoDelete:是否自動删除
		 * internal:内部使用。一般為false
		 * arguments:參數
		 *
		 * exchangeDeclare(String exchange, BuiltinExchangeType type,
		 * boolean durable, boolean autoDelete, Map<String, Object> arguments)
		 *
		 *
		 */
		Channel channel=connection.createChannel();
		String exchangeName="fanout_exchange";
		channel.exchangeDeclare(exchangeName,BuiltinExchangeType.FANOUT,true,false,false,null);

		String quueName1="quue_1";
		String quueName2="quue_2";
		/*
			durable:持久的
			exclusive:1.該隊列是否隻能有一個消費者對其進行監聽 2.當連接配接關閉時是否删除隊列
			autoDelete:是否自動删除(當沒有消費者時,是否自動删除)

			queueDeclare(String queue, boolean durable, boolean exclusive
			boolean autoDelete, Map<String, Object> arguments)
		 */
		//		聲明隊列,這裡需要聲明多個隊列
		channel.queueDeclare(quueName1,true,false,false,null);
		channel.queueDeclare(quueName2,true,false,false,null);
		//		綁定隊列和交換機的關系
		/**
		 *  queue:隊列的名稱
		 *  exchange:交換機的名稱
		 *  routingKey:路由key  如果交換機的類型為fanout,則routingKey=""
		 *
		 * queueBind(String queue, String exchange, String routingKey)
		 */
		channel.queueBind(quueName1,exchangeName,"");
		channel.queueBind(quueName2,exchangeName,"");
		//		發送消息
		/*
		1.exchange:交換機名稱 (簡單模式下使用預設的交換機,即設定為空字元串即可。
		2.routingKey:路由鍵,隻要routingKey和隊列名稱一樣,該消息就會發送到該隊列中
		3.props:配置資訊
		4.發送的内容

		basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
		 */
		String message="日志資訊";
		//這樣交換機就會将消息轉發給對應的2個隊列
		channel.basicPublish(exchangeName,"",null,message.getBytes());
		//		釋放資源
		channel.close();
		connection.close();

	}
           

consumer2

@Test
	public void Test() throws IOException, TimeoutException {
			//		1.建立連接配接工廠
			ConnectionFactory factory=new ConnectionFactory();
//		2.設定參數
			factory.setHost("127.0.0.1");
			factory.setPort(5672);
			factory.setVirtualHost("/");
			factory.setUsername("hjx");
			factory.setPassword("123456");
//		3.建立連接配接
			Connection connection =factory.newConnection();
//		4.建立channel
		/*
		1.quene:隊列名稱
		2.autoAck:是否确認(消費者收到消息了後自動給隊列回複收到了)
		3.callback:回調函數

		basicConsume(String queue, boolean autoAck, Consumer callback)
		 */
			Channel channel=connection.createChannel();
//		隊列的名字
			String quueName1="quue_1";
			String quueName2="quue_2";
//		靜态内部類
			Consumer consumer=new DefaultConsumer(channel){
//			這是一個回調方法 當收到消息後會自動執行該方法

				/**
				 *
				 * @param consumerTag  消息唯一辨別
				 * @param envelope     擷取一些資訊 交換機、路由key
				 * @param properties   配置資訊
				 * @param body         擷取到的資料
				 * @throws IOException
				 */
				@Override
				public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
					System.out.println(new String(body));
					System.out.println("将日志資訊儲存到資料庫.....");
				}
			};
			channel.basicConsume(quueName2,true,consumer);
//		注意:消費者不要關閉資源,如果資源被關閉了,它就無法對隊列中的消息進行監聽了
		}
           

Routing(路由)

發送的消息的routingkey如果和指定隊列的routingkey相同,則交換機就把消息轉發給對應的隊列。
Rabbitmq基礎知識Rabbitmq基礎知識

provider

public void test() throws IOException, TimeoutException {
		//		1.建立連接配接工廠
		ConnectionFactory factory=new ConnectionFactory();
//		2.設定參數
		factory.setHost("127.0.0.1");
		factory.setPort(5672);
		factory.setVirtualHost("/");
		factory.setUsername("hjx");
		factory.setPassword("123456");
//		3.建立連接配接
		Connection connection =factory.newConnection();
//		4.建立channel
		/*
		1.quene:隊列名稱
		2.autoAck:是否确認(消費者收到消息了後自動給隊列回複收到了)
		3.callback:回調函數

		basicConsume(String queue, boolean autoAck, Consumer callback)
		 */
		/**
		 * exchange:交換機名稱
		 * type:交換機類型
		 *        DIRECT("direct"):定向
		 *        FANOUT("fanout")  扇形(也就是廣播,發送到每個隊列)
		 *        TOPIC("topic"),   通配符的方式
		 *        HEADERS("headers");  通過參數比對
		 * durable:是否持久化
		 * autoDelete:是否自動删除
		 * internal:内部使用。一般為false
		 * arguments:參數
		 *
		 * exchangeDeclare(String exchange, BuiltinExchangeType type,
		 * boolean durable, boolean autoDelete, Map<String, Object> arguments)
		 *
		 *
		 */
		Channel channel=connection.createChannel();
		String exchangeName="direct_exchange";
		channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT,true,false,false,null);

		String quueName1="quue_1";
		String quueName2="quue_2";
		/*
			durable:持久的
			exclusive:1.該隊列是否隻能有一個消費者對其進行監聽 2.當連接配接關閉時是否删除隊列
			autoDelete:是否自動删除(當沒有消費者時,是否自動删除)

			queueDeclare(String queue, boolean durable, boolean exclusive
			boolean autoDelete, Map<String, Object> arguments)
		 */
		//		聲明隊列,這裡需要聲明多個隊列
		channel.queueDeclare(quueName1,true,false,false,null);
		channel.queueDeclare(quueName2,true,false,false,null);
		//		綁定隊列和交換機的關系
		/**
		 *  queue:隊列的名稱
		 *  exchange:交換機的名稱
		 *  routingKey:路由key  如果交換機的類型為fanout,則routingKey=""
		 *
		 * queueBind(String queue, String exchange, String routingKey)
		 */
		// 隊列1的綁定
		channel.queueBind(quueName1,exchangeName,"error");
		//  隊列2的綁定
		channel.queueBind(quueName2,exchangeName,"info");
		channel.queueBind(quueName2,exchangeName,"warning");
		channel.queueBind(quueName2,exchangeName,"error");
		//		發送消息
		/*
		1.exchange:交換機名稱 (簡單模式下使用預設的交換機,即設定為空字元串即可。
		2.routingKey:路由鍵,隻要routingKey和隊列名稱一樣,該消息就會發送到該隊列中
		3.props:配置資訊
		4.發送的内容

		basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
		 */
		String message="日志資訊";
		//這樣交換機就會将消息轉發給對應的2個隊列   注意:這裡的第2個參數info為消息的路由key,它會和隊列的路由key進行比對
		channel.basicPublish(exchangeName,"info",null,message.getBytes());
		//		釋放資源
		channel.close();
		connection.close();

	}
           

consumer1

public void Test() throws IOException, TimeoutException {
		//		1.建立連接配接工廠
		ConnectionFactory factory=new ConnectionFactory();
//		2.設定參數
		factory.setHost("127.0.0.1");
		factory.setPort(5672);
		factory.setVirtualHost("/");
		factory.setUsername("hjx");
		factory.setPassword("123456");
//		3.建立連接配接
		Connection connection =factory.newConnection();
//		4.建立channel
		/*
		1.quene:隊列名稱
		2.autoAck:是否确認(消費者收到消息了後自動給隊列回複收到了)
		3.callback:回調函數

		basicConsume(String queue, boolean autoAck, Consumer callback)
		 */
		Channel channel=connection.createChannel();
//		隊列的名字
		String quueName1="quue_1";
		String quueName2="quue_2";
//		靜态内部類
		Consumer consumer=new DefaultConsumer(channel){
//			這是一個回調方法 當收到消息後會自動執行該方法

			/**
			 *
			 * @param consumerTag  消息唯一辨別
			 * @param envelope     擷取一些資訊 交換機、路由key
			 * @param properties   配置資訊
			 * @param body         擷取到的資料
			 * @throws IOException
			 */
			@Override
			public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
				System.out.println(new String(body));
				System.out.println("将日志消息列印到控制台.....");
			}
		};
		channel.basicConsume(quueName1,true,consumer);
//		注意:消費者不要關閉資源,如果資源被關閉了,它就無法對隊列中的消息進行監聽了
	}
           
注意:另外一個consumer2和上面這裡的consumer1的代碼一樣(隻是将要接收的隊列名字不一樣就行了)

Topics工作模式(通配符)

通過通配符将消息轉發到指定隊列

注意:這裡的通配符: *:比對1個單詞 #:0個或多個單詞

Rabbitmq基礎知識Rabbitmq基礎知識

provider

public void test() throws IOException, TimeoutException {
		//		1.建立連接配接工廠
		ConnectionFactory factory=new ConnectionFactory();
//		2.設定參數
		factory.setHost("127.0.0.1");
		factory.setPort(5672);
		factory.setVirtualHost("/");
		factory.setUsername("hjx");
		factory.setPassword("123456");
//		3.建立連接配接
		Connection connection =factory.newConnection();
//		4.建立channel
		/*
		1.quene:隊列名稱
		2.autoAck:是否确認(消費者收到消息了後自動給隊列回複收到了)
		3.callback:回調函數

		basicConsume(String queue, boolean autoAck, Consumer callback)
		 */
		/**
		 * exchange:交換機名稱
		 * type:交換機類型
		 *        DIRECT("direct"):定向
		 *        FANOUT("fanout")  扇形(也就是廣播,發送到每個隊列)
		 *        TOPIC("topic"),   通配符的方式
		 *        HEADERS("headers");  通過參數比對
		 * durable:是否持久化
		 * autoDelete:是否自動删除
		 * internal:内部使用。一般為false
		 * arguments:參數
		 *
		 * exchangeDeclare(String exchange, BuiltinExchangeType type,
		 * boolean durable, boolean autoDelete, Map<String, Object> arguments)
		 *
		 *
		 */
		Channel channel=connection.createChannel();
		String exchangeName="topic_exchange";
		channel.exchangeDeclare(exchangeName, BuiltinExchangeType.TOPIC,true,false,false,null);

		String quueName1="quue_1";
		String quueName2="quue_2";
		/*
			durable:持久的
			exclusive:1.該隊列是否隻能有一個消費者對其進行監聽 2.當連接配接關閉時是否删除隊列
			autoDelete:是否自動删除(當沒有消費者時,是否自動删除)

			queueDeclare(String queue, boolean durable, boolean exclusive
			boolean autoDelete, Map<String, Object> arguments)
		 */
		//		聲明隊列,這裡需要聲明多個隊列
		channel.queueDeclare(quueName1,true,false,false,null);
		channel.queueDeclare(quueName2,true,false,false,null);
		//		綁定隊列和交換機的關系
		/**
		 *  queue:隊列的名稱
		 *  exchange:交換機的名稱
		 *  routingKey:路由key  如果交換機的類型為fanout,則routingKey=""
		 *
		 * queueBind(String queue, String exchange, String routingKey)
		 */
		// 隊列1的綁定
		/**
		 * routingkey:系統名稱.日志級别
		 * 需求:将error級别的日志存入資料庫,所有order系統的日志也存入資料庫
		 *
		 */
		channel.queueBind(quueName1,exchangeName,"#.error");
		//  隊列2的綁定
		channel.queueBind(quueName1,exchangeName,"order.*");
		channel.queueBind(quueName2,exchangeName,"*.*");
		//		發送消息
		/*
		1.exchange:交換機名稱 (簡單模式下使用預設的交換機,即設定為空字元串即可。
		2.routingKey:路由鍵,隻要routingKey和隊列名稱一樣,該消息就會發送到該隊列中
		3.props:配置資訊
		4.發送的内容

		basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
		 */
		String message="日志資訊";
		//這樣交換機就會将消息轉發給對應的2個隊列   注意:這裡的第2個參數info為消息的路由key,它會和隊列的路由key進行比對
		channel.basicPublish(exchangeName,"order.info",null,message.getBytes());
		//		釋放資源
		channel.close();
		connection.close();

	}
           

consumer

public void Test() throws IOException, TimeoutException {
		//		1.建立連接配接工廠
		ConnectionFactory factory=new ConnectionFactory();
//		2.設定參數
		factory.setHost("127.0.0.1");
		factory.setPort(5672);
		factory.setVirtualHost("/");
		factory.setUsername("hjx");
		factory.setPassword("123456");
//		3.建立連接配接
		Connection connection =factory.newConnection();
//		4.建立channel
		/*
		1.quene:隊列名稱
		2.autoAck:是否确認(消費者收到消息了後自動給隊列回複收到了)
		3.callback:回調函數

		basicConsume(String queue, boolean autoAck, Consumer callback)
		 */
		Channel channel=connection.createChannel();
//		隊列的名字
		String quueName1="quue_1";
		String quueName2="quue_2";
//		靜态内部類
		Consumer consumer=new DefaultConsumer(channel){
//			這是一個回調方法 當收到消息後會自動執行該方法

			/**
			 *
			 * @param consumerTag  消息唯一辨別
			 * @param envelope     擷取一些資訊 交換機、路由key
			 * @param properties   配置資訊
			 * @param body         擷取到的資料
			 * @throws IOException
			 */
			@Override
			public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
				System.out.println(new String(body));
				System.out.println("将日志消息列印到控制台.....");
			}
		};
		channel.basicConsume(quueName1,true,consumer);
//		注意:消費者不要關閉資源,如果資源被關閉了,它就無法對隊列中的消息進行監聽了
	}
           
注意:另外一個consume和上面這個一樣,隻是擷取消息的隊列的名字不一樣而已

spring整合rabbitmq(生産者)

1.導入依賴

<dependency>
            <groupId>org.springframework.amqp</groupId>
            <artifactId>spring-rabbit</artifactId>
            <version>2.1.8.RELEASE</version>
        </dependency>
           

2.application配置檔案

spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=root
spring.rabbitmq.password=123
spring.rabbitmq.virtual-host=/

           

3.配置xml

<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="http://www.springframework.org/schema/rabbit
	http://www.springframework.org/schema/rabbit/spring-rabbit-1.4.xsd
	http://www.springframework.org/schema/beans
	http://www.springframework.org/schema/beans/spring-beans-4.1.xsd">

    <!-- 配置連接配接工廠 -->
    <rabbit:connection-factory id="connectionFactory"
                               host="${spring.rabbitmq.host}" port="${spring.rabbitmq.port}" username="${spring.rabbitmq.username}" password="${spring.rabbitmq.password}" />

    <!-- 定義mq管理 -->
    <rabbit:admin connection-factory="connectionFactory" />

    <!--
            聲明隊列
            auto-declare:如果沒有該隊列就會自動建立
    -->
    <rabbit:queue name="que_cat" auto-declare="true" durable="true" />
    <rabbit:queue name="que_pig" auto-declare="true" durable="true" />

    <!-- 定義交換機綁定隊列(路由模式) -->
    <rabbit:direct-exchange name="IExchange"
                            id="IExchange">
        <rabbit:bindings>
            <rabbit:binding queue="que_cat" key="que_cat_key" />
            <rabbit:binding queue="que_pig" key="que_pig_key" />
        </rabbit:bindings>
    </rabbit:direct-exchange>

    <!-- 消息對象json轉換類 -->
    <bean id="jsonMessageConverter"
          class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter" />

    <!-- 定義模版  用于發送消息-->
    <rabbit:template id="rabbitTemplate"
                     connection-factory="connectionFactory" exchange="IExchange"
                     message-converter="jsonMessageConverter" />

</beans>
           

4.測試

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:applicationContext-rabbitmq-send.xml")
public class Test {
//	注入 rabbitTemplate
	@Autowired
	private RabbitTemplate rabbitTemplate;
	@org.junit.Test
	public void rabbitTest(){
//		發送消息  路由模式下的發送
		rabbitTemplate.convertAndSend("que_cat_key","hello world");
	}
}

           

spring整合rabbitmq(消費者)

1.依賴

<dependency>
            <groupId>org.springframework.amqp</groupId>
            <artifactId>spring-rabbit</artifactId>
            <version>2.1.8.RELEASE</version>
        </dependency>
           

2.xml配置

<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xmlns:context="http://www.springframework.org/schema/context"
       xsi:schemaLocation="http://www.springframework.org/schema/rabbit
	http://www.springframework.org/schema/rabbit/spring-rabbit-1.4.xsd
	http://www.springframework.org/schema/beans
	http://www.springframework.org/schema/beans/spring-beans-4.1.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd">

    <context:property-placeholder location="classpath:rabbitmq.properties"/>
    <!-- 配置連接配接工廠 -->
    <rabbit:connection-factory id="connectionFactory"
                               host="${spring.rabbitmq.host}" port="${spring.rabbitmq.port}" username="${spring.rabbitmq.username}" password="${spring.rabbitmq.password}" />

    <!-- 定義mq管理 -->
    <rabbit:admin connection-factory="connectionFactory" />

    <!-- 聲明隊列 -->
    <rabbit:queue name="que_cat" auto-declare="true" durable="true" />
    <rabbit:queue name="que_pig" auto-declare="true" durable="true" />

    <!-- 定義消費者
        CatHandler 和 PigHandler 是一個監聽者
    -->
    <bean name="catHandler" class="com.hjx.monitor.CatHandler" />
    <bean name="pigHandler" class="com.hjx.monitor.PigHandler" />

    <!-- 定義消費者監聽隊列 -->
    <rabbit:listener-container
            connection-factory="connectionFactory">
        <rabbit:listener ref="catHandler" queues="que_cat" />
        <rabbit:listener ref="pigHandler" queues="que_pig" />
    </rabbit:listener-container>

</beans>
           

3.properties配置

spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=root
spring.rabbitmq.password=123
spring.rabbitmq.virtual-host=/

           

4.編寫監聽器實作MessageListener接口實作onMessage方法

package com.hjx.monitor;

import java.io.IOException;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

public class CatHandler implements MessageListener {

	private static final ObjectMapper MAPPER = new ObjectMapper();

	public void onMessage(Message msg) {

		try {
			//msg就是rabbitmq傳來的消息
			// 使用jackson解析
			// msg.getBody():消息内容
			JsonNode jsonData = MAPPER.readTree(msg.getBody());
			System.out.println("我是可愛的小貓,我的id是" + jsonData.get("id").asText()
					+ ",我的名字是" + jsonData.get("name").asText());

		} catch (IOException e) {
			e.printStackTrace();
		}
	}

}
           

spring boot整合rabbitmq生産者

1.導入依賴

<dependencies>
    <!--        spring boot整合rabbitmq-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
        </dependency>
    </dependencies>
           

2.編寫配置檔案

spring:
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: root
    password: 123
    virtual-host: /

           

3.編寫配置類

package com.hjx.config;

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitConfig {
	//交換機的名稱
	public static final String exchangeName="boot_topic";
	//隊列的名稱
	public static final String queueName="boot_quue";
	/**
	 * 1.交換機
	 * 2.隊列
	 * 3.交換機和隊列的綁定關系
	 */

//	1.交換機
	@Bean("exchange")
	public Exchange bootExchange(){
		return ExchangeBuilder.topicExchange(exchangeName).durable(true).build();
	}
//	2.隊列
	@Bean("Queue")
	public Queue bootQueue(){
		return QueueBuilder.durable(queueName).build();
	}

//3.交換機和隊列的綁定關系
	@Bean
	public Binding bootBind(@Qualifier("exchange") Exchange exchange,@Qualifier("Queue") Queue queue ){
		return BindingBuilder.bind(queue).to(exchange).with("boot.#").noargs();
	}

}

           

4.測試

/**
 * @Nullable:表示可以傳入空值
 */
@SpringBootTest
public class ProviderTest {
	@Autowired
	private RabbitTemplate rabbitTemplate;
	@Test
	public void testSend(){
		String ms="hello,spring boot-rabbitmq!";
		rabbitTemplate.convertSendAndReceive(RabbitConfig.exchangeName,"boot.hello",ms);
	}
}
           

spring boot整合rabbitmq消費者

1.導入依賴

<!--        spring boot整合rabbitmq-->
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
        </dependency>
    </dependencies>
           

2.配置檔案

spring:
  rabbitmq:
    virtual-host: /
    host: 127.0.0.1
    port: 5672
    username: root
    password: 123
           

3.編寫配置類

/監聽隊列
@Component
public class RabbitListenQue {
	// 監聽 名為 boot_quue的隊列
	@RabbitListener(queues="boot_quue")
	public void ListnerQue(Message message){
		System.out.println("message:"+message);
	}
}

           

4.測試

啟動項目測試,看是否列印日志

rabbitmq進階特性

Rabbitmq基礎知識Rabbitmq基礎知識

消息的可靠投遞

Rabbitmq基礎知識Rabbitmq基礎知識

确認模式

這裡采用spring的方式,這裡的代碼和spring整合rabbitmq差不多,隻需要改變一小部分就可以了!

xml檔案

<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xmlns:context="http://www.springframework.org/schema/context"
       xsi:schemaLocation="http://www.springframework.org/schema/rabbit
	http://www.springframework.org/schema/rabbit/spring-rabbit-1.4.xsd
	http://www.springframework.org/schema/beans
	http://www.springframework.org/schema/beans/spring-beans-4.1.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd">

    <context:property-placeholder location="classpath:rabbitmq.properties"/>
    <!-- 配置連接配接工廠
        publisher-confirms="true":開啟确認模式
    -->
    <rabbit:connection-factory id="connectionFactory"
                               host="${spring.rabbitmq.host}"
                               port="${spring.rabbitmq.port}"
                               username="${spring.rabbitmq.username}"
                               password="${spring.rabbitmq.password}"
                               publisher-confirms="true"
    />

    <!-- 定義mq管理 -->
    <rabbit:admin connection-factory="connectionFactory" />

    <!--
            聲明隊列
            auto-declare:如果沒有該隊列就會自動建立
    -->
    <rabbit:queue name="que_cat" auto-declare="true" durable="true" />
    <rabbit:queue name="que_pig" auto-declare="true" durable="true" />


    <!-- 定義交換機綁定隊列(路由模式) -->
    <rabbit:direct-exchange name="IExchange"
                            id="IExchange">
        <rabbit:bindings>
            <rabbit:binding queue="que_cat" key="que_cat_key" />
            <rabbit:binding queue="que_pig" key="que_pig_key" />
        </rabbit:bindings>
    </rabbit:direct-exchange>

    <!-- 消息對象json轉換類 -->
    <bean id="jsonMessageConverter"
          class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter" />

    <!-- 定義模版  用于發送消息-->
    <rabbit:template id="rabbitTemplate"
                     connection-factory="connectionFactory" exchange="IExchange"
                     message-converter="jsonMessageConverter" />

    <!--    消息可靠性投遞-->
    <rabbit:queue id="test_quue_confirm" name="test_quue_confirm" ></rabbit:queue>
    <rabbit:direct-exchange name="test_exchange_confirm">
        <rabbit:bindings>
            <rabbit:binding queue="test_quue_confirm" key="confirm"></rabbit:binding>
        </rabbit:bindings>
    </rabbit:direct-exchange>


</beans>
           

測試

package com.hjx;

import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.lang.Nullable;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:applicationContext-rabbitmq-send.xml")
public class Test {
//	注入 rabbitTemplate
	@Autowired
	private RabbitTemplate rabbitTemplate;
	@org.junit.Test
	public void rabbitTest(){
//		發送消息  路由模式下的發送
		rabbitTemplate.convertAndSend("que_cat_key","hello world");
	}

	@org.junit.Test
	public void confirmTest(){
//		定義回調
		rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback(){
			/**
			 *
			 * @param correlationData  參數的資訊
			 * @param ack              交換機是否成功收到資訊 true:代表成功 false:代表失敗
			 * @param cause            失敗的原因
			 */
			@Override
			public void confirm(CorrelationData correlationData, boolean ack, String cause){
				if(ack){
					System.out.println("接收消息成功");
					System.out.println("confirm方法被執行");
				}else{
					System.out.println("接收消息fail");
				}
			}
		});
//		發送消息
			rabbitTemplate.convertAndSend("test_exchange_confirm","confirm","确認消息的日志");
	}
}

           

注意點:

一定要在xml中将确認開啟
Rabbitmq基礎知識Rabbitmq基礎知識

回退模式

Rabbitmq基礎知識Rabbitmq基礎知識
将消息發送給交換機後,交換機發給隊列失敗,此時消息會丢棄或回退給發送方。

1.xml配置

<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xmlns:context="http://www.springframework.org/schema/context"
       xsi:schemaLocation="http://www.springframework.org/schema/rabbit
	http://www.springframework.org/schema/rabbit/spring-rabbit-1.4.xsd
	http://www.springframework.org/schema/beans
	http://www.springframework.org/schema/beans/spring-beans-4.1.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd">

    <context:property-placeholder location="classpath:rabbitmq.properties"/>
    <!-- 配置連接配接工廠
        publisher-confirms="true":開啟确認模式
        publisher-returns="true":開啟回退模式
    -->
    <rabbit:connection-factory id="connectionFactory"
                               host="${spring.rabbitmq.host}"
                               port="${spring.rabbitmq.port}"
                               username="${spring.rabbitmq.username}"
                               password="${spring.rabbitmq.password}"
                               publisher-confirms="true"
                               publisher-returns="true"
    />

    <!-- 定義mq管理 -->
    <rabbit:admin connection-factory="connectionFactory" />

    <!--
            聲明隊列
            auto-declare:如果沒有該隊列就會自動建立
    -->
    <rabbit:queue name="que_cat" auto-declare="true" durable="true" />
    <rabbit:queue name="que_pig" auto-declare="true" durable="true" />


    <!-- 定義交換機綁定隊列(路由模式) -->
    <rabbit:direct-exchange name="IExchange"
                            id="IExchange">
        <rabbit:bindings>
            <rabbit:binding queue="que_cat" key="que_cat_key" />
            <rabbit:binding queue="que_pig" key="que_pig_key" />
        </rabbit:bindings>
    </rabbit:direct-exchange>

    <!-- 消息對象json轉換類 -->
    <bean id="jsonMessageConverter"
          class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter" />

    <!-- 定義模版  用于發送消息-->
    <rabbit:template id="rabbitTemplate"
                     connection-factory="connectionFactory" exchange="IExchange"
                     message-converter="jsonMessageConverter" />

    <!--    消息可靠性投遞-->
    <rabbit:queue id="test_quue_confirm" name="test_quue_confirm" ></rabbit:queue>
    <rabbit:direct-exchange name="test_exchange_confirm">
        <rabbit:bindings>
            <rabbit:binding queue="test_quue_confirm" key="confirm"></rabbit:binding>
        </rabbit:bindings>
    </rabbit:direct-exchange>


</beans>
           

2.測試

//	回退模式的測試
	@org.junit.Test
	public void BackTest(){
//		設定交換機處理失敗消息的模式
		rabbitTemplate.setMandatory(true); // 這樣交換機就會将發送失敗的消息傳回給發送方


		rabbitTemplate.setReturnCallback(
				new RabbitTemplate.ReturnCallback() {
					@Override
					public void returnedMessage(Message message, int replycode, String replyText, String exchange, String routingKey) {
						System.out.println("回退的消息為:"+message);
						System.out.println("replycode:"+replycode);
						System.out.println("replyText:"+replyText);
						System.out.println("exchange:"+exchange);
						System.out.println("routingKey:"+routingKey);

						System.out.println("消息回退執行了");
					}
				}
		);
	}
           

注意點

xml中一定要開啟回退模式

consumer ack

Rabbitmq基礎知識Rabbitmq基礎知識
注意:系統預設采用自動簽收的方式,下面這裡是手動簽收。

xml檔案

關鍵部分

<!--
         定義消費者監聽隊列
         acknowledge="manual":代表手動簽收
    -->
    <rabbit:listener-container
            connection-factory="connectionFactory" acknowledge="manual" >
        <rabbit:listener ref="catHandler" queues="que_cat" />
        <rabbit:listener ref="pigHandler" queues="que_pig" />
    </rabbit:listener-container>
           

監聽器

package com.hjx.monitor;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;

/**
 *  預設為自動簽收
 *  1.這裡需要設定為手動簽收  acknowledge="manual"
 *  2.讓監聽器實作ChannelAwareMessageListener接口(是MessageListener的子接口)
 *  3.如果消息處理成功,則調用channel的basicAck()簽收,否者調用channel的basicNack()拒絕簽收,讓broker重新發送消息
 *
 *
 *
 */
public class CatHandler implements ChannelAwareMessageListener {

	private static final ObjectMapper MAPPER = new ObjectMapper();

	@Override
	public void onMessage(Message message, Channel channel) throws Exception {
		//擷取消息的tag(标簽)
		long deliveryTag=message.getMessageProperties().getDeliveryTag();
		System.out.println(new String(message.getBody()));
//		處理業務邏輯
		System.out.println("處理業務邏輯");
		try {
			channel.basicAck(deliveryTag,true);
		}catch (Exception e){
			//拒絕簽收  第3個參數:true代表消息重新發送
			channel.basicNack(deliveryTag,true,true);
		}

	}

}
           

注意點:

1.xml檔案中的rabbit:listener-container一定要設定acknowledge=“manual”:代表手動簽收。

2.監聽器需要實作ChannelAwareMessageListener接口(MessageListener接口的子接口)onMessage方法

消費端限流

注意:限流與消費端有關
Rabbitmq基礎知識Rabbitmq基礎知識

需滿足以下兩個條件

1.手動确認 ( 隻有當手動确認後才能拉去隊列中的下一條消息)

2.配置屬性(xml中進行配置)rabbit:listener-container節點中進行配置 perfetch屬性

prefetch=n:表示消費端每次從隊列中拉取n條消息

操作步驟

/**
 *
 * 限流
 * 需要滿足以下幾個條件:
 *          1.手動确認  ( 隻有當手動确認後才能拉去隊列中的下一條消息)
 *          2.配置屬性(xml中進行配置)rabbit:listener-container節點中進行配置 perfetch屬性
 *              prefetch=n:表示消費端每次從隊列中拉取n條消息
 *
 */
 public class CurrentLimit implements ChannelAwareMessageListener {

	private static final ObjectMapper MAPPER = new ObjectMapper();

	@Override
	public void onMessage(Message message, Channel channel) throws Exception {
//		擷取消息
		System.out.println(new String(message.getBody()));
//		處理業務邏輯

//		手動簽收
		channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);
	}

}



           

監聽器

Rabbitmq基礎知識Rabbitmq基礎知識

TTL

Rabbitmq基礎知識Rabbitmq基礎知識
Rabbitmq基礎知識Rabbitmq基礎知識

圖形化界面操作步驟

1.使用圖形化控制台添加一個隊列并設定隊列存活時間

Rabbitmq基礎知識Rabbitmq基礎知識

2.建立一個交換機

Rabbitmq基礎知識Rabbitmq基礎知識

3.交換機綁定隊列

Rabbitmq基礎知識Rabbitmq基礎知識

4.釋出消息

Rabbitmq基礎知識Rabbitmq基礎知識

代碼操作步驟

1.聲明一個隊列并設定隊列的過期時間

<!--    用于TTL
        聲明隊列
-->
    <rabbit:queue name="test_quue_ttl" auto-declare="true" durable="true">
        <rabbit:queue-arguments>
        <!--            設定隊列的過期時間
                        注意:這裡需要使用value-type設定參數的類型,因為預設是字元串,而我們想要的是數值類型
        -->
            <entry key="x-message-ttl" value="10000" value-type="java.lang.Integer"/>
        </rabbit:queue-arguments>
    </rabbit:queue>

           

2.聲明交換機并綁定隊列

<!--    聲明交換機,用于TTL-->
    <rabbit:topic-exchange name="test_exchange_ttl">
        <rabbit:bindings>
            <rabbit:binding pattern="ttl.#" queue="test_quue_ttl"></rabbit:binding>
        </rabbit:bindings>
    </rabbit:topic-exchange>
           

3.測試

//	TTL測試
	@org.junit.Test
	public void TTL_test(){
		rabbitTemplate.convertAndSend("test_exchange_ttl","ttl.hh","TTL的測試-helloworld!!");
	}
           

單獨設定消息的過期時間

注意:如果隊列的時間和消息的時間都設定話,則預設以時間短的為準!!

消息過期後,隻有消息在隊列頂端,才會判斷其是否過期(如果過期,則将該消息移除掉),而不是說消息的過期時間到了,該消息就會被移除。是以一般都設定隊列的過期時間

//	單獨設定消息的過期時間
	@org.junit.Test
	public void setMessageDiedtime(){
		// 消息後處理器 可以設定一些參數
		MessagePostProcessor postProcessor=new MessagePostProcessor() {
			@Override
			public Message postProcessMessage(Message message) throws AmqpException {
				// 設定5秒後過期
				message.getMessageProperties().setExpiration("5000");
				return message;
			}
		};

		rabbitTemplate.convertAndSend("test_exchange_ttl","ttl.hh","TTL的測試-helloworld!!", postProcessor);
	}
           

死信隊列

如果你的消息和死信作了一個綁定,那麼當你的消息被銷毀了後,該消息會被發送給死信交換機,進而死xingh
Rabbitmq基礎知識Rabbitmq基礎知識

rabbitmq的應用

Rabbitmq基礎知識Rabbitmq基礎知識

rabbitmq的叢集搭建

Rabbitmq基礎知識Rabbitmq基礎知識