天天看點

使用伺服器發送的事件來大規模簡化實時流(譯文)

作者:閃念基因

在建構任何類型的實時資料應用程式時,試圖弄清楚如何将消息從服​務器發送到用戶端(反之亦然)是等式的重要組成部分。多年來,出現了各種通信模型來處理伺服器到用戶端的通信,包括伺服器發送事件 (SSE)。

SSE 是一種單向伺服器推送技術,它使 Web 用戶端能夠通過 HTTP 連接配接從伺服器接收自動更新。使用 SSE,資料傳輸既快速又簡單,因為沒有定期輪詢,是以無需臨時暫存資料。

這是對 Shopify 每年釋出的實時資料可視化産品——我們的黑色星期五網絡星期一 (BFCM) 實時地圖的完美補充。

我們的2021 Live Map系統很複雜,并且使用了不太适合的輪詢通信模型。雖然這個系統有 100% 的正常運作時間,但它并非沒有瓶頸。我們知道我們可以提高性能和資料延遲。

下面,我們将介紹如何實施 SSE 伺服器以簡化 BFCM Live Map 架構并改善資料延遲。我們将讨論為您的用例選擇正确的通信模型、SSE 的優勢,以及有關如何在 Golang 中使用 Nginx 實作負載平衡的可擴充 SSE 伺服器的代碼示例。

選擇實時通信模型

首先,讓我們讨論選擇如何發送消息。在實時資料流方面,存在三種通信模型:

  1. 推送:這是最實時的模型。用戶端打開到伺服器的連接配接,并且該連接配接保持打開狀态。伺服器推送消息,用戶端等待這些消息。伺服器管理連接配接的用戶端的系統資料庫以将資料推送到。可擴充性與該系統資料庫的可擴充性直接相關。
  2. 輪詢:用戶端向伺服器送出請求并立即得到響應,無論是否有消息。當沒有新消息時,此模型會浪費帶寬和資源。雖然此模型最容易實施,但擴充性不佳。
  3. 長輪詢:這是上述兩種模型的組合。用戶端向伺服器送出請求,但連接配接保持打開狀态,直到傳回資料響應。一旦傳回帶有新資料的響應,連接配接就會關閉。

沒有哪個模型比另一個更好,這實際上取決于用例。

我們的用例是 Shopify BFCM Live Map,這是一個 Web 使用者界面,可處理和可視化數百萬 Shopify 商家在 BFCM 周末的實時銷售情況。我們可視化的資料包括:

  • 每分鐘總銷售額
  • 每分鐘訂單總數
  • 每分鐘總碳補償
  • 每分鐘總運輸距離
  • 每分鐘唯一身份購物者總數
  • 最新出貨單清單
  • 熱門産品
使用伺服器發送的事件來大規模簡化實時流(譯文)

Shopify 的 2022 BFCM Live Map 前端

BFCM 是 Shopify 一年中最大的資料時刻,是以将實時資料流式傳輸到 Live Map 是一項複雜的壯舉。我們的平台正在處理來自商家的數百萬個訂單。從這個規模來看,在 BFCM 2021 期間,我們看到我們的攝取服務攝取了 3230 億行資料。

BFCM Live Map 要想取得成功,需要一個可擴充且可靠的管道,以在幾秒鐘内提供準确、實時的資料。該管道的一個關鍵部分是我們的伺服器到用戶端通信模型。我們需要既能處理傳輸的資料量,又能處理數千人同時連接配接到伺服器的負載。它需要快速完成所有這些。

我們的2021 BFCM 實時地圖通過 WebSocket 将資料傳送到表示層。表示層然後将資料存放在郵箱系統中,供 Web 用戶端定期輪詢,需要(至少)10 秒。在實踐中,這行得通,但資料必須經過很長的元件路徑才能傳遞給客戶。

資料由一個多元件後端系統提供,該系統由使用 Redis 伺服器和 MySQL 資料庫的基于 Golang 的應用程式 (Cricket) 組成。Live Map 的資料管道由一個基于多區域、多作業的 Apache Flink 應用程式組成。Flink 處理來自 Apache Kafka 主題和 Google Cloud Storage (GCS) 鑲木地闆檔案豐富資料的源資料,以生成其他 Kafka 主題供 Cricket 使用。

使用伺服器發送的事件來大規模簡化實時流(譯文)

Shopify 2021 BFCM globe 背景架構

雖然這完成了工作,但複雜的架構導緻了性能瓶頸。在我們的趨勢産品資料可視化的情況下,更改可能需要幾分鐘才能提供給客戶。我們需要簡化以改善資料延遲。

當我們接近這種簡化時,我們知道我們想要棄用 Cricket 并用基于 Flink 的資料管道取而代之。在過去的幾年裡,我們一直在投資 Flink,甚至在它之上建構了我們的流媒體平台——我們稱之為 Trickle。我們知道我們可以利用這些現有的工程能力和基礎設施來簡化我們的管道。

弄清楚我們的資料管道後,我們需要決定如何将資料傳遞給用戶端。我們檢視了我們是如何使用 WebSocket 的,并意識到它不是我們用例的最佳工具。

伺服器發送事件與 WebSocket

WebSocket 通過單個 TCP 連接配接提供雙向通信通道。如果您正在建構聊天應用程式之類的東西,這非常有用,因為用戶端和伺服器都可以通過通道發送和接收消息。但是,對于我們的用例,我們不需要雙向通信通道。

BFCM Live Map是一個資料可視化産品,是以我們隻需要服務端将資料傳遞給用戶端即可。如果我們繼續使用 WebSocket,那将不是最精簡的解決方案。另一方面,SSE 更适合我們的用例。如果我們使用 SSE,我們将能夠實作:

  • 安全的單向推送:連接配接流來自伺服器并且是隻讀的。
  • 使用普遍熟悉的 HTTP 請求的連接配接:這對我們來說是一個好處,因為我們已經在使用普遍熟悉的 HTTP 協定,是以我們不需要實作特殊的深奧協定。
  • 自動重新連接配接:如果連接配接丢失,會在一定時間後自動重新連接配接。

但最重要的是,SSE 将允許我們移除為了用戶端輪詢而在表示層上檢索、處理和存儲資料的過程。使用 SSE,我們将能夠在資料可用時立即推送。不會有更多的民意調查和閱讀,是以不會有更多的延遲。這與新的流線型管道相結合,将簡化我們的架構,随着峰值 BFCM 量進行擴充并改善我們的資料延遲。

考慮到這一點,我們決定将 SSE 作為我們 2022 Live Map 的通信模型。我們是這樣做的。

在 Golang 中實作 SSE

我們在 Golang 中實作了一個 SSE 伺服器,它訂閱 Kafka 主題并在資料可用時将資料推送到所有注冊用戶端的 SSE 連接配接。

使用伺服器發送的事件來大規模簡化實時流(譯文)

帶有 SSE 伺服器的 Shopify 2022 BFCM Live Map 後端架構

實時流式 Flink 資料管道處理來自 Kafka 主題的原始 Shopify 商家銷售資料。它還以壓縮的 Apache Parquet 檔案的形式處理 GCS 上定期更新的産品分類豐富資料。然後将這些分别計算到我們的銷售和趨勢産品資料中,并釋出到 Kafka 主題中。

以下是伺服器如何注冊 SSE 連接配接的代碼片段:

func (sseServer *SSEServer) handleConnection(rw http.ResponseWriter, req *http.Request) {
  flusher, ok := rw.(http.Flusher)

  if !ok {
    http.Error(rw, "Streaming unsupported!", http.StatusInternalServerError)
    return
  }
  auth := req.Header.Get("Authorization")
  //handling authorization header
  .....

  rw.Header().Set("Content-Type", "text/event-stream")
  rw.Header().Set("Cache-Control", "no-cache")
  rw.Header().Set("Connection", "keep-alive")
  rw.Header().Set("X-Accel-Buffering", "no")
  rw.Header().Set("Access-Control-Allow-Origin", "*")
  rw.Header().Set("Access-Control-Allow-Methods", "POST, GET, OPTIONS")
  rw.Header().Set("Access-Control-Allow-Headers", "Content-Type, authorization, X-Requested-With")

  // register connection message channel
  messageChan := make(chan []byte)

  // signal the Server of a new client connection
  sseServer.NewClientsChannel <- messageChan

  //keeping the connection alive with keep-alive protocol
  keepAliveTickler := time.NewTicker(15 * time.Second)
  keepAliveMsg := ":keepalive\n"
  notify := req.Context().Done()

  //listen to signal to close and unregister
  go func() {
    <-notify
    sseServer.ClosingClientsChannel <- messageChan
    keepaliveTickler.Stop()
  }()

  defer func() {
    sseServer.ClosingClientsChannel <- messageChan
  }()

  for {
    select {
    //receiving a message from the Kafka channel.
    case kafkaEvent := <-messageChan:
      // Write to the ResponseWriter in SSE compatible format
      fmt.Fprintf(rw, "data: %s\n\n", kafkaEvent)
      flusher.Flush()
    case <-keepAliveTickler.C:
      fmt.Fprintf(rw, keepAliveMsg)
      flusher.Flush()
    }

  }

}           

使用EventSource接口訂閱 SSE 端點很簡單。通常,用戶端代碼建立一個本機 EventSource 對象并在該對象上注冊一個事件偵聽器。該事件在回調函數中可用:

eventSource = new EventSource("http://localhost:8081/events")
eventSource.addEventListener('message', e => {
   var data = JSON.parse(e.data);
   console.log(data);
});           

在将 SSE 伺服器內建到我們的前端 UI 時,UI 應用程式需要訂閱經過身份驗證的 SSE 伺服器端點以接收資料。從伺服器推送到用戶端的資料在 BFCM 期間可公開通路,但身份驗證使我們能夠在站點不再公開時控制通路。預生成的 JWT 令牌由托管訂閱用戶端的伺服器提供給用戶端。我們使用開源的 EventSourcePolyfill 實作将授權标頭傳遞給請求:

eventSource = new EventSourcePolyfill("http://localhost:8081/events", {
 headers: {
   'Authorization': "bearer xxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
 }
});
eventSource.addEventListener('message', e => {
   var data = JSON.parse(e.data);
   //process data
});           

訂閱後,資料會在可用時推送到用戶端。資料與 SSE 格式一緻,有效負載是用戶端可解析的 JSON。

確定 SSE 能夠處理負載

由于消息總線瓶頸,我們的 2021 年系統在 BFCM 高峰期的大量使用者會話請求下苦苦掙紮。我們需要確定我們的 SSE 伺服器能夠處理我們預期的 2022 年交易量。

考慮到這一點,我們将 SSE 伺服器建構為可水準擴充,虛拟機叢集位于 Shopify 的 NGINX 負載均衡器後面。随着負載的增加或減少,我們可以通過添加或删除 pod 來彈性擴充和減小我們的叢集大小。但是,我們必須确定每個 pod 的限制,以便我們可以相應地規劃叢集。

運作 SSE 伺服器的挑戰之一是确定伺服器将如何在負載下運作并處理并發連接配接。與用戶端的連接配接由伺服器維護,以便它知道哪些是活動的,進而知道将資料推送到哪些。這個SSE連接配接是由浏覽器實作的,包括重試邏輯。打開數萬個真正的浏覽器 SSE 連接配接是不切實際的。是以,我們需要在負載測試中模拟大量連接配接,以确定單個伺服器 pod 可以處理多少并發使用者。通過這樣做,我們可以确定如何适當地擴充叢集。

我們選擇建構一個簡單的 Java 用戶端,它可以啟動與伺服器的可配置數量的 SSE 連接配接。此 Java 應用程式被捆綁到一個可運作的 Jar 中,該 Jar 可以分發到不同區域的多個 VM 以模拟預期的連接配接數。我們利用開源的okhttp-eventsource庫來實作這個 Java 用戶端。

下面是這個 Java 用戶端的主要代碼:

public static void process(String endpoint, String bearerToken, int numberOfSessions, long pauseTimeMs, int sessionLengthMins) throws Exception{

   EventHandler handler =new EventHandler() {
       @Override
       public void onOpen() throws Exception {
           System.out.println("Open ");
       }

       @Override
       public void onClosed() throws Exception {
           System.out.println("closed ");
       }

       @Override
       public void onMessage(String s, MessageEvent messageEvent) throws Exception {
           System.out.println("message: " + messageEvent.getData());
       }

       @Override
       public void onComment(String s) throws Exception {
           System.out.println("comment: "+s);
       }

       @Override
       public void onError(Throwable throwable) {
           throwable.printStackTrace();
       }
   };
   EventSource.Builder builder = new EventSource.Builder(handler, URI.create(endpoint)).headers(Headers.of("Authorization","bearer "+bearerToken));
   for(int i = 0; i< numberOfSessions; i++) {
       TimeUnit.MILLISECONDS.sleep(pauseTimeMs);
       Thread t = new Thread(new Runnable() {
           @Override
           public void run() {
               try (EventSource eventSource = builder.build()) {
                   eventSource.start();
                   TimeUnit.MINUTES.sleep(sessionLengthMins);
               }catch(Exception ex) {
                   ex.printStackTrace();
               }
           }
       });
       t.setDaemon(false);
       t.start();
   }

}           

上證所表現有壓力嗎?

有了另一個成功的 BFCM,我們可以自信地說,在我們新的簡化管道中實施 SSE 是正确的舉措。我們的 BFCM 實時地圖實作了 100% 的正常運作時間。至于 SSE 方面的資料延遲,資料在可用後的幾毫秒内傳遞給用戶端。這比我們 2021 年系統的最少 10 秒民意調查有了很大改進。總的來說,包括我們 Flink 資料管道中的資料處理,資料在其建立時間的 21 秒内在 BFCM 的實時地圖 UI 上可視化。

我們希望您喜歡觀看 2022 BFCM Live Map 的幕後花絮,并在此過程中學到了一些提示和技巧。請記住,在為您的實時資料産品選擇通信模型時,請保持簡單并使用最适合您的用例的工具。

作者:

Bao是 Core Optimize Data 團隊的進階資料工程師。他對大型軟體系統架構和開發、大資料技術以及建構穩健、高性能的資料管道感興趣。

來源:https://shopify.engineering/server-sent-events-data-streaming

繼續閱讀