學習工作模式前,先看一下rabbitmq 給的helloworld案例

這是傳統的一對一,,,, 也就是一台機器生産,一台機器接收....
為了更好的了解代碼....我這裡示範的話用底層的代碼來示範....不整合架構了
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>4.0.3</version>
</dependency>
這是rabbitmq 提供的依賴......導入一下就可以測試了
下面是我寫的生産類...
測試是沒問題的
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Producer01 {
//隊列名稱
private static final String QUEUE = "helloworld";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = null; //與
Channel channel = null;
try {
ConnectionFactory factory = new ConnectionFactory();
//設定ip
factory.setHost("localhost");
//設定端口
factory.setPort(5672);
//設定賬号密碼
factory.setUsername("guest"); //預設賬号密碼都是guest
factory.setPassword("guest");
//設定虛拟空間
factory.setVirtualHost("/");//虛拟機預設的虛拟名稱為/ , 虛拟機相當于一個獨立的伺服器
//建立與RabbitMQ服務的TCP連接配接
connection = factory.newConnection();
//建立連接配接通道 ,每個連接配接可以建立多個通道,每個通道隻有一個會話
channel = connection.createChannel();
//這有五個參數
/***
* 聲明隊列,如果Rabbit中沒有此隊列将自動建立*
* param1:隊列名稱*
* param2:是否持久化* rabbit 關閉了該隊列是否存在..
* param3:隊列是否獨占此連接配接* 如果參數是true,那麼一個連接配接connection 隻能存在這一個channel,除非關閉程式
* param4:隊列不再使用時是否自動删除此隊列* 該隊列不使用了就會删除該隊列
*
* param5:隊列參數*/
channel.queueDeclare(QUEUE, true, false, false,null );
String message = "你愛到極緻的人,不會愛你";
//釋出消息
/**
* String exchange, String routingKey, BasicProperties props, byte[] body
*
* param1 : 交換機 後面我會講這裡是指定交換機,使用預設的交換機
* param2 : 路由key,這也是先不寫,,後面講 大概作用是用于Exchange(交換機)将消息轉發到指定的消息隊列
* param3 :消息包含的屬性
* 消息體
*
*/
channel.basicPublish("", QUEUE,null ,message.getBytes() );
System.out.println("Send Message is:'" + message + "'");
}catch (Exception e){
}finally {
if (channel != null){
channel.close();
}
if (connection != null){
connection.close();
}
}
}
}
然後下面是我寫的消費類 ,, 連接配接mq代碼都一樣來着...關注發送消息和接收消息的方法就行 ...
同樣測試過,代碼是可運作的
import com.rabbitmq.client.*;
import java.io.IOException;
public class Consumer01 {
//隊列名稱
private static final String QUEUE = "helloworld";
public static void main(String[] args){
Connection connection = null;
Channel channel = null;
try {
ConnectionFactory factory = new ConnectionFactory();
//設定ip
factory.setHost("localhost");
//設定端口
factory.setPort(5672);
//設定賬号密碼
factory.setUsername("guest"); //預設賬号密碼都是guest
factory.setPassword("guest");
//設定虛拟空間
factory.setVirtualHost("/");//虛拟機預設的虛拟名稱為/ , 虛拟機相當于一個獨立的伺服器
//建立與RabbitMQ服務的TCP連接配接
connection = factory.newConnection();
//建立連接配接通道 ,每個連接配接可以建立多個通道,每個通道隻有一個會話
channel = connection.createChannel();
//這有五個參數
/***
* 聲明隊列,如果Rabbit中沒有此隊列将自動建立*
* param1:隊列名稱*
* param2:是否持久化* rabbit 關閉了該隊列是否存在..
* param3:隊列是否獨占此連接配接* 如果參數是true,那麼一個連接配接connection 隻能存在這一個channel,除非關閉程式
* param4:隊列不再使用時是否自動删除此隊列* 該隊列不使用了就會删除該隊列
*
* param5:隊列參數*/
channel.queueDeclare(QUEUE, true, false, false,null ); //這裡其實可以不用聲明隊列的,因為 生産者已經聲明過了,但是如果生産者後釋出服務,隊列沒有聲明,消費者去監聽隊列..會報錯
// 建立預設消費方法
DefaultConsumer consumer = new DefaultConsumer(channel) {
// 重寫監聽方法
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("receive message.."+new String(body,"utf-8"));
}
};
// 監聽隊列
/**
* String queue, boolean autoAck, Consumer callback
*
* param1 : 隊列名稱
* param2 : 是否自動回複,接收到消息會自動恢複mq收到了,mq會删除消息,如果拒絕的話需要手動回複,不回複的話會導緻mq不删除被消費過的消息,一直存在
* param3 : 消費對象,,包含消費方法
*
*/
channel.basicConsume(QUEUE,true , consumer);
}catch (Exception e){
}
}
}
以上的話就是rabbitmq提供的案例
一台生産者 , 一台消費者
emmm,,,
這裡講的工作的模式是 workqueues
WorkQueues
對比helloword案例,這裡多了個消費者..
應用場景:對于 任務過重或任務較多情況使用工作隊列可以提高任務處理的速度。
測試
我們啟動倆次消費者
然後用剛剛寫的生産者,發送五條資訊
然後我們看消費者列印的資訊
結果 :
mq workqueues 使用的是輪詢方式講資訊平均發給消費者 ,
消費者會在處理完消息後 接收下一條消息
2.Publish/subscribe 釋出訂閱模式
特點
生産者将消息發送給broker.由交換機将消息發給每個跟綁定了交換機綁定的消息隊列,每個隊列都能收到生産者發送的每一條消息
生産者 :
聲明Exchange_fanout_inform交換機。
聲明兩個隊列并且綁定到此交換機,
綁定時不需要指定routingkey發送消息時不需要指定routingkey
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
public class Producer02 {
//消息隊列名稱
public static final String QUEUE_INFORM_Test1 = "queue_inform_1";
public static final String QUEUE_INFORM_Test2 = "queue_inform_2";
//交換機名稱
public static final String EXCHANGE_FANOUT_INFORM="exchange_fanout_inform";
public static void main(String[] args){
Connection connection = null;
Channel channel = null;
try {
ConnectionFactory factory = new ConnectionFactory();
//設定ip
factory.setHost("localhost");
//設定端口
factory.setPort(5672);
//設定賬号密碼
factory.setUsername("guest"); //預設賬号密碼都是guest
factory.setPassword("guest");
//設定虛拟空間
factory.setVirtualHost("/");//虛拟機預設的虛拟名稱為/ , 虛拟機相當于一個獨立的伺服器
//建立與RabbitMQ服務的TCP連接配接
connection = factory.newConnection();
//建立連接配接通道 ,每個連接配接可以建立多個通道,每個通道隻有一個會話
channel = connection.createChannel();
//聲明交換機
/*
String exchange, String type
param1 : 交換機
param2 : 交換機 類型 fanout 、 topic、direct、headers
FANOUT 對應的模式是 釋出訂閱模式 publish/subscribe 模式
其他的工作模式以後會将
DIRECT 對應的是路由的工作模式
TOPIC 對應的是通配符工作模式
HEADERS 對應了 headers 的工作模式
*/
channel.exchangeDeclare(EXCHANGE_FANOUT_INFORM,BuiltinExchangeType.FANOUT);
//聲明隊列
/**
* String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
*
* param1: 隊列名稱
* param2: 是否持久化
* param3 : 是否獨占此隊列
* param4 : 隊列不用是否自動删除
* param5 : 參數
*/
channel.queueDeclare(QUEUE_INFORM_Test1,true ,false ,false ,null );
channel.queueDeclare(QUEUE_INFORM_Test2,true ,false ,false ,null );
//交換機和隊列綁定
/**
* String queue, String exchange, String routingKey
* param1 : 隊列名稱
* exchange : 交換機
* routingKey : 路由key 後面講,先 用 ""代替
*/
channel.queueBind(QUEUE_INFORM_Test1,EXCHANGE_FANOUT_INFORM ,"" );
channel.queueBind(QUEUE_INFORM_Test2,EXCHANGE_FANOUT_INFORM ,"" );
// 發送消息
String message = "";
for (int i = 0; i < 9; i++) {
message = "故事的開頭總是這樣,适逢其會,猝不及防。故事的結局總是這樣,花開兩朵,天各一方。"+ i;
/**
* String exchange, String routingKey, BasicProperties props, byte[] body
*
* param1 交換機名稱
* param2 路由key,後面講,先用 "" 代替 ,
* param3 參數
* param4 傳遞的字元串
*
*
*/
channel.basicPublish(EXCHANGE_FANOUT_INFORM,"" , null, message.getBytes());
System.out.println("Send Message is:'" + message + "'");
}
}catch (Exception e){
}finally{
if(channel!=null){
try {
channel.close();
} catch (Exception e) {
e.printStackTrace();
}
}
if(connection!=null){
try {
connection.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
下面是我寫的倆個消費者
import com.rabbitmq.client.*;
import java.io.IOException;
public class ConsumerTest1 {
//隊列名稱
private static final String QUEUE_INFORM_Test1 = "queue_inform_1";
//交換機名稱
public static final String EXCHANGE_FANOUT_INFORM="exchange_fanout_inform";
public static void main(String[] args){
Connection connection = null;
Channel channel = null;
try {
ConnectionFactory factory = new ConnectionFactory();
//設定ip
factory.setHost("localhost");
//設定端口
factory.setPort(5672);
//設定賬号密碼
factory.setUsername("guest"); //預設賬号密碼都是guest
factory.setPassword("guest");
//設定虛拟空間
factory.setVirtualHost("/");//虛拟機預設的虛拟名稱為/ , 虛拟機相當于一個獨立的伺服器
//建立與RabbitMQ服務的TCP連接配接
connection = factory.newConnection();
//建立連接配接通道 ,每個連接配接可以建立多個通道,每個通道隻有一個會話
channel = connection.createChannel();
//這有五個參數
/***
* 聲明隊列,如果Rabbit中沒有此隊列将自動建立*
* param1:隊列名稱*
* param2:是否持久化* rabbit 關閉了該隊列是否存在..
* param3:隊列是否獨占此連接配接* 如果參數是true,那麼一個連接配接connection 隻能存在這一個channel,除非關閉程式
* param4:隊列不再使用時是否自動删除此隊列* 該隊列不使用了就會删除該隊列
*
* param5:隊列參數*/
channel.queueDeclare(QUEUE_INFORM_Test1, true, false, false,null ); //這裡其實可以不用聲明隊列的,因為 生産者已經聲明過了,但是如果生産者後釋出服務,隊列沒有聲明,消費者去監聽隊列..會報錯
//聲明交換機
/*
String exchange, String type
param1 : 交換機
param2 : 交換機 類型 fanout 、 topic、direct、headers
FANOUT 對應的模式是 釋出訂閱模式 publish/subscribe 模式
其他的工作模式以後會将
DIRECT 對應的是路由的工作模式
TOPIC 對應的是通配符工作模式
HEADERS 對應了 headers 的工作模式
*/
channel.exchangeDeclare(EXCHANGE_FANOUT_INFORM,BuiltinExchangeType.FANOUT);
//交換機和隊列綁定
/**
* String queue, String exchange, String routingKey
* param1 : 隊列名稱
* exchange : 交換機
* routingKey : 路由key 後面講,先 用 ""代替
*/
channel.queueBind(QUEUE_INFORM_Test1,EXCHANGE_FANOUT_INFORM ,"" );
// 建立預設消費方法
DefaultConsumer consumer = new DefaultConsumer(channel) {
// 重寫監聽方法
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("receive message.."+new String(body,"utf-8"));
}
};
// 監聽隊列
/**
* String queue, boolean autoAck, Consumer callback
*
* param1 : 隊列名稱
* param2 : 是否自動回複,接收到消息會自動恢複mq收到了,mq會删除消息,如果拒絕的話需要手動回複,不回複的話會導緻mq不删除被消費過的消息,一直存在
* param3 : 消費對象,,包含消費方法
*
*/
channel.basicConsume(QUEUE_INFORM_Test1,true , consumer);
}catch (Exception e){
}
}
}
消費者二
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.*;
import java.io.IOException;
public class ConsumerTest2 {
//隊列名稱
public static final String QUEUE_INFORM_Test2 = "queue_inform_2";
//交換機名稱
public static final String EXCHANGE_FANOUT_INFORM="exchange_fanout_inform";
public static void main(String[] args){
Connection connection = null;
Channel channel = null;
try {
ConnectionFactory factory = new ConnectionFactory();
//設定ip
factory.setHost("localhost");
//設定端口
factory.setPort(5672);
//設定賬号密碼
factory.setUsername("guest"); //預設賬号密碼都是guest
factory.setPassword("guest");
//設定虛拟空間
factory.setVirtualHost("/");//虛拟機預設的虛拟名稱為/ , 虛拟機相當于一個獨立的伺服器
//建立與RabbitMQ服務的TCP連接配接
connection = factory.newConnection();
//建立連接配接通道 ,每個連接配接可以建立多個通道,每個通道隻有一個會話
channel = connection.createChannel();
//這有五個參數
/***
* 聲明隊列,如果Rabbit中沒有此隊列将自動建立*
* param1:隊列名稱*
* param2:是否持久化* rabbit 關閉了該隊列是否存在..
* param3:隊列是否獨占此連接配接* 如果參數是true,那麼一個連接配接connection 隻能存在這一個channel,除非關閉程式
* param4:隊列不再使用時是否自動删除此隊列* 該隊列不使用了就會删除該隊列
*
* param5:隊列參數*/
channel.queueDeclare(QUEUE_INFORM_Test2, true, false, false,null ); //這裡其實可以不用聲明隊列的,因為 生産者已經聲明過了,但是如果生産者後釋出服務,隊列沒有聲明,消費者去監聽隊列..會報錯
//聲明交換機
/*
String exchange, String type
param1 : 交換機
param2 : 交換機 類型 fanout 、 topic、direct、headers
FANOUT 對應的模式是 釋出訂閱模式 publish/subscribe 模式
其他的工作模式以後會将
DIRECT 對應的是路由的工作模式
TOPIC 對應的是通配符工作模式
HEADERS 對應了 headers 的工作模式
*/
channel.exchangeDeclare(EXCHANGE_FANOUT_INFORM,BuiltinExchangeType.FANOUT);
//交換機和隊列綁定
/**
* String queue, String exchange, String routingKey
* param1 : 隊列名稱
* exchange : 交換機
* routingKey : 路由key 後面講,先 用 ""代替
*/
channel.queueBind(QUEUE_INFORM_Test2,EXCHANGE_FANOUT_INFORM ,"" );
// 建立預設消費方法
DefaultConsumer consumer = new DefaultConsumer(channel) {
// 重寫監聽方法
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("receive message.."+new String(body,"utf-8"));
}
};
// 監聽隊列
/**
* String queue, boolean autoAck, Consumer callback
*
* param1 : 隊列名稱
* param2 : 是否自動回複,接收到消息會自動恢複mq收到了,mq會删除消息,如果拒絕的話需要手動回複,不回複的話會導緻mq不删除被消費過的消息,一直存在
* param3 : 消費對象,,包含消費方法
*
*/
channel.basicConsume(QUEUE_INFORM_Test2,true , consumer);
}catch (Exception e){
}
}
}
細心的同學可能發現了,,生産者跟消費者的不同代碼其實就是發送消息,接收消息的方法而已 ....
生産者聲明隊列跟交換機,消費者也聲明各自的隊列跟交換機... 其實就是為了怕先啟動消費者沒有發現隊列跟交換機報錯而已...
核心代碼的話 其實 就是 生成交換機,生成隊列 綁定交換機跟隊列
測試的話
我們先啟動倆個消費者
然後我們啟動生産者
我們看列印結果,,倆台消費者各自都處理了9條資訊
其實這種方法比WORKQUEUES 工作模式強,因為多台機器可以監聽一個隊列,也就是下圖所示,我們可以要倆個隊列,當然也可以建立一個隊列...建立多少隊列跟一個隊列多少消費者完全取決與我們
因為時間問題,,,,還有4種工作模式下次寫- -