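Every query submitted to a Presto cluster is ultimately broken down into individual Tasks that execute on the Worker nodes. Each Task processes one or more Splits and then sends its results to the Tasks of the downstream Stage.
I. Creating a Task
Creating a Task mainly involves calling the HttpRemoteTask constructor to create an object and then calling its start method. The start method eventually uses the httpClient wrapped inside HttpRemoteTask to send a RESTful request to the TaskResource service on a particular Worker node. That request starts a corresponding SqlTaskExecution on the node, which processes and computes the data. Task creation therefore has two parts:
Client side
On the client side, creating a Task takes two steps: constructing the HttpRemoteTask object and calling its start method. Most of the code that actually launches the Task is in the sendUpdate method.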
```java
// HttpRemoteTask.java
private synchronized void sendUpdate()
{
    TaskStatus taskStatus = getTaskStatus();
    // Wrap all Splits this Task needs to process into a list of TaskSource objects
    List<TaskSource> sources = getSources();
    // Serialize the plan fragment (only while sendPlan is true) so it can be sent to the worker over HTTP
    Optional<byte[]> fragment = sendPlan.get() ? Optional.of(planFragment.toBytes(planFragmentCodec)) : Optional.empty();
    Optional<TableWriteInfo> writeInfo = sendPlan.get() ? Optional.of(tableWriteInfo) : Optional.empty();
    // Build the update request from the TaskSource list, the outputBuffers, etc.
    TaskUpdateRequest updateRequest = new TaskUpdateRequest(
            session.toSessionRepresentation(),
            session.getIdentity().getExtraCredentials(),
            fragment,
            sources,
            outputBuffers.get(),
            writeInfo);
    byte[] taskUpdateRequestJson = taskUpdateRequestCodec.toBytes(updateRequest);
    // Build a POST request
    HttpUriBuilder uriBuilder = getHttpUriBuilder(taskStatus);
    Request request = setContentTypeHeaders(isBinaryTransportEnabled, preparePost())
            .setUri(uriBuilder.build())
            .setBodyGenerator(createStaticBodyGenerator(taskUpdateRequestJson))
            .build();
    ResponseHandler responseHandler;
    if (isBinaryTransportEnabled) {
        responseHandler = createFullSmileResponseHandler((SmileCodec<TaskInfo>) taskInfoCodec);
    }
    else {
        responseHandler = createAdaptingJsonResponseHandler(unwrapJsonCodec(taskInfoCodec));
    }
    updateErrorTracker.startRequest();
    // Send asynchronously; the callback handles the TaskInfo returned by the worker
    ListenableFuture<BaseResponse<TaskInfo>> future = httpClient.executeAsync(request, responseHandler);
    currentRequest = future;
    Futures.addCallback(
            future,
            new SimpleHttpResponseHandler<>(new UpdateResponseHandler(sources), request.getUri(), stats.getHttpResponseStats(), REMOTE_TASK_ERROR),
            executor);
}
```
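To make the wire side more concrete, here is a minimal sketch of the kind of POST request sendUpdate produces. It swaps Airlift's HttpClient for the JDK's `java.net.http` API, only builds the request without sending it, and assumes the worker exposes each task under a `/v1/task/{taskId}` path with a JSON body. `TaskUpdateRequests` and `buildTaskUpdate` are hypothetical names.

```java
import java.net.URI;
import java.net.http.HttpRequest;
import java.nio.charset.StandardCharsets;

// Hypothetical helper: builds, but does not send, a task update request.
// The /v1/task/{taskId} path and the JSON content type are assumptions of this sketch.
final class TaskUpdateRequests {
    private TaskUpdateRequests() {}

    static HttpRequest buildTaskUpdate(URI workerUri, String taskId, String taskUpdateJson) {
        // An absolute path replaces whatever path the worker's base URI has
        URI taskUri = workerUri.resolve("/v1/task/" + taskId);
        return HttpRequest.newBuilder(taskUri)
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(taskUpdateJson, StandardCharsets.UTF_8))
                .build();
    }
}
```

In Presto itself the body is the serialized TaskUpdateRequest (JSON or, with binary transport enabled, Smile), and the response body is the worker's TaskInfo.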
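All the data the current Task must process is wrapped into a list of TaskSource objects, where each TaskSource represents one data source of the Task. A Task's data sources fall into two categories: the output of another Stage, and a direct data source. For a Stage-output source, the TaskSource holds the PlanNodeId corresponding to that Stage and a list of ScheduledSplits generated from the TaskLocations of that Stage's tasks. For a direct data source, the TaskSource holds the data source's PlanNodeId and a list of ScheduledSplits generated from all of that source's data splits. The getSources method wraps both kinds of source into TaskSources and merges them into a single list.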
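The merge performed by getSources can be sketched with simplified stand-ins. `SimpleScheduledSplit`, `SimpleTaskSource`, and `SourceCollector` are hypothetical types (Java 16+ records), not Presto's classes. The sketch only shows how splits from an upstream Stage (one per upstream task location) and splits from a table end up as TaskSources in one list. Assigning sequence ids from one shared counter is an assumption made for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-ins for ScheduledSplit and TaskSource.
record SimpleScheduledSplit(long sequenceId, String planNodeId, String description) {}

record SimpleTaskSource(String planNodeId, List<SimpleScheduledSplit> splits) {}

final class SourceCollector {
    // Assumption: one counter shared by all sources of the task
    private long nextSequenceId = 0;

    // Stage output: one split per location of an upstream task
    SimpleTaskSource fromUpstreamStage(String remoteSourceNodeId, List<String> upstreamTaskLocations) {
        return build(remoteSourceNodeId, upstreamTaskLocations, "remote:");
    }

    // Direct data source: one split per data split of the scanned table
    SimpleTaskSource fromTable(String tableScanNodeId, List<String> dataSplits) {
        return build(tableScanNodeId, dataSplits, "table:");
    }

    // In the spirit of getSources(): put both kinds of source into one list
    static List<SimpleTaskSource> merge(List<SimpleTaskSource> stageSources, List<SimpleTaskSource> tableSources) {
        List<SimpleTaskSource> all = new ArrayList<>(stageSources);
        all.addAll(tableSources);
        return all;
    }

    private SimpleTaskSource build(String planNodeId, List<String> items, String prefix) {
        List<SimpleScheduledSplit> splits = new ArrayList<>();
        for (String item : items) {
            splits.add(new SimpleScheduledSplit(nextSequenceId++, planNodeId, prefix + item));
        }
        return new SimpleTaskSource(planNodeId, List.copyOf(splits));
    }
}
```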
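Resource side
The Coordinator sends a request to a specific Worker node to start a SqlTaskExecution object that carries out the Task's computation. All such requests are handled by the TaskResource class.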
```java
// TaskResource.java
@POST
@Path("{taskId}")
@Consumes({APPLICATION_JSON, APPLICATION_JACKSON_SMILE})
@Produces({APPLICATION_JSON, APPLICATION_JACKSON_SMILE})
public Response createOrUpdateTask(@PathParam("taskId") TaskId taskId, TaskUpdateRequest taskUpdateRequest, @Context UriInfo uriInfo)
{
    Session session = taskUpdateRequest.getSession().toSession(sessionPropertyManager, taskUpdateRequest.getExtraCredentials());
    TaskInfo taskInfo = taskManager.updateTask(session,
            taskId,
            taskUpdateRequest.getFragment().map(planFragmentCodec::fromBytes),
            taskUpdateRequest.getSources(),
            taskUpdateRequest.getOutputIds(),
            taskUpdateRequest.getTableWriteInfo());
    if (shouldSummarize(uriInfo)) {
        taskInfo = taskInfo.summarize();
    }
    return Response.ok().entity(taskInfo).build();
}
```
The RESTful request for a new Task is handled mainly by taskManager.updateTask, which is simply the updateTask method of the SqlTaskManager class.
```java
// SqlTaskManager.java
@Override
public TaskInfo updateTask(...)
{
    // tasks is a global cache: return the SqlTask cached for taskId, creating a new one if none exists
    SqlTask sqlTask = tasks.getUnchecked(taskId);
    sqlTask.recordHeartbeat();
    return sqlTask.updateTask(session, fragment, sources, outputBuffers, tableWriteInfo);
}
```
The code above obtains the SqlTask object through tasks.getUnchecked(taskId). tasks is actually a com.google.common.cache.LoadingCache, so the first lookup for a given taskId has to create the SqlTask, and how that SqlTask is created is specified when the LoadingCache is initialized:
```java
// SqlTaskManager.java
tasks = CacheBuilder.newBuilder().build(CacheLoader.from(
        taskId -> createSqlTask(
                taskId,
                locationFactory.createLocalTaskLocation(taskId),
                nodeInfo.getNodeId(),
                queryContexts.getUnchecked(taskId.getQueryId()),
                sqlTaskExecutionFactory,
                exchangeClientSupplier,
                taskNotificationExecutor,
                sqlTask -> {
                    finishedTaskStats.merge(sqlTask.getIoStats());
                    return null;
                },
                maxBufferSize,
                failedTasks,
                spoolingOutputBufferFactory)));
```
In other words, whenever no SqlTask exists yet for a taskId, a new one is created. Once the SqlTask has been obtained, sqlTask.updateTask is called to start the Task and begin computation.
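The get-or-create behavior of the tasks cache can be illustrated without Guava. This sketch replaces LoadingCache with the JDK's `ConcurrentHashMap.computeIfAbsent`, which likewise runs the factory at most once per key. `SqlTaskStub` and `TaskRegistry` are hypothetical names, and the counter exists only to make the behavior observable.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

// Hypothetical stand-in for SqlTask: only remembers its id and last heartbeat.
final class SqlTaskStub {
    final String taskId;
    volatile long lastHeartbeatNanos;

    SqlTaskStub(String taskId) { this.taskId = taskId; }

    void recordHeartbeat() { lastHeartbeatNanos = System.nanoTime(); }
}

final class TaskRegistry {
    private final ConcurrentMap<String, SqlTaskStub> tasks = new ConcurrentHashMap<>();
    private final Function<String, SqlTaskStub> factory;
    private final AtomicInteger created = new AtomicInteger();

    TaskRegistry(Function<String, SqlTaskStub> factory) { this.factory = factory; }

    // Plays the role of tasks.getUnchecked(taskId): return the cached task or create it once
    SqlTaskStub getOrCreate(String taskId) {
        return tasks.computeIfAbsent(taskId, id -> {
            created.incrementAndGet();
            return factory.apply(id);
        });
    }

    int createdCount() { return created.get(); }
}
```

A caller would then do `registry.getOrCreate(taskId).recordHeartbeat()` before updating the task, mirroring the order in SqlTaskManager.updateTask.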
```java
// SqlTask.java
// Update the list of Splits to be processed
public TaskInfo updateTask(
        Session session,
        Optional<PlanFragment> fragment,
        List<TaskSource> sources,
        OutputBuffers outputBuffers,
        Optional<TableWriteInfo> tableWriteInfo)
{
    outputBuffer.setOutputBuffers(outputBuffers);
    // Make sure the task execution is created only once
    SqlTaskExecution taskExecution;
    synchronized (this) {
        // If the task has already finished, return its final TaskInfo
        TaskHolder taskHolder = taskHolderReference.get();
        if (taskHolder.isFinished()) {
            return taskHolder.getFinalTaskInfo();
        }
        taskExecution = taskHolder.getTaskExecution();
        if (taskExecution == null) {
            taskExecution = sqlTaskExecutionFactory.create(
                    session,
                    queryContext,
                    taskStateMachine,
                    outputBuffer,
                    taskExchangeClientManager,
                    fragment.get(),
                    sources,
                    tableWriteInfo.get());
            taskHolderReference.compareAndSet(taskHolder, new TaskHolder(taskExecution));
            needsPlan.set(false);
        }
    }
    if (taskExecution != null) {
        // Merge all Splits contained in sources into the list of Splits taskExecution must process
        taskExecution.addSources(sources);
    }
    // Return the current Task's information
    return getTaskInfo();
}
```
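The create-once logic in SqlTask.updateTask reduces to the following sketch: the execution is created under the task's lock on the first update, later updates only add splits, and a finished task ignores new splits. `SimpleExecution` and `SimpleSqlTask` are hypothetical simplifications that track split ids as strings instead of TaskSources and return the split list instead of a TaskInfo.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical stand-in for SqlTaskExecution: just accumulates split ids.
final class SimpleExecution {
    private final List<String> splits = new ArrayList<>();

    synchronized void addSplits(List<String> newSplits) { splits.addAll(newSplits); }

    synchronized List<String> splits() { return List.copyOf(splits); }
}

final class SimpleSqlTask {
    private final Supplier<SimpleExecution> executionFactory;
    private SimpleExecution execution;   // guarded by this
    private boolean finished;            // guarded by this
    private int executionsCreated;       // guarded by this

    SimpleSqlTask(Supplier<SimpleExecution> executionFactory) { this.executionFactory = executionFactory; }

    synchronized void markFinished() { finished = true; }

    synchronized int executionsCreated() { return executionsCreated; }

    List<String> updateTask(List<String> newSplits) {
        SimpleExecution current;
        synchronized (this) {
            if (finished) {
                // Like returning the final TaskInfo: report the last state, accept nothing new
                return execution == null ? List.of() : execution.splits();
            }
            if (execution == null) {
                // First update: create the execution exactly once
                execution = executionFactory.get();
                executionsCreated++;
            }
            current = execution;
        }
        // Outside the lock: merge the new splits into the execution
        current.addSplits(newSplits);
        return current.splits();
    }
}
```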
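II. Updating a Task
Tasks of the Coordinator_Only, Single, and Fixed types are created directly when they are scheduled. Only Source-type Tasks schedule their Splits in batches, so scheduleTask may be called several times for them; each call checks whether a Task already exists on the target Node: if it does, the Task is updated, otherwise a new Task is created. Consequently, during Task scheduling only Source-type Tasks are ever updated.
Client side
On the client side, updating a Task takes two steps: looking up the HttpRemoteTask object for the taskId, and calling that object's addSplits method. Both steps are performed in the SqlStageExecution.assignSplits method.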
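The create-or-update decision for Source-type Tasks can be sketched as a per-node lookup. `StubRemoteTask` and `StubSourceStage` are hypothetical stand-ins for HttpRemoteTask and SqlStageExecution, and the `stageId.partition` task-id format is an assumption made for readability.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for HttpRemoteTask: records the splits it was given.
final class StubRemoteTask {
    final String taskId;
    private final List<String> splits = new ArrayList<>();

    StubRemoteTask(String taskId, List<String> initialSplits) {
        this.taskId = taskId;
        splits.addAll(initialSplits);
    }

    void addSplits(List<String> moreSplits) { splits.addAll(moreSplits); }

    List<String> splits() { return List.copyOf(splits); }
}

// Hypothetical stand-in for the Source-stage scheduling path (not thread-safe).
final class StubSourceStage {
    private final String stageId;
    private final Map<String, StubRemoteTask> tasksByNode = new HashMap<>();
    private int nextPartition = 0;

    StubSourceStage(String stageId) { this.stageId = stageId; }

    // One scheduling batch for one node
    StubRemoteTask assignSplits(String nodeId, List<String> batch) {
        StubRemoteTask existing = tasksByNode.get(nodeId);
        if (existing != null) {
            existing.addSplits(batch);    // a task already runs on this node: update it
            return existing;
        }
        StubRemoteTask created = new StubRemoteTask(stageId + "." + nextPartition++, batch);
        tasksByNode.put(nodeId, created); // first batch for this node: create the task
        return created;
    }

    int taskCount() { return tasksByNode.size(); }
}
```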
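Resource side
The createOrUpdateTask method of the TaskResource class receives the update request and then calls the updateTask method of SqlTaskManager to handle it.
III. Running a Task
On the TaskResource (Worker) side, every Task ultimately runs as a SqlTaskExecution.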
1. TaskExecutor
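TaskExecutor runs inside the Presto service on every Worker in the cluster. It is a wrapper around the thread pool that runs the actual computation, and its main job is to process the Splits of all Tasks running on that Worker. TaskExecutor is injected into SqlTaskManager through Guice, and its start method is called when each Worker starts up.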
```java
// TaskExecutor.java
@PostConstruct
public synchronized void start()
{
    for (int i = 0; i < runnerThreads; i++) {
        addRunnerThread();
    }
}
```
2. TaskRunner
The addRunnerThread method actually creates an instance of the inner class TaskExecutor.TaskRunner, which processes Splits.
```java
// TaskExecutor.java
private class TaskRunner
        implements Runnable
{
    private final long runnerId = NEXT_RUNNER_ID.getAndIncrement();

    @Override
    public void run()
    {
        try (SetThreadName runnerName = new SetThreadName("SplitRunner-%s", runnerId)) {
            // Keep taking splits from waitingSplits and processing them until the TaskExecutor is closed or this thread is interrupted
            while (!closed && !Thread.currentThread().isInterrupted()) {
                // select next worker
                final PrioritizedSplitRunner split;
                try {
                    split = waitingSplits.take();
                }
                catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                String threadId = split.getTaskHandle().getTaskId() + "-" + split.getSplitId();
                try (SetThreadName splitName = new SetThreadName(threadId)) {
                    RunningSplitInfo splitInfo = new RunningSplitInfo(ticker.read(), threadId, Thread.currentThread(), split);
                    runningSplitInfos.add(splitInfo);
                    runningSplits.add(split);
                    ListenableFuture<?> blocked;
                    try {
                        // Call the split's process method; it runs for at most one fixed time slice and returns even if the split is not done
                        blocked = split.process();
                    }
                    finally {
                        // Once the slice is over, remove the split from the running sets
                        runningSplitInfos.remove(splitInfo);
                        runningSplits.remove(split);
                    }
                    if (split.isFinished()) {
                        splitFinished(split);
                    }
                    else {
                        // blocked.isDone() means the split is not blocked, it only used up its time slice, so requeue it
                        if (blocked.isDone()) {
                            waitingSplits.offer(split);
                        }
                        else {
                            blockedSplits.put(split, blocked);
                            blocked.addListener(() -> {
                                blockedSplits.remove(split);
                                // reset the level priority to prevent previously-blocked splits from starving existing splits
                                split.resetLevelPriority();
                                waitingSplits.offer(split);
                            }, executor);
                        }
                    }
                }
                catch (Throwable t) {
                    splitFinished(split);
                }
            }
        }
        finally {
            // Reaching this point means the loop above ended: this TaskRunner thread was interrupted or the TaskExecutor was closed.
            // If the TaskExecutor is still open, start a replacement runner so the pool always keeps a fixed number of threads
            if (!closed) {
                addRunnerThread();
            }
        }
    }
}
```
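The runner loop above follows a familiar pattern: a fixed number of threads take splits from a queue and run one time slice each, after which a split is retired, requeued, or parked until its blocked future completes; a runner that exits while the executor is still open replaces itself. The sketch below reproduces that pattern with JDK classes only. `SliceSplit`, `CountingSplit`, and `MiniTaskExecutor` are hypothetical; unlike Presto it uses a FIFO `LinkedBlockingQueue` instead of a priority-based waiting queue, and it simply counts a split that throws as finished.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical split contract: run one slice; a done future means "runnable again", a pending one means "blocked".
interface SliceSplit {
    CompletableFuture<Void> processOneSlice();
    boolean isFinished();
}

// Example split needing `slices` slices; if `gate` is non-null it blocks on it once.
final class CountingSplit implements SliceSplit {
    private final CompletableFuture<Void> gate;
    private int remaining;
    private boolean waitedOnGate;

    CountingSplit(int slices, CompletableFuture<Void> gate) { this.remaining = slices; this.gate = gate; }

    @Override public synchronized CompletableFuture<Void> processOneSlice() {
        remaining--;
        if (gate != null && !waitedOnGate && remaining > 0) {
            waitedOnGate = true;
            return gate;
        }
        return CompletableFuture.completedFuture(null);
    }

    @Override public synchronized boolean isFinished() { return remaining <= 0; }
}

final class MiniTaskExecutor implements AutoCloseable {
    private final BlockingQueue<SliceSplit> waitingSplits = new LinkedBlockingQueue<>();
    private final List<Thread> runners = new ArrayList<>();
    private final CountDownLatch finishedSplits;
    private volatile boolean closed;

    MiniTaskExecutor(int runnerThreads, int expectedSplits) {
        finishedSplits = new CountDownLatch(expectedSplits);
        for (int i = 0; i < runnerThreads; i++) { addRunnerThread(); }
    }

    void enqueue(SliceSplit split) { waitingSplits.offer(split); }

    // True if all expected splits finished within the timeout
    boolean awaitAll(long timeout, TimeUnit unit) {
        try { return finishedSplits.await(timeout, unit); }
        catch (InterruptedException e) { Thread.currentThread().interrupt(); return false; }
    }

    private synchronized void addRunnerThread() {
        Thread runner = new Thread(this::runLoop, "SplitRunner-" + runners.size());
        runner.setDaemon(true);
        runners.add(runner);
        runner.start();
    }

    private void runLoop() {
        try {
            while (!closed && !Thread.currentThread().isInterrupted()) {
                SliceSplit split;
                try { split = waitingSplits.take(); }
                catch (InterruptedException e) { Thread.currentThread().interrupt(); return; }
                try {
                    CompletableFuture<Void> blocked = split.processOneSlice();
                    if (split.isFinished()) {
                        finishedSplits.countDown();
                    } else if (blocked.isDone()) {
                        waitingSplits.offer(split);                         // slice used up: requeue
                    } else {
                        blocked.thenRun(() -> waitingSplits.offer(split));  // requeue once unblocked
                    }
                } catch (RuntimeException e) {
                    finishedSplits.countDown();                             // failed split counts as done
                }
            }
        } finally {
            if (!closed) { addRunnerThread(); }                             // keep the pool at full size
        }
    }

    @Override public void close() {
        closed = true;
        synchronized (this) { runners.forEach(Thread::interrupt); }
    }
}
```

With two runners, a split that needs three slices finishes on its own, while a gated split stays parked (not occupying a thread) until its future completes.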
3. PrioritizedSplitRunner
All processing of a Split is driven by PrioritizedSplitRunner.process, which TaskRunner calls once per time slice.
```java
// PrioritizedSplitRunner.java
public ListenableFuture<?> process()
{
    // Delegate the actual split processing to the split's processFor method, with a fixed time quantum
    ListenableFuture<?> blocked = split.processFor(SPLIT_RUN_QUANTA);
    return blocked;
}
```
4. DriverSplitRunner
As the code above shows, the Split is processed by DriverSplitRunner.processFor, which in turn calls Driver.processFor to do the work.
```java
// SqlTaskExecution.DriverSplitRunner
public ListenableFuture<?> processFor(Duration duration)
{
    Driver driver;
    synchronized (this) {
        if (this.driver == null) {
            this.driver = driverSplitRunnerFactory.createDriver(driverContext, partitionedSplit);
        }
        driver = this.driver;
    }
    return driver.processFor(duration);
}
```
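The delegation chain from PrioritizedSplitRunner through DriverSplitRunner to Driver, together with the lazy, create-once Driver, can be sketched as follows. All class names are hypothetical, and the one-second quantum is an assumed value standing in for SPLIT_RUN_QUANTA.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

// Hypothetical minimal Driver: counts how often it was asked to run.
final class CountingDriver {
    private int runs;

    synchronized CompletableFuture<Void> processFor(long maxRuntimeNanos) {
        runs++;
        return CompletableFuture.completedFuture(null);
    }

    synchronized int runs() { return runs; }
}

// Stand-in for DriverSplitRunner: creates its Driver lazily, exactly once.
final class LazyDriverSplitRunner {
    private final Supplier<CountingDriver> driverFactory;
    private CountingDriver driver;   // guarded by this
    private int driversCreated;      // guarded by this

    LazyDriverSplitRunner(Supplier<CountingDriver> driverFactory) { this.driverFactory = driverFactory; }

    CompletableFuture<Void> processFor(long maxRuntimeNanos) {
        CountingDriver current;
        synchronized (this) {
            if (driver == null) {
                driver = driverFactory.get();
                driversCreated++;
            }
            current = driver;
        }
        return current.processFor(maxRuntimeNanos);   // run outside the lock
    }

    synchronized int driversCreated() { return driversCreated; }

    synchronized CountingDriver driver() { return driver; }
}

// Stand-in for PrioritizedSplitRunner.process: always passes down a fixed quantum.
final class QuantumSplitRunner {
    static final long SPLIT_RUN_QUANTA_NANOS = TimeUnit.SECONDS.toNanos(1);   // assumed value
    private final LazyDriverSplitRunner split;

    QuantumSplitRunner(LazyDriverSplitRunner split) { this.split = split; }

    CompletableFuture<Void> process() { return split.processFor(SPLIT_RUN_QUANTA_NANOS); }
}
```

Creating the Driver lazily means a split that is queued but never scheduled does not pay for building its operator chain.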
5. Driver
The Driver class encapsulates the chain of operations applied to a Split; its work on the Split is concentrated in the processInternal method.
```java
// Driver.java
public ListenableFuture<?> processFor(Duration duration)
{
    // Maximum length of this split's time slice, in nanoseconds
    long maxRuntime = duration.roundTo(TimeUnit.NANOSECONDS);
    Optional<ListenableFuture<?>> result = tryWithLock(100, TimeUnit.MILLISECONDS, () -> {
        OperationTimer operationTimer = createTimer();
        driverContext.startProcessTimer();
        driverContext.getYieldSignal().setWithDelay(maxRuntime, driverContext.getYieldExecutor());
        try {
            long start = System.nanoTime();
            do {
                ListenableFuture<?> future = processInternal(operationTimer);
                if (!future.isDone()) {
                    return updateDriverBlockedFuture(future);
                }
            }
            while (System.nanoTime() - start < maxRuntime && !isFinishedInternal());
        }
        finally {
            driverContext.getYieldSignal().reset();
            driverContext.recordProcessed(operationTimer);
        }
        return NOT_BLOCKED;
    });
    return result.orElse(NOT_BLOCKED);
}
```
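The core of Driver.processFor is a time-sliced loop. Below is a sketch of just that loop, assuming each pass is represented by a `Supplier` that returns a future (a completed future meaning "not blocked"). The exclusive lock with its 100 ms timeout, the yield signal, and the timers of the real method are omitted, and `TimeSlicedLoop` is a hypothetical name.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.BooleanSupplier;
import java.util.function.Supplier;

final class TimeSlicedLoop {
    static final CompletableFuture<Void> NOT_BLOCKED = CompletableFuture.completedFuture(null);

    private TimeSlicedLoop() {}

    // Repeats `pass` until the slice is used up or `finished` reports true;
    // a pass returning an incomplete future means "blocked" and ends the slice early.
    static CompletableFuture<Void> processFor(long maxRuntimeNanos,
                                              Supplier<CompletableFuture<Void>> pass,
                                              BooleanSupplier finished) {
        long start = System.nanoTime();
        do {
            CompletableFuture<Void> future = pass.get();
            if (!future.isDone()) {
                return future;   // the caller parks the split until this completes
            }
        }
        while (System.nanoTime() - start < maxRuntimeNanos && !finished.getAsBoolean());
        return NOT_BLOCKED;
    }
}
```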
```java
// Driver.java
@GuardedBy("exclusiveLock")
private ListenableFuture<?> processInternal(OperationTimer operationTimer)
{
    // For a Task in a source stage, hand any newly arrived Splits that have not been read yet to the SourceOperator
    processNewSources();
    // If operators at the front of the pipeline have already finished and been removed, keep calling finish()
    // on the new first operator (it is no longer the pipeline's natural source, so this cannot end the task early)
    if (!activeOperators.isEmpty() && activeOperators.size() != allOperators.size()) {
        Operator rootOperator = activeOperators.get(0);
        rootOperator.finish();
        rootOperator.getOperatorContext().recordFinish(operationTimer);
    }
    boolean movedPage = false;
    if (cachedResult.get().isPresent()) {
        Iterator<Page> remainingPages = cachedResult.get().get();
        Operator outputOperator = activeOperators.get(activeOperators.size() - 1);
        if (remainingPages.hasNext()) {
            Page outputPage = remainingPages.next();
            outputPages.add(outputPage);
            outputOperator.addInput(outputPage);
        }
        else {
            outputOperator.finish();
            outputOperator.getOperatorContext().recordFinish(operationTimer);
        }
    }
    else {
        for (int i = 0; i < activeOperators.size() - 1 && !driverContext.isDone(); i++) {
            Operator current = activeOperators.get(i);
            Operator next = activeOperators.get(i + 1);
            // Skip operators that are blocked
            if (getBlockedFuture(current).isPresent()) {
                continue;
            }
            // If the current operator is not finished, and the next operator is not blocked and needs input
            if (!current.isFinished() && !getBlockedFuture(next).isPresent() && next.needsInput()) {
                // Take an output page from the current operator; it becomes the next operator's input
                Page page = current.getOutput();
                current.getOperatorContext().recordGetOutput(operationTimer, page);
                // For the last non-output operator, keep the pages so they can be cached
                if (shouldUseFragmentResultCache() && i == activeOperators.size() - 2 && page != null) {
                    outputPages.add(page);
                }
                // Pass the output page to the next operator
                if (page != null && page.getPositionCount() != 0) {
                    next.addInput(page);
                    next.getOperatorContext().recordAddInput(operationTimer, page);
                    movedPage = true;
                }
                if (current instanceof SourceOperator) {
                    movedPage = true;
                }
            }
            // If the current operator has finished, tell the next operator that no more input is coming,
            // so it can finish processing and flush its results
            if (current.isFinished()) {
                next.finish();
                next.getOperatorContext().recordFinish(operationTimer);
            }
        }
    }
    // Scan from the last operator back to the first for an operator that has finished
    for (int index = activeOperators.size() - 1; index >= 0; index--) {
        if (activeOperators.get(index).isFinished()) {
            boolean outputOperatorFinished = index == activeOperators.size() - 1;
            // close and remove this operator and all source operators
            List<Operator> finishedOperators = this.activeOperators.subList(0, index + 1);
            Throwable throwable = closeAndDestroyOperators(finishedOperators);
            finishedOperators.clear();
            if (throwable != null) {
                throwIfUnchecked(throwable);
                throw new RuntimeException(throwable);
            }
            if (shouldUseFragmentResultCache() && outputOperatorFinished && !cachedResult.get().isPresent()) {
                checkState(split.get() != null);
                checkState(fragmentResultCacheContext.isPresent());
                fragmentResultCacheContext.get().getFragmentResultCacheManager().put(fragmentResultCacheContext.get().getCanonicalPlanFragment(), split.get(), outputPages);
            }
            // Finish the next operator, which is now the first operator.
            if (!activeOperators.isEmpty()) {
                Operator newRootOperator = activeOperators.get(0);
                newRootOperator.finish();
                newRootOperator.getOperatorContext().recordFinish(operationTimer);
            }
            break;
        }
    }
    // If the pass over all operators moved no Page, check whether any operator is blocked
    if (!movedPage) {
        List<Operator> blockedOperators = new ArrayList<>();
        List<ListenableFuture<?>> blockedFutures = new ArrayList<>();
        // Collect the blocked future of each active operator; getBlockedFuture reports an operator as blocked
        // when its isBlocked() future is not done or when it is waiting for memory
        for (Operator operator : activeOperators) {
            Optional<ListenableFuture<?>> blocked = getBlockedFuture(operator);
            if (blocked.isPresent()) {
                blockedOperators.add(operator);
                blockedFutures.add(blocked.get());
            }
        }
        // Some operators really are blocked
        if (!blockedFutures.isEmpty()) {
            // The Driver is unblocked as soon as any one of these futures completes
            ListenableFuture<?> blocked = firstFinishedFuture(blockedFutures);
            // driver records serial blocked time
            driverContext.recordBlocked(blocked);
            // Each blocked operator also records its blocked time
            for (Operator operator : blockedOperators) {
                operator.getOperatorContext().recordBlocked(blocked);
            }
            return blocked;
        }
    }
    return NOT_BLOCKED;
}
```
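The code above shows that each Driver encapsulates all the operations applied to a Split. In each call to processInternal, the Driver walks through the operators applied to the Split in order: it takes the output page of the current operator and hands it to the next operator as that operator's input page, until every Operator in the Driver has been visited. processFor repeats such passes until the time slice runs out, the Driver finishes, or an operator blocks.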
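The page-passing pass described above can be tried out with a tiny in-memory pipeline. The `MiniOperator` interface only borrows the method names used in processInternal (`needsInput`, `addInput`, `getOutput`, `finish`, `isFinished`); pages are plain `List<Integer>`, and `ValuesSource`, `EvenFilter`, `CollectingSink`, and `MiniDriver` are hypothetical classes. Blocking, result caching, and operator cleanup are deliberately left out.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical operator contract borrowing the method names used by processInternal.
interface MiniOperator {
    boolean needsInput();
    void addInput(List<Integer> page);
    List<Integer> getOutput();   // null when no page is ready
    void finish();               // no more input will arrive
    boolean isFinished();
}

// Source: emits pre-built pages and never takes input.
final class ValuesSource implements MiniOperator {
    private final Iterator<List<Integer>> pages;
    private boolean finished;
    ValuesSource(List<List<Integer>> pages) { this.pages = pages.iterator(); }
    public boolean needsInput() { return false; }
    public void addInput(List<Integer> page) { throw new IllegalStateException("source takes no input"); }
    public List<Integer> getOutput() { return pages.hasNext() ? pages.next() : null; }
    public void finish() { finished = true; }
    public boolean isFinished() { return finished || !pages.hasNext(); }
}

// Keeps even values; holds at most one pending page.
final class EvenFilter implements MiniOperator {
    private List<Integer> pending;
    private boolean finishing;
    public boolean needsInput() { return !finishing && pending == null; }
    public void addInput(List<Integer> page) {
        List<Integer> kept = new ArrayList<>();
        for (int value : page) {
            if (value % 2 == 0) { kept.add(value); }
        }
        pending = kept;
    }
    public List<Integer> getOutput() { List<Integer> out = pending; pending = null; return out; }
    public void finish() { finishing = true; }
    public boolean isFinished() { return finishing && pending == null; }
}

// Sink: collects everything it receives.
final class CollectingSink implements MiniOperator {
    private final List<Integer> collected = new ArrayList<>();
    private boolean finished;
    public boolean needsInput() { return !finished; }
    public void addInput(List<Integer> page) { collected.addAll(page); }
    public List<Integer> getOutput() { return null; }
    public void finish() { finished = true; }
    public boolean isFinished() { return finished; }
    List<Integer> collected() { return List.copyOf(collected); }
}

final class MiniDriver {
    private MiniDriver() {}

    // One pass: move at most one page across each adjacent pair, then propagate finish()
    static boolean processOnce(List<MiniOperator> operators) {
        boolean movedPage = false;
        for (int i = 0; i < operators.size() - 1; i++) {
            MiniOperator current = operators.get(i);
            MiniOperator next = operators.get(i + 1);
            if (!current.isFinished() && next.needsInput()) {
                List<Integer> page = current.getOutput();
                if (page != null && !page.isEmpty()) {
                    next.addInput(page);
                    movedPage = true;
                }
            }
            if (current.isFinished()) { next.finish(); }
        }
        return movedPage;
    }

    // Repeats passes until the last operator reports finished
    static void runToCompletion(List<MiniOperator> operators) {
        MiniOperator last = operators.get(operators.size() - 1);
        while (!last.isFinished()) { processOnce(operators); }
    }
}
```

Feeding the pages `[1, 2, 3]` and `[4, 5, 6]` through `ValuesSource`, `EvenFilter`, and `CollectingSink` leaves `[2, 4, 6]` in the sink after two passes; the filter finishes only once the source has finished and its own pending page has been drained, mirroring how finish() propagates down the chain in processInternal.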