
How Twitter optimized the process for handling 400 billion events


Introduction

Twitter processes around 400 billion events in real time and generates a petabyte of data per day. It consumes data from a variety of event sources, such as distributed databases, Kafka, Twitter event buses, and more.


Example of an event call in a Twitter feed

In this article, we will try to understand:

1. How did Twitter handle events in the past, and what were the problems with that approach?
2. What business and customer impacts prompted Twitter to move to a new architecture?
3. The new architecture.
4. A performance comparison between the old and new architectures.

To handle events, Twitter has its own set of internal tools, such as:

1. Scalding: a tool that Twitter uses for batch processing.
2. Heron: Twitter's own stream processing engine.
3. TimeSeriesAggregator (TSAR): used for both batch and real-time processing.

Before we dive into how the event system evolved, let's take a brief look at these three internal tools.

1. Scalding

Scalding is a Scala library that makes it easy to specify Hadoop MapReduce jobs. It is built on top of Cascading, a Java library that abstracts away the details of the underlying Hadoop. Scalding is comparable to Pig, but provides tight integration with Scala, bringing the benefits of Scala to MapReduce jobs (a minimal job sketch appears after the Heron overview below).

2. Heron

Apache Heron is Twitter's own stream processing engine, developed out of the need to process petabytes of data, increase developer productivity, and simplify debugging. A streaming application in Heron is called a topology. A topology is a directed acyclic graph whose nodes represent data-computation elements and whose edges represent the flow of data. There are two types of nodes:

1. Spouts: they connect to the data source and inject the data into the stream.
2. Bolts: they process the incoming data and emit data.
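To make the topology idea concrete, here is a minimal conceptual sketch in Scala. It is not the Heron API; it simply models a spout as a data source and bolts as processing stages wired into a small directed graph, with the names (`tweetSpout`, `parseBolt`, `countBolt`) invented for illustration.

```scala
// Conceptual sketch only -- this is NOT the Heron API. It models a tiny
// topology: a spout injects raw events, bolts transform and aggregate them.
object MiniTopologySketch {

  // A "spout" connects to a data source and injects events into the stream.
  def tweetSpout(): Iterator[String] =
    Iterator("user1|like", "user2|retweet", "user1|like") // stand-in events

  // A "bolt" that parses incoming events and emits (user, engagement) pairs.
  def parseBolt(raw: String): Option[(String, String)] =
    raw.split('|') match {
      case Array(user, engagement) => Some(user -> engagement)
      case _                       => None // drop malformed events
    }

  // A downstream "bolt" that aggregates the parsed events into counts.
  def countBolt(events: Iterator[(String, String)]): Map[(String, String), Int] =
    events.toSeq.groupBy(identity).map { case (k, v) => k -> v.size }

  def main(args: Array[String]): Unit = {
    // Edges of the DAG: spout -> parse bolt -> count bolt.
    val counts = countBolt(tweetSpout().flatMap(parseBolt))
    counts.foreach { case ((user, engagement), n) =>
      println(s"$user $engagement -> $n")
    }
  }
}
```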


For more information on Heron, see: https://blog.x.com/engineering/en_us/a/2015/flying-faster-with-twitter-heron
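Returning to the Scalding item above: as a rough illustration (not Twitter's actual pipelines), a classic Scalding job looks like the sketch below. The `--input` and `--output` paths are placeholders.

```scala
import com.twitter.scalding._

// A minimal Scalding job sketch: read text lines, split them into words,
// and count occurrences. The input/output paths are hypothetical and passed
// on the command line, e.g. --input hdfs://... --output hdfs://...
class WordCountJob(args: Args) extends Job(args) {
  TextLine(args("input"))
    .flatMap('line -> 'word) { line: String => line.split("""\s+""") }
    .groupBy('word) { _.size }
    .write(Tsv(args("output")))
}
```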

3. TimeSeriesAggregator (TSAR)


Twitter's data engineering team faces the challenge of processing billions of events every day, both in batch and in real time. TSAR is a robust, extensible, real-time event time-series aggregation framework primarily used to monitor engagement: it aggregates interactions with Tweets, segmented by multiple dimensions (e.g., device, engagement type).
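As a rough illustration of what "aggregation segmented by multiple dimensions" means (a conceptual sketch, not TSAR's actual API), consider counting engagements keyed by Tweet, device, and engagement type:

```scala
// Conceptual sketch only -- not TSAR's API. It shows the shape of the
// aggregation: engagement events are counted per (tweetId, device, kind) key.
object EngagementAggregationSketch {

  // A hypothetical engagement event, e.g. a like or retweet on a Tweet.
  final case class Engagement(tweetId: Long, device: String, kind: String)

  // Aggregate raw events into counts segmented by the chosen dimensions.
  def aggregate(events: Seq[Engagement]): Map[(Long, String, String), Long] =
    events
      .groupBy(e => (e.tweetId, e.device, e.kind))
      .map { case (key, group) => key -> group.size.toLong }

  def main(args: Array[String]): Unit = {
    val sample = Seq(
      Engagement(42L, "ios", "like"),
      Engagement(42L, "ios", "like"),
      Engagement(42L, "web", "retweet")
    )
    aggregate(sample).foreach { case ((id, device, kind), n) =>
      println(s"tweet=$id device=$device kind=$kind count=$n")
    }
  }
}
```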

Let's examine how Twitter works at a very high level. All Twitter features are powered by microservices spread across the globe, comprising more than 100,000 instances. These microservices generate events, which are sent to an event aggregation layer built on an open-source project from Meta. This layer groups the events, runs aggregation jobs, and stores the data in HDFS. The events are then processed and reformatted, and the data is recompressed to create well-formed datasets.


Old architecture


Twitter's legacy architecture was based on the lambda architecture, with a batch layer, a speed layer, and a serving layer. The batch component's sources are logs generated by clients, stored on the Hadoop Distributed File System (HDFS) after event processing. Twitter built several Scalding pipelines to pre-process the raw logs and ingest them into the Summingbird platform as offline sources. The real-time component sources for the speed layer are Kafka topics.

Once the data is processed, batch data is stored in the Manhattan distributed storage system, while real-time data is stored in Twitter's own distributed cache, Nighthawk. The TSAR systems, such as the TSAR query service, query cache, and database, are part of the serving layer.

Twitter has real-time pipelines and query services in three different data centers. To reduce batch computing costs, Twitter runs a batch pipeline in one data center and replicates the data to two other data centers.
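The key idea of the serving layer in a lambda architecture is to merge the batch view (Manhattan) with the real-time view (the cache) at query time. A conceptual sketch (not Twitter's actual TSAR code; the store names and values are placeholders) might look like this:

```scala
// Conceptual sketch only -- not Twitter's TSAR code. A lambda-style query
// merges the batch view (complete up to the last batch run) with the
// real-time view (events that arrived after the last batch run).
object LambdaServingSketch {

  // Placeholder "stores": batch counts (e.g. Manhattan) and real-time counts
  // (e.g. an in-memory cache such as Nighthawk).
  val batchCounts: Map[Long, Long]    = Map(42L -> 1000L) // up to last batch
  val realtimeCounts: Map[Long, Long] = Map(42L -> 37L)   // since last batch

  // Serve a metric by combining both views.
  def engagementCount(tweetId: Long): Long =
    batchCounts.getOrElse(tweetId, 0L) + realtimeCounts.getOrElse(tweetId, 0L)

  def main(args: Array[String]): Unit =
    println(s"tweet 42 engagement count = ${engagementCount(42L)}") // 1037
}
```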

Can you think of why real-time data is stored in a cache instead of a database?

Challenges in legacy architectures

Let's try to understand the challenges that this architecture can encounter in real-time event processing.


Let's understand this with an example:

Let's say there's a big event, like the FIFA World Cup. The tweet source starts sending a large number of events to the tweet topology. The bolts that parse the tweets cannot process the events in time, and backpressure builds up inside the topology. When the system is under backpressure for a long time, the Heron bolts accumulate spout lag, which indicates high system latency. Twitter observed that when this happens, it takes a long time for the topology lag to drop.

The team's operational fix was to restart the Heron containers so that the stream would start being processed again. Restarting, however, can lose events, resulting in inaccurate aggregate counts in the cache.

Now let's look at an example on the batch side. Twitter has several heavy recomputation pipelines that process petabytes of data and run every hour to sync the data into the Manhattan database. If a sync job takes more than an hour, the next job is already scheduled to start. This increases backpressure in the system and can lead to data loss.

As we can see, the TSAR query service combines Manhattan and the cache to serve data to customers. Because real-time data can be lost, the TSAR service may serve inaccurate metrics to clients.

Let's try to understand the customer and business impact that prompted them to address this issue:

1. Twitter Ads is one of Twitter's dominant revenue models; if its performance suffers, the business model is directly affected.
2. Twitter offers a variety of data products and services for retrieving impression and engagement metrics; these can be affected by inaccurate data.
3. Because of the batch jobs, it can take several hours from the time an event is created until it is available for use. This means that any data analysis or other operations performed by customers will not have the most up-to-date data; there may be a lag of several hours.

This means that if a customer wants to update a user's timeline based on user-generated events, or analyze user behavior based on interactions with Twitter's system, they cannot do so promptly, because they have to wait for the batch jobs to complete.

New architecture


The new architecture is built on both Twitter's data center services and the Google Cloud Platform. On the Twitter side, an event processing pipeline converts Kafka topics into Pub/Sub topics and sends them to Google Cloud. On Google Cloud, streaming Dataflow jobs perform real-time aggregation and sink the data into BigTable.
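At its core, the real-time aggregation in the streaming jobs is a keyed count over time windows. The sketch below is conceptual only (plain Scala, not the actual Beam/Dataflow code); names such as `windowMillis` are assumptions for illustration.

```scala
// Conceptual sketch only -- not the actual Beam/Dataflow job. It shows the
// core idea: bucket events into fixed time windows and count per key.
object WindowedAggregationSketch {

  final case class Event(tweetId: Long, timestampMillis: Long)

  // Assign each event to a fixed-size window and count events per
  // (tweetId, windowStart) key -- roughly what the streaming job computes
  // before sinking results into BigTable.
  def aggregate(events: Seq[Event], windowMillis: Long): Map[(Long, Long), Long] =
    events
      .groupBy(e => (e.tweetId, (e.timestampMillis / windowMillis) * windowMillis))
      .map { case (key, group) => key -> group.size.toLong }

  def main(args: Array[String]): Unit = {
    val events = Seq(Event(42L, 1000L), Event(42L, 1500L), Event(42L, 61000L))
    aggregate(events, windowMillis = 60000L).foreach {
      case ((id, windowStart), n) => println(s"tweet=$id window=$windowStart count=$n")
    }
  }
}
```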


For the serving layer, Twitter uses an LDC query service, with the front end in Twitter's data centers and Bigtable and BigQuery as the back end. The entire system can stream millions of events per second with low latency (around 10 ms) and can be scaled easily during periods of high traffic.

This new architecture saves the cost of building batch pipelines, and for real-time pipelines, Twitter is able to achieve higher aggregation accuracy and consistently low latency. In addition, they don't need to maintain different aggregations of real-time events across multiple data centers.

Performance comparison


The new architecture provides lower latency and higher throughput than the Heron topology in the old architecture. In addition, the new architecture handles late event counting and does not lose events during real-time aggregation. What's more, there are no batch components in the new architecture, which simplifies the design and removes the computation costs that existed in the old architecture.

Conclusion

By migrating its legacy TSAR-based architecture to a hybrid architecture spanning Twitter's data centers and the Google Cloud Platform, Twitter was able to process billions of events in real time while achieving low latency, high accuracy, stability, a simpler architecture, and reduced operational costs for engineers.
