天天看点

RabbitMQ生产者消费者模型构建(二)

ConnectionFactory:获取连接(地址,端口号,虚拟主机,用户名,密码)

Producer&Consumer:生产和消费者

Connection:一个连接

Channel:数据通信信道,可发送、接收消息

Queue:消息存储队列

生产者

/创建连接工厂,进行设置
 ConnectionFactory connectionFactory = new ConnectionFactory();
 connectionFactory.setHost("192.168.52.1");
 connectionFactory.setPort(5672);
 connectionFactory.setVirtualHost("/");
 connectionFactory.setUsername("guest");
 connectionFactory.setPassword("guest");

 //创建连接
 Connection connection = connectionFactory.newConnection();

 //创建通道
 Channel channel = connection.createChannel();

 //发送数据
 String str = "hello world";
 //第一个参数是exchange,第二个是路由key,第三个是消息头,第四个是消息主体
 channel.basicPublish("", "test001", null, str.getBytes());

 //关闭连接
 channel.close();
 connection.close();
           

消费者

//1 创建一个ConnectionFactory, 并进行配置
 ConnectionFactory connectionFactory = new     ConnectionFactory();
 connectionFactory.setHost("192.168.52.1");
 connectionFactory.setPort(5672);
 connectionFactory.setVirtualHost("/");
        
 //2 通过连接工厂创建连接
 Connection connection = connectionFactory.newConnection();
        
 //3 通过connection创建一个Channel
 Channel channel = connection.createChannel();
        
 //4 声明(创建)一个队列
 String queueName = "test001";
 //String queue, boolean durable, boolean exclusive, boolean  autoDelete, Map<String, Object> arguments
 //第一个参数为队列名称,第二个参数为 (true持久化,服务器重启队列也不会消失), 当第三个参数为true时,独占(顺序队列,这个队列只有这个channel可以连接,就像加了锁一样,),当第四个参数为true时,如果队列脱离了exchange了,自动删除
 channel.queueDeclare(queueName, true, false, false, null);
        
 //5 创建消费者
 QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
        
 //6 设置Channel
 第二个true是自动签收的意思,当消费者收到消息会发送一条已经收到的ack确认信息
 channel.basicConsume(queueName, true, queueingConsumer);
        
 while(true){
    //7 获取消息
    
    Delivery delivery = queueingConsumer.nextDelivery();
    String msg = new String(delivery.getBody());
    System.err.println("消费端: " + msg);
    Map<String, Object> headers = delivery.getProperties().getHeaders();
    System.err.println("headers get my1 value: " + headers.get("my1"));
            
    //Envelope envelope = delivery.getEnvelope();
 }
           

pom.xml

//pom.xml

<parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.2.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-maven-plugin</artifactId>
            <version>2.1.2.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <!--     <dependency>
               <groupId>com.rabbitmq</groupId>
               <artifactId>amqp-client</artifactId>
               <version>3.6.5</version>
             </dependency>-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-surefire-plugin</artifactId>
                <version>2.4.2</version>
                <configuration>
                    <skipTests>true</skipTests>
                </configuration>
            </plugin>
        </plugins>
    </build>