天天看點

RabbitMQ:交換機(default exchange)

首先看看​

​AMQP​

​​協定,對​

​RabbitMQ​

​的架構會更了解。

​​深入了解AMQP協定​​

建立一個​

​Maven​

​項目,根據自己伺服器RabbitMQ的版本導入相應的包。

RabbitMQ:交換機(default exchange)
<dependency>
      <groupId>com.rabbitmq</groupId>
      <artifactId>amqp-client</artifactId>
      <version>3.6.5</version>
    </dependency>      

預設交換機

預設交換機(default exchange)實際上是一個由消息代理預先聲明好的沒有名字(名字為空字元串)的直連交換機(direct exchange)。

它有一個特殊的屬性使得它對于簡單應用特别有用處:那就是每個建立隊列(queue)都會自動綁定到預設交換機上,綁定的路由鍵(routing key)名稱與隊列名稱相同。

舉個栗子:當你聲明了一個名為 “search-indexing-online” 的隊列,AMQP 代理會自動将其綁定到預設交換機上,綁定(binding)的路由鍵名稱也是為 “search-indexing-online”。是以,當攜帶着名為 “search-indexing-online” 的路由鍵的消息被發送到預設交換機的時候,此消息會被預設交換機路由至名為 “search-indexing-online” 的隊列中。換句話說,預設交換機看起來貌似能夠直接将消息投遞給隊列,盡管技術上并沒有做相關的操作。

生産端

package com.kaven.rabbitmq.exchange.defaultExchange;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class DefaultProducer {

    // 自己伺服器的IP
    private static String ip = "IP";
    // RabbitMQ啟動的預設端口,也是應用程式進行連接配接RabbitMQ的端口
    private static int port = 5672;
    // RabbitMQ有一個 "/" 的虛拟主機
    private static String virtualHost = "/";

    // default exchange
    private static String exchange = "";
    // default exchange 的路由規則: routingKey(test) 将比對同名的 queue(test)
    private static String routingKey = "test";

    public static void main(String[] args) throws IOException, TimeoutException {
        // 1 建立一個連接配接工廠
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(ip);
        connectionFactory.setPort(port);
        connectionFactory.setVirtualHost(virtualHost);

        // 2 通過連接配接工廠建立連接配接
        Connection connection = connectionFactory.newConnection();

        // 3 通過連接配接建立一個Channel
        Channel channel = connection.createChannel();

        // 4 通過Channel 發送資料
        for (int i = 0; i < 5; i++) {
            String msg = "Hello RabbitMQ:" + i;
            // 1 default的exchange ,2 routingKey
            channel.basicPublish(exchange,routingKey, null , msg.getBytes());
        }

        // 5 關閉相關的連接配接
        channel.close();
        connection.close();
    }
}      

消費端

package com.kaven.rabbitmq.exchange.defaultExchange;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class DefaultConsumer {

    // 自己伺服器的IP
    private static String ip = "IP";
    // RabbitMQ啟動的預設端口,也是應用程式進行連接配接RabbitMQ的端口
    private static int port = 5672;
    // RabbitMQ有一個 "/" 的虛拟主機
    private static String virtualHost = "/";

    // 定義的隊列名
    private static String queueName = "test";

    // default exchange
    private static String exchange = "";

    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        // 1 建立一個連接配接工廠
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(ip);
        connectionFactory.setPort(port);
        connectionFactory.setVirtualHost(virtualHost);

        // 2 通過連接配接工廠建立連接配接
        Connection connection = connectionFactory.newConnection();

        // 3 通過連接配接建立一個Channel
        Channel channel = connection.createChannel();

        // 4 聲明(建立)一個隊列
        // 隊列名、持久化、獨占、自動删除、參數
        channel.queueDeclare(queueName , true,false , false , null);

        // 5 建立消費者
        QueueingConsumer queueingConsumer = new QueueingConsumer(channel);

        // 6 設定Channel
        // 隊列名、自動回複ACK(收到消息後)、消費者
        channel.basicConsume(queueName , true , queueingConsumer);

        // 7 擷取消息
        while(true){
            QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
            String msg = new String(delivery.getBody());
            System.out.println("消費端:" + msg);
        }
    }
}      

測試一

先啟動消費端。

看看​​

​RabbitMQ Management​

​​。

有了一個新的​​

​Connection​

​​、​

​Channel​

​​、​

​Queue​

​​、​

​Consumer​

​。

RabbitMQ:交換機(default exchange)
RabbitMQ:交換機(default exchange)
RabbitMQ:交換機(default exchange)

隊列的​

​Name​

​顯然和我們定義的一樣。

RabbitMQ:交換機(default exchange)

我們再來啟動生産端。

消費端成功收到消息。

消費端:Hello RabbitMQ:0
消費端:Hello RabbitMQ:1
消費端:Hello RabbitMQ:2
消費端:Hello RabbitMQ:3
消費端:Hello RabbitMQ:4      

​RabbitMQ Management​

​​也會有反應,有顯示相應的​

​rate​

​。

RabbitMQ:交換機(default exchange)

測試二

先啟動生産端。

​RabbitMQ Management​

​​也會有反應,有相應的​

​Ready=5​

​等。

RabbitMQ:交換機(default exchange)

再啟動消費端。

也成功收到了消息。

消費端:Hello RabbitMQ:0
消費端:Hello RabbitMQ:1
消費端:Hello RabbitMQ:2
消費端:Hello RabbitMQ:3
消費端:Hello RabbitMQ:4      

​RabbitMQ Management​

​​也會有反應,有相應的​

​Ready=0​

​,因為消費端已經全部收到了。

RabbitMQ:交換機(default exchange)