炒雞辣雞說RabbitMQ
不求甚解
收取消息
@Service
@Slf4j
public class RabbitMqMessage {
private static final String EXCHANGE = "html";
private static final String CHECK_QUEUE = "check-queue";
@Autowired
private DoCheckTask doCheckTask;
@Bean
private Declarables declarables() {
Queue queue = new Queue(CHECK_QUEUE);
FanoutExchange exchange = new FanoutExchange(EXCHANGE);
return new Declarables(queue, exchange, BindingBuilder.bind(queue).to(exchange));
}
@RabbitListener(queues = CHECK_QUEUE)
public void receiverHtml(String urlContent) {
UrlContent urlContent1 = Json.toObject(urlContent, UrlContent.class);
doCheckTask.urlContentPriorityBlockingQueue.add(urlContent1);
}
}
發送消息
@Service
@Slf4j
@Configuration
public class RabbitMqMessage {
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* 頁面内容廣播交換機
*/
private static final String HTML_EXCHANGE = "html";
@Bean
public Declarables declarables() {
FanoutExchange exchange = new FanoutExchange(HTML_EXCHANGE);
return new Declarables(exchange);
}
public void sendHtmlToMessageQueue(String urlContent) {
rabbitTemplate.convertAndSend(HTML_EXCHANGE, "", urlContent);
}
}
簡單來說,我們隻需要定義好消息隊列的交換器和隊列即可。交換器需要首先在你的管理頁面中定義好。必不可少的是,我們在配置檔案中定義好連接配接設定
spring.rabbitmq.host=
spring.rabbitmq.port=
spring.rabbitmq.username=
spring.rabbitmq.password=
當然,既然我們使用了消息隊列的注解,必不可少的,需要在依賴中安裝相關的包,這裡使用maven來管理包,是以相關的依賴為
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
<version>2.1.4.RELEASE</version>
</dependency>
是不是很簡單?如果你比較忙,隻要能用就行,那麼抄上面的代碼稍微再修改一下,就能滿足你的使用條件了。
還要幹嘛?
再來看看這段代碼,首先使用@Service注解,讓springboot在啟動的時候,spring容器會加載這個類,注冊為一個服務,然後@Bean來綁定我們的隊列和交換機。你可以暫時将消息隊列想象成帶有存儲功能的三層路由器。三層路由器具有交換機的功能,通過計算機網絡的知識我們知道,交換機的功能是限定在區域網路中,那麼使用同一個exchange都是同一個區域網路的(虛拟區域網路也是區域網路),是以我們在定義哪些隊列綁定哪些交換器的時候,也需要考慮業務消息的相關性,不能一股腦的全部放在同一個交換器下。
@Bean
private Declarables declarables() {
Queue queue = new Queue(CHECK_QUEUE); // 定義隊列對象:從哪個隊列中擷取資訊
FanoutExchange exchange = new FanoutExchange(EXCHANGE); // 定義交換器對象
return new Declarables(queue, exchange, BindingBuilder.bind(queue).to(exchange)); // 将兩者綁定起來
}
這段代碼,在項目啟動的時候,@Bean會告訴spring容器去執行這段代碼,并生成這樣的Declarables,也就是綁定成功了。然後我們隻需要定義監聽這個隊列的函數即可。
@RabbitListener(queues = CHECK_QUEUE)
public void receiverHtml(String urlContent) {
UrlContent urlContent1 = Json.toObject(urlContent, UrlContent.class);
doCheckTask.urlContentPriorityBlockingQueue.add(urlContent1);
}
這段代碼就是綁定了監聽隊列CHECK_QUEUE,當監聽到該隊列中有資料時,receiverHtml方法就會被調用,然後資料被放到urlContent中。
發送消息部分也一樣
@Bean
public Declarables declarables() {
FanoutExchange exchange = new FanoutExchange(HTML_EXCHANGE);
return new Declarables(exchange);
}
這裡也是注冊了一個交換器,為啥要在注冊一次?如果你是在一個項目中,那麼不需要再次聲明該exchange。
public void sendHtmlToMessageQueue(String urlContent) {
rabbitTemplate.convertAndSend(HTML_EXCHANGE, "", urlContent);
}
發送消息到交換器,直接使用rabbitTemplate實作即可。
求甚解
了解交換器和隊列
隊列
AnonymousQueue:代表一個匿名的,非持久的,排他的,自動删除隊列
Queue:預設的隊列
交換器
DirectExchange:它會把消息路由到那些 BindingKey RoutingKey 完全比對的隊列中。
FanoutExchange:廣播交換器:它會把所有發送到該交換器的消息路由到所有與該交換器綁定的隊列中。
TopicExchange:主題交換器
HeadersExchange:不依賴于路由鍵的比對規則來路由消息,而是根據發送的消息内容中 headers 屬性進行比對。headers 類型的交換器性能會很差,而且也不實用,基本上不會看到它的存在。
CustomExchange:模糊比對
AbstractExchange:抽象交換器,作為基礎存在
如果隊列特别多怎麼辦
集中控制,在官網的demo中,有一種方案是将全部隊列和交換器注冊在一起
一共四個檔案,一個注冊bean的檔案叫RabbitMqConfig.java用于注冊所有的Bean。然後一個ConstNames用于存放所有的名字。一個sender和一個receiver即可。
就像這樣:
@Configuration
public class RabbitMqConfig {
@Bean
public Receiver receiver() {
return new Receiver();
}
@Bean
public DirectExchange exchange(){
return new DirectExchange(ConstNames.EXCHANGE);
}
@Bean
public Queue queue(){
return new Queue(ConstNames.QUEUE);
}
@Bean
public Binding bindingEvent(DirectExchange exchange,Queue queue){
return BindingBuilder.bind(queue).to(exchange).withQueueName();
}
@Bean
public xxxx xxxx(){
// 這裡再定義一個Sender即可
}
}
// Receiver.java
public class Receiver {
@RabbitListener(queues = ConstNames.QUEUE)
public void receiveEvent(String in) {
System.out.println(in);
}
}
// 管理名字
public class ConstNames{
public static final String EXCHANGE = "exchange";
public static final String QUEUE = "queue";
}