天天看點

RabbitMq簡介及使用

1.Message Queue(MQ)

消息隊列Message Queue
是一種跨程序的通信機制,用于在系統之間進行傳遞消息。
MQ作為消息中間件,可以進行異步處理請求,進而減少請求響應時間和解耦。
           

2.消息隊列的使用場景

  • 應用解耦:

    多應用間通過消息隊列對同一消息進行處理,避免調用接口失敗導緻整個過程失敗;

    系統間消息傳遞通過MQ進行解耦,“消息隊列”是在消息的傳輸過程中儲存消息的容器

    RabbitMq簡介及使用
  • 異步處理:

    應用間并發處理消息,相比串行處理,減少處理時間;

    比如使用者為了使用某個應用,進行注冊,系統需要發送注冊郵件,驗證短信

    RabbitMq簡介及使用
  • 限流削峰:

    廣泛應用于秒殺或搶購活動中,避免流量過大導緻處理請求阻塞的情況

    RabbitMq簡介及使用

3.RabbitMQ簡介

RabbitMQ是支援多種消息協定,易于部署和使用的開源消息代理伺服器,用于在分布式系統中存儲轉發消息
由以高性能、健壯以及可伸縮性出名的Erlang語言編寫;提供了成熟的高并發,高可用的解決方案
           
  • 可以根據實際業務情況動态地擴充叢集節點
  • 在叢集中的機器上設定鏡像,使得在部分節點出現問題的情況下仍然可用
  • 支援多種用戶端語言,如:Python、Ruby、.NET、Java等
  • RabbitMQ提供了一個易用的使用者界面,使得使用者可以監控和管理消息、叢集中的節點等。

4.Window環境下安裝與使用RabbitMQ

  • 安裝Erlang程式運作環境

    下載下傳位址http://www.erlang.org/downloads

  • 安裝RabbitMQ伺服器

    下載下傳位址http://www.rabbitmq.com/

    啟動RabbitMQ服務

  • 激活RabbitMQ管理控制台

    cd sbin

    rabbitmq-plugins.bat enable rabbitmq_management

  • 通過浏覽器進行通路

    http://localhost:15672/

    使用者名:guest 密碼:guest

5.Linux環境下安裝

  • 下載下傳Erlang運作環境RPM包

    https://www.erlang-solutions.com/resources/download.html

  • 下載下傳RabbitMQ伺服器安裝包

    http://www.rabbitmq.com/install-rpm.html#downloads

  • 安裝rpm包

    rpm -ivh --nodeps esl-erlang_21.2.6-1_centos_7_amd64.rpm

    rpm -ivh --nodeps rabbitmq-server-3.7.13-1.el7.noarch.rpm

  • 啟用服務

    rabbitmq-server

  • 啟用控制台

    rabbitmq-plugins enable rabbitmq_management

6.RabbitMQ相關概念

  • Producer

    生産者,即消息的提供者

  • Consumer

    消費者,即消息的使用者

  • Message

    消息,即程序之間進行通信的資料

  • Queue

    隊列,即消息存放的容器,消息以先進先出的方式進行存儲

  • Vhost

    虛拟主機,用于存儲消息隊列

7. Java 用戶端通路RabbitMQ

  • maven工程的pom檔案中添加依賴
<dependency> 
		<groupId>com.rabbitmq</groupId>
		<artifactId>amqp-client</artifactId>
 		<version>5.7.3</version>
 	</dependency> 
           
  • 建立消息生産者
  • 建立消息消費者

8.RabbitMQ消息的狀态

  • Ready

    消息已經被送到隊列,等待被消費

  • Unacked

    消息已經被消費者認領,但是還沒有被确認“已經被消費”

  • Unacked狀态下消費者斷開連接配接則消息回到“Ready”

    沒有确認,但是用戶端也沒有斷開連接配接則一直處于“Unacked”

  • Finished

    調用basicAsk()方法後,表示消息已經被消費,從隊列中移除

9. RabbitMQ的工作模式

  • Simple簡單模式

    一個生産者,一個消費者

    RabbitMq簡介及使用
  • Work工作模式
一個生産者,多個消費者,每個消費者擷取到的消息唯一
在多個消息的情況下,WorkQueue會将消息分派給不同的消費者,每個消費者會接收到不同
的消息,并且可以根據消息的處理速度來接受不同數量的消息,進而讓消費者程式發揮最大
的性能
适合在叢集的環境中做異步處理,最大發揮每一台伺服器的性能
           
RabbitMq簡介及使用
  • publish/subscribe釋出訂閱模式:
一個生産者發送的消息會被多個消費者擷取
釋出訂閱模式中生産者不在直接與隊列進行綁定,而是将資料發送給交換機(Exchan{ge)
交換機Exchage負責将資料按照某種規則送入與之綁定的隊列,進而供消費者使用
該模式下交換機的類型為扇形交換中心(Fanout exchange)
           
RabbitMq簡介及使用
  • Routing 路由模式
釋出訂閱模式是交換機無條件将所有消息分發給綁定的隊列;而路由模式則是根據routing
key有條件的将資料篩選後分發給綁定的隊列。
發送消息到交換機時要指定路由key ,消費者将隊列綁定到交換機時也需要指定路由key
路由模式下交換機的類型是直接交換中心 (Direct exchange)
           
RabbitMq簡介及使用
  • Topic主題模式
主題模式是在Routing模式比對基礎上,提供了對RouteKey模糊比對的功能,可以簡化編
程,主題模式下交換機的類型為主題交換中心(Topic exchange)
主題模式下,模糊比對表達式規則為
*比對單個關鍵字
#比對所有關鍵字
           
RabbitMq簡介及使用

10.RabbitMQ消息确認機制

  • RabbitMQ提供了監聽器來接收消息投遞的狀态
  • 消息确認涉及兩種狀态:
Confirm代表生産者将消息送達了Broker時産生的狀态,後續會出現兩種情況:
	ack代表Broker已經将資料接收
	nack代表Broker未接收到消息
Return代表消息被正常接收,但Broker沒有對應的隊列進行投遞,消息被退回給生産者的狀
态
上述狀态表示生産者與Broker之間的消息投遞情況,與消費者是否接收/确認消息無關
           

11.SpringBoot整合RabbitMQ

11.1生産者端
  • 導入依賴
<dependency> 
		<groupId>org.springframework.boot</groupId>
    	<artifactId>spring-boot-starter-amqp</artifactId>
 	</dependency> 
           
  • 配置檔案
spring: 
	rabbitmq: 
		username: root 
		password: root 
		host: 192.168.8.105 
		port: 5672 
		virtual-host: /myvhost 
		publisher-returns: true #開啟消息傳回機制 
		publisher-confirm-type: correlated #消息确認 
		template: 
			mandatory: true #将消息退回給生産者
           
  • 生産者端,配置MQ
@Configuration 
public class RabbitMQConfig { 
//開啟确認、傳回 
RabbitTemplate.ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() { 
	@Override 
	public void confirm(CorrelationData correlationData, boolean b, String s) { 
	System.out.println("消息的ID:"+correlationData.getId()); 
	if(!b){ 
	System.out.println("消息拒收的原因:"+s); 
	} 
  }
};

RabbitTemplate.ReturnCallback returnCallback = new RabbitTemplate.ReturnCallback() { 
	@Override 
	public void returnedMessage(Message message, int i, String s, String s1, String s2) { 
		System.out.println("=================="); 
		System.out.println("傳回的錯誤編碼:"+i); System.out.println("傳回的錯誤描述:"+s); 
		System.out.println("交換機:"+s1); 
		System.out.println("路由key:"+s2); 
		System.out.println("消息主體:"+new String(message.getBody())); 
		System.out.println("=================="); 
	} 
};

//配置交換機 
@Bean 
public TopicExchange orderExchange(){ 
	return new TopicExchange("order_exchange"); 
}
@Bean 
public RabbitTemplate rabbitTemplate(ConnectionFactory factory){ 
	RabbitTemplate template = new RabbitTemplate(factory); 
	template.setMessageConverter(messageConverter()); 
	template.setMandatory(true);//強制退回給生成者 
	template.setConfirmCallback(confirmCallback); 
	template.setReturnCallback(returnCallback); 
	return template; 
  }
}
           
  • 注入RabbitTemplate
@Component 
public class OrderProducer { 
	@Autowired 
	private RabbitTemplate template; 
	public void sendOrder(String exchange, String routingKey, OrderDTO orderDTO){
		CorrelationData data = new CorrelationData(orderDTO.getOrderSn()); 
		//參數data:是發送消息的附加資料,用來自定以消息的ID 
		template.convertAndSend(exchange,routingKey,orderDTO,data); }
           
11.2消費者端
  • 添加依賴
  • 配置檔案
spring: 
	rabbitmq: 
		username: root 
		password: root 
		virtual-host: /myvhost 
		port: 5672 
		host: 192.168.8.105 l
		istener: 
			simple: 
				acknowledge-mode: manual #手動簽收
           
  • 配置MQ
@Configuration 
public class RabbitMQConfig implements RabbitListenerConfigurer { 
	@Override 
	public void configureRabbitListeners(RabbitListenerEndpointRegistrar registrar) { 
		registrar.setMessageHandlerMethodFactory(messageHandlerMethodFactory()); 
	}
	@Bean 
	public MessageHandlerMethodFactory messageHandlerMethodFactory(){ 
		DefaultMessageHandlerMethodFactory factory = new DefaultMessageHandlerMethodFactory(); 
		factory.setMessageConverter(mappingJackson2MessageConverter()); 
		return factory;
	}
	//消息反序列化轉換器 
	@Bean 
	public MappingJackson2MessageConverter mappingJackson2MessageConverter(){
		return new MappingJackson2MessageConverter(); 
	} 
}

           
  • 綁定隊列到交換機,監聽隊列處理消息
@Component 
public class OrderConsumer { 
	@Bean 
	public Queue orderQueue(){ 
		return new Queue("order_queue",true,false,false); 
	}
	@Bean 
	public TopicExchange topicExchange(){ 
		return new TopicExchange("order_exchange"); 
	}//綁定對列到交換機
	@Bean 
	public Binding binding(){ 
		return BindingBuilder.bind(orderQueue()).to(topicExchange()).with("order.#"); 
	}
	@RabbitListener(queues = "order_queue") 
	@RabbitHandler 
	public void processOrder(@Payload OrderDTO orderDTO, Channel channel,@Headers Map header){ 
		System.out.println("訂單消費者處理訂單消息:"+orderDTO.getOrderSn()+"-- ->"+orderDTO.getCreateTime()); 
		Long tag = (Long)header.get(AmqpHeaders.DELIVERY_TAG); 
		try {
			channel.basicAck(tag,false); 
		} catch (IOException e) { 
			e.printStackTrace(); 
		}
	} 
}