天天看點

RabbitMQ消息隊列

RabbitMQ簡介

RabbitMQ是一個由erlang開發的AMQP(Advanved Message Queue Protocol)的開源實作

RabbitMQ消息隊列

核心概念

Message

消息,消息是不具名的,它由消息頭和消息體組成。消息體是不透明的,而消息頭則由一系列的可選屬性組成, 這些屬性包括routing-key(路由鍵)、priority(相對于其他消息的優先權)、delivery-mode(指出該消息可 能需要持久性存儲)等。

Publisher

消息的生産者,也是一個向交換器釋出消息的用戶端應用程式。

Exchange類型

交換器,用來接收生産者發送的消息并将這些消息路由給伺服器中的隊列。

Exchange有4種類型:direct(預設),fanout,topic,和headers,不同類型的Exchange轉發消息的政策有所差別

Direct Exchange

消息中的路由鍵(routing key)如果和 Binding 中的 binding key 一緻, 交換器就将消息發到對應的隊列中。路由鍵與隊列名完全比對,如果一個隊列綁定到交換機要求路由鍵為“dog”,則隻轉發 routingkey 标記為“dog”的消息,不會轉發“dog.puppy”,也不會轉發“dog.guard” 等等。它是完全比對、單點傳播的模式。

RabbitMQ消息隊列

Fanout Exchange

每個發到 fanout 類型交換器的消息都會分到所有綁定的隊列上去。fanout 交換器不處理路由鍵,隻是簡單的将隊列綁定到交換器上,每個發送到交換器的消息都會被轉發到與該交換器綁定的所有隊列上。很像子網廣播,每台子網内的主機都獲得了一份複制的消息。fanout 類型轉發消息是最快的。

RabbitMQ消息隊列

Topic Exchange

topic 交換器通過模式比對配置設定消息的路由鍵屬性,将路由鍵和某個模式進行比對,此時隊列需要綁定到一個模式上。 它将路由鍵和綁定鍵的字元串切分成單詞,這些單詞之間用點隔開。它同樣也會識别兩個通配符:符号“#”和符号 。比對0個或多個單詞,比對一個單詞。

RabbitMQ消息隊列

Queue

消息隊列,用來儲存消息直到發送給消費者。它是消息的容器,也是消息的終點。一個消息可投入一個或多個隊列。消息一直 在隊列裡面,等待消費者連接配接到這個隊列将其取走。

Binding

綁定,用于消息隊列和交換器之間的關聯。一個綁定就是基于路由鍵将交換器和消息隊列連接配接起來的路由規則,是以可以将交 換器了解成一個由綁定構成的路由表。

Exchange和Queue的綁定可以是多對多的關系。

Connection

網絡連接配接,比如一個TCP連接配接。

Channel

信道,多路複用連接配接中的一條獨立的雙向資料流通道。信道是建立在真實的TCP連接配接内的虛拟連接配接,AMQP指令都是通過信道 發出去的,不管是釋出消息、訂閱隊列還是接收消息,這些動作都是通過信道完成。因為對于作業系統來說建立和銷毀TCP都 是非常昂貴的開銷,是以引入了信道的概念,以複用一條TCP連接配接。

Consumer

消息的消費者,表示一個從消息隊列中取得消息的用戶端應用程式。

Virtual Host

虛拟主機,表示一批交換器、消息隊列和相關對象。虛拟主機是共享相同的身份認證和加 密環境的獨立伺服器域。每個 vhost 本質上就是一個mini版的RabbitMQ 伺服器,擁 有自己的隊列、交換器、綁定和權限機制。vhost是AMQP概念的基礎,必須在連接配接時 指定,RabbitMQ 預設的vhost是/。

Broker

表示消息隊列伺服器實體

Docker安裝RabbitMQ

docker run -d --name rabbitmq -p 5671:5671 -p 5672:5672 -p 4369:4369 -p  25672:25672 -p 15671:15671 -p 15672:15672 rabbitmq:management
           

通路15672端口

RabbitMQ消息隊列

https://www.rabbitmq.com/networking.html

SpringCloud整合RabbitMQ

引入RabbitMQ包

<dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
           

引入RabbitMQ,RabbitAutoConfiguration就會自動生效

給容器中自動配置了RabbitTemplate、AmqpAdmin等等

配置檔案

spring.rabbitmq.host=192.168.195.100
spring.rabbitmq.port=5672
spring.rabbitmq.virtual-host=/
           

測試類

@Slf4j
@SpringBootTest
class GulimallOrderApplicationTests {

    @Autowired
    AmqpAdmin amqpAdmin;
    @Autowired
    RabbitTemplate rabbitTemplate;

    @Test
    void sendMessageTest() {
        //因為存到rabbit中是經過序列化的,是以加上配置轉成json發出去
        OrderReturnReasonEntity orderReturnReasonEntity=new OrderReturnReasonEntity();
        orderReturnReasonEntity.setId(1L);
        orderReturnReasonEntity.setCreateTime(new Date());
        rabbitTemplate.convertAndSend("hello-java-exchange","hello-java",orderReturnReasonEntity);
        log.info("消息發送成功");
    }

    @Test
    void createExchange() {
        //建立了一個Direct類型的交換機  是否持久化 是否自動删除
        DirectExchange directExchange=new DirectExchange("hello-java-exchange",true,false);
        amqpAdmin.declareExchange(directExchange);
        log.info("Exchange建立成功");

    }

    @Test
    void createQueue() {
        Queue queue=new Queue("hello-java-Queue",true,false,false);
        amqpAdmin.declareQueue(queue);
        log.info("Queue建立成功");
    }

    @Test
    void createBinding() {
        //将exchange指定的交換機和Directnation目的地進行綁定,使用routingkey作為路由鍵
        Binding binding=new Binding("hello-java-Queue",
                Binding.DestinationType.QUEUE,
                "hello-java-exchange",
                "hello-java",null);
        amqpAdmin.declareBinding(binding);
        log.info("Binding綁定成功");
    }
}
           

先建立交換機,然後建立對隊列,綁定路由鍵,利用

rabbitTemplate

發送消息

RabbitMQ消息隊列

@RabbitListenter&@RabbitHandler接收消息

@RabbitListenter監聽消息

@RabbitListener(queues = {"hello-java-Queue"})
    public  void  recieveMessage(Message message, OrderReturnReasonEntity content, Channel channel)
    {
        System.out.println("接收到消息内容:"+message+"内容==》"+content);
    }
           
RabbitMQ消息隊列

如果有多個用戶端,隻有一個會收到消息,并且隻有當一個消息處理完才會收到下一個消息

如果需要監聽一個隊列裡的多個消息,消息的類型都不一樣利用@RabbitHandler

監聽

hello-java-Queue

隊列裡不同的消息

@RabbitListener(queues = {"hello-java-Queue"})
@Service("orderItemService")
public class OrderItemServiceImpl extends ServiceImpl<OrderItemDao, OrderItemEntity> implements OrderItemService {
    @RabbitHandler
    public  void  recieveMessage(Message message, OrderReturnReasonEntity content)
    {
        System.out.println("接收到消息内容:"+message+"内容==》"+content);
    }

    @RabbitHandler
    public  void  recieveMessage2(OrderEntity orderEntity)
    {
        System.out.println("接收到消息内容:"+orderEntity);
    }

}
           

控制器

@Slf4j
@Controller
public class RabbitController {
    @Autowired
    RabbitTemplate rabbitTemplate;

    @GetMapping("/sendMq")
    public String sendMessageTest() {
        for (int i = 0; i < 10; i++) {
            if(i%2==0) {
                //因為存到rabbit中是經過序列化的,是以加上配置轉成json發出去
                OrderReturnReasonEntity orderReturnReasonEntity=new OrderReturnReasonEntity();
                orderReturnReasonEntity.setId(1L);
                orderReturnReasonEntity.setCreateTime(new Date());
                rabbitTemplate.convertAndSend("hello-java-exchange","hello-java",orderReturnReasonEntity);
                log.info("消息發送成功");
            }
            else {
                OrderEntity orderEntity=new OrderEntity();
                orderEntity.setOrderSn(UUID.randomUUID().toString());
                rabbitTemplate.convertAndSend("hello-java-exchange","hello-java",orderEntity);
                log.info("消息發送成功");
            }
        }
        return  "";

    }
}
           
RabbitMQ消息隊列

RabbitMQ消息确認機制-可靠抵達

保證消息不丢失,可靠抵達,可以使用事務消息,性能下降250倍,為此引入确認機制

publisher confirmCallback 确認模式

publisher returnCallback 未投遞到 queue 退回模式

consumer ack機制

RabbitMQ消息隊列

可靠抵達-ConfirmCallback

如果要使用confirmCallback ,需要配置

#開啟發送端确認
spring.rabbitmq.publisher-confirm-type=correlated
           
  1. 在建立 connectionFactory 的時候設定 PublisherConfirms(true) 選項,開啟 confirmcallback 。
  2. CorrelationData:用來表示目前消息唯一性。
  3. 生産者隻要把消息發送給Broker,消息隻要被 broker 接收到就會執行 confirmCallback,如果是 cluster 模式,需要所有 broker 接收到才會調用 confirmCallback。
  4. 被 broker 接收到隻能表示 message 已經到達伺服器,并不能保證消息一定會被投遞到目标 queue 裡。是以需要用到接下來的 returnCallback 。
@PostConstruct
    public  void initRabbitTemplate()
    {
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            /*
            * @param correlationData 目前消息的唯一關聯資料(消息的唯一id)
            * @param ack 消息是否成功收到
            * @param cuase 失敗的原因
            * */
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cuase) {
                System.out.println("confirm... cottrlationData"+correlationData+",[ack]"+ack+",cuase"+cuase);
            }
        });
    }
           

可靠抵達-ReturnCallback

開啟發送消息抵達隊列的确認

spring.rabbitmq.publisher-returns=true
#隻要抵達隊列,以異步發動有限回調我們這個returnconfig
spring.rabbitmq.template.mandatory=true
           

隻有當消息沒有抵達隊列才會觸發方法

@PostConstruct
    public  void initRabbitTemplate()
    {
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            /*
            * @param correlationData 目前消息的唯一關聯資料(消息的唯一id)
            * @param ack 消息是否成功收到
            * @param cuase 失敗的原因
            * */
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cuase) {
                System.out.println("confirm... cottrlationData"+correlationData+",[ack]"+ack+",cuase"+cuase);
            }
        });

        rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
            //投遞失敗的詳細資訊 回複的狀态碼 回複的文本内容 當時這個消息給給哪個交換機 當時消息的路由鍵
            @Override
            public void returnedMessage(Message message, int replaycode, String replytext, String exchange, String routekey) {
                System.out.println("Fail...message"+message+",[replaycode]"+replaycode+",[replytext]"+replytext+",[exchange]"+exchange+",[routekey]"+routekey);
            }
        });
    }
           

可靠抵達-Ack消息确認機制

在不開啟手動确認的時候,發送消息突然伺服器關機會導緻消息丢失,是以需要開啟手動模式保證消息的可達性

#手動确認消息達到
spring.rabbitmq.listener.simple.acknowledge-mode=manual
           

消費者手動确認模式下 隻要沒有明确确認消息,就一直是unached狀态,即使關機 消息也不會丢失,會重新變為Ready

@RabbitHandler
    public  void  recieveMessage(Message message, OrderReturnReasonEntity content, Channel channel) throws IOException {
        System.out.println("接收到消息内容:"+message+"内容==》"+content);
        long deliveryTag=message.getMessageProperties().getDeliveryTag();
        //簽收消息
        channel.basicAck(deliveryTag,false);//true就是重新發回伺服器
        System.out.println("消息簽收"+deliveryTag);
    }


    @RabbitHandler
    public  void  recieveMessage2(Message message,OrderEntity orderEntity,Channel channel) throws IOException {

        System.out.println("接收到消息内容:"+orderEntity);
        long deliveryTag=message.getMessageProperties().getDeliveryTag();
        //簽收消息
        channel.basicNack(deliveryTag,false,true);// 退貨  true就是重新發回伺服器
        System.out.println("沒有簽收"+deliveryTag);
    }
           

消息處理成功,ack(),接受下一個消息,此消息broker就會移除

消息處理失敗,nack()/reject(),重新發送給其他人進行處理,或者容錯處理後ack

消息一直沒有調用ack/nack方法,broker認為此消息正在被處理,不會投遞給别人,此時用戶端斷開,消息不會被broker移除,會投遞給别人

如何簽收

channel.basicAck(deliveryTag,false) 簽收
channel.basicNack(deliveryTag,false,true); 拒簽