天天看點

RabbitMQ 配置和基本原理

    1. 配置:

    (1)最簡單的java配置法:        

public class Test {
            public static void main(String[] args) {
                ConnectionFactory connectionFactory = new CachingConnectionFactory();
                ((CachingConnectionFactory)connectionFactory).setUsername("guest");
                ((CachingConnectionFactory)connectionFactory).setPassword("guest");
                ((CachingConnectionFactory)connectionFactory).setHost("127.0.0.1");
                
                AmqpAdmin admin = new RabbitAdmin(connectionFactory);
                admin.declareQueue(new Queue("myqueue"));
                AmqpTemplate template = new RabbitTemplate(connectionFactory);
                template.convertAndSend("myqueue", "foo");
                String foo = (String) template.receiveAndConvert("myqueue");
                System.out.println(foo);
            }
        }
           

    (2)使用springxml的配置方法:        

private static void xmlConfig() {
            ApplicationContext context =  new GenericXmlApplicationContext("classpath:/rabbit-context.xml");
            AmqpTemplate template = context.getBean(AmqpTemplate.class);
            template.convertAndSend("myqueue", "foo");//routingKey, message
            String foo = (String) template.receiveAndConvert("myqueue");
            System.out.println(foo);
        }
           

         xml配置:     

<beans xmlns="http://www.springframework.org/schema/beans"
               xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
               xmlns:rabbit="http://www.springframework.org/schema/rabbit"
               xsi:schemaLocation="http://www.springframework.org/schema/rabbit
                   http://www.springframework.org/schema/rabbit/spring-rabbit.xsd
                   http://www.springframework.org/schema/beans
                   http://www.springframework.org/schema/beans/spring-beans.xsd">
        
            <rabbit:connection-factory id="connectionFactory"/>
        
            <rabbit:template id="amqpTemplate" connection-factory="connectionFactory"/>
        
            <rabbit:admin connection-factory="connectionFactory"/>
        
            <rabbit:queue name="myqueue"/>
        
            <bean id="connectionFactory"
                  class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
                <constructor-arg value="127.0.0.1"/>
                <property name="username" value="guest"/>
                <property name="password" value="guest"/>
            </bean>
        </beans>
           

    (3)使用Java Configuration配置方法:

private static void configurationJava() {
            ApplicationContext context = new AnnotationConfigApplicationContext(RabbitConfiguration.class);
            AmqpTemplate template = context.getBean(AmqpTemplate.class);
            template.convertAndSend("myqueue", "foo");
            String foo = (String) template.receiveAndConvert("myqueue");
            System.out.println(foo);
        }
           

    configuration配置類:

 package com.cisco.estore.configuration;
        
        import org.springframework.amqp.core.AmqpAdmin;
        import org.springframework.amqp.core.Queue;
        import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
        import org.springframework.amqp.rabbit.connection.ConnectionFactory;
        import org.springframework.amqp.rabbit.core.RabbitAdmin;
        import org.springframework.amqp.rabbit.core.RabbitTemplate;
        import org.springframework.context.annotation.Bean;
        import org.springframework.context.annotation.Configuration;
        
        @Configuration
        public class RabbitConfiguration {
        
            @Bean
            public ConnectionFactory connectionFactory() {
                CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory("127.0.0.1");
                cachingConnectionFactory.setUsername("guest");
                cachingConnectionFactory.setPassword("guest");
                return cachingConnectionFactory;
            }
        
            @Bean
            public AmqpAdmin amqpAdmin() {
                return new RabbitAdmin(connectionFactory());
            }
        
            @Bean
            public RabbitTemplate rabbitTemplate() {
                return new RabbitTemplate(connectionFactory());
            }
        
            @Bean
            public Queue myQueue() {
               return new Queue("myqueue");
            }
        }
           

    2. 基礎概念:

    (1)目的:

    元件解耦,Queue可以均攤分給不同消費者

    (2)概念

        A. ConnectionFactory、Connection、Channel都是RabbitMQ對外提供的API中最基本的對象。

        • Connection是RabbitMQ的socket連結,它封裝了socket協定相關部分邏輯。

        • ConnectionFactory為Connection的制造工廠。

        • Channel是我們與RabbitMQ打交道的最重要的一個接口,我們大部分的業務操作是在Channel這個接口中完成的,包括定義Queue、定義Exchange、綁定Queue與Exchange、釋出消息等。

        B. Exchange

RabbitMQ 配置和基本原理

        C. Exchange Type

        有fanout、direct、topic、headers這四種類型

        Fanout: 所有發到Exchange的消息路由到所有與它綁定的Queue

        Direct:  把消息路由到binding key和routing key完全比對的Queue中

RabbitMQ 配置和基本原理

            如果routingkey=error發送到exchange,消息會發到queue1和queue2。如果routingkey=info則隻會發送到Queue2

        Topic:

            • routing key為一個句點号“. ”分隔的字元串(我們将被句點号“. ”分隔開的每一段獨立的字元串稱為一個單詞),如“stock.usd.nyse”、“nyse.vmw”、“quick.orange.rabbit” 

            • binding key與routing key一樣也是句點号“. ”分隔的字元串

            • binding key中可以存在兩種特殊字元“*”與“#”,用于做模糊比對,其中“*”用于比對一個單詞,“#”用于比對多個單詞(可以是零個)

RabbitMQ 配置和基本原理

       以上圖中的配置為例,routingKey=”quick.orange.rabbit”的消息會同時路由到Q1與Q2,routingKey=”lazy.orange.fox”的消息會路由到Q1,routingKey=”lazy.brown.fox”的消息會路由到Q2,routingKey=”lazy.pink.rabbit”的消息會路由到Q2(隻會投遞給Q2一次,雖然這個routingKey與Q2的兩個bindingKey都比對);routingKey=”quick.brown.fox”、routingKey=”orange”、routingKey=”quick.orange.male.rabbit”的消息将會被丢棄,因為它們沒有比對任何bindingKey。

        回執機制:       

        ○ 用戶端發送請求(消息)時,在消息的屬性(MessageProperties,在AMQP協定中定義了14中properties,這些屬性會随着消息一起發送)中設定兩個值replyTo(一個Queue名稱,用于告訴伺服器處理完成後将通知我的消息發送到這個Queue中)和correlationId(此次請求的辨別号,伺服器處理完成後需要将此屬性返還,用戶端将根據這個id了解哪條請求被成功執行了或執行失敗)

        ○ 伺服器端收到消息并處理

        ○ 伺服器端處理完消息後,将生成一條應答消息到replyTo指定的Queue,同時帶上correlationId屬性

        ○ 用戶端之前已訂閱replyTo指定的Queue,從中收到伺服器的應答消息後,根據其中的correlationId屬性分析哪條請求被執行了,根據執行結果進行後續業務處理。