
Tencent Advertising Product Middle Platform: Process Orchestration Engine Architecture and Implementation


This article details how the product middle platform (note: the Tencent Advertising product middle platform is responsible for managing and maintaining products across the whole industry, and these products are used in many application scenarios such as advertising) uses a self-built process orchestration engine to achieve the "three highs" (high availability, high performance, and high scalability) for services in each business scenario, thereby improving overall R&D efficiency and ensuring system stability.

01

Usage scenarios of the process orchestration engine in the product middle platform

1.1 Scenario 1: Commodity processing in the commodity library

The commodity library manages nearly 4 billion commodities and processes more than 80 million commodities per day, providing capability support for many businesses. The processing pipeline is managed and implemented through the process orchestration engine; the main processing capabilities include category recognition, attribute recognition, brand recognition, commodity tagging, commodity understanding, commodity review, image transfer, video transfer, creative generation, and so on.

1.2 Scenario 2: Commodity review governance

Commodity review governance performs commodity review for the various scenarios of commodity trading, including public-feature review, bottom-line content review, low-quality content review, infringement review, prohibited-content review, and so on. There are nearly 100 review unit combinations (orchestrated nodes), and at the same time the engine must support complex interactions such as resuming the review process after an asynchronous callback from human review.

02

Why use a process orchestration engine

In every business scenario we implement, multiple services are invoked collaboratively and finally assembled into a complex business process; this is the main scenario every developer faces. When implementing these scenarios, we spend a great deal of energy on service integration, call ordering and parallel processing, timeouts and retries, service avalanche protection, distributed consistency, and other such issues. If we can quickly solve these problems through some common mechanism, we can greatly improve R&D efficiency and reduce the maintenance cost of the system, leaving us to focus only on developing highly cohesive, loosely coupled service nodes. That is where the orchestration engine comes in.

03

Building a process orchestration

Building a workflow in the console is very simple: a process orchestration can be implemented with just a small amount of configuration.

There are two ways to build a process orchestration: visual drag-and-drop editing, and defining the orchestration logic in a workflow language.

3.1 Visual drag-and-drop editing

In the console, you can drag and drop various nodes (task nodes and flow control nodes) in the visual interface to compose complex business processes. Currently, the task node types that the orchestration engine can orchestrate include SCF, Kafka producers, and HTTP interfaces.

Taking business A as an example, the process orchestration includes two task nodes, image transfer and commodity review: the image is transferred first and then reviewed. Dragging and connecting the nodes produces the flow shown in the following figure.

[Figure: process orchestration for business A (image transfer followed by commodity review)]

3.2 Code Creation

Building a process orchestration with code requires the workflow language, a DSL that describes and defines the business logic of the workflow. At execution time, the ASW workflow service executes the steps according to the workflow definition.

To illustrate how to build a process orchestration with the workflow language, again take business A as an example and write the following code, which expresses the same meaning as the visual drag-and-drop above.

{
  "Comment": "Business A",
  "StartAt": "Parallel",
  "States": {
    "Parallel": {
      "Type": "Parallel",
      "Next": "FinalState",
      "Branches": [
        {
          "StartAt": "ImageSave",
          "States": {
            "ImageSave": {
              "Type": "Task",
              "Comment": "Image transfer",
              "Resource": "resource address; supports the HTTP, Kafka, and Serverless protocols",
              "Next": "NewAudit"
            },
            "NewAudit": {
              "Type": "Task",
              "Comment": "Review process",
              "Resource": "resource address; supports the HTTP, Kafka, and Serverless protocols",
              "InputPath": "$.inputAswStr",
              "End": true
            }
          }
        }
      ]
    },
    "FinalState": {
      "Type": "Task",
      "Comment": "DAG end node",
      "Resource": "resource address; supports the HTTP, Kafka, and Serverless protocols",
      "End": true
    }
  }
}

04

Architectural implementation of the process orchestration engine

[Figure: overall architecture of the process orchestration engine]

05

The "three-high" solution of the process orchestration engine

5.1 High Availability

As the core component of every business scenario, the availability of the system is particularly important. However, the stability of the services orchestrated by each business varies greatly, which places high demands on the exception handling of the process orchestration engine. To ensure high availability, the commodity library process orchestration engine implements the following mechanisms: a load balancing strategy for task allocation, an exception circuit breaking strategy, a service isolation strategy, an interface retry mechanism, interface timeout handling, and an interface rate limiting strategy. Let's look at each of them.

5.1.1 Load Balancing Strategy

The scheduler service is responsible for assigning new execution tasks to different executors, which run the DAG tasks. The core capability here is balanced task distribution, so that the executors run smoothly. Common load balancing strategies include:

  • Round-robin: dispatches requests to the servers in a cyclic order. Its biggest advantage is simplicity. Round-robin assumes that all servers have the same capacity to process requests, so the scheduler distributes requests evenly across the real servers.
  • Least-connection: the scheduler records the number of established connections to each server and estimates a server's load from its active connection count. When a request is scheduled to a server, its connection count is incremented by 1; when the connection closes or times out, the count is decremented by 1. Each request is assigned to the server with the fewest current connections, keeping the load relatively balanced; this suits long-lived connections.
  • Hash: uses some characteristic of the request (such as the IP, MAC, or information from a higher-level protocol) as a key to compute which node the request should land on. Hashing guarantees that the same key always lands on the same server.
  • Random: similar to round-robin, except the target server is chosen randomly. By probability, as the number of client calls grows, the requests tend to be evenly distributed across the servers, approaching the effect of round-robin.
  • Weighted round-robin: requests are distributed in proportion to each server's weight, so servers with higher weights receive more requests.

Suppose there are three executors 1, 2, and 3, and 300 tasks are assigned to them with random load balancing; each executor will most likely execute about 100 tasks. Now consider an extreme case: the tasks assigned to executor 1 all happen to be long-running (each takes several minutes), while every task on executors 2 and 3 finishes in a few seconds. Executor 1 will then always have tasks still running, so when new tasks arrive it should not receive any more. But under the random strategy, the probability of assigning a task to each of the three executors stays the same, and that is the problem.

From the above analysis, a simple random balancing strategy leads to load imbalance when the execution time of different tasks varies greatly. To keep the load on each executor roughly the same, a more intuitive idea is for the scheduler to select the executor with the smallest load each time, where "smallest load" can be understood as the fewest tasks currently executing. So when the scheduler dispatches a task, how does it know which executor has the fewest running tasks?

One option is a health check: the scheduler sends a health check to all executors every second, for two purposes: 1) to detect whether a node is down, and 2) to read the node's load. The scheduler thereby maintains a list of executors together with each executor's load, which it can consult at dispatch time. There is a catch, though: the executor load is not real-time, since it is only refreshed by the once-per-second health check. With a naive "pick the minimum load" rule, if 100 tasks arrive within a short window, they all land on the same executor, which again causes imbalance.
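
As a minimal sketch of this health-check loop (illustrative Go; every type and function name here is invented for the example rather than taken from the engine's code), the scheduler could poll each executor once per second and cache the reported status:

package scheduler

import (
	"sync"
	"time"
)

// ExecutorStatus is what a health check returns: whether the node is alive
// and how many DAG tasks it is currently running.
type ExecutorStatus struct {
	Alive        bool
	RunningTasks int
}

// HealthChecker polls every executor once per second and caches the latest
// status so the scheduler can consult it when dispatching tasks.
type HealthChecker struct {
	mu     sync.RWMutex
	status map[string]ExecutorStatus        // executor address -> last known status
	probe  func(addr string) ExecutorStatus // e.g. an HTTP call to the executor's health endpoint
}

func NewHealthChecker(addrs []string, probe func(string) ExecutorStatus) *HealthChecker {
	hc := &HealthChecker{status: make(map[string]ExecutorStatus), probe: probe}
	go func() {
		ticker := time.NewTicker(1 * time.Second) // the once-per-second check described above
		defer ticker.Stop()
		for range ticker.C {
			for _, addr := range addrs {
				s := hc.probe(addr) // purpose 1: is the node down? purpose 2: how loaded is it?
				hc.mu.Lock()
				hc.status[addr] = s
				hc.mu.Unlock()
			}
		}
	}()
	return hc
}

// Snapshot returns the cached status of all executors. It can be up to one
// second stale, which is exactly the weakness discussed in the text.
func (hc *HealthChecker) Snapshot() map[string]ExecutorStatus {
	hc.mu.RLock()
	defer hc.mu.RUnlock()
	out := make(map[string]ExecutorStatus, len(hc.status))
	for addr, s := range hc.status {
		out[addr] = s
	}
	return out
}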

Suppose each executor's load can be expressed as a percentage, where 0% means no load, 100% means full load, and a fully loaded machine can no longer accept requests. The intuitive idea is that a machine with a lower load (i.e. more remaining capacity) should be assigned tasks with higher probability: if a machine's load is a%, its remaining capacity is 100% - a%. Suppose executors 1, 2, and 3 have loads of a%, b%, and c% respectively; then each machine is selected with the following probability.

Executor | Load | Remaining capacity | Probability of being selected
1        | a%   | 100% - a%          | (100 - a) / [(100 - a) + (100 - b) + (100 - c)]
2        | b%   | 100% - b%          | (100 - b) / [(100 - a) + (100 - b) + (100 - c)]
3        | c%   | 100% - c%          | (100 - c) / [(100 - a) + (100 - b) + (100 - c)]

For example, if an executor running 300 tasks is considered fully loaded, its load is computed as (number of tasks in execution) / 300. This is the load balancing algorithm we built for the scheduler.
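
Continuing the sketch above (again illustrative, not the engine's actual implementation), the rule "select with probability proportional to remaining capacity, treating 300 running tasks as full load" could look like this:

package scheduler

import (
	"errors"
	"math/rand"
)

const fullLoadTasks = 300 // an executor running 300 tasks is treated as fully loaded

// PickExecutor chooses an executor with probability proportional to its
// remaining capacity (100% - load), matching the table above.
func PickExecutor(status map[string]ExecutorStatus) (string, error) {
	type candidate struct {
		addr     string
		capacity float64 // remaining capacity in the range 0.0 .. 1.0
	}
	var (
		candidates []candidate
		total      float64
	)
	for addr, s := range status {
		if !s.Alive {
			continue // skip nodes that failed the health check
		}
		load := float64(s.RunningTasks) / fullLoadTasks // e.g. 150 running tasks -> 50% load
		if load >= 1 {
			continue // a fully loaded machine no longer accepts requests
		}
		c := candidate{addr: addr, capacity: 1 - load}
		candidates = append(candidates, c)
		total += c.capacity
	}
	if len(candidates) == 0 {
		return "", errors.New("no executor has spare capacity")
	}
	// Weighted random draw: each candidate owns a slice of [0, total)
	// proportional to its remaining capacity.
	r := rand.Float64() * total
	for _, c := range candidates {
		if r < c.capacity {
			return c.addr, nil
		}
		r -= c.capacity
	}
	return candidates[len(candidates)-1].addr, nil // guard against floating-point rounding
}

Because a busy executor still has only a proportionally small chance of being chosen, a burst of tasks arriving between two health checks no longer all lands on the single least-loaded node, which addresses the problem described above.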

5.1.2 API call retry policy

The service nodes orchestrated by the process orchestration engine exchange information over network requests, and the network is inherently unreliable, so requests can jitter. Supporting retries of API calls in these jitter scenarios effectively handles orchestration tasks that fail because of short-lived jitter. Because retries for the orchestrated services are issued from a single point (the engine), retry storms that could avalanche the services are avoided. (See related material on retry storms.)

The handling of network communication failures is divided into the following steps:

  1. Error perception: different error codes identify different errors. The process orchestration engine currently supports HTTP interface orchestration, where the status code can be used to distinguish error types.
  2. Retry decision: this step mainly reduces unnecessary retries. For example, HTTP 4xx errors usually indicate a client-side mistake, so the client should not retry; likewise, some custom business errors should not be retried. Applying such rules effectively cuts unnecessary retries and improves response time.
  3. Retry policy: this covers the retry interval and the number of retries. Too many retries or too short an interval can waste large amounts of resources (CPU, memory, threads, and network connections).

If the failure persists after retries, it is not a transient fault but a long-term one. The service can then be circuit-broken and degraded: during this period subsequent requests are not retried but handled by the degradation logic, which reduces unnecessary load, and requests are sent again once the server has recovered.

For the retry scenarios above, the orchestration engine supports an interface retry policy through the Retry block, configured with ErrorEquals, IntervalSeconds, MaxAttempts, and BackoffRate (the multiplier applied to the retry interval on each successive attempt). An example configuration:

{
  "Comment": "Business A",
  "StartAt": "unit_a",
  "States": {
    "unit_a": {
      "Type": "Task",
      "Comment": "Review unit A",
      "Resource": "resource address; supports the HTTP, Kafka, and Serverless protocols",
      "Retry": [
        {
          "ErrorEquals": ["StatesTimeout"],
          "IntervalSeconds": 1,
          "MaxAttempts": 2,
          "BackoffRate": 2.0
        }
      ],
      "End": true
    }
  }
}
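
The article does not spell out the exact timing semantics of these fields, but under the common interpretation (the first retry waits IntervalSeconds, and each later retry multiplies the wait by BackoffRate, up to MaxAttempts), a hypothetical evaluator for a Retry rule might look like this (illustrative Go, not the engine's code):

package scheduler

import "time"

// RetryRule mirrors the fields of the Retry block in the workflow DSL above.
type RetryRule struct {
	ErrorEquals     []string
	IntervalSeconds int
	MaxAttempts     int
	BackoffRate     float64
}

// Matches reports whether this rule applies to the given error code
// (for example "StatesTimeout", or an HTTP status class mapped to an error name).
func (r RetryRule) Matches(errCode string) bool {
	for _, e := range r.ErrorEquals {
		if e == errCode {
			return true
		}
	}
	return false
}

// Backoff returns how long to wait before retry attempt n (1-based):
// IntervalSeconds before the first retry, multiplied by BackoffRate for each
// subsequent attempt. The second return value is false once MaxAttempts is exhausted.
func (r RetryRule) Backoff(attempt int) (time.Duration, bool) {
	if attempt < 1 || attempt > r.MaxAttempts {
		return 0, false
	}
	interval := float64(r.IntervalSeconds)
	for i := 1; i < attempt; i++ {
		interval *= r.BackoffRate
	}
	return time.Duration(interval * float64(time.Second)), true
}

Under that reading, the configuration above would retry a StatesTimeout error after 1 s and again after 2 s before handing the failure over to the circuit breaking and degradation handling described earlier.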

5.1.3 Rate limiting and circuit breaking mechanism

In a microservice system, one service may depend on another, which in turn depends on yet other services; this is what we commonly call microservice call depth, illustrated by the following figure:

[Figure: a deep microservice call chain, e.g. video transfer service → creative service → commodity review service]
  • When the commodity review service fails, the creative service can only passively wait for the dependent service to return an error or time out, which results in:
    a. the downstream connection pool being gradually exhausted;
    b. a large backlog of inbound requests, with CPU, memory, and other resources gradually exhausted until the service finally goes down.
  • The upstream video transfer service, which depends on the creative service, fails for the same reason, and the cascade of failures eventually makes the entire system unavailable.

Under this kind of microservice governance, every service module would need its own rate limiting and circuit breaking/degradation mechanism to avoid a cluster avalanche. When the same services are orchestrated through the orchestration engine, the scenario becomes the following.

[Figure: the same services orchestrated through the orchestration engine, reducing the call depth to 1]

Since all services are orchestrated through the orchestration engine, the call depth can be regarded as 1. Even so, if the commodity review service fails, the orchestration engine can only wait for requests to time out, and a sustained flood of requests would still cause cascading failures across the commodity gateway, the orchestration engine, and commodity review.

There are two core ideas for preventing such an avalanche of cascading failures: 1. introduce circuit breakers and rate limiters, so that requests do not overload a service and failures surface as early as possible, avoiding the avalanche; 2. abstract a back-pressure layer with a message queue, which buffers requests and applies back-pressure when a downstream service jitters. The engine uses the latter; a minimal sketch follows the figure below.

[Figure: back-pressure architecture, with a message queue buffering between the orchestration engine and downstream services]
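
The production back-pressure layer is a message queue in front of the downstream services; purely to illustrate the buffering idea, the sketch below uses a bounded in-memory channel as a stand-in for that queue (illustrative Go, all names hypothetical):

package scheduler

import (
	"context"
	"errors"
)

// TaskQueue stands in for the message queue placed between the orchestration
// engine and a downstream service such as commodity review. The bounded buffer
// absorbs short bursts; when it is full, the producer is pushed back instead of
// piling more requests onto a struggling downstream service.
type TaskQueue struct {
	ch chan string // task IDs; a real deployment would publish full task payloads to the queue
}

func NewTaskQueue(capacity int) *TaskQueue {
	return &TaskQueue{ch: make(chan string, capacity)}
}

// Enqueue applies back-pressure: it blocks until there is room in the buffer
// or the caller's context expires, rather than overloading the consumer.
func (q *TaskQueue) Enqueue(ctx context.Context, taskID string) error {
	select {
	case q.ch <- taskID:
		return nil
	case <-ctx.Done():
		return errors.New("downstream saturated, task deferred: " + taskID)
	}
}

// Consume drains the buffer at whatever pace the downstream service can sustain.
func (q *TaskQueue) Consume(ctx context.Context, handle func(taskID string) error) {
	for {
		select {
		case id := <-q.ch:
			_ = handle(id) // a failed task would re-enter the retry flow from section 5.1.2
		case <-ctx.Done():
			return
		}
	}
}

When the commodity review service jitters, the buffer absorbs the burst and the producer side is slowed down instead of piling more requests onto the failing service; once the consumer catches up, normal throughput resumes.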

5.1.4 Service Isolation Policy

[Figure: independent cluster deployment of the process orchestration engine by business dimension]

Based on the execution characteristics of different businesses, the commodity library's process orchestration engine is deployed as independent clusters along the dimensions of hotspots and user scenarios, achieving true physical isolation.

5.2 High Performance

For high performance, the engine has been fully stress tested, and the core has continuously optimized the storage engine's performance in terms of I/O time, large-object data structures, concurrent processing, and the use of dependent component packages. The optimization details are beyond the scope of this article.

5.3 Scalability

For scalability, the engine is deployed on Tencent Cloud TKE in Docker containers. The engine itself is stateless, so the TKE platform's scheduling strategy dynamically scales resources according to resource pressure.

Author: Pei Bo

Source: WeChat Official Account "Tencent Cloud Developer"

Source: https://mp.weixin.qq.com/s/GFQ5BFMH7-L-HUfgSgokig

