天天看點

一、RabbitMQ 初建 生産消費RabbitMQ 初建 生産消費

文章目錄

  • RabbitMQ 初建 生産消費
    • 1.環境準備(使用docker容器)
      • 1)docker安裝
      • 2)拉取RabbitMQ鏡像并啟動
    • 2.java工程建立編寫

RabbitMQ 初建 生産消費

RabbitMQ:http://www.rabbitmq.com/
  • 是一個開源的AMQP實作,伺服器端用Erlang語言編寫,支援多種用戶端,如:Python、Ruby、.NET、Java、C、用于在分布式系統中存儲轉發消息,在易用性、擴充性、高可用性等方面表現不錯
  • 缺點:使用Erlang開發,閱讀和修改源碼難度大
什麼是AMQP?
  • AMQP(advanced message queuing protocol)在2003年時被提出,最早用于解決金融領不同平台之間的消息傳遞互動問題,就是是一種協定,相容JMS
  • 更準确說的連結協定 binary- wire-level-protocol 直接定義網絡交換的資料格式,類似http
  • 具體的産品實作比較多,RabbitMQ就是其中一種

特性:

  • 獨立于平台的底層消息傳遞協定
  • 消費者驅動消息傳遞
  • 跨語言和平台的互用性、屬于底層協定
  • 有5種交換類型direct,fanout,topic,headers,system
  • 面向緩存的、可實作高性能、支援經典的消息隊列,循環,存儲和轉發
  • 支援長周期消息傳遞、支援事務(跨消息隊列)

AMQP和JMS的主要差別:

1.AMQP可跨平台語言生産消費,java生産的消息可以Python進行消費,可以用HTTP進行類比,不關心實作接口的語言,JMS則必須是java寫的生産端和消費端。

2.AMQP的消息類型是Byt[],而JMS的消息類型是TextMessage/ObjectMessage/StreamMessage等。

1.環境準備(使用docker容器)

1)docker安裝

搞個虛拟機裝centos系統然後安裝docker

#更新yum
yum update
yum install epel-release -y
yum clean all
yum list
           
#安裝運作docker
yum install docker-io -y
systemctl start docker
           
#檢測安裝結果
docker info
systemctl start docker     #運作Docker守護程序
systemctl stop docker      #停止Docker守護程序
systemctl restart docker   #重新開機Docker守護程序
           
#列舉全部 容器 : 
docker ps -a

#列舉目前運作的容器:
docker ps

#檢查容器内部資訊:
docker inspect 容器名稱

#删除鏡像:
docker rmi IMAGE_NAME
#強制移除鏡像不管是否有容器使用該鏡像 增加 -f 參數
#删除前到停掉

#停止某個容器:
docker stop 容器名稱

#啟動某個容器:
docker start 容器名稱

#移除某個容器: 
docker rm 容器名稱 (容器必須是停止狀态)
           

2)拉取RabbitMQ鏡像并啟動

#拉取鏡像
docker pull rabbitmq:management

docker run -d --hostname rabbit_host1 --name xd_rabbit -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=123456 -p 15672:15672 -p 5672:5672 rabbitmq:management

#說明
-d 以守護程序方式在背景運作
-p 15672:15672 management 界面管理通路端口
-p 5672:5672 amqp 通路端口
--name:指定容器名
--hostname:設定容器的主機名,它會被寫到容器内的 /etc/hostname 和 /etc/hosts,作為容器主機IP的别名,并且将顯示在容器的bash中

-e 參數
  RABBITMQ_DEFAULT_USER 使用者名
  RABBITMQ_DEFAULT_PASS 密碼
           

4369 erlang 發現口

5672 client 端通信口

15672 管理界面 ui 端口

25672 server 間内部通信口

ip:15672通路管理界面

後續如果關閉虛拟機後再啟動服務就用

#1.查容器名
docker ps -a
#2.根據容器名啟動
docker start 容器名稱或容器id
           

如下圖:

一、RabbitMQ 初建 生産消費RabbitMQ 初建 生産消費

2.java工程建立編寫

建立一個空的maven工程(我用的jdk1.8的)然後在pom裡導入引用包

<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    <java.version>1.8</java.version>
    <skipTests>true</skipTests>
</properties>
<dependencies>
    <dependency>
        <groupId>com.rabbitmq</groupId>
        <artifactId>amqp-client</artifactId>
        <version>5.10.0</version>
    </dependency>
</dependencies>
           

導入所需包之後便可以寫一個組簡單的生産消費,根據官方樣例代碼

一、RabbitMQ 初建 生産消費RabbitMQ 初建 生産消費
一、RabbitMQ 初建 生産消費RabbitMQ 初建 生産消費

根據官方樣例改造成自己的

send.java

public class Send {

    // 隊列名稱
    private final static String QUEUE_NAME = "hello";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        // 連接配接資訊
        factory.setHost("20.18.24.143");
        factory.setUsername("admin");
        factory.setPassword("123456");
        factory.setVirtualHost("devTest");
        factory.setPort(5672);

        // 繼承
        try (Connection connection = factory.newConnection();
             // 建立信道
             Channel channel = connection.createChannel()) {
            /**
             * 隊列名稱
             * 持久化配置:mq重新開機後還在
             * 是否獨占:隻能有一個消費者監聽隊列;當connection關閉是否删除隊列,一般是false,釋出訂閱是獨占
             * 自動删除: 當沒有消費者的時候,自動删除掉,一般是false
             * 其他參數
             *
             * 隊列不存在則會自動建立,如果存在則不會覆寫,是以此時的時候需要注意屬性
             */
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            String message = "Hello World!";
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8));
            System.out.println(" [x] Sent '" + message + "'");
        }
    }

}
           

Recv.java代碼

public class Recv {
    private final static String QUEUE_NAME = "hello";
    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        // 連接配接資訊
        factory.setHost("20.18.24.143");
        factory.setUsername("admin");
        factory.setPassword("123456");
        factory.setVirtualHost("devTest");
        factory.setPort(5672);
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
        // 消費
        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                super.handleDelivery(consumerTag, envelope, properties, body);
                // 一般固定,一般稱為會話名稱
                System.out.println("consumerTag: "+consumerTag);
                // 可以擷取交換機、路由鍵等資訊
                System.out.println("envelope: "+envelope);
                //
                System.out.println("properties: "+properties);
                //
                System.out.println("body: "+new String(body,"utf-8"));
            }
        };
        channel.basicConsume(QUEUE_NAME,true,consumer);
    }
}
           

先啟動Recv消費者代碼再啟動生産者代碼Send

控制台列印如下:

[*] Waiting for messages. To exit press CTRL+C
consumerTag: amq.ctag-fwM_uUThzXI3cZBV6U6fvg
envelope: Envelope(deliveryTag=1, redeliver=false, exchange=, routingKey=hello)
properties: #contentHeader<basic>(content-type=null, content-encoding=null, headers=null, delivery-mode=null, priority=null, correlation-id=null, reply-to=null, expiration=null, message-id=null, timestamp=null, type=null, user-id=null, app-id=null, cluster-id=null)
body: Hello World!
           

期初控制台隻列印到CTRL+C那裡,後面沒有,隻當生産者發送到channel中後消費者才有的消費,才有後面的日志。