天天看点

Rabbit MQ 客户端发送消息1. 连接 Rabbit MQ2. 使用交换器和队列3. 发送消息 channel.basicPublish 方法

Rabbit MQ 客户端发送消息

  • 1. 连接 Rabbit MQ
  • 2. 使用交换器和队列
    • 2.1 Channel 接口的 API 方法重载
      • 2.1.1 关于 exchangeDeclare 方法
      • 2.1.2 关于 queueDeclare 方法
      • 2.1.3 关于 queueBind 方法
  • 3. 发送消息 channel.basicPublish 方法

1. 连接 Rabbit MQ

直接上核心代码:

@Component
public class RabbitMQConfig {

    /**
     * 注入配置文件属性
     */
    @Value("${spring.rabbitmq.addresses}")
    String addresses;//MQ地址
    @Value("${spring.rabbitmq.username}")
    String username;//MQ登录名
    @Value("${spring.rabbitmq.password}")
    String password;//MQ登录密码
    @Value("${spring.rabbitmq.virtual-host}")
    String vHost;//MQ的虚拟主机名
    @Value("${spring.rabbitmq.port}")
    String port;//MQ的虚拟主机名
    
    @Bean
    public Connection getConnection() throws Exception {
       ConnectionFactory factory = new ConnectionFactory();
       factory.setUsername(username);
       factory.setPassword(password);
       factory.setVirtualHost(vHost);
       factory.setHost(addresses);
       factory.setPort(port);
       Connection conn = factory.newConnection();    
       return conn;  
   }

}

           

也可以采用 URI 的方式来实现,实例代码如下:

ConnectionFactory factory = new ConnectionFactory();
   factory.setUri("amqp://userName:[email protected]:portNumber/virtualHost");
   Connection conn = factory.newConnection();
   /** Connection 接口被用来创建一个 Channel , Channel 可以用来发送或接收消息*/ 
   Channel channel = conn.createChannel();  

           

注意要点:

       Connection 可以用来创建多个 Channel 实例,但是 Channel 实例不能在线程间共享,应用程序应该为每个线程开辟一个 Channel 。某些情况下 Channel 的操作可以并发运行,但在其他情况下会导致在网络出现错误的通信帧交错,同时也会影响发送方确认(publisherconfirm)机制的运行,所以多线程间共享 Channel 实例是非线程安全的。

2. 使用交换器和队列

       交换器和队列是 AMQP 中 high-level 层面的构建模块,应用程序需确保在使用它们的时候就已经存在了,故在使用前需要先声明(declare)它们。

// 声明交换器 (非持久化的、非自动删除的、绑定类型为 direct 的交换器)
   channel.exchangeDeclare(exchangeName, "direct", true);
   // 队列名 (创建了一个非持久化的、排他的、自动删除的队列,该队列的名称是由 Rabbit MQ 自动生成的)
   String queueName = channel.queueDeclare().getQueue();
   // 使用路由键(routingKey)将队列和交换器绑定起来
   channel.queueBind(queueName, exchangeName, routingKey);

           

       以上声明的队列有以下特性:只对当前应用中同一个 Connection 层面可用,同一个 Connection 的不同 Channel 可共用,并且也会在应用连接断开时自动删除。

       如果要在应用中共享一个队列,可以做以下声明:

/** 声明一个持久化、非排他的、非自动删除的队列,此处的队列名称不是 Rabbit MQ 自动生成 */
   channel.exchangeDeclare(exchangeName, "direct", true);
   channel.queueDeclare(queueName, ture, false, false, null);
   channel.queueBind(queueName, exchangeName, routingKey);
   
           
       生产者和消费者都可以声明一个交换器或者队列。如果尝试声明一个已经存在的交换器或者队列,只需要声明的参数完全匹配现存的交换器或者队列, Rabbit MQ 就可以啥都不做,并成功返回。如果声明的参数不匹配则会抛出异常。

2.1 Channel 接口的 API 方法重载

       Channel 的 API 方法都是可以重载的,例如:exchangeDeclare、queueDeclare 等。根据参数不同,可以有不同的重载形式,可根据具体业务需要,选择性进行调用。

2.1.1 关于 exchangeDeclare 方法

exchangeDeclare 有多个重载方法,这些重载方法都是由下面这个方法中缺省的某些参数构成。

/** 返回值 Exchange.DeclareOk 是用来标识成功声明了一个交换器 */
  Exchange.DeclareOk exchangeDeclare(Stirng exchange, String type, boolean durable,
                boolean autoDelete, boolean internal, Map<String, Oject> arguments)
                throws IOException;
                
           

上面方法中,各参数详细说明如下所述:

  • exchange:交换器的名称。
  • type:交换器的类型,常见的有 fanout、direct、topic.
  • durable:设置是否持久化。durable 设置为 true 时表示持久化,否则非持久化。持久化可以将交换器存盘,在服务器重启的时候不会丢失相关信息。
  • autoDelete:设置是否自动删除。 autoDelete 设置为 true 时表示自动删除。自动删除的前提是至少有一个队列或者交换器与这个交换器绑定,之后所有与这个交换器绑定的队列或者交换器都与此解绑。需要注意的是,不能错误的把这个参数理解为:“当与此交换器连接的客户端都断开时,Rabbit MQ 会自动删除本交换器”。
  • internal:设置是否是内置的。如果设置为 true ,则表示为内置的交换器,客户端程序无法直接发送消息到这个交换器中,只能通过交换器路由到交换器这种方式。
  • argument:其他一些结构化参数,比如 alternate-exchange 等。

exchangeDeclare 的其他几个常用重载方法如下:(这里仅为部分重载方法)

  1. Exchange.DeclareOk exchangeDeclare (String exchange, String type) throws IOException;
  2. Exchange.DeclareOk exchangeDeclare (String exchange, String type, boolean durable) throws IOException;
  3. Exchange.DeclareOk exchangeDeclare (String exchange, String type, boolean durable, boolean autoDelete, Map<Stirng, Object> arguments) throws IOException;

关于删除交换器的方法:

  • Exchange.DeleteOk exchangeDelete (String exchange) throws IOException;
  • Exchange.DeleteOk exchangeDelete (String exchange, boolean ifUnused) throws IOException;
以上方法参数说明:
  • exchange:表示交换器名称。
  • ifUnused:用来设置是否在交换器没有被使用的情况下删除。如果 ifUnused 设置为 true,则只有在此交换器没有被使用的情况下才会被删除;若为 false 则无论如何这个交换器都要被删除。

2.1.2 关于 queueDeclare 方法

queueDeclare 方法只有两个重载方法:

/**该不带参数的方法,默认创建一个由Rabbit MQ命名的排他的、自动删除的、非持久化队列(匿名队列)*/
    Queue.DeclareOk queueDeclare() throws IOException;  

    Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, 
             boolean autoDelete, Map<String, Object> arguments) throws IOException;

           

以下是 queueDeclare 方法中参数的详细说明:

  • queue:队列的名称。
  • durable:设置是否持久化。设置为 true 时代表持久化,持久化的队列会存盘,在服务器重启的时候可以保证不丢失相关信息。
  • exclusive:设置是否排他。设为 true 时表示排他的,如果一个队列被声明为排他队列,该队列仅对首次声明它的连接可见,并在连接断开时自动删除。需要注意的是:排他队列是基于连接(Connection)可见的,同一个连接的不同信道(Channel)是可以同时访问同一个连接创建的排他队列;“首次”是指如果一个连接已经声明了一个排他队列,其他连接是不允许建立同名的排他队列,这个与普通队列不同;即使该队列是持久化的,一旦连接关闭 或者客户端退出,该排他队列都会被自动删除,这种队列适用于一个客户端同时发送和读取消息的应用场景。
  • autoDelete:设置是否自动删除。设为 true 时表示队列为自动删除。自动删除的前提是:至少有一个消费者连接到这个队列,之后所有与这个队列连接的消费者都断开时,才会自动删除。注意:不要错误的理解为:“当连接到此队列的所有客户端断开时,这个队列自动删除”,因为生产者客户端创建这个队列,或没有消费者客户端与这个队列连接时,都不会自动删除该队列。
  • arguments:设置队列的其他一些参数。

要点:

       生产者和消费者都能够使用 queueDeclare 来声明一个队列,但是如果消费者在同一个信道上订阅了另一个队列,就无法再声明队列了。必须先取消订阅,然后将信道置为“传输”模式后才能声明队列。

/** 该方法用来检测相应的队列是否存在,如果存在则正常返回,
    如果不存在则抛出异常:404 channel exception.同时,
    Channel 也会被关闭。 */
Queue.DeclareOk queueDeclarePassive(String queue) throws IOException;

           

关于删除队列的方法:

Queue.DeleteOk queueDelete(String queue) throws IOException;
	
	Queue.DeleteOk queueDelete(String queue, boolean ifUnused,
	              boolean ifEmpty) throws IOException;

           
以上方法参数说明:
  • queue:表示队列名称。
  • ifUnused:用来设置是否在队列没有被使用的情况下删除。如果 ifUnused 设置为 true,则只有在此队列没有被使用的情况下才会被删除;若为 false 则无论如何这个队列都要被删除。
  • ifEmpty:设置为 true 时表示队列为空(队列里面没有任何消息堆积)的情况下才能被删除。
/** 该方法用来清空队列中的内容,但是不删除队列本身。 */
		Queue.PurgeOk queueOurge(String queue) throws IOException;

           

2.1.3 关于 queueBind 方法

queueBind 方法用于将交换器和队列进行绑定。以下是可以重载的方法:

Queue.BindOk queueBind(String queue, String exchange,
		    String routingKey) throws IOException;
		    
		Queue.BindOk queueBind(Stirng queue, String exchange, String routingKey, 
		    Map<String, Object> arguments) throws IOException;    
		
		void queueBindNoWait(String queue, String exchange, Stirng routingKey,
		    Map<String, Object> arguments) throws IOException;

           

以下是 queueBind 方法中参数的详细说明:

  • queue:队列名称。
  • exchange:交换器的名称。
  • routingKey:用来绑定队列和交换器的路由键。
  • arguments:定义绑定的一些参数。

将已经绑定的队列和交换器进行解绑:

Queue.UnbindOk queueUnbind(String queue, String exchange,
				    String routingKey) throws IOException;
				    
		Queue.UnbindOk queueUnbind(Stirng queue, String exchange,  
			  String routingKey, Map<String, Object> arguments) throws IOException; 

           

3. 发送消息 channel.basicPublish 方法

Rabbit MQ 发送消息使用 channel.basicPublish 方法,其有以下几个重载方法:

  • void basicPublish (String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException;
  • void basicPublish (String exchange, String routingKey, boolean mandatory, BasicProperties props, byte[] body) throws IOException;
  • void basicPublish (String exchange, String routingKey, boolean mandatory, boolean immediate, BasicProperties props, byte[] body) throws IOException;

以上方法具体参数说明如下:

  • exchange:交换器的名称,指明消息需要发送到哪个交换器中。如果设置为空字符串,则消息会被发送到 Rabbit MQ 默认的交换器中。
  • routingKey:路由键,交换器根据路由键将消息存储到相应的队列中。
  • props:消息的基本属性集。其包含14个属性成员:contentType、contentEncoding、headers(Map<String, Object>)、deliveryMode、priority、correlationId、replyTo、expiration、messageId、timestamp、type、userId、appId、clusterId.
  • byte[] body:消息体(payload),真正需要发送的消息。
  • mandatory:该参数设置为 true 时,当交换器无法根据自身的类型和路由键找到一个符合条件的队列时,Rabbit MQ 会调用 Basic.Return 命令将消息返回给生产者。当设置为 false 时,出现上述情形时,消息直接被丢弃。
  • immediate:该参数设置为 true 时,如果交换器在将消息路由到队列时,发现队列上并不存在任何消费者,那么这条消息将不会存入队列中。当与路由键匹配的所有队列都没有消费者时,该消息会通过 Basic.Return 返回给生产者。Rabbit MQ 3.0 版本开始去掉了对 immediate 参数的支持。

示例1:

/** 发送一条简单的消息: Hello, World !!! */
	byte[] messageBodyBytes = "Hello, World !!!".getBytes();
	channel.basicPublish(exchangeName, routingKey, null, messageBodyBytes);

           

示例2:

/** 发送一条简单的消息: Hello, World !!! */
	byte[] messageBodyBytes = "Hello, World !!!".getBytes();
	channel.basicPublish(exchangeName, routingKey,
	        new AMQP.BasicProperties.Builder())
	          .contentType("text/plain")
	          .deliveryMode(2)   //消息的投递模式设置为2,即消息会被持久化(存入磁盘)在服务器中
	          .priority(1)       //该消息的优先级设置为 1
	          .userId("hidden")
	          .build()),
	        messageBodyBytes); 
          
           

示例3:

/** 发送一条简单的消息: "Hello, World !!!",带有过期时间(expiration)  */
	byte[] messageBodyBytes = "Hello, World !!!".getBytes();
	channel.basicPublish(exchangeName, routingKey,
	        new AMQP.BasicProperties.Builder())
	          .expiration("60000")
	          .build()),
	        messageBodyBytes); 
          
           

示例4:

/** 发送一条简单的消息: "Hello, World !!!",带有 headers  */
	byte[] messageBodyBytes = "Hello, World !!!".getBytes();
	Map<String, Object> headers = new HashMap<String, Object>();
	headers.put("localtion", "here");
	headers.put("time", "today");
	channel.basicPublish(exchangeName, routingKey,
	        new AMQP.BasicProperties.Builder())
	          .headers(headers)
	          .build()),
	        messageBodyBytes); 
          
           

继续阅读