java-rabbitmq-實例:pull 模式拉取消息
描述:
手動拉取指定隊列的消息。
運作:
D7_PullSend.main();
D7_PullRecv.main();
package com.example.tutorials;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.util.HashMap;
import java.util.Map;
import java.util.Scanner;
/**
 * Producer half of the pull-mode example: publishes console input to a queue
 * whose contents the consumer fetches with channel.basicGet().
 * @create 2017-09-05
 * amqp-client 4.2.0
 **/
public class D7_PullSend {
    private final static String QUEUE_NAME = "pull_queue";

    /**
     * Producer. Reads whitespace-separated tokens from standard input and
     * publishes each one to {@code QUEUE_NAME} through the default exchange.
     * The token {@code x} is published as well and then ends the loop; the
     * loop also ends when standard input reaches end-of-file.
     *
     * @param argv unused
     * @throws Exception if connecting, declaring the queue, or publishing fails
     */
    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        // Broker address and login credentials
        factory.setHost(ServerInfo.host);
        factory.setPort(ServerInfo.port);
        factory.setUsername(ServerInfo.uName);
        factory.setPassword(ServerInfo.uPwd);

        // Connect to the server. try/finally (instead of try-with-resources) so
        // the code does not depend on Channel/Connection being AutoCloseable.
        Connection connection = factory.newConnection();
        try {
            Channel channel = connection.createChannel();
            try {
                Map<String, Object> args = new HashMap<>();
                // args.put("x-message-ttl", 15 * 1000); // message TTL of 15 seconds
                // args.put("x-expires", 1 * 60 * 1000); // queue expires after 1 minute

                // Declare the queue
                boolean durable = false;     // persistent queue
                boolean exclusive = false;   // exclusive queue
                boolean autoDelete = false;  // delete the queue when it has no consumer
                channel.queueDeclare(QUEUE_NAME, durable, exclusive, autoDelete, args);

                // Per-message expiration of 15 seconds
                AMQP.BasicProperties prop = new AMQP.BasicProperties
                        .Builder()
                        .expiration("15000")
                        .build();

                // Send messages
                System.out.println("輸入要發送的消息,退出輸入 x ");
                // A single Scanner for the whole session: a Scanner buffers ahead,
                // so creating a new one per iteration drops tokens already read
                // from System.in. It is deliberately not closed, because closing
                // it would close System.in.
                Scanner scanner = new Scanner(System.in);
                while (scanner.hasNext()) {
                    String message = scanner.next();
                    channel.basicPublish(""
                            , QUEUE_NAME
                            , prop
                            , message.getBytes("UTF-8"));
                    System.out.println(" [x] Sent '" + message + "'");
                    // "x" is sent before quitting so the consumer sees it too.
                    if ("x".equals(message)) {
                        break;
                    }
                }
            } finally {
                // Guard with isOpen() so a channel already closed by a broker
                // error does not mask the original exception.
                if (channel.isOpen()) {
                    channel.close();
                }
            }
        } finally {
            if (connection.isOpen()) {
                connection.close();
            }
        }
    }
}
package com.example.tutorials;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.GetResponse;
import java.util.Scanner;
/**
 * Consumer half of the pull-mode example: fetches messages from the queue one
 * at a time with channel.basicGet() instead of subscribing to it.
 * @create 2017-09-05
 * amqp-client 4.2.0
 **/
public class D7_PullRecv {
    private final static String QUEUE_NAME = "pull_queue";

    /**
     * Consumer. Each line entered on standard input triggers one
     * {@code basicGet} on {@code QUEUE_NAME}. The loop ends when the user
     * enters {@code x}, when a received message equals {@code x}, or when
     * standard input reaches end-of-file; the channel and connection are then
     * closed.
     *
     * @param argv unused
     * @throws Exception if connecting, declaring the queue, or fetching fails
     */
    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        // Broker address and login credentials
        factory.setHost(ServerInfo.host);
        factory.setPort(ServerInfo.port);
        factory.setUsername(ServerInfo.uName);
        factory.setPassword(ServerInfo.uPwd);

        // Connect to the server. The connection must be closed explicitly when
        // done, otherwise it stays open after the loop and the program does
        // not shut down cleanly.
        Connection connection = factory.newConnection();
        try {
            Channel channel = connection.createChannel();
            try {
                // Declare the queue
                boolean durable = false;     // persistent queue
                boolean exclusive = false;   // exclusive queue
                boolean autoDelete = false;  // delete the queue when it has no consumer
                channel.queueDeclare(QUEUE_NAME, durable, exclusive, autoDelete, null);

                // Receive messages
                System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
                boolean autoAck = true; // automatic acknowledgement
                System.out.println("輸入回車擷取消息,退出輸入 x ");

                // One Scanner for the whole session; not closed, because closing
                // it would close System.in.
                Scanner scanner = new Scanner(System.in);
                String message = "";
                while (!"x".equals(message) && scanner.hasNextLine()) {
                    String input = scanner.nextLine();
                    // Honour the prompt: typing x quits without fetching.
                    if ("x".equals(input.trim())) {
                        break;
                    }
                    GetResponse resp = channel.basicGet(QUEUE_NAME, autoAck);
                    if (resp == null) {
                        // basicGet returned no message
                        System.out.println(QUEUE_NAME + " 隊列無消息");
                        continue;
                    }
                    message = new String(resp.getBody(), "UTF-8");
                    System.out.println(String.format(" [x] Recv Count %s , msg = %s;"
                            , resp.getMessageCount()
                            , message));
                }
            } finally {
                // Guard with isOpen() so a channel already closed by a broker
                // error does not mask the original exception.
                if (channel.isOpen()) {
                    channel.close();
                }
            }
        } finally {
            if (connection.isOpen()) {
                connection.close();
            }
        }
    }
}