天天看點

Rabbit MQ 上篇

1.今日内容
1.Rabbitmq介紹
2.Rabbitmq優勢以及劣勢介紹
3.Rabbitmq架構
4.Rabbitmq管控台介紹
5.Rabbitmq五種發送消息的模式代碼實作
           

一 RabbitMq介紹

Rabbit Mq ——》 Message Queue : 消息隊列

隊列特點:先進先出

生産者:發送消息方稱為是生産者,誰生産消息誰就是生産者

消費者:監聽消息(隊列),監聽到後拉取下來進行消費,稱為是消費者

二 Rabbitmq優劣勢

Rabbit MQ 上篇
1. 優勢
  • 應用解耦

    生産者和消費者之間進行了應用的解耦

  • 異步提速

    使用前提是系統為異步調用系統

Rabbit MQ 上篇
  • 削峰填谷
Rabbit MQ 上篇
Rabbit MQ 上篇

通過削峰填谷保證系統的穩定性

2. 劣勢
  • 系統穩定性下降
  • 資料一緻性問題
    • 消息丢失---->消息确認
    • 消息重複---->消息幂等性問題
3. 使用場景
  1. 系統必須支援異步通信

三 MQ常見産品

  • RabbitMq
    • 使用erlang語言進行開發,效率高,吞吐量很大
    • 屬于rabbit公司
  • RocketMq
    • 使用java開發,脫胎于kafka(來源、模仿)
    • 屬于阿裡,目前已經捐獻給apache,稱為了頂級項目
  • ActiveMQ
    • 是一款很古老的mq,目前使用的較少
    • 屬于apache
  • Kafka
    • 屬于apache
    • 用于大資料環境使用

四 Rabbitmq架構

Rabbit MQ 上篇

注意事項:

  • Exchange交換機不具備儲存消息的功能,需要與隊列完成綁定
  • 綁定後符合規則的消息會路由至隊列進行儲存,不符合路由規則的消息或者是沒有綁定隊列的交換機消息會丢失

五 Rabbitmq管控台介紹

六 Rabbitmq五種發送消息的模式代碼實作

https://www.rabbitmq.com/getstarted.html

1. 簡單模式(hello world)
Rabbit MQ 上篇

步驟:

  1. 建立項目
    • web/lombok/rabbitmq
  2. 編寫配置檔案
    • #mq配置
      spring:
        rabbitmq:
          host: 192.168.200.132
                 
  3. 建立隊列----》Java Bean
    import org.springframework.amqp.core.Queue;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    @Configuration //聲明是一個配置類
    public class QueueConfig {
        
        /**
         * @Author: guodong 
         * @Date: 11:33 2020/9/26
         * @Parms []
         * @ReturnType: org.springframework.amqp.core.Queue
         * @Description: 建立itheima-31隊列
        */
        @Bean
        public Queue queue(){
            return new Queue("itheima-31");
        }
    
    }
    
               
  4. 建立生産者發送消息
    @RestController
    @RequestMapping("hello")
    @Slf4j
    public class HelloController {
    
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
        @GetMapping("sendMsg/{msg}")
        public void sendMsg(@PathVariable String msg){
            /*
                第一個參數是routingkey: 路由鍵,預設是隊列名稱
                第二個參數是發送的消息
             */
            rabbitTemplate.convertAndSend("itheima-31", msg);
    
            log.info("發送消息完畢 : {}" , msg);
        }
    }
    
               
  5. 建立消費者監聽消息
    @RabbitListener(queues = "itheima-31") //聲明監聽的消息隊列
    @Slf4j
    @Component
    public class ConsumerListener {
    
        @RabbitHandler //轉換消息
        public void receiveMsg(String msg){
            log.info("消費者接收到消息: {}" , msg);
        }
    
    }
               
2. 工作隊列模式(work queue)
Rabbit MQ 上篇

步驟:

  1. 複制粘貼,需要修改日志輸出,以便檢視是那個消費者接收到消息

小結:

  • 工作隊列模式特點:
    • 消費者之間是競争關系
    • 消費者之間會進行輪訓接收消息
    • 以上兩種模式共有的特點是不需要聲明交換機,但是使用的是預設的交換機。
3. 釋出訂閱模式
  • Fanout(廣播類型交換機)
Rabbit MQ 上篇

步驟:

  1. 建立fanout類型交換機
  2. 建立fanout.A、fanout.B、fanout.C
  3. 完成三個隊列與交換機的綁定
  4. 發送消息
  5. 編寫三個消費者

代碼實作:

  • 建立隊列+交換機并且完成綁定
    /**
         * @Author: guodong 
         * @Date: 12:06 2020/9/26
         * @Parms []
         * @ReturnType: org.springframework.amqp.core.FanoutExchange
         * @Description: 建立fanout類型交換機
        */
        @Bean
        public FanoutExchange fanoutExchange(){
            return new FanoutExchange("itheihei");
        }
        /*
            bean在建立時預設的bean name是方法名
         */
        @Bean
        public Queue fanoutA(){
            return new Queue("fanout.A");
        }
        @Bean
        public Queue fanoutB(){
            return new Queue("fanout.B");
        }
        @Bean
        public Queue fanoutC(){
            return new Queue("fanout.C");
        }
    
        /*
            交換機與隊列完成綁定
         */
        @Bean
        public Binding bindFanoutA(FanoutExchange fanoutExchange, Queue fanoutA){
            return BindingBuilder.bind(fanoutA).to(fanoutExchange);
        }
    
        @Bean
        public Binding bindFanoutB(FanoutExchange fanoutExchange, Queue fanoutB){
            return BindingBuilder.bind(fanoutB).to(fanoutExchange);
        }
        @Bean
        public Binding bindFanoutC(FanoutExchange fanoutExchange, Queue fanoutC){
            return BindingBuilder.bind(fanoutC).to(fanoutExchange);
        }
               
  • 發送消息
    @GetMapping("sendMsgEx/{msg}")
        public void sendMsgEx(@PathVariable String msg){
            /*
                第一個參數是routingkey: 路由鍵,預設是隊列名稱
                第二個參數是發送的消息
             */
            rabbitTemplate.convertAndSend("itheihei", "", msg);
    
            log.info("發送廣播類型消息完畢 : {}" , msg);
        }
               
  • 接收消息
    @RabbitListener(queues = "fanout.A") //聲明監聽的消息隊列
    @Slf4j
    @Component
    public class FanoutListenerA {
    
        @RabbitHandler //轉換消息
        public void receiveMsg(String msg){
            log.info("fanoutA 接收到消息: {}" , msg);
        }
    }
               

小結:

  • 廣播類型消息特點:
    • 需要聲明fanout類型交換機
    • 需要完成交換機與隊列的綁定
    • 發送的消息可以被所有完成綁定的隊列接收到,形成廣播類型的消息
  • Direct(路由鍵模式/點對點模式)
Rabbit MQ 上篇

步驟:

  • 建立交換機
  • 建立隊列
  • 完成隊列與交換機的綁定并且設定路由鍵

小結:

  • direct模式的特點:
    • 需要聲明direct類型的交換機
    • 完成隊列與交換機的綁定并且需要設定路由鍵
    • 在發送消息時需要指定路由鍵,隻有符合規則(完全比對)的路由鍵才可以将消息從交換機路由至隊列,
    • 如上圖所示如果routingKey為itheima則A和B全部可以接收到消息,如果routingKey為itcast則隻有B隊列可以接收到消息
  • Topic(主題模式/通配符模式)
Rabbit MQ 上篇

步驟:

  • 建立交換機
  • 建立隊列
  • 完成隊列與交換機的綁定并且設定路由鍵

小結:

  • topic模式特點:
    • 需要聲明direct類型的交換機
    • 完成隊列與交換機的綁定并且需要設定路由鍵
    • 在發送消息時需要指定路由鍵,隻有符合規則才會路由至隊列
    • 規則:
      • “*” : 不多不少正好比對一個詞
      • “#”: 比對大于等于0個詞
      • 究竟有多少個詞是用 "."來判斷的
      • 上圖所示的案例中并且必須以itheima作為第一個詞存在