spring對RabbitMQ做了很好的內建,我們稱之為spring AMQP,其官方文檔寫得十分詳盡,文檔位址:https://docs.spring.io/spring-amqp/reference/htmlsingle/。
由于英文水準一般,看得是十分吃力,最後拉到了Sample Applications這一章,這一章主要是通過例子來講解spring-AMQP的使用方法。通過例子我們可以更加快速的了解spring對RabbitMQ的封裝。
1.Hello World同步接收消息例子
helloworld的例子展示了同步和異步的消息接收方式,可以從https://github.com/spring-projects/spring-amqp-samples這邊去下載下傳例子。
首先,建立一個配置檔案類,該類以@Configuration标注,說明它是一個配置檔案類,相當于替代.xml檔案以annotation的形式管理bean.如下所示。
package org.springframework.amqp.helloworld;
import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class HelloWorldConfiguration {
protected final String helloWorldQueueName = "hello.world.queue";
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory("192.168.1.195");
connectionFactory.setPort(5672);
connectionFactory.setUsername("xdx");
connectionFactory.setPassword("xxxxx");
return connectionFactory;
}
@Bean
public AmqpAdmin amqpAdmin() {
return new RabbitAdmin(connectionFactory());
}
@Bean
public RabbitTemplate rabbitTemplate() {
RabbitTemplate template = new RabbitTemplate(connectionFactory());
//The routing key is set to the name of the queue by the broker for the default exchange.
template.setRoutingKey(this.helloWorldQueueName);
//Where we will synchronously receive messages from
template.setQueue(this.helloWorldQueueName);
return template;
}
@Bean
// Every queue is bound to the default direct exchange
public Queue helloWorldQueue() {
return new Queue(this.helloWorldQueueName);
}
/*
@Bean
public Binding binding() {
return declare(new Binding(helloWorldQueue(), defaultDirectExchange()));
}*/
/*
@Bean
public TopicExchange helloExchange() {
return declare(new TopicExchange("hello.world.exchange"));
}*/
/*
public Queue declareUniqueQueue(String namePrefix) {
Queue queue = new Queue(namePrefix + "-" + UUID.randomUUID());
rabbitAdminTemplate().declareQueue(queue);
return queue;
}
// if the default exchange isn't configured to your liking....
@Bean Binding declareP2PBinding(Queue queue, DirectExchange exchange) {
return declare(new Binding(queue, exchange, queue.getName()));
}
@Bean Binding declarePubSubBinding(String queuePrefix, FanoutExchange exchange) {
return declare(new Binding(declareUniqueQueue(queuePrefix), exchange));
}
@Bean Binding declarePubSubBinding(UniqueQueue uniqueQueue, TopicExchange exchange) {
return declare(new Binding(uniqueQueue, exchange));
}
@Bean Binding declarePubSubBinding(String queuePrefix, TopicExchange exchange, String routingKey) {
return declare(new Binding(declareUniqueQueue(queuePrefix), exchange, routingKey));
}*/
}
該配置類會管理幾個主要的bean,分别是ConnectionFactory,AmqpAdmin,RabbitTemplate,Queue,Binding,TopicExchange。這些類都是spring對rabbitMQ中的類的抽象和封裝,以便用spring的方式來進行管理和操作這些元件。
RabbitTemplate:跟spring對許多其他工具的封裝類似,Spring AMQP也提供了一個模闆了,就是AmqpTemplate。這個模闆在整個Spring AMQP中扮演了發送和接收消息等最主要的角色。并且,它的抽象層級比較高,可以把它了解成接口層的,即它不僅僅隻是為RabbitMQ這種中間件提供的一個模闆,還可以為其他類似的中間件所用。但是,目前而言,它還隻有一個實作,那就是RabbitTemplate.
AmqpAdmin:用于管理Exchange,Queue,Binding這些對象。它的實作類是RabbitAdmin。
在上述配置檔案中,我們執行個體化一個RabbitTemplate,并且使用template.setRoutingKey(this.helloWorldQueueName);規定了生産者的消息會到達的地方,此時Exchange為預設的,routekey為hello.world.queue。然後使用template.setQueue(this.helloWorldQueueName);規定了消費者從哪裡(哪些queue)去取消息。
其實這個設定跟我們之前介紹的原生的RabbitMQ十分類似,隻不過spring安排RabbitTemplate這個模闆類對象替我們做了這些事情。
接下來,我們編寫生産者的程式。
package org.springframework.amqp.helloworld;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
public class Producer {
public static void main(String[] args) {
ApplicationContext context = new AnnotationConfigApplicationContext(HelloWorldConfiguration.class);
AmqpTemplate amqpTemplate = context.getBean(AmqpTemplate.class);
System.out.println("send"+amqpTemplate);
amqpTemplate.convertAndSend("Hello World");
System.out.println("Sent: Hello World");
}
}
首先使用AnnotationConfigApplicationContext拿到HelloWorldConfiguration.class配置檔案所代表的spring容器,然後通過context.getBean(AmqpTemplate.class)來擷取RabbitTemplate對象,注意此處傳入的參數是Bean的Type,根據面向接口程式設計的思想,取得的是AmqpTemplate對象而不是RabbitTemplate。然後調用amqpTemplate的convertAndSend方法,顧名思義,該方法先将傳遞進來的參數進行轉換,将其他類型轉換為Spring-AMQP中的Message類型,然後發送到template所預設的Exchange和Routekey(即上述配置的)。
最後,我們編寫消費者實作類。
package org.springframework.amqp.helloworld;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
public class Consumer {
public static void main(String[] args) {
ApplicationContext context = new AnnotationConfigApplicationContext(HelloWorldConfiguration.class);
AmqpTemplate amqpTemplate = context.getBean(AmqpTemplate.class);
System.out.println("consumer"+amqpTemplate);
System.out.println("Received: " + amqpTemplate.receiveAndConvert());
}
}
消費者與生産者共用一個AmqpTemplate,是以我們在AmqpTemplate的配置檔案中指定了routeKey和queue。然後我們調用了與生産者對應的鏡像方法receiveAndConvert,先接收到消息,再将消息轉換成普通的類型。
以上便是一個簡單的spring-amqp的同步的helloworld例子。分别運作生産者和消費者,可得到與原生RabbitMQ一樣的結果。
2.Hello World異步接收消息例子
上述例子為同步接收消息的例子,同步接收消息是一種輪詢的機制,消息接收器必須一直輪詢看消息池子即隊列中(queue)看有無消息,而異步接收消息則更為智能,消息接收器用一個監聽器,監聽器監聽到消息,再通知給消費者,讓他去處理。關于同步與異步的對比,可以檢視這篇文章。(轉)同步與異步,阻塞與非阻塞
在開始介紹這個例子之前,需要先了解以下幾個元件。
Message Listener:在異步消息接收機制中,一個很重要的元件被引入了,它就是message Listener,它是一個消息消費回調的容器,什麼是回調。請看這篇文章。(轉)了解回調函數,簡答來說,回調就是給被調用者一個讓它再調用的入口,這樣當事件觸發的時候,被調用者調用這個入口就可以了。這在異步通信中很常使用。Message Listener是所有消息監聽器的接口類,他隻有一個方法,void onMessage(Message message);
MessageListenerAdapter:如果你想讓消息傳遞API和程式業務邏輯之間更好的分離,可以使用MessageListenerAdapter這個類,它被廣泛的應用在Message-driven POJO(消息驅動pojo)。要執行個體化一個MessageListenerAdapter,你隻需要傳入一個被該MessageListenerAdapter執行個體調用的委托對象(delegate)。然後該監聽器就可以調用該委托對象的方法了(這就是一個回調函數)。預設被調用的方法名是handleMessage。當然你可以指定特定的方法。如下所示。
MessageListenerAdapter listener = new MessageListenerAdapter(somePojo);
listener.setDefaultListenerMethod("myMethod");
這樣,當有消息到來的時候,監聽器監聽到消息到來,就可以回調委托對象中的函數了。
在這裡,我們将主應用程式了解為調用者,而監聽器了解為被調用者,somePojo了解為回調對象,myMethod了解為回調函數。主應用程式先生成了一個監聽器對象,用于監聽消息,在調用監聽器的時候,在它内部放了一個回調函數,這樣當監聽器監聽到有消息的時候,就會去調用這個回調函數替主應用程式做它想做的事情。
Container:Container可以視作為Queue和Listener之間的橋梁,主要用它來管理監聽器的生命周期,并且定義監聽器去監聽哪些Queue中的消息。它的一個實作類是
SimpleMessageListenerContainer
。
鋪墊許久,現在來看異步監聽消息的例子吧。
1.首先,寫委托類的代碼,也就是回調函數的代碼。這是一個pojo,唯一需要注意的是,它的方法名稱為handleMessage。
package org.springframework.amqp.helloworld.async;
public class HelloWorldHandler {
//注意,預設監聽器調用的回調函數名字就是handleMessage
public void handleMessage(String text) {
System.out.println("Received: " + text);
}
}
2.生産者配置程式
package org.springframework.amqp.helloworld.async;
import java.util.concurrent.atomic.AtomicInteger;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.scheduling.annotation.ScheduledAnnotationBeanPostProcessor;
@Configuration
public class ProducerConfiguration {
protected final String helloWorldQueueName = "hello.world.queue";
@Bean
public RabbitTemplate rabbitTemplate() {
RabbitTemplate template = new RabbitTemplate(connectionFactory());
template.setRoutingKey(this.helloWorldQueueName);
System.out.println("sender:"+template);
return template;
}
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory("192.168.1.195");
connectionFactory.setPort(5672);
connectionFactory.setUsername("xdx");
connectionFactory.setPassword("xxxxx");
return connectionFactory;
}
@Bean
public ScheduledProducer scheduledProducer() {
return new ScheduledProducer();
}
@Bean
public BeanPostProcessor postProcessor() {
return new ScheduledAnnotationBeanPostProcessor();
}
static class ScheduledProducer {
@Autowired
private volatile RabbitTemplate rabbitTemplate;
private final AtomicInteger counter = new AtomicInteger();
@Scheduled(fixedRate = 3000)
public void sendMessage() {
rabbitTemplate.convertAndSend("Hello World " + counter.incrementAndGet());
}
}
}
值得注意的是,這邊使用spring的任務管理器實作一個功能,就是每隔3秒鐘發一條消息,具體如何實作我們無需關心,因為這不是重點。連續發消息隻是為了更加明顯的驗證這邊的消費者是一個消息驅動的消費者(message-driven consumer)。
3.消費者配置程式
package org.springframework.amqp.helloworld.async;
import org.springframework.amqp.helloworld.HelloWorldConfiguration;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class ConsumerConfiguration extends HelloWorldConfiguration {
@Bean
public SimpleMessageListenerContainer listenerContainer() {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory());
container.setQueueNames(this.helloWorldQueueName);
container.setMessageListener(new MessageListenerAdapter(new HelloWorldHandler()));
return container;
}
}
消費者配置程式首先繼承自HelloWorldConfiguration,是以他擁有RabbitTemplate、connectionFactory等bean,接着,它定義一個SimpleMessageListenerContainer對象,并且注入connectionFactory和Queue,然後注入一個監聽器,這個監聽器的委托對象就是我們剛才建立的HelloWorldHandler對象,回調函數即為HelloWorldHandler。
接下來看看生産者程式和消費者程式。
package org.springframework.amqp.helloworld.async;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
public class Producer {
public static void main(String[] args) throws Exception {
new AnnotationConfigApplicationContext(ProducerConfiguration.class);
}
}
package org.springframework.amqp.helloworld.async;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
public class Consumer {
public static void main(String[] args) {
new AnnotationConfigApplicationContext(ConsumerConfiguration.class);
}
}
可以看到除了載入配置檔案生成spring容器之外,并未做其他事情,這是因為生産者的@Scheduled标注的方法會自動執行,而消費者中的
SimpleMessageListenerContainer
也是一個跟spring容易有相同生命周期的元件,它預設也會自動執行。
運作這兩個main函數,可以看到消費者的控制台每隔3s列印一條資訊。

轉載于:https://www.cnblogs.com/roy-blog/p/8085391.html