天天看点

使用服务器发送的事件来大规模简化实时流(译文)

作者:闪念基因

在构建任何类型的实时数据应用程序时,试图弄清楚如何将消息从服​务器发送到客户端(反之亦然)是等式的重要组成部分。多年来,出现了各种通信模型来处理服务器到客户端的通信,包括服务器发送事件 (SSE)。

SSE 是一种单向服务器推送技术,它使 Web 客户端能够通过 HTTP 连接从服务器接收自动更新。使用 SSE,数据传输既快速又简单,因为没有定期轮询,因此无需临时暂存数据。

这是对 Shopify 每年发布的实时数据可视化产品——我们的黑色星期五网络星期一 (BFCM) 实时地图的完美补充。

我们的2021 Live Map系统很复杂,并且使用了不太适合的轮询通信模型。虽然这个系统有 100% 的正常运行时间,但它并非没有瓶颈。我们知道我们可以提高性能和数据延迟。

下面,我们将介绍如何实施 SSE 服务器以简化 BFCM Live Map 架构并改善数据延迟。我们将讨论为您的用例选择正确的通信模型、SSE 的优势,以及有关如何在 Golang 中使用 Nginx 实现负载平衡的可扩展 SSE 服务器的代码示例。

选择实时通信模型

首先,让我们讨论选择如何发送消息。在实时数据流方面,存在三种通信模型:

  1. 推送:这是最实时的模型。客户端打开到服务器的连接,并且该连接保持打开状态。服务器推送消息,客户端等待这些消息。服务器管理连接的客户端的注册表以将数据推送到。可扩展性与该注册表的可扩展性直接相关。
  2. 轮询:客户端向服务器发出请求并立即得到响应,无论是否有消息。当没有新消息时,此模型会浪费带宽和资源。虽然此模型最容易实施,但扩展性不佳。
  3. 长轮询:这是上述两种模型的组合。客户端向服务器发出请求,但连接保持打开状态,直到返回数据响应。一旦返回带有新数据的响应,连接就会关闭。

没有哪个模型比另一个更好,这实际上取决于用例。

我们的用例是 Shopify BFCM Live Map,这是一个 Web 用户界面,可处理和可视化数百万 Shopify 商家在 BFCM 周末的实时销售情况。我们可视化的数据包括:

  • 每分钟总销售额
  • 每分钟订单总数
  • 每分钟总碳补偿
  • 每分钟总运输距离
  • 每分钟唯一身份购物者总数
  • 最新出货单清单
  • 热门产品
使用服务器发送的事件来大规模简化实时流(译文)

Shopify 的 2022 BFCM Live Map 前端

BFCM 是 Shopify 一年中最大的数据时刻,因此将实时数据流式传输到 Live Map 是一项复杂的壮举。我们的平台正在处理来自商家的数百万个订单。从这个规模来看,在 BFCM 2021 期间,我们看到我们的摄取服务摄取了 3230 亿行数据。

BFCM Live Map 要想取得成功,需要一个可扩展且可靠的管道,以在几秒钟内提供准确、实时的数据。该管道的一个关键部分是我们的服务器到客户端通信模型。我们需要既能处理传输的数据量,又能处理数千人同时连接到服务器的负载。它需要快速完成所有这些。

我们的2021 BFCM 实时地图通过 WebSocket 将数据传送到表示层。表示层然后将数据存放在邮箱系统中,供 Web 客户端定期轮询,需要(至少)10 秒。在实践中,这行得通,但数据必须经过很长的组件路径才能交付给客户。

数据由一个多组件后端系统提供,该系统由使用 Redis 服务器和 MySQL 数据库的基于 Golang 的应用程序 (Cricket) 组成。Live Map 的数据管道由一个基于多区域、多作业的 Apache Flink 应用程序组成。Flink 处理来自 Apache Kafka 主题和 Google Cloud Storage (GCS) 镶木地板文件丰富数据的源数据,以生成其他 Kafka 主题供 Cricket 使用。

使用服务器发送的事件来大规模简化实时流(译文)

Shopify 2021 BFCM globe 后台架构

虽然这完成了工作,但复杂的架构导致了性能瓶颈。在我们的趋势产品数据可视化的情况下,更改可能需要几分钟才能提供给客户。我们需要简化以改善数据延迟。

当我们接近这种简化时,我们知道我们想要弃用 Cricket 并用基于 Flink 的数据管道取而代之。在过去的几年里,我们一直在投资 Flink,甚至在它之上构建了我们的流媒体平台——我们称之为 Trickle。我们知道我们可以利用这些现有的工程能力和基础设施来简化我们的管道。

弄清楚我们的数据管道后,我们需要决定如何将数据交付给客户端。我们查看了我们是如何使用 WebSocket 的,并意识到它不是我们用例的最佳工具。

服务器发送事件与 WebSocket

WebSocket 通过单个 TCP 连接提供双向通信通道。如果您正在构建聊天应用程序之类的东西,这非常有用,因为客户端和服务器都可以通过通道发送和接收消息。但是,对于我们的用例,我们不需要双向通信通道。

BFCM Live Map是一个数据可视化产品,所以我们只需要服务端将数据传递给客户端即可。如果我们继续使用 WebSocket,那将不是最精简的解决方案。另一方面,SSE 更适合我们的用例。如果我们使用 SSE,我们将能够实现:

  • 安全的单向推送:连接流来自服务器并且是只读的。
  • 使用普遍熟悉的 HTTP 请求的连接:这对我们来说是一个好处,因为我们已经在使用普遍熟悉的 HTTP 协议,所以我们不需要实现特殊的深奥协议。
  • 自动重新连接:如果连接丢失,会在一定时间后自动重新连接。

但最重要的是,SSE 将允许我们移除为了客户端轮询而在表示层上检索、处理和存储数据的过程。使用 SSE,我们将能够在数据可用时立即推送。不会有更多的民意调查和阅读,所以不会有更多的延迟。这与新的流线型管道相结合,将简化我们的架构,随着峰值 BFCM 量进行扩展并改善我们的数据延迟。

考虑到这一点,我们决定将 SSE 作为我们 2022 Live Map 的通信模型。我们是这样做的。

在 Golang 中实现 SSE

我们在 Golang 中实现了一个 SSE 服务器,它订阅 Kafka 主题并在数据可用时将数据推送到所有注册客户端的 SSE 连接。

使用服务器发送的事件来大规模简化实时流(译文)

带有 SSE 服务器的 Shopify 2022 BFCM Live Map 后端架构

实时流式 Flink 数据管道处理来自 Kafka 主题的原始 Shopify 商家销售数据。它还以压缩的 Apache Parquet 文件的形式处理 GCS 上定期更新的产品分类丰富数据。然后将这些分别计算到我们的销售和趋势产品数据中,并发布到 Kafka 主题中。

以下是服务器如何注册 SSE 连接的代码片段:

func (sseServer *SSEServer) handleConnection(rw http.ResponseWriter, req *http.Request) {
  flusher, ok := rw.(http.Flusher)

  if !ok {
    http.Error(rw, "Streaming unsupported!", http.StatusInternalServerError)
    return
  }
  auth := req.Header.Get("Authorization")
  //handling authorization header
  .....

  rw.Header().Set("Content-Type", "text/event-stream")
  rw.Header().Set("Cache-Control", "no-cache")
  rw.Header().Set("Connection", "keep-alive")
  rw.Header().Set("X-Accel-Buffering", "no")
  rw.Header().Set("Access-Control-Allow-Origin", "*")
  rw.Header().Set("Access-Control-Allow-Methods", "POST, GET, OPTIONS")
  rw.Header().Set("Access-Control-Allow-Headers", "Content-Type, authorization, X-Requested-With")

  // register connection message channel
  messageChan := make(chan []byte)

  // signal the Server of a new client connection
  sseServer.NewClientsChannel <- messageChan

  //keeping the connection alive with keep-alive protocol
  keepAliveTickler := time.NewTicker(15 * time.Second)
  keepAliveMsg := ":keepalive\n"
  notify := req.Context().Done()

  //listen to signal to close and unregister
  go func() {
    <-notify
    sseServer.ClosingClientsChannel <- messageChan
    keepaliveTickler.Stop()
  }()

  defer func() {
    sseServer.ClosingClientsChannel <- messageChan
  }()

  for {
    select {
    //receiving a message from the Kafka channel.
    case kafkaEvent := <-messageChan:
      // Write to the ResponseWriter in SSE compatible format
      fmt.Fprintf(rw, "data: %s\n\n", kafkaEvent)
      flusher.Flush()
    case <-keepAliveTickler.C:
      fmt.Fprintf(rw, keepAliveMsg)
      flusher.Flush()
    }

  }

}           

使用EventSource接口订阅 SSE 端点很简单。通常,客户端代码创建一个本机 EventSource 对象并在该对象上注册一个事件侦听器。该事件在回调函数中可用:

eventSource = new EventSource("http://localhost:8081/events")
eventSource.addEventListener('message', e => {
   var data = JSON.parse(e.data);
   console.log(data);
});           

在将 SSE 服务器集成到我们的前端 UI 时,UI 应用程序需要订阅经过身份验证的 SSE 服务器端点以接收数据。从服务器推送到客户端的数据在 BFCM 期间可公开访问,但身份验证使我们能够在站点不再公开时控制访问。预生成的 JWT 令牌由托管订阅客户端的服务器提供给客户端。我们使用开源的 EventSourcePolyfill 实现将授权标头传递给请求:

eventSource = new EventSourcePolyfill("http://localhost:8081/events", {
 headers: {
   'Authorization': "bearer xxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
 }
});
eventSource.addEventListener('message', e => {
   var data = JSON.parse(e.data);
   //process data
});           

订阅后,数据会在可用时推送到客户端。数据与 SSE 格式一致,有效负载是客户端可解析的 JSON。

确保 SSE 能够处理负载

由于消息总线瓶颈,我们的 2021 年系统在 BFCM 高峰期的大量用户会话请求下苦苦挣扎。我们需要确保我们的 SSE 服务器能够处理我们预期的 2022 年交易量。

考虑到这一点,我们将 SSE 服务器构建为可水平扩展,虚拟机集群位于 Shopify 的 NGINX 负载均衡器后面。随着负载的增加或减少,我们可以通过添加或删除 pod 来弹性扩展和减小我们的集群大小。但是,我们必须确定每个 pod 的限制,以便我们可以相应地规划集群。

运行 SSE 服务器的挑战之一是确定服务器将如何在负载下运行并处理并发连接。与客户端的连接由服务器维护,以便它知道哪些是活动的,从而知道将数据推送到哪些。这个SSE连接是由浏览器实现的,包括重试逻辑。打开数万个真正的浏览器 SSE 连接是不切实际的。因此,我们需要在负载测试中模拟大量连接,以确定单个服务器 pod 可以处理多少并发用户。通过这样做,我们可以确定如何适当地扩展集群。

我们选择构建一个简单的 Java 客户端,它可以启动与服务器的可配置数量的 SSE 连接。此 Java 应用程序被捆绑到一个可运行的 Jar 中,该 Jar 可以分发到不同区域的多个 VM 以模拟预期的连接数。我们利用开源的okhttp-eventsource库来实现这个 Java 客户端。

下面是这个 Java 客户端的主要代码:

public static void process(String endpoint, String bearerToken, int numberOfSessions, long pauseTimeMs, int sessionLengthMins) throws Exception{

   EventHandler handler =new EventHandler() {
       @Override
       public void onOpen() throws Exception {
           System.out.println("Open ");
       }

       @Override
       public void onClosed() throws Exception {
           System.out.println("closed ");
       }

       @Override
       public void onMessage(String s, MessageEvent messageEvent) throws Exception {
           System.out.println("message: " + messageEvent.getData());
       }

       @Override
       public void onComment(String s) throws Exception {
           System.out.println("comment: "+s);
       }

       @Override
       public void onError(Throwable throwable) {
           throwable.printStackTrace();
       }
   };
   EventSource.Builder builder = new EventSource.Builder(handler, URI.create(endpoint)).headers(Headers.of("Authorization","bearer "+bearerToken));
   for(int i = 0; i< numberOfSessions; i++) {
       TimeUnit.MILLISECONDS.sleep(pauseTimeMs);
       Thread t = new Thread(new Runnable() {
           @Override
           public void run() {
               try (EventSource eventSource = builder.build()) {
                   eventSource.start();
                   TimeUnit.MINUTES.sleep(sessionLengthMins);
               }catch(Exception ex) {
                   ex.printStackTrace();
               }
           }
       });
       t.setDaemon(false);
       t.start();
   }

}           

上证所表现有压力吗?

有了另一个成功的 BFCM,我们可以自信地说,在我们新的简化管道中实施 SSE 是正确的举措。我们的 BFCM 实时地图实现了 100% 的正常运行时间。至于 SSE 方面的数据延迟,数据在可用后的几毫秒内交付给客户端。这比我们 2021 年系统的最少 10 秒民意调查有了很大改进。总的来说,包括我们 Flink 数据管道中的数据处理,数据在其创建时间的 21 秒内在 BFCM 的实时地图 UI 上可视化。

我们希望您喜欢观看 2022 BFCM Live Map 的幕后花絮,并在此过程中学到了一些提示和技巧。请记住,在为您的实时数据产品选择通信模型时,请保持简单并使用最适合您的用例的工具。

作者:

Bao是 Core Optimize Data 团队的高级数据工程师。他对大型软件系统架构和开发、大数据技术以及构建稳健、高性能的数据管道感兴趣。

来源:https://shopify.engineering/server-sent-events-data-streaming

继续阅读