簡單介紹消息隊列
關于消息隊列,學過資料結構都知道隊列,而它則是基于隊列實作的一個消息中間件。消息隊列主要的作用是應用解耦,異步處理,流量削峰。按我的了解簡單來說消息隊列主要面向生産者,消費者,其中生産者隻需要關注于将資訊投遞給消息隊列,并不知道是哪個消費者或者何時進行消費,而消費者隻需要關注于消息隊列,從中取出消息進行消費。
RabbitMQ的簡單使用
RabbitMQ是實作AMQP協定并基于erlang語言進行開發的一個開源消息隊列,其中rabbitmq實作的AMQP 0-9-1模型如下圖
圖來源:http://rabbitmq.mr-ping.com
生産者将消息釋出給broker,也就是RabbitMQ,進入交托給Exchange(交換機),(其中有4種模式,分别是Direct,Fanout,Topic,Header,這裡不詳細解釋了),如果是非廣播模式則需要Routing key,根據routing key轉發到綁定了這個Exchange并符合routing key的queue中,就是路由轉發的意思。
安裝
windows平台的安裝:
因為RabbitMQ是基于erlang開發,是以需要配好erlang語言的環境,不然啟動rabbitmq會提示需要erlang語言環境。
下載下傳安裝erlang,以管理者身份運作,然後一直next即可 http://www.erlang.org/downloads
接下來下載下傳rabbitmq ,需要看清楚描述選擇和你安裝的erlang版本适配的rabbitmq版本下載下傳
https://www.rabbitmq.com/download.html
rabbitmq提供了一個背景管理的頁面,第一次使用需要配置下,進入rabbitmq的sbin目錄打開指令行執行下面的指令
rabbitmq-plugins enable rabbitmq_management
成功則用浏覽器通路localhost:15672,預設使用者:guest 預設密碼:guest,進去可以自己建立使用者
使用
接下來簡單寫個rabbitmq的小demo,用的是SpringBoot
首先是引用相關的依賴
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
然後就是關于建立隊列,有兩種方式,分别是動态映射,靜态建立,前者是在項目中根據提供接口進行建立和配置參數,每次項目運作時會判斷rabbitmq中對應的隊列是否存在或者是否需要改變進行修改,後者則是在管理頁面進行建立配置,個人推薦第一種,就不需要每換了一個環境就要重新配置。
需要配置yml參數詳細 https://blog.csdn.net/en_joker/article/details/80103519
需要建立exchange,durable是指是否持久化,否則重新開機rabbitmq,隊列和其消息就會丢失
@Bean("deadLetterExchange")
public Exchange directExchange() {
return ExchangeBuilder.directExchange("DEADLETTER_EXCHANGE").durable(true).build();
}
建立隊列
@Bean("redirectQueue")
public Queue directQueue1() {
return QueueBuilder.durable("REDIRECT_QUEUE").build();
}
exchange與queue進行綁定
@Bean
public Binding directBinding1(@Qualifier("redirectQueue") Queue queue, @Qualifier("deadLetterExchange") Exchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with("REDIRECT_QUEUE_KEY").noargs();
}
簡單配置完畢啟動項目,可在rabbitmq管理頁面看到已建立
生産者釋出消息 args為消息
rabbitTemplate.convertAndSend("DEADLETTER_EXCHANGE", "REDIRECT_QUEUE_KEY", args, messagePostProcessor);
配置消費者,隻需在方法上配置@RabbitListener(queues = "REDIRECT_QUEUE")
@RabbitListener(queues = "REDIRECT_QUEUE")
public void on(Message message, Channel channel) throws IOException {
channel.basicAck(message.getMessageProperties().getDeliveryTag(),true );
System.out.println("消費者消費資訊:"+new String(message.getBody()));
}
但是這樣配置的話隻是單線程的消費者,如果想多線程消費需要配置以下工廠消費者類。
接下測試一下,成功消費。
rabbitmq還提供确認機制,使得其過程更加可靠,確定消息的一緻性。