java-rabbitmq-官網執行個體01
描述:最簡單執行個體,使用非持久化隊列,生産者釋出消息,MQ 将消息推送給消費者消費,之後 MQ 在隊列中删除該消息
依次運作:D1_Send.main();D1_Recv.main();
package com.example.tutorials;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.util.Scanner;
/**
* 最簡單執行個體,使用非持久化隊列,生産者釋出消息,MQ 将消息推送給消費者消費,之後 MQ 在隊列中删除該消息
* 發送消息到"預設交換器",預設交換器是一個無名直連交換器
* @create 2017-08-29
* amqp-client 4.2.0
**/
public class D1_Send {
private final static String QUEUE_NAME = "hello";
/**
* 生産者, "Hello World!"
* @param argv
* @throws Exception
*/
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
//設定登入賬号
factory.setHost(ServerInfo.host);
factory.setPort(ServerInfo.port);
factory.setUsername(ServerInfo.uName);
factory.setPassword(ServerInfo.uPwd);
//連結伺服器
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
//定義一個隊列
boolean duiable=false;//持久化
boolean exclusive = false;//排他隊列
boolean autoDelete=false;//沒有consumer時,隊列是否自動删除
channel.queueDeclare(QUEUE_NAME, duiable, exclusive, autoDelete, null);
//發送消息
System.out.println("輸入要發送的消息,退出輸入 x ");
String message = "Hello World!";
do{
Scanner scanner = new Scanner(System.in);
message = scanner.next();
channel.basicPublish(""
, QUEUE_NAME
, null
, message.getBytes("UTF-8"));
System.out.println(" [x] Sent '" + message + "'");
}while(!"x".equals(message));
//關閉連結
channel.close();
connection.close();
}
}
package com.example.tutorials;
import com.rabbitmq.client.*;
import java.io.IOException;
/**
* 最簡單執行個體,使用非持久化隊列,生産者釋出消息,MQ 将消息推送給消費者消費,之後 MQ 在隊列中删除該消息
* 發送消息到"預設交換器",預設交換器是一個無名直連交換器
* @create 2017-08-29
* amqp-client 4.2.0
**/
public class D1_Recv {
private final static String QUEUE_NAME = "hello";
/**
* 消費者, "Hello World!"
* @param argv
* @throws Exception
*/
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
//設定登入賬号
factory.setHost(ServerInfo.host);
factory.setPort(ServerInfo.port);
factory.setUsername(ServerInfo.uName);
factory.setPassword(ServerInfo.uPwd);
//連結伺服器
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
//定義一個隊列
boolean duiable=false;//持久化
boolean exclusive = false;//排他隊列
boolean autoDelete=false;//沒有consumer時,隊列是否自動删除
channel.queueDeclare(QUEUE_NAME, duiable, exclusive, autoDelete, null);
//接收消息
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" [x] Received '" + message + "'");
}
};
//啟動消費者,接收消息
boolean autoAck=true;//自動應答,true=自動發送應答;false=手動發送應答;
channel.basicConsume(QUEUE_NAME, autoAck, consumer);
}
}