一、 交換器
RabbitMQ交換器(Exchange)分為四種
- direct
- fanout
- topic
- headers
- direct
預設的交換器類型,消息的RoutingKey與隊列的bindingKey比對,消息就投遞到相應的隊列
- fanout
一種釋出/訂閱模式的交換器,釋出一條消息時,fanout把消息廣播附加到fanout交換器的隊列上
接收類(訂閱):
import com.rabbitmq.client.*;
import java.io.IOException;
public class ReceiveLogs {
private static final String EXCHANGE_NAME = "logs";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");//一旦建立exchange,RabbitMQ不允許對其改變,否則報錯
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, EXCHANGE_NAME, "");//綁定是交換器與隊列之間的關系,可以了解為,隊列對此交換器的消息感興趣
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" [x] Received '" + message + "'");
}
};
channel.basicConsume(queueName, true, consumer);
}
}
釋出類:
import java.io.IOException;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
public class ReceiveLog {
private static final String EXCHANGE_NAME = "log";
public static void main(String[] argv)
throws java.io.IOException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
String message = "hi";
channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
channel.close();
connection.close();
}
}
- topic
topic類似于fanout交換器,但更加具體化,用routingKey進行規則比對,更靈活的比對出使用者想要接收的消息
routingKey形如:com.company.module.demo,具體比對規則:
"*"與"#"可以比對任意字元,差別是"*"隻能比對由"."分割的一段字元,而"#"可以比對所有字元
釋出一條"com.abc.test.push"的消息,能比對的routingKey:
com.abc.test.*
#.test.push
#
不能比對的:
com.abc.*
*.test.push
*
釋出類:
聲明隊列時,需要注意隊列的屬性,雖然隊列的聲明由消費者或生産者完成都可以,但如果由消費者聲明,由于生産者生産消息時,可能隊列還沒有聲明,會造成消息丢失,是以推薦由生産者聲明隊列
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
public class RabbitMqSendTest {
private static String queue = "test_queue";
private static String exchange = "TestExchange";
private static String routingKey = "abc.test";
public static void main(String[] args) {
ConnectionFactory factory = new com.rabbitmq.client.ConnectionFactory();
factory.setHost("172.16.67.60");
factory.setPort(5672);
factory.setUsername("admin");
factory.setPassword("admin");
Connection mqConnection = null;
try {
mqConnection = factory.newConnection();
Channel mqChannel = mqConnection.createChannel();
if (null != mqChannel && mqChannel.isOpen()) {
mqChannel.exchangeDeclare(exchange, "topic");
// String queueName = mqChannel.queueDeclare().getQueue();
// mqChannel.queueBind(queueName, exchange, routingKey);
//聲明隊列名稱與屬性
//durable持久隊列,mq重新開機隊列可恢複 exclusive獨占隊列,僅限于聲明它的連接配接使用操作
//autoDelete 自動删除 arguments 其他屬性
mqChannel.queueDeclare(queue, false, false, false, null);
mqChannel.queueBind(queue, exchange, routingKey);
//*******************************************
mqChannel.basicPublish(exchange, routingKey, null,
("hello").getBytes());
}
} catch (Exception e) {
e.printStackTrace();
}finally {
try {
mqConnection.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
接收類
import com.rabbitmq.client.*;
import java.io.IOException;
public class ReceiveTopic {
private static String queue = "consume_queue";
private static String exchange = "TestExchange";
private static String routingKey = "*.test";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("172.16.67.60");
factory.setPort(5672);
factory.setUsername("admin");
factory.setPassword("admin");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// channel聲明Exchange,名稱與類型
channel.exchangeDeclare(exchange, "topic");
// String queuename = channel.queueDeclare().getQueue();
channel.queueDeclare(queue, false, false, false, null);
channel.queueBind(queue, exchange, "*.test"); //消費者指定消息隊列,并選擇特定的RoutingKey
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
Consumer client = new DefaultConsumer(channel) {
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body)throws IOException {
String msgString = new String(body, "UTF-8");
System.out.println(" [x] Received '" + envelope.getRoutingKey() + "':'" + msgString + "'");
}
};
channel.basicConsume(queue, true,client);
System.out.println();
}
}
二、持久化
RabbitMQ預設情況下重新開機消息伺服器時,會丢失消息,為了盡量保證消息在伺服器當機時不丢失,就需要把消息持久化,但是也隻是盡量不丢失,由于涉及磁盤寫入,當消息量巨大時,mq性能也會被嚴重拉低。
轉載于:https://www.cnblogs.com/castielangel/p/9952069.html