
From T+1 to minutes, iQIYI data lake application and upgrade practice

Author: DBAplus Community

This article is based on Mr. Du Yifan's online talk in "Deeplus Live: Construction and Practice of Big Data Systems in Complex Scenarios at iQIYI".


Talk outline

1. Overview of iQIYI's data middle platform

2. Introduction to iQIYI's data lake system

3. Comparison of iQIYI data lake technologies

4. iQIYI's data lake adoption in business scenarios

5. iQIYI data lake performance optimization

6. Follow-up plan

7. Q&A

1. Overview of iQIYI's data middle platform

1. The data middle platform supports business development


The figure above shows iQIYI's data business link. The main responsibility of the data middle platform is to develop data standards and integrate data from users, businesses, and partners. Through specifications and systems, it comprehensively manages and governs data, ensures data security, and provides service support for upper-layer businesses.

Data scientists leverage the data services provided by the data middle platform to extract valuable information that supports higher-level decision-making and drives innovation. Through technological innovation, we give content production and distribution stronger capabilities, providing users with an excellent audio-visual and interactive experience on all kinds of smart terminals.


The core mission of the data middle platform is to effectively manage and govern all data and to maximize its value at the lowest possible cost.

To achieve this, we need to standardize and measure all aspects of the data system. This not only helps to ensure the quality of the data, but also improves the efficiency of data processing, thus ensuring the smooth running of data governance.

On this basis, we have built an efficient data processing link to form a complete data system to meet the data needs of various business departments. By combining data processing and artificial intelligence technologies, we are able to significantly improve the efficiency of data processing, making it easy for users at all levels, operators, data analysts, and data scientists to use data and derive valuable insights and decision support from it. This not only helps drive business innovation and growth, but also boosts the company's overall competitiveness.

2. The evolution of our data work


1) Fragmented state

In the early days, we processed raw data directly into reports and stored everything in a Hadoop cluster, which was productive enough while the business was relatively simple. As the business grew more complex, however, it became increasingly difficult to understand and locate data. Ordinary users had to repeatedly ask how data was used and where to find it, or hand their data requirements over to data engineers, which significantly hurt overall efficiency.

2) Platformization

Since 2018, we have been committed to building a platform-based system to improve the efficiency of data production and collection. We have established an offline development platform and a real-time development platform to provide users with more convenient ways to obtain and process data.

3) Standardization

To better regulate the use of data, we redefined the data standards and formulated event-tracking (Pingback) delivery specifications for all behaviors. This not only improved tracking efficiency but also reduced the error rate of business instrumentation, helping product teams better understand how to design tracking events.

At the same time, we have standardized the data warehouse system and unified the dimensions and indicators to establish a better data service platform.

4) Intelligence

Building on the above foundations, we established a machine learning platform and a deep learning platform to meet higher data quality requirements. By building data quality models, we can better monitor and process data.

5) Systematization

Since 2021, the data middle platform team has taken the lead in establishing a data specification working group to define company-wide data specifications and provide measurement standards for evaluating the overall effect of data governance.

At the same time, we introduce new technologies to improve data processing efficiency and provide users with better services.

6) Three-dimensional

Through research into new technologies in the industry, we introduced data lake and stream-batch unified processing technology and transformed our links to run in near real time, achieving more efficient and flexible data processing and analysis to better support business decisions.

2. Introduction to the data lake system

In the early days, because data was scattered and hard to obtain, many data silos formed. They were not integrated and managed in time and gradually turned into a data swamp.

To solve this problem, we began data governance, starting with the data warehouse: we reformulated unified warehouse specifications and established a unified data warehouse and data marts, forming a data pool. On this basis, we sorted out and integrated the entire data system and built a data lake that can store all data while keeping it easy to find and use.

1. Data link


The above diagram depicts the composition of iQIYI's data link.

At the data production layer, data sources are divided into two categories, namely C-side and B-side.

C-side data mainly comes from user terminals, including client-side tracking (Pingback) data and terminal logs. B-side data comes from the back-end logs of our own services, partner data, and important business data such as video and content information.

This data is gathered by the receiving and collection layers into the data middle platform. At the processing layer, part of the data is processed into a unified data warehouse, while the full data is provided in the form of a data lake, where professional data workers analyze it to further explore its value.

They build user profiles, using data to depict each user's characteristics and preferences; they perform content understanding, decomposing data into different content models to support decisions; and they conduct report analysis, the most widely used way of presenting data, which lets users at every level understand the business directly by watching how the numbers in the reports change.

Based on this data, data workers can also distribute content more intelligently by personalizing recommendations to users, so that users can reach the content they are most interested in.

2. Data Architecture


Based on this data link, we combined data warehouse modeling with the data lake approach to create a data middle platform architecture that incorporates data lake ideas.

The underlying data layer includes the various data sources: Pingback is our event-tracking data, used mainly to collect user behavior, while business data is stored mainly in relational and NoSQL databases. Data from this layer reaches the storage layer through the various collection tools of the transport layer.

The storage layer is built mainly on HDFS, the distributed file system: raw files are stored directly in HDFS, while other structured or unstructured data is stored in Hive, Iceberg, or HBase.

At the computing layer, the offline Pilot engine drives Spark, MapReduce, or Trino for offline computing, and the scheduling engine Gear handles workflow scheduling. Stream computing is scheduled by Europa, which after several iterations now uses Flink as its computing engine.

The development layer on top further encapsulates the service modules of the computing and transport layers, providing tool suites and intermediate services for developing offline data processing workflows, integrating data, developing real-time processing workflows, and implementing machine learning projects.

The data lake platform manages the information of each data file and table in the data lake, and the data warehouse platform manages the data model, physical model, dimensions, indicators and other information of the data warehouse.

  • Vertically, the delivery management tool manages the most important data in the data middle platform: the Pingback tracking data and its meta-information such as specifications, fields, dictionaries, and timing;
  • Modules such as the metadata center and resource center maintain the metadata of data tables and files and ensure data security;
  • The data quality center and link governance platform safeguard data quality and the production of data links;
  • The underlying services are provided by the cloud services team, with both private and public cloud support.

The top layer provides a data graph that serves as a data catalog for users to find the data they need, along with self-service applications such as Mojing and Beidou that let users at different levels do data work on their own.

In terms of data integration and management, the overall architecture is more flexible and can accommodate all data. By optimizing and upgrading the self-service tools, it lowers the barrier to entry, meets the needs of users at different levels, and improves the efficiency and value of data use.

3. Data system


The data middle platform was established to solve the problems brought by surging data volumes and business expansion: inconsistent statistical definitions, duplicated development, slow responses to new metric requests, low data quality, and high data costs. The data lake shares these goals to a certain extent, so we combined the data lake concept to optimize and upgrade the data system of the data middle platform.

In the initial stage of building the data middle platform, we unified the company's data warehouse system: we conducted business research, sorted out the existing field and dimension information, summarized conformed dimensions, established a unified metric system, and formulated unified data warehouse specifications. We built the original data layer (ODS), detail data layer (DWD), and aggregate data layer (MID) of the unified warehouse, and built device libraries, including a cumulative device library and a new-device library.

On top of the unified data warehouse, the data team built subject-area warehouses and business marts for different analysis directions and specific business requirements. These contain further processed detail data, aggregate data, and application-layer tables, which the data application layer uses to provide different services to users.

In the unified data warehouse system, the original data layer and everything below it were not open: users could only work with data already processed by data engineers, which inevitably loses detail. In daily use there are often users with data analysis skills who want to access the underlying raw data for personalized analysis or troubleshooting. We therefore introduced the data governance ideas of the data lake, organized the data into the data lake's managed areas, enriched the metadata, and built a solid metadata center.

After governance under the data lake concept, the original data layer and other raw data, such as raw log files, are placed together in the original area; users with data processing skills can apply for permission to use the data in this area.

The detail layer, aggregate layer, subject-area warehouses, and business marts of the unified data warehouse sit in the product area. This data has been processed by the data team's engineers into final data products for users, and has undergone data governance to ensure its quality.

  • A sensitive area is set aside for storing sensitive data, with tightly controlled access rights;
  • Temporary or personal tables generated day to day by users and data developers sit in their own area; the tables there are the responsibility of their owners and can be conditionally opened to other users;
  • The metadata center maintains the metadata of each dataset, including table information, field information, and the dimensions and metrics each field corresponds to, as well as data lineage at both table and field level;
  • The data asset center maintains the asset attributes of the data, including data levels, sensitivity, and permission management.

To enable users to better self-serve their data:

  • The application layer provides a data graph as a data catalog where users can look up data, including metadata such as usage, dimensions, metrics, and lineage; it also serves as the entry point for permission applications;
  • A self-service analysis platform gives data users self-service analysis capabilities.

4. The data lake as a data governance concept


In building our data middle platform, the data lake serves as a means of data governance.

As a data governance idea, the value of a data lake is twofold:

1) It can store all data, whether currently used or not, so that data can be found whenever it is needed;

2) The data stored in the lake is managed systematically, so heavy involvement from data engineers is no longer required and users can find and use data on their own.

  • More flexible and cheaper storage: on HDFS we moved from three replicas to a single replica supplemented by other verification mechanisms, without losing data, which saves machine resources; Iceberg tables are introduced to replace Hive tables wherever possible, making table storage more flexible.
  • More complete data: we improve data timeliness, store more complete data, and keep the original data available for emergencies; storage is conditional, however, and follows a defined life cycle.
  • Easier data discovery: storing more data at the lowest possible cost only pays off if it can be found, so we developed data graph tools to make it easier for users to locate data.
  • Easier data integration: we developed the BabelX, Acciolog, and Venus tools.
  • More efficient data use: Mojing is a self-service query tool that users can customize; Babel is a platform on which developers build their own data processing workflows; Beidou is an operational analytics tool for analyzing user tags, such as the different audiences of a piece of content.
  • More reliable data management: the metadata platform, data warehouse platform, data link governance platform, and data asset platform.

5. The data lake as a technology implementation


A data lake is also a technology implementation. Because a data lake stores all data, from a technical point of view we need to study how to integrate and process that data efficiently, which has driven the emergence of new storage formats and stream-batch unified architectures. The advantage of a data lake is that it supports real-time updates on massive data, reduces storage and computing costs, and addresses the pain points of traditional data processing pipelines.

3. Comparison of data lake technologies

In the course of researching data lake technologies, we investigated three widely used data storage formats: Delta Lake, Hudi, and Iceberg.

1. Delta Lake vs Hudi vs Iceberg


The figure above compares the characteristics of the three technologies as they stood at the time of our survey.

After a comprehensive evaluation, we chose Iceberg as the storage format for the data table.

2. Iceberg is a table format


Iceberg is a newly designed open-source table format. It supports both object storage and HDFS file storage, and its tables support row-level updates and row-level transactions. It is not a query engine; it simply stores data better.
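To make the table format concrete, here is a minimal PySpark sketch (not iQIYI's actual setup) that registers an Iceberg catalog on HDFS and creates a partitioned table; the catalog name `demo`, the warehouse path, and the schema are all hypothetical.

```python
from pyspark.sql import SparkSession

# Minimal sketch: a Spark session with an Iceberg catalog backed by HDFS.
# The catalog name "demo", warehouse path, namespace and columns are hypothetical.
spark = (
    SparkSession.builder
    .appName("iceberg-table-demo")
    .config("spark.sql.extensions",
            "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
    .config("spark.sql.catalog.demo", "org.apache.iceberg.spark.SparkCatalog")
    .config("spark.sql.catalog.demo.type", "hadoop")
    .config("spark.sql.catalog.demo.warehouse", "hdfs:///warehouse/iceberg")
    .getOrCreate()
)

spark.sql("CREATE NAMESPACE IF NOT EXISTS demo.logs")

# Iceberg supports hidden partitioning: the hours(log_time) transform lets
# queries that filter on log_time prune files without a separate partition column.
spark.sql("""
    CREATE TABLE IF NOT EXISTS demo.logs.service_log (
        log_time  TIMESTAMP,
        host      STRING,
        level     STRING,
        message   STRING
    )
    USING iceberg
    PARTITIONED BY (hours(log_time))
""")
```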

3. Hive vs Iceberg


Compared with Hive tables, the biggest advantage of Iceberg tables is their support for row-level updates, which brings data freshness down to the minute level and therefore greatly improves timeliness during data processing.
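As a hedged illustration (not iQIYI's actual job), the sketch below applies a batch of changed rows to an Iceberg table with MERGE INTO, something a plain Hive table cannot do natively; it reuses the hypothetical `demo` catalog and Spark session from the previous sketch, and the `orders` table and `order_updates` staging view are invented names.

```python
# Hedged sketch: apply row-level changes with MERGE INTO (requires the Iceberg
# SQL extensions configured in the previous sketch). demo.dw.orders and the
# staging view "order_updates" (a batch of changed rows) are hypothetical.
spark.sql("CREATE NAMESPACE IF NOT EXISTS demo.dw")
spark.sql("""
    CREATE TABLE IF NOT EXISTS demo.dw.orders (
        order_id    BIGINT,
        status      STRING,
        update_time TIMESTAMP
    ) USING iceberg
""")

# Matching rows are updated in place; new rows are inserted.
spark.sql("""
    MERGE INTO demo.dw.orders AS t
    USING order_updates AS s
    ON t.order_id = s.order_id
    WHEN MATCHED THEN UPDATE SET t.status = s.status, t.update_time = s.update_time
    WHEN NOT MATCHED THEN INSERT *
""")
```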

4. Data lake adoption in business scenarios

1. Application of service log data


Before the transformation, we used the Venus tool to collect logs from the physical machines of different services and send them to Kafka. The data was then written into Elasticsearch through a real-time pathway and queried on the query platform. However, Elasticsearch is relatively expensive to deploy, so the overall cost remained high.

In addition, Elasticsearch has stability issues. If a single node fails in Elasticsearch, the write and query functions may be affected. Therefore, we decided to replace Elasticsearch with Iceberg. By using Spark or Trino directly, we can query the data in the Iceberg table.

Since Iceberg is built on top of HDFS, we can leverage HDFS clusters for storage, which effectively reduces storage costs.

In the early days of the migration from Elasticsearch to Iceberg, we noticed a drop in query speed. For the overall application, however, the difference was acceptable: whether a query takes five minutes or two makes little practical difference. We have since kept optimizing query performance and can now achieve second-level queries.
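For reference, a log query over the Iceberg table might look like the sketch below; it reuses the hypothetical `demo.logs.service_log` table and Spark session from the earlier sketch and only shows the Spark SQL path (Trino accepts essentially the same SQL).

```python
# Hedged sketch: query recent error logs from the Iceberg-backed log table.
# Reuses the Spark session and the hypothetical demo.logs.service_log table;
# the 30-minute window and ERROR filter are illustrative.
recent_errors = spark.sql("""
    SELECT log_time, host, message
    FROM demo.logs.service_log
    WHERE log_time >= current_timestamp() - INTERVAL 30 MINUTES
      AND level = 'ERROR'
    ORDER BY log_time DESC
    LIMIT 100
""")
recent_errors.show(truncate=False)
```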

2. Application of user tags


User tags are used to conduct in-depth analysis of user data and assign specific tags to each user, providing important references for operations, advertising, recommendations, and other businesses.

In the old architecture, we wrote messages to the Kafka queue and then wrote them to HBase in real time, and some offline data was added to HBase in batches to correct the data.

However, there are some problems with this architecture.

First, data export was slow and could only be done at day-level granularity, so analysis timeliness was poor. Second, in later use we found that Impala's O&M cost was high and its performance unstable.

With the introduction of Iceberg, we changed the architecture. Now both the real-time updates and the HBase update information are written to Iceberg, giving us a near-real-time data path. Queries through Spark SQL or Trino are highly efficient, which greatly improves both verification efficiency and the efficiency of the data path.

The business benefits of the transformation have been significantly improved. We no longer need to rely entirely on HBase, which enables resource reuse and low O&M costs. At the same time, we have improved the data processing speed and query efficiency to better support business scenarios such as advertising recommendations.

1) Pain points of the old architecture

Real-time writes: we developed real-time writes to HDFS for offline processing and batch-wrote data from the offline data warehouse into HBase to correct the data. Our service was supported mainly through the HBase interface; for offline analysis, HBase was exported to a Hive table.

In the old architecture, HBase acted as the primary engine, so data export was slow and possible only at day level, making analysis timeliness poor. In later use, we also found the O&M cost of Pilot to be high and its performance unstable.

2) Business benefits after transformation

After Iceberg was introduced, the architecture was adjusted so that real-time updates and HBase update information are written to Iceberg at the same time, using a near-real-time pathway. Queries via Spark SQL or Trino are highly efficient, improving both verification efficiency and data path efficiency.
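As an illustration of the query side mentioned above, the hedged sketch below runs an ad-hoc aggregation over a user-tag Iceberg table through the Trino Python client; the host, catalog, schema, table, and column names are all hypothetical.

```python
import trino  # Trino Python client, assumed to be installed

# Hedged sketch: ad-hoc analysis of a user-tag Iceberg table via Trino.
# Coordinator host, catalog, schema, table, and columns are hypothetical.
conn = trino.dbapi.connect(
    host="trino-coordinator",
    port=8080,
    user="analyst",
    catalog="iceberg",
    schema="dw",
)
cur = conn.cursor()
cur.execute("""
    SELECT tag, count(*) AS users
    FROM user_tag
    GROUP BY tag
    ORDER BY users DESC
    LIMIT 20
""")
for tag, users in cur.fetchall():
    print(tag, users)
```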

3. Application of CDC order data


For CDC order data, MySQL binlogs are parsed in real time and the changes are synchronized to the Hadoop cluster for offline queries.

Before Iceberg was introduced, we either exported the full MySQL data to Hive every day or incrementally synchronized it to Kudu for offline queries, which adapted poorly to ad hoc queries. The old architecture also suffered from poor timeliness and high cost.

With the introduction of Iceberg, we simplified the process, improved query efficiency, and cut data latency from days to minutes.

At the same time, Kudu nodes are no longer required, achieving resource reuse and low O&M costs. The business benefits of the transformation have been significantly improved, providing us with a more efficient and flexible solution for data processing and analysis.
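A minimal PyFlink sketch of this kind of CDC path is shown below, assuming the flink-cdc MySQL connector and the Iceberg Flink runtime are on the classpath; the hosts, credentials, table names, and columns are hypothetical placeholders rather than the real order schema.

```python
from pyflink.table import EnvironmentSettings, TableEnvironment

# Hedged sketch: capture MySQL binlog changes with the mysql-cdc connector and
# continuously apply them to an Iceberg table. All names, hosts, and credentials
# are hypothetical placeholders.
t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())

t_env.execute_sql("""
    CREATE TABLE orders_src (
        order_id    BIGINT,
        user_id     BIGINT,
        status      STRING,
        update_time TIMESTAMP(3),
        PRIMARY KEY (order_id) NOT ENFORCED
    ) WITH (
        'connector' = 'mysql-cdc',
        'hostname' = 'mysql-host',
        'port' = '3306',
        'username' = 'reader',
        'password' = 'example-password',
        'database-name' = 'trade',
        'table-name' = 'orders'
    )
""")

t_env.execute_sql("""
    CREATE CATALOG lake WITH (
        'type' = 'iceberg',
        'catalog-type' = 'hadoop',
        'warehouse' = 'hdfs:///warehouse/iceberg'
    )
""")
t_env.execute_sql("CREATE DATABASE IF NOT EXISTS lake.ods")
t_env.execute_sql("""
    CREATE TABLE IF NOT EXISTS lake.ods.orders (
        order_id    BIGINT,
        user_id     BIGINT,
        status      STRING,
        update_time TIMESTAMP(3),
        PRIMARY KEY (order_id) NOT ENFORCED
    ) WITH ('format-version' = '2', 'write.upsert.enabled' = 'true')
""")

# The changelog (inserts, updates, deletes) is applied row by row, keeping the
# lake copy minutes behind MySQL instead of a day behind.
t_env.execute_sql("INSERT INTO lake.ods.orders SELECT * FROM orders_src")
```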

4. Application of advertising data


Advertising data includes ad click logs and user viewing logs, which support the timely update of advertising algorithm models and provide assistance for ad engines to deliver ads.

Originally, Hive was used to perform offline data analysis, update the data model at the day or hour level, and then form a new model in Hive and Kudu that can be called by algorithms.

We have now introduced Iceberg: real-time data from Kafka is processed in a unified stream-batch mode, and once it lands in Iceberg tables, the algorithm side analyzes it with Spark and Trino and quickly loads the results to the ad engine. This shortens the full pipeline from more than half an hour to 7-10 minutes and simplifies the original architecture into a single Iceberg stream-batch unified architecture.
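As an illustration only (the talk does not spell out the exact job), a Spark Structured Streaming sketch of such a stream-batch path could look like the following: ad click events are read from Kafka, parsed, and appended to an Iceberg table on a five-minute trigger. The broker, topic, schema, and table names are hypothetical, and the target table is assumed to already exist.

```python
from pyspark.sql import SparkSession, functions as F
from pyspark.sql.types import StructType, StructField, StringType, LongType

# Hedged sketch: stream ad click logs from Kafka into an Iceberg table with a
# 5-minute micro-batch trigger. Broker, topic, schema, paths, and table names
# are hypothetical; demo.ads.click_log is assumed to exist already.
spark = (
    SparkSession.builder
    .appName("ad-click-to-iceberg")
    .config("spark.sql.catalog.demo", "org.apache.iceberg.spark.SparkCatalog")
    .config("spark.sql.catalog.demo.type", "hadoop")
    .config("spark.sql.catalog.demo.warehouse", "hdfs:///warehouse/iceberg")
    .getOrCreate()
)

click_schema = StructType([
    StructField("ad_id", LongType()),
    StructField("user_id", LongType()),
    StructField("event_time", StringType()),
])

clicks = (
    spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", "broker1:9092")
    .option("subscribe", "ad_click_log")
    .load()
    .select(F.from_json(F.col("value").cast("string"), click_schema).alias("e"))
    .select("e.*")
    .withColumn("event_time", F.to_timestamp("event_time"))
)

query = (
    clicks.writeStream
    .format("iceberg")
    .outputMode("append")
    .trigger(processingTime="5 minutes")
    .option("checkpointLocation", "hdfs:///checkpoints/ad_click_log")
    .toTable("demo.ads.click_log")
)
query.awaitTermination()
```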

5. Application of Pingback delivery data


Pingback tracking delivery is the most important channel in the entire data pipeline. Pingback data covers user starts and exits, page impressions and clicks, and video viewing; the overall business essentially revolves around this data, and it is the main data source of the data middle platform's unified data warehouse.

Before the introduction of data lake technology, the data warehouse processing of the data middle platform used a combination of offline processing and real-time processing to provide offline data warehouse and real-time data warehouse.

The full data is constructed into a data warehouse through traditional offline analysis and processing, and stored in the cluster in the form of Hive tables.

Data with high real-time requirements is produced through real-time links and provided to users as Kafka topics.

Such an architecture has the following problems:

  • Besides the core processing and cleaning logic, two sets of code had to be maintained; whenever a rule changed, both the offline and real-time versions had to be updated at the same time, otherwise inconsistencies would appear;
  • The offline link was updated hourly with a delay of about one hour, meaning data generated at 00:01 might not be available until 02:00; some downstream services with real-time requirements could not accept this and had to be supported on the real-time link;
  • Although the real-time link offered second-level latency, its cost was high; most users do not need second-level updates, five-minute freshness is enough, and consuming Kafka streams is less convenient than querying data tables.

Iceberg tables combined with stream-batch unified processing address these problems well.

The main optimization is to convert the ODS-layer and DWD-layer tables to Iceberg, and to reimplement the parsing and data processing as Flink jobs, as sketched below.
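Concretely, such a Flink job could be sketched as follows; this is hedged and simplified (the real pipeline parses events with the unified Pingback SDK described below), and the Kafka topic, fields, and Iceberg table names are hypothetical.

```python
from pyflink.table import EnvironmentSettings, TableEnvironment

# Hedged sketch: read Pingback events from Kafka and write them to an Iceberg
# DWD table with Flink SQL. Assumes the Kafka and Iceberg connector jars are on
# the classpath; topic, fields, and table names are hypothetical.
t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())

t_env.execute_sql("""
    CREATE TABLE pingback_src (
        event_id   STRING,
        user_id    BIGINT,
        event_type STRING,
        event_time TIMESTAMP(3)
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'pingback_qos',
        'properties.bootstrap.servers' = 'broker1:9092',
        'properties.group.id' = 'pingback-dwd',
        'scan.startup.mode' = 'latest-offset',
        'format' = 'json'
    )
""")

t_env.execute_sql("""
    CREATE CATALOG lake WITH (
        'type' = 'iceberg',
        'catalog-type' = 'hadoop',
        'warehouse' = 'hdfs:///warehouse/iceberg'
    )
""")
t_env.execute_sql("CREATE DATABASE IF NOT EXISTS lake.dwd")
t_env.execute_sql("""
    CREATE TABLE IF NOT EXISTS lake.dwd.pingback_qos (
        event_id   STRING,
        user_id    BIGINT,
        event_type STRING,
        event_time TIMESTAMP(3)
    )
""")

# Cleaned events land in Iceberg and become queryable within minutes, replacing
# the separate hourly offline link and second-level real-time link.
t_env.execute_sql("""
    INSERT INTO lake.dwd.pingback_qos
    SELECT event_id, user_id, event_type, event_time
    FROM pingback_src
    WHERE event_id IS NOT NULL
""")
```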

In order to ensure the stability of data production and the accuracy of data, we adopt the following measures:

  • Start with non-core data: based on the actual business situation, we chose QoS delivery and custom delivery as pilots;
  • The offline parsing logic was abstracted into a unified Pingback parsing SDK deployed on both the real-time and offline paths, unifying the code;
  • After the Iceberg tables went into production, the two links ran in parallel for two months with routine data comparison and monitoring;
  • Once everything was confirmed correct, the upper layer was switched over transparently;
  • The startup and playback data related to core data will be migrated to the stream-batch unified pipeline once overall verification is stable.

After the retrofit, the benefits are as follows:

The QoS and custom delivery data links now run in near real time end to end; data that used to be delayed by an hour is updated within five minutes.

Except in special cases, the stream-batch unified link meets real-time requirements, and the existing QoS and custom real-time links and offline parsing links can be decommissioned to save resources.

5. Data lake performance optimization

1. Performance optimization - intelligent merging of small files


We encountered many problems in implementing stream-batch unified processing, the most typical being small files.

Unlike Hive's storage model, Iceberg does not offer enough flexibility to control the size of each file when writing partitioned data, which makes the small-file problem hard to avoid.

Iceberg supports row-level updates, which means each update generates separate Iceberg files. With frequent updates on a large table, a huge number of tiny files is produced. As the file count grows, HDFS performance suffers and the NameNode may even become overloaded.

Query performance degrades as the number of files to access grows; a large number of tiny files slows queries down severely. We therefore needed an effective strategy to merge these small files.

1) Scheduled merging

In addition to tuning write parameters, we also merge small files on a schedule. Choosing the right interval is not easy, however. For example, we might merge the small files in each partition every three hours, which solves most business problems to some extent.
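For reference, such a scheduled compaction can be expressed with Iceberg's `rewrite_data_files` Spark procedure; the hedged sketch below reuses the hypothetical `demo` catalog and log table from earlier, and the 512 MB target file size is only an illustrative value.

```python
# Hedged sketch: a periodic compaction job using Iceberg's rewrite_data_files
# Spark procedure (requires the Iceberg SQL extensions). Reuses the Spark
# session and hypothetical demo catalog from earlier; the table name and
# 512 MB target size are illustrative, not iQIYI's settings.
spark.sql("""
    CALL demo.system.rewrite_data_files(
        table => 'logs.service_log',
        options => map('target-file-size-bytes', '536870912')
    )
""")
```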

However, in some scenarios, such as when collecting hardware information and updating orders, the overall amount of data is huge and the frequency of data updates is very high.

In such cases, a very large number of small files accumulates within three hours. A three-hour merge works for other businesses, but for these particular ones the file count is already too high by then.

As a result, query performance improves only briefly after a merge and then degrades as files pile up again; as the next scheduled merge approaches, queries may no longer return promptly.

2) Smart merging

To solve this more effectively, we drew on an article from Netflix and developed a smart merge scheme that automatically selects the partitions to merge based on the variance of file sizes within each partition.

Different weight thresholds can be set for different lines of business. When a partition's file count reaches its threshold, the scheme triggers a merge directly. This intelligent strategy serves different lines of business at the same time, merges files promptly, and avoids the situation where a sudden surge of updates makes the fixed time threshold useless.
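A simplified sketch of how such a selection could work is shown below: it reads Iceberg's `files` metadata table, computes per-partition file counts and file-size spread, and flags partitions that exceed per-business thresholds. The table name and thresholds are hypothetical, and iQIYI's production logic may differ.

```python
from pyspark.sql import functions as F

# Hedged sketch of the smart-merge selection logic: inspect the Iceberg "files"
# metadata table and flag partitions whose file count or file-size spread
# exceeds a per-business threshold. Reuses the Spark session and hypothetical
# demo catalog from earlier; names and thresholds are illustrative.
stats = (
    spark.read.table("demo.logs.service_log.files")
    .groupBy("partition")
    .agg(
        F.count("*").alias("file_count"),
        F.stddev("file_size_in_bytes").alias("size_stddev"),
    )
)

MAX_FILES = 200                   # per-business file-count threshold
MAX_STDDEV = 64 * 1024 * 1024     # per-business size-spread threshold (bytes)

candidates = stats.filter(
    (F.col("file_count") > MAX_FILES) | (F.col("size_stddev") > MAX_STDDEV)
)

for row in candidates.collect():
    # Each flagged partition would trigger a targeted compaction, e.g. a
    # rewrite_data_files call restricted to that partition with a `where`
    # filter (predicate construction omitted here).
    print(f"compact partition {row['partition']}: {row['file_count']} files")
```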

2. Performance optimization - BloomFilter


Compared with Impala and Kudu, Iceberg's query performance was poor. In-depth analysis showed that Impala and Kudu use indexes to speed up queries, whereas Iceberg's index support is limited and queries tend to become full table scans. Parquet has supported bloom filters since version 1.12, so to improve query performance we modified the Iceberg source code to enable BloomFilter support.

After this optimization, query speed improved significantly: querying by order ID dropped from more than 900 seconds to about 10 seconds, and overall performance is now close to the Impala + Kudu architecture. Although bloom filters take up extra storage space, the trade-off is well worth it given the overall performance.
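For reference, more recent Iceberg releases expose Parquet bloom filters through a table property rather than requiring a source-code patch; the hedged sketch below shows how that property could be set on an order-ID column (the table and column names are hypothetical, and the property key should be verified against the Iceberg version in use).

```python
# Hedged sketch: enable a Parquet bloom filter on one column via an Iceberg
# table property, as supported by newer Iceberg releases. Reuses the Spark
# session and hypothetical demo catalog from earlier; verify the property key
# for your Iceberg version.
spark.sql("""
    ALTER TABLE demo.dw.orders SET TBLPROPERTIES (
        'write.parquet.bloom-filter-enabled.column.order_id' = 'true'
    )
""")

# Files written after this point carry bloom filters on order_id, so point
# lookups like the one below can skip most files instead of scanning everything.
spark.sql("SELECT * FROM demo.dw.orders WHERE order_id = 1001").show()
```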

6. Follow-up plan


Our follow-up plans for the data lake in the data middle platform cover two main aspects:

  • At the architecture level, we will continue to refine each module so that the data provided by the data middle platform is more comprehensive and easier to use, allowing different users to serve themselves.
  • At the technical level, we will continue the stream-batch unified transformation of data links and keep introducing appropriate data lake technology to improve data timeliness and reduce production costs.

Q&A

Q1: For ingesting traffic data into the lake, is a MOR (Merge on Read) table or a COW (Copy on Write) table more suitable?

A1: We mainly use Merge on Read (MOR), which is why I barely mentioned Copy on Write just now.

Q2: How do I ensure that the data definitions and business rules in the data lake are consistent? How do I check and clean data?

A2: This is supported by the data middle platform architecture: data definitions and business rules are specified mainly through delivery management, the metadata center, and the unified metric platform.

For checking data, quality is guaranteed by the data quality platform; for cleaning data, the resource center audits overall data resources and cleans up expired data.

Q3: Does Iceberg support standard SQL for queries?

A3: Yes. You can use standard SQL; our self-developed query engine encapsulates Spark SQL, Hive, and Trino.