文章目錄
- RabbitMQ 初建 生産消費
-
- 1.環境準備(使用docker容器)
-
- 1)docker安裝
- 2)拉取RabbitMQ鏡像并啟動
- 2.java工程建立編寫
RabbitMQ 初建 生産消費
RabbitMQ:http://www.rabbitmq.com/
- 是一個開源的AMQP實作,伺服器端用Erlang語言編寫,支援多種用戶端,如:Python、Ruby、.NET、Java、C、用于在分布式系統中存儲轉發消息,在易用性、擴充性、高可用性等方面表現不錯
- 缺點:使用Erlang開發,閱讀和修改源碼難度大
什麼是AMQP?
- AMQP(advanced message queuing protocol)在2003年時被提出,最早用于解決金融領不同平台之間的消息傳遞互動問題,就是是一種協定,相容JMS
- 更準确說的連結協定 binary- wire-level-protocol 直接定義網絡交換的資料格式,類似http
- 具體的産品實作比較多,RabbitMQ就是其中一種
特性:
- 獨立于平台的底層消息傳遞協定
- 消費者驅動消息傳遞
- 跨語言和平台的互用性、屬于底層協定
- 有5種交換類型direct,fanout,topic,headers,system
- 面向緩存的、可實作高性能、支援經典的消息隊列,循環,存儲和轉發
- 支援長周期消息傳遞、支援事務(跨消息隊列)
AMQP和JMS的主要差別:
1.AMQP可跨平台語言生産消費,java生産的消息可以Python進行消費,可以用HTTP進行類比,不關心實作接口的語言,JMS則必須是java寫的生産端和消費端。
2.AMQP的消息類型是Byt[],而JMS的消息類型是TextMessage/ObjectMessage/StreamMessage等。
1.環境準備(使用docker容器)
1)docker安裝
搞個虛拟機裝centos系統然後安裝docker
#更新yum
yum update
yum install epel-release -y
yum clean all
yum list
#安裝運作docker
yum install docker-io -y
systemctl start docker
#檢測安裝結果
docker info
systemctl start docker #運作Docker守護程序
systemctl stop docker #停止Docker守護程序
systemctl restart docker #重新開機Docker守護程序
#列舉全部 容器 :
docker ps -a
#列舉目前運作的容器:
docker ps
#檢查容器内部資訊:
docker inspect 容器名稱
#删除鏡像:
docker rmi IMAGE_NAME
#強制移除鏡像不管是否有容器使用該鏡像 增加 -f 參數
#删除前到停掉
#停止某個容器:
docker stop 容器名稱
#啟動某個容器:
docker start 容器名稱
#移除某個容器:
docker rm 容器名稱 (容器必須是停止狀态)
2)拉取RabbitMQ鏡像并啟動
#拉取鏡像
docker pull rabbitmq:management
docker run -d --hostname rabbit_host1 --name xd_rabbit -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=123456 -p 15672:15672 -p 5672:5672 rabbitmq:management
#說明
-d 以守護程序方式在背景運作
-p 15672:15672 management 界面管理通路端口
-p 5672:5672 amqp 通路端口
--name:指定容器名
--hostname:設定容器的主機名,它會被寫到容器内的 /etc/hostname 和 /etc/hosts,作為容器主機IP的别名,并且将顯示在容器的bash中
-e 參數
RABBITMQ_DEFAULT_USER 使用者名
RABBITMQ_DEFAULT_PASS 密碼
4369 erlang 發現口
5672 client 端通信口
15672 管理界面 ui 端口
25672 server 間内部通信口
ip:15672通路管理界面
後續如果關閉虛拟機後再啟動服務就用
#1.查容器名
docker ps -a
#2.根據容器名啟動
docker start 容器名稱或容器id
如下圖:
2.java工程建立編寫
建立一個空的maven工程(我用的jdk1.8的)然後在pom裡導入引用包
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.8</java.version>
<skipTests>true</skipTests>
</properties>
<dependencies>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.10.0</version>
</dependency>
</dependencies>
導入所需包之後便可以寫一個組簡單的生産消費,根據官方樣例代碼
根據官方樣例改造成自己的
send.java
public class Send {
// 隊列名稱
private final static String QUEUE_NAME = "hello";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
// 連接配接資訊
factory.setHost("20.18.24.143");
factory.setUsername("admin");
factory.setPassword("123456");
factory.setVirtualHost("devTest");
factory.setPort(5672);
// 繼承
try (Connection connection = factory.newConnection();
// 建立信道
Channel channel = connection.createChannel()) {
/**
* 隊列名稱
* 持久化配置:mq重新開機後還在
* 是否獨占:隻能有一個消費者監聽隊列;當connection關閉是否删除隊列,一般是false,釋出訂閱是獨占
* 自動删除: 當沒有消費者的時候,自動删除掉,一般是false
* 其他參數
*
* 隊列不存在則會自動建立,如果存在則不會覆寫,是以此時的時候需要注意屬性
*/
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String message = "Hello World!";
channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8));
System.out.println(" [x] Sent '" + message + "'");
}
}
}
Recv.java代碼
public class Recv {
private final static String QUEUE_NAME = "hello";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
// 連接配接資訊
factory.setHost("20.18.24.143");
factory.setUsername("admin");
factory.setPassword("123456");
factory.setVirtualHost("devTest");
factory.setPort(5672);
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
// 消費
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
super.handleDelivery(consumerTag, envelope, properties, body);
// 一般固定,一般稱為會話名稱
System.out.println("consumerTag: "+consumerTag);
// 可以擷取交換機、路由鍵等資訊
System.out.println("envelope: "+envelope);
//
System.out.println("properties: "+properties);
//
System.out.println("body: "+new String(body,"utf-8"));
}
};
channel.basicConsume(QUEUE_NAME,true,consumer);
}
}
先啟動Recv消費者代碼再啟動生産者代碼Send
控制台列印如下:
[*] Waiting for messages. To exit press CTRL+C
consumerTag: amq.ctag-fwM_uUThzXI3cZBV6U6fvg
envelope: Envelope(deliveryTag=1, redeliver=false, exchange=, routingKey=hello)
properties: #contentHeader<basic>(content-type=null, content-encoding=null, headers=null, delivery-mode=null, priority=null, correlation-id=null, reply-to=null, expiration=null, message-id=null, timestamp=null, type=null, user-id=null, app-id=null, cluster-id=null)
body: Hello World!
期初控制台隻列印到CTRL+C那裡,後面沒有,隻當生産者發送到channel中後消費者才有的消費,才有後面的日志。