天天看點

springboot+ RabbitMq 初識rabbitmq消息隊列(一)初識RabbitMqwindows 安裝代碼實作

初識RabbitMq

應用場景

郵箱發送:使用者注冊後投遞消息到rabbitmq中,由消息的消費方異步的發送郵件,提升系統響應速度

流量削峰:一般在秒殺活動中應用廣泛,秒殺會因為流量過大,導緻應用挂掉,為了解決這個問題,一般在應用前端加入消息隊列。用于控制活動人數,将超過此一定閥值的訂單直接丢棄。緩解短時間的高流量壓垮應用。

訂單逾時:利用rabbitmq的延遲隊列,可以很簡單的實作訂單逾時的功能,比如使用者在下單後30分鐘未支付取消訂單。.

各中間件比較

.各種消息中間件性能的比較:

TPS比較 一ZeroMq 最好,RabbitMq 次之, ActiveMq 最差。

持久化消息比較—zeroMq不支援,activeMq和rabbitMq都支援。持久化消息主要是指:MQ down或者MQ所在的伺服器down了,消息不會丢失的機制。

可靠性、靈活的路由、叢集、事務、高可用的隊列、消息排序、問題追蹤、可視化管理工具、插件系統、社群—RabbitMq最好,ActiveMq次之,ZeroMq最差。

高并發—從實作語言來看,RabbitMQ最高,原因是它的實作語言是天生具備高并發高可用的erlang語言。

綜上所述:RabbitMQ的性能相對來說更好更全面,是消息中間件的首選。

主要元件說明:

Broker:它提供一種傳輸服務,它的角色就是維護一條從生産者到消費者的路線,保證資料能按照指定的方式進行傳輸,

Exchange:消息交換機,它指定消息按什麼規則,路由到哪個隊列。

Queue:消息的載體,每個消息都會被投到一個或多個隊列。

Binding:綁定,它的作用就是把exchange和queue按照路由規則綁定起來.

Routing Key:路由關鍵字,exchange根據這個關鍵字進行消息投遞。

vhost:虛拟主機,一個broker裡可以有多個vhost,用作不同使用者的權限分離。

Producer:消息生産者,就是投遞消息的程式.

Consumer:消息消費者,就是接受消息的程式.

Channel:消息通道,在用戶端的每個連接配接裡,可建立多個channel.

windows 安裝

安裝Erlang

是以在安裝rabbitMQ之前,需要先安裝Erlang 。

配置好環境變量

在這裡插入圖檔描述

指令行輸入指令:erl ,驗證是否安裝成功

下載下傳rabbitMq

下載下傳運作rabbitmq-server-3.6.5 ,需要其他版本或者32位系統的,可以去官網下載下傳。

依舊可以不改變預設進行安裝。

需要注意:預設安裝的RabbitMQ 監聽端口是5672

然後是配置環境變量。

激活 RabbitMQ’s Management Plugin

使用RabbitMQ 管理插件,可以更好的可視化方式檢視Rabbit MQ 伺服器執行個體的狀态。

打開指令視窗:

輸入指令:

“D:\JavaApp\RabbitMq\RabbitMQ Server\rabbitmq_server-3.7.13\sbin\rabbitmq-plugins.bat” enable rabbitmq_management

啟動命名:

net stop RabbitMQ;

net start RabbitMQ

常用指令

rabbitmqctl.bat list_users 檢視使用者和權限

add_user xudong 123456 增加使用者

rabbitmqctl.bat set_user_tags xudong administrator 設定使用者權限

代碼實作

引入pom

<dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
            <version>1.5.2.RELEASE</version>
        </dependency>
           

配置rabbitConfig類

package com.example.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Created by Administrator on 2019/3/22.
 * 定義隊列queue
 */
@Configuration
public class RabbitConfig {


    public static final String QUEUE_A = "QueueA";
    public static final String QUEUE_B = "QueueB";
    public static final String QUEUE_C = "QueueC";

    public static final String EXCHANGE_A = "exchangeA";
    public static final String EXCHANGE_B = "exchangeB";
    public static final String EXCHANGE_C = "exchangeC";

    public static final String ROUTINGKEY_A = "Routingkey_A";
    public static final String ROUTINGKEY_B = "Routingkey_B";
    public static final String ROUTINGKEY_C = "Routingkey_C";

    /**
     * 擷取交換機: EXCHANGE_A
     * 針對消費者配置交換機
     * 1. 設定交換機類型
     * 2. 将隊列綁定到交換機
     *  FanoutExchange: 将消息分發到所有的綁定隊列,無routingkey的概念
     *  HeadersExchange :通過添加屬性key-value比對
     *  DirectExchange:按照routingkey分發到指定隊列
     *  TopicExchange:多關鍵字比對
     */
    @Bean
    public DirectExchange getExchangeA() {
        return new DirectExchange(EXCHANGE_A);
    }
    /**
     * 擷取隊列 :QUEUE_A
     * @return
     */
    @Bean
    public Queue queueA() {
        return new Queue(QUEUE_A, true); //第一個參數隊列名稱,第二個隊列持久化,遇到當機不會丢失資料
    }

    /**
     * binding:将交換機和隊列通過路由關鍵字綁定,不綁定也可以,用預設的交換機
     * @return
     */
    @Bean
    public Binding binding() {
        return BindingBuilder.bind(queueC()).to(getExchangeC()).with(RabbitConfig.ROUTINGKEY_C);
    }
    /**
     * 擷取隊列 :QUEUE_B
     * @return
     */
    @Bean
    public Queue queueB() {
        return new Queue(QUEUE_B, true); //第一個參數隊列名稱,第二個隊列持久化,遇到當機不會丢失資料
    }
    /**
     * 擷取隊列 :QUEUE_C
     * @return
     */
    @Bean
    public Queue queueC() {
        return new Queue(QUEUE_C, true); //第一個參數隊列名稱,第二個隊列持久化,遇到當機不會丢失資料
    }
    @Bean
    public DirectExchange getExchangeC() {
        return new DirectExchange(EXCHANGE_C);
    }

}
           

application.properties

##rabbitMq
spring.rabbitmq.username=xudong
spring.rabbitmq.password=123456
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.virtual-host=/

# 手動ACK 不開啟自動ACK模式,目的是防止報錯後未正确處理消息丢失 預設 為 none
spring.rabbitmq.listener.simple.acknowledge-mode=manual
           

消費者

package com.example.listener;

import com.example.config.RabbitConfig;
import com.example.model.Student;
import com.rabbitmq.client.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.io.IOException;

/**
 * Created by Administrator on 2019/3/22.
 */
@Component
public class BookHandler {
    private static final Logger log = LoggerFactory.getLogger(BookHandler.class);

    /**
     * <p>TODO 該方案是 spring-boot-data-amqp 預設的方式,不太推薦。具體推薦使用  listenerManualAck()</p>
     * 預設情況下,如果沒有配置手動ACK, 那麼Spring Data AMQP 會在消息消費完畢後自動幫我們去ACK
     * 存在問題:如果報錯了,消息不會丢失,但是會無限循環消費,一直報錯,如果開啟了錯誤日志很容易就吧磁盤空間耗完
     * 解決方案:手動ACK,或者try-catch 然後在 catch 裡面講錯誤的消息轉移到其它的系列中去
     * spring.rabbitmq.listener.simple.acknowledge-mode=manual
     * <p>
     *
     * @param stu 監聽的内容
     */
    @RabbitListener(queues = {RabbitConfig.QUEUE_B})
    public void listenerAutoAck(Student stu, Message message, Channel channel) {
        // TODO 如果手動ACK,消息會被監聽消費,但是消息在隊列中依舊存在,如果 未配置 acknowledge-mode 預設是會在消費完畢後自動ACK掉
        final long deliveryTag = message.getMessageProperties().getDeliveryTag();
        try {
            log.info("[listenerAutoAck 監聽的消息] - [{}]", stu.toString());
            // TODO 通知 MQ 消息已被成功消費,可以ACK了
            channel.basicAck(deliveryTag, false);
        } catch (IOException e) {
            try {
                // TODO 處理失敗,重新壓入MQ
                channel.basicRecover();
            } catch (IOException e1) {
                e1.printStackTrace();
            }
        }
    }

    @RabbitListener(queues = {RabbitConfig.QUEUE_C})
    public void listenerManualAck(Student stu, Message message, Channel channel) {
        log.info("[listenerManualAck 監聽的消息] - [{}]", stu.toString());
        try {
            // TODO 通知 MQ 消息已被成功消費,可以ACK了
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        } catch (IOException e) {
            // TODO 如果報錯了,那麼我們可以進行容錯處理,比如轉移目前消息進入其它隊列
            try {
                // TODO 處理失敗,重新壓入MQ
                channel.basicRecover();
            } catch (IOException e1) {
                e1.printStackTrace();
            }
        }
    }
}
           

controller端添加生産消息

package com.example.controller;

import com.example.config.RabbitConfig;
import com.example.listener.BookHandler;
import com.example.model.Student;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.ResponseBody;



/**
 * Created by Administrator on 2019/3/22.
 */
@Controller
@RequestMapping("/rabbigMq")
@Api(value="StudentRabbitMqController",description = "rabbitMq測試學生")
public class StudentRabbitMqController {
    private final RabbitTemplate rabbitTemplate;
    private static final Logger log = LoggerFactory.getLogger(StudentRabbitMqController.class);
    @Autowired
    public StudentRabbitMqController(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
    }

    /**
     * this.rabbitTemplate.convertAndSend(RabbitConfig.DEFAULT_BOOK_QUEUE, book); 對應 {@link BookHandler#listenerAutoAck}
     * this.rabbitTemplate.convertAndSend(RabbitConfig.MANUAL_BOOK_QUEUE, book); 對應 {@link BookHandler#listenerManualAck}
     */
    @ApiOperation(value = "rabbitmq消息隊列")
    @RequestMapping(value = "/stu",method = {RequestMethod.GET})
    @ResponseBody
    public String proMsg() {
        Student stu = new Student();
        stu.setId((short)1);
        stu.setName("學習mq的旭東");
        log.info("生産者生産消息:"+stu);
        this.rabbitTemplate.convertAndSend(RabbitConfig.QUEUE_B, stu);
        this.rabbitTemplate.convertAndSend(RabbitConfig.QUEUE_C, stu);
        return "success";
    }
}

           

啟動springboot,用swagger調用

繼續閱讀