天天看點

何為消息隊列

1.系統間通信技術

一般來說,大型應用通常會被拆分稱多個子系統,這些子系統可能會部署在多台機器上,也可能隻是一台機器的多個程序中,這樣的應用就是分布式應用。而分布式應用的子系統之間并不是完全獨立的,它們需要互相通信來共同完成某個功能,這就涉及系統間通信了。

目前業界通常有兩種方式來實作系統間通信,其中一種是基于遠端過程調用的方式,也就是我們常說的RPC調用;另外一種是基于消息隊列的方式;(後續單獨加RPC和消息隊列的差別及各自的優劣勢)

2.為何要用消息隊列

消息隊列的主要應用場景:異步、解耦、削峰;

  1. 異步

    一些非必要的業務邏輯以同步的方式運作,太耗費時間。将消息寫入消息隊列,非必要的業務邏輯以異步的方式運作,加快響應速度。

    場景說明:使用者注冊後,需要發注冊郵件和注冊短信。傳統的做法有兩種 1.串行的方式;2.并行方式

    a、串行方式:将注冊資訊寫入資料庫成功後,發送注冊郵件,再發送注冊短信。以上三個任務全部完成後,傳回給用戶端。

    何為消息隊列
    b、并行方式:将注冊資訊寫入資料庫成功後,發送注冊郵件的同時,發送注冊短信。以上三個任務完成後,傳回給用戶端。與串行的差别是,并行的方式可以提高處理的時間。
    何為消息隊列
    假設三個業務節點每個使用50毫秒,不考慮網絡等其他開銷,則串行方式的時間是150毫秒,并行的時間可能是100毫秒。

因為CPU在機關時間内處理的請求數是一定的,假設CPU1秒内吞吐量是100次。則串行方式1秒内CPU可處理的請求量是7次(1000/150)。并行方式處理的請求量是10次(1000/100)

小結:如以上案例描述,傳統的方式系統的性能(并發量,吞吐量,響應時間)會有瓶頸。如何解決這個問題呢?

引入消息隊列,将不是必須的業務邏輯,異步處理。改造後的架構如下:

何為消息隊列

按照以上約定,使用者的響應時間相當于是注冊資訊寫入資料庫的時間,也就是50毫秒。注冊郵件,發送短信寫入消息隊列後,直接傳回,是以寫入消息隊列的速度很快,基本可以忽略,是以使用者的響應時間可能是50毫秒。是以架構改變後,系統的吞吐量提高到每秒20 QPS。比串行提高了3倍,比并行提高了兩倍。

2. 解耦

系統間耦合性太強,彼此強依賴,互相影響。

場景說明:使用者下單後,訂單系統需要通知庫存系統。傳統的做法是,訂單系統調用庫存系統的接口。如下圖:

何為消息隊列

傳統模式的缺點:假如庫存系統無法通路,則訂單減庫存将失敗,進而導緻訂單失敗,訂單系統與庫存系統耦合。

如何解決以上問題呢?引入應用消息隊列後的方案,如下圖:

何為消息隊列

訂單系統:使用者下單後,訂單系統完成持久化處理,将消息寫入消息隊列,傳回使用者訂單下單成功。

庫存系統:訂閱下單的消息,采用拉/推的方式,擷取下單資訊,庫存系統根據下單資訊,進行庫存操作。

假如:在下單時庫存系統不能正常使用。也不影響正常下單,因為下單後,訂單系統寫入消息隊列就不再關心其他的後續操作了。實作訂單系統與庫存系統的應用解耦。

3. 削峰

流量削鋒也是消息隊列中的常用場景,一般在秒殺或團搶活動中使用廣泛。

應用場景:秒殺活動,一般會因為流量過大,導緻流量暴增,應用挂掉。為解決這個問題,一般需要在應用前端加入消息隊列。

a、可以控制活動的人數

b、可以緩解短時間内高流量壓垮應用

何為消息隊列

使用者的請求,伺服器接收後,首先寫入消息隊列。假如消息隊列長度超過最大數量,則直接抛棄使用者請求或跳轉到錯誤頁面。秒殺業務根據消息隊列中的請求資訊,再做後續處理。

每天 0:00 到 12:00,A 系統風平浪靜,每秒并發請求數量就 50 個。結果每次一到 12:00 ~ 13:00 ,每秒并發請求數量突然會暴增到 5k+ 條。但是系統是直接基于 MySQL 的,大量的請求湧入 MySQL,每秒鐘對 MySQL 執行約 5k 條 SQL。

一般的 MySQL,扛到每秒 2k 個請求就差不多了,如果每秒請求到 5k 的話,可能就直接把 MySQL 給打死了,導緻系統崩潰,使用者也就沒法再使用系統了。

但是高峰期一過,到了下午的時候,就成了低峰期,可能也就 1w 的使用者同時在網站上操作,每秒中的請求數量可能也就 50 個請求,對整個系統幾乎沒有任何的壓力。

如果使用 MQ,每秒 5k 個請求寫入 MQ,A 系統每秒鐘最多處理 2k 個請求,因為 MySQL 每秒鐘最多處理 2k 個。A 系統從 MQ 中慢慢拉取請求,每秒鐘就拉取 2k 個請求,不要超過自己每秒能處理的最大請求數量就 ok,這樣下來,哪怕是高峰期的時候,A 系統也絕對不會挂掉。而 MQ 每秒鐘 5k 個請求進來,就 2k 個請求出去,結果就導緻在中午高峰期(1 個小時),可能有幾十萬甚至幾百萬的請求積壓在 MQ 中。

這個短暫的高峰期積壓是 ok 的,因為高峰期過了之後,每秒鐘就 50 個請求進 MQ,但是 A 系統依然會按照每秒 2k 個請求的速度在處理。是以說,隻要高峰期一過,A 系統就會快速将積壓的消息給解決掉。

消息隊列的優點上面已經講了,缺點有哪些呢?

1、系統可用性降低

系統引入的外部依賴越多,越容易挂掉。本來你就是 A 系統調用 BCD 三個系統的接口就好了,ABCD 四個系統還好好的,沒啥問題,你偏加個 MQ 進來,萬一 MQ 挂了咋整?MQ 一挂,整套系統崩潰,你不就完了? 如何保證消息隊列的高可用呢?下一章講解

2、系統複雜度提高

硬生生加個 MQ 進來,你怎麼保證消息沒有重複消費?怎麼處理消息丢失的情況?怎麼保證消息傳遞的順序性?頭大頭大,問題一大堆,痛苦不已。

3、一緻性問題

A 系統處理完了直接傳回成功了,人都以為你這個請求就成功了;但是問題是,要是 BCD 三個系統那裡,BD 兩個系統寫庫成功了,結果 C 系統寫庫失敗了,咋整?你這資料就不一緻了。

是以消息隊列實際是一種非常複雜的架構,你引入它有很多好處,但是也得針對它帶來的壞處做各種額外的技術方案和架構來規避掉,做好之後,你會發現,媽呀,系統複雜度提升了一個數量級,也許是複雜了 10 倍。但是關鍵時刻,用,還是得用的。

3.消息隊列的功能特點

消息隊列,包含2個關鍵詞:消息和隊列。

消息是指在應用間傳送的資料,消息的表現形式是多樣的,可以簡單到隻包含文本字元串,也可以複雜到有一個結構化的對象定義格式。

對于隊列,從抽象意義上來了解,就是指消息的進和出。從時間順序上說,進和出并不一定是同步進行的,是以需要一個容器來暫存和處理消息。是以,一個典型意義上的消息隊列,至少需要包括消息的發送、接收和暫存功能。

何為消息隊列
  • Broker:消息進行中心,負責消息的接收、存儲、轉發等。
  • Producer:消息生産者,負責産生和發送消息到消息進行中心。
  • Consumer:消息消費者,負責從消息進行中心擷取消息,并進行相應的處理。

    但是在生産環境應用中,對消息隊列的要求遠不止基本的消息發送、接收和暫存。在不同的業務場景中,需要消息隊列産品能解決諸如消息堆積、消息持久化、可靠投遞、消息重複、嚴格有序、叢集等各種問題。

  • 消息堆積

    生産者、消費者速度差别過大,消息進行中心的系統資源被耗盡,導緻機器挂掉甚至整個消息隊列不可用。

    給消息隊列設定閥值。

  • 消息持久化

    如果業務場景不允許有消息的丢失,那麼就要将消息持久化。持久化方案有很多種,比如将消息存到本地檔案、分布式檔案系統、資料庫系統中等。

  • 可靠投遞

    可靠投遞是不允許存在消息丢失的情況。從消息的整個生命周期來分析,消息丢失的情況一般發生在如下過程中:

    從生産者到消息進行中心

    從消息進行中心到消息消費者

    消息進行中心持久化消息

  • 消息重複

    有些消息隊列為了支援消息可靠投遞,會選擇在接收到消息後先持久化到本地,然後發送給消費者。當消息發送失敗或者不知道是否發送成功時(比如逾時),消息的狀态是待發送,定時任務不停地輪詢所有的待發送消息,最終保證消息不會丢失,這就帶來了消息可能會重複的問題。

  • 嚴格有序

    有些業務場景,需要按生産消息時的順序來消費。

  • 叢集

    排除單點故障引起的服務中斷,提高服務的可用性。

    多個節點負載均衡,提高消息通信的吞吐量。

  • 消息中間件

    中間件:非底層作業系統軟體、非業務應用軟體,不是直接給最終使用者使用的,不能直接給客戶帶來價值的軟體系統。

    消息中間件關注于資料的發送和接收,利用高效、可靠的異步消息傳遞機制繼承分布式系統。

4.設計一個簡單的消息隊列

看了那麼多文字描述,不如自己動手實踐一遍體會深刻。下面是用Java語言寫一個簡單的消息隊列。

消息隊列的完整使用場景中至少包含三個角色。

  • 消息進行中心:負責消息的接收、存儲、轉發等。
  • 消息生産者:負責産生和發送消息到消息進行中心。
  • 消息消費者:負責從消息進行中心擷取消息,并進行相應的處理。

消息進行中心

消息進行中心Broker類的實作

import java.util.concurrent.ArrayBlockingQueue;

/**
 * @author wangmin
 * @description 消息進行中心
 * created at 2020/5/27 4:37 下午
 */
public class Broker {

	//隊列存儲消息的最大數量
	private final static int MAX_SIZE = 3;

	//儲存消息資料的容器
	private static ArrayBlockingQueue<String> messageQueue =  new ArrayBlockingQueue<>(MAX_SIZE);


	/**
	 * 生産消息
	 * @param msg
	 */
	public static void produce(String msg) {
		if (messageQueue.offer(msg)) {
			System.out.println("成功向消息中心投遞消息:" + msg + ",目前暫存的消息數量是:" + messageQueue.size());
		} else {
			System.out.println("消息進行中心暫存的消息達到最大負荷,不能繼續放入消息!");
		}
		System.out.println("====================");
	}

	public static String consume() {
		String msg = messageQueue.poll();
		if (msg != null) {
			//消費條件滿足情況,從消息容器中取出一條消息
			System.out.println("已經消費消息:" + msg + ",目前暫存的消息數量是:" + messageQueue.size());
		} else {
			System.out.println("消息進行中心内沒有消息可供消費!");
		}
		System.out.println("====================");
		return msg;
	}
}
           

作為一個消息進行中心,至少要有一個資料容器用來儲存接收到的消息。Java中的隊列(Queue)是提供該功能的一種簡單的資料結構,同時為簡化對隊列操作的并發通路處理,我們選擇了它的一個子類ArrayBlockingQueue。該類提供了對資料的插入、擷取、查詢等操作,其底層将資料以數組的形式儲存。

有了消息進行中心類之後,需要将該類的功能暴露出去,這樣别人才能用它來發送和接收消息。是以,我們定義了BrokerServer類用來對外提供Broker類的服務。

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;

/**
 * @author wangmin
 * @description 用來對外提供Broker類的服務
 * created at 2020/5/27 5:09 下午
 */
public class BrokerServer implements Runnable {

	public static int SERVICE_PORT = 9999;

	private final Socket socket;

	public BrokerServer(Socket socket) {
		this.socket = socket;
	}

	@Override
	public void run() {
		try {
			BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
			PrintWriter out = new PrintWriter(socket.getOutputStream());
			while (true) {
				String str = in.readLine();
				if (str == null) {
					continue;
				}
				System.out.println("接收到原始資料:" + str);

				if (str.equals("CONSUME")) { //CONSUME表示要消費一條消息
					//從消息對列中消費一條消息
					String message = Broker.consume();
					out.println(message);
					out.flush();
				} else {
					//其他情況都表示生産消息放到消息隊列中
					Broker.produce(str);
				}
			}
		} catch (Exception e) {
			e.printStackTrace();
		}
	}

	public static void main(String[] args) throws IOException {
		ServerSocket serverSocket = new ServerSocket(SERVICE_PORT);
		while (true) {
			BrokerServer brokerServer = new BrokerServer(serverSocket.accept());
			new Thread(brokerServer).start();
		}
	}
}
           

Java中涉及伺服器功能的軟體一般少不了套接字(Socket)和線程(Thread),因為需要通過線程的方式将應用啟動起來,而伺服器和應用的用戶端需要用Socket進行網絡通信。

用戶端通路

有了消息進行中心後,自然需要有相應用戶端與之通信來發送和接收消息。

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.InetAddress;
import java.net.Socket;

/**
 * @author wangmin
 * @description 用戶端通路消息進行中心
 * created at 2020/5/27 5:26 下午
 */
public class MqClient {

	/**
	 * 生産消息
	 * @param message
	 * @throws Exception
	 */
	public static void produce(String message) throws Exception {
		Socket socket = new Socket(InetAddress.getLocalHost(), BrokerServer.SERVICE_PORT);
		try {
			PrintWriter out = new PrintWriter(socket.getOutputStream());
			out.println(message);
			out.flush();
		} catch (Exception e) {
			throw e;
		}
	}

	/**
	 * 消費消息
	 * @return
	 * @throws Exception
	 */
	public static String consume() throws Exception {
		Socket socket = new Socket(InetAddress.getLocalHost(), BrokerServer.SERVICE_PORT);
		try {
			BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
			PrintWriter out = new PrintWriter(socket.getOutputStream());
			//先向消息隊列發送字元串"CONSUME"表示消費
			out.println("CONSUME");
			out.flush();
			//再從消息隊列擷取一條消息
			String message = in.readLine();
			return message;
		} catch (Exception e) {
			throw e;
		}
	}
}
           

因為用戶端和服務端是通過網絡通信的,是以顯然也是通過Socket來實作的。生産消息就是通過網絡與消息進行中心通信的,将資料寫入輸出流中,這就模拟了生産消息并發送到消息隊列的過程。消費消息實際是先向消息進行中心伺服器寫入字元串“CONSUME”,表示目前需要消費一條消息,然後通過Socket的輸入流從消息進行中心伺服器擷取消息資料,再傳回給調用者。

以上是通用的用戶端通路代碼,接下來是生産消息和消費消息的示例。

生産消息:

/**
 * @author wangmin
 * @description 生産者用戶端
 * created at 2020/5/27 5:34 下午
 */
public class ProduceClient {
	public static void main(String[] args) throws Exception {
		MqClient.produce("Hello World");
	}
}
           

執行main方法,可以在BrokerServer類的控制台看到消息被寫入隊列中。

因為微隊列設定了大小為3,是以如果執行了4次,則會看到超過隊列容量,不能繼續放入消息了。

何為消息隊列

消費消息:

/**
 * @author wangmin
 * @description 消費者用戶端
 * created at 2020/5/27 5:28 下午
 */
public class ConsumeClient {
	public static void main(String[] args) throws Exception {
		String message = MqClient.consume();
		System.out.println("擷取的消息為:" + message);
	}
}
           

執行main方法,可以在ConsumeClient類的控制台看到消費了一條消息。

何為消息隊列

從BrokerServer類的控制台可以看到接收到“CONSUME”字元串并消費了消息。

何為消息隊列

如果消息隊列中沒有消息,則會從控制台看到提醒。

何為消息隊列