天天看點

SpringCloud之MQ筆記分享MQ異步通信

MQ異步通信

初始MQ

同步通信

優點:時效性較強,可以以及得到結果

Feign就屬于同步方式–問題:

  • 耦合問題
  • 性能下降(中間的等待時間)
  • 資源浪費
  • 級聯失敗

異步通信

優點

  • 耦合度低
  • 性能提升,吞吐量高
  • 故障隔離
  • 服務無強依賴,解決級聯失敗問題
  • 流量削峰

缺點

  • 依賴于broker的可靠性,安全性,吞吐能力
  • 架構複雜了,業務沒有明顯的流程先,不好追蹤管理

MQ常見架構

什麼是MQ:資訊隊列,存放消息的隊列,也就是時間驅動架構中的broker

RabbitMQ ActiveMQ RocketMQ Kafka
公司/社群 Rabbit Apache 阿裡 Apache
開發語言 Erlang Java Java Scala&Java
協定支援 AMQP,XMPP,SMTP,STOMP OpenWire,STOMP,REST,XMPP,AMQP 自定義協定 自定義協定
可用性 一般
單機吞吐量 一般 非常高
消息延遲 微秒級 毫秒級 毫秒級 毫秒以内
消息可靠性 一般 一般

推薦RabbitMQ

RabbitMQ快速入門

案例

釋出者

package cn.itcast.mq.helloworld;



import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import org.junit.Test;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class PublisherTest {
    @Test
    public void testSendMessage() throws IOException, TimeoutException {
        // 1.建立連接配接
        ConnectionFactory factory = new ConnectionFactory();
        // 1.1.設定連接配接參數,分别是:主機名、端口号、vhost、使用者名、密碼
        factory.setHost("xxx.xx.xxx.xxx");
        factory.setPort(5672);
        factory.setVirtualHost("/");
        factory.setUsername("xxx");
        factory.setPassword("xxxxx");
        // 1.2.建立連接配接
        Connection connection = factory.newConnection();

        // 2.建立通道Channel
        Channel channel = connection.createChannel();

        // 3.建立隊列
        String queueName = "simple.queue";
        channel.queueDeclare(queueName, false, false, false, null);

        // 4.發送消息
        String message = "hello, rabbitmq1!";
        channel.basicPublish("", queueName, null, message.getBytes());
        System.out.println("發送消息成功:【" + message + "】");

        // 5.關閉通道和連接配接
        channel.close();
        connection.close();

    }
}
           

消費者

package cn.itcast.mq.helloworld;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class ConsumerTest {

    public static void main(String[] args) throws IOException, TimeoutException {
        // 1.建立連接配接
        ConnectionFactory factory = new ConnectionFactory();
        // 1.1.設定連接配接參數,分别是:主機名、端口号、vhost、使用者名、密碼
        factory.setHost("xxx.xxx.xx.xxx");
        factory.setPort(5672);
        factory.setVirtualHost("/");
        factory.setUsername("xxx");
        factory.setPassword("xxx");
        // 1.2.建立連接配接
        Connection connection = factory.newConnection();

        // 2.建立通道Channel
        Channel channel = connection.createChannel();

        // 3.建立隊列
        String queueName = "simple.queue";
        channel.queueDeclare(queueName, false, false, false, null);

        // 4.訂閱消息
        channel.basicConsume(queueName, true, new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                // 5.處理消息
                String message = new String(body);
                System.out.println("接收到消息:【" + message + "】");
            }
        });
        System.out.println("等待接收消息。。。。");
    }
}
           
運作消費者後運作釋出者,就可以接收到釋出者的資訊(因為消費者的函數是異步,是以釋出者可以随時發,消費者一直等着)

SpringAMQP

什麼是AMQP:一種協定,API規範

SpringAMQP:底層是rabbitMQ

基礎實作

使用SpringAMQP實作Helllo World中的基礎消息隊列功能

1,引入依賴

<!--AMQP依賴,包含RabbitMQ-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
           

2,基礎配置

配置基礎配置并且工具類并測試

基礎配置

spring:
  rabbitmq:
    host: xxx.xx.xxx.xx
    port: 5672
    username: xxx
    password: xxx
    virtual-host: /
           

測試類

package cn.itcast.mq.spring;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringAmqpTest {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Test
    public void testSendMessage(){
        String queueName = "simple.queue";
        String message = "hello,spring amqp";
        rabbitTemplate.convertAndSend(queueName,message);
    }
}
           

測試結果

前面幾個是之前的,第三個和後面的是我自己設定的

SpringCloud之MQ筆記分享MQ異步通信

上述為釋出者,消費者需要:

1,配置yml檔案

2,定義一個類

這裡的核心是使用@Component自動裝配到Spring,使用RabbitListener進行隊列的監聽,得到傳回值msg就列印

package cn.itcast.mq.listener;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class SpringRabbitListener {
    @RabbitListener(queues = "simple.queue")
    public void listenSimpleQueue(String msg){
        System.out.println("消費者接收到消息:【"+msg+"】");
    }
}
           

WorkQueue模型

當一個消費者能力足夠強,而另一個比較弱,合理的處理方式應該是強的消費者處理更多的消息,而弱的處理少的(根據能力分工)

是以WorkQueue模型就是為了讓同一個隊列的消費者,能力強的處理更多的消息,核心在于prefetch控制預提取的資訊數量

生産者循環發送消息到simple.queue:發送消息

@Test
public void testWorkQueue() throws InterruptedException {
// 隊列名稱
String queueName = "simple.queue";
// 消息
String message = "hello, message__";
for (int i = 0; i < 50; i++) {
// 發送消息
rabbitTemplate.convertAndSend(queueName, message + i);
        // 避免發送太快
        Thread.sleep(20);
     }
}
           

消費者監聽:兩個消費者

@RabbitListener(queues = "simple.queue")
public void listenSimpleQueueMessage(String msg) throws InterruptedException {
System.out.println("spring 消費者1接收到消息:【" + msg + "】");
    Thread.sleep(25);
}
    
@RabbitListener(queues = "simple.queue") 
public void listenSimpleQueueMessage2(String msg) throws InterruptedException {
System.err.println("spring 消費者2接收到消息:【" + msg + "】");
Thread.sleep(100);
}

           

Work模型的使用:

  • 多個消費者綁定到一個隊列,同一條消息隻會被一個消費者處理
  • 通過設定prefetch來控制消費者預取的消息數量

交換機:exchange

exchange:exchange負責消息路由,而不是存儲,路由失敗則消息丢失

FanoutExchange的使用

建立兩個隊列,并進行綁定

package cn.itcast.mq.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class FanoutConfig {
    //itcast.exchage
    @Bean
    public FanoutExchange fanoutExchange() {
        return new FanoutExchange("ylw.fanout");
    }
    //fanout.queue1
    @Bean
    public Queue fanoutQueue1() {
        return new Queue("fanout.queue1");
    }
    @Bean
    public Binding fanoutBinding1(Queue fanoutQueue1, FanoutExchange fanoutExchange) {
        return BindingBuilder
                .bind(fanoutQueue1)
                .to(fanoutExchange);
    }
    //fanout.queue2
    @Bean
    public Queue fanoutQueue2() {
        return new Queue("fanout.queue2");
    }
    @Bean
    public Binding fanoutBinding2(Queue fanoutQueue2, FanoutExchange fanoutExchange) {
        return BindingBuilder
                .bind(fanoutQueue2)
                .to(fanoutExchange);
    }
}
           

Consumer聲明兩個消費者

@RabbitListener(queues = "fanout.queue1")

public void listenFanoutQueue1(String msg) 
{System.out.println("消費者1接收到Fanout消息:【" + msg + "】");}
@RabbitListener(queues = "fanout.queue2") 

public void listenFanoutQueue2(String msg) 
{System.out.println("消費者2接收到Fanout消息:【" + msg + "】");}
           

釋出者發送消息

@Test
public void testFanoutExchange() {    
    // 隊列名稱    
    String exchangeName = "itcast.fanout";    
    // 消息
    String message = "hello, everyone!";    
    // 發送消息,參數分别是:互動機名稱、RoutingKey(暫時為空)、消息     
    rabbitTemplate.convertAndSend(exchangeName, "", message);}