RabbitMQ: Custom Consumer-Side Listener

Custom consumer-side listener

In the earlier examples, the consumer checked whether the bound `Queue` had messages ready by spinning in an infinite `while` loop, which is not very elegant:

// Receive messages
while (true) {
    QueueingConsumer.Delivery delivery = consumer.nextDelivery();
    String msg = new String(delivery.getBody());
    System.out.println(msg);
}
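The blocking pattern in that loop can be reproduced without a broker. The following is a minimal sketch, assuming a plain `java.util.concurrent.BlockingQueue` as a stand-in for `consumer.nextDelivery()`; the class name `PollingLoopSketch` and the `STOP` sentinel are hypothetical and not part of the RabbitMQ client. It shows that the calling thread has to own the loop and block on every message.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class PollingLoopSketch {
    // Hypothetical sentinel, used only so that this demo loop can terminate.
    public static final String STOP = "__stop__";

    // Mirrors the while (true) loop: block, take one message, handle it, repeat.
    public static List<String> drain(BlockingQueue<String> deliveries) {
        List<String> received = new ArrayList<>();
        try {
            while (true) {
                String msg = deliveries.take(); // blocks until a message is ready
                if (STOP.equals(msg)) {
                    break;
                }
                received.add(msg);
            }
        } catch (InterruptedException e) {
            // Preserve the interrupt flag and stop polling.
            Thread.currentThread().interrupt();
        }
        return received;
    }

    public static void main(String[] args) {
        BlockingQueue<String> deliveries = new LinkedBlockingQueue<>();
        deliveries.add("RabbitMQ: consumer message0");
        deliveries.add("RabbitMQ: consumer message1");
        deliveries.add(STOP);
        for (String msg : drain(deliveries)) {
            System.out.println(msg);
        }
    }
}
```

Without the sentinel the loop would never return, which is the main reason this style is awkward to embed in a larger application.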

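First, look at the `basicConsume()` method. So far we have always passed `consumer` (a `QueueingConsumer`, not a custom consumer) as its `callback` argument, so messages were consumed inside the infinite `while` loop and received through `QueueingConsumer.Delivery`. To implement a custom listener on the consumer side, we need to provide our own implementation of the `Consumer` interface and pass an instance of it as `callback`. Implementing `Consumer` directly is tedious, though: the interface declares several methods, and even though few of them need real logic, every one still has to be written out, which clutters the code. Instead, we extend `DefaultConsumer`, a class that already implements `Consumer`, and override only `handleDelivery()`. For reference, the `basicConsume()` overload used here is declared as follows: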

/**
     * Start a non-nolocal, non-exclusive consumer, with
     * a server-generated consumerTag.
     * @param queue the name of the queue
     * @param autoAck true if the server should consider messages
     * acknowledged once delivered; false if the server should expect
     * explicit acknowledgements
     * @param callback an interface to the consumer object
     * @return the consumerTag generated by the server
     * @throws java.io.IOException if an error is encountered
     * @see com.rabbitmq.client.AMQP.Basic.Consume
     * @see com.rabbitmq.client.AMQP.Basic.ConsumeOk
     * @see #basicConsume(String, boolean, String, boolean, boolean, Map, Consumer)
     */
    String basicConsume(String queue, boolean autoAck, Consumer callback) throws IOException;      

The source code of the `DefaultConsumer` class is as follows:

// Copyright (c) 2007-Present Pivotal Software, Inc.  All rights reserved.
//
// This software, the RabbitMQ Java client library, is triple-licensed under the
// Mozilla Public License 1.1 ("MPL"), the GNU General Public License version 2
// ("GPL") and the Apache License version 2 ("ASL"). For the MPL, please see
// LICENSE-MPL-RabbitMQ. For the GPL, please see LICENSE-GPL2.  For the ASL,
// please see LICENSE-APACHE2.
//
// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND,
// either express or implied. See the LICENSE file for specific language governing
// rights and limitations of this software.
//
// If you have any questions regarding licensing, please contact us at
// [email protected].


package com.rabbitmq.client;

import java.io.IOException;

/**
 * Convenience class providing a default implementation of {@link Consumer}.
 * We anticipate that most Consumer implementations will subclass this class.
 */
public class DefaultConsumer implements Consumer {
    /** Channel that this consumer is associated with. */
    private final Channel _channel;
    /** Consumer tag for this consumer. */
    private volatile String _consumerTag;

    /**
     * Constructs a new instance and records its association to the passed-in channel.
     * @param channel the channel to which this consumer is attached
     */
    public DefaultConsumer(Channel channel) {
        _channel = channel;
    }

    /**
     * Stores the most recently passed-in consumerTag - semantically, there should be only one.
     * @see Consumer#handleConsumeOk
     */
    public void handleConsumeOk(String consumerTag) {
        this._consumerTag = consumerTag;
    }

    /**
     * No-op implementation of {@link Consumer#handleCancelOk}.
     * @param consumerTag the defined consumer tag (client- or server-generated)
     */
    public void handleCancelOk(String consumerTag) {
        // no work to do
    }

    /**
     * No-op implementation of {@link Consumer#handleCancel(String)}
     * @param consumerTag the defined consumer tag (client- or server-generated)
     */
    public void handleCancel(String consumerTag) throws IOException {
        // no work to do
    }

    /**
     * No-op implementation of {@link Consumer#handleShutdownSignal}.
     */
    public void handleShutdownSignal(String consumerTag, ShutdownSignalException sig) {
        // no work to do
    }

     /**
     * No-op implementation of {@link Consumer#handleRecoverOk}.
     */
    public void handleRecoverOk(String consumerTag) {
        // no work to do
    }

    /**
     * No-op implementation of {@link Consumer#handleDelivery}.
     */
    public void handleDelivery(String consumerTag,
                               Envelope envelope,
                               AMQP.BasicProperties properties,
                               byte[] body)
        throws IOException
    {
            // no work to do
    }

    /**
    *  Retrieve the channel.
     * @return the channel this consumer is attached to.
     */
    public Channel getChannel() {
        return _channel;
    }

    /**
    *  Retrieve the consumer tag.
     * @return the most recently notified consumer tag.
     */
    public String getConsumerTag() {
        return _consumerTag;
    }
}      
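The idea behind `DefaultConsumer` (a base class with no-op methods, so that subclasses override only what they need) can be tried out without the RabbitMQ client. The sketch below is an illustration under stated assumptions: `MiniConsumer`, `MiniDefaultConsumer`, and `CollectingConsumer` are hypothetical stand-ins with simplified signatures, not the library's types.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class AdapterSketch {
    // Hypothetical stand-in for a callback interface that declares several methods.
    public interface MiniConsumer {
        void handleConsumeOk(String consumerTag);
        void handleCancelOk(String consumerTag);
        void handleShutdown(String consumerTag);
        void handleDelivery(String consumerTag, byte[] body);
    }

    // Base class with no-op implementations, playing the role DefaultConsumer plays.
    public static class MiniDefaultConsumer implements MiniConsumer {
        @Override public void handleConsumeOk(String consumerTag) { /* no work to do */ }
        @Override public void handleCancelOk(String consumerTag) { /* no work to do */ }
        @Override public void handleShutdown(String consumerTag) { /* no work to do */ }
        @Override public void handleDelivery(String consumerTag, byte[] body) { /* no work to do */ }
    }

    // Custom listener: overrides only the one method it cares about.
    public static class CollectingConsumer extends MiniDefaultConsumer {
        public final List<String> received = new ArrayList<>();

        @Override
        public void handleDelivery(String consumerTag, byte[] body) {
            received.add(consumerTag + ":" + new String(body, StandardCharsets.UTF_8));
        }
    }

    public static void main(String[] args) {
        CollectingConsumer consumer = new CollectingConsumer();
        MiniConsumer callback = consumer;       // the object that would be passed as the callback
        callback.handleConsumeOk("ctag-demo");  // inherited no-op
        callback.handleDelivery("ctag-demo", "hello".getBytes(StandardCharsets.UTF_8));
        System.out.println(consumer.received);
    }
}
```

The subclass stays short because the base class absorbs every callback it does not care about.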

Producer

package com.kaven.rabbitmq.api.consumer;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Producer {
    // IP address of your own server
    private static String ip = "IP";
    // Default port RabbitMQ listens on; applications connect to RabbitMQ through it
    private static int port = 5672;
    // RabbitMQ provides a default virtual host named "/"
    private static String virtualHost = "/";

    // default exchange
    private static String exchange = "";
    // Default exchange routing rule: routingKey (test) is matched to the queue with the same name (test)
    private static String routingKey = "test";

    public static void main(String[] args) throws IOException, TimeoutException {
        // 1 Create the ConnectionFactory
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(ip);
        connectionFactory.setPort(port);
        connectionFactory.setVirtualHost(virtualHost);

        // 2 Create the Connection
        Connection connection = connectionFactory.newConnection();

        // 3 Create the Channel
        Channel channel = connection.createChannel();

        // 4 Publish the messages
        for (int i = 0; i < 5; i++) {
            String msg = "RabbitMQ: consumer message" + i;
            channel.basicPublish(exchange, routingKey, null, msg.getBytes());
        }

        // 5 Close the channel and the connection
        channel.close();
        connection.close();
    }
}

Consumer

The key piece is the `MyConsumer` class, which extends `DefaultConsumer` and holds the custom listening logic for the consumer side.

package com.kaven.rabbitmq.api.consumer;


import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

import java.io.IOException;

public class MyConsumer extends DefaultConsumer {

    public MyConsumer(Channel channel) {
        super(channel);
    }

    @Override
    public void handleDelivery(String consumerTag, Envelope envelope,
                               AMQP.BasicProperties properties, byte[] body) throws IOException {
        System.out.println("------------ consumer message -----------");
        System.out.println("consumerTag:" + consumerTag);
        System.out.println("envelope:" + envelope);
        System.out.println("properties:" + properties);
        System.out.println("body:" + new String(body));
    }
}

package com.kaven.rabbitmq.api.consumer;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Consumer {
    // IP address of your own server
    private static String ip = "IP";
    // Default port RabbitMQ listens on; applications connect to RabbitMQ through it
    private static int port = 5672;
    // RabbitMQ provides a default virtual host named "/"
    private static String virtualHost = "/";

    // default exchange
    private static String exchange = "";
    // Queue name
    private static String queueName = "test";

    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        // 1 Create the ConnectionFactory
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(ip);
        connectionFactory.setPort(port);
        connectionFactory.setVirtualHost(virtualHost);

        // 2 Create the Connection
        Connection connection = connectionFactory.newConnection();

        // 3 Create the Channel
        Channel channel = connection.createChannel();

        // 4 Declare the Queue (durable, non-exclusive, not auto-deleted)
        channel.queueDeclare(queueName, true, false, false, null);

        // 5 Start consuming with autoAck = true; deliveries are handed to MyConsumer.handleDelivery()
        channel.basicConsume(queueName, true, new MyConsumer(channel));
    }
}

Test

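Start the consumer and the producer. If the `test` queue does not exist yet, start the consumer first: the producer does not declare the queue, and messages that cannot be routed to any queue are discarded. The consumer receives the messages, and the content of each message is in the `body` field, as shown below: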

------------ consumer message -----------
consumerTag:amq.ctag-CebIvWwveIxajYqFBqoZmA
envelope:Envelope(deliveryTag=1, redeliver=false, exchange=, routingKey=test)
properties:#contentHeader<basic>(content-type=null, content-encoding=null, headers=null, delivery-mode=null, priority=null, correlation-id=null, reply-to=null, expiration=null, message-id=null, timestamp=null, type=null, user-id=null, app-id=null, cluster-id=null)
body:RabbitMQ: consumer message0
------------ consumer message -----------
consumerTag:amq.ctag-CebIvWwveIxajYqFBqoZmA
envelope:Envelope(deliveryTag=2, redeliver=false, exchange=, routingKey=test)
properties:#contentHeader<basic>(content-type=null, content-encoding=null, headers=null, delivery-mode=null, priority=null, correlation-id=null, reply-to=null, expiration=null, message-id=null, timestamp=null, type=null, user-id=null, app-id=null, cluster-id=null)
body:RabbitMQ: consumer message1
------------ consumer message -----------
consumerTag:amq.ctag-CebIvWwveIxajYqFBqoZmA
envelope:Envelope(deliveryTag=3, redeliver=false, exchange=, routingKey=test)
properties:#contentHeader<basic>(content-type=null, content-encoding=null, headers=null, delivery-mode=null, priority=null, correlation-id=null, reply-to=null, expiration=null, message-id=null, timestamp=null, type=null, user-id=null, app-id=null, cluster-id=null)
body:RabbitMQ: consumer message2
------------ consumer message -----------
consumerTag:amq.ctag-CebIvWwveIxajYqFBqoZmA
envelope:Envelope(deliveryTag=4, redeliver=false, exchange=, routingKey=test)
properties:#contentHeader<basic>(content-type=null, content-encoding=null, headers=null, delivery-mode=null, priority=null, correlation-id=null, reply-to=null, expiration=null, message-id=null, timestamp=null, type=null, user-id=null, app-id=null, cluster-id=null)
body:RabbitMQ: consumer message3
------------ consumer message -----------
consumerTag:amq.ctag-CebIvWwveIxajYqFBqoZmA
envelope:Envelope(deliveryTag=5, redeliver=false, exchange=, routingKey=test)
properties:#contentHeader<basic>(content-type=null, content-encoding=null, headers=null, delivery-mode=null, priority=null, correlation-id=null, reply-to=null, expiration=null, message-id=null, timestamp=null, type=null, user-id=null, app-id=null, cluster-id=null)
body:RabbitMQ: consumer message4
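One detail behind the `body` lines above: the producer calls `msg.getBytes()` and the consumer calls `new String(body)`, and both use the platform default charset. The following is a minimal sketch, assuming both sides agree on UTF-8 (an assumption made here, not something the original code specifies); `BodyCharsetSketch`, `encode`, and `decode` are hypothetical helper names.

```java
import java.nio.charset.StandardCharsets;

public class BodyCharsetSketch {
    // Producer side: turn the message text into the bytes that become the body.
    public static byte[] encode(String msg) {
        return msg.getBytes(StandardCharsets.UTF_8);
    }

    // Consumer side: decode the body with the same charset the producer used.
    public static String decode(byte[] body) {
        return new String(body, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] body = encode("RabbitMQ: consumer message0");
        System.out.println("body:" + decode(body));
    }
}
```

Naming the charset explicitly keeps the round trip stable when the producer and the consumer run on machines with different default encodings.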