MQ異步通信
初始MQ
同步通信
優點:時效性較強,可以以及得到結果
Feign就屬于同步方式–問題:
- 耦合問題
- 性能下降(中間的等待時間)
- 資源浪費
- 級聯失敗
異步通信
優點
- 耦合度低
- 性能提升,吞吐量高
- 故障隔離
- 服務無強依賴,解決級聯失敗問題
- 流量削峰
缺點
- 依賴于broker的可靠性,安全性,吞吐能力
- 架構複雜了,業務沒有明顯的流程先,不好追蹤管理
MQ常見架構
什麼是MQ:資訊隊列,存放消息的隊列,也就是時間驅動架構中的broker
RabbitMQ | ActiveMQ | RocketMQ | Kafka | |
---|---|---|---|---|
公司/社群 | Rabbit | Apache | 阿裡 | Apache |
開發語言 | Erlang | Java | Java | Scala&Java |
協定支援 | AMQP,XMPP,SMTP,STOMP | OpenWire,STOMP,REST,XMPP,AMQP | 自定義協定 | 自定義協定 |
可用性 | 高 | 一般 | 高 | 高 |
單機吞吐量 | 一般 | 差 | 高 | 非常高 |
消息延遲 | 微秒級 | 毫秒級 | 毫秒級 | 毫秒以内 |
消息可靠性 | 高 | 一般 | 高 | 一般 |
推薦RabbitMQ
RabbitMQ快速入門
案例
釋出者
package cn.itcast.mq.helloworld;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import org.junit.Test;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class PublisherTest {
@Test
public void testSendMessage() throws IOException, TimeoutException {
// 1.建立連接配接
ConnectionFactory factory = new ConnectionFactory();
// 1.1.設定連接配接參數,分别是:主機名、端口号、vhost、使用者名、密碼
factory.setHost("xxx.xx.xxx.xxx");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("xxx");
factory.setPassword("xxxxx");
// 1.2.建立連接配接
Connection connection = factory.newConnection();
// 2.建立通道Channel
Channel channel = connection.createChannel();
// 3.建立隊列
String queueName = "simple.queue";
channel.queueDeclare(queueName, false, false, false, null);
// 4.發送消息
String message = "hello, rabbitmq1!";
channel.basicPublish("", queueName, null, message.getBytes());
System.out.println("發送消息成功:【" + message + "】");
// 5.關閉通道和連接配接
channel.close();
connection.close();
}
}
消費者
package cn.itcast.mq.helloworld;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class ConsumerTest {
public static void main(String[] args) throws IOException, TimeoutException {
// 1.建立連接配接
ConnectionFactory factory = new ConnectionFactory();
// 1.1.設定連接配接參數,分别是:主機名、端口号、vhost、使用者名、密碼
factory.setHost("xxx.xxx.xx.xxx");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("xxx");
factory.setPassword("xxx");
// 1.2.建立連接配接
Connection connection = factory.newConnection();
// 2.建立通道Channel
Channel channel = connection.createChannel();
// 3.建立隊列
String queueName = "simple.queue";
channel.queueDeclare(queueName, false, false, false, null);
// 4.訂閱消息
channel.basicConsume(queueName, true, new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
// 5.處理消息
String message = new String(body);
System.out.println("接收到消息:【" + message + "】");
}
});
System.out.println("等待接收消息。。。。");
}
}
運作消費者後運作釋出者,就可以接收到釋出者的資訊(因為消費者的函數是異步,是以釋出者可以随時發,消費者一直等着)
SpringAMQP
什麼是AMQP:一種協定,API規範
SpringAMQP:底層是rabbitMQ
基礎實作
使用SpringAMQP實作Helllo World中的基礎消息隊列功能
1,引入依賴
<!--AMQP依賴,包含RabbitMQ-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2,基礎配置
配置基礎配置并且工具類并測試
基礎配置
spring:
rabbitmq:
host: xxx.xx.xxx.xx
port: 5672
username: xxx
password: xxx
virtual-host: /
測試類
package cn.itcast.mq.spring;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringAmqpTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testSendMessage(){
String queueName = "simple.queue";
String message = "hello,spring amqp";
rabbitTemplate.convertAndSend(queueName,message);
}
}
測試結果
前面幾個是之前的,第三個和後面的是我自己設定的

上述為釋出者,消費者需要:
1,配置yml檔案
2,定義一個類
這裡的核心是使用@Component自動裝配到Spring,使用RabbitListener進行隊列的監聽,得到傳回值msg就列印
package cn.itcast.mq.listener;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class SpringRabbitListener {
@RabbitListener(queues = "simple.queue")
public void listenSimpleQueue(String msg){
System.out.println("消費者接收到消息:【"+msg+"】");
}
}
WorkQueue模型
當一個消費者能力足夠強,而另一個比較弱,合理的處理方式應該是強的消費者處理更多的消息,而弱的處理少的(根據能力分工)
是以WorkQueue模型就是為了讓同一個隊列的消費者,能力強的處理更多的消息,核心在于prefetch控制預提取的資訊數量
生産者循環發送消息到simple.queue:發送消息
@Test
public void testWorkQueue() throws InterruptedException {
// 隊列名稱
String queueName = "simple.queue";
// 消息
String message = "hello, message__";
for (int i = 0; i < 50; i++) {
// 發送消息
rabbitTemplate.convertAndSend(queueName, message + i);
// 避免發送太快
Thread.sleep(20);
}
}
消費者監聽:兩個消費者
@RabbitListener(queues = "simple.queue")
public void listenSimpleQueueMessage(String msg) throws InterruptedException {
System.out.println("spring 消費者1接收到消息:【" + msg + "】");
Thread.sleep(25);
}
@RabbitListener(queues = "simple.queue")
public void listenSimpleQueueMessage2(String msg) throws InterruptedException {
System.err.println("spring 消費者2接收到消息:【" + msg + "】");
Thread.sleep(100);
}
Work模型的使用:
- 多個消費者綁定到一個隊列,同一條消息隻會被一個消費者處理
- 通過設定prefetch來控制消費者預取的消息數量
交換機:exchange
exchange:exchange負責消息路由,而不是存儲,路由失敗則消息丢失
FanoutExchange的使用
建立兩個隊列,并進行綁定
package cn.itcast.mq.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class FanoutConfig {
//itcast.exchage
@Bean
public FanoutExchange fanoutExchange() {
return new FanoutExchange("ylw.fanout");
}
//fanout.queue1
@Bean
public Queue fanoutQueue1() {
return new Queue("fanout.queue1");
}
@Bean
public Binding fanoutBinding1(Queue fanoutQueue1, FanoutExchange fanoutExchange) {
return BindingBuilder
.bind(fanoutQueue1)
.to(fanoutExchange);
}
//fanout.queue2
@Bean
public Queue fanoutQueue2() {
return new Queue("fanout.queue2");
}
@Bean
public Binding fanoutBinding2(Queue fanoutQueue2, FanoutExchange fanoutExchange) {
return BindingBuilder
.bind(fanoutQueue2)
.to(fanoutExchange);
}
}
Consumer聲明兩個消費者
@RabbitListener(queues = "fanout.queue1")
public void listenFanoutQueue1(String msg)
{System.out.println("消費者1接收到Fanout消息:【" + msg + "】");}
@RabbitListener(queues = "fanout.queue2")
public void listenFanoutQueue2(String msg)
{System.out.println("消費者2接收到Fanout消息:【" + msg + "】");}
釋出者發送消息
@Test
public void testFanoutExchange() {
// 隊列名稱
String exchangeName = "itcast.fanout";
// 消息
String message = "hello, everyone!";
// 發送消息,參數分别是:互動機名稱、RoutingKey(暫時為空)、消息
rabbitTemplate.convertAndSend(exchangeName, "", message);}