天天看點

java-rabbitmq-官網執行個體01

java-rabbitmq-官網執行個體01

描述:最簡單執行個體,使用非持久化隊列,生産者釋出消息,MQ 将消息推送給消費者消費,之後 MQ 在隊列中删除該消息

依次運作:D1_Send.main();D1_Recv.main();

package com.example.tutorials;


import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;


import java.util.Scanner;


/**
 * 最簡單執行個體,使用非持久化隊列,生産者釋出消息,MQ 将消息推送給消費者消費,之後 MQ 在隊列中删除該消息
 * 發送消息到"預設交換器",預設交換器是一個無名直連交換器
 * @create 2017-08-29
 * amqp-client 4.2.0
 **/
public class D1_Send {
    private final static String QUEUE_NAME = "hello";


    /**
     * 生産者, "Hello World!"
     * @param argv
     * @throws Exception
     */
    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        //設定登入賬号
        factory.setHost(ServerInfo.host);
        factory.setPort(ServerInfo.port);
        factory.setUsername(ServerInfo.uName);
        factory.setPassword(ServerInfo.uPwd);
        //連結伺服器
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        //定義一個隊列
        boolean duiable=false;//持久化
        boolean exclusive = false;//排他隊列
        boolean autoDelete=false;//沒有consumer時,隊列是否自動删除
        channel.queueDeclare(QUEUE_NAME, duiable, exclusive, autoDelete, null);


        //發送消息
        System.out.println("輸入要發送的消息,退出輸入 x ");
        String message = "Hello World!";
        do{
            Scanner scanner = new Scanner(System.in);
            message = scanner.next();
            channel.basicPublish(""
                    , QUEUE_NAME
                    , null
                    , message.getBytes("UTF-8"));
            System.out.println(" [x] Sent '" + message + "'");
        }while(!"x".equals(message));
        //關閉連結
        channel.close();
        connection.close();
    }
}      
package com.example.tutorials;


import com.rabbitmq.client.*;


import java.io.IOException;


/**
 * 最簡單執行個體,使用非持久化隊列,生産者釋出消息,MQ 将消息推送給消費者消費,之後 MQ 在隊列中删除該消息
 * 發送消息到"預設交換器",預設交換器是一個無名直連交換器
 * @create 2017-08-29
 * amqp-client 4.2.0
 **/
public class D1_Recv {
    private final static String QUEUE_NAME = "hello";


    /**
     * 消費者, "Hello World!"
     * @param argv
     * @throws Exception
     */
    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        //設定登入賬号
        factory.setHost(ServerInfo.host);
        factory.setPort(ServerInfo.port);
        factory.setUsername(ServerInfo.uName);
        factory.setPassword(ServerInfo.uPwd);
        //連結伺服器
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        //定義一個隊列
        boolean duiable=false;//持久化
        boolean exclusive = false;//排他隊列
        boolean autoDelete=false;//沒有consumer時,隊列是否自動删除
        channel.queueDeclare(QUEUE_NAME, duiable, exclusive, autoDelete, null);


        //接收消息
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
                    throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println(" [x] Received '" + message + "'");
            }
        };
        //啟動消費者,接收消息
        boolean autoAck=true;//自動應答,true=自動發送應答;false=手動發送應答;
        channel.basicConsume(QUEUE_NAME, autoAck, consumer);
    }
}      

繼續閱讀