天天看點

RabbitMQ學習——常見概念詳解

文章目錄

  • ​​Exchange​​
  • ​​描述​​
  • ​​結構圖​​
  • ​​交換機屬性​​
  • ​​Direct Exchange​​
  • ​​Topic Exchange​​
  • ​​Fanout Exchange​​
  • ​​Binding​​
  • ​​Queue​​
  • ​​屬性​​
  • ​​Message​​
  • ​​屬性​​
  • ​​Virtual host - 虛拟主機​​
  • ​​項目代碼​​

Exchange

描述

用于接收消息,并根據路由鍵轉發消息到所綁定的隊列

結構圖

RabbitMQ學習——常見概念詳解

藍色框表示發送消息,然後消息通過路由關系路由到Queue1或Queue2

綠色框表示接收消息,消費者與隊列建立監聽,去消費消息

黃色框表示路由鍵綁定關系

交換機屬性

  • Name:交換機名稱
  • Type:交換機類型direct、topic、fanout、headers
  • Durablity:是否需要持久化 true|false
  • Auto Delete:當最後一個綁定到Exchange上的隊列删除後,自動删除該Exchange
  • Internal:目前Exchange是否用于RabbitMQ内部使用,預設為false
  • Arguments:擴充參數,用于擴充AMQP協定自制定使用

Direct Exchange

  • 所有發送到Direct Exchange的消息被轉發到RoutingKey中指定的Queue
RabbitMQ學習——常見概念詳解

如圖所示,路由到隊列名為Key的隊列中去了

生産者代碼:

public class Producer4DirectExchange {
    public static void main(String[] args) throws IOException {
        Connection connection = ConnectionUtil.getConn();

        Channel channel = connection.createChannel();

        //聲明
        String exchangeName = "test_direct_exchange";
        String routingKey = "test.direct";

        //發送
        String msg = "Hello, I am Producer4DirectExchange";
        channel.basicPublish(exchangeName,routingKey,null,msg.getBytes());
    }
}      

消費者代碼:

public static void main(String[] args) throws IOException, InterruptedException {
        Connection connection = ConnectionUtil.getConn();

        Channel channel = connection.createChannel();
        //聲明
        String exchangeName = "test_direct_exchange";
        String exchangeType = "direct";
        String queueName = "test_direct_queue";
        String routingKey = "test.direct";

        //聲明Exchange
        channel.exchangeDeclare(exchangeName,exchangeType,true,false,false,null);

        //聲明一個隊列
        channel.queueDeclare(queueName,false,false,false,null);

        //建立一個綁定關系
        channel.queueBind(queueName,exchangeName,routingKey);

        //durable 是否持久化消息
        QueueingConsumer consumer = new QueueingConsumer(channel);
        //參數:隊列名稱,是否自動ACK,Consumer
        channel.basicConsume(queueName,true,consumer);
        while (true) {
            //阻塞擷取消息
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String msg = new String(delivery.getBody());
            System.out.println("收到消息:"+msg);
        }


    }      

公共類:

public class ConnectionUtil {
    public static final String MQ_HOST = "192.168.222.101";
    public static final String MQ_VHOST = "/";
    public static final int MQ_PORT = 5672;

    public static Connection getConn() {
        //1. 建立一個ConnectionFactory
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(MQ_HOST);//配置host
        connectionFactory.setPort(MQ_PORT);//配置port
        connectionFactory.setVirtualHost(MQ_VHOST);//配置vHost

        //2. 通過連接配接工廠建立連接配接
        try {
            return connectionFactory.newConnection();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}      

登入控制台,可以看到名為​

​test_direct_exchange​

​​的交換機通過路由鍵​

​test.direct​

​​綁定到​

​test_direct_queue​

RabbitMQ學習——常見概念詳解

Topic Exchange

  • 所有發送到Topic Exchange的消息都被轉發到所有關心RoutingKey中指定Topic的隊列上
  • Exchange将RoutingKey和某Topic進行模糊比對,此時隊列需要綁定一個Topic

可以使用通配符進行模糊比對:

​​

​#​

​​ 比對一個或多個詞(單詞的意思,不是一個字元)

​​

​*​

​​比對一個詞

​​

​log.#​

​​ 能比對到"log.info.a"

​​

​log.*​

​ 隻比對"log.error"
RabbitMQ學習——常見概念詳解

如上圖,比如,user.news和user.weather能路由到第一個隊列

生成者代碼:

public class Producer4TopicExchange {
    public static void main(String[] args) throws Exception {
        Connection connection = ConnectionUtil.getConn();

        Channel channel = connection.createChannel();

        //聲明
        String exchangeName = "test_topic_exchange";
        String routingKey1 = "user.save";
        String routingKey2 = "user.update";
        String routingKey3 = "user.find.abc";


        //發送
        String msg = "Hello, I am Producer4TopicExchange";
        channel.basicPublish(exchangeName,routingKey1,null,msg.getBytes());
        channel.basicPublish(exchangeName,routingKey2,null,msg.getBytes());
        channel.basicPublish(exchangeName,routingKey3,null,msg.getBytes());

        CloseTool.closeElegantly(channel,connection);
    }
}      

消費者代碼:

public class Consumer4TopicExchange {
    public static void main(String[] args) throws IOException, InterruptedException {
        Connection connection = ConnectionUtil.getConn();

        Channel channel = connection.createChannel();
        //聲明
        String exchangeName = "test_topic_exchange";
        String exchangeType = "topic";
        String queueName = "test_topic_queue";
        String routingKey = "user.#";

        //聲明Exchange
        channel.exchangeDeclare(exchangeName,exchangeType,true,false,false,null);

        //聲明一個隊列
        channel.queueDeclare(queueName,false,false,false,null);

        //建立一個綁定關系
        channel.queueBind(queueName,exchangeName,routingKey);

        //durable 是否持久化消息
        QueueingConsumer consumer = new QueueingConsumer(channel);
        //參數:隊列名稱,是否自動ACK,Consumer
        channel.basicConsume(queueName,true,consumer);
        while (true) {
            //阻塞擷取消息
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String msg = new String(delivery.getBody());
            System.out.println("收到消息:"+msg);
        }

    }
}      

先啟動消費者,然後發現下面兩個綁定關系:

RabbitMQ學習——常見概念詳解

隊列

RabbitMQ學習——常見概念詳解

再啟動生産者,從消費者端可以看到如下輸出:

收到消息:Hello, I am Producer4TopicExchange
收到消息:Hello, I am Producer4TopicExchange
收到消息:Hello, I am Producer4TopicExchange      

說明3條消息都收到了,接下來,我們改一下消費者的路由鍵,改為:​

​String routingKey = "user.*";​

再次啟動

收到消息:Hello, I am Producer4TopicExchange
收到消息:Hello, I am Producer4TopicExchange
收到消息:Hello, I am Producer4TopicExchange      

奇怪,怎麼還是收到3條,帶着疑問我們去看控制台

RabbitMQ學習——常見概念詳解

可以看到,之前的路由規則綁定還在。是以可以解釋為啥能收到3條。

點選unbind解綁"user.#"

然後繼續操作一把,檢視輸出

收到消息:Hello, I am Producer4TopicExchange
收到消息:Hello, I am Producer4TopicExchange
      

此時,隻收到兩條了。

Fanout Exchange

  • 不處理路由鍵,隻需要簡單的将隊列綁定到交換機上
  • 發送到交換機的消息都會被轉發到與該互動機綁定的所有隊列上
  • 轉發消息是最快的
RabbitMQ學習——常見概念詳解

消息不走路由鍵,隻要隊列與交換機有綁定關系就能收到。

生産者:

public class Producer4FanoutExchange {
    public static void main(String[] args) throws IOException {
        Connection connection = ConnectionUtil.getConn();

        Channel channel = connection.createChannel();

        //聲明
        String exchangeName = "test_fanout_exchange";
        String routingKey = "nothing";

        //發送
        String msg = "Hello, I am Producer4FanoutExchange";
        channel.basicPublish(exchangeName,routingKey,null,msg.getBytes());

        CloseTool.closeElegantly(channel,connection);
    }
}      

消費者:

public class Consumer4FanoutExchange {
    public static void main(String[] args) throws IOException, InterruptedException {
        Connection connection = ConnectionUtil.getConn();

        Channel channel = connection.createChannel();
        //聲明
        String exchangeName = "test_fanout_exchange";
        String exchangeType = "fanout";
        String queueName = "test_fanout_queue";
        String routingKey = "";//不設定路由鍵

        //聲明Exchange
        channel.exchangeDeclare(exchangeName,exchangeType,true,false,false,null);
        //聲明一個隊列
        channel.queueDeclare(queueName,false,false,false,null);
        //建立一個綁定關系
        channel.queueBind(queueName,exchangeName,routingKey);

        //durable 是否持久化消息
        QueueingConsumer consumer = new QueueingConsumer(channel);
        //參數:隊列名稱,是否自動ACK,Consumer
        channel.basicConsume(queueName,true,consumer);
        while (true) {
            //阻塞擷取消息
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String msg = new String(delivery.getBody());
            System.out.println("收到消息:"+msg);
        }



    }
}      
RabbitMQ學習——常見概念詳解

RabbitMQ學習——常見概念詳解

Binding

  • Exchange和Exchange、Queue之間的連接配接關系
  • Binding中可以包含RoutingKey或參數

Queue

  • 消息隊列,實際存儲消息

屬性

  • Durability:是否持久化
  • Auto delete: 若為yes,代表當最後一個監聽被移除後,該Queue會自動被删除

Message

  • 伺服器和應用程式之間傳遞的資料
  • 本質就是一段資料,由Properties和Payload(Body)組成

屬性

  • delivery mode
  • headers(自定義屬性放到這裡面)
  • content_type
  • content_encoding
  • priority
  • correlation_id
  • reply_to:指令消息失敗傳回的隊列
  • expiration:過期時間
  • message_id:消息ID

Producer:

public class Producer {
    public static final String MQ_HOST = "192.168.222.101";
    public static final String MQ_VHOST = "/";
    public static final int MQ_PORT = 5672;

    public static void main(String[] args) throws IOException, TimeoutException {
        //1. 建立一個ConnectionFactory
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(MQ_HOST);//配置host
        connectionFactory.setPort(MQ_PORT);//配置port
        connectionFactory.setVirtualHost(MQ_VHOST);//配置vHost

        //2. 通過連接配接工廠建立連接配接
        Connection connection = connectionFactory.newConnection();
        //3. 通過connection建立一個Channel
        Channel channel = connection.createChannel();

        Map<String,Object> headers = new HashMap<>();
        headers.put("var1","abc");
        headers.put("var2","sdd");
        AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
                .deliveryMode(2) //2:持久化投遞;1:非持久化(未消費的消息重新開機後就沒了)
                .contentEncoding("UTF-8")
                .expiration("5000")//5s
                .headers(headers)
                .build();

        //4. 通過Channel發送資料
        for (int i = 0; i < 10; i++) {
            String message = "Hello" + i;
            //exchange為"",則通過routingKey取尋找隊列
            channel.basicPublish("","testQueue",properties,message.getBytes());
        }

        //5. 關閉連接配接
        channel.close();
        connection.close();

    }
}      
public class Consumer {


    public static final String QUEUE_NAME = "testQueue";

    public static void main(String[] args) throws Exception {
        Connection connection = ConnectionUtil.getConn();

        //3. 通過connection建立一個Channel
        Channel channel = connection.createChannel();

        //4. 聲明(建立)一個隊列
        channel.queueDeclare(QUEUE_NAME,true,false,false,null);

        //5. 建立消費者
        QueueingConsumer queueingConsumer = new QueueingConsumer(channel);

        //6. 設定Channel
        channel.basicConsume(QUEUE_NAME,true,queueingConsumer);

        int num = 0;
        //7. 擷取消息
        while (true) {
            //nextDelivery 會阻塞直到有消息過來
            QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println("收到:" + message);
            Map<String,Object > headers = delivery.getProperties().getHeaders();
            System.out.println("headers get va1 :" + headers.get("var1"));
        }


    }
}      

Virtual host - 虛拟主機

  • 虛拟位址,用于進行邏輯隔離,最上層的消息路由
  • 一個Virtuall Host裡面可以有若幹個Exchange和Queue
  • 同一個Virtual Host裡面不能有相同名稱的Exchange或Queue

項目代碼