天天看點

RabbitMQ生産者消費者模型建構(二)

ConnectionFactory:擷取連接配接(位址,端口号,虛拟主機,使用者名,密碼)

Producer&Consumer:生産和消費者

Connection:一個連接配接

Channel:資料通信信道,可發送、接收消息

Queue:消息存儲隊列

生産者

/建立連接配接工廠,進行設定
 ConnectionFactory connectionFactory = new ConnectionFactory();
 connectionFactory.setHost("192.168.52.1");
 connectionFactory.setPort(5672);
 connectionFactory.setVirtualHost("/");
 connectionFactory.setUsername("guest");
 connectionFactory.setPassword("guest");

 //建立連接配接
 Connection connection = connectionFactory.newConnection();

 //建立通道
 Channel channel = connection.createChannel();

 //發送資料
 String str = "hello world";
 //第一個參數是exchange,第二個是路由key,第三個是消息頭,第四個是消息主體
 channel.basicPublish("", "test001", null, str.getBytes());

 //關閉連接配接
 channel.close();
 connection.close();
           

消費者

//1 建立一個ConnectionFactory, 并進行配置
 ConnectionFactory connectionFactory = new     ConnectionFactory();
 connectionFactory.setHost("192.168.52.1");
 connectionFactory.setPort(5672);
 connectionFactory.setVirtualHost("/");
        
 //2 通過連接配接工廠建立連接配接
 Connection connection = connectionFactory.newConnection();
        
 //3 通過connection建立一個Channel
 Channel channel = connection.createChannel();
        
 //4 聲明(建立)一個隊列
 String queueName = "test001";
 //String queue, boolean durable, boolean exclusive, boolean  autoDelete, Map<String, Object> arguments
 //第一個參數為隊列名稱,第二個參數為 (true持久化,伺服器重新開機隊列也不會消失), 當第三個參數為true時,獨占(順序隊列,這個隊列隻有這個channel可以連接配接,就像加了鎖一樣,),當第四個參數為true時,如果隊列脫離了exchange了,自動删除
 channel.queueDeclare(queueName, true, false, false, null);
        
 //5 建立消費者
 QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
        
 //6 設定Channel
 第二個true是自動簽收的意思,當消費者收到消息會發送一條已經收到的ack确認資訊
 channel.basicConsume(queueName, true, queueingConsumer);
        
 while(true){
    //7 擷取消息
    
    Delivery delivery = queueingConsumer.nextDelivery();
    String msg = new String(delivery.getBody());
    System.err.println("消費端: " + msg);
    Map<String, Object> headers = delivery.getProperties().getHeaders();
    System.err.println("headers get my1 value: " + headers.get("my1"));
            
    //Envelope envelope = delivery.getEnvelope();
 }
           

pom.xml

//pom.xml

<parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.2.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-maven-plugin</artifactId>
            <version>2.1.2.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <!--     <dependency>
               <groupId>com.rabbitmq</groupId>
               <artifactId>amqp-client</artifactId>
               <version>3.6.5</version>
             </dependency>-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-surefire-plugin</artifactId>
                <version>2.4.2</version>
                <configuration>
                    <skipTests>true</skipTests>
                </configuration>
            </plugin>
        </plugins>
    </build>