天天看點

RabbitMQ 安裝配置和 Spring 內建

本文從安裝和配置 RabbitMQ 開始,準備好環境後,直接在 Spring 中內建,并且針對 Spring 中的常見用法提供了示例和講解。

由于我個人更多的在 centos 安裝,是以這裡插入最快安裝的過程,使用 Package Cloud 方式。

安裝 erlang

打開 ​​​https://packagecloud.io/rabbitmq/erlang/install#bash-rpm​​ 執行:

curl -s https://packagecloud.io/install/repositories/rabbitmq/erlang/script.rpm.sh | sudo bash

然後 ​

​yum install erlang​

​ 安裝

安裝 rabbitmq

打開 ​​​https://packagecloud.io/rabbitmq/rabbitmq-server/install#bash-rpm​​ 執行:

curl -s https://packagecloud.io/install/repositories/rabbitmq/rabbitmq-server/script.rpm.sh | sudo bash

然後 ​

​yum install rabbitmq-server​

​​ 安裝完成。

繼續看本文配置部分。

安裝

一般開發環境可能用的都是 Windows,生産環境 Linux 用的比較多,這裡針對 Windows 和 Ubuntu 的安裝說明簡單提煉。其他環境可以直接參考官方文檔:https://www.rabbitmq.com/download.html

Windows 安裝

Windows 上安裝很容易,先安裝 Erlang/OTP 環境(注意和 RabbitMQ 版本比對),再安裝 RabbitMQ 即可。

下載下傳位址:

  • 版本依賴: https://www.rabbitmq.com/which-erlang.html
  • Erlang/OTP: http://www.erlang.org/downloads
  • RabbitMQ: https://www.rabbitmq.com/install-windows.html

Ubuntu 安裝

1 為了使用存儲庫方式安裝最新版本,需要将 RabbitMQ 簽名秘鑰添加到 ​

​apt-key​

​ 中,從下面兩種方式選擇一種方式執行:

sudo apt-key adv --keyserver "hkps.pool.sks-keyservers.net" --recv-keys "0x6B73A36E6026DFCA"      

或者

wget -O - "https://github.com/rabbitmq/signing-keys/releases/download/2.0/rabbitmq-release-signing-key.asc" | sudo      

第二種方式無需密鑰伺服器即可下載下傳和導入密鑰。

我使用的第一種。

2 然後在 ​​packagecloud​​ 有段腳本(自動根據伺服器版本選擇對應的安裝源),目前(2018-11-30)的内容如下:

curl -s https://packagecloud.io/install/repositories/rabbitmq/rabbitmq-server/script.deb.sh | sudo bash      

3 執行該腳本後,繼續然後執行下面的指令:

sudo apt-get      

更新後,可以通過下面指令檢視目前的 rabbitmq-server 的可用版本:

apt-cache madison rabbitmq-server      

我這裡的結果(2018-12-01)顯示如下:

rabbitmq-server |    3.7.9-1 | https://packagecloud.io/rabbitmq/rabbitmq-server/ubuntu bionic/main amd64 Packages
rabbitmq-server |    3.7.8-1 | https://packagecloud.io/rabbitmq/rabbitmq-server/ubuntu bionic/main amd64 Packages
rabbitmq-server |    3.7.7-1 | https://packagecloud.io/rabbitmq/rabbitmq-server/ubuntu bionic/main amd64 Packages
rabbitmq-server |    3.7.6-1 | https://packagecloud.io/rabbitmq/rabbitmq-server/ubuntu bionic/main amd64 Packages
rabbitmq-server |    3.7.5-1 | https://packagecloud.io/rabbitmq/rabbitmq-server/ubuntu bionic/main amd64 Packages
rabbitmq-server |   3.6.16-2 | https://packagecloud.io/rabbitmq/rabbitmq-server/ubuntu bionic/main amd64 Packages
rabbitmq-server |   3.6.16-1 | https://packagecloud.io/rabbitmq/rabbitmq-server/ubuntu bionic/main amd64 Packages
rabbitmq-server |   3.6.15-1 | https://packagecloud.io/rabbitmq/rabbitmq-server/ubuntu bionic/main amd64 Packages
rabbitmq-server |   3.6.10-1 | http://archive.ubuntu.com/ubuntu bionic/main amd64 Packages      

4 執行下面的指令安裝 rabbitmq-server

sudo apt-get install      

此時安裝的應該是最新的版本。

可以通過 sudo apt-get install rabbitmq-server=3.7.9-1 安裝指定版本。

配置

如果想要修改 mq 存儲資料的位置,需要先參考下面的文檔進行配置:

​​​https://www.rabbitmq.com/configure.html#config-file​​

接下來主要是在 Ubuntu 環境(Windows 環境類似)進行配置。由于沒有桌面環境,是以先通過指令建立可以外網通路 rabbitmq 的使用者,然後啟用 management 在通過網頁進行管理。

添加使用者 root,密碼 root。(根據自己需要設定)

sudo      

給 root 添加管理權限。

sudo      

給 root 添加預設虛拟主機的所有權限。

sudo rabbitmqctl set_permissions -p / root ".*" ".*" ".*"      

Windows 中的操作過程

​D:\Program Files\RabbitMQ Server\rabbitmq_server-3.7.9\sbin>rabbitmqctl.bat add_user root root Adding user "root" ... D:\Program Files\RabbitMQ Server\rabbitmq_server-3.7.9\sbin>rabbitmqctl.bat set_user_tags root administrator Setting tags for user "root" to [administrator] ... D:\Program Files\RabbitMQ Server\rabbitmq_server-3.7.9\sbin>rabbitmqctl.bat set_permissions -p / root ".*" ".*" ".*" Setting permissions for user "root" in vhost "/" ... ​

啟用 ​

​rabbitmq_management​

sudo rabbitmq-plugins enable      
啟用 ​

​rabbitmq_management​

​ 後不需要重新開機服務

此後可以直接通路 rabbitmq 的 http://RabbitMQ服務IP:15672 通過 WEB 進行管理。

備忘錄(暫時不用關注這裡,測試叢集時可用)

單機啟動多個帶有 rabbitmq_management 節點時的配置

​RABBITMQ_NODE_PORT=5672 RABBITMQ_NODENAME=rabbitl RABBITMQ_SERVER_START_ARGS="-rabbitmq_management listener [{port , 156721}]" rabbitmq-server -detached ​

參考 RabbitMQ實戰指南 7.1.5 單機多節點配置

準備好 RabbitMQ 環境後,下面直接和 Spring 內建。

初學者建議先通過官方示例了解 RabbitMQ 的基本概念和用法:https://www.rabbitmq.com/getstarted.html

Spring 內建

下面先是 Spring 內建的配置,然後是項目中具體的用法。

下面示例所有連結都可以直接打開展示完整内容。

完整示例位址:https://github.com/abel533/spring-rabbitmq-demo

配置

1 添加​​相關依賴​​

<dependency>
    <groupId>org.springframework.amqp</groupId>
    <artifactId>spring-rabbit</artifactId>
    <version>1.7.11.RELEASE</version>
</dependency>
<!-- spring-rabbit 依賴 spring-amqp,下面這個依賴可以不顯示引入 -->
<dependency>
    <groupId>org.springframework.amqp</groupId>
    <artifactId>spring-amqp</artifactId>
    <version>1.7.11.RELEASE</version>
</dependency>
<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
    <version>2.8.11.1</version>
</dependency>
<dependency>
    <groupId>org.codehaus.jackson</groupId>
    <artifactId>jackson-mapper-asl</artifactId>
    <version>1.9.13</version>
</dependency>      

2 配置檔案

将 ​​spring-rabbit 配置​​​單獨放在一個檔案中,需要的時候可以直接在 Spring 中 ​

​<import>​

​。

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns="http://www.springframework.org/schema/beans"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xmlns:task="http://www.springframework.org/schema/task"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
       http://www.springframework.org/schema/beans/spring-beans.xsd
       http://www.springframework.org/schema/rabbit
       http://www.springframework.org/schema/rabbit/spring-rabbit.xsd
       http://www.springframework.org/schema/task
       http://www.springframework.org/schema/task/spring-task.xsd">

    <!--啟用注解監聽消息-->
    <rabbit:annotation-driven/>

    <!--連接配接工廠配置-->
    <rabbit:connection-factory id="rabbitConnectionFactory"
                               thread-factory="amqpThreadFactory"
                               virtual-host="${rabbitmq.virtual-host:/}"
                               username="${rabbitmq.username}"
                               password="${rabbitmq.password}"
                               channel-cache-size="${rabbitmq.channel-cache-size:30}"
                               addresses="${rabbitmq.addresses}"/>

    <bean id="amqpThreadFactory" class="org.springframework.scheduling.concurrent.CustomizableThreadFactory">
        <constructor-arg value="rabbitmq-"/>
    </bean>

    <!--消息模闆-->
    <rabbit:template id="amqpTemplate" connection-factory="rabbitConnectionFactory"
                     message-converter="amqpMessageConverter"/>

    <!--消息轉換,生産者和消費者都需要 -->
    <bean id="amqpMessageConverter" class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter"/>

    <!--amqp管理-->
    <rabbit:admin id="amqpAdmin" connection-factory="rabbitConnectionFactory"/>

    <!--消息監聽容器,配合注解監聽消息-->
    <bean id="rabbitListenerContainerFactory"
          class="org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory">
        <property name="connectionFactory" ref="rabbitConnectionFactory"/>
        <!--并發消費者數量-->
        <property name="concurrentConsumers" value="${rabbitmq.concurrentConsumers:3}"/>
        <!--最大數量-->
        <property name="maxConcurrentConsumers" value="${rabbitmq.maxConcurrentConsumers:10}"/>
        <!--消息轉換-->
        <property name="messageConverter" ref="amqpMessageConverter"/>
        <!--任務線程池-->
        <property name="taskExecutor">
            <task:executor id="amqpTaskExecutor" pool-size="${rabbitmq.task-executor.pool-size:100}"/>
        </property>
        <!--手動确認-->
        <property name="acknowledgeMode" value="${rabbitmq.acknowledgeMode:MANUAL}"/>
    </bean>

</beans>      

3 ​​Spring 配置檔案​​中需要提供的配置

# rabbitmq 消息配置
rabbitmq.addresses=localhost:5672
rabbitmq.virtual-host=/
rabbitmq.username=root
rabbitmq.password=root
rabbitmq.channel-cache-size=50
rabbitmq.concurrentConsumers=3
rabbitmq.maxConcurrentConsumers=10
# 确認方式 MANUAL 手動,AUTO 自動,NONE 自動确認
rabbitmq.acknowledgeMode=MANUAL
# 線程池數量 = 并發數 * 監聽數
rabbitmq.task-executor.pool-size=100      

下面是和 Spring 內建後的用法。

測試中,增加了 ​​spring.xml​​ 配置檔案,内容如下:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns="http://www.springframework.org/schema/beans"
       xmlns:task="http://www.springframework.org/schema/task"
       xmlns:context="http://www.springframework.org/schema/context"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
       http://www.springframework.org/schema/beans/spring-beans.xsd
       http://www.springframework.org/schema/task
       http://www.springframework.org/schema/task/spring-task.xsd
       http://www.springframework.org/schema/context
       http://www.springframework.org/schema/context/spring-context.xsd">

    <!--加載屬性配置檔案-->
    <context:property-placeholder location="classpath:META-INF/spring/application.properties"/>

    <!--掃描包-->
    <context:component-scan base-package="rabbitmq"/>

    <!--Producter 中的任務排程使用-->
    <task:scheduler id="taskScheduler"/>
    <task:annotation-driven scheduler="taskScheduler"/>

    <!--引入 spring-rabbitmq 配置-->
    <import resource="classpath*:META-INF/spring/spring-rabbitmq.xml"/>

</beans>      

生産者

​​示例代碼​​ 如下:

@Component
public class Producter {
    public static final Logger logger = LoggerFactory.getLogger(Producter.class);

    @Autowired
    private AmqpTemplate template;

    @Autowired
    private AmqpAdmin admin;

    @PostConstruct
    protected void init() {
        //定義交換機
        Exchange exchange = ExchangeBuilder.topicExchange("logger").durable(true).build();
        admin.declareExchange(exchange);
        //還可以定義隊列和綁定
    }

    final Random random = new Random();
    final String[] keys = new String[]{"logger.error", "logger.warn", "logger.info"};
    AtomicInteger count = new AtomicInteger();

    @Scheduled(fixedDelay = 1000)
    protected void product() {
        String key = keys[random.nextInt(3)];
        int i = count.getAndIncrement();
        String message = key + " > " + i + " " + new Date();
        User obj = new User(message, i);
        template.convertAndSend("logger", key, obj);
        logger.info("[Send] " + obj);
    }

}      
  1. 在代碼中直接注入​

    ​AmqpTemplate​

    ​,用于發送或接收消息。
  2. 根據需要注入​

    ​AmqpAdmin​

    ​,可以用于建立交換機、隊列和綁定。

上面代碼中,在 ​

​init​

​​ 初始化中定義了一個交換機。通過 ​

​product​

​​ 定時任務,每隔 1000 毫秒執行一次,調用 ​

​template.convertAndSend("logger", key, obj);​

​ 發送消息,發送的對象會根據前面 spring-rabbit 配置檔案中的消息轉換器轉換為 JSON 資料進行發送。

生産者的邏輯可以根據業務需要進行定制。

消費者

消費者有多種用法,這裡使用最友善的注解用法。

在 ​​Consumer​​ 代碼中,有 3 個例子,這裡拿第一個進行講解:

/**
 * 接收對象的例子
 *
 * 該方法還可以直接注入 org.springframework.amqp.core.Message 對象
 *
 * @param data
 * @param deliveryTag
 * @param channel
 */
@RabbitListener(bindings = @QueueBinding(
        value = @Queue(value = "logger.all", durable = "true"),
        exchange = @Exchange(value = "logger", 
                             durable = "true", 
                             ignoreDeclarationExceptions = "true", 
                             type = ExchangeTypes.TOPIC),
        key = "logger.#"
))
public void all(User data, @Header(AmqpHeaders.DELIVERY_TAG) Long deliveryTag, Channel channel) {
    try {
        //測試用,随機确認和拒絕(并傳回隊列)
        if(Math.random() > 0.5d){
            logger.info("[reject] deliveryTag:" + deliveryTag + ", message: " + data);
            channel.basicReject(deliveryTag, true);
        } else {
            logger.info("[ack   ] deliveryTag:" + deliveryTag + ", message: " + data);
            channel.basicAck(deliveryTag, false);
        }
    } catch (IOException e) {
        e.printStackTrace();
    }
}      

注解

消費者監聽的主要注解就是 ​

​@RabbitListener​

​​,上面例子是一個比較複雜的用法,下面從簡單開始說起。

最簡單的情況下,注解用法如下:

@RabbitListener(queues = "myQueue")
public void processOrder(String data) {
    ...
}      

這種情況下,要求 ​

​myQueue​

​ 隊列已經存在,這樣就能直接監聽該隊列。除此之外這裡接收的參數要求是字元串類型,和消費者發送的消息類型需要一緻。

再稍微簡單點的情況下,用法如下:

@RabbitListener(bindings = @QueueBinding(
      value = @Queue,
      exchange = @Exchange(value = "auto.exch"),
      key = "invoiceRoutingKey")
)
public void processInvoice(String data) {
  ...
}      

實際上這裡已經有些複雜了,這個例子的特點就是,不需要事先存在交換機、隊列和綁定。Spring 在啟動的時候會根據這裡的注解去建立這三者(RabbitMQ 規則是如果隊列、交換機已經存在,在參數相同的情況下會直接複用,不會建立新的,如果參數不同會報錯)。這裡的隊列隻用了 ​

​@Queue​

​​,是以會建立一個匿名獨占自動删除的隊列。交換機的名字指定了 ​

​auto.exch​

​​,隊列和交換機通過 ​

​invoiceRoutingKey​

​ 進行綁定。

現在再來看本例的用法:

@RabbitListener(bindings = @QueueBinding(
        value = @Queue(value = "logger.all", durable = "true"),
        exchange = @Exchange(value = "logger", 
                             durable = "true", 
                             ignoreDeclarationExceptions = "true", 
                             type = ExchangeTypes.TOPIC),
        key = "logger.#"
))      

這裡建立了一個指定名稱的隊列,并且配置了持久化。還建立了一個支援持久化的交換機,類型為 ​

​TOPIC​

​​,并且忽略交換機的聲明異常(如果已經存在并且屬性不同時,忽略此異常)。通過 ​

​logger.#​

​​ 進行比對,在主題交換機中,有兩個特殊的字元 ​

​*​

​​ 和 ​

​#​

​​,分别比對一個逗号隔開的單詞和任意(可0)單詞。是以這裡能比對 ​

​logger.info​

​​, ​

​logger.xxx.debug​

​ 等路由。

除了上面這些常見用法外,還有一個特殊的情況,可以根據接收類型自動比對的用法,如下:

@RabbitListener(id="multi", queues = "someQueue")
public class MultiListenerBean {

    @RabbitHandler
    @SendTo("my.reply.queue")
    public String bar(Bar bar) {
        ...
    }

    @RabbitHandler
    public String baz(Baz baz) {
        ...
    }

    @RabbitHandler
    public String qux(@Header("amqp_receivedRoutingKey") String rk, @Payload Qux qux) {
        ...
    }

}      

在類上使用了 ​

​@RabbitListener​

​​ 注解,在方法上使用了 ​

​@RabbitHandler​

​​ 注解。在監聽 ​

​someQueue​

​​ 隊列時,會根據消息的實際類型,調用比對的方法(​

​Bar​

​​, ​

​Baz​

​​ 和 ​

​Qux​

​)。

特别注意:隻有上面這種用法下才會根據類型進行比對,直接在方法上使用 ​

​@RabbitListener​

​ 注解時不會自動比對。

下面來看看這個參數需要注意的地方。

參數

在我們配置的 JSON 轉換中,除了轉換的 JSON 串之外,在消息中還記錄了類型的資訊。如下圖所示:

RabbitMQ 安裝配置和 Spring 內建

可以看到在消息屬性頭中,通過 ​

​__TypeId__​

​​ 記錄了消息對象的實際類型,是以在 Spring 中的序列化和反序列化中能夠根據這裡的類型進行轉換,當接收類型和這裡指定的類型不一緻時會報錯(隻有前面 ​

​@RabbitHandler​

​ 用法中會去比對正确的方法,無法比對時報錯)。

Spring AMQP 中支援以下幾類參數:

  1. 消息對象(payload),如果參數類型不能明确比對時,需要通過​

    ​@Payload​

    ​ 指定消息體。
  2. ​com.rabbitmq.client.Channel​

    ​,消息通道,可以調用 AMQP 的基本方法,常用于 ack 和 reject。
  3. ​@Header​

    ​ 注解的參數,從消息頭提取指定的資訊。
  4. ​org.springframework.amqp.core.Message​

    ​ 消息的原始對象。
  5. ​org.springframework.messaging.Message<T>​

    ​ 消息接口,通過泛型指定消息體類型,可以在 1 的基礎上額外擷取消息頭資訊。

ack 和 reject

在本例中,由于要手動 ACK 或 REJECT,是以在消息體之外還注入了 ​

​@Header(AmqpHeaders.DELIVERY_TAG) Long deliveryTag​

​​ 和 ​

​Channel​

​。

在業務邏輯執行完成後或者發生異常時,根據具體的情況來選擇執行。

如果業務順利執行完成,我們可以直接通過 ​

​channel.basicAck(deliveryTag, false);​

​ 确認消費,此後消息隊列會删除這條已消費的消息。

如果業務中出現了異常,需要具體分析,如果隻是網絡或可以重試的問題,我們可以通過 ​

​channel.basicReject(deliveryTag, true);​

​​ 将消息返還給消息隊列。如果出現的是問題是業務邏輯或者就算重複執行仍然有問題的情況,可能就需要通過 ​

​channel.basicReject(deliveryTag, false);​

​删除該消息(存在死信隊列的情況會接收該消息,可以進行後續處理)。

總結

學會使用 RabbitMQ 是一件很容易的事情,但是用好用對是很不容易的事。不同常見和業務都需要考慮使用什麼類型的交換機,使用什麼樣的隊列,每個隊列配置設定多少個并發,這些都很重要。

想要真正用好消息隊列,還需要學習很多知識,你可以通過下面的參考資料了解更多。

參考資料

在我學 RabbitMQ 的過程中,下面這些資料是特别有用的,都是官方提供的項目文檔,必要的時候可以多看幾遍。

  1. ​​https://www.rabbitmq.com​​
  2. ​​https://www.rabbitmq.com/man/rabbitmqctl.8.html​​
  3. ​​https://docs.spring.io/spring-amqp/docs/1.7.11.RELEASE/reference/html/index.html​​
  • RabbitMQ實戰:高效部署分布式消息隊列
  • RabbitMQ實戰指南