天天看點

RabbitMQ 交換器、持久化

 一、 交換器

  RabbitMQ交換器(Exchange)分為四種

  1.   direct       
  2.   fanout
  3.   topic
  4.   headers
  •  direct

   預設的交換器類型,消息的RoutingKey與隊列的bindingKey比對,消息就投遞到相應的隊列

  •  fanout

  一種釋出/訂閱模式的交換器,釋出一條消息時,fanout把消息廣播附加到fanout交換器的隊列上  

  接收類(訂閱): 

import com.rabbitmq.client.*;

import java.io.IOException;

public class ReceiveLogs {
  private static final String EXCHANGE_NAME = "logs";

  public static void main(String[] argv) throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

    channel.exchangeDeclare(EXCHANGE_NAME, "fanout");//一旦建立exchange,RabbitMQ不允許對其改變,否則報錯
    String queueName = channel.queueDeclare().getQueue();
    channel.queueBind(queueName, EXCHANGE_NAME, "");//綁定是交換器與隊列之間的關系,可以了解為,隊列對此交換器的消息感興趣

    System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

    Consumer consumer = new DefaultConsumer(channel) {
      @Override
      public void handleDelivery(String consumerTag, Envelope envelope,
                                 AMQP.BasicProperties properties, byte[] body) throws IOException {
        String message = new String(body, "UTF-8");
        System.out.println(" [x] Received '" + message + "'");
      }
    };
    channel.basicConsume(queueName, true, consumer);
  }
}
           

  釋出類: 

import java.io.IOException;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;

public class ReceiveLog {

    private static final String EXCHANGE_NAME = "log";

    public static void main(String[] argv)
                  throws java.io.IOException {

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

        String message = "hi";

        channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
        System.out.println(" [x] Sent '" + message + "'");

        channel.close();
        connection.close();
    }
}
           
  • topic

  topic類似于fanout交換器,但更加具體化,用routingKey進行規則比對,更靈活的比對出使用者想要接收的消息

  routingKey形如:com.company.module.demo,具體比對規則:

    "*"與"#"可以比對任意字元,差別是"*"隻能比對由"."分割的一段字元,而"#"可以比對所有字元   

   釋出一條"com.abc.test.push"的消息,能比對的routingKey:

com.abc.test.*
#.test.push
#      

  不能比對的:

com.abc.*
*.test.push
*      

 釋出類:

  聲明隊列時,需要注意隊列的屬性,雖然隊列的聲明由消費者或生産者完成都可以,但如果由消費者聲明,由于生産者生産消息時,可能隊列還沒有聲明,會造成消息丢失,是以推薦由生産者聲明隊列

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
 
import com.rabbitmq.client.ConnectionFactory;
 
import java.io.IOException;
 
public class RabbitMqSendTest {
    private static String queue = "test_queue";
    private static String exchange = "TestExchange";
    private static String routingKey = "abc.test";
    public static void main(String[] args) {
        ConnectionFactory factory = new com.rabbitmq.client.ConnectionFactory();
        factory.setHost("172.16.67.60");
        factory.setPort(5672);
        factory.setUsername("admin");
        factory.setPassword("admin");
        Connection mqConnection = null;
        try {
            mqConnection = factory.newConnection();
            Channel mqChannel = mqConnection.createChannel();
            if (null != mqChannel && mqChannel.isOpen()) {
                mqChannel.exchangeDeclare(exchange, "topic");
//                String queueName = mqChannel.queueDeclare().getQueue();
//                mqChannel.queueBind(queueName, exchange, routingKey);
//聲明隊列名稱與屬性
//durable持久隊列,mq重新開機隊列可恢複 exclusive獨占隊列,僅限于聲明它的連接配接使用操作
//autoDelete 自動删除 arguments 其他屬性
                mqChannel.queueDeclare(queue, false, false, false, null);
                mqChannel.queueBind(queue, exchange, routingKey);
 
                //*******************************************
                    mqChannel.basicPublish(exchange, routingKey, null,
                            ("hello").getBytes());
            }
        } catch (Exception e) {
            e.printStackTrace();
        }finally {
               try {
                   mqConnection.close();
               } catch (IOException e) {
                   e.printStackTrace();
               }
        }
 
    }
}
           

接收類

import com.rabbitmq.client.*;
import java.io.IOException;

public class ReceiveTopic {
    private static String queue = "consume_queue";
    private static String exchange = "TestExchange";
    private static String routingKey = "*.test";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("172.16.67.60");
        factory.setPort(5672);
        factory.setUsername("admin");
        factory.setPassword("admin");

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

//        channel聲明Exchange,名稱與類型
        channel.exchangeDeclare(exchange, "topic");
//        String queuename = channel.queueDeclare().getQueue();

        channel.queueDeclare(queue, false, false, false, null);
        channel.queueBind(queue, exchange, "*.test");      //消費者指定消息隊列,并選擇特定的RoutingKey
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

        Consumer client = new DefaultConsumer(channel) {
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body)throws IOException {
                String msgString = new String(body, "UTF-8");
                System.out.println(" [x] Received '" + envelope.getRoutingKey() + "':'" + msgString + "'");
            }
        };
        channel.basicConsume(queue, true,client);
        System.out.println();
    }
}
           

二、持久化

  RabbitMQ預設情況下重新開機消息伺服器時,會丢失消息,為了盡量保證消息在伺服器當機時不丢失,就需要把消息持久化,但是也隻是盡量不丢失,由于涉及磁盤寫入,當消息量巨大時,mq性能也會被嚴重拉低。

轉載于:https://www.cnblogs.com/castielangel/p/9952069.html