天天看點

1、RabbitMQ的簡單使用

1、AMQP

AMQP,即Advanced Message Queuing Protocol,一個提供統一消息服務的應用層标準進階消息隊列協定,是應用層協定的一個開放标準,為面向消息的中間件設計。基于此協定的用戶端與消息中間件可傳遞消息,并不受用戶端/中間件不同産品,不同的開發語言等條件的限制。Erlang中的實作有 RabbitMQ等。

2、RabbitMQ

1、Erlang語言

Erlang是一種通用的并發程式設計語言,它由喬·阿姆斯特朗(Joe Armstrong)在瑞典電信裝置制造商愛立信所轄的計算機科學研究室開發,目的是創造一種可以應付大規模開發活動的程式設計語言和運作環境。Erlang于1987年釋出正式版本,最早是愛立信擁有的私有軟體,經過十年的發展,于1998年發表開放源代碼版本。

Erlang是運作于虛拟機的解釋型語言,但是現在也包含有烏普薩拉大學高性能Erlang計劃(HiPE)[3]開發的原生代碼編譯器,自R11B-4版本開始,Erlang也支援腳本方式執行。在程式設計範型上,Erlang屬于多重典範程式設計語言,涵蓋函數式、并行及分布式。循序運作的Erlang是一個及早求值, 單次指派和動态類型的函數式程式設計語言。

2、消息隊列(Message Queue)

在計算機科學中,消息隊列(英語:Message queue)是一種程序間通信或同一程序的不同線程間的通信方式,軟體的貯列用來處理一系列的輸入,通常是來自使用者。消息隊列提供了異步的通信協定,每一個貯列中的紀錄包含詳細說明的資料,包含發生的時間,輸入裝置的種類,以及特定的輸入參數,也就是說:消息的發送者和接收者不需要同時與消息隊列互動。消息會儲存在隊列中,直到接收者取回它。

實際上,消息隊列常常儲存在連結清單結構中。擁有權限的程序可以向消息隊列中寫入或讀取消息。

目前,有很多消息隊列有很多開源的實作,包括JBoss Messaging、JORAM、Apache ActiveMQ、Sun Open Message Queue、RabbitMQ、IBM MQ、Apache Qpid和HTTPSQS。

3、RabbitMQ簡介

RabbitMQ是實作了進階消息隊列協定(AMQP)的開源消息代理軟體(亦稱面向消息的中間件)。RabbitMQ伺服器是用Erlang語言編寫的,而群集和故障轉移是建構在開放電信平台架構上的。所有主要的程式設計語言均有與代理接口通訊的用戶端庫。

RabbitMQ支援以下作業系統:

  • Windows
  • Linux/Unix
  • MAC OS X

RabbitMQ支援下列程式設計語言:

  • C# (using .net/c# client)
  • clojure (using Langohr)
  • erlang (using erlang client)
  • java (using java client)
  • javascript/node.js (using amqp.node)
  • perl (using Net::RabbitFoot)
  • python (using pika)
  • python-puka (using puka)
  • ruby (using Bunny)
  • ruby (using amqp gem)

RabbitMQ官網:http://www.rabbitmq.com/

3、RabbitMQ安裝

運用docker安裝rabbitmq

下載下傳安裝rabbitmq:docker pull registry.docker-cn.com/library/rabbitmq:-management
啟動rabbitmq:docker run -d -p : -p15672: --name myrabbitmq d6274c2217
:預設的用戶端連接配接的端口
:預設的web管理界面的端口
           

通路15672端口出現下面界面代表RabbitMQ安裝成功

1、RabbitMQ的簡單使用

預設賬号密碼都為guest

1、RabbitMQ的簡單使用

4、RabbitMQ使用

1、核心概念

Message :消息,消息是不具名的,它由消息頭和消息體組成。消息體是不透明的,而消息頭則由一系列的可選屬性組成,這些屬性包括routing-key(路由鍵)、 priority(相對于其他消息的優先權)、 delivery-mode(指出

該消息可能需要持久性存儲)等。

Publisher:消息的生産者,也是一個向交換器釋出消息的用戶端應用程式

Exchange:交換器,用來接收生産者發送的消息并将這些消息路由給伺服器中的隊列。Exchange有4種類型: direct(預設), fanout, topic, 和headers,不同類型的Exchange轉發消息的政策有所差別

Queue:消息隊列,用來儲存消息直到發送給消費者。它是消息的容器,也是消息的終點。一個消息可投入一個或多個隊列。消息一直在隊列裡面,等待消費者連接配接到這個隊列将其取走。

Binding:綁定,用于消息隊列和交換器之間的關聯。一個綁定就是基于路由鍵将交換器和消息隊列連

接起來的路由規則,是以可以将交換器了解成一個由綁定構成的路由表。Exchange 和Queue的綁定可以是多對多的關系。

Connection:網絡連接配接,比如一個TCP連接配接。

Channel:信道,多路複用連接配接中的一條獨立的雙向資料流通道。信道是建立在真實的TCP連接配接内的虛拟連接配接, AMQP 指令都是通過信道發出去的,不管是釋出消息、訂閱隊列還是接收消息,這些動作都是通過信道完成。因為對于作業系統來說建立和銷毀 TCP 都是非常昂貴的開銷,是以引入了信道的概念,以複用一條 TCP 連接配接。

Consumer:消息的消費者,表示一個從消息隊列中取得消息的用戶端應用程式。

Virtual Host:虛拟主機,表示一批交換器、消息隊列和相關對象。虛拟主機是共享相同的身份認證和加密環境的獨立伺服器域。每個 vhost 本質上就是一個 mini 版的 RabbitMQ 伺服器,擁有自己的隊列、交換器、綁定和權限機制。 vhost 是 AMQP 概念的基礎,必須在連接配接時指定,RabbitMQ 預設的 vhost 是 / 。

2、幾種消息模型

1、RabbitMQ的簡單使用

2.1、簡單模式

1、RabbitMQ的簡單使用

​ 一個生産者,一個消費者

/**
     * 擷取連接配接
     * @return Connection
     * @throws Exception
     */
    public static Connection getConnection() throws Exception {
        //定義連接配接工廠
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.1.235");
        factory.setPort();
        //設定vhost
        factory.setVirtualHost("/tzb");
        factory.setUsername("test");
        factory.setPassword("123456");
        //通過工廠擷取連接配接
        Connection connection = factory.newConnection();
        return connection;
    }

    //建立隊列,發送消息
    public static void main(String[] args) throws Exception {
        //擷取連接配接
        Connection connection = ConnectionUtil.getConnection();
        //建立通道
        Channel channel = connection.createChannel();
        //聲明建立隊列
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        //消息内容
        String message = "Hello World!";
        channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
        System.out.println("發送消息:"+message);
        //關閉連接配接和通道
        channel.close();
        connection.close();
    }

    //消費者消費消息
    public static void main(String[] args) throws Exception {
        //擷取連接配接和通道
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        //聲明通道
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        //定義消費者
        QueueingConsumer consumer = new QueueingConsumer(channel);
        //監聽隊列
        channel.basicConsume(QUEUE_NAME,true,consumer);

        while(true){
            //這個方法會阻塞住,直到擷取到消息
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println("接收到消息:"+message);
        }
    }
           

2.2、work模式

1、RabbitMQ的簡單使用

​ 一個生産者,多個消費者,每個消費者擷取到的消息唯一。

//消息生産者 
public static void main(String[] args) throws Exception {
        //擷取連接配接和通道
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        //聲明隊列
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        String message = "";
        for(int i = ; i<; i++){
            message = "" + i;
            channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
            System.out.println("發送消息:"+message);
            Thread.sleep(i);
        }

        channel.close();
        connection.close();
    }

    //消費者1
    public static void main(String[] args) throws Exception {
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);

        //同一時刻伺服器隻發送一條消息給消費端
        channel.basicQos();

        QueueingConsumer consumer = new QueueingConsumer(channel);

        channel.basicConsume(QUEUE_NAME,false,consumer);

        while(true){
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println("recive1:"+message);
            Thread.sleep();
            //消息消費完給伺服器傳回确認狀态,表示該消息已被消費
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
        }
    }

    //生産者2
    public static void main(String[] args) throws Exception {
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);

        channel.basicQos();

        QueueingConsumer consumer = new QueueingConsumer(channel);

        channel.basicConsume(QUEUE_NAME,true,consumer);

        while(true){
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println("recive1:"+message);
            Thread.sleep();
            //channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
        }
    }
           

消息消費的兩種模式

1、 自動模式

​ 消費者從消息隊列擷取消息後,服務端就認為該消息已經成功消費。

2、 手動模式

​ 消費者從消息隊列擷取消息後,服務端并沒有标記為成功消費

​ 消費者成功消費後需要将狀态傳回到服務端

自動模式:

channel.basicConsume(QUEUE_NAME,true,consumer);
        while(true){
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println("recive1:"+message);
            Thread.sleep();
            //無需回報
            //channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
        }
           

手動模式:

channel.basicConsume(QUEUE_NAME,false,consumer);
        while(true){
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println("recive1:"+message);
            Thread.sleep();
            //消息消費完給伺服器傳回确認狀态,表示該消息已被消費
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
        }
           

2.3、訂閱模式

1、RabbitMQ的簡單使用

​ 一個生産者發送的消息會被多個消費者擷取。

生産者:可以将消息發送到隊列或者是交換機。

消費者:隻能從隊列中擷取消息。

如果消息發送到沒有隊列綁定的交換機上,那麼消息将丢失。

每個發到 fanout 類型交換器的消息都會分到所有綁定的隊列上去。 fanout 交換器不處理路由鍵,隻是簡單的将隊列綁定到交換器上,每個發送到交換器的消息都會被轉發到與該交換器綁定的所有隊列上。很像子網廣播,每台子網内的主機都獲得了一份複制的消息。 fanout 類型轉發消息是最快的 。

public static final String EXCHANGE_NAME = "test_exchange_fanout";
    //生産者,發送消息到交換機
    public static void main(String[] args) throws Exception {
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        //聲明交換機 fanout:交換機類型 主要有fanout,direct,topics三種
        channel.exchangeDeclare(EXCHANGE_NAME,"fanout");

        String message = "Hello World!";
        channel.basicPublish(EXCHANGE_NAME,"",null,message.getBytes());
        System.out.println(message);
        channel.close();
        connection.close();
    }

    //消費者1
    public final static String QUEUE_NAME = "test_queue_exchange_1";
    public static void main(String[] args) throws Exception {
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        //綁定隊列到交換機上
        channel.queueBind(QUEUE_NAME,Send.EXCHANGE_NAME,"");
        channel.basicQos();
        QueueingConsumer consumer = new QueueingConsumer(channel);
        channel.basicConsume(QUEUE_NAME,true,consumer);
        while(true){
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println(message);
        }
    }

    //消費者2
    public final static String QUEUE_NAME = "test_queue_exchange_2";
    public static void main(String[] args) throws Exception {
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        //綁定隊列到交換機上
        channel.queueBind(QUEUE_NAME,Send.EXCHANGE_NAME,"");
        channel.basicQos();
        QueueingConsumer consumer = new QueueingConsumer(channel);
        channel.basicConsume(QUEUE_NAME,true,consumer);

        while(true){
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println(message);
        }
    }
           

2.4、路由模式

1、RabbitMQ的簡單使用

1、 發送消息到交換機并且要指定路由key

2、 消費者将隊列綁定到交換機時需要指定路由key

是一種完全比對,隻有比對到的消費者才能消費消息

消息中的路由鍵( routing key)如果和 Binding 中的 binding key 一緻, 交換器就将消息發到對應的隊列中。路由鍵與隊列名完全比對,如果一個隊列綁定到交換機要求路由鍵為“ dog”,則隻轉發 routing key 标記為“ dog”的消息,不會轉發“ dog.puppy”,也不會轉發“ dog.guard”等等。它是完全比對、單點傳播的模式。

public static final String EXCHANGE_NAME = "test_exchange_direct";
    //生産者
    public static void main(String[] args) throws Exception {
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        //聲明交換機 fanout:交換機類型 主要有fanout,direct,topics三種
        channel.exchangeDeclare(EXCHANGE_NAME,"direct");

        String message = "Hello World!";
        channel.basicPublish(EXCHANGE_NAME,"dog",null,message.getBytes());
        System.out.println(message);
        channel.close();
        connection.close();
    }

    //消費者1
    public final static String QUEUE_NAME = "test_queue_direct_1";
    public static void main(String[] args) throws Exception {
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        //綁定隊列到交換機上,并制定路由鍵為"dog"
        channel.queueBind(QUEUE_NAME, com.bw.rabbitmq.routing.Send.EXCHANGE_NAME,"dog");
        channel.basicQos();
        QueueingConsumer consumer = new QueueingConsumer(channel);
        channel.basicConsume(QUEUE_NAME,true,consumer);
        while(true){
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println(message);
        }
    }

    //消費者2
    public final static String QUEUE_NAME = "test_queue_direct_2";
    public static void main(String[] args) throws Exception {
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        //綁定隊列到交換機上,并制定路由鍵為"cat"
        channel.queueBind(QUEUE_NAME, com.bw.rabbitmq.routing.Send.EXCHANGE_NAME,"cat");
        channel.basicQos();
        QueueingConsumer consumer = new QueueingConsumer(channel);
        channel.basicConsume(QUEUE_NAME,true,consumer);
        while(true){
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println(message);
        }
    }
           

2.5、通配符模式

1、RabbitMQ的簡單使用

topic交換器通過模式比對配置設定消息的路由鍵屬性,将路由鍵和某個模式進行比對,此時隊列需要綁定到一個模式上。它将路由鍵和綁定鍵的字元串切分成單詞,這些單詞之間用點隔開。它同樣也會識别兩個通配符:符号“#”和符号“”。#比對0個或多個單詞,比對一個單詞。如下圖所示:

1、RabbitMQ的簡單使用
生産者
public static final String EXCHANGE_NAME = "test_exchange_topic";

    public static void main(String[] args) throws Exception {
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        //聲明交換機 topic:交換機類型 
        channel.exchangeDeclare(EXCHANGE_NAME,"topic");
        String message = "Hello World!";
        channel.basicPublish(EXCHANGE_NAME,"dog.1",null,message.getBytes());
        System.out.println(message);
        channel.close();
        connection.close();
    }

//消費者1
public final static String QUEUE_NAME = "test_queue_topic_1";
    public static void main(String[] args) throws Exception {
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        //綁定隊列到交換機上,并制定路由鍵比對規則為"dog.*"
        channel.queueBind(QUEUE_NAME, com.bw.rabbitmq.topics.Send.EXCHANGE_NAME,"dog.*");
        channel.basicQos();
        QueueingConsumer consumer = new QueueingConsumer(channel);
        channel.basicConsume(QUEUE_NAME,true,consumer);

        while(true){
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println(message);
        }
    }

//消費者2
public final static String QUEUE_NAME = "test_queue_topic_2";
    public static void main(String[] args) throws Exception {
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        //綁定隊列到交換機上,并制定路由鍵比對規則為"#.1"
        channel.queueBind(QUEUE_NAME, com.bw.rabbitmq.topics.Send.EXCHANGE_NAME,"#.1");
        channel.basicQos();
        QueueingConsumer consumer = new QueueingConsumer(channel);
        channel.basicConsume(QUEUE_NAME,true,consumer);

        while(true){
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println(message);
        }
    }

//消費者3
public final static String QUEUE_NAME = "test_queue_topic_3";
    public static void main(String[] args) throws Exception {
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        //綁定隊列到交換機上,并制定路由鍵比對規則為"cat.#"
        channel.queueBind(QUEUE_NAME, com.bw.rabbitmq.topics.Send.EXCHANGE_NAME,"cat.#");
        channel.basicQos();
        QueueingConsumer consumer = new QueueingConsumer(channel);
        channel.basicConsume(QUEUE_NAME,true,consumer);

        while(true){
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println(message);
        }
    }
           

結果:消費者1和消費者2可以收到消息,消費者3不能收到消息。