iQIYI Data Lake in Practice: Real-Time Lakehouse Integration

Author: Flash Gene

01

Overview

Data is the foundation for understanding users and markets and for making operational decisions. At iQIYI it is widely used in scenarios such as recommendation, advertising, user growth, and marketing. iQIYI's big data services originally used the Lambda architecture to cope with massive data volumes and timeliness requirements, but development, maintenance, and resource costs were high, and data silos emerged. In recent years, data lake technologies such as Iceberg, Hudi, and Delta Lake have provided a foundation for building a unified data architecture. In 2020, the iQIYI big data team adopted Iceberg as the base of its data lake and, using Iceberg and Flink, built a stream-batch unified data production architecture to replace the traditional Lambda architecture. The resulting real-time lakehouse has been rolled out in scenarios such as the BI data warehouse, membership, recommendation, and logs. It currently processes petabytes of data every day, and latency has dropped from hours to 1 to 5 minutes, which greatly speeds up the flow of data. This article describes iQIYI's real-time lakehouse architecture and how it was built.

02

Traditional Lambda architecture: offline + real-time data production links

Figure 1 Lambda Architecture

iQIYI uses a typical data architecture made up of data integration, data warehousing, and data development. Data from sources such as event tracking, back-end logs, databases, and metrics is applied to scenarios such as recommendation, search, marketing, and reporting. The architecture is built as two pipelines, as shown in Figure 1:

  1. Offline pipeline: Using tools such as HDFS, Hive, and Spark, we built an offline data warehouse on the layered ODS, DWD, and DWS architecture. Venus log collection, MySQLIO CDC synchronization, and the Hubble monitoring platform integrate the different data sources into the offline warehouse, and two development platforms, Moojing offline analysis and Babel offline computing, support data application development. Hive tables in the warehouse are partitioned by hour, so there is usually an hour-level delay between data generation and business use.
  2. Real-time pipeline: The value of data decays quickly as it ages, and the delay of the offline pipeline prevents businesses from exploiting that value promptly, so we built a real-time pipeline. Modeled on the offline warehouse, it is a real-time streaming data warehouse built with Kafka and Flink. The data integration tools Venus, MySQLIO, and Hubble also deliver the different data sources to the streaming warehouse. For data development, the RCP real-time computing platform and the RAP real-time analysis platform support real-time data applications. The real-time pipeline achieves second-level latency.

Although this Lambda architecture meets the requirements of the business scenarios, it brings the following problems:

  1. High system complexity: The platform team has to provide multiple components, and business teams have to learn all of them and develop and maintain both offline and real-time programs.
  2. Data consistency is hard to guarantee: With two codebases, real-time and offline, keeping the processing logic and the resulting data consistent is a major challenge.
  3. High resource cost: Storage and computation are duplicated at several stages of the two pipelines.
  4. High data latency: Real-time data has low latency but is retained only briefly, so query and analysis workloads still depend on offline data with T+1 latency.

To solve these problems, we merged the two data production pipelines and built a real-time lakehouse on data lake technology.

03

Real-time lakehouse integration

Introduction to data lakes

Before introducing the real-time lakehouse, here is a brief overview of the data lake (for more background, see the previously published article "iQIYI Data Lake in Practice"). iQIYI built its data lake on Iceberg, with HDFS as the underlying storage, Alluxio as the cache layer, and Iceberg as the table format. Table metadata is managed through the Hive metastore, as shown in Figure 2. Businesses query and process lake data on the various development platforms using compute engines such as Spark, Flink, and Trino.

Figure 2 iQIYI Data Lake

Iceberg is the entry point for reading and writing the data lake and the most important component of the real-time lakehouse architecture. Its architecture is shown in Figure 3:

Figure 3 Iceberg architecture

Iceberg is divided into a data layer, a metadata layer, and a catalog layer. In our data lake, the catalog layer is the Hive metastore, and the data files of the data layer are stored on HDFS. The metadata layer stores the schema, the partitioning, and statistics such as the minimum and maximum values of each column. Iceberg uses snapshots to isolate reads from writes, and each write cycle corresponds to one snapshot: new data is first written to new data files, metadata files are then generated, and a commit operation atomically updates the catalog to point to the new snapshot. Only successfully committed snapshots are visible to readers.
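
The write-then-commit cycle described above can be illustrated with a small toy model. This is a sketch under stated assumptions, not Iceberg's implementation: `ToyCatalog`, `Snapshot`, and `append` are hypothetical names, the catalog is an in-memory dictionary standing in for the Hive metastore, and the atomic update is modeled as a compare-and-swap on the table's current snapshot.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Tuple


@dataclass(frozen=True)
class Snapshot:
    snapshot_id: int
    data_files: Tuple[str, ...]  # every data file visible in this snapshot


@dataclass
class ToyCatalog:
    """Hypothetical stand-in for the catalog: table name -> current snapshot."""
    current: Dict[str, Snapshot] = field(default_factory=dict)

    def commit(self, table: str, base: Optional[Snapshot], new: Snapshot) -> bool:
        # Compare-and-swap: succeed only if nobody committed since `base` was read.
        if self.current.get(table) is not base:
            return False
        self.current[table] = new
        return True


def append(catalog: ToyCatalog, table: str, new_files: List[str]) -> Snapshot:
    """Data files are assumed to be written already; publish them in one commit."""
    while True:
        base = catalog.current.get(table)
        old_files = base.data_files if base else ()
        next_id = base.snapshot_id + 1 if base else 1
        candidate = Snapshot(next_id, old_files + tuple(new_files))
        if catalog.commit(table, base, candidate):
            return candidate  # readers can see the new files only from now on


catalog = ToyCatalog()
append(catalog, "ods.logs", ["f1.parquet"])
reader_view = catalog.current["ods.logs"]      # a reader pins snapshot 1
append(catalog, "ods.logs", ["f2.parquet"])    # a later write creates snapshot 2
stale_commit_ok = catalog.commit("ods.logs", reader_view, Snapshot(99, ()))
```

The reader that pinned the first snapshot keeps seeing only `f1.parquet` while the table moves on, which is the read/write isolation the paragraph describes. The stale commit is rejected because its base snapshot is no longer current.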

Iceberg has the following features:

  • Controllable data latency: The more frequently snapshots are generated, the lower the data latency, which can be under 1 minute.
  • Stream-batch unification: Both Spark and Flink can read and write Iceberg in batch mode and in streaming mode, which unifies stream and batch at the storage level.
  • Low resource cost: The underlying storage is HDFS with columnar file formats and compression, so the storage cost is in theory comparable to Hive and much lower than Kafka. Iceberg also keeps statistics in the metadata layer that can be used for query optimization, so queries cost less than on Hive.
  • Support for data changes: Tables in the Iceberg V2 format support row-level changes, which makes it easier to integrate database data.

Based on these features, Iceberg can fully replace Hive, and can replace Kafka in scenarios that tolerate minute-level latency.

Real-time lakehouse architecture

Figure 4 Real-time lakehouse architecture

Based on Iceberg and Flink, we built the real-time lakehouse architecture shown in Figure 4. The data of every layer (ODS, DWD, and DWS) is stored in Iceberg, and each layer is produced in real time by Flink jobs that stream-consume the upstream layer. Every table in the warehouse can be processed in real time on the RAP real-time analysis platform and the RCP real-time computing platform, and in batch mode on the Moojing offline analysis platform and the Babel offline computing platform. For data integration, Venus log collection, MySQLIO CDC synchronization, and Hubble monitoring support writing log and metric data to Iceberg. Because Iceberg supports row-level changes, Flink CDC can be used to synchronize the full contents of a database in real time. The lakehouse data architecture has the following characteristics:

  • Lakehouse integration: It combines the flexibility of a data lake with the structured data management of a data warehouse. Structured, semi-structured, and unstructured data can be stored in a unified way, forming a single data base and eliminating data silos.
  • Stream-batch unification: The real-time and offline warehouse pipelines of the Lambda architecture are merged into one. Each warehouse layer produces a single copy of the data, which serves both batch jobs that read by hour or by day and real-time jobs that consume it as a stream. A single warehouse avoids developing two codebases and aligning their logic, which greatly improves data development efficiency.
  • Low resource cost: Compared with running separate real-time and offline pipelines, the stream-batch unified lakehouse has lower production and storage costs. Compared with Hive, the Iceberg metadata layer has richer statistics, which also helps improve query performance and reduce query cost.
  • Low latency: Iceberg supports minute-level data visibility, so writing to Iceberg in real time gives the lakehouse minute-level latency. Data consumers no longer need to merge stream and batch data on the application side to obtain the latest full data, which simplifies business-side logic.

Challenges

Although the real-time lakehouse architecture brings large benefits, applying it in production within iQIYI's data ecosystem raised the following challenges:

  • Producing multiple tables in a single task: In iQIYI data sources such as event tracking and container logs, logs from different businesses are mixed together and must be split into per-business ODS tables according to log characteristics. A single task consuming one data source may need to split it into more than 500 tables, the largest task into more than 3,000 tables, and in total there are tens of thousands of tables. The Flink Iceberg Sink currently writes to only one table, and a single Flink job cannot hold that many Iceberg sinks. Producing multiple tables in a single task was therefore the first problem we had to solve.
  • Data production progress evaluation: While data is being produced in real time, its progress must be evaluated so that data consumers know how complete the data in the lakehouse is and downstream batch tasks can be triggered.
  • Backlog monitoring for streaming consumption: Downstream streaming consumers need backlog monitoring comparable to what is available when consuming Kafka.
  • Data integrity assurance: In the Lambda architecture, when the data pipeline fails, the data can be corrected by rerunning batch computations, which guarantees eventual completeness. The lakehouse architecture needs a mechanism that gives the same guarantee.

The following sections describe how we addressed these challenges.

04

Solution

Produce multiple tables in a single task

Figure 5 Native Flink Iceberg Sink

The native Flink Iceberg Sink consists of a Writer operator and a Committer operator, as shown in Figure 5. Each Task of the Writer operator writes data to Iceberg data files and, at checkpoint, wraps the file information in a WriterResult and sends it to the Committer operator. The Committer operator has a parallelism of 1, so it has a single Task. After collecting the WriterResults from all Writer Tasks, it commits the file information to the Iceberg table, generating a new Snapshot. The native sink supports writing to only one table. Writing to several tables requires several sink operators, and with too many of them the Flink job cannot run. To support a single task writing to a large number of tables, we developed a Flink Iceberg Sink with multi-table write support, as shown in Figure 6.

Figure 6 Flink Iceberg Sink with Multi-Table Write

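Compared with the native Flink Iceberg Sink, the main changes are as follows:

  1. A MultiTableRow type is defined. Compared with Row, it adds the name of the table the row belongs to.
  2. A Partitioner operator is added before the Writer operator. Without it, each Writer Task would write to too many tables, producing too many small files, using too much memory, and degrading performance. The Partitioner routes the data of a given table to a limited set of Writer Tasks, so each Writer Task handles writes for only some of the tables.
  3. The Writer operator loads tables by the table name in MultiTableRow and writes the data to the files of the corresponding table. At checkpoint, it first groups the newly written files by table name to build MultiTableWriteResult objects. Compared with WriterResult, MultiTableWriteResult adds the table name. The results are then shuffled by table name and sent to the Committer operator.
  4. The parallelism of the Committer operator is no longer 1 but the default parallelism. At checkpoint, each Committer Task aggregates the written files per table from the MultiTableWriteResult objects it receives and commits them to the corresponding tables, generating new Snapshots.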
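
The data flow through the Writer and Committer in the list above can be sketched as follows. This is a simplified, in-memory illustration rather than the actual Flink operators. `MultiTableRow` and `MultiTableWriteResult` are named in the article, but their fields here are assumptions, and `Writer`, `committer_for`, `commit_all`, and the file-naming scheme are hypothetical.

```python
import zlib
from collections import defaultdict
from dataclasses import dataclass
from typing import Dict, List


@dataclass(frozen=True)
class MultiTableRow:
    table: str      # name of the target table (the field added on top of Row)
    payload: dict   # the row itself


@dataclass(frozen=True)
class MultiTableWriteResult:
    table: str      # table name (the field added on top of WriterResult)
    files: tuple    # data files newly written for this table


class Writer:
    """One Writer Task: buffers rows per table, emits results at checkpoint."""

    def __init__(self, task_id: int):
        self.task_id = task_id
        self.buffers: Dict[str, List[dict]] = defaultdict(list)
        self.file_seq = 0

    def write(self, row: MultiTableRow) -> None:
        self.buffers[row.table].append(row.payload)

    def on_checkpoint(self) -> List[MultiTableWriteResult]:
        results = []
        for table in sorted(self.buffers):
            self.file_seq += 1
            name = f"{table}/w{self.task_id}-{self.file_seq}.parquet"
            results.append(MultiTableWriteResult(table, (name,)))
        self.buffers.clear()
        return results


def committer_for(table: str, parallelism: int) -> int:
    # Stable hash: all results for one table reach the same Committer Task.
    return zlib.crc32(table.encode()) % parallelism


def commit_all(results: List[MultiTableWriteResult], parallelism: int) -> dict:
    """Return {committer_task: {table: [files]}}, one commit per table."""
    plan = defaultdict(lambda: defaultdict(list))
    for r in results:
        plan[committer_for(r.table, parallelism)][r.table].extend(r.files)
    return {task: dict(tables) for task, tables in plan.items()}


w0, w1 = Writer(0), Writer(1)
w0.write(MultiTableRow("ods.a", {"x": 1}))
w0.write(MultiTableRow("ods.b", {"x": 2}))
w1.write(MultiTableRow("ods.a", {"x": 3}))
plan = commit_all(w0.on_checkpoint() + w1.on_checkpoint(), parallelism=4)
files_by_table = {t: f for tables in plan.values() for t, f in tables.items()}
```

Shuffling by table name is what allows the Committer to run with a parallelism above 1: each table is still committed by exactly one Task per checkpoint, so no two Tasks race to commit to the same table.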

Figure 7 Producing multiple tables with the MultiTable Flink Iceberg Sink, with automatic load balancing

With the multi-table Flink Iceberg Sink, designing a suitable Partitioner is critical: a poor implementation leaves the Writer Tasks with unbalanced load. As shown in Figure 7, our Partitioner dynamically generates a shuffle policy based on the volume of data newly written to each table and the number of slots allocated to the Flink job. Data from a large table is routed to multiple Writer Tasks, and data from many small tables is routed to a single Writer Task, so that each Writer Task handles roughly the same amount of data. This keeps write performance high and reduces the number of small files. Using the multi-table Flink Iceberg Sink, one production task writes more than 3,000 tables, and several others each write more than 500 tables.
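
One way such a shuffle policy could be computed is sketched below. The article does not give the actual algorithm, so this is only an illustration: it assumes a greedy heuristic that handles the largest tables first, and the function names and sample byte counts are made up.

```python
import heapq
import math
from typing import Dict, List


def build_shuffle_policy(table_bytes: Dict[str, int], num_writers: int) -> Dict[str, List[int]]:
    """Map each table to the Writer Tasks that should receive its rows."""
    total = sum(table_bytes.values())
    target = max(1, math.ceil(total / num_writers))  # ideal bytes per writer
    # Min-heap of (assigned_bytes, writer_id): pop yields the least-loaded writer.
    heap = [(0, w) for w in range(num_writers)]
    heapq.heapify(heap)
    policy: Dict[str, List[int]] = {}
    # Largest tables first, ties broken by name for determinism.
    for table, size in sorted(table_bytes.items(), key=lambda kv: (-kv[1], kv[0])):
        # A table bigger than the target is spread over several writers.
        fanout = min(num_writers, max(1, math.ceil(size / target)))
        picked = [heapq.heappop(heap) for _ in range(fanout)]
        share = size / fanout
        policy[table] = sorted(w for _, w in picked)
        for load, w in picked:
            heapq.heappush(heap, (load + share, w))
    return policy


def writer_loads(policy: Dict[str, List[int]], table_bytes: Dict[str, int], num_writers: int) -> List[float]:
    """Expected bytes per writer if each table is split evenly over its writers."""
    loads = [0.0] * num_writers
    for table, writers in policy.items():
        for w in writers:
            loads[w] += table_bytes[table] / len(writers)
    return loads


# Made-up sizes: one large table, one medium table, four small ones.
sizes = {"big": 800, "mid": 200, "s1": 50, "s2": 50, "s3": 50, "s4": 50}
policy = build_shuffle_policy(sizes, num_writers=4)
loads = writer_loads(policy, sizes, num_writers=4)
```

In this example the large table is spread over three writers while the medium and small tables share the remaining capacity, so every writer ends up within a small margin of the 300-byte target. In a real job the policy would be regenerated periodically as table volumes change.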

Data production progress assessment

For example, a batch task that computes a table's hourly data must be triggered only after that hour's data is complete. Iceberg tables therefore need to expose partition completeness the way Hive tables do. We implemented this by modifying the Flink Iceberg Sink. Figure 8 shows the modification for the native single-table sink, and the one for our multi-table sink is similar.

Figure 8 Iceberg Table Production Progress Assessment

The Flink Iceberg Sink writes data and contains two operators, Writer and Committer. Each Writer Task writes data to Iceberg data files and, while writing, tracks the maximum value seen in a specified time column, max(ts). At checkpoint, each Writer Task passes its max(ts) and its newly written files to the Committer. When the Committer commits the files and generates a new Snapshot, it takes the minimum of the received max(ts) values, subtracts the allowed delay, and records the result in the Snapshot's Summary as the Watermark of that Snapshot. The data lake platform periodically reads the Watermark from the Snapshot Summary of each table. When the Watermark falls into a new hour, the partition for the previous hour is considered complete. The platform reports this to the progress management service, which in turn triggers the corresponding batch tasks on the offline computing platform.
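
The watermark arithmetic described above is small enough to write out. The sketch below covers only the calculation, not the sink or the Snapshot Summary format. The function names, the one-minute allowed delay, and the timestamps are illustrative assumptions.

```python
from datetime import datetime, timedelta, timezone
from typing import Iterable, List


def snapshot_watermark(writer_max_ts: Iterable[datetime], allowed_delay: timedelta) -> datetime:
    """Watermark = min over Writer Tasks of max(ts), minus the allowed delay."""
    return min(writer_max_ts) - allowed_delay


def completed_hours(prev_wm: datetime, new_wm: datetime) -> List[datetime]:
    """Hourly partitions that became complete when the watermark advanced."""
    hour = prev_wm.replace(minute=0, second=0, microsecond=0)
    new_hour = new_wm.replace(minute=0, second=0, microsecond=0)
    done = []
    while hour < new_hour:
        done.append(hour)              # every hour before the watermark's hour
        hour += timedelta(hours=1)
    return done


utc = timezone.utc
# max(ts) reported by three Writer Tasks at one checkpoint (made-up values).
writer_max = [
    datetime(2024, 1, 1, 10, 2, 10, tzinfo=utc),
    datetime(2024, 1, 1, 10, 1, 30, tzinfo=utc),
    datetime(2024, 1, 1, 10, 3, 0, tzinfo=utc),
]
wm = snapshot_watermark(writer_max, allowed_delay=timedelta(minutes=1))
done = completed_hours(datetime(2024, 1, 1, 9, 58, tzinfo=utc), wm)
```

Taking the minimum across Writer Tasks is the conservative choice: the slowest writer bounds the watermark, so an hour is declared complete only when every writer has moved past it by at least the allowed delay.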

Streaming consumption backlog monitoring

In stream computing scenarios, data production latency is an important data quality metric. When Kafka is consumed through the Flink Kafka Source, dedicated services such as Kafka Manager and Burrow compute the difference between the latest produced offset and the offset committed by the consumer. This difference measures the consumption backlog and shows whether data production is delayed. Streaming consumption of Iceberg needs a similar mechanism, but the community version of the Flink Iceberg Source has no such feature, so we had to modify it.

The Flink Iceberg Source is implemented on the Source interface from Flink FLIP-27, as shown in Figure 9. The interface has two important components, the Split Enumerator and the Reader, with the following roles:

  • Split Enumerator: Discovers new Snapshots of the Iceberg table, reads the files in each Snapshot, and divides them into Splits.
  • Reader: Receives the Splits assigned by the Split Enumerator, reads the data in each Split, and sends it to the downstream operator.

Figure 9 Core components of the Flink Source interface

An Iceberg Snapshot is the smallest unit in which writers publish new files and consumers discover them, so our solution is to compute, inside the task, the latency of the Snapshot currently being consumed. Specifically, in the Split Enumerator, the current time minus the creation time of the most recently obtained Snapshot is reported to Flink's metrics system as a lag metric. Flink metrics are already connected to our monitoring and alerting platform based on Prometheus and Grafana, so lag monitoring and alerts are easy to configure.
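
The lag metric can be sketched as a small gauge object. This illustrates the calculation only and does not use the Flink metrics API. `SnapshotLagGauge` and its methods are hypothetical names, and the injectable clock exists so the example is deterministic.

```python
import time
from typing import Callable, Optional


class SnapshotLagGauge:
    """Tracks how far the consumer is behind the newest Snapshot it has obtained."""

    def __init__(self, clock_ms: Callable[[], int] = lambda: int(time.time() * 1000)):
        self._clock_ms = clock_ms
        self._snapshot_ts_ms: Optional[int] = None

    def on_snapshot_discovered(self, snapshot_ts_ms: int) -> None:
        # Called by the (hypothetical) enumerator whenever it picks up a Snapshot.
        self._snapshot_ts_ms = snapshot_ts_ms

    def value_ms(self) -> int:
        # Lag = current time - creation time of the most recently obtained Snapshot.
        if self._snapshot_ts_ms is None:
            return 0
        return max(0, self._clock_ms() - self._snapshot_ts_ms)


# Fake clock so the numbers below are deterministic.
now = {"ms": 1_000_000}
gauge = SnapshotLagGauge(clock_ms=lambda: now["ms"])
gauge.on_snapshot_discovered(snapshot_ts_ms=940_000)   # created 60 s ago
lag_after_discovery = gauge.value_ms()
now["ms"] += 30_000                                     # 30 s pass, no new Snapshot
lag_later = gauge.value_ms()
```

With this definition the metric keeps growing while no newer Snapshot is obtained, whether the consumer is falling behind or the upstream writer has stopped committing. Both cases are worth alerting on.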

Data integrity assurance

When a component failure or a code bug causes data loss, how can the data in the real-time lakehouse be repaired so that it remains complete? A streaming task can rewind and recompute, but doing so delays the processing of new data. The exact position to rewind to is also hard to determine, so data may still be duplicated or lost, and the upstream data may already have expired and been deleted. Streaming tasks are developed on the RCP real-time computing platform, so our solution was to extend RCP so that one Flink application can run in both streaming mode and batch mode, and to use batch tasks to correct the results of streaming tasks, as shown in Figure 10.

Figure 10 Data Correction

Normally, an application runs only in streaming mode and produces data continuously. When a data problem occurs, the streaming job keeps running while a new task is scheduled in batch mode to correct the downstream table partition by partition. RCP also supports scheduled batch tasks, which can overwrite each day's streaming results with batch results. Batch and streaming tasks need different runtime parameters and configurations, so for applications developed in SQL, RCP supports separate data sources and run configurations for the streaming and batch modes. In addition, the current run mode is passed to the task at startup through JVM parameters, so the task can adjust its execution logic to the mode. With RCP's stream-batch unification, the result tables produced by a streaming task can be corrected from either homogeneous or heterogeneous data sources without affecting the streaming task.
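
The idea of correcting a table partition by partition can be shown with a toy model. This is not RCP or Flink code: the table is an in-memory dictionary keyed by hourly partition, `transform` is a placeholder for the business logic shared by both modes, and the event data is invented.

```python
from typing import Dict, Iterable, List, Set

Table = Dict[str, List[dict]]   # partition value (hour) -> rows


def transform(event: dict) -> dict:
    """Business logic shared by the streaming and batch modes (placeholder)."""
    return {"dt_hour": event["ts"][:13], "user": event["user"]}


def run_streaming(table: Table, events: Iterable[dict]) -> None:
    """Streaming mode: append each result row as it arrives."""
    for event in events:
        row = transform(event)
        table.setdefault(row["dt_hour"], []).append(row)


def run_batch_correction(table: Table, events: Iterable[dict], partitions: Set[str]) -> None:
    """Batch mode: recompute the chosen partitions and overwrite them as a whole."""
    rebuilt: Dict[str, List[dict]] = {p: [] for p in partitions}
    for event in events:
        row = transform(event)
        if row["dt_hour"] in rebuilt:
            rebuilt[row["dt_hour"]].append(row)
    table.update(rebuilt)   # partitions outside `partitions` are left untouched


source = [
    {"ts": "2024-01-01T10:05:00", "user": "a"},
    {"ts": "2024-01-01T10:40:00", "user": "b"},
    {"ts": "2024-01-01T11:02:00", "user": "c"},
]
table: Table = {}
# Simulate a failure: the streaming run missed the second event.
run_streaming(table, [source[0], source[2]])
rows_before = len(table["2024-01-01T10"])
run_batch_correction(table, source, partitions={"2024-01-01T10"})
rows_after = len(table["2024-01-01T10"])
```

Because the correction overwrites whole partitions instead of appending, it can be rerun safely, and the partition the streaming job is currently writing stays outside the corrected set.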

05

Results in production

Since going live, the real-time lakehouse architecture has been rolled out in the following scenarios:

  • Venus: Venus is iQIYI's log platform, responsible for back-end log collection, storage, and analysis. Logs used to be stored in Elasticsearch, separate from the big data system, but most logs are now stored in the lake as ODS tables of the real-time lakehouse. Previously, container logs were collected into a unified Kafka topic, split by business into per-business topics, and then written to Elasticsearch. After the migration, the unified Kafka topic is split by business and written directly to Iceberg, so the per-business topics are no longer needed. Venus writes to more than 10,000 Iceberg tables in total and ingests petabytes of logs every day with a latency of 5 minutes, saving tens of millions of dollars in costs every year. For details on moving Venus into the lake, see the previously published article "iQIYI Data Lake Practice-Evolution of Log Platform Architecture Based on Data Lake".
  • Pingback: Pingback is the general name for iQIYI's event tracking data. Most of it is processed through the ODS, DWD, and DWS layers of the data warehouse. We are replacing Hive tables with real-time lakehouse Iceberg tables, proceeding from the upstream warehouse layers to the downstream ones and from less critical businesses to more critical ones. We are also migrating businesses that can accept minute-level latency from Kafka to Iceberg tables. More than 1,300 Iceberg tables are currently online, hundreds of terabytes of data are added every day, and the latency of each layer of Iceberg tables is as low as 1 minute.
  • Database data: Flink CDC switches transparently between full and incremental data and writes to Iceberg in real time. Nearly 100 tables have been synchronized so far for advertising, membership, user growth, and other businesses.

06

Future plans

The real-time lakehouse architecture has been thoroughly validated in practice and has delivered large benefits in cost savings, data timeliness, and reduced data complexity. Our next steps are as follows:

  1. Continue onboarding more business scenarios, fully replacing Hive as well as Kafka data for workloads that can accept minute-level latency.
  2. Upgrade Flink CDC from 2.x to 3.x to support automatic schema changes, reducing the maintenance cost of ingesting database data into the lake.
  3. Iceberg does not support features such as partial-column updates and building downstream pipelines on change data, which limits some use cases. We are introducing Paimon, an emerging data lake technology, as an alternative for these scenarios.

Author: Big Data Team

Source-WeChat public account: iQiyi technical product team

Source: https://mp.weixin.qq.com/s/EifwIXT6mbcDrZWsaAaWrQ