天天看點

java-rabbitmq-執行個體pull模式拉取消息

java-rabbitmq-執行個體pull模式拉取消息

描述:

手動拉取指定隊列的消息。

運作:

D7_PullSend.main();

D7_PullRecv.main();

package com.example.tutorials;


import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;


import java.util.HashMap;
import java.util.Map;
import java.util.Scanner;


/**
 * 使用 channel.basicGet() 方法,拉取指定隊列中的内容
 * @create 2017-09-05
 * amqp-client 4.2.0
 **/
public class D7_PullSend {
    private final static String QUEUE_NAME = "pull_queue";


    /**
     * 生産者,
     * @param argv
     * @throws Exception
     */
    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        //設定登入賬号
        factory.setHost(ServerInfo.host);
        factory.setPort(ServerInfo.port);
        factory.setUsername(ServerInfo.uName);
        factory.setPassword(ServerInfo.uPwd);
        //連結伺服器
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();


        Map<String,Object> args = new HashMap<>();
//        args.put("x-message-ttl",15*1000);//消息過期時間為 15 秒
        //args.put("x-expires",1*60*1000); //隊列過期時間為 1 分鐘
        //定義一個隊列
        boolean duiable=false;//持久化
        boolean exclusive = false;//排他隊列
        boolean autoDelete=false;//沒有consumer時,隊列是否自動删除
        channel.queueDeclare(QUEUE_NAME, duiable, exclusive, autoDelete, args);


        //發送消息
        System.out.println("輸入要發送的消息,退出輸入 x ");
        String message ;
        //
        AMQP.BasicProperties prop = new AMQP.BasicProperties
                .Builder()
                .expiration("15000") //消息過期時間為 15 秒
                .build();
        do{
            Scanner scanner = new Scanner(System.in);
            message = scanner.next();
            channel.basicPublish(""
                    , QUEUE_NAME
                    , prop
                    , message.getBytes("UTF-8"));
            System.out.println(" [x] Sent '" + message + "'");
        }while(!"x".equals(message));
        //關閉連結
        channel.close();
        connection.close();
    }
}      
package com.example.tutorials;


import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.GetResponse;


import java.util.Scanner;


/**
 * 使用 channel.basicGet() 方法,拉取指定隊列中的内容
 * @create 2017-09-05
 * amqp-client 4.2.0
 **/
public class D7_PullRecv {
    private final static String QUEUE_NAME = "pull_queue";


    /**
     * 消費者, "Hello World!"
     * @param argv
     * @throws Exception
     */
    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        //設定登入賬号
        factory.setHost(ServerInfo.host);
        factory.setPort(ServerInfo.port);
        factory.setUsername(ServerInfo.uName);
        factory.setPassword(ServerInfo.uPwd);
        //連結伺服器
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        //定義一個隊列
        boolean duiable=false;//持久化
        boolean exclusive = false;//排他隊列
        boolean autoDelete=false;//沒有consumer時,隊列是否自動删除
        channel.queueDeclare(QUEUE_NAME, duiable, exclusive, autoDelete, null);


        //接收消息
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
        boolean autoAck = true; //自動應答


        System.out.println("輸入回車擷取消息,退出輸入 x ");
        String message ="";
        GetResponse resp ;
        do{
            Scanner scanner = new Scanner(System.in);
            scanner.nextLine();
            resp = channel.basicGet(QUEUE_NAME,autoAck);
            if(resp==null){
                System.out.println(QUEUE_NAME+" 隊列無消息");
                continue;
            }
            message = new String(resp.getBody(), "UTF-8");
            System.out.println(String.format(" [x] Recv Count %s , msg = %s;"
                ,resp.getMessageCount()
                ,message));
        }while(!"x".equals(message));
    }
}