天天看點

AMQP簡介與RabbitMQ代碼實戰

一、AMQP簡介

AMQP(Advanced Message Queuing Protocol),進階消息隊列協定。一個提供統一消息服務的應用層标準進階消息隊列協定,面向消息的中間件設計。

AMQP的主要特征是面向消息、隊列、路由(包括點對點和釋出/訂閱)、可靠性、安全。

相比較于JMS規範,AMQP有以下優勢:

1. JMS定義的是API規範,而AMQP定義了線路層的協定。即JMS實作所發送的消息不能保證被另外不同的JMS實作使用;

而AMQP的線路層協定規範了消息的格式,這樣不僅能跨AMQP實作,還能跨語言和跨平台。

2. AMQP具有更加靈活和透明的消息模型。

JMS中隻有點對點和釋出-訂閱兩種模式;而AMQP通過将消息生産者與消息隊列解耦實作多種方式來發送消息。

下面介紹下AMQP時如何實作解耦的。

在JMS規範當中,有3個重要元素:

  • 消息生産者
  • 消息消費者
  • 消息通道(主題或隊列)
    AMQP簡介與RabbitMQ代碼實戰
    (JMS)

消息生産者将消息發送到通道中,消費者從通道中取出資料消費。這裡通道具有雙重責任:

1)解耦消息的生産者與消費者;

2)傳遞資料以及确定消息發送地方。

而在AMQP當中,在消息的生産者與通道之間引入了一種機制:Exchange(交換器),解耦了消息的生産者與隊列。

AMQP簡介與RabbitMQ代碼實戰

(AMQP)

可以看到,消息生産者将消息(帶有一個routing key參數)發送到Exchange上,Exchange會綁定一個或多個隊列上,然後Exchange根據不同的路由模式,對比隊列攜帶的routing key參數,負責将資訊發送到不同隊列上。

二、RabbitMQ簡介

RabbitMQ是一個開源的AMQP實作,伺服器端用Erlang語言編寫,支援多種用戶端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支援AJAX。用于在分布式系統中存儲轉發消息,在易用性、擴充性、高可用性等方面表現不俗。

1. RabbitMQ系統架構

AMQP簡介與RabbitMQ代碼實戰

(圖檔來源:http://blog.csdn.net/u013256816/article/details/59117354)

如上圖,紅色線包圍的有Exchange和Queue,在服務端,稱作Broker,是由RabbitMQ實作的。剩下的則是用戶端,有Prodcer和Consumer兩種類型。

2. RabbitMQ核心概念

RabbitMQ兩大核心元件是Exchange和Queue。

Queue

Queue是一個不重複,唯一,名字随機的的緩沖區,應用程式在其權限之内可以自由地建立、共享使用和消費消息隊列。

(在RabbitMQ中,隊列的名字是系統随機建立的,且當Consumer與Queue斷開連接配接的時候,Queue會被自動删除,在下一次連接配接時又會自動建立。)

AMQP簡介與RabbitMQ代碼實戰

上圖,兩個隊列的名字分别是“amqp.gen-RQ6…” 和 “amqp.gen-AsB..”,是随機産生的。

Exchange

Exchange稱作交換器,它接收消息和路由消息,然後将消息發送給消息隊列。每個交換器都有獨一無二的名字。

Routing Key

生産者在将消息發送給Exchange的時候,一般會指定一個routing key,來指定這個消息的路由規則。

而這個routing key需要與Exchange Type及binding key聯合使用才能最終生效。

在Exchange Type與binding key固定的情況下,生産者就可以在發送消息給Exchange時,通過指定routing key來決定消息流向哪裡。

RabbitMQ為routing key設定的長度限制為255 bytes。

Binding 和 Binding Key

AMQP簡介與RabbitMQ代碼實戰

每個Exchange都和一個特定的Queue綁定(可以是多對多的關系)。綁定的同時會指定一個binding key。

每個發送給Exchange的消息一般都有一個routing key參數;當隊列與Exchange綁定的binding key與該消息的routing key參數相同的時候,該消息才會被Exchange發給特定的隊列。

(這就好比我們上火車,我們就是消息,而手中的火車票就是routing key 。進站的時候,我們需要找到火車列次(binding key)與我們手中火車票資訊比對的車次才可以進站,即routing key = binding key才可以。)

Exchange Type

AMQP定義了4種不同類型的Exchange,每一種都有不同的路由算法。當消息發送到Exchange時,Exchange 會對比消息的routing key /參數 和 與其綁定的隊列的binding key。如果對比結果滿足相應的算法,那麼消息将會路由到隊列上;否則,将不會被路由到隊列上。

4種标準的AMQP Exchange 如下所示:

(1)Direct(直接式交換器):

如果消息的routing key 與 binding key 直接比對,消息會被路由到該隊列上(可以用此建構點對點傳輸模型)。

如圖:

AMQP簡介與RabbitMQ代碼實戰

生産者P發送消息到交換器X。

如果消息的routing key 是 “orange”,則會被路由到隊列Q1;

如果消息的routing key 是 “black” 或 “green”,則會被路由到隊列Q2。

當然,也可以實作多路綁定,即一個Exchange 和多個Queue綁定時可以有同樣的 binding key。

AMQP簡介與RabbitMQ代碼實戰

上圖中,當一個消息的routing key 是 “black”,則會被同時路由到隊列Q1和隊列Q2。

(2)Topic(主題式交換器):

如果消息的routing key 與 binding key 符合通配符比對的話,消息會路由到該隊列上。

比對規則:

  • binding key與routing key都是用句點号“. ”分隔的字元串:

    如“stock.usd.nyse”、“nyse.vmw”、“quick.orange.rabbit”;

  • 支援通配符:其中“*”用于比對一個單詞,“#”用于比對多個單詞(可以是零個) 。
AMQP簡介與RabbitMQ代碼實戰

上圖中:

  • routingKey=”quick.orange.rabbit”的消息會同時路由到Q1與Q2;
  • routingKey=”lazy.orange.fox”的消息會路由到Q1和Q2;
  • routingKey=”lazy.brown.fox”的消息會路由到Q2;
  • routingKey=”lazy.pink.rabbit”的消息會路由到Q2(隻會投遞給Q2一次,雖然這個routingKey與Q2的兩個bindingKey都比對);
  • routingKey=”quick.brown.fox”、routingKey=”orange”、routingKey=”quick.orange.male.rabbit”的消息将會被丢棄,因為它們沒有比對任何bindingKey。

(3)Fanout(廣播式交換器):

不管消息的routing key是什麼,消息都會被路由到所有與該交換器綁定的隊列中。
AMQP簡介與RabbitMQ代碼實戰
上圖中,生産者(P)發送到Exchange(X)的所有消息都會路由到圖中的兩個Queue,并最終被兩個消費者(C1與C2)消費。

(4)Headers

headers類型的Exchange不依賴于routing key與binding key的比對規則來路由消息,而是根據發送的消息内容中的headers屬性進行比對。

在綁定Queue與Exchange時指定一組鍵值對;當消息發送到Exchange時,RabbitMQ會取到該消息的headers(也是一個鍵值對的形式),對比其中的鍵值對是否完全比對Queue與Exchange綁定時指定的鍵值對;如果完全比對則消息會路由到該Queue,否則不會路由到該Queue。

三、RabbitMQ實戰

(通過代碼可以發現,RabbitMQ與ActiveMQ最大差別:ActiveMQ更像個救濟站,隻負責将糧食(消息)發送出去,隻要有人來取即可;而RabbitMQ更像是郵局的Postman,負責将郵件(消息)投遞到指定人的手中)。

下面用Java實作RabbitMQ的幾個模式。

所有代碼參考RabbitMQ官網:https://www.rabbitmq.com/getstarted.html(官網給了很詳細的介紹,是學習RabbitMQ不二之選)。

1 預備工作:

首先在MAVEN官網導入RabbitMQ Java Client的依賴包;

<dependency>
  <groupId>com.rabbitmq</groupId>
  <artifactId>amqp-client</artifactId>
  <version>4.2.0</version>
</dependency>
           

2 RabbitMQ的HelloWorld:

不使用Exchange,使用RabbitMQ實作消息的發送與接收:

Producer:

package com.wgs.rabbitmq.helloworld;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * Producer,發送消息到Queue中
 * Created by GenshenWang.nomico on 2017/11/2.
 */
public class RabbitMQProducer {

    private static final String QUEUE_NAME = "RABBITMQ_HELLO";

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        //在本地機器建立socket連接配接
        connectionFactory.setHost("localhost");
        //建立socket連接配接
        Connection connection = connectionFactory.newConnection();

        //建立Channel,含有處理資訊的大部分API
        Channel channel = connection.createChannel();
        //聲明一個Queue,用來存放消息
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        //消息内容
        String message = "hello, little qute rabbitmq!";
        //釋出消息
        channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
        //釋出消息成功提示資訊
        System.out.println("RABBITMQ用戶端成功發送資訊:" +  message);

        //關閉連接配接
        channel.close();
        connection.close();


    }
}
           

Consumer:

package com.wgs.rabbitmq.helloworld;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * Consumer端,從Queue中擷取消息,需要一直是監聽狀态。
 * Created by GenshenWang.nomico on 2017/11/2.
 */
public class RabbitMQConsumer {
    private static final String QUEUE_NAME = "RABBITMQ_HELLO";

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        //在本地機器建立socket連接配接
        connectionFactory.setHost("localhost");
        //建立socket連接配接
        Connection connection = connectionFactory.newConnection();

        /* 建立Channel,含有處理資訊的大部分API */
        Channel channel = connection.createChannel();
        //聲明一個Queue,用來擷取消息。QUEUE_NAME需要與Producer端相同
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        //從隊列中異步擷取消息,DefaultConsumer會設定一個回調來緩存消息。
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body)
                    throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println("Consumer擷取消息:" + message );
            }
        };
        channel.basicConsume(QUEUE_NAME, true, consumer);

    }
}
           

3 幾種常見的使用場景:

下面來看看RabbitMQ幾種常見的使用場景的代碼實作:

(1)Direct(直接式交換器):

Producer:

package com.wgs.rabbitmq.direct;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * Producer,發送消息到Queue中
 * Type:Direct
 * Created by GenshenWang.nomico on 2017/11/2.
 */
public class RabbitMQProducer_Direct {

    private static final String EXCHANGE_NAME = "RABBITMQ_Direct";

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("localhost");
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();

        //指定Exchange的Type = "direct"
        channel.exchangeDeclare(EXCHANGE_NAME, "direct");

        String routingKey1 = "error";
        String message1 = "error infomations....";
        String routingKey2 = "warning";
        String message2 = "warning infomations....";
        String routingKey3 = "info";
        String message3 = "info infomations....";

        //指定消息的路由參數:routingKey,并發送消息
        channel.basicPublish(EXCHANGE_NAME, routingKey1, null, message1.getBytes());
        channel.basicPublish(EXCHANGE_NAME, routingKey2, null, message2.getBytes());
        channel.basicPublish(EXCHANGE_NAME, routingKey3, null, message3.getBytes());

        //釋出消息成功提示資訊
        System.out.println("RABBITMQ用戶端成功發送資訊:" +  message1);
        System.out.println("RABBITMQ用戶端成功發送資訊:" +  message2);
        System.out.println("RABBITMQ用戶端成功發送資訊:" +  message3);

        //關閉連接配接
        channel.close();
        connection.close();
    }
}
           

Consumer:

模拟了兩個Consumer,

一個 bindingKey 是”error“;、,隻能收到routing key 是 “error”的消息;

另一個是{“error”, “info”, “warning”},可以收到routing key 是 {“error”, “info”, “warning”}的消息。

Consumer1:

package com.wgs.rabbitmq.direct;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * Consumer,從Queue擷取消息
 * Type:Direct
 * Created by GenshenWang.nomico on 2017/11/2.
 */
public class RabbitMQConsumer_Direct {

    private static final String EXCHANGE_NAME = "RABBITMQ_Direct";

    private static String bindingKey = "error";

    public static void main(String[] args) throws Exception {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("localhost");
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, "direct");
        String queueName = channel.queueDeclare().getQueue();
        channel.queueBind(queueName, EXCHANGE_NAME, bindingKey);

        System.out.println(" ---【開始接收消息,退出請按CTRL+C】---");
        //RabbitMQConsumer_Topic consumer = new RabbitMQConsumer_Topic();
        QueueingConsumer consumer = new QueueingConsumer(channel);
        channel.basicConsume(queueName, true, consumer);

        while (true){
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            String routingKey =  delivery.getEnvelope().getRoutingKey();
            System.out.println(" Consumer1接收消息: '" + routingKey + "':'" + message + "'");
        }

    }
}
           

Consumer2:

package com.wgs.rabbitmq.direct;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;

/**
 * Consumer,從Queue擷取消息
 * Type:Direct
 * Created by GenshenWang.nomico on 2017/11/2.
 */
public class RabbitMQConsumer2_Direct {

    private static final String EXCHANGE_NAME = "RABBITMQ_Direct";

    private static String[] bindingKeys = new String[]{"error", "info", "warning"};

    public static void main(String[] args) throws Exception {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("localhost");
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, "direct");
        String queueName = channel.queueDeclare().getQueue();
        System.out.println(">>>>" + queueName);

        //綁定:Exchange與Queue綁定
        for(String bindingKey : bindingKeys){
            channel.queueBind(queueName, EXCHANGE_NAME, bindingKey);
        }

        System.out.println(" ---【開始接收消息,退出請按CTRL+C】---");
        //RabbitMQConsumer_Topic consumer = new RabbitMQConsumer_Topic();
        QueueingConsumer consumer = new QueueingConsumer(channel);
        channel.basicConsume(queueName, true, consumer);

        while (true){
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            String routingKey =  delivery.getEnvelope().getRoutingKey();
            System.out.println(" Consumer2接收消息: '" + routingKey + "':'" + message + "'");
        }

    }
}
           

(2)Topic(主題式交換器):

Producer:

package com.wgs.rabbitmq.topic;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * Producer,發送消息到Queue中
 * Type:Direct
 * Created by GenshenWang.nomico on 2017/11/2.
 */
public class RabbitMQProducer_Topic {

    private static final String EXCHANGE_NAME = "RABBITMQ_Topic";

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("localhost");
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();

        //指定Exchange的Type = "Topic"
        channel.exchangeDeclare(EXCHANGE_NAME, "topic");

        String routingKey1 = "AAA.orange.BBB";
        String message1 = "Q1 infomations....";
        String routingKey2 = "lazy.orange.fox";
        String message2 = "Q1,Q2 infomations....";
        String routingKey3 = "lazy.brown.fox";
        String message3 = "Q2 infomations....";

        //指定消息的路由參數:routingKey,并發送消息
        channel.basicPublish(EXCHANGE_NAME, routingKey1, null, message1.getBytes());
        channel.basicPublish(EXCHANGE_NAME, routingKey2, null, message2.getBytes());
        channel.basicPublish(EXCHANGE_NAME, routingKey3, null, message3.getBytes());

        //釋出消息成功提示資訊
        System.out.println("RABBITMQ用戶端成功發送資訊:" +  message1);
        System.out.println("RABBITMQ用戶端成功發送資訊:" +  message2);
        System.out.println("RABBITMQ用戶端成功發送資訊:" +  message3);

        //關閉連接配接
        channel.close();
        connection.close();


    }
}
           

Consumer:

Topic模式下模拟了兩個Consumer,

一個 bindingKey 是”.orange.“,隻能收到routing key 比對“*.orange.”的消息;

另一個是{“..rabbit”, “lazy.#”},可以收到routing key 是比對{“..rabbit”, “lazy.#”}的消息。

Consumer1:

package com.wgs.rabbitmq.topic;

import com.rabbitmq.client.*;

import java.io.IOException;

/**
 * Consumer,從Queue擷取消息
 * Type:Topic
 * Created by GenshenWang.nomico on 2017/11/2.
 */
public class RabbitMQConsumer_Topic {

    private static final String EXCHANGE_NAME = "RABBITMQ_Topic";

    private static String bindingKey = "*.orange.*";

    public static void main(String[] args) throws Exception {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("localhost");
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, "topic");
        //queueName是随機産生的
        String queueName = channel.queueDeclare().getQueue();
        channel.queueBind(queueName, EXCHANGE_NAME, bindingKey);

        System.out.println(" ---【開始接收消息,退出請按CTRL+C】---");

        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println(" Consumer1 Received '" + envelope.getRoutingKey() + "':'" + message + "'");
            }
        };
        channel.basicConsume(queueName, true, consumer);

    }
}
           

Consumer2:

package com.wgs.rabbitmq.topic;

import com.rabbitmq.client.*;

import java.io.IOException;

/**
 * Consumer,從Queue擷取消息
 * Type:Topic
 * Created by GenshenWang.nomico on 2017/11/2.
 */
public class RabbitMQConsumer2_Topic {

    private static final String EXCHANGE_NAME = "RABBITMQ_Topic";

    private static String[] bindingKeys = new String[]{"*.*.rabbit", "lazy.#"};

    public static void main(String[] args) throws Exception {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("localhost");
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, "topic");
        String queueName = channel.queueDeclare().getQueue();
        //列印出來的queueName:>>>>amq.gen-QjLNUuPTzIHuaBq-TBL6fQ,是随機産生的
        System.out.println(">>>>" + queueName);

        //綁定:Exchange與Queue綁定
        for(String bindingKey : bindingKeys){
            channel.queueBind(queueName, EXCHANGE_NAME, bindingKey);
        }

        System.out.println(" ---【開始接收消息,退出請按CTRL+C】---");
        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println(" Consumer1 Received '" + envelope.getRoutingKey() + "':'" + message + "'");
            }
        };
        channel.basicConsume(queueName, true, consumer);

    }
}
           

(3)Fanout(廣播式交換器):

Producer:

package com.wgs.rabbitmq.fanout;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
 * Producer
 * Type:Fanout
 * Created by GenshenWang.nomico on 2017/11/2.
 */
public class RabbitMQProducer_Fanout {

    public static final String EXCHANGE_NAME = "RABBITMQ_Fanout";

    public static void main(String[] args) throws Exception{

        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("localhost");
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
        String message = "This message is from Fanout mode.特點是Consumer均可擷取到消息";
        channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());

        System.out.println("---【Producer發送消息】" + message + "---" );

        channel.close();;
        connection.close();
    }
}
           

Consumer:

兩個Consumer均能接收到消息:

consumer1:

package com.wgs.rabbitmq.fanout;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * Consumer2
 * Type:Fanout
 * Created by GenshenWang.nomico on 2017/11/2.
 */
public class RabbitMQConsumer_Fanout {

    public static final String EXCHANGE_NAME = "RABBITMQ_Fanout";
    public static void main(String[] args) throws Exception {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("localhost");
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
        //擷取Queue随機名
        String queueName = channel.queueDeclare().getQueue();
        //Binding:綁定Queue與Exchange,此處沒有binding key。
        channel.queueBind(queueName, EXCHANGE_NAME, "");

        System.out.println("---Consumer1 準備接收消息--");
        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println(" 【Consumer1 接收消息 】'" + message + "'");
            }
        };

        channel.basicConsume(queueName, true, consumer);
    }
}
           

consumer2:

package com.wgs.rabbitmq.fanout;

import com.rabbitmq.client.*;

import java.io.IOException;

/**
 * Consumer2
 * Type:Fanout
 * Created by GenshenWang.nomico on 2017/11/2.
 */
public class RabbitMQConsumer2_Fanout {

    public static final String EXCHANGE_NAME = "RABBITMQ_Fanout";
    public static void main(String[] args) throws Exception {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("localhost");
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
        //擷取Queue随機名
        String queueName = channel.queueDeclare().getQueue();
        //Binding:綁定Queue與Exchange,此處沒有binding key。
        channel.queueBind(queueName, EXCHANGE_NAME, "");

        System.out.println("---Consumer2 準備接收消息--");
        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println(" 【Consumer2 接收消息 】'" + message + "'");
            }
        };

        channel.basicConsume(queueName, true, consumer);
    }
}
           

四、Spring整合RabbitMQ—使用RabbitTemplate

上述過程可以發現RabbitMQ發送接收消息的代碼相當繁瑣;類似于JMS,Spring AMQP提供RabbitTemplate來消除RabbitMQ發送和接收消息相關模闆代碼。

下面來看如何實作。

1 首先MAVEN導入相關依賴包:

<dependency>
     <groupId>com.rabbitmq</groupId>
     <artifactId>amqp-client</artifactId>
     <version>4.2.0</version>
</dependency>
<dependency>
     <groupId>org.springframework.amqp</groupId>
     <artifactId>spring-rabbit</artifactId>
     <version>1.4.3.RELEASE</version>
</dependency>
           

2 配置檔案:spring_rabbitmq.xml

(這裡為了排版友善,我将Producer,Consumer的配置内容均放在一個檔案當中。)

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xmlns:context="http://www.springframework.org/schema/context"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
       http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/rabbit
        http://www.springframework.org/schema/rabbit/spring-rabbit-1.4.xsd
        http://www.springframework.org/schema/context
        http://www.springframework.org/schema/context/spring-context.xsd">

    <!-- 自動裝載bean使能-->
    <context:component-scan base-package="com.wgs.rabbitmq"/>
    <context:annotation-config/>

    <!--                           公共 配置                         -->
    <!--1 配置連接配接工廠-->
    <bean id="rabbitConnectionFactory" class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
        <property name="host" value="127.0.0.1"></property>
        <property name="port" value="5672"></property>
        <property name="username" value="guest"></property>
        <property name="password" value="guest"></property>
    </bean>
    <!--  <rabbit:connection-factory id="rabbitConnectionFactory"
                                 host="localhost"
                                 port="5672"
                                 username="guest"
                                 password="guest"/>-->
    <!-- 2 Admin元素會建立一個RabbitMQ管理元件,producer中的exchange,queue會自動的利用該admin自動在spring中生成 -->
    <rabbit:admin  id="connAdmin" connection-factory="rabbitConnectionFactory"></rabbit:admin>

    <!--3 配置RabbitTemplate -->
    <rabbit:template id="amqpTemplate"  connection-factory="rabbitConnectionFactory" exchange="RABBITMQ_EXCHANGE_Direct"   />

    <!--                           Producer端 配置                         -->
    <!-- 1 聲明隊列 : autoDelete:是否自動删除 durable:持久化  -->
    <rabbit:queue name="RABBITMQ_Q1" durable="true" declared-by="connAdmin" auto-delete="false" exclusive="false" />
    <rabbit:queue name="RABBITMQ_Q2" durable="true" declared-by="connAdmin" auto-delete="false" exclusive="false" />

    <!-- 2 聲明Exchange、Binding -->
    <rabbit:direct-exchange name="RABBITMQ_EXCHANGE_Direct" declared-by="connAdmin"  durable="true" auto-delete="false">
        <rabbit:bindings>
            <rabbit:binding queue="RABBITMQ_Q1" key="error"></rabbit:binding>
            <rabbit:binding queue="RABBITMQ_Q2" key="warning"></rabbit:binding>
        </rabbit:bindings>
    </rabbit:direct-exchange>

    <bean id="rabbitMQProducerServiceImpl" class="com.wgs.rabbitmq.producer.RabbitMQProducerServiceImpl"></bean>

    <!--                          Consumer端 配置                         -->
   <bean id="rabbitMQConsumer1" class="com.wgs.rabbitmq.consumer.RabbitMQConsumer1"></bean>
   <bean id="rabbitMQConsumer2" class="com.wgs.rabbitmq.consumer.RabbitMQConsumer2"></bean>

    <rabbit:listener-container connection-factory="rabbitConnectionFactory" >
        <rabbit:listener queues="RABBITMQ_Q1"  ref="rabbitMQConsumer1"></rabbit:listener>
        <rabbit:listener queues="RABBITMQ_Q2"  ref="rabbitMQConsumer2"></rabbit:listener>
    </rabbit:listener-container>


</beans>
           

3 代碼實作

Producer:

(1)先寫一個接口:RabbitMQProducerService

package com.wgs.rabbitmq.producer;

import org.springframework.stereotype.Service;

/**
 * Producer
 * Type:Direct
 * Created by GenshenWang.nomico on 2017/11/3.
 */
public interface RabbitMQProducerService {
    /**
     * 發送消息
     * @param routingKey
     * @param msg 發送的消息内容
     */
    void sendMessage(String routingKey, Object msg);
}

           

(2)接口的實作:RabbitMQProducerServiceImpl

package com.wgs.rabbitmq.producer;

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

/**
 * Producer
 * Type:Direct
 * Created by GenshenWang.nomico on 2017/11/3.
 */
public class RabbitMQProducerServiceImpl implements  RabbitMQProducerService{

    @Autowired
    RabbitTemplate rabbitTemplate;

    public void sendMessage( String routingKey, Object msg) {
        rabbitTemplate.convertAndSend(routingKey, msg);
    }
}
           

(3)測試:RabbitMQProducerMain

package com.wgs.rabbitmq.producer;

import org.springframework.context.support.ClassPathXmlApplicationContext;

/**
 * Producer Test
 * Type:Direct
 * Created by GenshenWang.nomico on 2017/11/3.
 */
public class RabbitMQProducerMain {

    public static void main(String[] args) {
        ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("spring_rabbitmq.xml");
        RabbitMQProducerService service = context.getBean(RabbitMQProducerService.class);

        System.out.println("Producer開始發送消息。。。");

        service.sendMessage("error", "111 hello mq");
        service.sendMessage("warning", "222 hello mq");
        System.out.println("Producer消息發送成功。。。");

        context.close();

    }
}

           

Consumer:

Consumer實作監聽器類即可。這樣在監聽到有消息到達的時候即可顯示消息。

Consumer1:

package com.wgs.rabbitmq.consumer;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.stereotype.Component;

/**
 * Consumer
 * Type:Direct
 * Created by GenshenWang.nomico on 2017/11/3.
 */
public class RabbitMQConsumer1 implements MessageListener{

    public void onMessage(Message message) {
        System.out.println("RabbitMQConsumer1接收到消息>>>" + message);
        System.out.println("RabbitMQConsumer1接收到消息 " + new String(message.getBody()));
    }

}
           

Consumer2:

package com.wgs.rabbitmq.consumer;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.stereotype.Component;

/**
 * Consumer
 * Type:Direct
 * Created by GenshenWang.nomico on 2017/11/3.
 */
public class RabbitMQConsumer2 implements MessageListener{

    public void onMessage(Message message) {
        System.out.println("RabbitMQConsumer2接收到消息 " + new String(message.getBody()));
    }
}
           

4 測試:

啟動RabbitMQProducerMain,發送消息,在Consumer端即可接收到消息:

接收到 的消息Message内容:

(Body:’111 hello mq’MessageProperties [headers={}, timestamp=null, messageId=null, userId=null, appId=null, clusterId=null, type=null, correlationId=null, replyTo=null, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=RABBITMQ_EXCHANGE_Direct, receivedRoutingKey=error, deliveryTag=1, messageCount=0])

直接顯示消息的body即可,可以看到

AMQP簡介與RabbitMQ代碼實戰

在Type = Direct模式下,

routingKey = “error”的消息發送到Q1;routingKey = “warning”的消息發送到Q2。

2017/11/4 in NJ.

參考:

http://blog.csdn.net/u013256816/article/details/59117354

http://blog.csdn.net/u013256816/article/details/59117354