- RabbitMQ使用示例代碼
- 消息隊列(Message Queue):是一種跨程序的通信機制,主要用于上下文傳遞消息。
- MQ作為消息中間件,最主要的作用是對系統之間傳遞消息進行"解耦",MQ是資料可靠性的重要保障。
- MQ主要角色是消息代理伺服器。
- RabbitMQ的優勢
- RabbitMQ是世界上最火的開源消息代理伺服器;
- RabbitMQ幾乎支援所有的作業系統與程式設計語言;
- RabbitMQ提供了高并發、高可用的成熟方案,支援多種消息協定,易于部署與使用。
- RabbitMQ的應用場景
- 異構系統的資料傳遞
- 高并發程式的流量控制
- 基于P2P的程式
- 分布式系統的事務一緻性
- 高可靠性的交易系統
- RabbitMQ常用指令
rabbitmq-server #前台啟動服務 rabbitmq-server -detached #背景啟動服務 rabbitmqctl stop #停止服務,表示停止程序 rabbitmqctl start_app #啟動應用 rabbitmqctl stop_app #終止應用,不會關閉程序 rabbitmqctl add_user username password #建立使用者 rabbitmqctl delete_user #删除使用者 rabbitmqctl change_password username newpassword #修改使用者的密碼 rabbitmqctl set_user_tags username tag #授予使用者角色(tag) rabbitmqctl set_permissions -p / username '.*' '.*' '.*' #設定使用者允許通路的vhost(虛拟主機),分别是配置權限、讀權限、寫權限
- 檢視網絡端口号
netstat -tulpn
- RabbitMQ的4中tag
-
超級管理者administrator:
可登入管理控制台,可檢視所有的資訊,并且對使用者、政策(policy)進行操作
-
監控者monitoring
登入管理控制台,同時可以檢視rabbitmq節點相關的資訊(程序數、記憶體使用情況、磁盤使用情況等),但無法進行政策的定制
-
政策制定者policymaker
可登入管理控制台,同時可以對policy進行管理,但無法檢視節點(執行個體)相關的資訊
-
普通管理者management
僅可登入管理控制台,無法看到節點資訊,也無法對政策進行管理
-
- RabbitMQ的guest使用者隻能在本地登入,如果需要遠端登入RabbitMQ控制台,需要新建立一個使用者,并将該使用者的權限設定為administrator
- AMPQ:進階消息隊列協定,一個統一提供消息服務的應用層标準進階消息隊列協定,是應用層協定的一個開放标準,為面向消息的中間件設計。基于此協定的用戶端與中間件可傳遞消息,并不受用戶端/中間件不同産品、不同的開發語言等條件的限制。Erlang中的實作有RabbitMQ等。
-
基本概念
Producer 生産者,消息的提供者、
Consumer 消費者,消息的使用者
Message 消息,程式間的通信資料
Queue 隊列,消息存放的容器,消息先進先出
Vhost 虛拟主機,相當于MQ的"資料庫",用于存儲隊列
- 消息的狀态
- Ready:消息已經被傳入隊列,等待被消費
-
Unacked:
消息已經被消費者認領,但還未被确認"已被消費"
Unacked狀态下消費者斷開連接配接,則消息回到Ready
沒有确認,用戶端沒有斷開,則一直處于Unacked
- Finished:調用basicAck()方法後,表示消息已經被消費,從隊列中移除
- 第一次連接配接RabbitMQ
- 導入依賴
<dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.6.0</version> </dependency>
- RabbitMQ生産者的代碼
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.util.concurrent.TimeoutException; public class Procuder { public static void main(String[] args) throws IOException, TimeoutException { //ConnectionFactory建立MQ的實體連接配接 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("127.0.0.1"); //ip位址 connectionFactory.setPort(5672); //端口 connectionFactory.setUsername("guest"); //使用者名 connectionFactory.setPassword("guest"); //密碼 connectionFactory.setVirtualHost("/test"); //虛拟主機 //TCP實體連接配接 Connection connection = connectionFactory.newConnection(); //建立通信通道,相當于TCP的虛拟連接配接 Channel channel = connection.createChannel(); //建立隊列,聲明并建立一個隊列,如果隊列已經存在,則使用這個隊列 //第一個參數,對列名稱 //第二個參數,是否持久話,false表示不持久化資料,MQ停掉後資料就會丢失 //第三個參數,是否隊列私有化,false表示所有的消費者都可以通路,true表示隻有第一次擁有它的消費者才可以一直使用,其他消費者不能通路 //第四個參數,是否自動删除,false連接配接停掉後不自動删除掉這個隊列 //第五個參數,其他額外的參數 channel.queueDeclare("helloworld", false, false, false, null); //需要發送的消息 String content = "Hello World bb!"; //第一個參數,交換機 //第二個參數,隊列名稱 //第三個參數,額外的設定屬性 //第四個參數,需要發送的消息的位元組數組 channel.basicPublish("", "helloworld", null, content.getBytes()); channel.close(); connection.close(); System.out.println("資料發送成功"); } }
- RabbitMQ消費者的代碼
import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; public class Consumer { public static void main(String[] args) throws IOException, TimeoutException { //ConnectionFactory建立MQ的實體連接配接 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("127.0.0.1"); //ip位址 connectionFactory.setPort(5672); //端口 connectionFactory.setUsername("guest"); //使用者名 connectionFactory.setPassword("guest"); //密碼 connectionFactory.setVirtualHost("/test"); //虛拟主機 //TCP實體連接配接 Connection connection = connectionFactory.newConnection(); //建立通道 Channel channel = connection.createChannel(); //綁定消息隊列 //第一個參數,對列名稱 //第二個參數,是否持久話,false表示不持久化資料,MQ停掉後資料就會丢失 //第三個參數,是否隊列私有化,false表示所有的消費者都可以通路,true表示隻有第一次擁有它的消費者才可以一直使用,其他消費者不能通路 //第四個參數,是否自動删除,false連接配接停掉後不自動删除掉這個隊列 //第五個參數,其他額外的參數 channel.queueDeclare("helloworld", false, false, false, null); //建立一個消息消費者 //第一個參數,隊列名稱 //第二個參數,是否自動确認收到消息,false表示手動編寫程式來确認消息,這是MQ推薦的做法 //第三個參數,DefaultConsumer的實作類 channel.basicConsume("helloworld", false, new Receiver(channel)); //在消費者中不能關閉channel和connection } } class Receiver extends DefaultConsumer{ private Channel channel; //重寫構造函數,channel通道對象需要從外部傳入,在handleDelivery中會用到 public Receiver(Channel channel) { super(channel); this.channel = channel; } @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { /*super.handleDelivery(consumerTag, envelope, properties, body);*/ String messageBody = new String(body); System.out.println("消費者接收到: " + messageBody); //簽收消息,确認消息 //第一個參數,envelope.getDeliveryTag()擷取這個消息的TagId,是一個整數 //第二個參數,false隻确認簽收目前的消息,true時,表示簽收該消費者所有未簽收的消息 channel.basicAck(envelope.getDeliveryTag(), false); } }
- RabbitMQ連接配接工具類的封裝
import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class RabbitMQUtil { private static ConnectionFactory connectionFactory; static { //ConnectionFactory建立MQ的實體連接配接 connectionFactory = new ConnectionFactory(); connectionFactory.setHost("127.0.0.1"); //ip位址 connectionFactory.setPort(5672); //端口 connectionFactory.setUsername("guest"); //使用者名 connectionFactory.setPassword("guest"); //密碼 connectionFactory.setVirtualHost("/test"); //虛拟主機 } public static Connection getConnection() { Connection connection = null; try { connection = connectionFactory.newConnection(); } catch (Exception e) { throw new RuntimeException(e); //不需要顯式的聲明抛出 } return connection; } }
- 優化後的生産者
import com.kangswx.rabbitmq.utils.RabbitMQConsts; import com.kangswx.rabbitmq.utils.RabbitMQUtil; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import java.io.IOException; import java.util.concurrent.TimeoutException; public class Procuder { public static void main(String[] args) throws IOException, TimeoutException { //TCP實體連接配接 Connection connection = RabbitMQUtil.getConnection(); //建立通信通道,相當于TCP的虛拟連接配接 Channel channel = connection.createChannel(); //建立隊列,聲明并建立一個隊列,如果隊列已經存在,則使用這個隊列 //第一個參數,對列名稱 helloworld //第二個參數,是否持久話,false表示不持久化資料,MQ停掉後資料就會丢失 //第三個參數,是否隊列私有化,false表示所有的消費者都可以通路,true表示隻有第一次擁有它的消費者才可以一直使用,其他消費者不能通路 //第四個參數,是否自動删除,false連接配接停掉後不自動删除掉這個隊列 //第五個參數,其他額外的參數 channel.queueDeclare(RabbitMQConsts.QUEUE_HELLO, false, false, false, null); //需要發送的消息 String content = "Hello World bb!"; //第一個參數,交換機 //第二個參數,隊列名稱 helloworld //第三個參數,額外的設定屬性 //第四個參數,需要發送的消息的位元組數組 channel.basicPublish("", RabbitMQConsts.QUEUE_HELLO, null, content.getBytes()); channel.close(); connection.close(); System.out.println("資料發送成功"); } }
- 優化後的消費者
import com.kangswx.rabbitmq.utils.RabbitMQConsts; import com.kangswx.rabbitmq.utils.RabbitMQUtil; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; public class Consumer { public static void main(String[] args) throws IOException, TimeoutException { //TCP實體連接配接 Connection connection = RabbitMQUtil.getConnection(); //建立通道 Channel channel = connection.createChannel(); //綁定消息隊列 //第一個參數,對列名稱 helloworld //第二個參數,是否持久話,false表示不持久化資料,MQ停掉後資料就會丢失 //第三個參數,是否隊列私有化,false表示所有的消費者都可以通路,true表示隻有第一次擁有它的消費者才可以一直使用,其他消費者不能通路 //第四個參數,是否自動删除,false連接配接停掉後不自動删除掉這個隊列 //第五個參數,其他額外的參數 channel.queueDeclare(RabbitMQConsts.QUEUE_HELLO, false, false, false, null); //建立一個消息消費者 //第一個參數,隊列名稱 helloworld //第二個參數,是否自動确認收到消息,false表示手動編寫程式來确認消息,這是MQ推薦的做法 //第三個參數,DefaultConsumer的實作類 channel.basicConsume(RabbitMQConsts.QUEUE_HELLO, false, new Receiver(channel)); //在消費者中不能關閉channel和connection } } class Receiver extends DefaultConsumer{ private Channel channel; //重寫構造函數,channel通道對象需要從外部傳入,在handleDelivery中會用到 public Receiver(Channel channel) { super(channel); this.channel = channel; } @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { /*super.handleDelivery(consumerTag, envelope, properties, body);*/ String messageBody = new String(body); System.out.println("消費者接收到: " + messageBody); //簽收消息,确認消息 //第一個參數,envelope.getDeliveryTag()擷取這個消息的TagId,是一個整數 //第二個參數,false隻确認簽收目前的消息,true時,表示簽收該消費者所有未簽收的消息 channel.basicAck(envelope.getDeliveryTag(), false); } }
- RabbitMQ的工作模式
- Hello World:最簡單的模式,一個生産者對應一個消費者
- Work queues:一個生産者生産的消息由多個消費者進行消費,一個消息隻能被一個消費者處理
- Publish/Subsribe:多了一個交換器,交消息按照一定的規則分發給多個消費者,每個消費者收到的消息是一模一樣的,一個消息可能被多個消費者處理
- Routing模式:交消息有選擇的分發給不同的消費者,不同的消費者收到的消息有可能不一樣,缺點是,需要對資料進行精準比對
- Topics:與Routing類似,可以定義一個表達式規則,模糊比對
- RPC:遠端調用
-
Work queues工作隊列:
建立一個工作隊列,它會發送一些耗時的任務給多個消費者。
有多個消息的情況下,Work queue會将消息分發給不同的消費者,每個消費者都會接收到不同的消息,并且可以根據處理消息的速度來接收消息的數量,讓消費者程式發揮最大性能。
特别适合在叢集環境中做異步處理,能最大程度的發揮每一台伺服器的性能。
- 将消費者的處理方式修改為處理完一條,再去隊裡裡面拿下一條資料的方法
channel.basicQos(1); //處理完一條消息再去隊列裡面取下一條消息 channel.basicAck(envelope.getDeliveryTag(), false); //調用basicAck的時候,才會去取下一條的資料
-
Publish/Subsribe模式
釋出/訂閱模式中,生産者不再直接與隊列進行綁定,而是将資料發送至"交換機Exchange"
交換機Exchange用于将資料按某種規則送入與之綁定的隊列,進而供消費者使用。
釋出/訂閱模式中,交換機将無差别的将所有的消息送入與之綁定的隊列,所有的消費者拿到的消息完全相同,交換機的類型被稱為fanout。
- 使用場景:釋出訂閱模式因為所有消費者獲得相同的消息,是以特别适合"資料提供商與應用商"
- 建立一個釋出訂閱模式下的交換機(類型為fanout)
RabbitMQ介紹及各個工作模式的使用 - 生産者代碼示例
import com.kangswx.rabbitmq.utils.RabbitMQConsts; import com.kangswx.rabbitmq.utils.RabbitMQUtil; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import java.io.IOException; import java.util.Scanner; import java.util.concurrent.TimeoutException; /** * 氣象局(模拟生産者) */ public class WeatherBureau { public static void main(String[] args) throws IOException, TimeoutException { //TCP實體連接配接 Connection connection = RabbitMQUtil.getConnection(); //建立通信通道,相當于TCP的虛拟連接配接 Channel channel = connection.createChannel(); String input = new Scanner(System.in).next(); //第一個參數,交換機 //第二個參數,隊列名稱,此處不需要 //第三個參數,額外的設定屬性 //第四個參數,需要發送的消息的位元組數組 channel.basicPublish(RabbitMQConsts.EXCHANGE_WEATHER, "", null, input.getBytes()); channel.close(); connection.close(); } }
- 消費者一代碼示例
import com.kangswx.rabbitmq.utils.RabbitMQConsts; import com.kangswx.rabbitmq.utils.RabbitMQUtil; import com.rabbitmq.client.*; import java.io.IOException; /** * 百度(模拟消費者) */ public class Baidu { public static void main(String[] args) throws IOException { //TCP實體連接配接 Connection connection = RabbitMQUtil.getConnection(); //建立通信通道,相當于TCP的虛拟連接配接 Channel channel = connection.createChannel(); //建立隊列,聲明并建立一個隊列,如果隊列已經存在,則使用這個隊列 //第一個參數,對列名稱 helloworld //第二個參數,是否持久話,false表示不持久化資料,MQ停掉後資料就會丢失 //第三個參數,是否隊列私有化,false表示所有的消費者都可以通路,true表示隻有第一次擁有它的消費者才可以一直使用,其他消費者不能通路 //第四個參數,是否自動删除,false連接配接停掉後不自動删除掉這個隊列 //第五個參數,其他額外的參數 channel.queueDeclare(RabbitMQConsts.QUEUE_BAIDU, false, false, false, null); //綁定隊列交換機 //第一個參數,隊列名稱 //第二個參數,交換機名稱 //第三個參數,路由key,此處不需要 channel.queueBind(RabbitMQConsts.QUEUE_BAIDU, RabbitMQConsts.EXCHANGE_WEATHER, ""); //每次隻去一條消息進行消費 channel.basicQos(1); channel.basicConsume(RabbitMQConsts.QUEUE_BAIDU, false, new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("百度收到氣象資訊----》"+new String(body)); channel.basicAck(envelope.getDeliveryTag(), false); } }); } }
- 消費者二代碼示例
import com.kangswx.rabbitmq.utils.RabbitMQConsts; import com.kangswx.rabbitmq.utils.RabbitMQUtil; import com.rabbitmq.client.*; import java.io.IOException; public class Sina { public static void main(String[] args) throws IOException { //TCP實體連接配接 Connection connection = RabbitMQUtil.getConnection(); //建立通信通道,相當于TCP的虛拟連接配接 Channel channel = connection.createChannel(); //建立隊列,聲明并建立一個隊列,如果隊列已經存在,則使用這個隊列 //第一個參數,對列名稱 helloworld //第二個參數,是否持久話,false表示不持久化資料,MQ停掉後資料就會丢失 //第三個參數,是否隊列私有化,false表示所有的消費者都可以通路,true表示隻有第一次擁有它的消費者才可以一直使用,其他消費者不能通路 //第四個參數,是否自動删除,false連接配接停掉後不自動删除掉這個隊列 //第五個參數,其他額外的參數 channel.queueDeclare(RabbitMQConsts.QUEUE_SINA, false, false, false, null); //綁定隊列交換機 //第一個參數,隊列名稱 //第二個參數,交換機名稱 //第三個參數,路由key,此處不需要 channel.queueBind(RabbitMQConsts.QUEUE_SINA, RabbitMQConsts.EXCHANGE_WEATHER, ""); //每次隻去一條消息進行消費 channel.basicQos(1); channel.basicConsume(RabbitMQConsts.QUEUE_SINA, false, new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("新浪收到氣象資訊----》"+new String(body)); channel.basicAck(envelope.getDeliveryTag(), false); } }); } }
- 路由Routing模式:是在釋出訂閱模式上的變種
- 釋出訂閱模式是無條件的将所有消息分發給所有的隊列,路由模式則是交換機根據Routing key有條件的将資料篩選後分發給消費者隊列。
- 路由模式下的交換機被稱為direct
- 建立routing模式下的交換機(類型為direct)
RabbitMQ介紹及各個工作模式的使用 - 路由模式下的生産者
import com.kangswx.rabbitmq.utils.RabbitMQConsts; import com.kangswx.rabbitmq.utils.RabbitMQUtil; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import java.io.IOException; import java.util.Iterator; import java.util.LinkedHashMap; import java.util.Map; import java.util.concurrent.TimeoutException; /** * 氣象局(模拟生産者) */ public class WeatherBureau { public static void main(String[] args) throws IOException, TimeoutException { LinkedHashMap<String, String> area = new LinkedHashMap<>(); area.put("china.shaanxi.xian.20190624", "中國陝西西安20190624天氣資料"); area.put("china.shandong.qingdao.20190624", "中國山東青島20190624天氣資料"); area.put("china.henan.zhengzhou.20190624", "中國河南鄭州20190624天氣資料"); area.put("us.cal.la.20190624", "美國加州洛杉矶20190624天氣資料"); area.put("china.shaanxi.xian.20190625", "中國陝西西安20190625天氣資料"); area.put("china.shandong.qingdao.20190625", "中國山東青島20190625天氣資料"); area.put("china.henan.zhengzhou.20190625", "中國河南鄭州20190625天氣資料"); area.put("us.cal.la.20190625", "美國加州洛杉矶20190625天氣資料"); //TCP實體連接配接 Connection connection = RabbitMQUtil.getConnection(); //建立通信通道,相當于TCP的虛拟連接配接 Channel channel = connection.createChannel(); Iterator<Map.Entry<String, String>> iterator = area.entrySet().iterator(); while(iterator.hasNext()){ Map.Entry<String, String> next = iterator.next(); //第一個參數,交換機 //第二個參數,相當于資料篩選的條件 //第三個參數,額外的設定屬性 //第四個參數,需要發送的消息的位元組數組 channel.basicPublish(RabbitMQConsts.EXCHANGE_WEATHER_ROUTING, next.getKey(), null, next.getValue().getBytes()); } channel.close(); connection.close(); } }
- 路由模式下的消費者
import com.kangswx.rabbitmq.utils.RabbitMQConsts; import com.kangswx.rabbitmq.utils.RabbitMQUtil; import com.rabbitmq.client.*; import java.io.IOException; public class Sina { public static void main(String[] args) throws IOException { //TCP實體連接配接 Connection connection = RabbitMQUtil.getConnection(); //建立通信通道,相當于TCP的虛拟連接配接 Channel channel = connection.createChannel(); //建立隊列,聲明并建立一個隊列,如果隊列已經存在,則使用這個隊列 //第一個參數,對列名稱 helloworld //第二個參數,是否持久話,false表示不持久化資料,MQ停掉後資料就會丢失 //第三個參數,是否隊列私有化,false表示所有的消費者都可以通路,true表示隻有第一次擁有它的消費者才可以一直使用,其他消費者不能通路 //第四個參數,是否自動删除,false連接配接停掉後不自動删除掉這個隊列 //第五個參數,其他額外的參數 channel.queueDeclare(RabbitMQConsts.QUEUE_SINA, false, false, false, null); //綁定隊列交換機 //第一個參數,隊列名稱 //第二個參數,交換機名稱 //第三個參數,路由key,此處需要設定路由規則 channel.queueBind(RabbitMQConsts.QUEUE_SINA, RabbitMQConsts.EXCHANGE_WEATHER_ROUTING, "us.cal.la.20190624"); channel.queueBind(RabbitMQConsts.QUEUE_SINA, RabbitMQConsts.EXCHANGE_WEATHER_ROUTING, "china.henan.zhengzhou.20190624"); channel.queueBind(RabbitMQConsts.QUEUE_SINA, RabbitMQConsts.EXCHANGE_WEATHER_ROUTING, "us.cal.la.20190625"); channel.queueBind(RabbitMQConsts.QUEUE_SINA, RabbitMQConsts.EXCHANGE_WEATHER_ROUTING, "china.henan.zhengzhou.20190625"); //每次隻去一條消息進行消費 channel.basicQos(1); channel.basicConsume(RabbitMQConsts.QUEUE_SINA, false, new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("新浪收到氣象資訊----》"+new String(body)); channel.basicAck(envelope.getDeliveryTag(), false); } }); } } /** 接收到的消息: 新浪收到氣象資訊----》中國河南鄭州20190624天氣資料 新浪收到氣象資訊----》美國加州洛杉矶20190624天氣資料 新浪收到氣象資訊----》中國河南鄭州20190625天氣資料 新浪收到氣象資訊----》美國加州洛杉矶20190625天氣資料 **/
- 主題模式Topic:
- 在Routing模式的基礎上提供了對Routing Key模糊比對的功能,可以簡化程式的編寫。
- 主題模式下,模糊比對表達式規則為:
* 比對單個關鍵字 # 比對所有關鍵字
- 主題模式下的交換機類型為topic
- 建立topic模式下的交換機(類型為topic):
RabbitMQ介紹及各個工作模式的使用 - 主題模式下的生産者代碼示例
import com.kangswx.rabbitmq.utils.RabbitMQConsts; import com.kangswx.rabbitmq.utils.RabbitMQUtil; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import java.io.IOException; import java.util.Iterator; import java.util.LinkedHashMap; import java.util.Map; import java.util.concurrent.TimeoutException; /** * 氣象局(模拟生産者) */ public class WeatherBureau { public static void main(String[] args) throws IOException, TimeoutException { LinkedHashMap<String, String> area = new LinkedHashMap<>(); area.put("china.shaanxi.xian.20190624", "中國陝西西安20190624天氣資料"); area.put("china.shandong.qingdao.20190624", "中國山東青島20190624天氣資料"); area.put("china.henan.zhengzhou.20190624", "中國河南鄭州20190624天氣資料"); area.put("us.cal.la.20190624", "美國加州洛杉矶20190624天氣資料"); area.put("china.shaanxi.xian.20190625", "中國陝西西安20190625天氣資料"); area.put("china.shandong.qingdao.20190625", "中國山東青島20190625天氣資料"); area.put("china.henan.zhengzhou.20190625", "中國河南鄭州20190625天氣資料"); area.put("us.cal.la.20190625", "美國加州洛杉矶20190625天氣資料"); //TCP實體連接配接 Connection connection = RabbitMQUtil.getConnection(); //建立通信通道,相當于TCP的虛拟連接配接 Channel channel = connection.createChannel(); Iterator<Map.Entry<String, String>> iterator = area.entrySet().iterator(); while(iterator.hasNext()){ Map.Entry<String, String> next = iterator.next(); //第一個參數,交換機 //第二個參數,相當于資料篩選的條件 //第三個參數,額外的設定屬性 //第四個參數,需要發送的消息的位元組數組 channel.basicPublish(RabbitMQConsts.EXCHANGE_WEATHER_TOPIC, next.getKey(), null, next.getValue().getBytes()); } channel.close(); connection.close(); } }
- 主題模式的消費者示例1
import com.kangswx.rabbitmq.utils.RabbitMQConsts; import com.kangswx.rabbitmq.utils.RabbitMQUtil; import com.rabbitmq.client.*; import java.io.IOException; /** * 百度(模拟消費者) */ public class Baidu { public static void main(String[] args) throws IOException { //TCP實體連接配接 Connection connection = RabbitMQUtil.getConnection(); //建立通信通道,相當于TCP的虛拟連接配接 Channel channel = connection.createChannel(); //建立隊列,聲明并建立一個隊列,如果隊列已經存在,則使用這個隊列 //第一個參數,對列名稱 helloworld //第二個參數,是否持久話,false表示不持久化資料,MQ停掉後資料就會丢失 //第三個參數,是否隊列私有化,false表示所有的消費者都可以通路,true表示隻有第一次擁有它的消費者才可以一直使用,其他消費者不能通路 //第四個參數,是否自動删除,false連接配接停掉後不自動删除掉這個隊列 //第五個參數,其他額外的參數 channel.queueDeclare(RabbitMQConsts.QUEUE_BAIDU, false, false, false, null); //綁定隊列交換機 //第一個參數,隊列名稱 //第二個參數,交換機名稱 //第三個參數,路由key,此處需要設定路由規則 channel.queueBind(RabbitMQConsts.QUEUE_BAIDU, RabbitMQConsts.EXCHANGE_WEATHER_TOPIC, "us.#"); //每次隻去一條消息進行消費 channel.basicQos(1); channel.basicConsume(RabbitMQConsts.QUEUE_BAIDU, false, new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("百度收到氣象資訊----》"+new String(body)); channel.basicAck(envelope.getDeliveryTag(), false); } }); } }
- 主題模式的消費者示例2
import com.kangswx.rabbitmq.utils.RabbitMQConsts; import com.kangswx.rabbitmq.utils.RabbitMQUtil; import com.rabbitmq.client.*; import java.io.IOException; public class Sina { public static void main(String[] args) throws IOException { //TCP實體連接配接 Connection connection = RabbitMQUtil.getConnection(); //建立通信通道,相當于TCP的虛拟連接配接 Channel channel = connection.createChannel(); //建立隊列,聲明并建立一個隊列,如果隊列已經存在,則使用這個隊列 //第一個參數,對列名稱 helloworld //第二個參數,是否持久話,false表示不持久化資料,MQ停掉後資料就會丢失 //第三個參數,是否隊列私有化,false表示所有的消費者都可以通路,true表示隻有第一次擁有它的消費者才可以一直使用,其他消費者不能通路 //第四個參數,是否自動删除,false連接配接停掉後不自動删除掉這個隊列 //第五個參數,其他額外的參數 channel.queueDeclare(RabbitMQConsts.QUEUE_SINA, false, false, false, null); //綁定隊列交換機 //第一個參數,隊列名稱 //第二個參數,交換機名稱 //第三個參數,路由key,此處需要設定路由規則 channel.queueBind(RabbitMQConsts.QUEUE_SINA, RabbitMQConsts.EXCHANGE_WEATHER_TOPIC, "*.*.*.20190624"); //解綁 //channel.queueUnbind(RabbitMQConsts.QUEUE_SINA, RabbitMQConsts.EXCHANGE_WEATHER_TOPIC, "*.*.*.20190624"); //每次隻去一條消息進行消費 channel.basicQos(1); channel.basicConsume(RabbitMQConsts.QUEUE_SINA, false, new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("新浪收到氣象資訊----》"+new String(body)); channel.basicAck(envelope.getDeliveryTag(), false); } }); } }
- RabbitMQ消息确認機制:RabbitMQ在傳遞消息的過程中充當了代理人(Broker)的角色,生産者怎麼知道消息被正确投遞到了Broker了呢?
- RabbitMQ提供了監聽器(Listener)來接收消息的投遞狀态。
- 消息确認涉及兩種狀态Confirm與Return。
- Confirm代表生産者将消息送到Broker時産生的狀态,後續會出現兩種情況:
- ack 代表broker已經将資料接收
- nack 代表broker拒收消息,原因有多種,隊列已滿、限流、IO異常…
- Return代表消息被Broker正常接收(ack)後,但Broker沒有對應的隊列進行投遞時産生的狀态,消息被退回生産者。
- 上面的兩種狀态隻代表生産者與Broker之間消息投遞的情況。與消費者是否接收/确認消息無關。
- 監聽器的代碼示例
import com.kangswx.rabbitmq.utils.RabbitMQConsts; import com.kangswx.rabbitmq.utils.RabbitMQUtil; import com.rabbitmq.client.*; import java.io.IOException; import java.util.Iterator; import java.util.LinkedHashMap; import java.util.Map; import java.util.concurrent.TimeoutException; /** * 氣象局(模拟生産者) */ public class WeatherBureau { public static void main(String[] args) throws IOException, TimeoutException { LinkedHashMap<String, String> area = new LinkedHashMap<>(); area.put("china.shaanxi.xian.20190624", "中國陝西西安20190624天氣資料"); area.put("china.shandong.qingdao.20190624", "中國山東青島20190624天氣資料"); area.put("china.henan.zhengzhou.20190624", "中國河南鄭州20190624天氣資料"); area.put("us.cal.la.20190624", "美國加州洛杉矶20190624天氣資料"); area.put("china.shaanxi.xian.20190625", "中國陝西西安20190625天氣資料"); area.put("china.shandong.qingdao.20190625", "中國山東青島20190625天氣資料"); area.put("china.henan.zhengzhou.20190625", "中國河南鄭州20190625天氣資料"); area.put("us.cal.la.20190625", "美國加州洛杉矶20190625天氣資料"); //TCP實體連接配接 Connection connection = RabbitMQUtil.getConnection(); //建立通信通道,相當于TCP的虛拟連接配接 Channel channel = connection.createChannel(); //開啟confirm監聽模式 channel.confirmSelect(); //添加監聽器的處理代碼 channel.addConfirmListener(new ConfirmListener() { //成功接收時處理的代碼 //第一個參數是消息在傳遞的過程中的唯一id,第二個參數是資料是否為批量接收(一般用不到) @Override public void handleAck(long l, boolean b) throws IOException { System.out.println("消息成功投遞,Tag:"+l); } //MQ拒收時處理的代碼 @Override public void handleNack(long l, boolean b) throws IOException { System.out.println("消息投遞被拒收,Tag:"+l); } }); //添加ReturnListener監聽器 /*channel.addReturnListener(new ReturnListener() { @Override public void handleReturn(int i, String s, String s1, String s2, AMQP.BasicProperties basicProperties, byte[] bytes) throws IOException { } });*/ channel.addReturnListener(new ReturnCallback() { @Override public void handle(Return r) { System.err.println("================="); System.err.println("Return編碼:"+r.getReplyCode()+",描述資訊:"+r.getReplyText()); System.err.println("交換機:"+r.getExchange()+",路由key:"+r.getRoutingKey()); System.err.println("消息主題為:"+new String(r.getBody())); System.err.println("================="); } }); Iterator<Map.Entry<String, String>> iterator = area.entrySet().iterator(); while(iterator.hasNext()){ Map.Entry<String, String> next = iterator.next(); //第一個參數,交換機 //第二個參數,相當于資料篩選的條件 //第三個參數,mandatory,為true時如果消息無法正常投遞到交換機則return回生産者,為false時直接将消息丢棄 //第四個參數,額外的設定屬性 //第五個參數,需要發送的消息的位元組數組 channel.basicPublish(RabbitMQConsts.EXCHANGE_WEATHER_TOPIC, next.getKey(), true, null, next.getValue().getBytes()); } //讓channel一直處于等待監聽的狀态 /*channel.close(); connection.close();*/ } }
- Exchange模式是生産者和消費者之間通信需要通過交換機(Exchange)綁定隊列(Queue)進行分發。
- Exchange對應fanout,direct,topic三種類型,
RabbitMQ介紹及各個工作模式的使用 - 上面所有的代碼可見RabbitMQ使用示例代碼