Java簡單操作RabbitMQ
-
- 前言
-
- 一、添加pom依賴
- 二、生産者
- 三、消費者
前言
目前正在學習rabbitmq,簡單記錄一下自己的代碼
一、添加pom依賴
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
</dependency>
二、生産者
public class Send {
// 定義一個消息隊列名稱
private final static String QUEUE_NAME = "FIRST-MQ";
public static void main(String[] args) throws IOException, TimeoutException {
// 建立新的連接配接
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("106.**.**.82");
factory.setPort(5672);
// 設定虛拟主機 一個mq的服務可以設定多個虛拟機,每個虛拟機相當于獨立的mq
factory.setVirtualHost("/");
factory.setUsername("user");
factory.setPassword("pwd");
Connection connection = null;
Channel channel = null;
try{
connection = factory.newConnection();
// 建立内部會話通道,生産者和mq服務通信都在channel中完成
channel = connection.createChannel();
// 聲明交換機,這裡暫時沒有
// 聲明建立隊列,如果沒有,則建立
// 參數:String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
/**
* queue:隊列名稱
* durable:是否持久化,存到磁盤中
* exclusive:是否排他,是否獨占連接配接,該隊列隻允許在這個connection中通路,如果關閉連接配接,則删除隊列
* autoDelete:不再使用時自動删除。和exclusive 一同設為true 可以實作臨時隊列。
* args:擴充參數,比如存活時間
*/
channel.queueDeclare(QUEUE_NAME,false,false,false, null);
String message = "HELLO WORLD";
// 發送消息
// 參數:String exchange, String routingKey, BasicProperties props, byte[] body
/**
* exchange:交換機 如果不指定(""),就預設交換機
* routingKey:路由key;交換機根據路由key将消息轉發到指定的隊列,如果使用預設交換機,routingKey為隊列名稱
* props:額外屬性
* body:消息内容
*/
channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
System.out.println("send message:"+message);
} catch (TimeoutException | IOException e) {
e.printStackTrace();
} finally {
// 先關閉通道
channel.close();
// 再關閉連接配接
connection.close();
}
}
}
三、消費者
public class Recv {
private final static String QUEUE_NAME = "FIRST-MQ";
public static void main(String[] args) {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("106.*5.**.8*");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("user");
factory.setPassword("pwd");
try{
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// 聲明隊列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println(" Waiting for messages. ");
// 定義實作消費方法
Consumer consumer = new DefaultConsumer(channel) {
/**
* 當接收到消息後,此方法将被調用
* @description:
* @param consumerTag: (非必須)消費者标簽:辨別消費者,在監聽隊列的時候設定channel.basicConsume中設定
* @param envelope: 信封,通過envelope 可以擷取很多資訊,交換機、消息id(可用于消息接收ack)、路由Key等等
* @param properties:消息的屬性,發送消息時 設定的額外屬性
* @param body :消息内容
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, StandardCharsets.UTF_8);
System.out.println("receive:" + message);
TimeUnit.SECONDS.sleep(1);
// 消息id
envelope.getDeliveryTag();
// 手動回複ack
// channel.basicAck(envelope.getDeliveryTag(),false);
}
};
// 接收消息 監聽隊列
// 參數:String queue, boolean autoAck, Consumer callback
/**
* queue:隊列
* autoAck:自動回複:當消費者接收到消息後,告訴mq消息已經接收。TRUE:自動回複,false:程式設計回複
* callback:消費方法,當消費者接收消息執行的方法。
*/
channel.basicConsume(QUEUE_NAME, true, consumer);
TimeUnit.SECONDS.sleep(15);
} catch (IOException | TimeoutException | InterruptedException e) {
e.printStackTrace();
}
}
}