天天看點

RabbitMQ消息隊列學習筆記

作者:俏巴

概述

初次使用AMQP的過程中,總是容易被AMQP支援的消息模型繞暈,這裡結合官方的教程,對AMQP的消息模型做一個簡要總結,供參考。目前官方給出了六種消息發送/接收

模型

,這裡主要介紹前五種消息模型。

消息模型

1、Hello World

簡單模式就是生産者将消息發送到隊列、消費者從隊列中擷取消息。一條消息對應一個消費者。
RabbitMQ消息隊列學習筆記

示例代碼說明:

測試使用的是阿裡雲的AMQP消息隊列服務,具體的代碼配置過程可以參考阿裡雲官方 連結

工具類

import AMQP.AliyunCredentialsProvider;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class ConnectionUtil {

  public static Connection getConnection() throws Exception{
      // 初始化參數設定
      String AccessKey= "********";
      String SecretKey = "********";
      Long Uid = ********16617278L;
      String VhostName = "********";
      String host = "********16617278.mq-amqp.cn-hangzhou-a.aliyuncs.com";

      // 定義連接配接工廠
      ConnectionFactory connectionFactory = new ConnectionFactory();
      // 設定服務位址
      connectionFactory.setHost(host);
      // 端口
      connectionFactory.setPort(5672);
      // 設定使用者名、密碼、vhost
      connectionFactory.setCredentialsProvider(new AliyunCredentialsProvider(AccessKey,SecretKey,Uid));
      connectionFactory.setAutomaticRecoveryEnabled(true);
      connectionFactory.setNetworkRecoveryInterval(5000);
      connectionFactory.setVirtualHost(VhostName);

      // 通過工廠擷取連接配接對象
      Connection connection = connectionFactory.newConnection();
      return connection;
  }
}           

發送端示例代碼

import AMQP.RabbitMQTutorials.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

// hello world 單個消費者和接收者
public class Send {

    private final static String Queue_name = "helloDemo";
    public static void main(String[] args) throws Exception {
        // 擷取連接配接及mq通道
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        channel.queueDeclare(Queue_name,false,false,false,null);
        //消息内容
        String message = "Hello World!";
        // 1、交換機,此處無   2、發送到那個隊列 3、屬性  4、消息内容
        channel.basicPublish("",Queue_name,null,message.getBytes());

        System.out.println("發送資料:" + message);

        // 關閉連接配接
        channel.close();
        connection.close();
    }
}           

消費端示例代碼

import AMQP.RabbitMQTutorials.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;

public class Receiver {
    private final static String Queue_name = "helloDemo";
    public static void main(String[] args) throws Exception{
        Connection connection = ConnectionUtil.getConnection();
        final Channel channel =  connection.createChannel();
        
        // 開始消費消息
        channel.basicConsume(Queue_name, false, "ConsumerTag", new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body)
                    throws IOException {
                //接收到的消息,進行業務邏輯處理
                System.out.println("message receive: ");
                System.out.println("Received: " + new String(body, "UTF-8") + ", deliveryTag: " + envelope.getDeliveryTag());
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        });
        Thread.sleep(100000);
        channel.close();
        connection.close();
    }
}           

2、Work Queues

一條消息可以被多個消費者嘗試接收,最終隻有一個消費者能夠擷取到消息。
RabbitMQ消息隊列學習筆記

import AMQP.RabbitMQTutorials.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

// 1:N  消費者各自接收消息
public class Sender {

    private final static String queueName = "workQueue";
    public static void main(String[] args) throws Exception {
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        // 聲明隊列
        channel.queueDeclare(queueName,false,false,false,null);
        for (int i = 0; i < 100; i++) {

            String message = "workqueues message " + i;
            channel.basicPublish("",queueName,null,message.getBytes());
            System.out.println("發送消息: " + message);

            Thread.sleep(10);//休眠
        }
        // 關閉連接配接
        channel.close();
        connection.close();
    }
}           

消費端示例代碼1

import AMQP.RabbitMQTutorials.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;

public class Receiver1 {

    private final static String queueName = "workQueue";
    public static void main(String[] args) throws Exception{

        Connection connection = ConnectionUtil.getConnection();
        final Channel channel = connection.createChannel();
        channel.queueDeclare(queueName,false,false,false,null);
        channel.basicQos(1);//告訴伺服器,在沒有确認目前消息完成之前,不要給我發新的消息。

        DefaultConsumer consumer = new DefaultConsumer(channel){
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body)
                    throws IOException {
                //接收到的消息,進行業務邏輯處理
                System.out.println("message receive1: ");
                System.out.println("Received1: " + new String(body, "UTF-8") + ", deliveryTag: " + envelope.getDeliveryTag());
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                channel.basicAck(envelope.getDeliveryTag(), false);// 參數2 false為确認收到消息, true為拒絕收到消息
            }
        };
        channel.basicConsume(queueName,false,consumer);// 參數2 手動确認,代表我們收到消息後需要手動确認告訴伺服器我們收到消息了
    }
}           

消費端示例代碼2

import AMQP.RabbitMQTutorials.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;

public class Receiver2 {

    private final static String queueName = "workQueue";
    public static void main(String[] args) throws Exception{

        Connection connection = ConnectionUtil.getConnection();
        final Channel channel = connection.createChannel();
        channel.queueDeclare(queueName,false,false,false,null);
        channel.basicQos(1);//告訴伺服器,在沒有确認目前消息完成之前,不要給我發新的消息。

        DefaultConsumer consumer = new DefaultConsumer(channel){
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body)
                    throws IOException {
                //接收到的消息,進行業務邏輯處理
                System.out.println("message receive2: ");
                System.out.println("Received2: " + new String(body, "UTF-8") + ", deliveryTag: " + envelope.getDeliveryTag());
                try {
                    Thread.sleep(10);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                channel.basicAck(envelope.getDeliveryTag(), false);// 參數2 false為确認收到消息, true為拒絕收到消息
            }
        };
        channel.basicConsume(queueName,false,consumer);// 參數2 手動确認,代表我們收到消息後需要手動确認告訴伺服器我們收到消息了
    }
}           

3、Publish/Subscribe

一條消息可以被多個消費者同時擷取,生産者将消息發送給交換機,消費者将自己對應的隊列注冊到交換機,當發送消息後,所有注冊的隊列的消費者都可以收到消息。
RabbitMQ消息隊列學習筆記

import AMQP.RabbitMQTutorials.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

public class Sender {

    private static String Exchange_Name = "ExchangeDemo";//聲明交換機
    public static void main(String[] args) throws Exception {
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        // 聲明exchange
        // Producer 将消息發送到 Exchange ,由 Exchange 将消息路由到一個或多個 Queue 中(或者丢棄),Exchange 按照相應的 Binding 邏輯将消息路由到 Queue。
        channel.exchangeDeclare(Exchange_Name,"fanout");
        String message = "Exchange message demo";

        // 消息發送端交換機,如果此時沒有隊列綁定,則消息會丢失,因為交換機沒有存儲消息的能力
        channel.basicPublish(Exchange_Name,"",null,message.getBytes());
        System.out.println("發送消息: " + message);
    }
}           

import AMQP.RabbitMQTutorials.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;

public class Sub1 {
    private static String Exchange_Name = "ExchangeDemo";//聲明交換機
    public static void main(String[] args) throws Exception{
        Connection connection = ConnectionUtil.getConnection();
        final Channel channel = connection.createChannel();

        channel.queueDeclare("testqueue1",false,false,false,null);
        // 綁定到交換機
        channel.queueBind("testqueue1",Exchange_Name,"");
        channel.basicQos(1);

        DefaultConsumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,byte[] body) throws IOException{

                System.out.println("sub1: " + new String(body));
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        };
        channel.basicConsume("testqueue1",false,consumer);
    }
}           

import AMQP.RabbitMQTutorials.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;

public class Sub2 {
    private static String Exchange_Name = "ExchangeDemo";//聲明交換機

    public static void main(String[] args) throws Exception{
        Connection connection = ConnectionUtil.getConnection();
        final Channel channel = connection.createChannel();
        channel.queueDeclare("testqueue2",false,false,false,null);
        // 綁定到交換機
        channel.queueBind("testqueue2",Exchange_Name,"");
        channel.basicQos(1);

        DefaultConsumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,byte[] body) throws IOException{
                System.out.println("sub2: " + new String(body));
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        };
        channel.basicConsume("testqueue2",false,consumer);
    }
}           

4、Routing

生産者将消息發送到type為direct模式的交換機,消費者的隊列将自己綁定到路由的時候給自己綁定一個key,隻有生産者發送的消息key和綁定的key一緻時,消費者才能收到對應的消息。
RabbitMQ消息隊列學習筆記

import AMQP.RabbitMQTutorials.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

public class Sender {
    private static final String ExchangeName = "Rout_Change";//路由消息交換機

    public static void main(String[] args) throws Exception {
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        channel.exchangeDeclare(ExchangeName,"direct");
        channel.basicPublish(ExchangeName,"key3",null,"route 消息".getBytes());

        channel.close();
        connection.close();
    }
}           

import AMQP.RabbitMQTutorials.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;

public class Sub {
    private static final String ExchangeName = "Rout_Change";//路由消息交換機
    public static void main(String[] args) throws Exception{

        Connection connection = ConnectionUtil.getConnection();
        final Channel channel = connection.createChannel();

        // 聲明隊列
        channel.queueDeclare("testroutequeue1",false,false,false,null);

        // 綁定交換機
        // 參數3 标記 綁定到交換機的時候會有一個标記,隻有和它一樣标記的消息才會别消費到
        channel.queueBind("testroutequeue1",ExchangeName,"key1");
        channel.queueBind("testroutequeue1",ExchangeName,"key2");//綁定多個标記
        channel.basicQos(1);

        DefaultConsumer consumer = new DefaultConsumer(channel){
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body)
                    throws IOException {
                //接收到的消息,進行業務邏輯處理
                System.out.println("message route receive1: ");
                System.out.println("Received1: " + new String(body, "UTF-8") + ", deliveryTag: " + envelope.getDeliveryTag());
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                channel.basicAck(envelope.getDeliveryTag(), false);// 參數2 false為确認收到消息, true為拒絕收到消息
            }
        };
        channel.basicConsume("testroutequeue1",false,consumer);// 參數2 手動确認,代表我們收到消息後需要手動确認告訴伺服器我們收到消息了
    }
}           

5、Topics

該類型與 Direct 類型相似,隻是規則沒有那麼嚴格,可以模糊比對和多條件比對,即該類型 Exchange 使用 Routing key 模式比對和字元串比較的方式将消息路由至綁定的 Queue。

示例:

Routing key 為 use.stock 的消息會轉發給綁定比對模式為 .stock, use.stock, . 和 #.use.stock.# 的 Queue; 表是比對一個任意詞組,# 表示比對 0 個或多個詞組。
RabbitMQ消息隊列學習筆記

import AMQP.RabbitMQTutorials.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

public class Sender {
    private static String Exchange_Name = "Exchange_Topic";
    public static void main(String[] args) throws Exception{

        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        // 聲明exchange類型為Topic 也就是通配符模式
        channel.exchangeDeclare(Exchange_Name,"topic");
        channel.basicPublish(Exchange_Name,"abc.1.2",null,"Topic 模式消息".getBytes());

        // 關閉通道和連接配接
        channel.close();
        connection.close();
    }
}           

接收端示例代碼

import AMQP.RabbitMQTutorials.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;

public class Sub {
    private static String ExchangeName = "Exchange_Topic";
    public static void main(String[] args) throws Exception{
        Connection connection = ConnectionUtil.getConnection();
        final Channel channel = connection.createChannel();

        // 聲明隊列
        channel.queueDeclare("topicqueue",false,false,false,null);
        // 綁定交換機
        // 參數3 标記 綁定到交換機的時候會有一個标記,隻有和它一樣标記的消息才會别消費到
        channel.queueBind("topicqueue",ExchangeName,"key.*");
        channel.queueBind("topicqueue",ExchangeName,"abc.#");//綁定多個标記
        channel.basicQos(1);

        DefaultConsumer consumer = new DefaultConsumer(channel){
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body)
                    throws IOException {
                //接收到的消息,進行業務邏輯處理
                System.out.println("message route receive1: ");
                System.out.println("Received1: " + new String(body, "UTF-8") + ", deliveryTag: " + envelope.getDeliveryTag());
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                channel.basicAck(envelope.getDeliveryTag(), false);// 參數2 false為确認收到消息, true為拒絕收到消息
            }
        };
        channel.basicConsume("topicqueue",false,consumer);// 參數2 手動确認,代表我們收到消息後需要手動确認告訴伺服器我們收到消息了
    }
}           

參考連結

RabbitMQ Tutorials Rabbitmq企業級消息隊列視訊課程