天天看點

RabbitMQ介紹及各個工作模式的使用

  1. RabbitMQ使用示例代碼
  2. 消息隊列(Message Queue):是一種跨程序的通信機制,主要用于上下文傳遞消息。
  3. MQ作為消息中間件,最主要的作用是對系統之間傳遞消息進行"解耦",MQ是資料可靠性的重要保障。
  4. MQ主要角色是消息代理伺服器。
  5. RabbitMQ的優勢
    1. RabbitMQ是世界上最火的開源消息代理伺服器;
    2. RabbitMQ幾乎支援所有的作業系統與程式設計語言;
    3. RabbitMQ提供了高并發、高可用的成熟方案,支援多種消息協定,易于部署與使用。
  6. RabbitMQ的應用場景
    1. 異構系統的資料傳遞
    2. 高并發程式的流量控制
    3. 基于P2P的程式
    4. 分布式系統的事務一緻性
    5. 高可靠性的交易系統
  7. RabbitMQ常用指令
    rabbitmq-server  #前台啟動服務
    rabbitmq-server -detached #背景啟動服務
    rabbitmqctl stop #停止服務,表示停止程序
    rabbitmqctl start_app  #啟動應用
    rabbitmqctl stop_app   #終止應用,不會關閉程序
    rabbitmqctl add_user username password  #建立使用者
    rabbitmqctl delete_user   #删除使用者
    rabbitmqctl change_password username newpassword  #修改使用者的密碼
    rabbitmqctl set_user_tags username tag   #授予使用者角色(tag)
    rabbitmqctl set_permissions -p / username '.*' '.*' '.*'  #設定使用者允許通路的vhost(虛拟主機),分别是配置權限、讀權限、寫權限
    
               
  8. 檢視網絡端口号
    netstat -tulpn
               
  9. RabbitMQ的4中tag
    1. 超級管理者administrator:

      可登入管理控制台,可檢視所有的資訊,并且對使用者、政策(policy)進行操作

    2. 監控者monitoring

      登入管理控制台,同時可以檢視rabbitmq節點相關的資訊(程序數、記憶體使用情況、磁盤使用情況等),但無法進行政策的定制

    3. 政策制定者policymaker

      可登入管理控制台,同時可以對policy進行管理,但無法檢視節點(執行個體)相關的資訊

    4. 普通管理者management

      僅可登入管理控制台,無法看到節點資訊,也無法對政策進行管理

  10. RabbitMQ的guest使用者隻能在本地登入,如果需要遠端登入RabbitMQ控制台,需要新建立一個使用者,并将該使用者的權限設定為administrator
  11. AMPQ:進階消息隊列協定,一個統一提供消息服務的應用層标準進階消息隊列協定,是應用層協定的一個開放标準,為面向消息的中間件設計。基于此協定的用戶端與中間件可傳遞消息,并不受用戶端/中間件不同産品、不同的開發語言等條件的限制。Erlang中的實作有RabbitMQ等。
  12. 基本概念

    Producer 生産者,消息的提供者、

    Consumer 消費者,消息的使用者

    Message 消息,程式間的通信資料

    Queue 隊列,消息存放的容器,消息先進先出

    Vhost 虛拟主機,相當于MQ的"資料庫",用于存儲隊列

  13. 消息的狀态
    1. Ready:消息已經被傳入隊列,等待被消費
    2. Unacked:

      消息已經被消費者認領,但還未被确認"已被消費"

      Unacked狀态下消費者斷開連接配接,則消息回到Ready

      沒有确認,用戶端沒有斷開,則一直處于Unacked

    3. Finished:調用basicAck()方法後,表示消息已經被消費,從隊列中移除
  14. 第一次連接配接RabbitMQ
  15. 導入依賴
    <dependency>
        <groupId>com.rabbitmq</groupId>
        <artifactId>amqp-client</artifactId>
        <version>5.6.0</version>
    </dependency>
               
  16. RabbitMQ生産者的代碼
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    public class Procuder {
    
        public static void main(String[] args) throws IOException, TimeoutException {
    
            //ConnectionFactory建立MQ的實體連接配接
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost("127.0.0.1");  //ip位址
            connectionFactory.setPort(5672);         //端口
            connectionFactory.setUsername("guest");  //使用者名
            connectionFactory.setPassword("guest");  //密碼
            connectionFactory.setVirtualHost("/test"); //虛拟主機
    
            //TCP實體連接配接
            Connection connection = connectionFactory.newConnection();
    
            //建立通信通道,相當于TCP的虛拟連接配接
            Channel channel = connection.createChannel();
    
            //建立隊列,聲明并建立一個隊列,如果隊列已經存在,則使用這個隊列
            //第一個參數,對列名稱
            //第二個參數,是否持久話,false表示不持久化資料,MQ停掉後資料就會丢失
            //第三個參數,是否隊列私有化,false表示所有的消費者都可以通路,true表示隻有第一次擁有它的消費者才可以一直使用,其他消費者不能通路
            //第四個參數,是否自動删除,false連接配接停掉後不自動删除掉這個隊列
            //第五個參數,其他額外的參數
            channel.queueDeclare("helloworld", false, false, false, null);
    
            //需要發送的消息
            String content = "Hello World bb!";
    
            //第一個參數,交換機
            //第二個參數,隊列名稱
            //第三個參數,額外的設定屬性
            //第四個參數,需要發送的消息的位元組數組
            channel.basicPublish("", "helloworld", null, content.getBytes());
    
            channel.close();
            connection.close();
    
            System.out.println("資料發送成功");
        }
    }
               
  17. RabbitMQ消費者的代碼
    import com.rabbitmq.client.*;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    public class Consumer {
    
        public static void main(String[] args) throws IOException, TimeoutException {
            //ConnectionFactory建立MQ的實體連接配接
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost("127.0.0.1");  //ip位址
            connectionFactory.setPort(5672);         //端口
            connectionFactory.setUsername("guest");  //使用者名
            connectionFactory.setPassword("guest");  //密碼
            connectionFactory.setVirtualHost("/test"); //虛拟主機
    
            //TCP實體連接配接
            Connection connection = connectionFactory.newConnection();
    
            //建立通道
            Channel channel = connection.createChannel();
    
            //綁定消息隊列
            //第一個參數,對列名稱
            //第二個參數,是否持久話,false表示不持久化資料,MQ停掉後資料就會丢失
            //第三個參數,是否隊列私有化,false表示所有的消費者都可以通路,true表示隻有第一次擁有它的消費者才可以一直使用,其他消費者不能通路
            //第四個參數,是否自動删除,false連接配接停掉後不自動删除掉這個隊列
            //第五個參數,其他額外的參數
            channel.queueDeclare("helloworld", false, false, false, null);
    
            //建立一個消息消費者
            //第一個參數,隊列名稱
            //第二個參數,是否自動确認收到消息,false表示手動編寫程式來确認消息,這是MQ推薦的做法
            //第三個參數,DefaultConsumer的實作類
            channel.basicConsume("helloworld", false, new Receiver(channel));
    
            //在消費者中不能關閉channel和connection
        }
    }
    
    class Receiver extends DefaultConsumer{
    
        private Channel channel;
    
        //重寫構造函數,channel通道對象需要從外部傳入,在handleDelivery中會用到
        public Receiver(Channel channel) {
            super(channel);
            this.channel = channel;
        }
    
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
            /*super.handleDelivery(consumerTag, envelope, properties, body);*/
    
            String messageBody = new String(body);
            System.out.println("消費者接收到: " + messageBody);
    
            //簽收消息,确認消息
            //第一個參數,envelope.getDeliveryTag()擷取這個消息的TagId,是一個整數
            //第二個參數,false隻确認簽收目前的消息,true時,表示簽收該消費者所有未簽收的消息
            channel.basicAck(envelope.getDeliveryTag(), false);
        }
    
    }
               
  18. RabbitMQ連接配接工具類的封裝
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    
    public class RabbitMQUtil {
    
        private static ConnectionFactory connectionFactory;
    
        static {
            //ConnectionFactory建立MQ的實體連接配接
            connectionFactory = new ConnectionFactory();
            connectionFactory.setHost("127.0.0.1");  //ip位址
            connectionFactory.setPort(5672);         //端口
            connectionFactory.setUsername("guest");  //使用者名
            connectionFactory.setPassword("guest");  //密碼
            connectionFactory.setVirtualHost("/test"); //虛拟主機
        }
    
        public static Connection getConnection() {
            Connection connection = null;
            try {
                connection = connectionFactory.newConnection();
            } catch (Exception e) {
                throw new RuntimeException(e);  //不需要顯式的聲明抛出
            }
            return connection;
        }
    }
               
  19. 優化後的生産者
    import com.kangswx.rabbitmq.utils.RabbitMQConsts;
    import com.kangswx.rabbitmq.utils.RabbitMQUtil;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    public class Procuder {
    
        public static void main(String[] args) throws IOException, TimeoutException {
    
            //TCP實體連接配接
            Connection connection = RabbitMQUtil.getConnection();
    
            //建立通信通道,相當于TCP的虛拟連接配接
            Channel channel = connection.createChannel();
    
            //建立隊列,聲明并建立一個隊列,如果隊列已經存在,則使用這個隊列
            //第一個參數,對列名稱  helloworld
            //第二個參數,是否持久話,false表示不持久化資料,MQ停掉後資料就會丢失
            //第三個參數,是否隊列私有化,false表示所有的消費者都可以通路,true表示隻有第一次擁有它的消費者才可以一直使用,其他消費者不能通路
            //第四個參數,是否自動删除,false連接配接停掉後不自動删除掉這個隊列
            //第五個參數,其他額外的參數
            channel.queueDeclare(RabbitMQConsts.QUEUE_HELLO, false, false, false, null);
    
            //需要發送的消息
            String content = "Hello World bb!";
    
            //第一個參數,交換機
            //第二個參數,隊列名稱  helloworld
            //第三個參數,額外的設定屬性
            //第四個參數,需要發送的消息的位元組數組
            channel.basicPublish("", RabbitMQConsts.QUEUE_HELLO, null, content.getBytes());
    
            channel.close();
            connection.close();
    
            System.out.println("資料發送成功");
        }
    }
    
               
  20. 優化後的消費者
    import com.kangswx.rabbitmq.utils.RabbitMQConsts;
    import com.kangswx.rabbitmq.utils.RabbitMQUtil;
    import com.rabbitmq.client.*;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    public class Consumer {
    
        public static void main(String[] args) throws IOException, TimeoutException {
    
            //TCP實體連接配接
            Connection connection = RabbitMQUtil.getConnection();
    
            //建立通道
            Channel channel = connection.createChannel();
    
            //綁定消息隊列
            //第一個參數,對列名稱  helloworld
            //第二個參數,是否持久話,false表示不持久化資料,MQ停掉後資料就會丢失
            //第三個參數,是否隊列私有化,false表示所有的消費者都可以通路,true表示隻有第一次擁有它的消費者才可以一直使用,其他消費者不能通路
            //第四個參數,是否自動删除,false連接配接停掉後不自動删除掉這個隊列
            //第五個參數,其他額外的參數
            channel.queueDeclare(RabbitMQConsts.QUEUE_HELLO, false, false, false, null);
    
            //建立一個消息消費者
            //第一個參數,隊列名稱  helloworld
            //第二個參數,是否自動确認收到消息,false表示手動編寫程式來确認消息,這是MQ推薦的做法
            //第三個參數,DefaultConsumer的實作類
            channel.basicConsume(RabbitMQConsts.QUEUE_HELLO, false, new Receiver(channel));
    
            //在消費者中不能關閉channel和connection
        }
    }
    
    class Receiver extends DefaultConsumer{
    
        private Channel channel;
    
        //重寫構造函數,channel通道對象需要從外部傳入,在handleDelivery中會用到
        public Receiver(Channel channel) {
            super(channel);
            this.channel = channel;
        }
    
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
            /*super.handleDelivery(consumerTag, envelope, properties, body);*/
    
            String messageBody = new String(body);
            System.out.println("消費者接收到: " + messageBody);
    
            //簽收消息,确認消息
            //第一個參數,envelope.getDeliveryTag()擷取這個消息的TagId,是一個整數
            //第二個參數,false隻确認簽收目前的消息,true時,表示簽收該消費者所有未簽收的消息
            channel.basicAck(envelope.getDeliveryTag(), false);
        }
    
    }
               
  21. RabbitMQ的工作模式
    1. Hello World:最簡單的模式,一個生産者對應一個消費者
    2. Work queues:一個生産者生産的消息由多個消費者進行消費,一個消息隻能被一個消費者處理
    3. Publish/Subsribe:多了一個交換器,交消息按照一定的規則分發給多個消費者,每個消費者收到的消息是一模一樣的,一個消息可能被多個消費者處理
    4. Routing模式:交消息有選擇的分發給不同的消費者,不同的消費者收到的消息有可能不一樣,缺點是,需要對資料進行精準比對
    5. Topics:與Routing類似,可以定義一個表達式規則,模糊比對
    6. RPC:遠端調用
  22. Work queues工作隊列:

    建立一個工作隊列,它會發送一些耗時的任務給多個消費者。

    有多個消息的情況下,Work queue會将消息分發給不同的消費者,每個消費者都會接收到不同的消息,并且可以根據處理消息的速度來接收消息的數量,讓消費者程式發揮最大性能。

    特别适合在叢集環境中做異步處理,能最大程度的發揮每一台伺服器的性能。

  23. 将消費者的處理方式修改為處理完一條,再去隊裡裡面拿下一條資料的方法
    channel.basicQos(1);  //處理完一條消息再去隊列裡面取下一條消息
    channel.basicAck(envelope.getDeliveryTag(), false);  //調用basicAck的時候,才會去取下一條的資料
               
  24. Publish/Subsribe模式

    釋出/訂閱模式中,生産者不再直接與隊列進行綁定,而是将資料發送至"交換機Exchange"

    交換機Exchange用于将資料按某種規則送入與之綁定的隊列,進而供消費者使用。

    釋出/訂閱模式中,交換機将無差别的将所有的消息送入與之綁定的隊列,所有的消費者拿到的消息完全相同,交換機的類型被稱為fanout。

  25. 使用場景:釋出訂閱模式因為所有消費者獲得相同的消息,是以特别适合"資料提供商與應用商"
  26. 建立一個釋出訂閱模式下的交換機(類型為fanout)
    RabbitMQ介紹及各個工作模式的使用
  27. 生産者代碼示例
    import com.kangswx.rabbitmq.utils.RabbitMQConsts;
    import com.kangswx.rabbitmq.utils.RabbitMQUtil;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    
    import java.io.IOException;
    import java.util.Scanner;
    import java.util.concurrent.TimeoutException;
    
    /**
     * 氣象局(模拟生産者)
     */
    public class WeatherBureau {
    
        public static void main(String[] args) throws IOException, TimeoutException {
            //TCP實體連接配接
            Connection connection = RabbitMQUtil.getConnection();
    
            //建立通信通道,相當于TCP的虛拟連接配接
            Channel channel = connection.createChannel();
    
            String input = new Scanner(System.in).next();
    
            //第一個參數,交換機
            //第二個參數,隊列名稱,此處不需要
            //第三個參數,額外的設定屬性
            //第四個參數,需要發送的消息的位元組數組
            channel.basicPublish(RabbitMQConsts.EXCHANGE_WEATHER, "", null, input.getBytes());
    
            channel.close();
            connection.close();
        }
    }
               
  28. 消費者一代碼示例
    import com.kangswx.rabbitmq.utils.RabbitMQConsts;
    import com.kangswx.rabbitmq.utils.RabbitMQUtil;
    import com.rabbitmq.client.*;
    
    import java.io.IOException;
    
    /**
     * 百度(模拟消費者)
     */
    public class Baidu {
    
        public static void main(String[] args) throws IOException {
    
            //TCP實體連接配接
            Connection connection = RabbitMQUtil.getConnection();
    
            //建立通信通道,相當于TCP的虛拟連接配接
            Channel channel = connection.createChannel();
    
            //建立隊列,聲明并建立一個隊列,如果隊列已經存在,則使用這個隊列
            //第一個參數,對列名稱  helloworld
            //第二個參數,是否持久話,false表示不持久化資料,MQ停掉後資料就會丢失
            //第三個參數,是否隊列私有化,false表示所有的消費者都可以通路,true表示隻有第一次擁有它的消費者才可以一直使用,其他消費者不能通路
            //第四個參數,是否自動删除,false連接配接停掉後不自動删除掉這個隊列
            //第五個參數,其他額外的參數
            channel.queueDeclare(RabbitMQConsts.QUEUE_BAIDU, false, false, false, null);
    
            //綁定隊列交換機
            //第一個參數,隊列名稱
            //第二個參數,交換機名稱
            //第三個參數,路由key,此處不需要
            channel.queueBind(RabbitMQConsts.QUEUE_BAIDU, RabbitMQConsts.EXCHANGE_WEATHER, "");
    
            //每次隻去一條消息進行消費
            channel.basicQos(1);
    
            channel.basicConsume(RabbitMQConsts.QUEUE_BAIDU, false, new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.out.println("百度收到氣象資訊----》"+new String(body));
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            });
        }
    
    }
               
  29. 消費者二代碼示例
    import com.kangswx.rabbitmq.utils.RabbitMQConsts;
    import com.kangswx.rabbitmq.utils.RabbitMQUtil;
    import com.rabbitmq.client.*;
    
    import java.io.IOException;
    
    public class Sina {
    
        public static void main(String[] args) throws IOException {
            //TCP實體連接配接
            Connection connection = RabbitMQUtil.getConnection();
    
            //建立通信通道,相當于TCP的虛拟連接配接
            Channel channel = connection.createChannel();
    
            //建立隊列,聲明并建立一個隊列,如果隊列已經存在,則使用這個隊列
            //第一個參數,對列名稱  helloworld
            //第二個參數,是否持久話,false表示不持久化資料,MQ停掉後資料就會丢失
            //第三個參數,是否隊列私有化,false表示所有的消費者都可以通路,true表示隻有第一次擁有它的消費者才可以一直使用,其他消費者不能通路
            //第四個參數,是否自動删除,false連接配接停掉後不自動删除掉這個隊列
            //第五個參數,其他額外的參數
            channel.queueDeclare(RabbitMQConsts.QUEUE_SINA, false, false, false, null);
    
            //綁定隊列交換機
            //第一個參數,隊列名稱
            //第二個參數,交換機名稱
            //第三個參數,路由key,此處不需要
            channel.queueBind(RabbitMQConsts.QUEUE_SINA, RabbitMQConsts.EXCHANGE_WEATHER, "");
    
            //每次隻去一條消息進行消費
            channel.basicQos(1);
    
            channel.basicConsume(RabbitMQConsts.QUEUE_SINA, false, new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.out.println("新浪收到氣象資訊----》"+new String(body));
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            });
        }
    }
               
  30. 路由Routing模式:是在釋出訂閱模式上的變種
  31. 釋出訂閱模式是無條件的将所有消息分發給所有的隊列,路由模式則是交換機根據Routing key有條件的将資料篩選後分發給消費者隊列。
  32. 路由模式下的交換機被稱為direct
  33. 建立routing模式下的交換機(類型為direct)
    RabbitMQ介紹及各個工作模式的使用
  34. 路由模式下的生産者
    import com.kangswx.rabbitmq.utils.RabbitMQConsts;
    import com.kangswx.rabbitmq.utils.RabbitMQUtil;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    
    import java.io.IOException;
    import java.util.Iterator;
    import java.util.LinkedHashMap;
    import java.util.Map;
    import java.util.concurrent.TimeoutException;
    
    /**
     * 氣象局(模拟生産者)
     */
    public class WeatherBureau {
    
        public static void main(String[] args) throws IOException, TimeoutException {
    
            LinkedHashMap<String, String> area = new LinkedHashMap<>();
            area.put("china.shaanxi.xian.20190624", "中國陝西西安20190624天氣資料");
            area.put("china.shandong.qingdao.20190624", "中國山東青島20190624天氣資料");
            area.put("china.henan.zhengzhou.20190624", "中國河南鄭州20190624天氣資料");
            area.put("us.cal.la.20190624", "美國加州洛杉矶20190624天氣資料");
    
            area.put("china.shaanxi.xian.20190625", "中國陝西西安20190625天氣資料");
            area.put("china.shandong.qingdao.20190625", "中國山東青島20190625天氣資料");
            area.put("china.henan.zhengzhou.20190625", "中國河南鄭州20190625天氣資料");
            area.put("us.cal.la.20190625", "美國加州洛杉矶20190625天氣資料");
    
            //TCP實體連接配接
            Connection connection = RabbitMQUtil.getConnection();
    
            //建立通信通道,相當于TCP的虛拟連接配接
            Channel channel = connection.createChannel();
    
            Iterator<Map.Entry<String, String>> iterator = area.entrySet().iterator();
            while(iterator.hasNext()){
                Map.Entry<String, String> next = iterator.next();
                //第一個參數,交換機
                //第二個參數,相當于資料篩選的條件
                //第三個參數,額外的設定屬性
                //第四個參數,需要發送的消息的位元組數組
                channel.basicPublish(RabbitMQConsts.EXCHANGE_WEATHER_ROUTING, next.getKey(), null, next.getValue().getBytes());
            }
    
            channel.close();
            connection.close();
        }
    }
               
  35. 路由模式下的消費者
    import com.kangswx.rabbitmq.utils.RabbitMQConsts;
    import com.kangswx.rabbitmq.utils.RabbitMQUtil;
    import com.rabbitmq.client.*;
    
    import java.io.IOException;
    
    public class Sina {
    
        public static void main(String[] args) throws IOException {
            //TCP實體連接配接
            Connection connection = RabbitMQUtil.getConnection();
    
            //建立通信通道,相當于TCP的虛拟連接配接
            Channel channel = connection.createChannel();
    
            //建立隊列,聲明并建立一個隊列,如果隊列已經存在,則使用這個隊列
            //第一個參數,對列名稱  helloworld
            //第二個參數,是否持久話,false表示不持久化資料,MQ停掉後資料就會丢失
            //第三個參數,是否隊列私有化,false表示所有的消費者都可以通路,true表示隻有第一次擁有它的消費者才可以一直使用,其他消費者不能通路
            //第四個參數,是否自動删除,false連接配接停掉後不自動删除掉這個隊列
            //第五個參數,其他額外的參數
            channel.queueDeclare(RabbitMQConsts.QUEUE_SINA, false, false, false, null);
    
            //綁定隊列交換機
            //第一個參數,隊列名稱
            //第二個參數,交換機名稱
            //第三個參數,路由key,此處需要設定路由規則
            channel.queueBind(RabbitMQConsts.QUEUE_SINA, RabbitMQConsts.EXCHANGE_WEATHER_ROUTING, "us.cal.la.20190624");
            channel.queueBind(RabbitMQConsts.QUEUE_SINA, RabbitMQConsts.EXCHANGE_WEATHER_ROUTING, "china.henan.zhengzhou.20190624");
            channel.queueBind(RabbitMQConsts.QUEUE_SINA, RabbitMQConsts.EXCHANGE_WEATHER_ROUTING, "us.cal.la.20190625");
            channel.queueBind(RabbitMQConsts.QUEUE_SINA, RabbitMQConsts.EXCHANGE_WEATHER_ROUTING, "china.henan.zhengzhou.20190625");
    
            //每次隻去一條消息進行消費
            channel.basicQos(1);
    
            channel.basicConsume(RabbitMQConsts.QUEUE_SINA, false, new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.out.println("新浪收到氣象資訊----》"+new String(body));
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            });
        }
    }
    /**
        接收到的消息:
            新浪收到氣象資訊----》中國河南鄭州20190624天氣資料
            新浪收到氣象資訊----》美國加州洛杉矶20190624天氣資料
            新浪收到氣象資訊----》中國河南鄭州20190625天氣資料
            新浪收到氣象資訊----》美國加州洛杉矶20190625天氣資料
    **/
               
  36. 主題模式Topic:
  37. 在Routing模式的基礎上提供了對Routing Key模糊比對的功能,可以簡化程式的編寫。
  38. 主題模式下,模糊比對表達式規則為:
    * 比對單個關鍵字
    # 比對所有關鍵字
               
  39. 主題模式下的交換機類型為topic
  40. 建立topic模式下的交換機(類型為topic):
    RabbitMQ介紹及各個工作模式的使用
  41. 主題模式下的生産者代碼示例
    import com.kangswx.rabbitmq.utils.RabbitMQConsts;
    import com.kangswx.rabbitmq.utils.RabbitMQUtil;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    
    import java.io.IOException;
    import java.util.Iterator;
    import java.util.LinkedHashMap;
    import java.util.Map;
    import java.util.concurrent.TimeoutException;
    
    /**
     * 氣象局(模拟生産者)
     */
    public class WeatherBureau {
    
        public static void main(String[] args) throws IOException, TimeoutException {
    
            LinkedHashMap<String, String> area = new LinkedHashMap<>();
            area.put("china.shaanxi.xian.20190624", "中國陝西西安20190624天氣資料");
            area.put("china.shandong.qingdao.20190624", "中國山東青島20190624天氣資料");
            area.put("china.henan.zhengzhou.20190624", "中國河南鄭州20190624天氣資料");
            area.put("us.cal.la.20190624", "美國加州洛杉矶20190624天氣資料");
    
            area.put("china.shaanxi.xian.20190625", "中國陝西西安20190625天氣資料");
            area.put("china.shandong.qingdao.20190625", "中國山東青島20190625天氣資料");
            area.put("china.henan.zhengzhou.20190625", "中國河南鄭州20190625天氣資料");
            area.put("us.cal.la.20190625", "美國加州洛杉矶20190625天氣資料");
    
            //TCP實體連接配接
            Connection connection = RabbitMQUtil.getConnection();
    
            //建立通信通道,相當于TCP的虛拟連接配接
            Channel channel = connection.createChannel();
    
            Iterator<Map.Entry<String, String>> iterator = area.entrySet().iterator();
            while(iterator.hasNext()){
                Map.Entry<String, String> next = iterator.next();
                //第一個參數,交換機
                //第二個參數,相當于資料篩選的條件
                //第三個參數,額外的設定屬性
                //第四個參數,需要發送的消息的位元組數組
                channel.basicPublish(RabbitMQConsts.EXCHANGE_WEATHER_TOPIC, next.getKey(), null, next.getValue().getBytes());
            }
    
            channel.close();
            connection.close();
        }
    }
               
  42. 主題模式的消費者示例1
    import com.kangswx.rabbitmq.utils.RabbitMQConsts;
    import com.kangswx.rabbitmq.utils.RabbitMQUtil;
    import com.rabbitmq.client.*;
    
    import java.io.IOException;
    
    /**
     * 百度(模拟消費者)
     */
    public class Baidu {
    
        public static void main(String[] args) throws IOException {
    
            //TCP實體連接配接
            Connection connection = RabbitMQUtil.getConnection();
    
            //建立通信通道,相當于TCP的虛拟連接配接
            Channel channel = connection.createChannel();
    
            //建立隊列,聲明并建立一個隊列,如果隊列已經存在,則使用這個隊列
            //第一個參數,對列名稱  helloworld
            //第二個參數,是否持久話,false表示不持久化資料,MQ停掉後資料就會丢失
            //第三個參數,是否隊列私有化,false表示所有的消費者都可以通路,true表示隻有第一次擁有它的消費者才可以一直使用,其他消費者不能通路
            //第四個參數,是否自動删除,false連接配接停掉後不自動删除掉這個隊列
            //第五個參數,其他額外的參數
            channel.queueDeclare(RabbitMQConsts.QUEUE_BAIDU, false, false, false, null);
    
            //綁定隊列交換機
            //第一個參數,隊列名稱
            //第二個參數,交換機名稱
            //第三個參數,路由key,此處需要設定路由規則
            channel.queueBind(RabbitMQConsts.QUEUE_BAIDU, RabbitMQConsts.EXCHANGE_WEATHER_TOPIC, "us.#");
    
            //每次隻去一條消息進行消費
            channel.basicQos(1);
    
            channel.basicConsume(RabbitMQConsts.QUEUE_BAIDU, false, new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.out.println("百度收到氣象資訊----》"+new String(body));
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            });
        }
    
    }
               
  43. 主題模式的消費者示例2
    import com.kangswx.rabbitmq.utils.RabbitMQConsts;
    import com.kangswx.rabbitmq.utils.RabbitMQUtil;
    import com.rabbitmq.client.*;
    
    import java.io.IOException;
    
    public class Sina {
    
        public static void main(String[] args) throws IOException {
            //TCP實體連接配接
            Connection connection = RabbitMQUtil.getConnection();
    
            //建立通信通道,相當于TCP的虛拟連接配接
            Channel channel = connection.createChannel();
    
            //建立隊列,聲明并建立一個隊列,如果隊列已經存在,則使用這個隊列
            //第一個參數,對列名稱  helloworld
            //第二個參數,是否持久話,false表示不持久化資料,MQ停掉後資料就會丢失
            //第三個參數,是否隊列私有化,false表示所有的消費者都可以通路,true表示隻有第一次擁有它的消費者才可以一直使用,其他消費者不能通路
            //第四個參數,是否自動删除,false連接配接停掉後不自動删除掉這個隊列
            //第五個參數,其他額外的參數
            channel.queueDeclare(RabbitMQConsts.QUEUE_SINA, false, false, false, null);
    
            //綁定隊列交換機
            //第一個參數,隊列名稱
            //第二個參數,交換機名稱
            //第三個參數,路由key,此處需要設定路由規則
            channel.queueBind(RabbitMQConsts.QUEUE_SINA, RabbitMQConsts.EXCHANGE_WEATHER_TOPIC, "*.*.*.20190624");
    
            //解綁
            //channel.queueUnbind(RabbitMQConsts.QUEUE_SINA, RabbitMQConsts.EXCHANGE_WEATHER_TOPIC, "*.*.*.20190624");
    
            //每次隻去一條消息進行消費
            channel.basicQos(1);
    
            channel.basicConsume(RabbitMQConsts.QUEUE_SINA, false, new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.out.println("新浪收到氣象資訊----》"+new String(body));
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            });
        }
    }
               
  44. RabbitMQ消息确認機制:RabbitMQ在傳遞消息的過程中充當了代理人(Broker)的角色,生産者怎麼知道消息被正确投遞到了Broker了呢?
  45. RabbitMQ提供了監聽器(Listener)來接收消息的投遞狀态。
  46. 消息确認涉及兩種狀态Confirm與Return。
  47. Confirm代表生産者将消息送到Broker時産生的狀态,後續會出現兩種情況:
    1. ack 代表broker已經将資料接收
    2. nack 代表broker拒收消息,原因有多種,隊列已滿、限流、IO異常…
  48. Return代表消息被Broker正常接收(ack)後,但Broker沒有對應的隊列進行投遞時産生的狀态,消息被退回生産者。
  49. 上面的兩種狀态隻代表生産者與Broker之間消息投遞的情況。與消費者是否接收/确認消息無關。
  50. 監聽器的代碼示例
    import com.kangswx.rabbitmq.utils.RabbitMQConsts;
    import com.kangswx.rabbitmq.utils.RabbitMQUtil;
    import com.rabbitmq.client.*;
    
    import java.io.IOException;
    import java.util.Iterator;
    import java.util.LinkedHashMap;
    import java.util.Map;
    import java.util.concurrent.TimeoutException;
    
    /**
     * 氣象局(模拟生産者)
     */
    public class WeatherBureau {
    
        public static void main(String[] args) throws IOException, TimeoutException {
    
            LinkedHashMap<String, String> area = new LinkedHashMap<>();
            area.put("china.shaanxi.xian.20190624", "中國陝西西安20190624天氣資料");
            area.put("china.shandong.qingdao.20190624", "中國山東青島20190624天氣資料");
            area.put("china.henan.zhengzhou.20190624", "中國河南鄭州20190624天氣資料");
            area.put("us.cal.la.20190624", "美國加州洛杉矶20190624天氣資料");
    
            area.put("china.shaanxi.xian.20190625", "中國陝西西安20190625天氣資料");
            area.put("china.shandong.qingdao.20190625", "中國山東青島20190625天氣資料");
            area.put("china.henan.zhengzhou.20190625", "中國河南鄭州20190625天氣資料");
            area.put("us.cal.la.20190625", "美國加州洛杉矶20190625天氣資料");
    
            //TCP實體連接配接
            Connection connection = RabbitMQUtil.getConnection();
    
            //建立通信通道,相當于TCP的虛拟連接配接
            Channel channel = connection.createChannel();
    
            //開啟confirm監聽模式
            channel.confirmSelect();
            //添加監聽器的處理代碼
            channel.addConfirmListener(new ConfirmListener() {
                //成功接收時處理的代碼
                //第一個參數是消息在傳遞的過程中的唯一id,第二個參數是資料是否為批量接收(一般用不到)
                @Override
                public void handleAck(long l, boolean b) throws IOException {
                    System.out.println("消息成功投遞,Tag:"+l);
                }
    
                //MQ拒收時處理的代碼
                @Override
                public void handleNack(long l, boolean b) throws IOException {
                    System.out.println("消息投遞被拒收,Tag:"+l);
                }
            });
    
            //添加ReturnListener監聽器
            /*channel.addReturnListener(new ReturnListener() {
                @Override
                public void handleReturn(int i, String s, String s1, String s2, AMQP.BasicProperties basicProperties, byte[] bytes) throws IOException {
    
                }
            });*/
            channel.addReturnListener(new ReturnCallback() {
                @Override
                public void handle(Return r) {
                    System.err.println("=================");
                    System.err.println("Return編碼:"+r.getReplyCode()+",描述資訊:"+r.getReplyText());
                    System.err.println("交換機:"+r.getExchange()+",路由key:"+r.getRoutingKey());
                    System.err.println("消息主題為:"+new String(r.getBody()));
                    System.err.println("=================");
                }
            });
    
            Iterator<Map.Entry<String, String>> iterator = area.entrySet().iterator();
            while(iterator.hasNext()){
                Map.Entry<String, String> next = iterator.next();
                //第一個參數,交換機
                //第二個參數,相當于資料篩選的條件
                //第三個參數,mandatory,為true時如果消息無法正常投遞到交換機則return回生産者,為false時直接将消息丢棄
                //第四個參數,額外的設定屬性
                //第五個參數,需要發送的消息的位元組數組
                channel.basicPublish(RabbitMQConsts.EXCHANGE_WEATHER_TOPIC, next.getKey(), true, null, next.getValue().getBytes());
            }
    
            //讓channel一直處于等待監聽的狀态
            /*channel.close();
            connection.close();*/
        }
    }
               
  51. Exchange模式是生産者和消費者之間通信需要通過交換機(Exchange)綁定隊列(Queue)進行分發。
  52. Exchange對應fanout,direct,topic三種類型,
    RabbitMQ介紹及各個工作模式的使用
  53. 上面所有的代碼可見RabbitMQ使用示例代碼