laitimes

Elasticsearch in Didi

author:Flash Gene

滴滴 Elasticsearch 简介

Brief introduction

Elasticsearch is an open-source, distributed, RESTful full-text search engine built on Lucene, with every field indexable and the ability to scale out to hundreds of servers to store and process terabytes of data, so it can store, search, and analyze large amounts of data in a fraction of the time.

Since its development, DiDi ES has undertaken most of the company's on-end text retrieval, a small number of log scenarios and vector retrieval scenarios, including map POI retrieval, order retrieval, customer service, internal search and pulse log ELK scenarios. DiDi ES in 2020 by 2. X has been upgraded to 7.6.0, and in recent years, it has continued to explore and improve in the directions of maintaining stability, controlling costs, improving efficiency, and optimizing the ecosystem.

Architecture

Elasticsearch in Didi

As shown in the figure above, the current ES service is deployed based on physical machines, Gateway and management and control are based on container deployment, and we have also investigated ES on K8S, because our ES business scenarios are mostly text retrieval on the online side, considering the stability, so we finally decided to use the physical machine deployment mode.

管控层主要负责实现以下功能:智能 segment Merge(防止 segment 膨胀导致 datanode Full GC),索引生命周期管理,索引预创建(避免每天凌晨索引集中创建,导致 Master/Clientnode OOM),租户管控等。

In addition to read, write and forwarding, the gateway layer also has query optimization capabilities (for example, rewriting BKD queries into numeric equivalent queries or range queries), three-level throttling (including AppID, index template level, and query DSL level), tenant authentication function, and SQL capabilities (based on NLPChina's open-source ES SQL capabilities and encapsulated to Gateway). Our search service only exposes the Gateway interface externally.

User console

Elasticsearch in Didi

The user console is a platform that we provide to the business side to access the product, and the main functions are:

  • Application management: allows the business side to obtain the permission to read and write indexes by applying for an AppID.
  • Index management: You can create indexes, apply for read and write permissions, create and change index mapping, and clean up and deactivate indexes.

Retrieval queries offer a variety of query methods, including Kibana, DSL, and SQL, to meet different query needs.

At the same time, the business side can see the corresponding exception analysis and slow query analysis in the query and analysis module, which is convenient for query optimization. In terms of monitoring, the business side can view the index meta information (such as the number and size of documents) and the read and write rate to monitor the running status of the system.

O&M management and control platform

Elasticsearch in Didi

The O&M management and control platform mainly meets the daily O&M requirements of RD and SRE, and its main functions include the following aspects:

  • Cluster control: displays the cluster information, exposing the logical cluster to the user, a logical cluster can contain multiple physical clusters, for example, the cluster information seen by the user is an important public cluster, but the real physical cluster contains dozens of small public important clusters
  • Tenant management (tenant meta information and tenant-level throttling)
  • Template management: Metadata management of index templates, non-updated indexes can adjust the number of shards by upgrading the version, and you can also limit the flow of index templates here
  • Anomaly analysis: template analysis, slow query analysis, and anomaly analysis
  • Operation Records: User behavior and control scheduled task records

Elasticsearch in Didi

Business scenarios

  • Online full-text search services, such as map POI start and end point search
  • MySQL real-time data snapshots, online services such as order queries
  • All-in-one log retrieval service with Kibana queries, such as trace logs
  • Time series data analysis, such as security data monitoring
  • Simple OLAP scenarios, such as internal data dashboards
  • Vector retrieval, such as customer service RAG

Deployment mode

  • Physical machine + small cluster deployment mode, the maximum cluster machine size is about 100 physical machines

Access mode

  • You can create an index in the user console and select the corresponding cluster based on your business requirements
  • The gateway is an HTTP service, which is compatible with the gateway and the business side can read and write to the Gateway through the SDK provided by the official, and the Gateway will find the corresponding ES cluster according to the index

Data synchronization mode

There are two ways to synchronize data, one is to use the synchronization center (the company's data platform provides a unified synchronization service), and the other is to write in real time through Gateway, which supports real-time and offline data synchronization.

Real-time classes:

  • Logs -> ES
  • MQ(Kafka、Pulsar)-> EN
  • MySQL -> EN
Elasticsearch in Didi

Real-time class synchronization mode

As shown in the preceding figure, there are two ways to synchronize real-time classes: one is to collect logs and MySQL binlogs from MQ through the collection tool, and then write them to ES through Flink through the unified encapsulated DSINK tool. The other is MySQL full data, which is based on the open-source DataX for full data synchronization.

Offline Classes:

  • Hive -> ES
Elasticsearch in Didi

Offline Hive->ES class

The overall idea of Hive->ES is to speed up data import through Batch Load. The Lucene file is generated via MR and then written to ES via the encapsulated ES AppendLucene plugin. The overall process of Hive->ES is shown in the figure above:

  1. Use MR to generate a Lucene file
  2. Lucene 保存在 HDFS 里
  3. Pull the file to the DataNode
  4. Import into ES

Engine iteration

Refined hierarchical guarantee

Elasticsearch in Didi

The problem solved by the fine-grained hierarchical guarantee is to minimize the impact surface when the cluster fails, including the following strategies:

  • Cluster-level isolation: There are four guarantee levels (log cluster, public cluster, independent cluster, and active-active cluster), and when a service is accessed, the desired cluster is selected in the user console. If the wrong cluster is selected, we will use DCDR (described below) to help the business migrate to other clusters without affecting the business and without being aware of it.
  • Clientnode isolation: Clientnode read/write splitting, when the clientnode is abnormal, it can quickly locate the cause of the fault and reduce the impact area. If the cluster writes slowly and the amount of writes is too large, the clientnode may cause OOM, which will only affect the writes and will not affect the query, which can reduce the impact of the business.
  • Datanode Region Isolation: When an abnormal index occurs in the cluster (for example, the abnormal index causes the entire datanode to write too slowly), you can use the label to quickly migrate the abnormal index to the specified machine to avoid affecting other services on the cluster.

Multi-active construction

Didi Cross Datacenter Replication - Didi Cross Datacenter Replication, developed by DiDi or DCDR, is capable of natively replicating data from one Elasticsearch cluster to another. As shown in the following figure, DCDR works at the index template or index level, and adopts a master-slave index design model, in which the leader index actively pushes data to the follower index, thus ensuring the strong consistency of the master-slave index data.

Elasticsearch in Didi

We investigated the official ES CCR and found that its charging is based on the pull-based model, and the timeliness is poor, so our DCDR solution is:

  • Push-based 模型,实时写入基于 Request
  • 新增 CheckPoint 避免全量数据拷贝
  • 新增 Sequence Number 保障更新操作主从一致性
  • Introduce write queues to avoid OOM caused by large amounts of data replication

DCDR solves the problem of real-time data synchronization across clusters or data centers, and we implement active-active capabilities based on management and control.

For a detailed analysis of ES DCDR, please refer to Exploring ES High Availability: A Detailed Explanation of Didi's Self-developed Cross-Data Center Replication Technology

性能专项:JDK17 + ZGC

JDK11-G1 Yong GC has a long pause time on average and does not meet the requirements of P99, such as 180 ms for POI timeout, 500 ms for P99 and 400 ms for P99. In scenarios where a large amount of data is written, GCs are frequent, which exacerbates the problem of cluster write rejection and large write latency, which does not meet business requirements.

Based on the above background, we investigated JDK17-ZGC, and tested that ZGC can control the GC pause time within 10ms, which can well solve the query time problem caused by GC. At the same time, JDK17-G1 was tested for the high-throughput scenario of logs, and it was found that the GC performance was improved by 15% compared with JDK11-G1, and JDK17 made many optimizations in vectorization support and string processing, which can alleviate the write pressure of the log cluster to a certain extent. Therefore, we decided to upgrade the ES JDK version from 11 to 17, and upgrade some business GC algorithms from G1 to ZGC, and the main work is as follows:

  • Groovy syntax upgrade, Plugin refactoring
  • Fix the problem that the syntax format causes code compilation failure
  • Fix the bug that the ES source code triggers JVM compilation
  • Rely on JAR package upgrades, class replacements, class refactoring, and annotation optimization
  • Build a ZGC monitoring indicator system
Elasticsearch in Didi

ZGC monitoring indicator system

After the payment cluster was launched on ZGC, P99 was reduced from 800 ms to 30 ms, a decrease of 96%, and the average query time was reduced from 25 ms to 6 ms, a decrease of 75%. After the log cluster is upgraded to JDK17, the write performance is improved by 15~20%, and the problem of write queue accumulation and reject is solved.

For more information about the upgrade of ES JDK17, please refer to: Unlocking the Performance Potential of DiDi ES: The Road to JDK 17 and ZGC Upgrades

Cost optimization

Elasticsearch in Didi

Cost optimization mainly includes reducing machine costs and reducing user costs.

The core of reducing machine cost is to reduce the size of storage and reduce CPU usage, that is, reduce the number of machines; The core logic of reducing user costs is to reduce business usage, so the overall cost optimization strategy of ES is as follows:

  • Index Mapping is optimized, and some fields are prohibited from being inverted or forward
  • Added ZSTD compression algorithm, reducing CPU by 15%
  • Access to the big data asset management platform, sort out useless partitions and indexes, and assist in business offline

For more information about ES support ZSTD implementation, see How to make ES low-cost and high-performance? Practical sharing of Didi's ZSTD compression algorithm》

Multi-tenant resource isolation

JDK Native Thread Pool Model:

  • 主线程调用 execute、或者 submit 等方法提交任务给线程池。
  • If the number of running worker threads in the thread pool is less than corePoolSize, create a thread to run the task immediately.
  • If the number of running worker threads in the thread pool is greater than or equal to corePoolSize, the task is queued for execution at a later time.
  • If the queue is full and the number of running workers is less than maximumPoolSize, a non-core worker will be created to run the task immediately, and when the non-core worker threads are idle for more than a certain amount of time (keepAliveTime), they will be destroyed and reclaimed.
  • If the final submitted task exceeds maximumPoolSize (the maximum number of threads), then a rejection policy is enforced.

We borrowed the idea of Presto Resource Group isolation, and the strategy is to split the original search thread pool into multiple seach thread pools according to the configuration and form a thread pool group. Due to the different query QPS and importance levels of multi-tenants, you can configure the corresponding number of threads and queue size. Isolate query requests from different Appid users through the thread pool group pattern.

Elasticsearch in Didi

The core workflow is to obtain the AppID and submit the task to the corresponding subthread group for running according to the configured AppID isolation information.

Elasticsearch in Didi

At present, this optimization is mainly used in scenarios where an index data will be used by many business parties, such as order business, and the order business will be used by various business lines of the company, so there will be a lot of query appids, and we limit the thread pool size of the specified appid through multi-tenant resource isolation to avoid CPU filling up and core business damage due to a business suddenly sending a large number of read requests.

Data Security

Elasticsearch in Didi

There are two levels of authentication and authentication:

  • Gateway level: Authentication at the Appid level
  • ES 级别(Clientnode、Datanode、MasterNode),主要是做认证

The X-Pack plug-in of ES has the ability of security authentication, but it does not support rolling upgrade and restart of the cluster, cannot be quickly rolled back, and there is a stability risk of accidentally deleting the storage.

  • The architecture is simple and the logic is clear, and only a simple string validation is required in the HTTP request processing process, and there is no need to involve the verification of TCP communication within the node.
  • Supports rolling restart and upgrade of ES clusters
  • It supports the ability of one-key switch safety authentication, and can quickly stop losses

For more information about the ES security authentication solution, please refer to: Data Security Hardening: An In-depth Analysis of DiDi's ES Security Authentication Technical Solution

Stability governance

Online text retrieval has very high requirements for stability, so in the past three years, we have mainly done the following work on stability as shown in the figure below:

Elasticsearch in Didi

Stability governance is mainly to do three things well: before, during and after. Prevention is the main thing, the thing can be quickly located and stop loss during the event, and the review is paid attention to afterwards to avoid the recurrence of the problem.

  • Pre-prevention and continuous optimization: We will take stability as the most important thing every year, standardize first, and implement stability red lines, including scheme design, code specifications, online specifications, alarm handling, and fault management. In the past three years, we have solved a total of 61 potential stability problems, such as solving the Gateway Full GC problem (the gateway will immediately return to normal after the service is throttled), metadata governance, rewriting some locks to solve ThreadLocal leaks (ThreadLocal leaks will cause sudden CPU spikes in some nodes), etc., and at the same time landing fault stop loss SOPs
  • For the construction of the monitoring and alarm system, we have done basic monitoring (such as hard disks, CPUs, etc.), metric monitoring (such as the number of shards, the number of master pending tasks, etc.), and link monitoring (monitoring MQ lag to detect link delay problems in advance).
  • The construction of the indicator system, through grafana, the monitoring panel has been built, including template indicators, shard indicators, node indicators and cluster indicators, etc., which can help us quickly locate the cause of the failure, such as CPU burst problem, and we can quickly stop loss within 5 minutes
  • On the stop-loss side, we have done self-healing system (such as disk failures and query bursts), daily active-active stream switching drills, and read and write throttling, among which read throttling is done based on Appid, index template, and query DSL, and when there is a cluster CPU burst problem, the throttling scheme will also reduce the impact on the business as much as possible

Summary and outlook

In recent years, based on ES 7.6.0, we have continued to explore and improve the aspects of maintaining stability, controlling costs, improving efficiency, and optimizing the ecosystem. At present, our ES engine has been uniformly applied to all online retrieval scenarios within Didi, and has become a benchmark for the big data architecture department in terms of stability.

We also tried some innovative solutions, such as hot and cold data separation, off-line hybrid deployments, and the use of the Flink Checkpoint mechanism instead of Translog, but these were not adopted due to performance or stability considerations. However, as the technology continues to evolve, we will continue to explore and refine these options to meet the challenges that may arise in the future.

Currently, we are using ES version 7.6 and the latest version of the community has been updated to 8.13, with a version gap of about 4 years between the two. Therefore, our focus this year is to smoothly upgrade ES to version 8.13 to address the following issues:

  • The new version of ES Master delivers better performance
  • Ability to automatically balance disk usage based on load
  • Reduce the memory footprint of segments
  • New features such as ANNs for vector retrieval are supported

In terms of performance, we're optimizing write performance for update scenarios, as well as improving the Merge strategy during queries. In addition, we will continue to explore the machine learning capabilities of new versions of ES to better support our business.

Author: Du Ruofei

Source-WeChat public account: Didi Technology

Source: https://mp.weixin.qq.com/s/nIYbtjzraQ9s-Lni24x42g