天天看點

消息隊列面試必問解析(上)示例流程

示例流程

消息隊列面試必問解析(上)示例流程

網關發送消息之後,如何接收後端服務的秒殺結果,又如何給APP傳回響應呢?

網關接收後端服務秒殺結果,實作方式不隻一種,這裡給個簡單方案。

public class RequestHandler {
  
  // ID生成器
  @Inject
  private IdGenerator idGenerator;
  // 消息隊列生産者
  @Inject
  private Producer producer;
  // 儲存秒殺結果的Map
  @Inject
  private Map<Long, Result> results;

  // 儲存mutex的Map
  private Map<Long, Object> mutexes = new ConcurrentHashMap<>();
  // 這個網關執行個體的ID
  @Inject
  private long myId;

  @Inject
  private long timeout;

  // 在這裡處理APP的秒殺請求
  public Response onRequest(Request request) {
    // 擷取一個程序内唯一的UUID作為請求id
    Long uuid = idGenerator.next();
    try {

      Message msg = composeMsg(request, uuid, myId);

      // 生成一個mutex,用于等待和通知
      Object mutex = new Object();
      mutexes.put(uuid, mutex)

      // 發消息
      producer.send(msg);

      // 等待後端處理
      synchronized(mutex) {
        mutex.wait(timeout);
      }

      // 查詢秒殺結果
      Result result = results.remove(uuid);

      // 檢查秒殺結果并傳回響應
      if(null != result && result.success()){
        return Response.success();
      }

    } catch (Throwable ignored) {}
    finally {
      mutexes.remove(uuid);
    }
    // 傳回秒殺失敗
    return Response.fail();
  }

  // 在這裡處理後端服務傳回的秒殺結果
  public void onResult(Result result) {

    Object mutex = mutexes.get(result.uuid());
    if(null != mutex) { // 如果查詢不到,說明已經逾時了,丢棄result即可。
      // 登記秒殺結果
      results.put(result.uuid(), result);
      // 喚醒處理APP請求的線程
      synchronized(mutex) {
        mutex.notify();
      }
    }
  }
}      

網關在收到APP秒殺請求後,直接給MQ發消息。

消息的内容,并不一定是APP請求的Request,隻要包含足夠字段:比如使用者ID、裝置ID、請求時間等。

還需包含這個請求ID和網關ID。

  • 如果發消息失敗,可直接給APP傳回秒殺失敗結果
  • 成功發送消息後,線程就阻塞等待秒殺結果。這不可無限等待,需設等待逾時時間。

等待結束後,去存放秒殺結果的Map中查詢是否有傳回的秒殺結果

  • 有就建構Response,給APP傳回秒殺結果
  • 沒有,按秒殺失敗處理
消息隊列面試必問解析(上)示例流程

給APP傳回結果的,隻能是處理APP請求的那個線程。

這是處理APP請求的線程,接下來我們來看一下,網關如何來接收從後端秒殺服務傳回的秒殺結果。

可用RPC傳回秒殺結果:網關節點是RPC服務端,後端服務為用戶端。

網關發的消息包含網關ID,後端服務可通過網關ID找到對應網關執行個體,秒殺結果需包含請求ID,這請求ID也是從消息中擷取。

網關收到後端服務秒殺結果後,用請求ID為Key,把結果存到秒殺結果的Map,然後通知對應的處理APP請求的線程,結束等待。

處理APP請求的線程,在結束等待後,會去秒殺結果Map中查詢結果,然後再給APP傳回響應。

處理過程流程圖

消息隊列面試必問解析(上)示例流程

這并非性能最優方案,處理APP請求的線程需要同步等待秒殺結果,可考慮使用異步方式。

1、秒殺的了解:

APP–發送秒殺請求–》網關(也是RPC服務端,和配置中心保持長連接配接,比如nacos,将其路由和配置資訊定時的發送給配置中心,配置中心對其進行管理,定時的清除當機的網關路由資訊,如超過一定時間沒有接收到網關的心跳包)–》将其APP請求做一定的封裝,增加網關id和網關執行個體中唯一的請求id發送給消息隊列,為了保證消息不丢失,網關對其發送消息出現的異常進行處理,如逾時異常,直接傳回秒殺失敗,網關發送消息的這個過程中可能涉及到分布式事務,使用消息隊列的分布式事務進行處理,然後網關需要等待一段時間,等待秒殺服務端使用RPC調用網關執行個體的接收秒殺結果,為此建立一個新對象,将其請求id做為key,新對象做為value放入CurrentMap中,調用新對象的逾時wait方法進行等待秒殺結果–發送封裝的APP請求,包含網關id和請求id–》消息隊列接收APP請求消息,為了保證消息不丢失,開啟Sync_Flush參數将消息儲存到磁盤,并且為了防止一台機器磁盤出問題,叢集需要2台機器都有消息才确認請求–從消息隊列中拉取消息–》秒殺服務端,為了低延遲執行風控、預占庫存,拿到消息中網關id,從本地路由中查詢網關id的執行個體資訊,如果擷取不到調用網關執行個體時,需先從配置中心擷取到網關的路由資訊,秒殺服務端也需和配置中心保持長連接配接,定時的從配置中心拉取網關的路由資訊,儲存到本地,使用RPC調用網關執行個體的接收秒殺結果的方法,為了保證消息不丢失,先執行消費邏輯,再響應消息隊列,如果根據網關id擷取不到網關執行個體,或者确認消息隊列逾時或出現異常,秒殺服務端復原事務,此過程也涉及到分布式事務,為了防止消費重複消息,接口的幂等性,将請求id和網關id做為唯一鍵。也為了防止消息積壓,消息隊列中的主題隊列和消費組中的消費者一一對應,保證消息被快速消費。

2、秒殺異步,APP發送請求給網關,網關接收請求後将請求做一定的封裝(包括請求id,網關id,賬戶id),然後發送到消息隊列中,響應APP請求,無需等待後需的流程,然後秒殺成功以否直接傳回,後續流程處理完使用短信的形式告知使用者是否秒殺成功,不知道這樣做法是否可行。

3、最近在撸rocketmq的源碼,搞了namesrv、logging、logappend子產品,想成為commiter,立個flag,等後續JMQ出來,撸其源碼,也想成為commiter,道阻且長,持續進化。

解答:技術上都沒什麼問題。

從業務角度,有一些不同的看法。

對于秒殺這種場景,宏觀上的設計應該是傾向于利用有限的資源處理短時間内海量的請求,保證服務不當機。有少量請求處理出錯(注意是後端錯誤,使用者不可見)或消息丢失,是可以接受的。

畢竟秒殺拼的就是運氣,某個使用者秒殺請求在處理的時候丢失,和處理成功但沒秒到,對于使用者來說都是運氣不好而已。

基于這樣的設計理念,很多保證資料可靠性的做法都可以犧牲掉,用于換取系統更大的吞吐量比較劃算。