
ElasticSearch - Troubleshooting a BulkProcessor deadlock during bulk updates

Author: JD Cloud Developer

First, an introduction to the affected system

1. The system listens for MQ messages about product changes, queries the latest product information, and calls BulkProcessor to update the product fields in the ES cluster in batches.

2. Because the volume of product data is large, it is stored on an ES cluster that is divided into 256 shards, and documents are routed to shards by the product's third-level category ID.

For example, when the name of a SKU changes, the system receives the MQ change message for that SKU, calls the product interface to fetch the latest name, routes by the SKU's third-level category ID to find the corresponding shard in the ES cluster, and then updates the product name field.

Because the volume of product-change MQ messages is huge, the system uses BulkProcessor for asynchronous batch updates, both to improve ES update performance and to prevent an MQ backlog.

The ES client version is as follows:

<dependency>
    <artifactId>elasticsearch-rest-client</artifactId>
    <groupId>org.elasticsearch.client</groupId>
    <version>6.5.3</version>
</dependency>

           

The BulkProcessor configuration pseudocode is as follows:

        // build() constructs the BulkProcessor; under the hood it uses the asynchronous bulk API
        this.fullDataBulkProcessor = BulkProcessor.builder((request, bulkListener) ->
                fullDataEsClient.getClient().bulkAsync(request, RequestOptions.DEFAULT, bulkListener), listener)
                // Execute a bulk request once 1000 actions have accumulated
                .setBulkActions(1000)
                // Execute a bulk request once 5 MB of data has accumulated
                .setBulkSize(new ByteSizeValue(5L, ByteSizeUnit.MB))
                // Number of concurrent requests: 0 means no concurrency, 1 allows one concurrent request
                .setConcurrentRequests(1)
                // Force a flush every 1 s
                .setFlushInterval(TimeValue.timeValueSeconds(1L))
                // Retry 5 times, 1 s apart
                .setBackoffPolicy(BackoffPolicy.constantBackoff(TimeValue.timeValueSeconds(1L), 5))
                .build();

Second, how the problem was discovered

1. After the 618 promotion started, product changes became much more frequent: the daily volume of MQ messages reached several times the usual level, and many products also had their third-level category ID changed.

2. When the system updated a SKU whose third-level category ID had changed, the update was routed to the shard computed from the new category ID and failed. It was retried 5 times and still did not succeed.

3. Because the newly routed shard holds no index data for that product, these update requests could never succeed, and a large number of retry exception logs were written to the system's log files.

4. Backlog alarms also began to fire for the product-change MQ messages: consumption clearly could not keep up with production.

5. The UMP monitoring data for the MQ consumers showed that consumption performance was stable with no obvious fluctuation, but after the system had been consuming for a while the number of calls fell off a cliff, from the normal per-minute volume down to single digits.

6. After the application was restarted, consumption resumed and the UMP call count returned to normal, but after the system ran for a while consumption stalled again, as if all consumer threads had been suspended.
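Items 2 and 3 can be illustrated with a minimal sketch. It assumes a simplified routing rule (a plain hash modulo the 256 shards); Elasticsearch itself hashes the routing value with Murmur3, so the shard numbers here are illustrative only. The class `RoutingSketch`, its methods, and the category IDs 1001 and 2002 are hypothetical. The sketch models only the lookup miss on the newly routed shard, not the exact error the system logged.

```java
import java.util.HashMap;
import java.util.Map;

/** Simplified model of routed shards; this is not the real Elasticsearch routing code. */
final class RoutingSketch {
    static final int SHARD_COUNT = 256; // shard count taken from the article

    // One in-memory map per shard, standing in for the documents stored on it.
    private final Map<Integer, Map<String, String>> shards = new HashMap<>();

    /** Assumption: plain hash modulo shard count (Elasticsearch uses a Murmur3 hash). */
    static int shardFor(String routing) {
        return Math.floorMod(routing.hashCode(), SHARD_COUNT);
    }

    /** Stores the document on the shard selected by the third-level category ID. */
    void index(String skuId, String cat3, String name) {
        shards.computeIfAbsent(shardFor(cat3), k -> new HashMap<>()).put(skuId, name);
    }

    /** Plain update: succeeds only if the routed shard already holds the document. */
    boolean updateName(String skuId, String cat3, String newName) {
        Map<String, String> shard = shards.get(shardFor(cat3));
        if (shard == null || !shard.containsKey(skuId)) {
            return false; // the document lives on the shard of the old category ID
        }
        shard.put(skuId, newName);
        return true;
    }

    public static void main(String[] args) {
        RoutingSketch cluster = new RoutingSketch();
        cluster.index("sku-1", "1001", "old name");
        // Same category ID: the update reaches the shard that holds the document.
        System.out.println(cluster.updateName("sku-1", "1001", "new name"));
        // Changed category ID: the update is routed to a shard without the document.
        System.out.println(cluster.updateName("sku-1", "2002", "new name"));
    }
}
```

In this model the second update keeps failing no matter how often it is retried, because retrying does not change the shard it is routed to.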


Third, the detailed troubleshooting process

First, find a container where MQ consumption has stalled, look up the application's process ID, dump the full thread stacks of the process with the jstack command, then package the dump and upload it to https://fastthread.io/ for thread state analysis. The analysis report is as follows:

[Figure: fastThread thread state analysis report]

The report shows 124 threads in the BLOCKED state. Clicking through to an individual thread shows its detailed stack, as follows:

[Figure: stack trace of a BLOCKED thread]

Looking through the stacks of several threads shows that the MQ consumer threads are all waiting to lock <0x00000005eb781b10> (a org.elasticsearch.action.bulk.BulkProcessor). Searching the dump for 0x00000005eb781b10 shows that this object lock is held by another thread, whose stack is as follows:

[Figure: stack trace of the thread holding the BulkProcessor lock]

This thread is in the WAITING state, and its name shows that it is an internal thread of the ES client. It acquired the lock ahead of the business threads and is now waiting for some other condition before it can continue, so none of the MQ consumer threads can obtain the lock inside BulkProcessor, which is why consumption stalls.
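The same grouping (which thread owns the lock that the BLOCKED threads are waiting for) can also be done inside the JVM with the standard `java.lang.management` API. The following is a minimal sketch, not the tooling used in this investigation: `BlockedThreadReport`, its methods, and the thread names `demo-scheduler` and `demo-consumer-N` are hypothetical, and the demo scenario only imitates a holder thread that waits while owning a monitor.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.CountDownLatch;

final class BlockedThreadReport {

    /** Counts BLOCKED threads per name of the thread that owns the contended monitor. */
    static Map<String, Integer> blockedByOwner() {
        Map<String, Integer> counts = new TreeMap<>();
        ThreadInfo[] infos = ManagementFactory.getThreadMXBean().dumpAllThreads(false, false);
        for (ThreadInfo info : infos) {
            if (info != null && info.getThreadState() == Thread.State.BLOCKED
                    && info.getLockOwnerName() != null) {
                counts.merge(info.getLockOwnerName(), 1, Integer::sum);
            }
        }
        return counts;
    }

    /** Hypothetical scenario: one holder thread waits while owning a monitor the consumers need. */
    static Map<String, Integer> demo(int consumers) throws InterruptedException {
        Object lock = new Object();
        CountDownLatch lockHeld = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);
        Thread holder = new Thread(() -> {
            synchronized (lock) {
                lockHeld.countDown();
                try {
                    release.await(); // WAITING, but still owning the monitor
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        }, "demo-scheduler");
        holder.start();
        lockHeld.await();

        List<Thread> blocked = new ArrayList<>();
        for (int i = 0; i < consumers; i++) {
            Thread t = new Thread(() -> {
                synchronized (lock) {
                    // a real consumer would add to the bulk request here
                }
            }, "demo-consumer-" + i);
            t.start();
            blocked.add(t);
        }
        // Wait until every consumer is parked on the monitor.
        for (Thread t : blocked) {
            while (t.getState() != Thread.State.BLOCKED) {
                Thread.sleep(5);
            }
        }
        Map<String, Integer> report = blockedByOwner();

        release.countDown(); // let all threads finish
        holder.join();
        for (Thread t : blocked) {
            t.join();
        }
        return report;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(demo(3));
    }
}
```

A report in which one owner name accounts for nearly all BLOCKED threads points at the same kind of single lock holder that the jstack dump revealed here.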

But why can't this thread, elasticsearch[scheduler][T#1], make progress? When was it started, and what does it do?

Since BulkProcessor is created through the builder pattern, we need to read the builder source code to understand how a BulkProcessor is created.

public static Builder builder(BiConsumer<BulkRequest, ActionListener<BulkResponse>> consumer, Listener listener) {
        Objects.requireNonNull(consumer, "consumer");
        Objects.requireNonNull(listener, "listener");
        final ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = Scheduler.initScheduler(Settings.EMPTY);
        return new Builder(consumer, listener,
                (delay, executor, command) -> scheduledThreadPoolExecutor.schedule(command, delay.millis(), TimeUnit.MILLISECONDS),
                () -> Scheduler.terminate(scheduledThreadPoolExecutor, 10, TimeUnit.SECONDS));
    }           

Internally, the builder creates a scheduled thread pool with a single core thread. Its thread naming rule matches the name of the lock-holding thread described above. The code is as follows:

static ScheduledThreadPoolExecutor initScheduler(Settings settings) {
        ScheduledThreadPoolExecutor scheduler = new ScheduledThreadPoolExecutor(1,
                EsExecutors.daemonThreadFactory(settings, "scheduler"), new EsAbortPolicy());
        scheduler.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
        scheduler.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
        scheduler.setRemoveOnCancelPolicy(true);
        return scheduler;
    }

           

Finally, the build method invokes BulkProcessor's parameterized constructor, which starts a periodic flush task. The code is as follows:

BulkProcessor(BiConsumer<BulkRequest, ActionListener<BulkResponse>> consumer, BackoffPolicy backoffPolicy, Listener listener,
                  int concurrentRequests, int bulkActions, ByteSizeValue bulkSize, @Nullable TimeValue flushInterval,
                  Scheduler scheduler, Runnable onClose) {
        this.bulkActions = bulkActions;
        this.bulkSize = bulkSize.getBytes();
        this.bulkRequest = new BulkRequest();
        this.scheduler = scheduler;
        this.bulkRequestHandler = new BulkRequestHandler(consumer, backoffPolicy, listener, scheduler, concurrentRequests);
        // Start period flushing task after everything is setup
        this.cancellableFlushTask = startFlushTask(flushInterval, scheduler);
        this.onClose = onClose;
    }

    private Scheduler.Cancellable startFlushTask(TimeValue flushInterval, Scheduler scheduler) {
        if (flushInterval == null) {
            return new Scheduler.Cancellable() {
                @Override
                public void cancel() {}

                @Override
                public boolean isCancelled() {
                    return true;
                }
            };
        }
        final Runnable flushRunnable = scheduler.preserveContext(new Flush());
        return scheduler.scheduleWithFixedDelay(flushRunnable, flushInterval, ThreadPool.Names.GENERIC);
    }

    class Flush implements Runnable {

        @Override
        public void run() {
            synchronized (BulkProcessor.this) {
                if (closed) {
                    return;
                }
                if (bulkRequest.numberOfActions() == 0) {
                    return;
                }
                execute();
            }
        }
    }           

The source shows that the flush task implements the fixed-interval flush configured when the BulkProcessor is created: when an interval is set through setFlushInterval, a background flush task is scheduled to run at that interval. When the flush task runs, it must first acquire the BulkProcessor object lock, and it calls the execute method only after it holds the lock. The relevant call chain in the source is as follows:

/**
     * Adds the data from the bytes to be processed by the bulk processor
     */
    public synchronized BulkProcessor add(BytesReference data, @Nullable String defaultIndex, @Nullable String defaultType,
                                          @Nullable String defaultPipeline, @Nullable Object payload, XContentType xContentType) throws Exception {
        bulkRequest.add(data, defaultIndex, defaultType, null, null, null, defaultPipeline, payload, true, xContentType);
        executeIfNeeded();
        return this;
    }

    private void executeIfNeeded() {
        ensureOpen();
        if (!isOverTheLimit()) {
            return;
        }
        execute();
    }

    // (currently) needs to be executed under a lock
    private void execute() {
        final BulkRequest bulkRequest = this.bulkRequest;
        final long executionId = executionIdGen.incrementAndGet();

        this.bulkRequest = new BulkRequest();
        this.bulkRequestHandler.execute(bulkRequest, executionId);
    }           

The add method in the code above is called by the MQ consumer business threads, and it is also declared synchronized, so the MQ consumer threads compete directly with the flush task thread for the same lock. The pseudocode of the call made by the MQ consumer threads is as follows:

@Override
public void upsertCommonSku(CommonSkuEntity commonSkuEntity) {
    String source = JsonUtil.toString(commonSkuEntity);
    // Partial update of the document, keyed by SKU ID
    UpdateRequest updateRequest = new UpdateRequest(Constants.INDEX_NAME_SPU, Constants.INDEX_TYPE, commonSkuEntity.getSkuId().toString());
    updateRequest.doc(source, XContentType.JSON);
    // Document used for the upsert
    IndexRequest indexRequest = new IndexRequest(Constants.INDEX_NAME_SPU, Constants.INDEX_TYPE, commonSkuEntity.getSkuId().toString());
    indexRequest.source(source, XContentType.JSON);
    updateRequest.upsert(indexRequest);
    // Route by the third-level category ID
    updateRequest.routing(commonSkuEntity.getCat3().toString());
    // add() is synchronized on the BulkProcessor, the same lock the flush task takes
    fullbulkProcessor.add(updateRequest);
}
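The contention between add and the flush task comes down to one Java rule: a synchronized instance method and a `synchronized (Outer.this)` block in an inner class lock the same monitor. The following minimal sketch shows that rule with plain JDK classes. `MiniProcessor` and its members are hypothetical stand-ins for BulkProcessor and its Flush class, and the latch that keeps the flush from finishing is an assumption made for the demonstration.

```java
import java.util.concurrent.CountDownLatch;

final class MiniProcessor {
    private int pending;

    /** Like BulkProcessor.add: a synchronized instance method locks on "this". */
    synchronized void add() {
        pending++;
    }

    /** Like BulkProcessor.Flush: an inner class that locks on the enclosing instance. */
    final class Flush implements Runnable {
        final CountDownLatch entered = new CountDownLatch(1);
        final CountDownLatch mayLeave = new CountDownLatch(1);

        @Override
        public void run() {
            synchronized (MiniProcessor.this) {
                entered.countDown();
                try {
                    mayLeave.await(); // stands in for a flush that cannot finish
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                pending = 0;
            }
        }
    }

    /** Returns the state of a thread calling add() while the flush task owns the monitor. */
    static Thread.State adderStateDuringFlush() throws InterruptedException {
        MiniProcessor processor = new MiniProcessor();
        Flush flush = processor.new Flush();
        Thread flusher = new Thread(flush, "demo-flush");
        flusher.start();
        flush.entered.await(); // the flush task now holds the processor's monitor

        Thread adder = new Thread(processor::add, "demo-consumer");
        adder.start();
        while (adder.getState() != Thread.State.BLOCKED) {
            Thread.sleep(5);
        }
        Thread.State observed = adder.getState();

        flush.mayLeave.countDown(); // let the flush finish so add() can proceed
        flusher.join();
        adder.join();
        return observed;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(adderStateDuringFlush());
    }
}
```

As long as the flush task stays inside its synchronized block, every caller of add() remains BLOCKED, which matches the state of the MQ consumer threads in the dump.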

The thread stack analysis above shows that all business threads are waiting for the elasticsearch[scheduler][T#1] thread to release the BulkProcessor object lock, but that thread never releases it, so the business threads are effectively deadlocked.

Given the large number of retry exception logs in the application log files, the problem may be related to BulkProcessor's retry strategy, so the next step is to understand BulkProcessor's retry logic. BulkRequests submitted by the business threads are all handed to the execute method of the BulkRequestHandler object. The code is as follows:

public final class BulkRequestHandler {
    private final Logger logger;
    private final BiConsumer<BulkRequest, ActionListener<BulkResponse>> consumer;
    private final BulkProcessor.Listener listener;
    private final Semaphore semaphore;
    private final Retry retry;
    private final int concurrentRequests;

    BulkRequestHandler(BiConsumer<BulkRequest, ActionListener<BulkResponse>> consumer, BackoffPolicy backoffPolicy,
                       BulkProcessor.Listener listener, Scheduler scheduler, int concurrentRequests) {
        assert concurrentRequests >= 0;
        this.logger = Loggers.getLogger(getClass());
        this.consumer = consumer;
        this.listener = listener;
        this.concurrentRequests = concurrentRequests;
        this.retry = new Retry(backoffPolicy, scheduler);
        this.semaphore = new Semaphore(concurrentRequests > 0 ? concurrentRequests : 1);
    }

    public void execute(BulkRequest bulkRequest, long executionId) {
        Runnable toRelease = () -> {};
        boolean bulkRequestSetupSuccessful = false;
        try {
            listener.beforeBulk(executionId, bulkRequest);
            semaphore.acquire();
            toRelease = semaphore::release;
            CountDownLatch latch = new CountDownLatch(1);
            retry.withBackoff(consumer, bulkRequest, new ActionListener<BulkResponse>() {
                @Override
                public void onResponse(BulkResponse response) {
                    try {
                        listener.afterBulk(executionId, bulkRequest, response);
                    } finally {
                        semaphore.release();
                        latch.countDown();
                    }
                }

                @Override
                public void onFailure(Exception e) {
                    try {
                        listener.afterBulk(executionId, bulkRequest, e);
                    } finally {
                        semaphore.release();
                        latch.countDown();
                    }
                }
            });
            bulkRequestSetupSuccessful = true;
            if (concurrentRequests == 0) {
                latch.await();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            logger.info(() -> new ParameterizedMessage("Bulk request {} has been cancelled.", executionId), e);
            listener.afterBulk(executionId, bulkRequest, e);
        } catch (Exception e) {
            logger.warn(() -> new ParameterizedMessage("Failed to execute bulk request {}.", executionId), e);
            listener.afterBulk(executionId, bulkRequest, e);
        } finally {
            if (bulkRequestSetupSuccessful == false) {  // if we fail on client.bulk() release the semaphore
                toRelease.run();
            }
        }
    }

    boolean awaitClose(long timeout, TimeUnit unit) throws InterruptedException {
        if (semaphore.tryAcquire(this.concurrentRequests, timeout, unit)) {
            semaphore.release(this.concurrentRequests);
            return true;
        }
        return false;
    }
}
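The permit handling in the constructor and in execute above can be tried out in isolation. This is a minimal sketch using only `java.util.concurrent.Semaphore`: `SinglePermitSketch` and its methods are hypothetical, `permitsFor` copies the ternary from the constructor, and the 100 ms timeout is an arbitrary value chosen so that the demo does not hang.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

final class SinglePermitSketch {

    /** Same expression as in the BulkRequestHandler constructor. */
    static int permitsFor(int concurrentRequests) {
        return concurrentRequests > 0 ? concurrentRequests : 1;
    }

    /**
     * Returns {acquiredWhileBusy, acquiredAfterRelease} for a second caller of
     * execute() when concurrentRequests is 1.
     */
    static boolean[] secondCaller() throws InterruptedException {
        Semaphore semaphore = new Semaphore(permitsFor(1));

        semaphore.acquire(); // first bulk request is in flight

        // A second execute() would block in semaphore.acquire(); a timed
        // tryAcquire is used here only so the sketch terminates.
        boolean whileBusy = semaphore.tryAcquire(100, TimeUnit.MILLISECONDS);

        semaphore.release(); // what onResponse/onFailure do in their finally block

        boolean afterRelease = semaphore.tryAcquire(100, TimeUnit.MILLISECONDS);
        return new boolean[] {whileBusy, afterRelease};
    }

    public static void main(String[] args) throws InterruptedException {
        boolean[] result = secondCaller();
        System.out.println("acquired while busy:    " + result[0]);
        System.out.println("acquired after release: " + result[1]);
    }
}
```

Note that `permitsFor(0)` and `permitsFor(1)` both yield one permit. In the listing, the difference between the two settings is only that with 0 the caller also waits on the latch until the request completes.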

           

BulkRequestHandler's constructor creates a Retry object and passes it a Scheduler. This is the same scheduler, and therefore the same thread pool, that runs the flush task, and it maintains only one fixed thread. The execute method first uses a Semaphore to limit the number of concurrent executions. The number of permits comes from the concurrentRequests parameter set when building the BulkProcessor, which the configuration above sets to 1. So only one bulk request can be in flight at a time: whether execute is called from an MQ consumer thread or from the flush task thread, the caller must wait until the previous request has released the permit. Next, let's look at how the retry task is executed, as shown in the following code:

public void withBackoff(BiConsumer<BulkRequest, ActionListener<BulkResponse>> consumer, BulkRequest bulkRequest,
                            ActionListener<BulkResponse> listener) {
        RetryHandler r = new RetryHandler(backoffPolicy, consumer, listener, scheduler);
        r.execute(bulkRequest);
    }           

RetryHandler submits the bulkRequest, listens for failures of the request, and then runs the retry logic. The retry code is as follows:

private void retry(BulkRequest bulkRequestForRetry) {
            assert backoff.hasNext();
            TimeValue next = backoff.next();
            logger.trace("Retry of bulk request scheduled in {} ms.", next.millis());
            Runnable command = scheduler.preserveContext(() -> this.execute(bulkRequestForRetry));
            scheduledRequestFuture = scheduler.schedule(next, ThreadPool.Names.SAME, command);
        }           

RetryHandler hands the failed bulk request back to the internal scheduler thread pool for execution. As the code above shows, that pool maintains only one fixed thread, and the same thread may be occupied by the flush task. So if the pool's only thread is running the flush task when a retry is scheduled, the retry cannot run. As long as the retry cannot run, the Semaphore permit held by the failed request is not released. But because concurrency is configured as 1, the flush task thread itself needs another thread to release that permit before it can continue. This forms a circular wait: the flush thread holds the BulkProcessor object lock while waiting for the Semaphore, and the Semaphore can only be released by a retry that needs the flush thread. Neither the Semaphore nor the BulkProcessor lock is released, so all MQ consumer threads block while trying to acquire the BulkProcessor lock.
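The circular wait can be reproduced with JDK classes alone. The sketch below is a simplified model, not the Elasticsearch code: `BulkDeadlockSketch` and `flushGetsPermit` are hypothetical names, the retry is reduced to a task that releases the permit, and the flush uses a timed tryAcquire instead of a blocking acquire so that the demo terminates rather than hanging.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

final class BulkDeadlockSketch {

    /**
     * Returns true if the flush task obtained the permit within waitMillis,
     * false if it starved because the retry could not run.
     */
    static boolean flushGetsPermit(int schedulerThreads, long waitMillis) throws Exception {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(schedulerThreads);
        Semaphore semaphore = new Semaphore(1);  // concurrentRequests = 1
        Object processorLock = new Object();     // stands in for the BulkProcessor monitor
        try {
            // A bulk request is in flight and holds the only permit.
            semaphore.acquire();

            // Flush task: takes the processor lock, then needs the permit.
            CountDownLatch flushStarted = new CountDownLatch(1);
            Future<Boolean> flush = scheduler.submit(() -> {
                synchronized (processorLock) {
                    flushStarted.countDown();
                    return semaphore.tryAcquire(waitMillis, TimeUnit.MILLISECONDS);
                }
            });
            flushStarted.await();

            // The in-flight request fails and its retry is scheduled on the same
            // scheduler. Only when the retry runs is the permit released.
            scheduler.schedule(() -> {
                semaphore.release();
            }, 0, TimeUnit.MILLISECONDS);

            return flush.get();
        } finally {
            scheduler.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        // One scheduler thread: the retry is queued behind the flush that waits for it.
        System.out.println("1 thread : flush got permit = " + flushGetsPermit(1, 300));
        // Two scheduler threads: the retry runs and the flush can continue.
        System.out.println("2 threads: flush got permit = " + flushGetsPermit(2, 3000));
    }
}
```

With one scheduler thread the flush never gets the permit within the timeout, because the task that would release it is queued behind the flush itself. With a second thread the retry runs and the wait ends, which is the effect of running retries and flushes on separate thread pools.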

Similar problems have also been reported against the ES client on GitHub, for example https://github.com/elastic/elasticsearch/issues/47599. This further supports the analysis above: the continuous bulk retries caused the deadlock inside BulkProcessor.

Fourth, how to solve the problem

Now that we have understood the cause of the problem, there are several solutions:

1. Upgrade the ES client to the official 7.6 release. From that version on, the retry tasks and the flush task run on physically separate thread pools, which avoids the contention for the thread pool, but version compatibility needs to be considered.

2. Since the deadlock is triggered by heavy exception retrying, the retry logic can be disabled if doing so does not affect the business logic. This solution does not require upgrading the client, but the business impact needs to be evaluated, and failed requests can be retried by other means.

If anything has been omitted, corrections are welcome!

Author: JD Retail Cao Zhifei

Source: JD Cloud Developer Community