
Introducing the MRS CDL Architecture

Abstract: MRS CDL is a real-time data synchronization service launched by FusionInsight MRS. It captures change events from traditional OLTP databases and pushes them to big data products in real time. This document introduces the overall architecture and key technologies of CDL in detail.

This article is shared from the HUAWEI CLOUD community "MRS CDL Architecture Design and Implementation" by rujia01.

1. Introduction

MRS CDL is a real-time data synchronization service launched by FusionInsight MRS. It captures change events from traditional OLTP databases and pushes them to big data products in real time. This document introduces the overall architecture and key technologies of CDL in detail.

2. The concept of CDL

MRS CDL (Change Data Loader) is a CDC data synchronization service based on Kafka Connect. It can capture data from a variety of OLTP data sources, such as Oracle, MySQL, and PostgreSQL, and transfer it to a target store, which can be a big data store such as HDFS or OBS, or a real-time data lake such as Hudi.

2.1 What is CDC?

CDC (Change Data Capture) is a design pattern that monitors data changes (inserts, updates, deletes, etc.) so that the changed data can be further processed. It is typically applied to data warehouses and to applications closely tied to databases, such as data synchronization, backup, auditing, and ETL.

CDC is not new; it was already being used to capture application data changes more than two decades ago. It synchronizes change events to downstream stores in a timely and efficient manner with virtually no impact on the production applications. Now that big data applications are increasingly common, this long-established technology has been revitalized, and connecting to big data scenarios has become its new mission.

There are already many mature CDC-to-big-data products in the industry, such as Oracle GoldenGate (for Kafka), Alibaba Canal, LinkedIn Databus, and Debezium.

2.2 Scenarios Supported by CDL

MRS CDL draws on the successful experience of the mature products above: it uses Oracle LogMiner and the open-source Debezium for CDC event capture, and relies on Kafka and Kafka Connect as a high-concurrency, high-throughput, and highly reliable framework for deploying and running tasks.

When existing CDC products connect to big data scenarios, they generally stop at synchronizing data into Kafka. On top of this, MRS CDL additionally provides the ability to land data directly into the lake: it can write directly to MRS HDFS, Huawei OBS, MRS Hudi, ClickHouse, and so on, solving the "last mile" problem of data ingestion.


Table 1 Scenarios supported by MRS CDL

3. The architecture of CDL

For a CDC system, extracting data from the source and transferring it to the target storage is the baseline capability. Beyond that, MRS CDL focuses on flexibility, high performance, high reliability, scalability, re-entrancy, and security. The core design principles of CDL are therefore as follows:

  • The system architecture must be extensible, allowing new source and destination data stores to be added without compromising existing functionality.
  • The architecture should cleanly separate the concerns of the different roles.
  • Minimize complexity and dependencies where appropriate, and minimize risks to architecture, security, and resiliency.
  • Customer needs for pluggability must be met by providing common plug-in capabilities that make the system flexible, easy to use, and configurable.
  • Business security must prevent horizontal privilege escalation and information leakage.

3.1 Architecture Diagram and Role Description


Figure 1 CDL architecture

MRS CDL contains two roles, CDL Service and CDL Connector, whose respective functions are as follows:

  • CDL Service: Responsible for the management and scheduling of tasks, providing a unified API entry point, and monitoring the health of the entire CDL service.
  • CDL Connector: Essentially a Kafka Connect Worker process, responsible for running the actual Tasks; on top of Kafka Connect's high reliability, high availability, and scalability, it adds a heartbeat mechanism to help the CDL Service monitor the health of the cluster (a rough sketch follows).
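The heartbeat mechanism itself is internal to MRS CDL. Purely as an illustration of the idea, the sketch below shows a worker process publishing periodic status records to a Kafka topic that a management service could watch; the topic name, worker id, and reporting interval are all assumptions, not CDL's actual implementation.

```java
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch: a CDL Connector worker publishing periodic heartbeats. */
public class WorkerHeartbeat {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9092");           // assumed broker address
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);

        String workerId = "cdl-connector-1";                       // assumed worker identity
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        scheduler.scheduleAtFixedRate(() -> {
            // A management service can treat a missing heartbeat as an unhealthy worker.
            String status = workerId + ",ALIVE," + System.currentTimeMillis();
            producer.send(new ProducerRecord<>("cdl-heartbeat", workerId, status));
        }, 0, 10, TimeUnit.SECONDS);
    }
}
```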

3.2 Why Kafka?

We compared Apache Kafka with various other options such as Flume and NiFi, as shown in the following table:


Table 2 Framework comparison

For a CDC system, Kafka offered enough advantages to justify our choice. At the same time, the Kafka Connect architecture is a natural fit for a CDC system:

  • Parallelism - For a data replication job, throughput can be increased by splitting it into multiple subtasks that run in parallel (see the connector skeleton after this list).
  • Sequencing - Kafka's partition mechanism ensures that data is strictly ordered within a partition, which helps us achieve data integrity.
  • Scalable - Kafka Connect runs connectors distributed in a cluster.
  • Ease of use - Kafka Connect abstracts the underlying Kafka interfaces, making them easier to work with.
  • Balancing - Kafka Connect automatically detects failures and rebalances the remaining processes based on their respective loads.
  • Lifecycle Management – Provides the lifecycle management capabilities of a complete Connector.
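To make the parallelism and lifecycle points above concrete, here is a minimal Kafka Connect SourceConnector skeleton; it is only a sketch, and the class names and the "tables" config key are illustrative rather than CDL's real classes. Its taskConfigs() splits the captured tables across the requested number of tasks so Connect can run them in parallel.

```java
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.source.SourceConnector;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Illustrative skeleton: split a table list across parallel source tasks. */
public class DemoTableSourceConnector extends SourceConnector {
    private List<String> tables;

    @Override
    public void start(Map<String, String> props) {
        // "tables" is a hypothetical config key, e.g. "orders,customers,payments".
        tables = Arrays.asList(props.getOrDefault("tables", "").split(","));
    }

    @Override
    public Class<? extends Task> taskClass() {
        return DemoTableSourceTask.class;
    }

    @Override
    public List<Map<String, String>> taskConfigs(int maxTasks) {
        int groups = Math.min(maxTasks, tables.size());
        List<Map<String, String>> configs = new ArrayList<>();
        for (int i = 0; i < groups; i++) {
            configs.add(new HashMap<>());
        }
        // Round-robin the tables so each task copies its own subset in parallel.
        for (int i = 0; i < tables.size(); i++) {
            configs.get(i % groups).merge("tables", tables.get(i), (a, b) -> a + "," + b);
        }
        return configs;
    }

    @Override public void stop() { }
    @Override public ConfigDef config() { return new ConfigDef(); }
    @Override public String version() { return "1.0"; }

    /** Minimal stub task; a real task would read the change log for its assigned tables. */
    public static class DemoTableSourceTask extends SourceTask {
        @Override public String version() { return "1.0"; }
        @Override public void start(Map<String, String> props) { }
        @Override public List<SourceRecord> poll() { return null; }
        @Override public void stop() { }
    }
}
```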

4. MRS CDL key technologies


Figure 2 CDL Key Technologies

4.1 CDL Job

MRS CDL provides an upper-level abstraction of the business by introducing the concept of a CDL Job, which defines a complete business flow. In a Job, the user selects the source and destination storage types and can filter which tables to replicate.

On top of the Job structure, MRS CDL provides a mechanism for executing CDL Jobs. At runtime, a Kafka Connect Source Connector combined with log replication technology captures CDC events from the source data store into Kafka; a Kafka Connect Sink Connector then extracts the data from Kafka, applies the various transformation rules, and pushes the final result to the destination storage.

MRS CDL also provides a mechanism for defining table-level and column-level mapping transformations; the conversion rules can be specified while defining a CDL Job.
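A CDL Job is defined through the CDL Service's REST interface. The exact endpoint and payload are product specific, so the following is only a hypothetical sketch, with an assumed URL and field names, of what submitting a job with a table filter and a column mapping rule might look like.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

/** Hypothetical sketch: submitting a CDL Job definition over REST. */
public class SubmitCdlJob {
    public static void main(String[] args) throws Exception {
        // Endpoint and payload fields are illustrative assumptions, not the real CDL API.
        String job = """
            {
              "name": "orders_to_hudi",
              "source": {"type": "mysql", "tables": ["sales\\\\.orders.*"]},
              "target": {"type": "hudi", "path": "/cdl/orders"},
              "transforms": [{"table": "sales.orders", "column": "cust_id", "renameTo": "customer_id"}]
            }
            """;
        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create("https://cdl-service.example.com/api/v1/jobs"))  // assumed address
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(job))
                .build();
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.statusCode() + " " + response.body());
    }
}
```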

4.2 Data Comparison

MRS CDL provides a special job for data consistency comparison. Users can select the source and destination data store schemas and pick the comparison pairs to check, ensuring that the data is consistent between the source and destination data stores.


Figure 3 The Data Comparison abstract view

MRS CDL provides a dedicated REST API for running Data Compare Jobs, with the following capabilities:

  • Provides a variety of data comparison algorithms, such as a row-hash algorithm (sketched after this list) and non-primary-key column comparison.
  • Provides a dedicated query interface that can query the synchronization report and display the execution details of the current Compare task.
  • Provides real-time source and destination storage-based repair scripts to repair out-of-sync data with one click.
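The row-hash algorithm can be pictured as hashing each row's column values on both sides and comparing the digests keyed by primary key. The sketch below is a simplified illustration of that idea, not CDL's actual implementation.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Simplified illustration of a row-hash comparison between source and target rows. */
public class RowHashCompare {

    /** Hash the concatenated column values of one row. */
    static String rowHash(List<String> columns) throws Exception {
        MessageDigest md = MessageDigest.getInstance("SHA-256");
        for (String col : columns) {
            md.update((col == null ? "\0" : col).getBytes(StandardCharsets.UTF_8));
            md.update((byte) 0x1F);                      // separator so "ab","c" != "a","bc"
        }
        StringBuilder hex = new StringBuilder();
        for (byte b : md.digest()) hex.append(String.format("%02x", b));
        return hex.toString();
    }

    /** Return the primary keys whose row hashes differ or that exist on only one side. */
    static Map<String, String> diff(Map<String, List<String>> source,
                                    Map<String, List<String>> target) throws Exception {
        Map<String, String> outOfSync = new HashMap<>();
        for (Map.Entry<String, List<String>> e : source.entrySet()) {
            List<String> other = target.get(e.getKey());
            if (other == null || !rowHash(e.getValue()).equals(rowHash(other))) {
                outOfSync.put(e.getKey(), "mismatch or missing in target");
            }
        }
        for (String key : target.keySet()) {
            if (!source.containsKey(key)) outOfSync.put(key, "missing in source");
        }
        return outOfSync;
    }
}
```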

The following is the Data Compare Job execution process:


Figure 4 The Data Compare Job execution and viewing process

4.3 Source Connectors

MRS CDL creates various source connectors through the Kafka Connect SDK; they capture CDC events from the different data sources and push them to Kafka. CDL provides specialized REST APIs to manage the lifecycle of these data source connectors.

4.3.1 Oracle Source Connector

The Oracle Source Connector uses the LogMiner interface provided by the Oracle RDBMS to capture DDL and DML events from Oracle databases.


Figure 5 Schematic of LogMiner data capture

When handling DML events, CDL also provides support for tables that contain BLOB/CLOB columns. BLOB columns are processed as follows:

  • When an insert/update operation occurs, a series of LOB_WRITE operations is triggered.
  • LOB_WRITE is used to load data into a BLOB field.
  • Each LOB_WRITE can write at most 1 KB of data.
  • For a 1 GB image file, we collect the binary data of all one million LOB_WRITE operations and merge them into a single object. The object is stored in Huawei OBS, and the message written to Kafka finally carries the object's location in OBS (see the sketch below).
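The following is a simplified sketch of the merge-and-offload idea described above; the OBS upload call and the obsLocation field name are assumptions for illustration only.

```java
import java.io.ByteArrayOutputStream;
import java.util.List;

/** Simplified illustration: merge the 1 KB LOB_WRITE fragments of one LOB column
 *  into a single object, then reference its external location in the Kafka message. */
public class LobAssembler {

    /** Concatenate the ordered LOB_WRITE fragments into one byte array. */
    static byte[] merge(List<byte[]> lobWriteChunks) {
        ByteArrayOutputStream merged = new ByteArrayOutputStream();
        for (byte[] chunk : lobWriteChunks) {
            merged.write(chunk, 0, chunk.length);
        }
        return merged.toByteArray();
    }

    /** Build the value that replaces the raw BLOB content in the CDC event.
     *  The OBS upload and the "obsLocation" field are assumptions for illustration. */
    static String toEventValue(String bucket, String objectKey, byte[] mergedLob) {
        // uploadToObs(bucket, objectKey, mergedLob);   // hypothetical OBS client call
        return "{\"obsLocation\": \"obs://" + bucket + "/" + objectKey + "\","
                + " \"sizeBytes\": " + mergedLob.length + "}";
    }
}
```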

For the capture of DDL events, we create separate sessions to keep track of them. The currently supported DDL statements are as follows:


Table 3 Supported DDL statements

4.3.2 MySQL Source Connector

MySQL's binary log (binlog) files record, in sequence, all operations committed to the database, including changes to table structures and to table data. The MySQL Source Connector produces CDC events by reading the binlog files and publishing them to Kafka topics.

The main functional scenarios supported by the MySQL Source Connector are:

  • Captures DML events and supports parallel processing of the captured DML events to improve overall performance
  • Supports table filtering
  • Supports configurable mapping between tables and topics
  • To guarantee the absolute ordering of CDC events, a table generally corresponds to a single partition; however, the MySQL Source Connector can still write to multiple partitions for scenarios willing to sacrifice message ordering for higher performance
  • Provides the ability to restart a task from a specified binlog file, position, or GTID, so that no data is lost in abnormal scenarios (see the offset sketch after this list)
  • Supports a wide range of complex data types
  • Supports capturing DDL events
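The restart capability maps naturally onto Kafka Connect's source offset mechanism. As a rough sketch (the field names are illustrative, not CDL's actual offset schema), a source task can attach the binlog coordinates to every SourceRecord so that Connect persists them and hands them back when the task restarts.

```java
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.source.SourceRecord;

import java.util.Map;

/** Sketch: attaching binlog coordinates to a SourceRecord as the Connect source offset. */
public class BinlogOffsetExample {

    static SourceRecord toSourceRecord(String serverId, String binlogFile, long position,
                                       String gtidSet, String topic, String eventJson) {
        // The partition identifies the source; the offset is what Connect persists and
        // returns to the task on restart, so no committed event is replayed or lost.
        Map<String, ?> sourcePartition = Map.of("server", serverId);
        Map<String, ?> sourceOffset = Map.of(
                "file", binlogFile,        // e.g. "mysql-bin.000042"
                "pos", position,           // byte offset inside that file
                "gtids", gtidSet);         // executed GTID set, if GTID mode is enabled
        return new SourceRecord(sourcePartition, sourceOffset, topic,
                Schema.STRING_SCHEMA, eventJson);
    }
}
```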

4.3.3 PostgreSQL Source Connector

PostgreSQL's logical decoding feature allows us to parse change events committed to the transaction log; an output plugin is required to process these changes. The PostgreSQL Source Connector uses the pgoutput plugin for this. pgoutput is the standard logical decoding plugin provided by PostgreSQL 10+ and requires no additional dependency packages.

Apart from some data type differences, the PostgreSQL Source Connector offers essentially the same functionality as the MySQL Source Connector.
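pgoutput-based capture relies on two standard PostgreSQL 10+ objects on the source database: a publication that lists the tables to decode, and a logical replication slot bound to the pgoutput plugin. The JDBC sketch below shows the standard statements involved; the slot and publication names and the connection details are placeholders, and whether CDL creates these objects for you is an assumption here.

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

/** Sketch: the standard PostgreSQL 10+ objects a pgoutput-based connector relies on.
 *  The source database must also run with wal_level = logical. */
public class PgOutputSetup {
    public static void main(String[] args) throws Exception {
        String url = "jdbc:postgresql://pg-host:5432/sales";   // assumed connection details
        try (Connection conn = DriverManager.getConnection(url, "repl_user", "password");
             Statement stmt = conn.createStatement()) {
            // A publication defines which tables' changes are decoded.
            stmt.execute("CREATE PUBLICATION cdl_pub FOR TABLE public.orders, public.customers");
            // A logical replication slot using the built-in pgoutput plugin.
            stmt.execute("SELECT pg_create_logical_replication_slot('cdl_slot', 'pgoutput')");
        }
    }
}
```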

4.4 Sink Connectors

MRS CDL provides a variety of Sink Connectors that pull data from Kafka and push it to different target stores. The currently supported sink connectors are:

  • HDFS Sink Connector
  • OBS Sink Connector
  • Hudi Sink Connector
  • ClickHouse Sink Connector
  • Hive Sink Connector

The Hudi Sink Connector and ClickHouse Sink Connector also support being scheduled and run as Flink/Spark applications.

4.5 Table Filtering

When a CDL Job needs to capture changes to multiple tables at once, wildcards (regular expressions) can be used in place of table names, so that CDC events are captured simultaneously for all tables whose names match the rule. When a wildcard does not match the targets precisely, redundant tables may be captured. For this reason, CDL provides table filtering to assist in wildcard fuzzy-matching scenarios; both whitelist and blacklist filtering are currently supported.
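As a simplified illustration (not CDL's actual code) of how whitelist and blacklist rules can combine with regex matching:

```java
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

/** Simplified whitelist/blacklist table filter on top of regex (wildcard) matching. */
public class TableFilter {
    private final List<Pattern> whitelist;
    private final List<Pattern> blacklist;

    TableFilter(List<String> whitelist, List<String> blacklist) {
        this.whitelist = whitelist.stream().map(Pattern::compile).collect(Collectors.toList());
        this.blacklist = blacklist.stream().map(Pattern::compile).collect(Collectors.toList());
    }

    /** A table is captured if it matches some whitelist pattern and no blacklist pattern. */
    boolean shouldCapture(String tableName) {
        boolean allowed = whitelist.isEmpty()
                || whitelist.stream().anyMatch(p -> p.matcher(tableName).matches());
        boolean denied = blacklist.stream().anyMatch(p -> p.matcher(tableName).matches());
        return allowed && !denied;
    }

    public static void main(String[] args) {
        TableFilter filter = new TableFilter(List.of("sales\\..*"), List.of("sales\\.tmp_.*"));
        System.out.println(filter.shouldCapture("sales.orders"));    // true
        System.out.println(filter.shouldCapture("sales.tmp_audit")); // false
    }
}
```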

4.6 Uniform Data Format

MRS CDL stores events in Kafka in a unified message format regardless of the data source type (Oracle, MySQL, PostgreSQL, etc.), so downstream consumers only need to parse a single data format for subsequent processing and transmission, avoiding the extra back-end development cost that multiple formats would cause.
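CDL's exact envelope is not documented here, so the following record is only a plausible illustration of what a source-agnostic CDC message could carry: the source type, the table identity, the operation type, before/after row images, and a capture timestamp. The field names are assumptions.

```java
import java.util.Map;

/** Illustrative (not CDL's documented schema) source-agnostic CDC event envelope. */
public record UnifiedCdcEvent(
        String sourceType,            // "oracle", "mysql", "postgresql", ...
        String database,
        String table,
        String operation,             // "INSERT", "UPDATE", "DELETE", "DDL"
        Map<String, Object> before,   // row image before the change (null for INSERT)
        Map<String, Object> after,    // row image after the change (null for DELETE)
        long captureTimestampMs) {

    public static UnifiedCdcEvent example() {
        return new UnifiedCdcEvent("mysql", "sales", "orders", "UPDATE",
                Map.of("order_id", 42, "status", "NEW"),
                Map.of("order_id", 42, "status", "SHIPPED"),
                System.currentTimeMillis());
    }
}
```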

4.7 Task-level log browsing

Normally a CDL Connector runs multiple Task threads to capture CDC events, and when one of the Tasks fails it is difficult to pull the strongly correlated log entries out of the massive log for further analysis.

To solve this problem, CDL standardizes the log output of the CDL Connector and provides a dedicated REST API through which users can obtain the log files of a specified Connector or Task with one click. A start and end time can even be specified to further narrow the scope of the log query.

4.8 Monitoring

MRS CDL provides a REST API to query metric information for all core components of a CDL service, including service-level, role-level, instance-level, and task-level.

4.9 Application Error Handling

During business operation there are often messages that cannot be sent to the target data store; we call these error records. In CDL there are many scenarios in which error records occur, for example:

  • The message body in the topic does not conform to the expected serialization format and therefore cannot be read properly
  • The table name carried in the message does not exist in the destination store, so the message cannot be delivered to the target side

To deal with this, CDL defines a "dead letter queue" specifically for storing the error records produced during operation. Essentially, the dead letter queue is a dedicated topic created by the Sink Connector: when an error record occurs, the Sink Connector sends it to the dead letter queue for storage.
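Kafka Connect sink connectors support this pattern natively through their standard error-handling settings. The snippet below shows those standard Kafka Connect properties purely to illustrate the mechanism; the topic name is an example, and CDL's own configuration surface may expose this differently.

```java
import java.util.HashMap;
import java.util.Map;

/** Standard Kafka Connect error-handling settings for a sink connector (illustrative;
 *  CDL's own configuration may expose these differently). */
public class DeadLetterQueueConfig {
    public static Map<String, String> sinkErrorHandling() {
        Map<String, String> config = new HashMap<>();
        config.put("errors.tolerance", "all");                                // keep the task running on bad records
        config.put("errors.deadletterqueue.topic.name", "cdl-dlq-orders");    // example topic name
        config.put("errors.deadletterqueue.topic.replication.factor", "3");
        config.put("errors.deadletterqueue.context.headers.enable", "true");  // attach error context as headers
        return config;
    }
}
```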

At the same time, CDL provides REST APIs that allow users to query these error records at any time for further analysis, and to edit and resend them.


Figure 6 CDL application error handling

5. Performance

CDL uses several performance optimization schemes to improve throughput:

  • Task concurrency

We take advantage of the task parallelization feature provided by Kafka Connect, where Connect can split a job into multiple tasks to copy data in parallel, as follows:


Figure 7 Task Concurrency

  • Use executor threads to parallelize the execution of tasks

Due to the limitations of data replication technologies such as LogMiner and the binlog, our Source Connectors can only capture CDC events sequentially. To improve performance, we therefore buffer these CDC events in an in-memory queue first and then process them in parallel with executor threads, which read data from the internal queue, process it, and push it into Kafka (a simplified sketch appears after Figure 8).


Figure 8 Executor thread concurrency
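The executor-thread model in Figure 8 can be sketched as a single capture thread feeding an in-memory queue that a small pool of worker threads drains, transforms, and pushes to Kafka. The class below is a simplified illustration with an assumed queue size, pool size, and transformation step; it is not CDL code.

```java
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;

/** Simplified sketch: sequential capture, parallel processing, push to Kafka. */
public class ParallelEventPipeline {
    private final BlockingQueue<String> buffer = new LinkedBlockingQueue<>(10_000);
    private final ExecutorService executors = Executors.newFixedThreadPool(4);
    private final KafkaProducer<String, String> producer;
    private final String topic;

    ParallelEventPipeline(KafkaProducer<String, String> producer, String topic) {
        this.producer = producer;
        this.topic = topic;
        for (int i = 0; i < 4; i++) {
            executors.submit(this::drainLoop);
        }
    }

    /** Called by the single capture thread (LogMiner / binlog reader) in source order. */
    void enqueue(String rawCdcEvent) throws InterruptedException {
        buffer.put(rawCdcEvent);            // blocks when full, providing back-pressure
    }

    /** Each executor thread transforms events and sends them to Kafka in parallel. */
    private void drainLoop() {
        try {
            while (!Thread.currentThread().isInterrupted()) {
                String event = buffer.take();
                String transformed = event.trim();   // placeholder for real parsing/mapping
                producer.send(new ProducerRecord<>(topic, transformed));
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```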

6. Summary

MRS CDL is an important piece of the puzzle for real-time data lake ingestion. We still need to further expand and improve it in terms of data consistency, ease of use, integration with more components, and performance, so as to create more value for customers in the future.
