
How does a data push platform cope with a data flood scenario?

Author: Jiangjiang 111

Preface

In daily work most of us have run into a data flood scenario: for example, when an e-commerce platform runs a promotion, a huge number of requests arrive within a short window and the pressure on the system far exceeds the usual load. Without precautions taken in advance, the system is very likely to collapse and become unusable.

Business background

Our application system generates a large amount of business data every day (think of goods, orders, and so on), and many external partner platforms need to subscribe to this data. A data push platform inside our system is responsible for pushing this internal data to the external partners. The data link is as follows:

[Figure: data push link from internal business systems through MQ to external partners]

Internal business systems deliver different types of business data to MQ; the data push platform consumes the MQ messages, runs them through a series of processing steps, and asynchronously pushes the data to the different partners.

Technical background

Scheme selection

The three standard tools for dealing with high-concurrency scenarios are caching, circuit breaking (degradation), and rate limiting:

Caching is often used in high-QPS read scenarios and is clearly not a good fit for this kind of data push.

Circuit breaking (degradation) is generally used when calls to a downstream service fail, to prevent an avalanche effect. The data push platform is relatively independent and makes no internal service calls; however, external partners do have availability problems, and various situations often lead to push exceptions, so circuit breaking (degradation) could be applied to external partners that are misbehaving.

Rate limiting means that under high (or instantaneous) concurrency, the system sacrifices some requests, or delays processing them, in order to keep the overall service stable and available. This fits our business scenario very well.

Circuit breaking (degradation) can indeed solve the problem of occasionally unavailable external platforms tying up our system resources, but it cannot solve the fundamental problem of the data flood scenario: system resources are finite. So the data push platform chose rate limiting to cope with data floods.

Scheme application

There are many concrete rate-limiting schemes. Common ones are the token bucket, the leaky bucket, and the counter, where the counter approach can in turn be implemented with AtomicInteger, Semaphore, a thread pool, and so on. This time we chose the counter approach.
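As a small illustration of the counter family (not the platform's actual implementation, and the class name is made up), a Semaphore-based limiter that admits at most a fixed number of concurrent tasks could look like this:

import java.util.concurrent.Semaphore;

// Illustrative only: a counter-style limiter built on Semaphore.
public class SemaphoreLimiter {

    private final Semaphore permits;

    public SemaphoreLimiter(int maxConcurrent) {
        this.permits = new Semaphore(maxConcurrent);
    }

    // Take a permit if capacity is available; otherwise reject immediately.
    public boolean tryEnter() {
        return permits.tryAcquire();
    }

    // Must be called when the admitted task finishes.
    public void exit() {
        permits.release();
    }
}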

For the data push itself, a synchronous approach would limit throughput to the number of MQ consumer threads (20 by default), and a thread pool would be hard to size because message bodies vary greatly (from 1 KB to 5 MB).

Considering all of the above, and since the vast majority of downstream partners receive data over HTTP, the final implementation uses Apache's HttpAsyncClient (internally based on the Reactor model) to execute the network requests asynchronously and obtain the push result in a callback. The business data type is used as the rate-limiting dimension, limiting by the number of in-flight pushes within the current application instance.
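A minimal sketch of what such an asynchronous push might look like with HttpAsyncClient 4.x is shown below; the partner URL, the JSON body, and the counter-release hook are illustrative assumptions, not the platform's actual code:

import org.apache.http.HttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.concurrent.FutureCallback;
import org.apache.http.entity.ContentType;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
import org.apache.http.impl.nio.client.HttpAsyncClients;

public class AsyncPusher {

    private final CloseableHttpAsyncClient client = HttpAsyncClients.createDefault();

    public AsyncPusher() {
        client.start();
    }

    public void push(String partnerUrl, String jsonBody, String bizType) {
        HttpPost post = new HttpPost(partnerUrl);
        post.setEntity(new StringEntity(jsonBody, ContentType.APPLICATION_JSON));
        client.execute(post, new FutureCallback<HttpResponse>() {
            @Override
            public void completed(HttpResponse response) {
                // Record the push result; this is also where the in-flight counter
                // for bizType would be released.
            }

            @Override
            public void failed(Exception ex) {
                // Record the failure and release the counter as well.
            }

            @Override
            public void cancelled() {
                // Release the counter.
            }
        });
    }
}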

For example, the business type for phone-related product data is PRODUCT_PHONE. The default limit per business type is 50: once the amount of data currently being pushed for a given type within a single application instance reaches 50, the 51st message of that type is rejected, until the number of in-flight pushes drops back below 50. Rejected messages are returned to MQ for later consumption, and MQ retries them at intervals.

The pseudocode is as follows:

// Current in-flight count for this business type on this node
Integer flowCount = BizFlowLimitUtil.get(data.getBizType());
// Configured limit for this business type
Integer overload = BizFlowLimitUtil.getOverloadOrDefault(data.getBizType(), this.defaultLimit);

if (flowCount >= overload) {
    throw new OverloadException("Business type: " + data.getBizType() + " is overloaded, threshold: " + overload + ", current load: " + flowCount);
}
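The pseudocode relies on a BizFlowLimitUtil that the original article does not show. A minimal in-memory sketch of what it might look like, assuming one AtomicInteger per business type and an increment/decrement pair called around each push, is:

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

public class BizFlowLimitUtil {

    // In-flight push count per business type within this application instance.
    private static final Map<String, AtomicInteger> FLOW_COUNTERS = new ConcurrentHashMap<>();

    // Per-type limits; types without an explicit entry fall back to the default (e.g. 50).
    private static final Map<String, Integer> OVERLOAD_CONFIG = new ConcurrentHashMap<>();

    public static Integer get(String bizType) {
        return counter(bizType).get();
    }

    public static Integer getOverloadOrDefault(String bizType, Integer defaultLimit) {
        return OVERLOAD_CONFIG.getOrDefault(bizType, defaultLimit);
    }

    // Called when a push for this business type starts.
    public static void increment(String bizType) {
        counter(bizType).incrementAndGet();
    }

    // Called from the async callback when the push finishes, whether it succeeds or fails.
    public static void decrement(String bizType) {
        counter(bizType).decrementAndGet();
    }

    private static AtomicInteger counter(String bizType) {
        return FLOW_COUNTERS.computeIfAbsent(bizType, k -> new AtomicInteger(0));
    }
}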

A rate-limiting step was added to the data push platform's pipeline:

[Figure: data push link with the added rate-limiting step]

Possible problems

With the scheme above, the maximum resource the system may need during a data flood = number of business types × per-type limit. As the business grows, the number of business types keeps increasing, so the maximum resource requirement keeps growing, while the resources of a service instance are always finite. Rate limiting purely on the per-type data count will therefore become less and less effective, and in extreme scenarios the service may even crash.
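To make the concern concrete, here is a rough worst-case estimate. The 5 MB maximum message size and the per-type limit of 50 come from the sections above; the figure of 100 business types is purely hypothetical:

Max in-flight data ≈ number of business types × per-type limit × max message size
                   ≈ 100 × 50 × 5 MB ≈ 25 GB

Even a fraction of that dwarfs the 1 GB heap of the test instance described below, which is why counting messages alone is not enough.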

Stress testing

Of course, the problem described above is so far only a hypothesis, so we ran stress tests to observe the overall behaviour of the push system.

Resource configuration

Number of application instances: 1

Instance configuration: 1 core 2G

jvm parameters:

-Xmx1g -Xms1g -Xmn512m -XX:SurvivorRatio=10 -XX:+UseConcMarkSweepGC -XX:+UseCMSCompactAtFullCollection -XX:CMSMaxAbortablePrecleanTime=5000 -XX:+CMSClassUnloadingEnabled -XX:CMSInitiatingOccupancyFraction=80 -XX:+UseCMSInitiatingOccupancyOnly -XX:+ExplicitGCInvokesConcurrent -XX:ParallelGCThreads=2 -Xloggc:/opt/modules/gc.log -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/opt/modules/java.hprof

Selection of data indicators and tools

During a stress test we usually watch CPU, memory, network I/O, the database, and other metrics. Leaving network I/O, the database, and other external middleware out of consideration, CPU and memory are the most direct indicators of the system's stability.

Arthas is Alibaba's open-source Java diagnostic tool; see the official documentation at arthas.aliyun.com/doc/. We used Arthas to observe the service: log in to the server, open a console, and install and start Arthas with the following commands:

curl -O https://arthas.aliyun.com/arthas-boot.jar
java -jar arthas-boot.jar

Arthas provides a dashboard command to view the real-time running state of the service's JVM; if no refresh interval is specified, it refreshes every 5 s by default. Typing dashboard in the console after starting Arthas produces the following:

[Screenshot: Arthas dashboard of the idle service]

The upper half mainly shows the threads in the service's JVM; you can see that the CPU usage of each thread is extremely low and the service is basically idle.

In the memory section in the lower half, the metrics we care about are:

  • heap (overall heap usage)
  • par_eden_space (Eden space)
  • par_survivor_space (survivor space)
  • cms_old_gen (old generation)
  • gc.parnew.count (total number of young GCs)
  • gc.parnew.time (total young GC time)
  • gc.concurrentmarksweep.count (total number of full GCs)
  • gc.concurrentmarksweep.time (total full GC time)

The meaning of each column is clear: used, total, max, and usage percentage. If the names alone don't immediately bring the JVM's internal memory regions to mind, the following may help jog your memory:

[Figure: JVM heap memory layout]

After 5s, the Arthas console outputs as follows:

[Screenshot: Arthas dashboard 5 s later]

Compared with the data from 5 s earlier, we focus on the following:

  • Threads: per-thread CPU usage shows no significant change
  • heap: heap usage grew by 3 MB
  • par_eden_space (Eden size): Eden in the young generation grew by only 3 MB; given an Eden of 426 MB and a survivor space of 42 MB, it would take roughly 780 s (about 13 minutes) to trigger a young GC (see the arithmetic after this list)
  • par_survivor_space (survivor size): no change
  • cms_old_gen (old generation): no change
  • gc.parnew.count (total young GCs): no change
  • gc.parnew.time (total young GC time): no change
  • gc.concurrentmarksweep.count (total full GCs): no change
  • gc.concurrentmarksweep.time (total full GC time): no change
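The 780-second estimate in the Eden bullet comes from simple arithmetic on the two snapshots:

Eden growth rate ≈ 3 MB / 5 s ≈ 0.6 MB/s
Time until the next young GC ≈ (Eden 426 MB + survivor 42 MB) / 0.6 MB/s ≈ 780 s ≈ 13 min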

This is the situation when the service has no traffic and is essentially idle. Next we simulate a backlog of a large amount of data of different business types waiting to be pushed; the test engineers delivered the data to MQ in advance with an automated script.

Backlog of 5,000 messages

Use the Arthas command dashboard -i 1000 to output at 1s intervals:

[Screenshot: Arthas dashboard, first snapshot]

After 1s:

[Screenshot: Arthas dashboard, 1 s later]

Comparing the two snapshots:

  • Threads: MQ's 20 default consumer threads are all active and consuming CPU
  • heap: usage grew from 293 MB to 478 MB
  • par_eden_space (Eden size): Eden held 23 MB before the young GC and 211 MB after it; with Eden's total size of 426 MB, this means at least (426 - 23) + 211 = 614 MB of objects were allocated within 1 s
  • par_survivor_space (survivor size): 31 MB before the young GC, 29 MB after it
  • gc.parnew.count: 1 young GC occurred
  • gc.parnew.time (total young GC time): grew by (9018 - 8992) = 26 ms, the duration of that young GC
  • cms_old_gen (old generation): no change
  • gc.concurrentmarksweep.count (total full GCs): no change
  • gc.concurrentmarksweep.time (total full GC time): no change

A young GC occurred within the 1 s interval, yet the old generation did not change; this may simply be because the interval is too short, so we observe at 5 s intervals instead. Typing dashboard -i 5000 outputs the following:

[Screenshot: Arthas dashboard, first snapshot]

After 5s:

[Screenshot: Arthas dashboard, 5 s later]

Comparing the two snapshots, the key points are:

  • 7 young GCs occurred within 5 s
  • The old generation grew from 233 MB to 265 MB, an increase of about 32 MB; at this rate, a 512 MB old generation would see a full GC in roughly 80 s (see the arithmetic below)
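The 80-second figure comes from the growth rate just measured:

Old-generation growth rate ≈ 32 MB / 5 s ≈ 6.4 MB/s
Time to fill a 512 MB old generation ≈ 512 MB / 6.4 MB/s ≈ 80 s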

Backlog of 10,000 messages

Start at 1s intervals:

[Screenshot: Arthas dashboard, first snapshot]

After 1s:

[Screenshot: Arthas dashboard, 1 s later]

Comparing the two snapshots, we can see:

  • 2 young GCs occurred within 1 s
  • Meanwhile the old generation grew from 304 MB to 314 MB, i.e. 10 MB in 1 s; with an old generation of 512 MB, a full GC would be triggered in roughly 50 s at this rate

Because data is pushed asynchronously, the downstream pushes have not finished while the upstream keeps consuming messages from MQ. Continuing to observe:

[Screenshot: Arthas dashboard, first snapshot]

After 1s:

[Screenshot: Arthas dashboard, 1 s later]

Comparing the two snapshots:

  • CPU usage of the GC threads remains high
  • A full GC occurred within 1 s

One second earlier the old generation had used 418 MB out of 512 MB; one second later a full GC had been triggered. According to the earlier analysis the old generation was growing at about 10 MB/s, so there was clearly still enough free space. Why did the full GC happen early?

First, let's review when a full GC happens:

The first case: the available memory of the old generation is smaller than the total size of objects in the young generation, and the space-allocation-guarantee parameter is disabled (-XX:-HandlePromotionFailure).

Since JDK 6, the HandlePromotionFailure parameter no longer affects the virtual machine's space allocation guarantee policy. We are on JDK 8, so the first case does not apply.

The second case: the available memory of the old generation is smaller than the total size of objects in the young generation, the space guarantee is enabled, but the available memory is smaller than the average size of objects promoted to the old generation by previous young GCs.

From the earlier analysis, about 10 MB of objects enter the old generation per second, while the old generation still has about (512 - 418) = 94 MB free, so the second case does not apply either.

The third case: the objects surviving a young GC are larger than the survivor space and must be promoted to the old generation, but the old generation does not have enough memory.

By the same reasoning as for the second case, the third case does not apply either.

The fourth case: with the -XX:CMSInitiatingOccupancyFraction parameter set, even though the old generation's available memory is larger than the average size of objects promoted by previous young GCs, a full GC is triggered automatically once the old generation's usage exceeds the configured fraction.

Looking at the instance's JVM startup parameters, we find -XX:CMSInitiatingOccupancyFraction=80, which means a full GC is triggered when the old generation reaches 512 × 80% = 409 MB. This parameter mainly exists to avoid CMF (Concurrent Mode Failure), but in some cases it also makes full GCs more frequent. So this parameter is why a full GC appeared before the old generation was full.

Now we know the early full GC was triggered by the CMSInitiatingOccupancyFraction setting, which is commonly set to 80%. But could the following extreme case occur?

After a full GC very little of the old generation is reclaimed, its usage stays above the threshold set by CMSInitiatingOccupancyFraction, and the JVM ends up running full GCs back to back?

Backlog of 20,000 messages

Start at 5s intervals:

[Screenshot: Arthas dashboard, first snapshot]

After 5s:

[Screenshot: Arthas dashboard, 5 s later]

Comparing the two snapshots:

  • Threads: CPU usage of the CMS garbage collection thread is extremely high
  • Old generation: 511 MB used (out of 512 MB)
  • Full GC count: 3 full GCs occurred within 5 s
  • Total full GC time: grew from 14742 ms to 15131 ms

Three full GCs occurred within 5 s while the old generation sat at 511 MB (out of 512 MB). Each full GC took on average (81942 - 79740) / 3 = 734 ms, which means about 2.2 s out of every 5 s were spent in full GC.

At this point the logs show a large number of socket connection timeouts during data pushes:

[Screenshot: application log showing socket connection timeouts]

Looking at the GC log from that time, the gap between two full GCs was only about 1.4 s; the old generation went from 524287 KB to 524275 KB, so only 12 KB was reclaimed, yet the collection took 0.71 s. The system was spending half of its time in full GC!

[Screenshot: GC log around the back-to-back full GCs]

Checking the CPU and network I/O at the time on the Grafana monitoring dashboard, you can see that the frequent stop-the-world pauses and high CPU load caused by full GC prevented the network-request threads from being scheduled effectively, so network I/O throughput dropped.

[Screenshot: Grafana CPU and network I/O dashboard]

Scheme optimization

Problem analysis

The tests above show that the system's main problem is that the downstream consumption rate (executing the network requests that push data) cannot keep up with the upstream delivery rate (MQ consumption). The JVM heap gradually fills up, frequent full GCs make the service unavailable, and eventually the program crashes with an OOM.

Optimization ideas

In this scenario the system's main bottleneck is the JVM heap size. To avoid frequent full GCs and improve system stability, we can start from the following two directions.

JVM parameter optimization

The original JVM parameters were:

-Xmx1g -Xms1g -Xmn512m -XX:SurvivorRatio=10 -XX:+UseConcMarkSweepGC -XX:+UseCMSCompactAtFullCollection -XX:CMSMaxAbortablePrecleanTime=5000 -XX:+CMSClassUnloadingEnabled -XX:CMSInitiatingOccupancyFraction=80 -XX:+UseCMSInitiatingOccupancyOnly -XX:+ExplicitGCInvokesConcurrent -XX:ParallelGCThreads=2 -Xloggc:/opt/modules/gc.log -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/opt/modules/java.hprof

The adjustment points are as follows:

  • The instance has 2 GB of memory in total, and apart from our service nothing else memory-hungry is installed on it. The original JVM heap was only 1 GB, which is a bit wasteful, so the heap is raised to 1.5 GB: -Xmx1536M -Xms1536M
  • The young generation was previously set to 512 MB, which is not a great fit for the push platform. During the evening business peak we triggered a full GC with the jmap command (see the example command after this list) and found about 150 MB of resident objects in the old generation; allowing for floating garbage and so on, the old generation is given 512 MB. Taking the metaspace and thread stacks into account as well, the young generation is adjusted to 1 GB: -Xmn1024M
  • The Eden-to-survivor ratio in the young generation is changed from 10:1:1 to 8:1:1, so that objects surviving a young GC are less likely to overflow the survivor space and go straight into the old generation: -XX:SurvivorRatio=8
  • The metaspace is given the usual 256 MB: -XX:MaxMetaspaceSize=256M -XX:MetaspaceSize=256M
  • The thread stack is set to the typical 1 MB: -Xss1M
  • The ParNew collector is used for the young generation: -XX:+UseParNewGC
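The original article does not show the exact commands used to trigger and observe that full GC; one common way (an assumption, shown only as an example) is:

jmap -histo:live <pid>    # forces a full GC and prints a histogram of the surviving objects
jmap -heap <pid>          # then shows per-generation usage, including the old generation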

The final optimized jvm parameters are:

-Xmx1536M -Xms1536M -Xmn1024M -Xss1M -XX:MaxMetaspaceSize=256M -XX:MetaspaceSize=256M -XX:SurvivorRatio=8 -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:+UseCMSCompactAtFullCollection -XX:CMSMaxAbortablePrecleanTime=5000 -XX:+CMSClassUnloadingEnabled -XX:CMSInitiatingOccupancyFraction=80 -XX:+UseCMSInitiatingOccupancyOnly -XX:+ExplicitGCInvokesConcurrent -Xloggc:/opt/modules/gc.log -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/opt/modules/java.hprof

JVM resource throttling

The stress tests showed that the frequent full GCs are caused by old-generation usage getting too high (exceeding the -XX:CMSInitiatingOccupancyFraction threshold). So can we try to throttle the upstream based on JVM heap usage, achieving an effect similar to back pressure? We add a JVM resource limiter whose core logic is:

Set a threshold on JVM heap usage; when the threshold is exceeded, block the current consumer thread (or reject consumption outright), and release it only when usage falls back below the threshold.

The pseudocode is as follows:

import java.lang.management.ManagementFactory;
import java.lang.management.MemoryMXBean;
import java.lang.management.MemoryUsage;
import java.util.concurrent.TimeUnit;

import lombok.SneakyThrows;

public class ResourceLimitHandler {

    /**
     * JVM heap usage threshold (percent) for throttling
     */
    private Integer threshold = 70;

    /**
     * Sleep time per wait loop (milliseconds)
     */
    private Integer sleepTime = 1000;

    /**
     * Maximum blocking time (milliseconds)
     */
    private Integer maxBlockTime = 15000;

    private MemoryMXBean memoryMXBean = ManagementFactory.getMemoryMXBean();

    @SneakyThrows
    public void process() {
        long startTime = System.currentTimeMillis();
        double percent = this.getHeapUsedPercent();
        // Heap usage is above the threshold: enter the throttling loop
        while (percent >= this.threshold) {
            // Usage is still too high, but the maximum blocking time has been exceeded: let the request through
            if (this.maxBlockTime >= 0 && (System.currentTimeMillis() - startTime) > this.maxBlockTime) {
                // Safety net: in the extreme case, throttling stops new objects from being allocated in the
                // young generation, so the young GC trigger condition is never reached; trigger a full GC manually
                synchronized (ResourceLimitHandler.class) {
                    if ((percent = this.getHeapUsedPercent()) >= this.threshold) {
                        System.gc();
                    }
                }
                return;
            }
            TimeUnit.MILLISECONDS.sleep(this.sleepTime);
            percent = this.getHeapUsedPercent();
        }
    }

    /**
     * Heap usage as a percentage
     */
    private double getHeapUsedPercent() {
        long max = this.getHeapMax();
        long used = this.getHeapUsed();
        // NumberUtil.div: floating-point division helper from the project's utility library
        double percent = NumberUtil.div(used, max) * 100;
        return percent;
    }

    /**
     * Maximum usable heap size
     */
    private long getHeapMax() {
        MemoryUsage memoryUsage = this.memoryMXBean.getHeapMemoryUsage();
        return memoryUsage.getMax();
    }

    /**
     * Used heap size
     */
    private long getHeapUsed() {
        MemoryUsage memoryUsage = this.memoryMXBean.getHeapMemoryUsage();
        return memoryUsage.getUsed();
    }
}
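A sketch of how the limiter could be wired into the MQ consumer, ahead of the existing per-type counter check and the asynchronous push; the listener class and the PushData type are illustrative, not the platform's real code:

public class PushMessageListener {

    private final ResourceLimitHandler resourceLimitHandler = new ResourceLimitHandler();

    public void onMessage(PushData data) {
        // 1. Back-pressure on JVM heap usage: blocks for up to maxBlockTime while the
        //    heap stays above the threshold, then lets the message through.
        resourceLimitHandler.process();

        // 2. Existing per-business-type counter check (throws OverloadException and
        //    returns the message to MQ for a later retry when the limit is exceeded).

        // 3. Asynchronous push via HttpAsyncClient; the counter is released in the callback.
    }
}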

The code is fairly simple. The key point is how to set the JVM heap usage threshold; the following gives some reference for choosing this value.

Maximum threshold

The critical situation for a full GC triggered by -XX:CMSInitiatingOccupancyFraction is: the usable space of the young generation is completely full, and old-generation usage has just reached the ratio set by -XX:CMSInitiatingOccupancyFraction. This gives the following formula:

Maximum threshold percentage = (usable young generation size + old generation size × CMSInitiatingOccupancyFraction) / heap size

Usable young generation size: Eden size + the size of a single survivor space (the two survivor spaces are used alternately, and only one of them holds objects at any time)

Substituting the optimized JVM parameters gives: maximum threshold percentage = (1024 × 0.9 + 512 × 0.8) / 1536 ≈ 87%

Minimum threshold

If the threshold is set too low it will interfere with normal business processing; at the very least, a young GC must still be able to be triggered. There are many conditions that can actually trigger a young GC; here we only consider the most common one, namely the young generation not having enough room for a new object, which gives:

Minimum threshold percentage = usable young generation size / heap size

Substituting the optimized JVM parameters gives: minimum threshold percentage = (1024 × 0.9) / 1536 = 60%

Scheme validation

Practice is the ultimate test, so we run the stress test again in the same way as before.

Resource configuration

Number of application instances: 2

Instance configuration: 2 core 4G

Data volume: a backlog of 50,000 messages in MQ

Note: the intermediate data during the test is not shown here; only the final results after all data has been pushed are shown.

Before optimization

Arthas dashboard after the push completed, before optimization:

[Screenshot: Arthas dashboard after the push completed, before optimization]

Grafana monitoring dashboard:

[Screenshot: Grafana dashboard, before optimization]

After optimization

After optimizing the JVM parameters and adding the resource limiter, the Arthas dashboard once the push completed:

[Screenshot: Arthas dashboard after the push completed, after optimization]

Grafana monitoring dashboard:

[Screenshot: Grafana dashboard, after optimization]

Results comparison

                        Total push time   Full GC count   Total full GC time   Avg time per full GC
Before optimization     ~35 min           312             309232 ms            991 ms
After optimization      ~18 min           104             45387 ms             436 ms

Comparing before and after optimization, the total push time, the number of full GCs, and the average full GC time all dropped sharply, and overall throughput roughly doubled.

Summary

Based on the data push platform's business scenario and technical background, we suspected that limiting flow purely by task concurrency would not be enough to keep the system stable in a data flood, and the stress tests confirmed this. Analysis showed that the bottleneck is mainly the limited JVM heap: because the downstream consumption rate (network requests performing the push) is slower than the upstream delivery rate (consuming MQ messages and assembling push tasks), objects pile up in the JVM faster than they can be reclaimed, full GCs become frequent, and the system becomes unavailable.

The mainstream throttling approach in our systems is usually based on a concurrency count, which requires test engineers to run stress tests and weigh network I/O, the database, and other external middleware to arrive at a reasonably sound value. In practice, service instance configurations differ slightly between environments, and so does the concurrency they can carry. If the concurrency limit is set too high, the service still risks crashing under high concurrency; supplementing it with resource-level throttling ensures the service is not instantly overwhelmed by a traffic surge.

Applicable scenarios:

  • System-level global throttling to prevent the service from crashing, for example in a Spring MVC filter or a Dubbo filter (see the sketch after this list)
  • When a buffering queue is needed but the data objects in its tasks vary greatly in size, the queue capacity is hard to choose and a plain unbounded queue risks OOM; this scheme can be combined with an unbounded queue to keep it in check
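For the first scenario, a minimal sketch of a servlet filter that gates every request on heap usage might look like this (the filter class is an assumption, not code from the original):

import java.io.IOException;

import javax.servlet.Filter;
import javax.servlet.FilterChain;
import javax.servlet.FilterConfig;
import javax.servlet.ServletException;
import javax.servlet.ServletRequest;
import javax.servlet.ServletResponse;

public class JvmResourceLimitFilter implements Filter {

    private final ResourceLimitHandler resourceLimitHandler = new ResourceLimitHandler();

    @Override
    public void init(FilterConfig filterConfig) {
    }

    @Override
    public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain)
            throws IOException, ServletException {
        // Block (up to maxBlockTime) while heap usage is above the threshold,
        // then let the request continue down the chain.
        resourceLimitHandler.process();
        chain.doFilter(request, response);
    }

    @Override
    public void destroy() {
    }
}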

Shortcomings of the scheme:

  • This JVM-heap throttling scheme is only suitable for services whose old generation uses the CMS collector, because it relies on the full GC behaviour driven by the CMSInitiatingOccupancyFraction parameter, and most services with more than 16 GB of memory use the G1 collector instead
  • The concrete value of the heap-usage threshold depends on the JVM parameter settings, so users need some understanding of how the JVM works internally
  • It is not recommended as the only throttling mechanism, because the real capacity of a service also depends on other factors such as network I/O and the database
  • The stability and reliability of the scheme still need to be validated by more cases

Conclusion: every scheme has its advantages, disadvantages, and limitations. Throttling based on JVM heap usage is not applicable to every business scenario, but it is offered here as a new throttling idea for reference, to spark further thought. If anything in this article is wrong, please point it out.

Original link: https://juejin.cn/post/7129656760209506311
