
Homework Help: Design and Optimization Practice of a StarRocks-Based User Portrait System

Author | Homework Help Big Data Team

Planning | Liu Yan

Background

To improve students' learning efficiency, Homework Help uses algorithms, rules, and other techniques to build user portraits from behavioral data such as question searches, answering, and tutoring, as well as result data such as tutoring outcomes; the portraits drive differentiated tutoring. Based on the characteristics of the portrait tags, and building on StarRocks, we constructed a crowd-selection system that fits all of our scenarios reasonably well. This article introduces the system design of the portrait service, tag access, and the methods used to optimize crowd-selection performance.

Label features

[Image: table of label types and their characteristics]

Note: the symbolic variables above are determined when the crowd is created.

Scenario design considerations

To ensure that the system supports flexible, scalable business requirements, that the architecture is reasonable, and that the implemented system is stable with performance meeting expectations, we sorted out the relevant issues and thought them through before designing.

To satisfy all of the label types above, the conventional wide-table and tag-bitmap designs are insufficient. Behavioral data with modifiers must be crossed with regular tags, yet the two kinds of data are usually stored in different tables or data structures, and a regular join cannot deliver second-level queries. The most reasonable approach is still the crossing capability of bitmaps: build a bitmap for each rule group, then intersect the results. To use the bitmap structure, the unique user identifier string cuid must be converted into a numeric guid.
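As a toy illustration of the bitmap-crossing idea (a minimal sketch using StarRocks' built-in bitmap functions on literal values; production queries build each rule group's bitmap from tag tables, as shown later):

-- intersect two rule-group bitmaps; only guids present in both remain
SELECT bitmap_to_string(
    bitmap_and(
        bitmap_from_string('1,2,3,5'),   -- guids matching rule group A
        bitmap_from_string('2,3,7')      -- guids matching rule group B
    )
);
-- returns '2,3'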

We also had to decide how to convert unique user identifiers into numeric, globally unique, auto-incrementing guids, using the same mapping for both real-time and offline tags. Offline timeliness is insufficient, so a real-time scheme must build the mapping, which is then synchronized to offline Hive to supplement the offline tags; the mapping must cover the user ids of both real-time and offline tags.

There will be more and more tags, and each tag basically goes through the same series of operations: production computation, guid supplementation, data verification and alerting, writing to storage, atomic switchover to online, and so on. At the same time, the access cost and later maintenance cost of new tags must be controlled. We therefore decouple tag production from tag access, abstract the access process, implement it according to a fixed specification, make tag access configuration-driven as far as possible with unified management, and stay compatible with long-term platform construction. Tag production can then proceed in parallel across people, organized by business direction.

Performance assurance requires testing with real data, and every link of the design must be able to scale its processing capacity linearly as resources are added, for example data ingestion, crowd-selection queries, and real-time cuid->guid mapping.

For stability assurance, key links need monitoring and alerting, contingency plans, and fault drills.

Overall scheme design

Scheme overview

[Image: overall architecture of the portrait system]

The system consists of three parts: the portrait service, real-time tag access, and offline tag access.

(1) The portrait service is mainly responsible for tag configuration management, mapping and interpretation of tag enumeration values, crowd selection and crowd-package management, integration with other systems, tag data access configuration management, and fast rollback.

(2) Real-time tag access is mainly responsible for three parts: the tag access specification, cuid->guid mapping and backup, and real-time tag storage. With abstracted tooling, tasks become configuration-driven.

(3) Offline tag access is mainly responsible for the tag access specification and configuration-driven access (tag data assembly, cuid->guid mapping, verification, monitoring, warehousing, and so on).

As a full-scenario MPP database, StarRocks supports multiple table models, second-level real-time analysis, and concurrent queries, and it also provides a bitmap storage structure with supporting UDFs, which reduces the engineering complexity of bitmap storage, crossing, and management. So we ultimately chose StarRocks as the tag store.

According to scenario requirements, performance, flexibility, and other factors, tag information is abstracted into the following types for storage. Each type corresponds to a query template that addresses different business scenarios. Because of factors such as read/write performance, tag update timeliness, and idempotent access, multiple StarRocks table models are supported for the same type, and the same tag can also be stored in tables of different service types.

[Image: abstracted tag storage types and their table models]

Portrait service

The portrait service has two core capabilities. The first is crowd selection, characterized by low internal-system QPS and second-level response. The second is single-user id rule judgment, characterized by high QPS and responses at the 10 ms level. The second is outside the scope of this system design; here we cover only the crowd-selection part, whose general flow is as follows:

Request DSL parsing and validation: split the crowd-selection DSL into multiple independent expressions and their combination relationships by tag, supplement implicit conditions according to the tag configuration, and validate the reasonableness of each expression.

Query logic optimization: merge expressions whose tags are stored in the same table, reduce the data returned by a single expression, and speed up the query.

Expression to SQL: according to the query template for each abstract type, convert the optimized and merged expressions into subqueries, then compose them by their combination relationships into the final SQL.

Execute the SQL to select the crowd.
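As a hedged sketch of the generated SQL (table and column names such as profile_tag and fact_search_action are illustrative, not the production schema), a DSL like "grade = 3 AND searched at least 3 times since 2021-09-01" might be converted into:

-- each subquery produces one rule group's bitmap;
-- bitmap_intersect then crosses them (AND semantics)
SELECT bitmap_count(bitmap_intersect(bm)) AS crowd_size
FROM (
    SELECT bitmap_union(to_bitmap(guid)) AS bm
    FROM profile_tag
    WHERE grade = 3
    UNION ALL
    SELECT bitmap_union(to_bitmap(guid)) AS bm
    FROM (
        SELECT guid, count(1) AS cnt
        FROM fact_search_action
        WHERE dt >= '2021-09-01'
        GROUP BY guid
        HAVING cnt >= 3
    ) f
) sub;

An OR combination would replace bitmap_intersect with bitmap_union over the same subqueries.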

Create table statements and query templates

[Image: table-creation statements and query templates]
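Since the original statements were provided as an image, here is a minimal hedged sketch of what the Profile + Agg and Fact + Dup tables could look like (names, columns, and partition bounds are assumptions consistent with the tests below, not the production DDL):

-- Profile + Agg: one row per guid; tag columns use REPLACE_IF_NOT_NULL
-- so each tag can be written independently (decoupled ingestion)
CREATE TABLE profile_tag (
    guid BIGINT,
    grade SMALLINT REPLACE_IF_NOT_NULL NULL COMMENT 'hypothetical tag column',
    city VARCHAR(32) REPLACE_IF_NOT_NULL NULL COMMENT 'hypothetical tag column'
) ENGINE = OLAP
AGGREGATE KEY(guid)
PARTITION BY RANGE(guid) (
    PARTITION p0 VALUES LESS THAN ("50000000"),
    PARTITION p1 VALUES LESS THAN ("100000000")
)
DISTRIBUTED BY HASH(guid) BUCKETS 5;

-- Fact + Dup: raw behavior rows, sorted and bucketed by guid
CREATE TABLE fact_search_action (
    guid BIGINT,
    dt DATE,
    question_id BIGINT
) ENGINE = OLAP
DUPLICATE KEY(guid)
DISTRIBUTED BY HASH(guid) BUCKETS 25;

The matching query templates build one bitmap per rule group, e.g. SELECT bitmap_union(to_bitmap(guid)) FROM profile_tag WHERE grade = 3.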

Performance testing

(1) Profile + Agg test

The real-time scenario does not use the Primary Key model, mainly because it does not support REPLACE_IF_NOT_NULL or partial column updates, which are required to decouple writes between tags. The performance tests are as follows:

[Image: Profile + Agg performance test results]

Conclusion 1: tests 1/2 show that query time is dominated by the fragment 1 stage: the Scan operation including the merge-on-read process [OLAP_SCAN_NODE], to_bitmap [PROJECT_NODE], and bitmap_union [AGGREGATION_NODE]; the fragment 0 stage takes very little time because its data volume is small.

Conclusion 2: comparing tests 2/3, consider optimizing Scan time. After increasing the bucket count, Scan time drops significantly, because more tablets increase scan parallelism. doris_scanner_thread_pool_thread_num defaults to 48, and the tablet counts before and after the adjustment (5 -> 25) are both within this range. Besides the profile, you can also view Scan-related monitoring for the corresponding period in the Manager, and increase the thread count according to cluster load to improve query speed.

Conclusion 3: comparing tests 3/5, consider optimizing bitmap_union time while also balancing the write load. We partition by range on guid with a step of 50 million and set 5 buckets per partition, so each tablet holds about 10 million rows and guids within 50 million of each other land in the same partition; this avoids single-partition write hotspots caused by highly active guid segments. With the same 51.6M+ rows, bitmap_union time drops by about 700 ms.

Conclusion 4: tests 3/4 compare query time after adding a where condition. Because the returned data volume drops by an order of magnitude, bitmap_union(to_bitmap(guid)) time drops significantly, and the bottleneck shifts to the Scan stage. Since the grade column must additionally be scanned once the where condition is added, the extra time is mainly spent scanning and merging that column, and there is no better optimization.

(2) Fact + Dup test

The real-time scenario Fact + Agg/Uniq is similar to Profile + Agg, and the relevant optimizations follow the conclusions above. Test data for the offline-scenario Fact + Dup model are as follows:

[Image: Fact + Dup performance test results]

Conclusion 1: tests 1/2 show that the query time is spent on:

Scan procedure [OLAP_SCAN_NODE].

The two-stage group by guid [the first AGGREGATION_NODE of fragment 2 and of fragment 1]. Group-by time mainly goes to HashTable builds and count(1) result updates, and essentially depends on the number of rows returned by Scan and on the HashTable size.

The to_bitmap [fragment 1's first PROJECT_NODE] and bitmap_union [fragment 1's second AGGREGATION_NODE] operators; the optimization ideas are as in the test conclusions above.

Conclusion 2: tests 2/3 analyze the effect of adding a bitmap index. Either way, part of the query predicate is pushed down to the storage layer [SIMD filtering]; the bitmap index was created but not applied, because the selectivity [number of enumeration values in the filter / total row count] was too low for the bitmap index to be chosen.

Conclusion 3: [speculation, not tested] for test 1, DUPLICATE KEY(guid), DISTRIBUTED BY HASH(guid): if guid were not used as the sort and bucketing key to distribute data evenly, each node would hold all guids, so the HashTable size would be roughly 5 times the current per-node size, which in turn would lengthen query time.

Conclusion 4: test 4 analyzes fragments 1/2. The actual parallelism is computed as follows; appropriately increasing the tablet count [partitions, buckets] and exec instance num can speed up queries, and this acceleration applies to all the time-consuming points of Conclusion 1.

When the tablet count [excluding replicas] is less than parallel_fragment_exec_instance_num × the number of BEs, the parallelism is the tablet count.

When the tablet count [excluding replicas] is greater than parallel_fragment_exec_instance_num × the number of BEs, the parallelism is parallel_fragment_exec_instance_num × the number of BEs.
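Equivalently, letting $T$ be the tablet count excluding replicas, $P$ be parallel_fragment_exec_instance_num, and $B$ be the number of BE nodes, the two cases collapse into:

\[
\text{parallelism} = \min(T,\; P \times B)
\]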

(3) kv + Agg test

This type mainly stores user sets for tags with few enumeration values, so the data volume is small and queries basically return within 1 s.

Based on the query template, the main potential performance bottlenecks at large data volumes are:

The Scan process [OLAP_SCAN_NODE]: bitmap object deserialization and the SegmentRead process. The enable_bitmap_union_disk_format_with_set optimization can be considered.

The bitmap_union operator: if the bitmap element distribution is adjusted per the optimization scheme above, more rows must be added to the table, and overall performance may not improve; pick the balance after testing with real data.
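For reference, a hedged sketch of a kv + Agg table and its query template (names are illustrative; the BITMAP_UNION aggregate column pre-merges each enumeration value's user set at load time):

-- one pre-aggregated bitmap per (tag, enumeration value)
CREATE TABLE tag_kv_bitmap (
    tag_name VARCHAR(64),
    tag_value VARCHAR(64),
    user_bitmap BITMAP BITMAP_UNION
) ENGINE = OLAP
AGGREGATE KEY(tag_name, tag_value)
DISTRIBUTED BY HASH(tag_name, tag_value) BUCKETS 5;

-- query template: fetch the user set for one enumeration value
SELECT bitmap_union(user_bitmap)
FROM tag_kv_bitmap
WHERE tag_name = 'vip_level' AND tag_value = '2';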

(4) Supplementary explanation

Pits encountered:

Querying bitmap_or(to_bitmap(field A), to_bitmap(field B)) computes incorrectly when field A/B contains null values. Resolved with ifnull(to_bitmap(field), bitmap_empty()).

With the Uniq model, multiple replicas, and external interference excluded: 5 BE nodes, no partitions, 5 buckets, 2 replicas, even data distribution, and normal tablet status. During the query only 4 BE nodes worked, one of which scanned 2 tablets; the uneven distribution of tasks across BEs increased overall latency. Reported to the StarRocks team.

It is unreasonable for a query with a where condition to take longer than a full Scan; see Profile-type test Conclusion 4 and Fact-type test Conclusion 1 for the related tests. It should be possible to filter the where part via SIMD so that less data enters the merge process, reducing query time. Reported to the StarRocks team.

Tests were run with a single replica to exclude inaccuracies caused by uneven BE task scheduling.

The optimization approach is mainly: based on an understanding of StarRocks and other OLAP technologies, guess at the implementation and hypothesize optimizations, then verify with concrete tests and by inspecting explain, profile, and Manager monitoring, iterating this understanding until the optimization takes effect.

Real-time tag access

Real-time tag access roughly comprises one specification and three types of Flink tool tasks. The specification governs the Kafka topic that real-time tag computations write to. The three Flink tool tasks are: 1. the cuid->guid mapping process; 2. data distribution by tag type; 3. independent writes of each tag's data into its StarRocks table. Note that the whole pipeline partitions Kafka by cuid, preserving per-cuid ordering.

Access specifications

Tag computation tasks uniformly output tag results in the following format, write them to the specified Kafka topic, and partition by cuid.

{"header":{"type":"", "cuid":"cuid"}, "body":{"xxx":"xxx",...}} The type table is a label type and is globally unique. sys_offline_cuid. sys_cguid_mapping reserves a complement for the type word and outputs the newly mapped data.

body holds the tag's result data; the access process does no additional processing on it.

Mapping process

The mapping logic is simple: obtain a globally auto-incrementing numeric guid that forms a one-to-one mapping with the cuid. The lookup generally proceeds as: 1. check the task's off-heap LRU cache; 2. on a miss, check codis; 3. on a codis miss, fetch a new number from the ID issuer; 4. backfill the mapping into each cache layer.

The stability of this process is key to the entire system. The selection mainly drew on Homework Help's existing ID issuer and codis capabilities: the issuer generates globally unique auto-incrementing numeric guids, and codis stores the cuid-guid relationship. To guarantee a strict one-to-one mapping, the mapping process is designed as a single Flink task. The considerations were as follows:

Business Reality:

There are about 1.4 billion cuids in total, growing by roughly 200,000 per day, with a daily peak of 30+ new cuids per second. The full real-time tag data stream reaches about 100,000 QPS.

Theoretical Resource Estimation:

ID issuer: 30,000 QPS is supported by default, and initializing the existing data takes about 13 hours; after that, a peak of 30+ QPS can be met without extra resources.

codis: 1.4 billion mappings occupy about 200 GB [buffers not considered]; 12 pods with 16 GB of memory each can support roughly 500,000 QPS.

flink tasks:

QPS depends on the volume of tag data written by upstream Kafka, roughly 100,000 QPS.

Compute value A: the total memory footprint of the cuid mappings active in the past N months divided by the 500 MB to 1 GB of off-heap memory per task. Compute value B: the upstream Kafka traffic of about 100,000 QPS divided by the QPS a single task can process at the expected memory hit rate. Set the Flink parallelism to max(A, B) plus some buffer for anticipated business growth (see the formula after this list).

The upstream Kafka topic must be partitioned by cuid, and the partition count should preferably be more than 3 times the Flink parallelism [depending on future growth of tag data].

After a task restart, the peak QPS hitting codis stays under 100,000; in normal operation, with a warm Flink LRU cache, codis sees at most 30+ QPS, so the current codis resource configuration already meets the requirements.

The task itself only cares about cuid; data other than cuid need not be parsed.
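A compact restatement of the parallelism estimate above, where $M_{\text{active}}$ is the total memory footprint of cuid mappings active in the past N months, $M_{\text{task}}$ is the 500 MB to 1 GB of off-heap memory per task, $Q_{\text{in}}$ is the upstream Kafka traffic of about 100,000 QPS, and $Q_{\text{task}}$ is the QPS one task can handle at the expected hit rate:

\[
A = \frac{M_{\text{active}}}{M_{\text{task}}},\qquad
B = \frac{Q_{\text{in}}}{Q_{\text{task}}},\qquad
\text{parallelism} = \max(A,\, B) + \text{buffer}
\]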

Potential Risk Considerations:

Data delay: because the usage scenarios are mostly user outreach, a certain delay is acceptable; a large delay triggers an alarm and pauses outreach.

Dirty cuid data: once guid exceeds Integer.MAX_VALUE, StarRocks bitmap query performance degrades. Add strict cuid validation logic and set daily cuid-increment monitoring according to the business. After manual inspection, a small amount of dirty cuid data can be left alone, since invalid cuids never receive outreach; a large amount requires resetting the issuer position, restoring the mapping to an earlier point in time, and re-running all tag and crowd-package data.

An alternative is replacing codis + the ID issuer with MySQL auto-increment primary keys. This scheme was not tested, though it could meet the needs of the current scenario. The drawback is that a Flink task restart puts heavy load on MySQL [Flink incremental checkpoints were not used because that state storage is unmaintained], and throttling MySQL QPS causes a period of data delay. The advantage is a simpler task implementation that also avoids some corner cases where the same cuid is assigned multiple guids.

Distribution process

The mapped data is distributed into a separate Kafka topic per tag type, which enables table-level control when writing to StarRocks.

Writing to StarRocks

Tag data is written to StarRocks with flink-connector-starrocks. Pay attention to parameters such as write frequency, row count, and data size.
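A hedged Flink SQL sink sketch using flink-connector-starrocks (the hosts, credentials, database, and table names are placeholders; the three sink.buffer-flush options are the connector's documented knobs for flush row count, byte size, and interval):

CREATE TABLE tag_sink (
    guid BIGINT,
    grade SMALLINT
) WITH (
    'connector' = 'starrocks',
    'jdbc-url' = 'jdbc:mysql://fe_host:9030',
    'load-url' = 'fe_host:8030',
    'database-name' = 'profile_db',
    'table-name' = 'profile_tag',
    'username' = 'user',
    'password' = 'pass',
    'sink.buffer-flush.max-rows' = '64000',      -- rows per flush
    'sink.buffer-flush.max-bytes' = '67108864',  -- 64 MB per flush
    'sink.buffer-flush.interval-ms' = '5000'     -- flush at least every 5 s
);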

Cuid offline supplemental mapping

Hourly incremental jobs supplement the mapping for cuids not covered by the real-time active tag streams, preventing omissions and also covering the initial data load.

Offline tag access

When a conventional tag's data has been computed, it can be written uniformly into the specified tall table [see the create-table statement below], with the tall table serving as the medium that decouples tag development from tag access. Tags over behavioral data with modifiers can be accessed automatically, directly from the base warehouse tables plus the tag's source-data information.

Offline access roughly divides into two data sources: tall-table access and warehouse behavioral-data access.

Tall-table access

After a tag is computed, it is written to the tall table [sorted by cuid]; tagkv is a map structure whose key is the tag name.

If incremental data is written to the tall table, it is accessed with incremental logic; if full data is written, it is accessed in full.

Hive create-table SQL:

create table picasso_all(
    cuid string comment 'unique id under the same user identification system',
    tagkv map<string,string> comment 'assembled tag kv data'
)
partitioned by (dt string, tagk string)
stored as parquet;

Warehouse behavioral-data access:

It applies only to a single table, which must contain cuid.

Access steps

Task entry: obtain the target table names to import via the portrait service interface, then create parallel access tasks through the scheduling system API. The following is the execution logic of each task.

Status check: by target table name, obtain the data source information, Hive field mappings, and other information for the table's tags [currently only Hive data sources are supported], and check the readiness of the dependent data.

Data validation: validate the tag data against the rules in the metadata configuration, such as tag enumeration-value reasonableness, numeric tag value ranges, and null rates.

Data assembly: according to the business scenario, use insert overwrite directory select to assemble the data [scenario-matched SQL templates, guid supplementation, and so on] and write it to COS/HDFS or other storage.

Data import: create the tables/partitions and import the data with StarRocks Broker Load (see the sketch after these steps).

Atomic switchover: call the portrait service interface, complete the verification of the table's fields, swap the temporary partitions/tables with the online data, and archive the temporary partitions/tables for rollback.

Restore state: delete the temporary files produced by this process.
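A hedged sketch of the import and switchover steps (the label, paths, broker name, and table names are placeholders; SWAP WITH is StarRocks' atomic table exchange):

-- import the assembled parquet files into a temporary table
LOAD LABEL profile_db.tag_load_20210901
(
    DATA INFILE("hdfs://nn_host:8020/path/to/tag_data/*")
    INTO TABLE profile_tag_tmp
    FORMAT AS "parquet"
)
WITH BROKER "broker_name"
("username" = "user", "password" = "pass");

-- after verification, atomically exchange with the online table,
-- keeping the swapped-out table for rollback
ALTER TABLE profile_tag SWAP WITH profile_tag_tmp;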

Data assembly

[Image: data assembly flow]

Future planning

Tag content needs continuous iteration, driven mainly by business requirements.

Support single-user rule judgment, to handle eligibility checks for activities, entitlements, and the like.

Store tag data redundantly across multiple tables, with the crowd-selection DSL automatically routing queries to speed up crowd-count calculation.

Real-time and offline tag access is currently achieved through generic tools; further integration with the scheduling system and the data map system could platformize tag production and access.

Tag accuracy is the core. To guarantee it, the data-validation part of the tag access process also needs enriching, supporting more validation methods such as period-over-period distribution comparison.

About the Author:

Sun Jianye joined Homework Help in 2019 and has been responsible for big data construction across multiple business lines.