Custom Listener on the Consumer Side
In the earlier examples, the consumer checked whether the bound Queue had messages ready by running a while infinite loop, which is not very elegant, as shown below:
// Receive messages
while (true) {
    QueueingConsumer.Delivery delivery = consumer.nextDelivery();
    String msg = new String(delivery.getBody());
    System.out.println(msg);
}
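First, let's look at the basicConsume() method. Until now we always passed consumer (a QueueingConsumer, not a custom consumer) as the callback argument, so the consumer received its messages through QueueingConsumer.Delivery inside a while infinite loop. To implement a custom listener on the consumer side, we need to implement the Consumer interface ourselves and pass an instance of that implementation as callback. However, the Consumer interface declares quite a few methods, and implementing all of them is tedious: even though only a few of them need real logic, the resulting code is not concise. So we take a different approach: extend DefaultConsumer, a class that already implements the Consumer interface, and override only the handleDelivery() method.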
/**
 * Start a non-nolocal, non-exclusive consumer, with
 * a server-generated consumerTag.
 * @param queue the name of the queue
 * @param autoAck true if the server should consider messages
 * acknowledged once delivered; false if the server should expect
 * explicit acknowledgements
 * @param callback an interface to the consumer object
 * @return the consumerTag generated by the server
 * @throws java.io.IOException if an error is encountered
 * @see com.rabbitmq.client.AMQP.Basic.Consume
 * @see com.rabbitmq.client.AMQP.Basic.ConsumeOk
 * @see #basicConsume(String, boolean, String, boolean, boolean, Map, Consumer)
 */
String basicConsume(String queue, boolean autoAck, Consumer callback) throws IOException;
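To make the difference between the two styles concrete, here is a minimal sketch that uses only the JDK and has no RabbitMQ dependency. DeliveryListener, QueueingListener, and PullVsPush are hypothetical names invented for this illustration. The sketch assumes that a queueing-style consumer works roughly by buffering every pushed delivery in a blocking queue, so the caller has to pull each one with a blocking call inside a loop, while a callback-style consumer handles each delivery as it is pushed.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class PullVsPush {
    // Hypothetical stand-in for a push-style callback such as handleDelivery().
    interface DeliveryListener {
        void onDelivery(byte[] body);
    }

    // Pull style: the callback only buffers; the caller must block and loop.
    static class QueueingListener implements DeliveryListener {
        private final BlockingQueue<byte[]> buffer = new LinkedBlockingQueue<>();

        @Override
        public void onDelivery(byte[] body) {
            buffer.add(body);
        }

        byte[] nextDelivery() {
            try {
                return buffer.take(); // blocks until a delivery has been buffered
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting", e);
            }
        }
    }

    // Simulates a client library pushing n deliveries to a listener.
    static void push(DeliveryListener listener, int n) {
        for (int i = 0; i < n; i++) {
            listener.onDelivery(("message" + i).getBytes(StandardCharsets.UTF_8));
        }
    }

    // Pull style: user code drains the buffer with an explicit loop.
    public static List<String> pull(int n) {
        QueueingListener listener = new QueueingListener();
        push(listener, n);
        List<String> received = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            received.add(new String(listener.nextDelivery(), StandardCharsets.UTF_8));
        }
        return received;
    }

    // Push style: the handling logic lives in the callback; user code has no loop.
    public static List<String> callback(int n) {
        List<String> received = new ArrayList<>();
        push(body -> received.add(new String(body, StandardCharsets.UTF_8)), n);
        return received;
    }

    public static void main(String[] args) {
        System.out.println(pull(3));
        System.out.println(callback(3));
    }
}
```

Both methods end up with the same messages; the difference is only where the handling code lives, which is the point of moving from the while loop to a custom listener.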
The source code of the DefaultConsumer class is as follows:
// Copyright (c) 2007-Present Pivotal Software, Inc. All rights reserved.
//
// This software, the RabbitMQ Java client library, is triple-licensed under the
// Mozilla Public License 1.1 ("MPL"), the GNU General Public License version 2
// ("GPL") and the Apache License version 2 ("ASL"). For the MPL, please see
// LICENSE-MPL-RabbitMQ. For the GPL, please see LICENSE-GPL2. For the ASL,
// please see LICENSE-APACHE2.
//
// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND,
// either express or implied. See the LICENSE file for specific language governing
// rights and limitations of this software.
//
// If you have any questions regarding licensing, please contact us at
// [email protected].
package com.rabbitmq.client;

import java.io.IOException;

/**
 * Convenience class providing a default implementation of {@link Consumer}.
 * We anticipate that most Consumer implementations will subclass this class.
 */
public class DefaultConsumer implements Consumer {
    /** Channel that this consumer is associated with. */
    private final Channel _channel;
    /** Consumer tag for this consumer. */
    private volatile String _consumerTag;

    /**
     * Constructs a new instance and records its association to the passed-in channel.
     * @param channel the channel to which this consumer is attached
     */
    public DefaultConsumer(Channel channel) {
        _channel = channel;
    }

    /**
     * Stores the most recently passed-in consumerTag - semantically, there should be only one.
     * @see Consumer#handleConsumeOk
     */
    public void handleConsumeOk(String consumerTag) {
        this._consumerTag = consumerTag;
    }

    /**
     * No-op implementation of {@link Consumer#handleCancelOk}.
     * @param consumerTag the defined consumer tag (client- or server-generated)
     */
    public void handleCancelOk(String consumerTag) {
        // no work to do
    }

    /**
     * No-op implementation of {@link Consumer#handleCancel(String)}
     * @param consumerTag the defined consumer tag (client- or server-generated)
     */
    public void handleCancel(String consumerTag) throws IOException {
        // no work to do
    }

    /**
     * No-op implementation of {@link Consumer#handleShutdownSignal}.
     */
    public void handleShutdownSignal(String consumerTag, ShutdownSignalException sig) {
        // no work to do
    }

    /**
     * No-op implementation of {@link Consumer#handleRecoverOk}.
     */
    public void handleRecoverOk(String consumerTag) {
        // no work to do
    }

    /**
     * No-op implementation of {@link Consumer#handleDelivery}.
     */
    public void handleDelivery(String consumerTag,
                               Envelope envelope,
                               AMQP.BasicProperties properties,
                               byte[] body)
        throws IOException
    {
        // no work to do
    }

    /**
     * Retrieve the channel.
     * @return the channel this consumer is attached to.
     */
    public Channel getChannel() {
        return _channel;
    }

    /**
     * Retrieve the consumer tag.
     * @return the most recently notified consumer tag.
     */
    public String getConsumerTag() {
        return _consumerTag;
    }
}
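As the source shows, DefaultConsumer supplies a no-op body for every Consumer callback except handleConsumeOk(), which only stores the tag, so a subclass overrides just what it needs. The sketch below reproduces that pattern with JDK types only. Listener, NoOpListener, and RecordingListener are hypothetical names that loosely mirror Consumer, DefaultConsumer, and a custom subclass; they are not part of the RabbitMQ client, and the interface is deliberately smaller than the real one.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class AdapterSketch {
    // Hypothetical interface with several callbacks, mirroring the shape of Consumer.
    interface Listener {
        void handleConsumeOk(String consumerTag);
        void handleCancelOk(String consumerTag);
        void handleDelivery(String consumerTag, byte[] body);
    }

    // Mirrors DefaultConsumer: remembers the tag and leaves everything else empty.
    static class NoOpListener implements Listener {
        private volatile String consumerTag;

        @Override
        public void handleConsumeOk(String consumerTag) {
            this.consumerTag = consumerTag;
        }

        @Override
        public void handleCancelOk(String consumerTag) {
            // no work to do
        }

        @Override
        public void handleDelivery(String consumerTag, byte[] body) {
            // no work to do
        }

        String getConsumerTag() {
            return consumerTag;
        }
    }

    // Mirrors a custom consumer: only the delivery callback is overridden.
    static class RecordingListener extends NoOpListener {
        final List<String> bodies = new ArrayList<>();

        @Override
        public void handleDelivery(String consumerTag, byte[] body) {
            bodies.add(new String(body, StandardCharsets.UTF_8));
        }
    }

    // Drives the listener the way a client library might, then reports what it saw.
    public static String demo() {
        RecordingListener listener = new RecordingListener();
        listener.handleConsumeOk("tag-1");
        listener.handleDelivery("tag-1", "hello".getBytes(StandardCharsets.UTF_8));
        listener.handleCancelOk("tag-1");
        return listener.getConsumerTag() + ":" + listener.bodies;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Implementing Listener directly would force RecordingListener to spell out all three methods; inheriting the empty bodies keeps the subclass down to the one callback it cares about.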
Producer
package com.kaven.rabbitmq.api.consumer;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Producer {
    // IP address of your own server
    private static String ip = "IP";
    // Default port RabbitMQ starts on, which is also the port applications use to connect to RabbitMQ
    private static int port = 5672;
    // RabbitMQ has a virtual host named "/"
    private static String virtualHost = "/";

    // default exchange
    private static String exchange = "";
    // Routing rule of the default exchange: routingKey (test) matches the queue with the same name (test)
    private static String routingKey = "test";

    public static void main(String[] args) throws IOException, TimeoutException {
        // 1 Create the ConnectionFactory
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(ip);
        connectionFactory.setPort(port);
        connectionFactory.setVirtualHost(virtualHost);

        // 2 Create the Connection
        Connection connection = connectionFactory.newConnection();

        // 3 Create the Channel
        Channel channel = connection.createChannel();

        // 4 Send messages
        for (int i = 0; i < 5; i++) {
            String msg = "RabbitMQ: consumer message" + i;
            channel.basicPublish(exchange, routingKey, null, msg.getBytes());
        }

        // 5 Close the connection
        channel.close();
        connection.close();
    }
}
Consumer
The key piece is the MyConsumer class, which extends DefaultConsumer and implements the custom listening logic for the consumer side.
package com.kaven.rabbitmq.api.consumer;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

import java.io.IOException;

public class MyConsumer extends DefaultConsumer {

    public MyConsumer(Channel channel) {
        super(channel);
    }

    // Called by the client library for each message delivered to this consumer
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope,
                               AMQP.BasicProperties properties, byte[] body) throws IOException {
        System.out.println("------------ consumer message -----------");
        System.out.println("consumerTag:" + consumerTag);
        System.out.println("envelope:" + envelope);
        System.out.println("properties:" + properties);
        System.out.println("body:" + new String(body));
    }
}
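One detail in handleDelivery() deserves care: new String(body) decodes with the JVM's default charset, and msg.getBytes() in the producer encodes with it, so the two sides can disagree if they run on JVMs whose defaults differ. Below is a small sketch that assumes UTF-8 is the agreed encoding. BodyCodec, encode, and decode are hypothetical helper names for this illustration, not part of the RabbitMQ client.

```java
import java.nio.charset.StandardCharsets;

public class BodyCodec {
    // Producer side: turn the message text into the byte[] handed to basicPublish().
    public static byte[] encode(String msg) {
        return msg.getBytes(StandardCharsets.UTF_8);
    }

    // Consumer side: turn the byte[] received in handleDelivery() back into text.
    public static String decode(byte[] body) {
        return new String(body, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // Non-ASCII characters are where a charset mismatch would show up.
        String original = "RabbitMQ: consumer message0 \u6d88\u606f";
        String roundTripped = decode(encode(original));
        System.out.println(original.equals(roundTripped));
    }
}
```

With helpers like these, the producer would pass encode(msg) to basicPublish() and MyConsumer would print decode(body), so both ends name the charset explicitly instead of relying on the platform default.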
package com.kaven.rabbitmq.api.consumer;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Consumer {
    // IP address of your own server
    private static String ip = "IP";
    // Default port RabbitMQ starts on, which is also the port applications use to connect to RabbitMQ
    private static int port = 5672;
    // RabbitMQ has a virtual host named "/"
    private static String virtualHost = "/";

    // default exchange
    private static String exchange = "";
    // Queue name
    private static String queueName = "test";

    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        // 1 Create the ConnectionFactory
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(ip);
        connectionFactory.setPort(port);
        connectionFactory.setVirtualHost(virtualHost);

        // 2 Create the Connection
        Connection connection = connectionFactory.newConnection();

        // 3 Create the Channel
        Channel channel = connection.createChannel();

        // 4 Declare the Queue
        channel.queueDeclare(queueName, true, false, false, null);

        // 5 Start consuming messages (autoAck = true), handled by the custom MyConsumer
        channel.basicConsume(queueName, true, new MyConsumer(channel));
    }
}
Test
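Start the consumer and the producer (starting the consumer first ensures that the test queue has been declared before messages are published to it). The consumer receives the messages, and the content of each message is in body, as shown below: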
------------ consumer message -----------
consumerTag:amq.ctag-CebIvWwveIxajYqFBqoZmA
envelope:Envelope(deliveryTag=1, redeliver=false, exchange=, routingKey=test)
properties:#contentHeader<basic>(content-type=null, content-encoding=null, headers=null, delivery-mode=null, priority=null, correlation-id=null, reply-to=null, expiration=null, message-id=null, timestamp=null, type=null, user-id=null, app-id=null, cluster-id=null)
body:RabbitMQ: consumer message0
------------ consumer message -----------
consumerTag:amq.ctag-CebIvWwveIxajYqFBqoZmA
envelope:Envelope(deliveryTag=2, redeliver=false, exchange=, routingKey=test)
properties:#contentHeader<basic>(content-type=null, content-encoding=null, headers=null, delivery-mode=null, priority=null, correlation-id=null, reply-to=null, expiration=null, message-id=null, timestamp=null, type=null, user-id=null, app-id=null, cluster-id=null)
body:RabbitMQ: consumer message1
------------ consumer message -----------
consumerTag:amq.ctag-CebIvWwveIxajYqFBqoZmA
envelope:Envelope(deliveryTag=3, redeliver=false, exchange=, routingKey=test)
properties:#contentHeader<basic>(content-type=null, content-encoding=null, headers=null, delivery-mode=null, priority=null, correlation-id=null, reply-to=null, expiration=null, message-id=null, timestamp=null, type=null, user-id=null, app-id=null, cluster-id=null)
body:RabbitMQ: consumer message2
------------ consumer message -----------
consumerTag:amq.ctag-CebIvWwveIxajYqFBqoZmA
envelope:Envelope(deliveryTag=4, redeliver=false, exchange=, routingKey=test)
properties:#contentHeader<basic>(content-type=null, content-encoding=null, headers=null, delivery-mode=null, priority=null, correlation-id=null, reply-to=null, expiration=null, message-id=null, timestamp=null, type=null, user-id=null, app-id=null, cluster-id=null)
body:RabbitMQ: consumer message3
------------ consumer message -----------
consumerTag:amq.ctag-CebIvWwveIxajYqFBqoZmA
envelope:Envelope(deliveryTag=5, redeliver=false, exchange=, routingKey=test)
properties:#contentHeader<basic>(content-type=null, content-encoding=null, headers=null, delivery-mode=null, priority=null, correlation-id=null, reply-to=null, expiration=null, message-id=null, timestamp=null, type=null, user-id=null, app-id=null, cluster-id=null)
body:RabbitMQ: consumer message4