Demystifying eBay Kafka's cross-data center high-availability solutions

author:Flash Gene

01

Preface

As Internet services iterate and mature, fast data flow and real-time analysis and processing have become key drivers of business innovation. Capturing business data quickly, analyzing its characteristics, deriving real-time business insights, and feeding them back into operations form a closed loop, and the efficiency of that loop determines business competitiveness. As a hub engine for data flow, Kafka has gradually become a preferred choice for data ingestion and distribution thanks to its excellent performance and rich development ecosystem.

As a distributed data engine, Kafka has proven its performance and stability in single-data-center production deployments. However, the industry has no unified, convenient solution for keeping services and data highly reliable and continuous across multiple data centers. To build a highly available data link, Kafka developers often have to implement complex detection and switchover logic on the application side so that the link keeps running and the data stays complete when a Kafka cluster, or even a whole data center, becomes unavailable. This logic and its maintenance cost burden application development, and switchover during a failure usually involves manual operations, which interrupts the data link. An application-transparent, cross-data-center high-availability solution for Kafka therefore has significant business value.

02

Background

Rheos, eBay's real-time big data platform, offers three products: Kafka, Flink, and Rheos SQL. At eBay, Kafka supports near-real-time data pipelines for internal business domains such as advertising, payments and risk control, marketing, AI, and search. eBay currently runs more than 100 Kafka production clusters, which handle more than one trillion records every day.

eBay's data centers are mainly located in three US regions: LVS (Las Vegas), SLC (Salt Lake City), and RNO (Reno). Kafka clusters are deployed in three main ways: multi-standalone, active-active with mirroring, and local-aggregation with mirroring. In each deployment mode, the Kafka clusters logically form a stream, and all clusters in the same stream share the same configuration and metadata (topics, ACLs). The topology of each deployment mode and the way upstream and downstream applications access it are described below.

Multi-site independent (multi-standalone)

Figure 1. Kafka cluster topology for a stream with independent clusters in multiple sites

In the deployment shown in Figure 1, the Kafka clusters of the three data centers are independent of each other, and the upstream producer application and the downstream consumer application must agree on an access point to keep data flowing end to end. If the upstream application switches its access point, the downstream application has to switch at the same time. This deployment mode is generally suited to decoupling the data architectures of different lines of business within a department.

Two-site mutual backup (active-active with mirroring)

Figure 2. Kafka cluster topology for a stream with two sites backing each other up

In the deployment shown in Figure 2, the Kafka clusters of the two data centers back each other up. For each topic, one data center is designated as the primary and the other as the standby, and the MirrorMaker component replicates data from the primary to the standby in real time. Upstream applications can send data only to the topic's primary data center, while downstream applications can read the topic from either data center. When an upstream application needs to write to the other data center, it must also switch to a different topic, and the downstream application must change its topic accordingly to keep reading. (A downstream application can avoid this change by subscribing to both topics at once, namely topicN.slc and topicN.lvs in Figure 2.)

Multi-site mutual backup (local-aggregation with mirroring)

Figure 3. Kafka cluster topology for a stream with multi-site mutual backup

In the deployment shown in Figure 3, the Kafka clusters of a stream are divided into two layers: local and aggregation. The MirrorMaker component copies the data of every local-layer Kafka cluster to the aggregation-layer Kafka clusters in real time, so each aggregation-layer cluster holds the combined data of all local-layer clusters. Upstream applications write to a Kafka cluster in the local layer, and downstream applications read from a Kafka cluster in the aggregation layer. Both sides can choose an access point in their own data center for lower latency, and each side can switch access points without affecting the other, so upstream and downstream are fully decoupled.

03

Architectural design

Design Ideas

Each deployment mode handles the challenge of cross-data-center high availability differently. The multi-site independent mode is relatively simple, but the data in each center is isolated, so historical data cannot be replayed when downstream applications switch centers. In the two-site mutual backup mode, both centers hold the full data set, but switching access points for upstream applications and managing metadata are complicated. In the multi-site mutual backup mode, every center holds the full data set, the customer-facing interface is simple and intuitive, and the deployment can be extended flexibly to more centers for higher levels of disaster recovery.

In designing an application-transparent, cross-data-center high-availability solution, we tried to keep the customer-facing interface close to the native Kafka interface, so that customers familiar with Kafka application development can understand it at a glance when they adopt it, which greatly reduces the learning cost.

Weighing these factors, we adopted the multi-site mutual backup mode for this solution.

Architectural overview

The overall architecture of the solution is shown in Figure 4. The Discovery Service monitors the health of each Kafka cluster and records the results in a database. PubSubLib periodically calls the Discovery Service to learn whether the cluster that the client is connected to is healthy and to decide whether to switch clusters. Cross-cluster data is replicated in real time by the open-source MirrorMaker 2 component, which we customized to periodically record the offset mapping between its source and target sides. Based on these mappings, the Offset Management Service calculates the corresponding consumer offsets among the aggregation-layer Kafka clusters of the different data centers, so that consumers can resume seamlessly from where they left off when they switch data centers. The following sections describe the design and implementation of each component in detail.

Figure 4. Overall architecture for cross-data center high-availability scenarios

Component analysis

Discovery Service

The core function of this component is to periodically check the health of every cluster on the platform and to select an appropriate target cluster for registered producers and consumers. To reflect cluster health as accurately as possible, the probe service checks whether the Kafka service is healthy and responsive by using the native Kafka protocol. This avoids misjudgments such as treating a cluster as healthy because its port is reachable while the Kafka service itself is unavailable. The architecture of the probe service is outlined in Figure 5.

Figure 5. Architecture of the metadata probe service

The probe service is designed to be stateless, and service instances are deployed in multiple data centers to avoid a single point of failure. For better scalability, each service instance is responsible for probing only a subset of the clusters. One instance is elected leader and assigns probe tasks to every instance. The overall workflow of the service is as follows:

1) Each instance checks whether the service group has currently elected a leader. If not, it runs for leader and becomes a leader or a follower according to the election result;

2) The leader checks the service instances registered in the current service group, collects all Kafka clusters that need to be probed, and assigns the corresponding probe tasks to each service instance;

3) Each service instance receives its assigned tasks and sends a Kafka metadata request to every node of each Kafka cluster named in the tasks to check liveness. A node is marked available if it responds within the specified time, and a cluster is declared healthy when more than half of its nodes are available and at least 3 nodes are available;

4) The service instance stores the probe results in the database and, based on cluster health, updates the target clusters of registered consumers and producers as needed so that clients can query them.
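The health rule in step 3 and the task assignment in step 2 can be sketched roughly as follows. This is a minimal illustration and not the service's actual code: the function names and the round-robin assignment policy are assumptions, and only the "more than half and at least 3" rule comes from the text.

```python
from typing import Dict, List

MIN_AVAILABLE_NODES = 3  # from the text: at least 3 nodes must be available


def is_cluster_healthy(node_available: Dict[str, bool]) -> bool:
    """Rule from step 3: a strict majority of nodes is available
    and at least MIN_AVAILABLE_NODES nodes are available."""
    total = len(node_available)
    available = sum(1 for ok in node_available.values() if ok)
    return available * 2 > total and available >= MIN_AVAILABLE_NODES


def assign_probe_tasks(clusters: List[str], instances: List[str]) -> Dict[str, List[str]]:
    """Hypothetical leader-side policy: spread clusters round-robin
    over the registered service instances."""
    if not instances:
        raise ValueError("no service instances registered")
    plan: Dict[str, List[str]] = {name: [] for name in instances}
    for i, cluster in enumerate(sorted(clusters)):
        plan[instances[i % len(instances)]].append(cluster)
    return plan
```

Note that a 3-node cluster with one node down counts as unhealthy under this rule, because only 2 nodes remain available.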

Each service instance repeats this process periodically; the interval is currently set to 1 minute. The service group sits behind a load balancer that clients call. On receiving a client request, a service instance reads the cluster status associated with that client from the database and returns the corresponding target cluster, taking client attributes into account (such as the producer's co-locality configuration). The service instance also keeps probe results in a local cache to absorb bursts of concurrent probe requests from the many clients of a single application.

Offset Management Service

In the Kafka cross-data-center high-availability architecture, we adopt the local-aggregation deployment with multi-site mutual backup. Applications write data to a local-layer cluster, Kafka MirrorMaker copies it to every cluster in the aggregation layer, and downstream applications consume Kafka data from the aggregation layer. Although every aggregation cluster holds the full data set, the same message sits at a different offset in each aggregation cluster. This is because data from the local-layer clusters is replicated in parallel and interleaved, replication processes run independently of one another, and messages may be retransmitted. As a result, if a consumer switches to another aggregation cluster and simply resumes from its offset on the previous cluster, it risks missing data. In our solution, the Offset Management Service ensures that no data is lost when consumers switch aggregation clusters, which gives at-least-once semantics.
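The following toy simulation illustrates why reusing an offset across aggregation clusters can skip data. It is a simplified sketch: the cluster names, message counts, and interleaving orders are made up for illustration, and real mirroring is of course not modeled by a Python list.

```python
from typing import Dict, List, Tuple

Message = Tuple[str, int]  # (local cluster name, local offset)


def mirror(arrival_order: List[Message]) -> Dict[Message, int]:
    """Append messages to an aggregation log in arrival order and
    return the aggregation offset assigned to each message."""
    return {msg: offset for offset, msg in enumerate(arrival_order)}


# Two local clusters, A and B, each holding two messages. Both aggregation
# clusters receive the same four messages, but the parallel mirroring
# tasks interleave them differently.
agg_c = mirror([("A", 0), ("B", 0), ("A", 1), ("B", 1)])
agg_d = mirror([("B", 0), ("B", 1), ("A", 0), ("A", 1)])

same_data = set(agg_c) == set(agg_d)                 # full data on both sides
offset_of_a0 = (agg_c[("A", 0)], agg_d[("A", 0)])    # same message, different offsets

# A consumer at offset 2 on C has consumed offsets 0 and 1 there.
consumed_on_c = {msg for msg, off in agg_c.items() if off < 2}
# Naively resuming at offset 2 on D reads only offsets 2 and 3 there.
read_on_d = {msg for msg, off in agg_d.items() if off >= 2}
missed = set(agg_c) - consumed_on_c - read_on_d      # never seen by the consumer
```

In this made-up scenario the consumer never sees message 1 of local cluster B, which is the kind of loss the Offset Management Service is meant to prevent.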

Workflow for Offset Management Service

Figure 6. Offset Management Service workflow

The workflow of the Offset Management Service is shown in Figure 6 and consists of the following main steps:

1) The Offset mapping fetcher module obtains, for the same message, its offset on each local cluster and on the aggregation cluster, which forms a set of upstream-to-downstream offset mappings. All of these mappings are stored in the offset mapping store as the basis for offset calculation.

2) The Group offset fetcher module periodically obtains the offset that each consumer group has reached. In the example in Figure 6, the application is consuming data from SLC, so the module periodically tracks the consumer group's offset on the SLC aggregation cluster.

3) The Group offset calculator module uses the application's offset on the current aggregation cluster to calculate the largest offset on each of the other aggregation clusters that loses no data, and commits it to the corresponding cluster.

Seen as a whole, while an application consumes data from one aggregation cluster, the Offset Management Service monitors the application's consumer group and commits to the other aggregation clusters the largest offset that loses no data. When the consumer group switches clusters, it starts consuming from the offset that the Offset Management Service previously committed there, which lets it resume seamlessly from where it left off.

In this solution, applications use the platform-customized PubSubLib, which works with the Discovery Service to ensure that all instances of a consumer group consume from the same aggregation cluster at any given time. This ensures that the Offset Management Service reads the consumer group's latest offset from the right aggregation cluster and commits accurate offsets to the remaining clusters.

Acquiring and storing upstream and downstream offset mappings

To replicate data from the Kafka local-layer (upstream) clusters to the aggregation-layer (downstream) clusters, we use the MirrorMaker component. We customized MirrorMaker so that it periodically snapshots the offset mapping between local and aggregation clusters and writes it to an internal topic on the local Kafka cluster. The Offset Management Service listens to these internal topics to obtain the local-to-aggregation offset mappings of all data centers and stores them in the offset mapping store. The content of each offset mapping is shown in Figure 7. Each record additionally stores the current maximum offset (LEO) of every upstream cluster; its use is explained in the next section on offset calculation.

Figure 7. Contents of the offset mapping table

We have two implementations for the storage of offset mapping:

  • Database-based implementation
  • Implementation based on local memory + Kafka persistence

Early in the project, we stored offset mappings in MySQL. This was simple to implement and easy to debug, and composite indexes gave good query performance. However, as the number of topics and partitions grew, the offset mapping table expanded dramatically, easily reaching tens of millions of rows, which degraded MySQL insert and query performance.

Because offset mappings expire naturally and are read-only once written, we moved their storage to a local memory + Kafka persistence implementation to improve scalability and handle larger data volumes. When the Offset Management Service receives an offset mapping, it caches the data in local memory and also sends it to the Kafka cluster that stores offset mappings. When the service restarts, unexpired offset mappings are loaded from that Kafka cluster back into memory. All offset queries are answered directly from memory, which keeps queries fast. The Offset Management Service is designed to be elastic and shards data by stream. On the server side, an expiration cleanup mechanism removes old offset mappings to avoid memory bloat.
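A minimal sketch of the in-memory side of such a store is shown below. It assumes a key of (local cluster, aggregation cluster, topic, partition), keeps records sorted by aggregation offset, and expires them by age; the class name, key layout, and TTL policy are illustrative assumptions, and the Kafka persistence and reload-on-restart steps are left out.

```python
import bisect
from typing import Dict, List, Optional, Tuple

Key = Tuple[str, str, str, int]   # (local cluster, aggregation cluster, topic, partition)
Record = Tuple[int, int, float]   # (aggregation offset, local offset, record time in seconds)


class OffsetMappingStore:
    """In-memory store; records per key are kept sorted by aggregation offset."""

    def __init__(self, ttl_seconds: float) -> None:
        self.ttl_seconds = ttl_seconds
        self._records: Dict[Key, List[Record]] = {}

    def put(self, key: Key, agg_offset: int, local_offset: int, now: float) -> None:
        bisect.insort(self._records.setdefault(key, []), (agg_offset, local_offset, now))

    def floor(self, key: Key, agg_offset: int) -> Optional[Tuple[int, int]]:
        """Return the newest mapping whose aggregation offset is <= agg_offset."""
        records = self._records.get(key, [])
        idx = bisect.bisect_right(records, (agg_offset, float("inf"), float("inf")))
        if idx == 0:
            return None
        found = records[idx - 1]
        return found[0], found[1]

    def expire(self, now: float) -> int:
        """Drop mappings older than the TTL; return how many were removed."""
        removed = 0
        for key, records in self._records.items():
            kept = [r for r in records if now - r[2] <= self.ttl_seconds]
            removed += len(records) - len(kept)
            self._records[key] = kept
        return removed
```

Passing the current time in explicitly keeps the sketch deterministic; a real service would more likely read a clock and run `expire` on a timer.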

Calculating offsets across clusters

As mentioned earlier, each aggregation-layer cluster copies data from every local-layer cluster and interleaves it, so a given local-layer message ends up at a different offset in each aggregation-layer cluster. With the offset mapping table, the Offset Management Service can convert an offset on an aggregation cluster into offsets on each local cluster. Suppose offset2 on aggregation cluster D is to be calculated from offset1 on aggregation cluster C. As long as the local-cluster offsets corresponding to the messages before offset2 on cluster D are less than or equal to the local-cluster offsets corresponding to the messages before offset1 on cluster C, a consumer group that switches from offset1 on cluster C to offset2 on cluster D can read, on cluster D, every message it had not yet read on cluster C.

Figure 8. Cross-cluster offset translation for one topic partition

Figure 8 shows an example of offset calculation for clusters in two data centers. Clusters A and B are two local clusters whose data is copied to two aggregation clusters, C and D, and the offset mapping table is shown on the right. Because recording an offset mapping for every message would produce a huge amount of data, we record mappings only periodically. The calculation is explained step by step below, using the translation of offset 8 on cluster C in Figure 8 to an offset on cluster D as an example:

1) Find the corresponding offset on each local cluster for offset '8' on cluster C (hereinafter C8): the table has no offset mapping for C8 itself, so we search backward in the offset mapping table for records in C with an offset less than '8' and find the two offset mappings B3 -> C5 and A4 -> C7.

2) Find the offset on cluster D for each local-cluster offset obtained in 1): looking up A4 and B3 in the table for D gives A4 -> D8 directly, while for cluster B there is no record for B3, so we search backward and find B2 -> D5.

3) Take the minimum of the candidates obtained in 2): the smaller of D8 and D5 is D5, so C8 translates to offset 5 on D.
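The three steps above can be sketched as follows. The tables contain only the four mapping records quoted in the walkthrough (the full table in Figure 8 is not reproduced here), and the function names and the in-memory list representation are assumptions made for illustration.

```python
from typing import Dict, List, Optional, Tuple

Mapping = Tuple[str, int, int]  # (local cluster, local offset, aggregation offset)

# Only the records quoted in the walkthrough, e.g. ("B", 3, 5) means B3 -> C5.
TABLE_C: List[Mapping] = [("B", 3, 5), ("A", 4, 7)]
TABLE_D: List[Mapping] = [("B", 2, 5), ("A", 4, 8)]


def local_positions(table: List[Mapping], agg_offset: int) -> Dict[str, int]:
    """Step 1: per local cluster, the newest local offset mapped at or before agg_offset."""
    positions: Dict[str, int] = {}
    for local, local_off, agg_off in table:
        if agg_off <= agg_offset:
            positions[local] = max(positions.get(local, -1), local_off)
    return positions


def agg_candidate(table: List[Mapping], local: str, local_offset: int) -> Optional[int]:
    """Step 2: aggregation offset of the newest record of `local` at or before local_offset."""
    best: Optional[Tuple[int, int]] = None
    for name, local_off, agg_off in table:
        if name == local and local_off <= local_offset:
            if best is None or local_off > best[0]:
                best = (local_off, agg_off)
    return None if best is None else best[1]


def translate(source: List[Mapping], target: List[Mapping], source_offset: int) -> Optional[int]:
    """Step 3: the minimum candidate is the largest target offset that loses no data."""
    candidates = []
    for local, local_off in local_positions(source, source_offset).items():
        candidate = agg_candidate(target, local, local_off)
        if candidate is None:
            return None  # no usable mapping yet for this local cluster
        candidates.append(candidate)
    return min(candidates) if candidates else None
```

With these tables, `translate(TABLE_C, TABLE_D, 8)` yields 5, matching the walkthrough. Returning `None` when a mapping is missing is a choice made for this sketch; the article does not say how the service handles that case.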

This calculation works well when all local clusters carry traffic. But when a local cluster has no traffic, the algorithm returns a rather small offset, so the consumer group re-consumes a large amount of data when it switches aggregation clusters. To reduce duplicate consumption, we enhanced the algorithm to exclude local clusters whose traffic has stopped.

When recording each offset mapping, we also obtain the current maximum offset (log end offset, LEO) of every local cluster for the same partition and record it in the offset mapping table.

Figure 9. Cross-cluster offset calculation for one topic partition after introducing LEO

Figure 9 shows an example of offset calculation with LEO. The offset mapping table on the right has an additional column that records the LEO of all upstream clusters. The new calculation is explained below, again translating offset 8 on cluster C in Figure 9 to an offset on cluster D:

1) Find the corresponding offset on each local cluster for C8: this yields the offset mappings A4 -> C7 and B2 -> C6.

2) Find the offset on D for each local-cluster offset obtained in 1): the mapping records found are A4 -> D6 (A:5, B:2) and B2 -> D3 (A:3, B:2).

3) Prune the results of 2) by checking for local clusters whose traffic has stopped: between the two mapping records, local cluster B received no new data (its LEO did not change), and its last record has already been mirrored to D3 (LEO == B2), so the result B2 -> D3 can be safely ignored.

4) Take the minimum of the results left after pruning in 3): once D3 is discarded, only D6 remains, which is the final value.

In this example, C8 translates to offset 6 on cluster D. Without pruning, the result would be D3, and any later offset on C would also translate to D3, which would inevitably introduce a large number of unnecessary duplicate messages.
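One way to express the pruning step in code is sketched below. The records are the ones quoted in the Figure 9 walkthrough. The exact pruning condition is an interpretation of the text rather than the service's real rule: a candidate is dropped when its local cluster's LEO is unchanged between that candidate and the newest candidate, and equals the local offset already mirrored. Following the article, LEO here means the offset of the last record. The LEO values of the cluster C records are not quoted in the text, so they are left empty.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Optional


@dataclass
class MappingRecord:
    local: str          # local (upstream) cluster name
    local_offset: int   # offset in the local cluster
    agg_offset: int     # offset of the same message in the aggregation cluster
    leo: Dict[str, int] = field(default_factory=dict)  # LEO per local cluster at record time


TABLE_C = [MappingRecord("B", 2, 6), MappingRecord("A", 4, 7)]
TABLE_D = [
    MappingRecord("B", 2, 3, {"A": 3, "B": 2}),
    MappingRecord("A", 4, 6, {"A": 5, "B": 2}),
]


def floor_by_agg(table: List[MappingRecord], local: str, agg_offset: int) -> Optional[MappingRecord]:
    matches = [r for r in table if r.local == local and r.agg_offset <= agg_offset]
    return max(matches, key=lambda r: r.agg_offset, default=None)


def floor_by_local(table: List[MappingRecord], local: str, local_offset: int) -> Optional[MappingRecord]:
    matches = [r for r in table if r.local == local and r.local_offset <= local_offset]
    return max(matches, key=lambda r: r.local_offset, default=None)


def is_idle(candidate: MappingRecord, newest: MappingRecord) -> bool:
    """Assumed rule: LEO unchanged between the two records and already fully mirrored."""
    leo_then = candidate.leo.get(candidate.local)
    leo_now = newest.leo.get(candidate.local)
    return leo_then is not None and leo_then == leo_now == candidate.local_offset


def translate_with_leo(source: List[MappingRecord], target: List[MappingRecord],
                       source_offset: int, prune: bool = True) -> Optional[int]:
    candidates: List[MappingRecord] = []
    for local in sorted({r.local for r in source}):
        src = floor_by_agg(source, local, source_offset)       # step 1
        if src is None:
            continue
        dst = floor_by_local(target, local, src.local_offset)  # step 2
        if dst is None:
            return None
        candidates.append(dst)
    if not candidates:
        return None
    if prune and len(candidates) > 1:                          # step 3
        newest = max(candidates, key=lambda r: r.agg_offset)
        candidates = [c for c in candidates if c is newest or not is_idle(c, newest)]
    return min(c.agg_offset for c in candidates)               # step 4
```

With pruning, `translate_with_leo(TABLE_C, TABLE_D, 8)` gives 6; with `prune=False` it gives 3, which is the unpruned result the text warns about.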

Client SDK (Pub-Sub Library)

With the Discovery Service, clients can automatically connect to the selected healthy cluster. All that is needed on the client is a scheduled probe thread that queries the metadata probe service, so that every producer or consumer instance in the application learns the current reachability of each Kafka cluster. Each time it receives a result from the metadata probe service, the HA Client compares it with the locally stored result of the previous round and triggers a cluster switchover when the two differ.
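The compare-and-switch logic can be sketched like this. The class and method names are hypothetical, and the probe result is reduced to a single target cluster name; a real client would call `on_probe_result` from its scheduled probe thread with whatever the probe service returns.

```python
from typing import Callable, Optional


class ClusterSwitchDetector:
    """Keeps the previous probe result and fires a callback when it changes."""

    def __init__(self, on_switch: Callable[[Optional[str], str], None]) -> None:
        self._current: Optional[str] = None
        self._on_switch = on_switch

    def on_probe_result(self, target_cluster: str) -> bool:
        """Handle one probe round; return True if a switchover was triggered."""
        if target_cluster == self._current:
            return False
        previous, self._current = self._current, target_cluster
        self._on_switch(previous, target_cluster)
        return True
```

In this sketch the very first probe result also counts as a switch (from `None`), which doubles as the initial connection.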

Rheos provides three interfaces to use the Rheos HA Client, namely Generic Kafka HA Client, Flink HA Connector, and Rheos SQL.

Generic Kafka HA Client

To switch quickly, each HA Producer instance pre-initializes a KafkaProducer for every healthy local cluster, designating one as the primary producer and the rest as standby producers. When it detects that the Kafka cluster must be switched, it selects a healthy producer from the standby list and promotes it to primary.
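A rough sketch of the primary and standby bookkeeping follows. Producers are treated as opaque objects so the example stays independent of any Kafka client library; the class name, cluster names, and the "first healthy standby" selection policy are assumptions.

```python
from typing import Dict, List


class HAProducerPool:
    """Holds one pre-initialized producer per local cluster."""

    def __init__(self, producers: Dict[str, object], primary: str) -> None:
        if primary not in producers:
            raise ValueError("primary must be one of the pre-initialized clusters")
        self._producers = dict(producers)
        self.primary = primary

    @property
    def standbys(self) -> List[str]:
        return [name for name in self._producers if name != self.primary]

    def primary_producer(self) -> object:
        return self._producers[self.primary]

    def switch(self, healthy: List[str]) -> str:
        """Keep the primary if it is healthy, else promote the first healthy standby."""
        if self.primary in healthy:
            return self.primary
        for name in self.standbys:
            if name in healthy:
                self.primary = name  # promotion is just a pointer change
                return name
        raise RuntimeError("no healthy cluster available")
```

Because every producer already exists, promotion does not need to open new connections, which is what makes the switchover fast.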

The switching logic of the HA Consumer is similar to that of the HA Producer, except that an HA Consumer contains only one KafkaConsumer, the primary one. The reason is that every aggregation cluster holds the full data set, so an HA Consumer needs to read from only one of them. In addition, the Offset Management Service periodically synchronizes the consumer group's offset from the primary Kafka to the standby Kafka clusters. This requires that no consumer instance of the consumer group be active on a standby Kafka; otherwise the consumer group offset cannot be written to the target Kafka.

Figure 10. Generic Kafka HA Client switchover diagram

For producers, we provide an additional mechanism to make sending more reliable: when a message fails to be sent to the primary Kafka, the HA Producer retries by sending it to a standby Kafka cluster, which covers intermittent failures unrelated to cluster health. Under the hood, when the primary producer sends data, the HA Client registers a callback function. If an exception is returned, the callback calls a standby producer to resend the data once. To use this feature, simply enable the Cross DC Retry option when creating the HA Producer. Cross-cluster retry makes delivery more reliable, but it has the side effect of breaking the order of data within a partition. When the standby producer forwards data to another Kafka cluster, MirrorMaker treats it as coming from a different data source and mirrors it with different read and write tasks, so the original order of the data cannot be guaranteed.
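The callback-based retry can be sketched as below. The `SendFn` shape (a send function that reports success or failure through a callback) is a simplification invented for this example and is not the HA Client's actual interface.

```python
from typing import Callable, Optional

# Hypothetical shape: send(message, callback); the callback receives None on
# success or the exception on failure.
DoneCallback = Callable[[Optional[Exception]], None]
SendFn = Callable[[str, DoneCallback], None]


def send_with_cross_dc_retry(message: str, primary: SendFn, standby: SendFn,
                             on_done: DoneCallback) -> None:
    """Send via the primary; if that fails, resend exactly once via the standby."""

    def primary_callback(error: Optional[Exception]) -> None:
        if error is None:
            on_done(None)
        else:
            # Single retry on another cluster; its outcome is final.
            standby(message, on_done)

    primary(message, primary_callback)
```

Since the retried message enters a different local cluster, it is mirrored independently of its neighbors, which is where the ordering caveat described above comes from.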

Flink Kafka HA Connector and Rheos SQL

The Flink Kafka HA Connector was developed on Flink-1.13. The Kafka HA Sink Connector is relatively simple to implement. It is a SinkFunction-based sink connector in which we replaced the client in the community's KafkaSinkFunction with the HA client described above, so that the Kafka Sink Connector can switch automatically between primary and standby Kafka clusters. The Source Connector is more complex. Since Flink-1.12, the community has introduced a new Source interface (FLIP-27), which brings in concepts such as Enumerator, Split, and SourceReader.

The Enumerator generates Splits from the data source and assigns them to the SourceReaders, and each SourceReader reads the data for its assigned Splits. In our Kafka HA Source Connector, the Enumerator also acts as the metadata probe thread in addition to creating and assigning Splits. At runtime, the Enumerator periodically queries the metadata probe service to detect whether the primary Kafka has changed. Once it detects a switch, it generates a SwitchClusterEvent and sends it to all SourceReaders. On receiving the cluster switch notification, a SourceReader first stops its current KafkaConsumer (which by then may already have run into problems and stopped consuming). It then queries the state management service for the offsets on the new primary Kafka that correspond to the current offsets of its assigned topic-partitions, and starts a new KafkaConsumer to consume from the new primary Kafka.

Figure 11. Flink Kafka HA Source Connector architecture

When a Flink job creates a checkpoint (the same applies to savepoints), the Kafka HA Source Connector writes the Split assignment, the current primary Kafka, and the offsets of the corresponding topic-partitions into the checkpoint so that consumption can resume after an abnormal job restart. To keep checkpoints taken before a primary Kafka switch from becoming invalid, the Enumerator compares the primary Kafka stored in the checkpoint with the currently detected primary Kafka when a Flink job resumes from a checkpoint. If they differ, the Enumerator sends a cluster switchover notification to the SourceReaders. Each SourceReader then asks the state management service for the offsets on the new primary Kafka that correspond to the offsets stored in the checkpoint, and resumes consumption from the translated offsets.
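The restore decision can be illustrated with a small, framework-free sketch. It is written in Python purely for illustration (the real connector is built on Flink's Java Source API), and `SourceCheckpoint`, `resolve_start_offsets`, and the `translate` callback standing in for the state management service are all hypothetical names.

```python
from dataclasses import dataclass
from typing import Callable, Dict, Tuple

TopicPartition = Tuple[str, int]
# translate(from_cluster, to_cluster, topic_partition, offset) -> offset on to_cluster
Translate = Callable[[str, str, TopicPartition, int], int]


@dataclass
class SourceCheckpoint:
    primary_cluster: str                  # primary Kafka when the checkpoint was taken
    offsets: Dict[TopicPartition, int]    # offsets on that primary cluster


def resolve_start_offsets(checkpoint: SourceCheckpoint, current_primary: str,
                          translate: Translate) -> Dict[TopicPartition, int]:
    """Reuse checkpointed offsets if the primary is unchanged, else translate them."""
    if checkpoint.primary_cluster == current_primary:
        return dict(checkpoint.offsets)
    return {
        tp: translate(checkpoint.primary_cluster, current_primary, tp, offset)
        for tp, offset in checkpoint.offsets.items()
    }
```

The same translation step would apply when a SwitchClusterEvent arrives at runtime, with the reader's current offsets taking the place of the checkpointed ones.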

For the Rheos SQL Connector, the SQL API is a high-level wrapper around the Flink API, so the underlying implementation is the same.

04

Summary

This article discussed the design of a cross-data-center high-availability solution for Kafka based on the local-aggregation cluster topology, which keeps upstream and downstream data and services highly available and continuous. Cross-region high availability has long been a hot topic in the Kafka community, and the industry has not yet settled on a standard solution. We hope this article can serve as a useful reference for peers.

Authors: Chao Hui, Wang Yu, Hao Wei

Source: WeChat public account: eBay Technology Club

Source: https://mp.weixin.qq.com/s/Rx3k6yvhThfqp_hiP_EIMg