天天看點

spring-boot學習:二十、spring-boot內建RabbitMQ

由于JMS存在跨語言跨平台的缺陷,是以出現了AMQP(Advanced Message Queuing Protocol),一個提供統一消息服務的應用層标準進階消息隊列協定,代表RabbitMQ

一、下載下傳安裝RabbitMQ

  1. 安裝

    從https://www.rabbitmq.com/download.html下載下傳RabbitMQ

    由于RabbitMQ是用Erlang語言編寫的,安裝RabbitMQ需要先安裝Erlang運作平台

    http://www.erlang.org/downloads

    配置環境變量:

    ERLANG_HOME:D:\software\erl10.4 (安裝後可能已自動配置)

    path加入%ERLANG_HOME%\bin\erl.exe

    重新開機計算機

  2. 啟動RabbitMQ

    cmd視窗需要管理者權限運作,否則啟動時報錯(發生系統錯誤 5)

    spring-boot學習:二十、spring-boot內建RabbitMQ
    也可以在服務清單中啟動
  3. 安裝管理界面management ui

    參考:https://www.rabbitmq.com/management.html

    營運指令:rabbitmq-plugins enable rabbitmq_management

  4. 通路http://127.0.0.1:15672/
    spring-boot學習:二十、spring-boot內建RabbitMQ
  5. 配置使用者權限

    1) 通過指令rabbitmqctl.bat list_users檢視使用者

    spring-boot學習:二十、spring-boot內建RabbitMQ

    2)添加使用者

    rabbitmqctl.bat add_user root root

    spring-boot學習:二十、spring-boot內建RabbitMQ

    3)設定角色

    rabbitmqctl.bat set_user_tags root administrator

4)設定權限

rabbitmqctl.bat set_permissions -p / root “." ".” “.*”

5)使用賬号登入

spring-boot學習:二十、spring-boot內建RabbitMQ

6)删除使用者

rabbitmqctl delete_user username

7) 修改改密碼

rabbimqctl change_password username newpassword

二、RabbitMQ詳解

參考:

https://www.rabbitmq.com/getstarted.html

https://www.cnblogs.com/dongkuo/p/6001791.html 依托于官方文檔詳細解釋

https://www.jianshu.com/p/80eefec808e5

0. 概念

消息隊列服務一般由生産者、消息隊列和消費者組成,RabbitMQ加入了交換機 (Exchange)的概念,這樣生産者和隊列就沒有直接聯系, 轉而變成生産者把消息給交換器, 交換器根據排程政策再把消息再給隊列。

1)Server(Broker):接收用戶端連接配接,實作AMQP協定的消息隊列和路由功能的程序;

2)Virtual Host:虛拟主機的概念,類似權限控制組,一個Virtual Host裡可以有多個Exchange和Queue;

3)Exchange:交換機,接收生産者發送的消息,并根據Routing Key将消息路由到伺服器中的隊列Queue。

4)ExchangeType:交換機類型決定了路由消息行為,RabbitMQ中有四種類型Exchange,分别是fanout、direct、topic、headers

5)Message Queue:消息隊列,用于存儲還未被消費者消費的消息;

6)Message:由Header和body組成,Header是由生産者添加的各種屬性的集合,包括Message是否被持久化、優先級是多少、由哪個Message Queue接收等;body是真正需要發送的資料内容;

7)BindingKey:綁定關鍵字,将一個特定的Exchange和一個特定的Queue綁定起來。

1. 最簡單的消息發送Hello World

spring-boot學習:二十、spring-boot內建RabbitMQ

功能:一個生産者P發送消息到隊列Q,一個消費者C接收

生産者:

通過連接配接工廠ConnectionFactory建立連接配接connection,使用連接配接建立通道channel, channel聲明隊列queue,channel使用queue發送消息

ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel());
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String message = "Hello World!";
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
           

消費者:跟生産者一樣,建立連接配接,建立channel,聲明queue,然後監聽queue

ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
    String message = new String(delivery.getBody(), "UTF-8");
    System.out.println(" [x] Received '" + message + "'");
};
channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
           

2. 工作隊列work queues

spring-boot學習:二十、spring-boot內建RabbitMQ

功能:一個生産者(Boss)向隊列發消息(任務),多個消費者(worker)從隊列接受消息(任務)

特點:

1)一條消息隻會被一個消費者接收;

2)消息是平均配置設定給消費者的;

3)消費者隻有在處理完某條消息後,才會收到下一條消息。

By default, RabbitMQ will send each message to the next consumer, in sequence. On average every consumer will get the same number of messages. This way of distributing messages is called round-robin. Try this out with three or more workers.

預設情況下,RabbitMQ将按順序向消費者發送消息,平均每個消費者收到相同數量的消息,這種分發消息叫做輪詢。可以通過三個以上的worker(消費者)來進行測試。

由于worker執行任務需要一定時間,為了確定消息不丢失,RabbitMQ 支援

1)消息确認機制

DeliverCallback deliverCallback = (consumerTag, delivery) -> {
  String message = new String(delivery.getBody(), "UTF-8");
  System.out.println(" [x] Received '" + message + "'");
  try {
    doWork(message);//可使用sleep模拟
  } finally {
    System.out.println(" [x] Done");
    channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
  }
};
           

2)消息持久化

// 将第二個參數設為true,表示聲明一個需要持久化的隊列。
// 需要注意的是,若你已經定義了一個非持久的,同名字的隊列,要麼将其先删除(不然會報錯),要麼換一個名字。
channel.queueDeclare("hello", true, false, false, null);

// 修改了第三個參數,這是表明消息需要持久化
channel.basicPublish("",  "hello", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
           

3. 釋出/訂閱Publish/Subscribe

spring-boot學習:二十、spring-boot內建RabbitMQ

生産者是把消息發送到了交換機(exchange)中,然後交換機負責(決定)将消息發送到(哪一個)消息隊列中。

前面的兩個案例其實是經過了預設交換機(Default Exchange,用空字元串表示),每一個被建立的隊列都會被自動的綁定到預設交換機上,并且路由鍵就是隊列的名字。路由鍵用來指定交換機将消息發到指定的隊列。

交換機有4種不同的類型,分别是direct,fanout,topic,headers:

direct:要求和它綁定的隊列帶有一個路由鍵K,若有一個帶有路由鍵R的消息到達了交換機,交換機會将此消息路由到路由鍵K = R的隊列。預設交換機便是該類型。

fanout:會路由每一條消息到所有和它綁定的隊列,忽略路由鍵。即廣播形式。

4. Routing模式

當有些消息隻能部分消費者消費時,可使用Routing模式;隊列綁定交換機,同時帶上routing key,以多次調用隊列綁定方法,調用時,隊列名和交換機名都相同,而routing key不同,這樣可以使一個隊列帶有多個routing key。

5. Topic

通配符比對消息發送隊列,routing key由多個關鍵詞組成,詞與詞之間由點号(.)隔開,規定*表示任意的一個詞,#号表示任意的0個或多個詞。

6. RPC

參考文檔

三、springboot rabbitmq詳解

參考:

https://docs.spring.io/spring-amqp/docs/2.1.6.RELEASE/reference/html/#sending-messages

https://www.cnblogs.com/ityouknow/p/6120544.html

1.pom.xml

<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
           

2. application.properties

spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=root
spring.rabbitmq.password=123
           

3. 注冊Queue,RabbitConfig.java

@Configuration
public class RabbitConfig {
	@Bean
	public Queue helloQueue() {
		return new Queue("hello");
	}
}
           

4. 生産者,RabbitMQProducer.java

@Component
public class RabbitMQProducer {
	@Autowired
	private AmqpTemplate amqpTemplate;

	public void sendMsg(String routingKey, String msg) {
		amqpTemplate.convertAndSend(routingKey, msg);
	}
}
           

5. 測試

@Component
@OpenAPI
public class ApiTest {
	@Autowired
	private RabbitMQProducer rabbitMQProducer;
	
	@OpenAPIMethod(methodName = "testRabbitMQSender")
	public Object testRabbitMQSender(final String msg) throws Exception {
		rabbitMQProducer.sendMsg("hello",msg);
		return null;
	}
}
           

發送消息:http://10.0.0.57:9001/api/testRabbitMQSender?msg=Hello%20World

檢視控制台,建立了一個叫hello的queue,積壓了1條消息

spring-boot學習:二十、spring-boot內建RabbitMQ

消息内容:

spring-boot學習:二十、spring-boot內建RabbitMQ

6. 消費者

@Component
public class RabbitMQConsumer {
	@RabbitListener(queues = "hello")
	public void consumerHelloQueueMessage(String message){
        System.out.println("收到hello-queue封包:"+message);
    }
}
           

啟動消費者後,會立即收到積壓的消息

收到hello-queue封包:Hello World

7. 測試多個消費者監聽同一個通道,如1個hello queue,3個消費者

發送9條消息,列印日志日下:

收到hello-queue封包:Hello World
收到hello-queue3封包:Hello World
收到hello-queue2封包:Hello World
收到hello-queue封包:Hello World
收到hello-queue封包:Hello World
收到hello-queue2封包:Hello World
收到hello-queue3封包:Hello World
收到hello-queue3封包:Hello World
收到hello-queue2封包:Hello World
           

結果表示RabbitMQ會輪詢發送消息給消費者,一條消息隻能被一個消費者接收

8. Direct Exchange

@Configuration
public class RabbitConfig {

	@Bean
	public Queue helloQueue() {
		return new Queue("hello");
	}
	
	@Bean
	public DirectExchange helloExchange() {
		return new DirectExchange("helloExchange");
	}
	
	@Bean
	public Binding helloBinding() {
		return BindingBuilder.bind(helloQueue()).to(helloExchange()).with("helloDirect");
	}
}
           

注冊了一個叫helloExchange的Direct Exchange

spring-boot學習:二十、spring-boot內建RabbitMQ

檢視helloExchange 詳情:direct類型,綁定了hello queue

spring-boot學習:二十、spring-boot內建RabbitMQ

檢視hello queue,除了綁定預設exchange還綁定了helloExchange

spring-boot學習:二十、spring-boot內建RabbitMQ
  • 測試demo1

    一個生産者,一個helloExchage,一個叫hello的queue,一個路由helloDirect,四個消費者(三個監聽叫hello的queue,一個監聽路由helloDirect對應hello queue且綁定了helloExchage交換器)

    發送消息:

    public void sendMsg(String exchange, String routingKey, String msg) {
    	amqpTemplate.convertAndSend(exchange, routingKey, msg);
    }
    
    public Object testRabbitMQSender(final String msg) throws Exception {
    	for(int i=0; i<9; i++) {
    		rabbitMQProducer.sendMsg("helloExchange", "helloDirect",msg);
    	}
    	return null;
    }
               
    消費消息:
    @RabbitListener(queues = "hello")
    public void consumerHelloQueueMessage(String message){
    	System.out.println("收到hello-queue封包:"+message);
    }
    
    @RabbitListener(queues = "hello")
    public void consumerHelloQueueMessage2(String message){
    	System.out.println("收到hello-queue2封包:"+message);
    }
    
    @RabbitListener(queues = "hello")
    public void consumerHelloQueueMessage3(String message){
    	System.out.println("收到hello-queue3封包:"+message);
    }
    
    // 當RabbitMQ中不存在綁定關系時,自動生成相應的綁定
    // 如不存在helloExchange、hello queue或helloDirect任意一個,都會自動生成并綁定關系
    @RabbitListener(bindings = @QueueBinding(
    		value = @Queue(value = "hello", durable = "true"),
    		exchange = @Exchange(value = "helloExchange", durable = "true"),
    		key = "helloDirect"
    	)
    )
    public void consumerQueueMessage(String message){
    	System.out.println("收到helloExchage->hello queue->helloDirect route封包:"+message);
    }
               
    測試結果:四個消費者均勻收到消息
    收到hello-queue2封包:Hello World
    收到hello-queue封包:Hello World
    收到hello-queue3封包:Hello World
    收到helloExchage->hello queue->helloDirect route封包:Hello World
    收到hello-queue2封包:Hello World
    收到hello-queue封包:Hello World
    收到hello-queue3封包:Hello World
    收到hello-queue封包:Hello World
    收到helloExchage->hello queue->helloDirect route封包:Hello World
               

    結果說明:

    關聯示意圖如下

    spring-boot學習:二十、spring-boot內建RabbitMQ
    當P通過helloExchange發送消息,經過路由helloDirect到達hello queue,然後均勻下發給所監聽的C,是以四個消費者都收到了消息
  • 測試demo2

    在測試demo1的基礎上增加一個hello2 queue, 并與helloExchange綁定,路由key為helloDirect2,一個消費者監聽,示意圖如下:

    spring-boot學習:二十、spring-boot內建RabbitMQ
    當P通過helloExchange發送消息,經過路由helloDirect2到達hello2 queue,監聽的消費者收到消息

9. Fanout Exchange

以廣播模式,給 Fanout 交換機發送消息,綁定了這個交換機的所有隊列都收到這個消息。

// 定義exchange、queue、binding
@Bean
public Queue fanoutQueue1() {
	return new Queue("fanout1");
}

@Bean
public Queue fanoutQueue2() {
	return new Queue("fanout2");
}

@Bean
public Queue fanoutQueue3() {
	return new Queue("fanout3");
}

@Bean
public FanoutExchange fanoutExchange() {
	return new FanoutExchange("fanoutExchange");
}

@Bean
public Binding fanoutBinding() {
	return BindingBuilder.bind(fanoutQueue1()).to(fanoutExchange());
}

@Bean
public Binding fanoutBinding2() {
	return BindingBuilder.bind(fanoutQueue2()).to(fanoutExchange());
}

@Bean
public Binding fanoutBinding3() {
	return BindingBuilder.bind(fanoutQueue3()).to(fanoutExchange());
}
           
// 發送消息
public Object testRabbitMQSender(final String msg) throws Exception {
	rabbitMQProducer.sendMsg("fanoutExchange", "",msg);
	return null;
}
           
// 消費消息
@RabbitListener(bindings = @QueueBinding(
		value = @Queue(value = "fanout1", durable = "true"),
		exchange = @Exchange(value = "fanoutExchange", durable = "true", type = ExchangeTypes.FANOUT)
	)
)
public void consumerFanoutMessage1(String message){
	System.out.println("收到fanoutExchange->fanout封包1:"+message);
}

@RabbitListener(bindings = @QueueBinding(
		value = @Queue(value = "fanout2", durable = "true"),
		exchange = @Exchange(value = "fanoutExchange", durable = "true", type = ExchangeTypes.FANOUT)
	)
)
public void consumerFanoutMessage2(String message){
	System.out.println("收到fanoutExchange->fanout封包2:"+message);
}

@RabbitListener(bindings = @QueueBinding(
		value = @Queue(value = "fanout3", durable = "true"),
		exchange = @Exchange(value = "fanoutExchange", durable = "true", type = ExchangeTypes.FANOUT)
	)
)
public void consumerFanoutMessage3(String message){
	System.out.println("收到fanoutExchange->fanout封包3:"+message);
}
           

運作結果:

收到fanoutExchange->fanout封包1:Hello World fanout
收到fanoutExchange->fanout封包3:Hello World fanout
收到fanoutExchange->fanout封包2:Hello World fanout
           

10. Topic Exchange

使用通配符(*和#)路由

* (star) can substitute for exactly one word.
# (hash) can substitute for zero or more words.
           
// 定義exchange、queue、binding
@Bean
public Queue topicQueue1() {
	return new Queue("topic.q1");
}

@Bean
public Queue topicQueue2() {
	return new Queue("topic.q2");
}

@Bean
public Queue topicQueue3() {
	return new Queue("topic.q3");
}

@Bean
public TopicExchange topicExchange() {
	return new TopicExchange("topicExchange");
}

@Bean
public Binding topicBinding1() {
	return BindingBuilder.bind(topicQueue1()).to(topicExchange()).with("topic.q1");
}

@Bean
public Binding topicBinding2() {
	return BindingBuilder.bind(topicQueue2()).to(topicExchange()).with("topic.#");
}

@Bean
public Binding topicBinding3() {
	return BindingBuilder.bind(topicQueue3()).to(topicExchange()).with("topic.*");
}
           
// 消費者
@RabbitListener(queues = "topic.q1")
public void consumerTopicMessage(String message){
	System.out.println("收到topic.q1封包:"+message);
}

@RabbitListener(queues = "topic.q2")
public void consumerTopicMessage2(String message){
	System.out.println("收到topic.q2封包:"+message);
}

@RabbitListener(queues = "topic.q3")
public void consumerTopicMessage3(String message){
	System.out.println("收到topic.q3封包:"+message);
}
           
// 發送消息
1)rabbitMQProducer.sendMsg("topicExchange", "topic.q1",msg);
列印日志:(滿足topic.q1、topic.#、topic.*的監聽)
收到topic.q1封包:Hello World topic
收到topic.q2封包:Hello World topic
收到topic.q3封包:Hello World topic


2)rabbitMQProducer.sendMsg("topicExchange", "topic.q2",msg);
列印日志:(滿足topic.#、topic.*的監聽)
收到topic.q2封包:Hello World topic
收到topic.q3封包:Hello World topic

3)rabbitMQProducer.sendMsg("topicExchange", "topic.q2.x",msg);
列印日志:(滿足topic.#的監聽)
收到topic.q2封包:Hello World topic
           

常見異常

1. 未定義queue,直接監聽queue報錯:

org.springframework.amqp.rabbit.listener.BlockingQueueConsumer$DeclarationException: Failed to declare queue(s):[hello2]
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no queue 'hello2' in vhost '/', class-id=50, method-id=10)
           

解決方法:

1) 控制台手動建立queue

2)配置queue

@Configuration
public class RabbitConfig {

	@Bean
	public Queue helloQueue() {
		return new Queue("hello");
	}
}
           

繼續閱讀