1. 消息隊列概述
目标:能夠說出什麼是消息隊列;為什麼使用消息隊列;常見産品有哪些
小結:
消息隊列是應用程式之間的通信方法;無需即時傳回的且耗時的操作進行異步處理進而提高系統的吞吐量;可以實作程式之間的解耦合。
- 實作方式:AMQP,JMS
- 常見産品:activeMQ,zeroMQ,RabbitMQ,RocketMQ,kafka
2. 安裝及配置RabbitMQ
目标:按照文檔在本機安裝windows版本RabbitMQ,并配置其使用者和Virtual Hosts
分析:
- 安裝erlang;
- 安裝rabbitMQ;
- 安裝RabbitMQ的圖形管理界面插件;
- 建立管理使用者;
- 建立虛拟主機Virtual Hosts
graph LR;
1[安裝erlang] --> 2[安裝RabbitMQ]
2 --> 3[安裝管理插件]
3 --> 4[建立使用者]
4 --> 5[建立虛拟主機]
安裝上述的元件時候都需要使用以管理者身份運作。
3. 搭建RabbitMQ入門工程
目标:搭建RabbitMQ入門工程并配置對應的maven依賴
建立heima-rabbitmq的工程;用于測試RabbitMQ的消息收發。添加用于操作RabbitMQ的依賴。
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.6.0</version>
</dependency>
使用IDEA建立maven工程;使用了jdk1.8。在工程中的pom.xml檔案中添加了上述的依賴。
4. 入門工程-生産者
目标:編寫消息生産者代碼,發送消息到隊列
入門工程:生産者發送消息到RabbitMQ的隊列(simple_queue);消費者可以從隊列中擷取消息。可以使用RabbitMQ的簡單模式(simple)。
生産者實作發送消息的步驟:
- 建立連接配接工廠(設定RabbitMQ的連接配接參數);
- 建立連接配接;
- 建立頻道;
- 聲明隊列;
- 發送消息;
- 關閉資源
package com.itheima.rabbitmq.simple;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
* 簡單模式:發送消息
*/
public class Producer {
static final String QUEUE_NAME = "simple_queue";
public static void main(String[] args) throws Exception {
//1. 建立連接配接工廠(設定RabbitMQ的連接配接參數);
ConnectionFactory connectionFactory = new ConnectionFactory();
//主機;預設localhost
connectionFactory.setHost("localhost");
//連接配接端口;預設5672
connectionFactory.setPort(5672);
//虛拟主機;預設/
connectionFactory.setVirtualHost("/itcast");
//使用者名;預設guest
connectionFactory.setUsername("heima");
//密碼;預設guest
connectionFactory.setPassword("heima");
//2. 建立連接配接;
Connection connection = connectionFactory.newConnection();
//3. 建立頻道;
Channel channel = connection.createChannel();
//4. 聲明隊列;
/**
* 參數1:隊列名稱
* 參數2:是否定義持久化隊列(消息會持久化儲存在伺服器上)
* 參數3:是否獨占本連接配接
* 參數4:是否在不使用的時候隊列自動删除
* 參數5:其它參數
*/
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
//5. 發送消息;
String message = "你好!小兔紙。";
/**
* 參數1:交換機名稱;如果沒有則指定空字元串(表示使用預設的交換機)
* 參數2:路由key,簡單模式中可以使用隊列名稱
* 參數3:消息其它屬性
* 參數4:消息内容
*/
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println("已發送消息:" + message);
//6. 關閉資源
channel.close();
connection.close();
}
}
在設定連接配接工廠的時候;如果沒有指定連接配接的參數則會有預設值;可以去設定虛拟主機。
5. 入門工程-消費者
目标:編寫消息消費者代碼,從隊列中接收消息并消費
從RabbitMQ中隊列(與生産者發送消息時的隊列一緻;simple_queue)接收消息;
實作消費者步驟:
- 建立連接配接工廠;
- 建立連接配接;(抽取一個擷取連接配接的工具類)
- 建立消費者(接收消息并處理消息);
- 監聽隊列
package com.itheima.rabbitmq.simple;
import com.itheima.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;
/**
* 簡單模式;消費者接收消息
*/
public class Consumer {
public static void main(String[] args) throws Exception {
//1. 建立連接配接工廠;
//2. 建立連接配接;(抽取一個擷取連接配接的工具類)
Connection connection = ConnectionUtil.getConnection();
//3. 建立頻道;
Channel channel = connection.createChannel();
//4. 聲明隊列;
/**
* 參數1:隊列名稱
* 參數2:是否定義持久化隊列(消息會持久化儲存在伺服器上)
* 參數3:是否獨占本連接配接
* 參數4:是否在不使用的時候隊列自動删除
* 參數5:其它參數
*/
channel.queueDeclare(Producer.QUEUE_NAME, true, false, false, null);
//5. 建立消費者(接收消息并處理消息);
DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//路由key
System.out.println("路由key為:" + envelope.getRoutingKey());
//交換機
System.out.println("交換機為:" + envelope.getExchange());
//消息id
System.out.println("消息id為:" + envelope.getDeliveryTag());
//接收到的消息
System.out.println("接收到的消息為:" + new String(body, "utf-8"));
}
};
//6. 監聽隊列
/**
* 參數1:隊列名
* 參數2:是否要自動确認;設定為true表示消息接收到自動向MQ回複接收到了,MQ則會将消息從隊列中删除;
* 如果設定為false則需要手動确認
* 參數3:消費者
*/
channel.basicConsume(Producer.QUEUE_NAME, true, defaultConsumer);
}
}
需要持續監聽隊列消息,是以不要關閉資源
6. 入門工程測試
目标:啟動消費者和生産者,到RabbitMQ中查詢隊列并在消費者端IDEA控制台檢視接收到的消息
生産者:發送消息到RabbitMQ隊列(simple_queue)
消費者:接收RabbitMQ隊列消息
簡單模式:生産者發送消息到隊列中,一個消費者從隊列中接收消息。
在RabbitMQ中消費者隻能從隊列接收消息。
如果接收消息的消費者在同一個隊列中有兩個或多個時;消息是如何配置設定的?
7. Work queues工作隊列模式
目标:編寫生産者、消費者代碼并測試了解Work queues工作隊列模式的特點
工作隊列模式:在同一個隊列中可以有多個消費者,消費者之間對于消息的接收是競争關系。
生産者:發送30個消息
消費者:建立兩個消費者監聽同一個隊列,檢視兩個消費者的接收消息是否存在重複。
工作隊列模式:一個消息隻能被一個消費者接收,其它消費者是不能接收到同一條消息的。
應用場景:可以在消費者端處理任務比較耗時的時候;添加對同一個隊列的消費者來提高任務處理能力。
8. 訂閱模式類型說明
目标:說出訂閱模式中的Exchange交換機作用以及交換機的三種類型
訂閱模式與前面的兩種模式比較:多了一個角色Exchange交換機,接收生産者發送的消息并決定如何投遞消息到其綁定的隊列;消息的投遞決定于交換機的類型。
交換機類型:廣播(fanout)、定向(direct)、通配符(topic)
交換機隻做消息轉發,自身不存儲資料。
9. Publish/Subscribe釋出與訂閱模式
目标:編寫生産者、消費者代碼并測試了解Publish/Subscribe釋出與訂閱模式的特點
釋出與訂閱模式特點:一個消息可以被多個消費者接收;其實是使用了訂閱模式,交換機類型為:fanout廣播
- 生産者(發送10個消息)
- 聲明交換機(fanout);
- 隊列綁定到交換機;
- 消費者(至少兩個消費者)
- 聲明交換機;
- 建立消費者;
- 監聽隊列;
釋出與訂閱模式:一個消息可以被多個消費者接收;一個消費者對于的隊列,該隊列隻能被一個消費者監聽。使用了訂閱模式中交換機類型為:廣播。
10. Routing路由模式
目标:編寫生産者、消費者代碼并測試了解Routing路由模式的特點
生産者:發送兩條消息(路由key分别為:insert、update)
消費者:建立兩個消費者,監聽的隊列分别綁定路由key為:insert、update
- 消息中路由key為insert的會被綁定路由key為insert的隊列接收并被其監聽的消費者接收、處理;
- 消息中路由key為update的會被綁定路由key為update的隊列接收并被其監聽的消費者接收、處理;
Routing 路面模式要求隊列綁定到交換機的時候指定路由key;消費發送時候需要攜帶路由key;隻有消息的路由key與隊列路由key完全一緻才能讓該隊列接收到消息。
11. Topics通配符模式
目标:編寫生産者、消費者代碼并測試了解Topics通配符模式的特點
- 生産者:發送包含有item.insert、item.update,item.delete的3中路由key消息
- 消費者1:監聽的隊列綁定到交換機的路由key為:item.update,item.delete
- 消費者2:監聽的隊列綁定到交換機的路由key為:item.*
Topics通配符模式:可以根據路由key将消息傳遞到對應路由key的隊列;隊列綁定到交換機的路由key可以有多個;通配符模式中路由key可以使用
*
和
#
;使用了通配符模式之後對于路由Key的配置更加靈活。
12. RabbitMQ模式總結
目标:對比總結RabbitMQ的5種模式特征
- 不直接Exchange交換機(預設交換機)
- simple簡單模式:一個生産者生産一個消息到一個隊列被一個消費者接收
- work工作隊列模式:生産者發送消息到一個隊列中,然後可以被多個消費者監聽該隊列;一個消息隻能被一個消費者接收,消費者之間是競争關系
- 使用Exchange交換機;訂閱模式(交換機:廣播fanout、定向direct、通配符topic)
- 釋出與訂閱模式:使用了fanout廣播類型的交換機,可以将一個消息發送到所有綁定了該交換機的隊列
- 路由模式:使用了direct定向類型的交換機,消費會攜帶路由key,交換機根據消息的路由key與隊列的路由key進行對比,一緻的話那麼該隊列可以接收到消息
- 通配符模式:使用了topic通配符類型的交換機,消費會攜帶路由key(*, #),交換機根據消息的路由key與隊列的路由key進行對比,比對的話那麼該隊列可以接收到消息
13. 建立SpringBoot整合RabbitMQ的兩個工程
目标:建立springboot-rabbitmq-producer工程用于生産消息;建立springboot-rabbitmq-consumer工程用于接收消息
Spring Boot提供了對于AMQP的整合;可以使用RabbitTemplate發送消息;可以使用@RabbitListener注解接收消息。
生産者工程springboot-rabbitmq-producer:發送消息
- 建立工程;
- 添加依賴(spring-boot-stater-amqp,spring-boot-starter-test);
- 建立啟動引導類;
- 添加配置檔案application.yml
消費者工程springboot-rabbitmq-consumer:接收消息
- 添加依賴(spring-boot-stater-amqp);
可以使用插件自動生産Spring Boot工程的啟動引導類Application.java和配置檔案application.yml
14. 配置生産者工程
目标:配置springboot-rabbitmq-producer工程的RabbitMQ,一個交換機、隊列并綁定
使用通配符模式:将隊列綁定到交換機(topic)時需要指定路由key(item.#)
- 配置RabbitMQ的連接配接參數:主機、連接配接端口、虛拟主機、使用者名、密碼;
- 聲明交換機、隊列并将隊列綁定到交換機,指定的路由key(item.#)
- 配置application.yml檔案
spring:
rabbitmq:
host: localhost
port: 5672
virtual-host: /itcast
username: heima
password: heima
- 配置交換機、隊列和綁定,建立一個配置類
@Configuration
public class RabbitMQConfig {
//交換機名稱
public static final String ITEM_TOPIC_EXCHANGE = "item_topic_exchange";
//隊列名稱
public static final String ITEM_QUEUE = "item_queue";
//聲明交換機
@Bean("itemTopicExchange")
public Exchange topicExchange(){
return ExchangeBuilder.topicExchange(ITEM_TOPIC_EXCHANGE).durable(true).build();
}
//聲明隊列
@Bean("itemQueue")
public Queue itemQueue(){
return QueueBuilder.durable(ITEM_QUEUE).build();
}
//将隊列綁定到交換機
@Bean
public Binding itemQueueExchange(@Qualifier("itemQueue") Queue queue,
@Qualifier("itemTopicExchange")Exchange exchange){
return BindingBuilder.bind(queue).to(exchange).with("item.#").noargs();
}
}
15. 配置消費者工程
目标:配置springboot-rabbitmq-consumer工程的RabbitMQ,編寫消息監聽器接收消息
- 配置application.yml檔案,設定RabbitMQ的連接配接參數;
- 編寫消息監聽器接收隊列(item_queue)消息;可以使用注解@RabbitListener接收隊列消息
- 配置application.yml檔案;與生産者工程一緻
- 編寫監聽器類
@Component
public class MyListener {
/**
* 接收隊列消息
* @param message 接收到的消息
*/
@RabbitListener(queues = "item_queue")
public void myListener1(String message){
System.out.println("消費者接收到消息:" + message);
}
}
接收消息的隊列名稱要與生産者發送消息時的隊列名稱一緻
16. 測試消息發送和接收
目标:生産者編寫測試類RabbitMQTest發送消息到交換機和特定的路由(item.insert,item.update,item.delete)
生産者:編寫測試類RabbitMQTest,利用RabbitTemplate發送3條消息,這3條消息的路由key分别是item.insert,item.update,item.delete
消費者:在IDEA控制台檢視是否能接收到符合路由key的消息
編寫測試類如下:
package com.itheima.rabbitmq;
import com.itheima.rabbitmq.config.RabbitMQConfig;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
@RunWith(SpringRunner.class)
@SpringBootTest
public class RabbitMQTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void test(){
rabbitTemplate.convertAndSend(RabbitMQConfig.ITEM_TOPIC_EXCHANGE,
"item.insert", "商品新增,路由Key為item.insert");
rabbitTemplate.convertAndSend(RabbitMQConfig.ITEM_TOPIC_EXCHANGE,
"item.update", "商品新增,路由Key為item.update");
rabbitTemplate.convertAndSend(RabbitMQConfig.ITEM_TOPIC_EXCHANGE,
"item.delete", "商品新增,路由Key為item.delete");
rabbitTemplate.convertAndSend(RabbitMQConfig.ITEM_TOPIC_EXCHANGE,
"a.item.delete", "商品新增,路由Key為a.item.delete");
}
}
先啟動測試類進行聲明交換機、隊列和綁定;之後再啟動消費者工程接收消息。