1. 配置:
(1)最簡單的java配置法:
public class Test {
public static void main(String[] args) {
ConnectionFactory connectionFactory = new CachingConnectionFactory();
((CachingConnectionFactory)connectionFactory).setUsername("guest");
((CachingConnectionFactory)connectionFactory).setPassword("guest");
((CachingConnectionFactory)connectionFactory).setHost("127.0.0.1");
AmqpAdmin admin = new RabbitAdmin(connectionFactory);
admin.declareQueue(new Queue("myqueue"));
AmqpTemplate template = new RabbitTemplate(connectionFactory);
template.convertAndSend("myqueue", "foo");
String foo = (String) template.receiveAndConvert("myqueue");
System.out.println(foo);
}
}
(2)使用springxml的配置方法:
private static void xmlConfig() {
ApplicationContext context = new GenericXmlApplicationContext("classpath:/rabbit-context.xml");
AmqpTemplate template = context.getBean(AmqpTemplate.class);
template.convertAndSend("myqueue", "foo");//routingKey, message
String foo = (String) template.receiveAndConvert("myqueue");
System.out.println(foo);
}
xml配置:
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit.xsd
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd">
<rabbit:connection-factory id="connectionFactory"/>
<rabbit:template id="amqpTemplate" connection-factory="connectionFactory"/>
<rabbit:admin connection-factory="connectionFactory"/>
<rabbit:queue name="myqueue"/>
<bean id="connectionFactory"
class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
<constructor-arg value="127.0.0.1"/>
<property name="username" value="guest"/>
<property name="password" value="guest"/>
</bean>
</beans>
(3)使用Java Configuration配置方法:
private static void configurationJava() {
ApplicationContext context = new AnnotationConfigApplicationContext(RabbitConfiguration.class);
AmqpTemplate template = context.getBean(AmqpTemplate.class);
template.convertAndSend("myqueue", "foo");
String foo = (String) template.receiveAndConvert("myqueue");
System.out.println(foo);
}
configuration配置類:
package com.cisco.estore.configuration;
import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitConfiguration {
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory("127.0.0.1");
cachingConnectionFactory.setUsername("guest");
cachingConnectionFactory.setPassword("guest");
return cachingConnectionFactory;
}
@Bean
public AmqpAdmin amqpAdmin() {
return new RabbitAdmin(connectionFactory());
}
@Bean
public RabbitTemplate rabbitTemplate() {
return new RabbitTemplate(connectionFactory());
}
@Bean
public Queue myQueue() {
return new Queue("myqueue");
}
}
2. 基礎概念:
(1)目的:
元件解耦,Queue可以均攤分給不同消費者
(2)概念
A. ConnectionFactory、Connection、Channel都是RabbitMQ對外提供的API中最基本的對象。
• Connection是RabbitMQ的socket連結,它封裝了socket協定相關部分邏輯。
• ConnectionFactory為Connection的制造工廠。
• Channel是我們與RabbitMQ打交道的最重要的一個接口,我們大部分的業務操作是在Channel這個接口中完成的,包括定義Queue、定義Exchange、綁定Queue與Exchange、釋出消息等。
B. Exchange
C. Exchange Type
有fanout、direct、topic、headers這四種類型
Fanout: 所有發到Exchange的消息路由到所有與它綁定的Queue
Direct: 把消息路由到binding key和routing key完全比對的Queue中
如果routingkey=error發送到exchange,消息會發到queue1和queue2。如果routingkey=info則隻會發送到Queue2
Topic:
• routing key為一個句點号“. ”分隔的字元串(我們将被句點号“. ”分隔開的每一段獨立的字元串稱為一個單詞),如“stock.usd.nyse”、“nyse.vmw”、“quick.orange.rabbit”
• binding key與routing key一樣也是句點号“. ”分隔的字元串
• binding key中可以存在兩種特殊字元“*”與“#”,用于做模糊比對,其中“*”用于比對一個單詞,“#”用于比對多個單詞(可以是零個)
以上圖中的配置為例,routingKey=”quick.orange.rabbit”的消息會同時路由到Q1與Q2,routingKey=”lazy.orange.fox”的消息會路由到Q1,routingKey=”lazy.brown.fox”的消息會路由到Q2,routingKey=”lazy.pink.rabbit”的消息會路由到Q2(隻會投遞給Q2一次,雖然這個routingKey與Q2的兩個bindingKey都比對);routingKey=”quick.brown.fox”、routingKey=”orange”、routingKey=”quick.orange.male.rabbit”的消息将會被丢棄,因為它們沒有比對任何bindingKey。
回執機制:
○ 用戶端發送請求(消息)時,在消息的屬性(MessageProperties,在AMQP協定中定義了14中properties,這些屬性會随着消息一起發送)中設定兩個值replyTo(一個Queue名稱,用于告訴伺服器處理完成後将通知我的消息發送到這個Queue中)和correlationId(此次請求的辨別号,伺服器處理完成後需要将此屬性返還,用戶端将根據這個id了解哪條請求被成功執行了或執行失敗)
○ 伺服器端收到消息并處理
○ 伺服器端處理完消息後,将生成一條應答消息到replyTo指定的Queue,同時帶上correlationId屬性
○ 用戶端之前已訂閱replyTo指定的Queue,從中收到伺服器的應答消息後,根據其中的correlationId屬性分析哪條請求被執行了,根據執行結果進行後續業務處理。