天天看點

springBoot整合RabbitMQ1.RabbitMQ是什麼?2.為什麼要使用RabbitMQ?3.使用RabbitMQ的場景4.RabbitMQ在springBoot中的使用

在項目中有商品的變化,包括訂單的一些狀态變化需要用到RabbitMQ通知買家或者賣家,是以自己就去學習了一下。

在springBoot中使用RabbitMQ

  • 1.RabbitMQ是什麼?
    • 1.1什麼是AMQP?
      • 1.1.1 AMQP 中包含的主要元素
      • 1.1.2 exchange 與 Queue 的路由機制
    • 1.2 什麼是JMS
  • 2.為什麼要使用RabbitMQ?
  • 3.使用RabbitMQ的場景
  • 4.RabbitMQ在springBoot中的使用
    • 4.1 建立一個springBoot工程
    • 4.2 配置rabbitMQ
    • 4.3設定Exchage政策
      • 4.3.1 Direct
      • 4.3.2 Fanout
      • 4.3.3 Topic
      • 4.3.4 Header

1.RabbitMQ是什麼?

采用AMQP進階消息隊列協定的一種消息隊列技術,最大的特點就是消費并不需要確定提供方存在,實作了服務之間的高度解耦

1.1什麼是AMQP?

Advanced Message Queuing Protocol, 進階隊列協定,協定層規範,就像http協定隻要按照規範在任何平台都能用。RabbitMQ使用Erlang語言編寫,支援多種協定,高可用,支援消息叢集以及多語言用戶端等等。

1.1.1 AMQP 中包含的主要元素

  • 生産者(Producer):向Exchange釋出消息的應用。
  • 消費者(Consumer):從消息隊列queue中消費消息的應用。
  • 消息隊列(Message Queue):伺服器元件,用于儲存消息,直到發送給消費者。
  • Queue:消息載體;每個消息都會被投入到一個或多個隊列。
  • 消息(Message):傳輸的内容。
  • 交換器(exchange):路由元件,接收Producer發送的消息,并根據Routing Key轉發給消息隊列queue。
  • Routing Key:路由關鍵字,exchange根據這個Routing Key進行消息投遞到隊列queue。
  • 綁定器(Binding):把exchange和queue按照路由規則綁定起來。

1.1.2 exchange 與 Queue 的路由機制

生産者發消息不需要指定Queue,消費者可以指定Queue綁定到某個RoutingKey和某個Exchange,也可以不指定Queue,就隻根據某個Exchange和某個RoutingKey接受到消息。

Exchange 将消息發送到哪一個queue是由exchange type 和 Binding綁定規則決定的,目前常用的有3種exchange,Direct exchange, Fanout exchange, Topic exchange :

1.2 什麼是JMS

java message server 它通過統一的java api層面的标準使得多個消息用戶端來通過JMS互動,大部分消息中間件提供商都對JMS提供支援。比如ActiveMQ,JMS類似于JDBC,而這個ActiveMQ就像資料庫驅動,JMS是一個規範,是一個标準,而ActivMQ則是一個具體的實作。JMS包括2種消息釋出模型,點對點,釋出者訂閱者,隻支援java平台

2.為什麼要使用RabbitMQ?

  1. 在分布式系統下具備異步,削峰,負載均衡等一系列進階功能
  2. 擁有持久化的機制,程序消息,隊列中的資訊也可以儲存下來。
  3. 實作消費者和生産者之間的解耦
  4. 對于高并發場景下,利用消息隊列可以使得同步通路變為串行通路達到一定量的限流,利于資料庫的操作
  5. 可以使用消息隊列達到異步下單的效果,排隊中,背景進行邏輯下單

3.使用RabbitMQ的場景

  1. 服務間異步通信
  2. 順序消費
  3. 定時任務
  4. 請求削峰

4.RabbitMQ在springBoot中的使用

4.1 建立一個springBoot工程

springBoot整合RabbitMQ1.RabbitMQ是什麼?2.為什麼要使用RabbitMQ?3.使用RabbitMQ的場景4.RabbitMQ在springBoot中的使用

選擇rabbitMQ依賴

springBoot整合RabbitMQ1.RabbitMQ是什麼?2.為什麼要使用RabbitMQ?3.使用RabbitMQ的場景4.RabbitMQ在springBoot中的使用
springBoot整合RabbitMQ1.RabbitMQ是什麼?2.為什麼要使用RabbitMQ?3.使用RabbitMQ的場景4.RabbitMQ在springBoot中的使用

4.2 配置rabbitMQ

springBoot整合RabbitMQ1.RabbitMQ是什麼?2.為什麼要使用RabbitMQ?3.使用RabbitMQ的場景4.RabbitMQ在springBoot中的使用

4.3設定Exchage政策

RabbitMQ提供了4種不同的Exchange政策,分别是Direct, Fanout,Topic,Header

  • Direct:所有發送到DirectExchange的消息将會被轉發到RouteKey中指定的Queue
  • Fanout:所有發送到FanoutExchange的消息将會被轉發到與該Exchange綁定(Binding)的所有的Queue
  • Topic:所有發送到TopicExchange的消息被将會被轉發到所有關心Routekey指定Topic的Queue上,Exchange将RouteKey和某Topic進行模糊比對。
  • Header:取消了RouteKey,使用了header中的key/value比對隊列

4.3.1 Direct

Direct的定義是 所有發送到DirectExchange的消息将會被轉發到RouteKey中指定的Queue所有發送到DirectExchange的消息将會被轉發到RouteKey中指定的Queue

先建立靜态變量,用來記錄狀态的變化

package org.java.rabbitmq.base;


public class AmqpExchange {


    /**
     * 商品變化消息
     */
    public final static String GOODS_CHANGE = "GOODS_CHANGE";


    /**
     * 訂單狀态變化消息
     * 帶入庫的
     */
    public final static String ORDER_STATUS_CHANGE = "ORDER_STATUS_CHANGE";

    /**
     * 會員登入消息
     */
    public final static String MEMEBER_LOGIN = "MEMEBER_LOGIN";
}

           
springBoot整合RabbitMQ1.RabbitMQ是什麼?2.為什麼要使用RabbitMQ?3.使用RabbitMQ的場景4.RabbitMQ在springBoot中的使用

然後建立商品的消費者

package org.java.rabbitmq.receiver;

import org.java.rabbitmq.base.AmqpExchange;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;



@Component
public class GoodsChangeReceiver {

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = AmqpExchange.GOODS_CHANGE + "_QUEUE"),
            exchange = @Exchange(value = AmqpExchange.GOODS_CHANGE)
    ))
    public void handler(String msg){
        System.out.println("商品變化資訊-------------"+msg);
    }


}

           

接下來在test裡面進行測試

package org.java.rabbitmq;

import org.java.rabbitmq.base.AmqpExchange;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;


@SpringBootTest
class RabbitmqApplicationTests {

    @Autowired
    RabbitTemplate rabbitTemplate;

    @Test
    void contextLoads() {
        rabbitTemplate.convertAndSend(AmqpExchange.GOODS_CHANGE + "_QUEUE","蘋果庫存-1");
    }




}

           

可以看到成功被消費

springBoot整合RabbitMQ1.RabbitMQ是什麼?2.為什麼要使用RabbitMQ?3.使用RabbitMQ的場景4.RabbitMQ在springBoot中的使用

4.3.2 Fanout

Fanout的定義是 所有發送到FanoutExchange的消息将會被轉發到與該Exchange綁定(Binding)的所有的Queue

Fanout也是用的最多的,像建立一個訂單不僅僅 隻發送到一個Queue,所有與之關聯的都要

我們将type = ExchangeTypes 設定為Fanout,然後建立2個不同的Queue就行了

package org.java.rabbitmq.receiver;

import org.java.rabbitmq.base.AmqpExchange;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class OrderStatusChangeReceiver {
    @RabbitListener(
            bindings = @QueueBinding(
                    value = @Queue(value = AmqpExchange.ORDER_STATUS_CHANGE + "_QUEUE_ORDER",name = ""),
                    exchange = @Exchange(value = AmqpExchange.ORDER_STATUS_CHANGE,type = ExchangeTypes.FANOUT)
            ))
    public void orderChange(String msg){
        System.out.println("訂單系統收到-------------"+msg);
    }

    @RabbitListener(
            bindings = @QueueBinding(
                    value = @Queue(value = AmqpExchange.ORDER_STATUS_CHANGE + "_QUEUE_TRADE"),
                    exchange = @Exchange(value = AmqpExchange.ORDER_STATUS_CHANGE,type = ExchangeTypes.FANOUT)
            ))
    public void tradeChange(String msg){
        System.out.println("交易系統收到-------------"+msg);
    }

}

           
springBoot整合RabbitMQ1.RabbitMQ是什麼?2.為什麼要使用RabbitMQ?3.使用RabbitMQ的場景4.RabbitMQ在springBoot中的使用

接下來在測試類裡面測試

package org.java.rabbitmq;

import org.java.rabbitmq.base.AmqpExchange;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;


@SpringBootTest
class RabbitmqApplicationTests {

    @Autowired
    RabbitTemplate rabbitTemplate;

    @Test
    void contextLoads() {
        rabbitTemplate.convertAndSend(AmqpExchange.GOODS_CHANGE + "_QUEUE","蘋果庫存-1");
    }

    @Test
    public void test(){
        rabbitTemplate.convertAndSend(AmqpExchange.ORDER_STATUS_CHANGE,null,"訂單号:k9527,狀态:已建立");
    }



}

           

可以看到 與之相關的Queue 都被消費了

springBoot整合RabbitMQ1.RabbitMQ是什麼?2.為什麼要使用RabbitMQ?3.使用RabbitMQ的場景4.RabbitMQ在springBoot中的使用

4.3.3 Topic

所有發送到TopicExchange的消息被将會被轉發到所有關心Routekey指定Topic的Queue上,Exchange将RouteKey和某Topic進行模糊比對。

定義說到需要将Routekey 進行模糊比對,可是去哪設定呢?于是 我點開了@QueueBinding的源碼,發現裡面有個key,這個應該就是RoutKey。

springBoot整合RabbitMQ1.RabbitMQ是什麼?2.為什麼要使用RabbitMQ?3.使用RabbitMQ的場景4.RabbitMQ在springBoot中的使用
springBoot整合RabbitMQ1.RabbitMQ是什麼?2.為什麼要使用RabbitMQ?3.使用RabbitMQ的場景4.RabbitMQ在springBoot中的使用

接下來再建立一個有關登入的消費者,跟之前一樣在@QueueBinding 添加一個key

package org.java.rabbitmq.receiver;

import org.java.rabbitmq.base.AmqpExchange;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class MemeberChangeReceiver {

    @RabbitListener(
            bindings = @QueueBinding(
                    key = "#.pc.#",
                    value = @Queue(value = AmqpExchange.MEMEBER_LOGIN + "_QUEUE_1"),
                    exchange = @Exchange(value = AmqpExchange.MEMEBER_LOGIN,type = ExchangeTypes.TOPIC)
            ))
    public void orderChange(String msg){
        System.out.println("pc登入:-------------"+msg);
    }

    @RabbitListener(
            bindings = @QueueBinding(
                    key = "huawei.#",
                    value = @Queue(value = AmqpExchange.MEMEBER_LOGIN + "_QUEUE_2"),
                    exchange = @Exchange(value = AmqpExchange.MEMEBER_LOGIN,type = ExchangeTypes.TOPIC)
            ))
    public void orderChange2(String msg){
        System.out.println("華為登入:-------------"+msg);
    }

    @RabbitListener(
            bindings = @QueueBinding(
                    key = "iphone.#",
                    value = @Queue(value = AmqpExchange.MEMEBER_LOGIN + "_QUEUE_3"),
                    exchange = @Exchange(value = AmqpExchange.MEMEBER_LOGIN,type = ExchangeTypes.TOPIC)
            ))
    public void orderChange3(String msg){
        System.out.println("蘋果登入:-------------"+msg);
    }
}
           

其中xxxx.# 代表 以xxxx的開頭的

#.xxxx.# 代表 以包含xxxx的

springBoot整合RabbitMQ1.RabbitMQ是什麼?2.為什麼要使用RabbitMQ?3.使用RabbitMQ的場景4.RabbitMQ在springBoot中的使用

在測試類進行測試

package org.java.rabbitmq;

import org.java.rabbitmq.base.AmqpExchange;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;


@SpringBootTest
class RabbitmqApplicationTests {

    @Autowired
    RabbitTemplate rabbitTemplate;

    @Test
    void contextLoads() {
        rabbitTemplate.convertAndSend(AmqpExchange.GOODS_CHANGE + "_QUEUE","蘋果庫存-1");
    }

    @Test
    public void test(){
        rabbitTemplate.convertAndSend(AmqpExchange.ORDER_STATUS_CHANGE,null,"訂單号:k9527,狀态:已建立");
    }
    @Test
    public void test1(){
        rabbitTemplate.convertAndSend(AmqpExchange.MEMEBER_LOGIN,"iphone.wap","會員編号:00001,狀态:登入");
        rabbitTemplate.convertAndSend(AmqpExchange.MEMEBER_LOGIN,"xiaomi.wap","會員編号:10001,狀态:登入");
        rabbitTemplate.convertAndSend(AmqpExchange.MEMEBER_LOGIN,"huawei.pc","會員編号:900001,狀态:登入");
    }


}

           

可以看到,結果與預期相符

springBoot整合RabbitMQ1.RabbitMQ是什麼?2.為什麼要使用RabbitMQ?3.使用RabbitMQ的場景4.RabbitMQ在springBoot中的使用

4.3.4 Header

這個政策用得比較少,就不講了。

繼續閱讀