天天看點

SpringBoot使用RabbitMQ消息隊列使用Direct模式

版權聲明:本文為部落客原創文章,遵循 CC 4.0 BY-SA 版權協定,轉載請附上原文出處連結和本聲明。

本文連結:https://blog.csdn.net/lizc_lizc/article/details/80722090

RabbitMQ簡介

AMQP,即Advanced Message Queuing Protocol,進階消息隊列協定,是應用層協定的一個開放标準,為面向消息的中間件設計。消息中間件主要用于元件之間的解耦,消息的發送者無需知道消息使用者的存在,反之亦然。AMQP的主要特征是面向消息、隊列、路由(包括點對點和釋出/訂閱)、可靠性、安全。

RabbitMQ是一個開源的AMQP實作,伺服器端用Erlang語言編寫,支援多種用戶端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支援AJAX。用于在分布式系統中存儲轉發消息,在易用性、擴充性、高可用性等方面表現不俗。

RabbitMQ基本概念

1.Message

消息,消息是不具名的,它由消息頭和消息體組成。消息體是不透明的,而消息頭則由一系列的可選屬性組成,這些屬性包括routing-key(路由鍵)、priority(相對于其他消息的優先權)、delivery-mode(指出該消息可能需要持久性存儲)等。

2.Publisher

消息的生産者,也是一個向交換器釋出消息的用戶端應用程式。

3.Exchange

交換器,用來接收生産者發送的消息并将這些消息路由給伺服器中的隊列。

4.Binding

綁定,用于消息隊列和交換器之間的關聯。一個綁定就是基于路由鍵将交換器和消息隊列連接配接起來的路由規則,是以可以将交換器了解成一個由綁定構成的路由表。

5.Queue

消息隊列,用來儲存消息直到發送給消費者。它是消息的容器,也是消息的終點。一個消息可投入一個或多個隊列。消息一直在隊列裡面,等待消費者連接配接到這個隊列将其取走。

6.Connection

網絡連接配接,比如一個TCP連接配接。

7.Channel

信道,多路複用連接配接中的一條獨立的雙向資料流通道。信道是建立在真實的TCP連接配接内地虛拟連接配接,AMQP 指令都是通過信道發出去的,不管是釋出消息、訂閱隊列還是接收消息,這些動作都是通過信道完成。因為對于作業系統來說建立和銷毀 TCP 都是非常昂貴的開銷,是以引入了信道的概念,以複用一條 TCP 連接配接。

8.Consumer

消息的消費者,表示一個從消息隊列中取得消息的用戶端應用程式。

9.Virtual Host

虛拟主機,表示一批交換器、消息隊列和相關對象。虛拟主機是共享相同的身份認證和加密環境的獨立伺服器域。每個 vhost 本質上就是一個 mini 版的 RabbitMQ 伺服器,擁有自己的隊列、交換器、綁定和權限機制。vhost 是 AMQP 概念的基礎,必須在連接配接時指定,RabbitMQ 預設的 vhost 是 / 。

10.Broker

表示消息隊列伺服器實體。

Exchange 類型

Exchange分發消息時根據類型的不同分發政策有差別,目前共四種類型:direct、fanout、topic、headers 。下面隻講前三種模式。

1.Direct模式

消息中的路由鍵(routing key)如果和 Binding 中的 binding key 一緻, 交換器就将消息發到對應的隊列中。路由鍵與隊列名完全比對

2.Topic模式

topic 交換器通過模式比對配置設定消息的路由鍵屬性,将路由鍵和某個模式進行比對,此時隊列需要綁定到一個模式上。它将路由鍵和綁定鍵的字元串切分成單詞,這些單詞之間用點隔開。它同樣也會識别兩個通配符:符号“#”和符号“*”。#比對0個或多個單詞,*比對一個單詞。

3.Fanout模式

每個發到 fanout 類型交換器的消息都會分到所有綁定的隊列上去。fanout 交換器不處理路由鍵,隻是簡單的将隊列綁定到交換器上,每個發送到交換器的消息都會被轉發到與該交換器綁定的所有隊列上。很像子網廣播,每台子網内的主機都獲得了一份複制的消息。fanout 類型轉發消息是最快的。

SpringBoot整合RabbitMQ

pom.xml

中添加

spring-boot-starter-amqp

的依賴

<dependencies>
		<!-- 結合注解@ConfigurationProperties 将bean裡面的屬性和yml中的屬性自動綁定 -->
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-configuration-processor</artifactId>
			<optional>true</optional>
		</dependency>
		<!--RabbitMQ消息隊列-->
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-amqp</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-web</artifactId>
		</dependency>
		<dependency>
			<groupId>org.projectlombok</groupId>
			<artifactId>lombok</artifactId>
		</dependency>
 
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-test</artifactId>
			<scope>test</scope>
		</dependency>
	</dependencies>
           

application.yml

中配置

rabbitmq

相關内容

spring:
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest
           

使用Direct模式

1.配置隊列

package com.jeesite.modules.zh.rabbitmq.Direct;

import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import javax.annotation.Resource;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.context.annotation.Scope;

/**
 * 第一步
 * 配置隊列
 */
@Configuration
public class RabbitMQConfig {
    static final String QUEUE = "direct_queue";
    static final String MIAOSHA = "miao_sha";

    @Resource
    private RabbitConstants rabbitConstants;

    /**
     * Direct模式
     * @return
     */
    @Bean
    public Queue directQueue() {
        // 第一個參數是隊列名字, 第二個參數是指是否持久化
        return new Queue(QUEUE, true);
    }

    @Bean Queue miaoSha(){
        // 第一個參數是隊列名字, 第二個參數是指是否持久化
        return new Queue(MIAOSHA, true);
    }

}
           

2.建立一個User實體類

package com.lzc.rabbitmq.dataobject;
 
import lombok.Data;
import java.io.Serializable;
 
@Data
public class User implements Serializable {
 
    private static final long serialVersionUID = -1262627851741431084L;
 
    private String userId;
 
    private String name;
}

           

3.接收者

package com.jeesite.modules.zh.rabbitmq.Direct;


import com.jeesite.common.mapper.JsonMapper;
import com.jeesite.modules.zh.dao.ZhMsActivityDao;
import com.jeesite.modules.zh.entity.ZhMsActivity;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * 第三步
 * 接收者
 */
@Component
@Slf4j
public class Receiver {

    @Autowired
    ZhMsActivityDao zhMsActivityDao;

    private static Logger logger = LoggerFactory.getLogger(JsonMapper.class);

    // queues是指要監聽的隊列的名字
    @RabbitListener(queues = RabbitMQConfig.QUEUE)
    public void receiverDirectQueue(User user) {
        logger.info("【receiverDirectQueue監聽到消息】"+user.toString());
    }

    //miao_sha是指要監聽的隊列名字
    @RabbitListener(queues = RabbitMQConfig.MIAOSHA)
    public void receiverDirectQueueMiaoShao(String str) {
        ZhMsActivity ms = new ZhMsActivity();
        ms.setId("1175976705438756864");
        ms = zhMsActivityDao.get(ms);
        if(ms.getSyNum()>0){
            ms.setSyNum(ms.getSyNum()-1);
            zhMsActivityDao.update(ms);
            logger.info("【receiverDirectQueueMiaoShao】"+str+"-----搶單成功");
        }else{
            logger.info("【receiverDirectQueueMiaoShao】"+str+"-----搶單失敗");
        }
    }


}


           

4.發送者

package com.jeesite.modules.zh.rabbitmq.Direct;

import com.jeesite.common.idgen.IdGen;
import com.jeesite.modules.zh.dao.ZhMsActivityDao;
import com.jeesite.modules.zh.entity.ZhMsActivity;
import com.jeesite.modules.zh.utils.AjaxBeanUtil;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

/**
 * 第五步
 * 測試,通路http://localhost:9999/f/sendDirectQueue,檢視日志輸出
 */
@RestController
@RequestMapping(value = "${frontPath}/direct/")
public class DirectTestController {

    @Autowired
    private Sender sender;
    @Autowired
    ZhMsActivityDao zhMsActivityDao;

    @GetMapping("/sendDirectQueue")
    public Object sendDirectQueue(){
        sender.sendDirectQueue();
        return "ok";
    }

    @GetMapping("/sendPhone")
    public Object sendPhone(){
        ZhMsActivity ms = new ZhMsActivity();
        ms.setId("1175976705438756864");
        ms = zhMsActivityDao.get(ms);
        if(ms.getSyNum()>0){
            sender.sendDirectMiaoShaPhone("模拟使用者"+IdGen.nextId());
            return "ok";
        }else {
            return "false";
        }
    }

}

           

6.日志輸出

2018-06-18 01:18:54.901  INFO 8772 --- [io-8080-exec-10] com.lzc.rabbitmq.config.Sender           : 【sendDirectQueue已發送消息】
2018-06-18 01:18:54.997  INFO 8772 --- [cTaskExecutor-1] com.lzc.rabbitmq.config.Receiver         : 【receiverDirectQueue監聽到消息】User(userId=123456, name=lizhencheng)
           

注意:發送者與接收者的Queue名字一定要相同,否則接收收不到消息

繼續閱讀