在上一篇中,我們采用的是把消息發送到隊列,然後由隊列把資訊發送給消費者。其實實際情況并非如此:RabbitMQ 消息模型的核心思想是,生産者不直接發送任何消息到隊列,甚至不知道消息将被發送到哪個隊列。相反,生産者把資訊發送到交換機,交換機接收到生産者的資訊後,再按照規則把它推送到隊列中。
一、釋出/訂閱
在上一篇中,隊列都指定了名稱,但是我們現在不需要這麼做,我們需要所有的日志資訊,而不只是其中的一部分。要做到這一點,我們需要2件事:第一件是每次連接時擷取一個新的空隊列,為此需要建立一個随機名稱的隊列,最好讓伺服器幫我們選擇名稱;第二件是一旦消費者斷開連接,該隊列應該自動删除。
資訊發送端代碼
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
 * Publisher for the publish/subscribe example: sends five demo messages to the
 * {@code logs} exchange, which is declared with type {@code fanout}.
 */
public class EmitLog {
    private static final String EXCHANGE_NAME = "logs";

    /**
     * Connects to the broker on localhost, declares the exchange and publishes
     * {@code "hello world 0"} .. {@code "hello world 4"} with an empty routing key.
     *
     * @param args unused
     * @throws IOException      if a broker operation fails
     * @throws TimeoutException if opening or closing a broker resource times out
     */
    public static void main(String []args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        // Explicit try/finally so the connection and channel are released even
        // when declaring the exchange or publishing fails.
        try {
            Channel channel = connection.createChannel();
            try {
                // "fanout": every consumer queue bound to the exchange gets the same messages.
                channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
                for (int i = 0; i < 5; i++) {
                    String message = "hello world " + i;
                    // Encode explicitly as UTF-8: the consumers decode the body as UTF-8,
                    // so the platform default charset must not be used here.
                    channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));
                    System.out.println("EmitLog Send :" + message);
                }
            } finally {
                // A failed operation may already have closed the channel; closing it
                // again would raise a second exception that hides the original one.
                if (channel.isOpen()) {
                    channel.close();
                }
            }
        } finally {
            if (connection.isOpen()) {
                connection.close();
            }
        }
    }
}
消費者代碼
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class ReceiveLogs1 {
private static final String EXCHANGE_NAME = "logs";
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME,"fanout");
//産生一個随機的隊列名稱
String queueName = channel.queueDeclare().getQueue();
//對隊列進行綁定
channel.queueBind(queueName,EXCHANGE_NAME,"");
System.out.println("ReceiveLogs1 等待接受消息");
Consumer consumer = new DefaultConsumer(channel){
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body,"UTF-8");
System.out.println("ReceiveLogs1 接受到消息:" + message);
}
};
//隊列會自動删除
channel.basicConsume(queueName,true,consumer);
}
}
上面完成了一個釋出/訂閱模式的消息隊列
二、Routing
上面我們采用了廣播的模式進行消息的發送,現在我們采用路由的方式對不同的消息進行過濾。
建立發送端,定義三個路由 info、warning、error
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
 * Publisher for the routing example: sends one message per severity
 * ({@code info}, {@code warning}, {@code error}) to the {@code direct_logs}
 * exchange, using the severity as the routing key.
 */
public class RoutingSendDirect {
    private static final String EXCHANGE_NAME = "direct_logs";
    // Routing keys, one message is published for each
    private static final String[] routingKeys = new String[]{"info", "warning", "error"};

    /**
     * Connects to the broker on localhost, declares the direct exchange and
     * publishes one message for every entry of {@code routingKeys}.
     *
     * @param args unused
     * @throws IOException      if a broker operation fails
     * @throws TimeoutException if opening or closing a broker resource times out
     */
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        // Explicit try/finally so the connection and channel are released even
        // when declaring the exchange or publishing fails.
        try {
            Channel channel = connection.createChannel();
            try {
                // Declare the exchange; note the type is "direct".
                channel.exchangeDeclare(EXCHANGE_NAME, "direct");
                for (String routingKey : routingKeys) {
                    String message = "RoutingSendDirect Send the message level : " + routingKey;
                    // Encode explicitly as UTF-8 to match the consumers' decoding.
                    channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes("UTF-8"));
                    // Quotes balanced so the output reads 'key':'message'.
                    System.out.println("RoutingSendDirect Send '" + routingKey + "':'" + message + "'");
                }
            } finally {
                // A failed operation may already have closed the channel; closing it
                // again would raise a second exception that hides the original one.
                if (channel.isOpen()) {
                    channel.close();
                }
            }
        } finally {
            if (connection.isOpen()) {
                connection.close();
            }
        }
    }
}
建立消費者1 用于接受info、warning
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
 * First consumer of the routing example: binds a server-named queue to the
 * {@code direct_logs} exchange under the keys {@code info} and {@code warning}
 * and prints each delivery together with its routing key.
 */
public class ReceiveLogsDirect1 {
    // Exchange name
    private static final String EXCHANGE_NAME = "direct_logs";
    // Routing keys this consumer binds to
    private static final String[] routingKeys = new String[]{"info" ,"warning"};

    /**
     * Sets up the exchange, queue and bindings, then starts consuming.
     *
     * @param args unused
     * @throws IOException      if a broker operation fails
     * @throws TimeoutException if opening the connection times out
     */
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("localhost");
        Connection conn = connectionFactory.newConnection();
        final Channel ch = conn.createChannel();

        // Declare the exchange
        ch.exchangeDeclare(EXCHANGE_NAME, "direct");
        // Name of the anonymous queue
        final String anonymousQueue = ch.queueDeclare().getQueue();

        // One binding per routing key
        for (int i = 0; i < routingKeys.length; i++) {
            String key = routingKeys[i];
            ch.queueBind(anonymousQueue, EXCHANGE_NAME, key);
            String bindingInfo = "ReceiveLogsDirect1 exchange:" + EXCHANGE_NAME
                    + ", queue:" + anonymousQueue
                    + ", BindRoutingKey:" + key;
            System.out.println(bindingInfo);
        }

        System.out.println("ReceiveLogsDirect1 Waiting for messages");
        ch.basicConsume(anonymousQueue, true, new DefaultConsumer(ch) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String text = new String(body, "UTF-8");
                String key = envelope.getRoutingKey();
                System.out.println("ReceiveLogsDirect1 Received '" + key + "':'" + text + "'");
            }
        });
    }
}
建立消費者2 用于接受error
import com.rabbitmq.client.*;
import java.io.UnsupportedEncodingException;
/**
 * Second consumer of the routing example: binds a server-named queue to the
 * {@code direct_logs} exchange under the key {@code error} and prints each
 * delivery together with its routing key.
 */
public class ReceiveLogsDirect2 {
    // Exchange name
    private static final String EXCHANGE_NAME = "direct_logs";
    // Routing keys this consumer binds to
    private static final String[] routingKeys = new String[]{"error"};

    /**
     * Sets up the exchange, queue and bindings, then starts consuming.
     *
     * @param argv unused
     * @throws Exception if any broker operation fails
     */
    public static void main(String[] argv) throws Exception {
        ConnectionFactory cf = new ConnectionFactory();
        cf.setHost("localhost");
        Channel channel = cf.newConnection().createChannel();

        // Declare the exchange
        channel.exchangeDeclare(EXCHANGE_NAME, "direct");

        // Name of the anonymous queue
        String boundQueue = channel.queueDeclare().getQueue();

        // Bind the queue once for every routing key
        for (String key : routingKeys) {
            channel.queueBind(boundQueue, EXCHANGE_NAME, key);
            StringBuilder line = new StringBuilder("ReceiveLogsDirect2 exchange:");
            line.append(EXCHANGE_NAME)
                .append(", queue:").append(boundQueue)
                .append(", BindRoutingKey:").append(key);
            System.out.println(line.toString());
        }

        System.out.println("ReceiveLogsDirect2 Waiting for messages");
        Consumer printer = createPrinter(channel);
        channel.basicConsume(boundQueue, true, printer);
    }

    // Builds the consumer that prints each delivery's routing key and UTF-8 decoded body.
    private static Consumer createPrinter(Channel channel) {
        return new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws UnsupportedEncodingException {
                String decoded = new String(body, "UTF-8");
                System.out.println("ReceiveLogsDirect2 Received '" + envelope.getRoutingKey() + "':'" + decoded + "'");
            }
        };
    }
}
三、Topics
這種應該屬于模糊比對
*:可以替代一個詞
#:可以替代0或者更多的詞
發送端:
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
 * Publisher for the topics example: sends one message per sample routing key
 * to the {@code topic_logs} exchange, which is declared with type {@code topic}.
 */
public class TopicSend {
    private static final String EXCHANGE_NAME = "topic_logs";

    /**
     * Connects to the broker on localhost, declares the topic exchange and
     * publishes one message for each sample routing key. Failures while
     * publishing are printed to standard error; the channel and connection are
     * then closed exactly once.
     *
     * @param args unused
     * @throws IOException      if closing the channel or connection fails
     * @throws TimeoutException if closing the channel times out
     */
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = null;
        Channel channel = null;
        try {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            connection = factory.newConnection();
            channel = connection.createChannel();
            // Declare an exchange that routes by pattern matching
            channel.exchangeDeclare(EXCHANGE_NAME, "topic");
            // Routing keys of the messages to send
            String[] routingKeys = new String[]{
                "quick.orange.rabbit",
                "lazy.orange.elephant",
                "quick.orange.fox",
                "lazy.brown.fox",
                "quick.brown.fox",
                "quick.orange.male.rabbit",
                "lazy.orange.male.rabbit"
            };
            // Send the messages
            for (String severity : routingKeys) {
                String message = "From " + severity + " routingKey' s message!";
                // Encode explicitly as UTF-8 to match the consumers' decoding.
                channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes("UTF-8"));
                System.out.println("TopicSend Sent '" + severity + "':'" + message + "'");
            }
        } catch (Exception e) {
            // Only report here. Cleanup happens once in the finally block; closing in
            // both places made the second close fail on already-closed resources.
            e.printStackTrace();
        } finally {
            try {
                // The channel is checked on its own: it is still null when
                // createChannel() failed even though the connection was opened.
                if (channel != null && channel.isOpen()) {
                    channel.close();
                }
            } finally {
                if (connection != null && connection.isOpen()) {
                    connection.close();
                }
            }
        }
    }
}
消費者1 比對*.orange.*的
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
 * First consumer of the topics example: binds a server-named queue to the
 * {@code topic_logs} exchange with the pattern {@code "*.orange.*"} and prints
 * each delivery together with its routing key.
 */
public class ReceiveLogsTopic1 {
    private static final String EXCHANGE_NAME = "topic_logs";

    /**
     * Sets up the exchange, queue and binding, then starts consuming.
     *
     * @param args unused
     * @throws IOException      if a broker operation fails
     * @throws TimeoutException if opening the connection times out
     */
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory brokerFactory = new ConnectionFactory();
        brokerFactory.setHost("localhost");
        Connection brokerConnection = brokerFactory.newConnection();
        Channel channel = brokerConnection.createChannel();

        // Declare an exchange that routes by pattern matching
        channel.exchangeDeclare(EXCHANGE_NAME, "topic");
        String topicQueue = channel.queueDeclare().getQueue();

        // Binding patterns
        String[] patterns = {"*.orange.*"};
        // Bind the queue for each pattern
        int index = 0;
        while (index < patterns.length) {
            String pattern = patterns[index++];
            channel.queueBind(topicQueue, EXCHANGE_NAME, pattern);
            System.out.println("ReceiveLogsTopic1 exchange:" + EXCHANGE_NAME
                    + ", queue:" + topicQueue
                    + ", BindRoutingKey:" + pattern);
        }
        System.out.println("ReceiveLogsTopic1 Waiting for messages");

        Consumer onDelivery = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String payload = new String(body, "UTF-8");
                String receivedKey = envelope.getRoutingKey();
                System.out.println("ReceiveLogsTopic1 Received '" + receivedKey + "':'" + payload + "'");
            }
        };
        channel.basicConsume(topicQueue, true, onDelivery);
    }
}
消費者2 比對*.*.rabbit的 和lazy.#(以lazy開頭)的
import com.rabbitmq.client.*;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.concurrent.TimeoutException;
/**
 * Second consumer of the topics example: binds a server-named queue to the
 * {@code topic_logs} exchange with the patterns {@code "*.*.rabbit"} and
 * {@code "lazy.#"} and prints each delivery together with its routing key.
 */
public class ReceiveLogsTopic2 {
    private static final String EXCHANGE_NAME = "topic_logs";

    /**
     * Sets up the exchange, queue and bindings, then starts consuming.
     *
     * @param argv unused
     * @throws IOException      if a broker operation fails
     * @throws TimeoutException if opening the connection times out
     */
    public static void main(String[] argv) throws IOException, TimeoutException {
        ConnectionFactory cf = new ConnectionFactory();
        cf.setHost("localhost");
        Channel channel = cf.newConnection().createChannel();

        // Declare an exchange that routes by pattern matching
        channel.exchangeDeclare(EXCHANGE_NAME, "topic");
        String queue = channel.queueDeclare().getQueue();

        // Bind the queue for each pattern
        bindAll(channel, queue, new String[]{"*.*.rabbit", "lazy.#"});

        System.out.println("ReceiveLogsTopic2 Waiting for messages");
        channel.basicConsume(queue, true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws UnsupportedEncodingException {
                String content = new String(body, "UTF-8");
                System.out.println("ReceiveLogsTopic2 Received '" + envelope.getRoutingKey() + "':'" + content + "'");
            }
        });
    }

    // Binds the queue to the exchange under every given pattern and logs each binding.
    private static void bindAll(Channel channel, String queue, String[] patterns) throws IOException {
        for (int i = 0; i < patterns.length; i++) {
            channel.queueBind(queue, EXCHANGE_NAME, patterns[i]);
            System.out.println("ReceiveLogsTopic2 exchange:" + EXCHANGE_NAME
                    + ", queue:" + queue
                    + ", BindRoutingKey:" + patterns[i]);
        }
    }
}
轉載于:https://www.cnblogs.com/baidawei/p/9172464.html