天天看点

Presto Task 执行一、创建Task二、更新Task三、运行Task

文章目录

  • 一、创建Task
    • Client端
    • Resource端
  • 二、更新Task
    • Client端
    • Resource端
  • 三、运行Task
    • 1. TaskExecutor
    • 3. PrioritizedSplitRunner
    • 4. DriverSplitRunner
    • 5. Driver

向Presto集群提交的所有查询最终都会转化成为一个个单独的Task在每个Worker节点上执行。每个task的作用就是处理一个或者多个Split,然后将处理的结果输送给下游Stage中的task。

一、创建Task

创建Task主要是调用HttpRemoteTask类的构造方法创建一个对象,并调用其start方法,而start方法最终又会调用封装在HttpRemoteTask内部的httpClient,向特定的Worker Node上的TaskResource服务发起RESTful请求,从而在特定的Worker Node上启动一个对应的SqlTaskExecution进行数据处理和计算。因此创建Task分为两部分:

Client端

在Client端创建Task的过程分为两步:创建HttpRemoteTask对象和调用该对象的start方法。启动Task的主要代码都集中在方法sendUpdate中。

// HttpRemoteTask.java
private synchronized void sendUpdate()
{
    TaskStatus taskStatus = getTaskStatus();

	// 将当前Task需要处理的所有Split都封装成TaskSource列表
    List<TaskSource> sources = getSources();
	// 将执行计划序列化以通过http发送给worker节点
    Optional<byte[]> fragment = sendPlan.get() ? Optional.of(planFragment.toBytes(planFragmentCodec)) : Optional.empty();
    Optional<TableWriteInfo> writeInfo = sendPlan.get() ? Optional.of(tableWriteInfo) : Optional.empty();
	// 使用TaskSource列表、outputBuffers创建request请求
    TaskUpdateRequest updateRequest = new TaskUpdateRequest(
            session.toSessionRepresentation(),
            session.getIdentity().getExtraCredentials(),
            fragment,
            sources,
            outputBuffers.get(),
            writeInfo);
    byte[] taskUpdateRequestJson = taskUpdateRequestCodec.toBytes(updateRequest);

	// 生成一个Post请求
    HttpUriBuilder uriBuilder = getHttpUriBuilder(taskStatus);
    Request request = setContentTypeHeaders(isBinaryTransportEnabled, preparePost())
            .setUri(uriBuilder.build())
            .setBodyGenerator(createStaticBodyGenerator(taskUpdateRequestJson))
            .build();

    ResponseHandler responseHandler;
    if (isBinaryTransportEnabled) {
        responseHandler = createFullSmileResponseHandler((SmileCodec<TaskInfo>) taskInfoCodec);
    }
    else {
        responseHandler = createAdaptingJsonResponseHandler(unwrapJsonCodec(taskInfoCodec));
    }

    updateErrorTracker.startRequest();

    ListenableFuture<BaseResponse<TaskInfo>> future = httpClient.executeAsync(request, responseHandler);
    currentRequest = future;

    Futures.addCallback(
            future,
            new SimpleHttpResponseHandler<>(new UpdateResponseHandler(sources), request.getUri(), stats.getHttpResponseStats(), REMOTE_TASK_ERROR),
            executor);
}
           

需要将当前Task处理的数据都封装成TaskSource的列表,其中一个TaskSource代表一个Task处理的数据源,而Task处理的数据源又分为两类:Stage的输出和直接的数据源。对于Stage输出类型的数据源,TaskSource类封装了一个Stage的PlannodeId和根据该Stage上的TaskLocation生成的ScheduledSplit列表;对于直接的数据源,TaskSource类封装了一个数据源的PlannodeId和根据该数据源上的所有数据分片生成的ScheduledSplit列表。方法getSources会将两种数据源都封装成TaskSource,然后合并到一起。

Resource端

Coordinator向特定的Worker节点发送请求以启动一个SqlTaskExecution对象,用于执行Task计算任务,请求的处理均由类TaskResource完成。

// TaskResource.java
@POST
@Path("{taskId}")
@Consumes({APPLICATION_JSON, APPLICATION_JACKSON_SMILE})
@Produces({APPLICATION_JSON, APPLICATION_JACKSON_SMILE})
public Response createOrUpdateTask(@PathParam("taskId") TaskId taskId, TaskUpdateRequest taskUpdateRequest, @Context UriInfo uriInfo)
{
    Session session = taskUpdateRequest.getSession().toSession(sessionPropertyManager, taskUpdateRequest.getExtraCredentials());
    TaskInfo taskInfo = taskManager.updateTask(session,
            taskId,
            taskUpdateRequest.getFragment().map(planFragmentCodec::fromBytes),
            taskUpdateRequest.getSources(),
            taskUpdateRequest.getOutputIds(),
            taskUpdateRequest.getTableWriteInfo());

    if (shouldSummarize(uriInfo)) {
        taskInfo = taskInfo.summarize();
    }

    return Response.ok().entity(taskInfo).build();
}
           

对新建Task的RESTful的处理主要是由方法taskManager.updateTask来完成的,其实就是调用类SqlTaskManager中的updateTask方法。

// SqlTaskManager.java
@Override
public TaskInfo updateTask(...)
{
	// tasks是一个全局缓存,根据taskId获得已经缓存的SqlTask,若没有与之对应的SqlTask,则会新建一个
    SqlTask sqlTask = tasks.getUnchecked(taskId);
    sqlTask.recordHeartbeat();
    return sqlTask.updateTask(session, fragment, sources, outputBuffers, tableWriteInfo);
}
           

上面的代码是通过tasks.getUnchecked(taskId)获得SqlTask对象的,其实tasks是一个com.google.common.cache.LoadingCache对象,因此,若根据taskId第一次获得对应的SqlTask对象是需要创建该SqlTask对象的,而怎么创建SqlTask对象,是在LoadingCache对象初始化的时候就已经指定了的。

// SqlTaskManager.java
tasks = CacheBuilder.newBuilder().build(CacheLoader.from(
        taskId -> createSqlTask(
                taskId,
                locationFactory.createLocalTaskLocation(taskId),
                nodeInfo.getNodeId(),
                queryContexts.getUnchecked(taskId.getQueryId()),
                sqlTaskExecutionFactory,
                exchangeClientSupplier,
                taskNotificationExecutor,
                sqlTask -> {
                    finishedTaskStats.merge(sqlTask.getIoStats());
                    return null;
                },
                maxBufferSize,
                failedTasks,
                spoolingOutputBufferFactory)));
           

可以看出,每次taskId对应的SqlTask对象不存在的时候,都会重新创建一个。获得了SqlTask对象之后就会调用sqlTask.updateTask方法启动Task,进行计算。

// SqlTask.java
// 更新需要处理的Split列表
public TaskInfo updateTask(
        Session session,
        Optional<PlanFragment> fragment,
        List<TaskSource> sources,
        OutputBuffers outputBuffers,
        Optional<TableWriteInfo> tableWriteInfo)
{
    outputBuffer.setOutputBuffers(outputBuffers);

    // 确定该task execution只会被创建一次
    SqlTaskExecution taskExecution;
    synchronized (this) {
        // 如果task已经执行完成?
        TaskHolder taskHolder = taskHolderReference.get();
        if (taskHolder.isFinished()) {
            return taskHolder.getFinalTaskInfo();
        }
        taskExecution = taskHolder.getTaskExecution();
        if (taskExecution == null) {
            taskExecution = sqlTaskExecutionFactory.create(
                    session,
                    queryContext,
                    taskStateMachine,
                    outputBuffer,
                    taskExchangeClientManager,
                    fragment.get(),
                    sources,
                    tableWriteInfo.get());
            taskHolderReference.compareAndSet(taskHolder, new TaskHolder(taskExecution));
            needsPlan.set(false);
        }
    }

    if (taskExecution != null) {
		// 将sources中包含的所有的Split都合并到taskExecution现在需要处理的Split列表中
        taskExecution.addSources(sources);
    }

	// 返回当前Task的信息
    return getTaskInfo();
}
           

二、更新Task

Coordinator_Only、Single和Fixed类型的Task在调度的时候都是直接创建Task的,而只有Source类型的Task由于其分批调度Splits,因此有可能会多次调用scheduleTask方法,所以会判断在调用scheduleTask方法的时候,对应的Node上是否已经创建了Task:若已经创建,就更新Task;否则就创建Task。因此在Task调度阶段只有Source类型的Task才会被更新。

Client端

在Client端更新Task的过程分为两步:根据taskId查询对应的HttpRemoteTask对象和调用创建HttpRemoteTask对象的addSplits方法。这两步操作均在SqlStageExecution.assignSplits方法中完成。

Resource端

TaskResource类中的createOrUpdateTask接收到更新Task的请求,然后调用类SqlTaskManager中的updateTask方法进行处理。

三、运行Task

每个Task在TaskResource(Worker)端最终是以SqlTaskExecution运行的。

1. TaskExecutor

TaskExecutor是运行在Presto集群的每个Worker上的Presto服务的,它是用于运行实际计算任务的线程池的包装类,该类的主要作用就是处理在Worker上运行的所有Task中的Split。TaskExecutor通过Guice依赖注入SqlTaskManager中,并在每个Worker启动的时候调用其start方法运行。

// TaskExecutor.java
@PostConstruct
public synchronized void start()
{
    for (int i = 0; i < runnerThreads; i++) {
        addRunnerThread();
    }
}
           

addRunnerThread

方法实际是创建TaskExecutor.TaskRunner内部类实例,以处理Split。

private class TaskRunner
        implements Runnable
{
    private final long runnerId = NEXT_RUNNER_ID.getAndIncrement();

    @Override
    public void run()
    {
        try (SetThreadName runnerName = new SetThreadName("SplitRunner-%s", runnerId)) {
			// 只要当前TaskExecutor不结束并且当前线程不被中断,当前线程就一直不停地循环获取waitingSplits中的split进行处理
            while (!closed && !Thread.currentThread().isInterrupted()) {
                // select next worker
                final PrioritizedSplitRunner split;
                try {
                    split = waitingSplits.take();
                }
                catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }

                String threadId = split.getTaskHandle().getTaskId() + "-" + split.getSplitId();
                try (SetThreadName splitName = new SetThreadName(threadId)) {
                    RunningSplitInfo splitInfo = new RunningSplitInfo(ticker.read(), threadId, Thread.currentThread(), split);
                    runningSplitInfos.add(splitInfo);
                    runningSplits.add(split);

                    ListenableFuture<?> blocked;
                    try {
						// 调用各个Split的process方法,process只会执行固定时间片长度,若过了固定的时间片Split还没有处理完毕,则也会返回
                        blocked = split.process();
                    }
                    finally {
						// 执行完毕之后,需要将Split从runningSplits中移除
                        runningSplitInfos.remove(splitInfo);
                        runningSplits.remove(split);
                    }

                    if (split.isFinished()) {
                        splitFinished(split);
                    }
                    else {
						// blocked.isDone表示本次执行完毕
                        if (blocked.isDone()) {
                            waitingSplits.offer(split);
                        }
                        else {
                            blockedSplits.put(split, blocked);
                            blocked.addListener(() -> {
                                blockedSplits.remove(split);
                                // reset the level priority to prevent previously-blocked splits from starving existing splits
                                split.resetLevelPriority();
                                waitingSplits.offer(split);
                            }, executor);
                        }
                    }
                }
                catch (Throwable t) {
                    splitFinished(split);
                }
            }
        }
        finally {
			// 若代码执行到这里,说明上面的循环结束,即TaskExecutor.TaskRunner线程被中断或TaskExecutor结束。这时首先判断上述循环结束的原因,若TaskExecutor尚未结束则需要添加一个新的Runner继续执行,从而保证TaskExecutor线程池中始终保持固定数目的处理线程
            if (!closed) {
                addRunnerThread();
            }
        }
    }
}
           

3. PrioritizedSplitRunner

所有对Split的处理均由PrioritizedSplitRunner.process完成。

// PrioritizedSplitRunner.java
public ListenableFuture<?> process()
{
	// 调用split的processFor方法进行实际的split的处理
    ListenableFuture<?> blocked = split.processFor(SPLIT_RUN_QUANTA);

    return blocked;
}
           

4. DriverSplitRunner

从上述代码可以看出,Split的处理由方法DriverSplitRunner.processFor完成,而其实际是调用Driver.processFor方法进行处理。

// SqlTaskExecution.DriverSplitRunner
public ListenableFuture<?> processFor(Duration duration)
{
    Driver driver;
    synchronized (this) {
        if (this.driver == null) {
            this.driver = driverSplitRunnerFactory.createDriver(driverContext, partitionedSplit);
        }

        driver = this.driver;
    }

    return driver.processFor(duration);
}
           

5. Driver

作用于Split上的一系列操作的封装类为Driver类,其对Split的操作集中在processInternal中。

// Driver.java
public ListenableFuture<?> processFor(Duration duration)
{
	// split最长可执行的时间片
    long maxRuntime = duration.roundTo(TimeUnit.NANOSECONDS);

    Optional<ListenableFuture<?>> result = tryWithLock(100, TimeUnit.MILLISECONDS, () -> {
        OperationTimer operationTimer = createTimer();
        driverContext.startProcessTimer();
        driverContext.getYieldSignal().setWithDelay(maxRuntime, driverContext.getYieldExecutor());
        try {
            long start = System.nanoTime();
            do {
                ListenableFuture<?> future = processInternal(operationTimer);
                if (!future.isDone()) {
                    return updateDriverBlockedFuture(future);
                }
            }
            while (System.nanoTime() - start < maxRuntime && !isFinishedInternal());
        }
        finally {
            driverContext.getYieldSignal().reset();
            driverContext.recordProcessed(operationTimer);
        }
        return NOT_BLOCKED;
    });
    return result.orElse(NOT_BLOCKED);
}

// Driver.java
@GuardedBy("exclusiveLock")
private ListenableFuture<?> processInternal(OperationTimer operationTimer)
{
	// 对于处于SourceStage的Task,若尚有未处理读取的Split,将未读取的Split加入到SourceOperator中
    processNewSources();

    // 如果只有一个operator操作,则执行完它
    if (!activeOperators.isEmpty() && activeOperators.size() != allOperators.size()) {
        Operator rootOperator = activeOperators.get(0);
        rootOperator.finish();
        rootOperator.getOperatorContext().recordFinish(operationTimer);
    }

    boolean movedPage = false;
    if (cachedResult.get().isPresent()) {
        Iterator<Page> remainingPages = cachedResult.get().get();
        Operator outputOperator = activeOperators.get(activeOperators.size() - 1);
        if (remainingPages.hasNext()) {
            Page outputPage = remainingPages.next();
            outputPages.add(outputPage);
            outputOperator.addInput(outputPage);
        }
        else {
            outputOperator.finish();
            outputOperator.getOperatorContext().recordFinish(operationTimer);
        }
    }
    else {
        for (int i = 0; i < activeOperators.size() - 1 && !driverContext.isDone(); i++) {
            Operator current = activeOperators.get(i);
            Operator next = activeOperators.get(i + 1);

            // 跳过阻塞的operator
            if (getBlockedFuture(current).isPresent()) {
                continue;
            }

			// 如果当前operator没有结束,而且下一个operator也没有被阻塞且需要输入
            if (!current.isFinished() && !getBlockedFuture(next).isPresent() && next.needsInput()) {
				// 从当前operator中获得output page,然后将该page作为输入,交给下一个operator进行处理
                Page page = current.getOutput();
                current.getOperatorContext().recordGetOutput(operationTimer, page);

                // 对最后一个无输出的operator,我们以缓存为目的保存pages
                if (shouldUseFragmentResultCache() && i == activeOperators.size() - 2 && page != null) {
                    outputPages.add(page);
                }

				// 将获得的output page交给下一个operator进行处理
                if (page != null && page.getPositionCount() != 0) {
                    next.addInput(page);
                    next.getOperatorContext().recordAddInput(operationTimer, page);
                    movedPage = true;
                }

                if (current instanceof SourceOperator) {
                    movedPage = true;
                }
            }

			// 若当前的operator已经完成了,则通知下一个operator:不会再有输入了,需要完成数据处理,并将结果进行刷新
            if (current.isFinished()) {
                next.finish();
                next.getOperatorContext().recordFinish(operationTimer);
            }
        }
    }

	// 从后往前检查每个operator是否已执行完成
    for (int index = activeOperators.size() - 1; index >= 0; index--) {
        if (activeOperators.get(index).isFinished()) {
            boolean outputOperatorFinished = index == activeOperators.size() - 1;
            // close and remove this operator and all source operators
            List<Operator> finishedOperators = this.activeOperators.subList(0, index + 1);
            Throwable throwable = closeAndDestroyOperators(finishedOperators);
            finishedOperators.clear();
            if (throwable != null) {
                throwIfUnchecked(throwable);
                throw new RuntimeException(throwable);
            }

            if (shouldUseFragmentResultCache() && outputOperatorFinished && !cachedResult.get().isPresent()) {
                checkState(split.get() != null);
                checkState(fragmentResultCacheContext.isPresent());
                fragmentResultCacheContext.get().getFragmentResultCacheManager().put(fragmentResultCacheContext.get().getCanonicalPlanFragment(), split.get(), outputPages);
            }

            // Finish the next operator, which is now the first operator.
            if (!activeOperators.isEmpty()) {
                Operator newRootOperator = activeOperators.get(0);
                newRootOperator.finish();
                newRootOperator.getOperatorContext().recordFinish(operationTimer);
            }
            break;
        }
    }

	// 若所有的operator都已经循环完毕了,但是没有发生Page的移动,我们需要检查是否有operator被block住了
    if (!movedPage) {
        List<Operator> blockedOperators = new ArrayList<>();
        List<ListenableFuture<?>> blockedFutures = new ArrayList<>();
		// 循环所有的operator,并获得每个operator的ListenableFuture对象,isBlocked方法会进行判断:若当前operator已经执行结束,则会返回其是否在等待额外的内存
        for (Operator operator : activeOperators) {
            Optional<ListenableFuture<?>> blocked = getBlockedFuture(operator);
            if (blocked.isPresent()) {
                blockedOperators.add(operator);
                blockedFutures.add(blocked.get());
            }
        }
		// 若确实有operator被阻塞住了
        if (!blockedFutures.isEmpty()) {
			// 任意一个ListenableFuture完成,就会解除当前Driver的阻塞状态
            ListenableFuture<?> blocked = firstFinishedFuture(blockedFutures);
            // driver records serial blocked time
            driverContext.recordBlocked(blocked);
			// 当前Driver添加monitor实时监听是否已经解除阻塞状态
            for (Operator operator : blockedOperators) {
                operator.getOperatorContext().recordBlocked(blocked);
            }
            return blocked;
        }
    }

    return NOT_BLOCKED;
}
           

通过上面的代码我们可以看出:每个Driver中封装了对Split的所有操作,每次执行Split计算的时候,我们都会依次遍历作用于该Split上的所有operator,取出当前operator的output page,然后将该output page作为下一个operator的input page,交给下一个operator进行处理。直到将该Driver封装的所有Operator遍历完毕。