天天看點

RabbitMQ

1. 消息隊列概述

目标:能夠說出什麼是消息隊列;為什麼使用消息隊列;常見産品有哪些

小結:

消息隊列是應用程式之間的通信方法;無需即時傳回的且耗時的操作進行異步處理進而提高系統的吞吐量;可以實作程式之間的解耦合。

  • 實作方式:AMQP,JMS
  • 常見産品:activeMQ,zeroMQ,RabbitMQ,RocketMQ,kafka

2. 安裝及配置RabbitMQ

目标:按照文檔在本機安裝windows版本RabbitMQ,并配置其使用者和Virtual Hosts

分析:

  1. 安裝erlang;
  2. 安裝rabbitMQ;
  3. 安裝RabbitMQ的圖形管理界面插件;
  4. 建立管理使用者;
  5. 建立虛拟主機Virtual Hosts

graph LR;

1[安裝erlang] --> 2[安裝RabbitMQ]

2 --> 3[安裝管理插件]

3 --> 4[建立使用者]

4 --> 5[建立虛拟主機]

安裝上述的元件時候都需要使用以管理者身份運作。

3. 搭建RabbitMQ入門工程

目标:搭建RabbitMQ入門工程并配置對應的maven依賴

建立heima-rabbitmq的工程;用于測試RabbitMQ的消息收發。添加用于操作RabbitMQ的依賴。

<dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>5.6.0</version>
        </dependency>
           

使用IDEA建立maven工程;使用了jdk1.8。在工程中的pom.xml檔案中添加了上述的依賴。

4. 入門工程-生産者

目标:編寫消息生産者代碼,發送消息到隊列

入門工程:生産者發送消息到RabbitMQ的隊列(simple_queue);消費者可以從隊列中擷取消息。可以使用RabbitMQ的簡單模式(simple)。

生産者實作發送消息的步驟:

  1. 建立連接配接工廠(設定RabbitMQ的連接配接參數);
  2. 建立連接配接;
  3. 建立頻道;
  4. 聲明隊列;
  5. 發送消息;
  6. 關閉資源
package com.itheima.rabbitmq.simple;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
 * 簡單模式:發送消息
 */
public class Producer {
    static final String QUEUE_NAME = "simple_queue";

    public static void main(String[] args) throws Exception {
        //1. 建立連接配接工廠(設定RabbitMQ的連接配接參數);
        ConnectionFactory connectionFactory = new ConnectionFactory();
        //主機;預設localhost
        connectionFactory.setHost("localhost");
        //連接配接端口;預設5672
        connectionFactory.setPort(5672);
        //虛拟主機;預設/
        connectionFactory.setVirtualHost("/itcast");
        //使用者名;預設guest
        connectionFactory.setUsername("heima");
        //密碼;預設guest
        connectionFactory.setPassword("heima");

        //2. 建立連接配接;
        Connection connection = connectionFactory.newConnection();
        //3. 建立頻道;
        Channel channel = connection.createChannel();
        //4. 聲明隊列;
        /**
         * 參數1:隊列名稱
         * 參數2:是否定義持久化隊列(消息會持久化儲存在伺服器上)
         * 參數3:是否獨占本連接配接
         * 參數4:是否在不使用的時候隊列自動删除
         * 參數5:其它參數
         */
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);
        //5. 發送消息;
        String message = "你好!小兔紙。";

        /**
         * 參數1:交換機名稱;如果沒有則指定空字元串(表示使用預設的交換機)
         * 參數2:路由key,簡單模式中可以使用隊列名稱
         * 參數3:消息其它屬性
         * 參數4:消息内容
         */
        channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
        System.out.println("已發送消息:" + message);
        //6. 關閉資源
        channel.close();
        connection.close();
    }
}
           
在設定連接配接工廠的時候;如果沒有指定連接配接的參數則會有預設值;可以去設定虛拟主機。

5. 入門工程-消費者

目标:編寫消息消費者代碼,從隊列中接收消息并消費

從RabbitMQ中隊列(與生産者發送消息時的隊列一緻;simple_queue)接收消息;

實作消費者步驟:

  1. 建立連接配接工廠;
  2. 建立連接配接;(抽取一個擷取連接配接的工具類)
  3. 建立消費者(接收消息并處理消息);
  4. 監聽隊列
package com.itheima.rabbitmq.simple;

import com.itheima.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.*;

import java.io.IOException;

/**
 * 簡單模式;消費者接收消息
 */
public class Consumer {
    public static void main(String[] args) throws Exception {
        //1. 建立連接配接工廠;
        //2. 建立連接配接;(抽取一個擷取連接配接的工具類)
        Connection connection = ConnectionUtil.getConnection();
        //3. 建立頻道;
        Channel channel = connection.createChannel();
        //4. 聲明隊列;
        /**
         * 參數1:隊列名稱
         * 參數2:是否定義持久化隊列(消息會持久化儲存在伺服器上)
         * 參數3:是否獨占本連接配接
         * 參數4:是否在不使用的時候隊列自動删除
         * 參數5:其它參數
         */
        channel.queueDeclare(Producer.QUEUE_NAME, true, false, false, null);
        //5. 建立消費者(接收消息并處理消息);
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                //路由key
                System.out.println("路由key為:" + envelope.getRoutingKey());
                //交換機
                System.out.println("交換機為:" + envelope.getExchange());
                //消息id
                System.out.println("消息id為:" + envelope.getDeliveryTag());
                //接收到的消息
                System.out.println("接收到的消息為:" + new String(body, "utf-8"));
            }
        };
        //6. 監聽隊列
        /**
         * 參數1:隊列名
         * 參數2:是否要自動确認;設定為true表示消息接收到自動向MQ回複接收到了,MQ則會将消息從隊列中删除;
         * 如果設定為false則需要手動确認
         * 參數3:消費者
         */
        channel.basicConsume(Producer.QUEUE_NAME, true, defaultConsumer);
    }
}

           
需要持續監聽隊列消息,是以不要關閉資源

6. 入門工程測試

目标:啟動消費者和生産者,到RabbitMQ中查詢隊列并在消費者端IDEA控制台檢視接收到的消息

生産者:發送消息到RabbitMQ隊列(simple_queue)

消費者:接收RabbitMQ隊列消息

簡單模式:生産者發送消息到隊列中,一個消費者從隊列中接收消息。

在RabbitMQ中消費者隻能從隊列接收消息。

如果接收消息的消費者在同一個隊列中有兩個或多個時;消息是如何配置設定的?

7. Work queues工作隊列模式

目标:編寫生産者、消費者代碼并測試了解Work queues工作隊列模式的特點

工作隊列模式:在同一個隊列中可以有多個消費者,消費者之間對于消息的接收是競争關系。

生産者:發送30個消息

消費者:建立兩個消費者監聽同一個隊列,檢視兩個消費者的接收消息是否存在重複。

工作隊列模式:一個消息隻能被一個消費者接收,其它消費者是不能接收到同一條消息的。

應用場景:可以在消費者端處理任務比較耗時的時候;添加對同一個隊列的消費者來提高任務處理能力。

8. 訂閱模式類型說明

目标:說出訂閱模式中的Exchange交換機作用以及交換機的三種類型

訂閱模式與前面的兩種模式比較:多了一個角色Exchange交換機,接收生産者發送的消息并決定如何投遞消息到其綁定的隊列;消息的投遞決定于交換機的類型。

交換機類型:廣播(fanout)、定向(direct)、通配符(topic)

交換機隻做消息轉發,自身不存儲資料。

9. Publish/Subscribe釋出與訂閱模式

目标:編寫生産者、消費者代碼并測試了解Publish/Subscribe釋出與訂閱模式的特點

釋出與訂閱模式特點:一個消息可以被多個消費者接收;其實是使用了訂閱模式,交換機類型為:fanout廣播

  • 生産者(發送10個消息)
    1. 聲明交換機(fanout);
    2. 隊列綁定到交換機;
  • 消費者(至少兩個消費者)
    1. 聲明交換機;
    2. 建立消費者;
    3. 監聽隊列;

釋出與訂閱模式:一個消息可以被多個消費者接收;一個消費者對于的隊列,該隊列隻能被一個消費者監聽。使用了訂閱模式中交換機類型為:廣播。

10. Routing路由模式

目标:編寫生産者、消費者代碼并測試了解Routing路由模式的特點

生産者:發送兩條消息(路由key分别為:insert、update)

消費者:建立兩個消費者,監聽的隊列分别綁定路由key為:insert、update

  1. 消息中路由key為insert的會被綁定路由key為insert的隊列接收并被其監聽的消費者接收、處理;
  2. 消息中路由key為update的會被綁定路由key為update的隊列接收并被其監聽的消費者接收、處理;

Routing 路面模式要求隊列綁定到交換機的時候指定路由key;消費發送時候需要攜帶路由key;隻有消息的路由key與隊列路由key完全一緻才能讓該隊列接收到消息。

11. Topics通配符模式

目标:編寫生産者、消費者代碼并測試了解Topics通配符模式的特點

  • 生産者:發送包含有item.insert、item.update,item.delete的3中路由key消息
  • 消費者1:監聽的隊列綁定到交換機的路由key為:item.update,item.delete
  • 消費者2:監聽的隊列綁定到交換機的路由key為:item.*

Topics通配符模式:可以根據路由key将消息傳遞到對應路由key的隊列;隊列綁定到交換機的路由key可以有多個;通配符模式中路由key可以使用

*

#

;使用了通配符模式之後對于路由Key的配置更加靈活。

12. RabbitMQ模式總結

目标:對比總結RabbitMQ的5種模式特征

  • 不直接Exchange交換機(預設交換機)
    1. simple簡單模式:一個生産者生産一個消息到一個隊列被一個消費者接收
    2. work工作隊列模式:生産者發送消息到一個隊列中,然後可以被多個消費者監聽該隊列;一個消息隻能被一個消費者接收,消費者之間是競争關系
  • 使用Exchange交換機;訂閱模式(交換機:廣播fanout、定向direct、通配符topic)
    1. 釋出與訂閱模式:使用了fanout廣播類型的交換機,可以将一個消息發送到所有綁定了該交換機的隊列
    2. 路由模式:使用了direct定向類型的交換機,消費會攜帶路由key,交換機根據消息的路由key與隊列的路由key進行對比,一緻的話那麼該隊列可以接收到消息
    3. 通配符模式:使用了topic通配符類型的交換機,消費會攜帶路由key(*, #),交換機根據消息的路由key與隊列的路由key進行對比,比對的話那麼該隊列可以接收到消息

13. 建立SpringBoot整合RabbitMQ的兩個工程

目标:建立springboot-rabbitmq-producer工程用于生産消息;建立springboot-rabbitmq-consumer工程用于接收消息

Spring Boot提供了對于AMQP的整合;可以使用RabbitTemplate發送消息;可以使用@RabbitListener注解接收消息。

生産者工程springboot-rabbitmq-producer:發送消息

  1. 建立工程;
  2. 添加依賴(spring-boot-stater-amqp,spring-boot-starter-test);
  3. 建立啟動引導類;
  4. 添加配置檔案application.yml

消費者工程springboot-rabbitmq-consumer:接收消息

  1. 添加依賴(spring-boot-stater-amqp);
可以使用插件自動生産Spring Boot工程的啟動引導類Application.java和配置檔案application.yml

14. 配置生産者工程

目标:配置springboot-rabbitmq-producer工程的RabbitMQ,一個交換機、隊列并綁定

使用通配符模式:将隊列綁定到交換機(topic)時需要指定路由key(item.#)

  1. 配置RabbitMQ的連接配接參數:主機、連接配接端口、虛拟主機、使用者名、密碼;
  2. 聲明交換機、隊列并将隊列綁定到交換機,指定的路由key(item.#)
  • 配置application.yml檔案
spring:
  rabbitmq:
    host: localhost
    port: 5672
    virtual-host: /itcast
    username: heima
    password: heima
           
  • 配置交換機、隊列和綁定,建立一個配置類
@Configuration
public class RabbitMQConfig {
    //交換機名稱
    public static final String ITEM_TOPIC_EXCHANGE = "item_topic_exchange";

    //隊列名稱
    public static final String ITEM_QUEUE = "item_queue";

    //聲明交換機
    @Bean("itemTopicExchange")
    public Exchange topicExchange(){
        return ExchangeBuilder.topicExchange(ITEM_TOPIC_EXCHANGE).durable(true).build();
    }

    //聲明隊列
    @Bean("itemQueue")
    public Queue itemQueue(){
        return QueueBuilder.durable(ITEM_QUEUE).build();
    }

    //将隊列綁定到交換機
    @Bean
    public Binding itemQueueExchange(@Qualifier("itemQueue") Queue queue,
                                     @Qualifier("itemTopicExchange")Exchange exchange){
        return BindingBuilder.bind(queue).to(exchange).with("item.#").noargs();
    }
}

           

15. 配置消費者工程

目标:配置springboot-rabbitmq-consumer工程的RabbitMQ,編寫消息監聽器接收消息

  1. 配置application.yml檔案,設定RabbitMQ的連接配接參數;
  2. 編寫消息監聽器接收隊列(item_queue)消息;可以使用注解@RabbitListener接收隊列消息
  • 配置application.yml檔案;與生産者工程一緻
  • 編寫監聽器類
@Component
public class MyListener {

    /**
     * 接收隊列消息
     * @param message 接收到的消息
     */
    @RabbitListener(queues = "item_queue")
    public void myListener1(String message){
        System.out.println("消費者接收到消息:" + message);
    }
}

           
接收消息的隊列名稱要與生産者發送消息時的隊列名稱一緻

16. 測試消息發送和接收

目标:生産者編寫測試類RabbitMQTest發送消息到交換機和特定的路由(item.insert,item.update,item.delete)

生産者:編寫測試類RabbitMQTest,利用RabbitTemplate發送3條消息,這3條消息的路由key分别是item.insert,item.update,item.delete

消費者:在IDEA控制台檢視是否能接收到符合路由key的消息

編寫測試類如下:

package com.itheima.rabbitmq;

import com.itheima.rabbitmq.config.RabbitMQConfig;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

@RunWith(SpringRunner.class)
@SpringBootTest
public class RabbitMQTest {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void test(){
        rabbitTemplate.convertAndSend(RabbitMQConfig.ITEM_TOPIC_EXCHANGE,
                "item.insert", "商品新增,路由Key為item.insert");
        rabbitTemplate.convertAndSend(RabbitMQConfig.ITEM_TOPIC_EXCHANGE,
                "item.update", "商品新增,路由Key為item.update");
        rabbitTemplate.convertAndSend(RabbitMQConfig.ITEM_TOPIC_EXCHANGE,
                "item.delete", "商品新增,路由Key為item.delete");
        rabbitTemplate.convertAndSend(RabbitMQConfig.ITEM_TOPIC_EXCHANGE,
                "a.item.delete", "商品新增,路由Key為a.item.delete");
    }
}

           
先啟動測試類進行聲明交換機、隊列和綁定;之後再啟動消費者工程接收消息。