通过上文分析,我们已经知道了 DDP 的基本架构和如何初始化,本文就看看其核心 Reducer 的静态架构。
目录
[源码解析] PyTorch 分布式(10)------DistributedDataParallel之Reducer静态架构
0x00 摘要
0x01 引论1.1 调用
0x02 Reducer 定义
0x03 Bucket
3.1 设计
3.2 定义
3.2.1 BucketReplica有几个
3.2.2 关键
3.2.3 具体定义
3.3 设置
0x03 BucketReplica
3.1 Views
3.3 初始化
0x04 查询类
4.1 VariableIndex
4.1.1 成员变量
4.1.2 定义
4.2 VariableLocator
4.2.1 定义
4.2.2 成员变量
4.2.2.1 初始化
4.2.2.2 使用
0x05 累积相关类
5.1 grad_accumulators_
5.1.1 初始化
5.1.2 使用
5.2 gradAccToVariableMap_
5.2.1 初始化
5.2.2 使用
5.3 numGradHooksTriggeredMap_
5.3.1 初始化
5.3.2 使用
5.4 numGradHooksTriggeredMapPerIteration_5.4.1 使用
5.5 perIterationReadyParams_
5.5.1 设置
5.5.2 重置
5.5.3 使用
5.6 使用过的参数
5.6.1 论文
5.6.2 初始化
5.6.3 重置
5.6.4 设置
5.6.5 使用
5.7 计算梯度支撑类
5.7.1 RpcContext
5.7.2 hooks_
5.7.3 comm_hook_
5.7.3.1 概念
5.7.3.2 使用
5.7.4 runGradCallbackForVariable
5.7.4.1 Reducer
5.7.4.2 DistAutogradContext
0xFF 参考
通过上文分析,我们已经知道了 DDP 的基本架构和如何初始化,本文就看看其核心 Reducer 的静态架构。Reducer提供了反向传播中梯度同步的核心实现。
本系列其他文章如下:
深度学习利器之自动微分(1)
深度学习利器之自动微分(2)
[源码解析]深度学习利器之自动微分(3) --- 示例解读
[源码解析]PyTorch如何实现前向传播(1) --- 基础类(上)
[源码解析]PyTorch如何实现前向传播(2) --- 基础类(下)
[源码解析] PyTorch如何实现前向传播(3) --- 具体实现
[源码解析] Pytorch 如何实现后向传播 (1)---- 调用引擎
[源码解析] Pytorch 如何实现后向传播 (2)---- 引擎静态结构
[源码解析] Pytorch 如何实现后向传播 (3)---- 引擎动态逻辑
[源码解析] PyTorch 如何实现后向传播 (4)---- 具体算法
[源码解析] PyTorch 分布式(1)------历史和概述
[源码解析] PyTorch 分布式(2) ----- DataParallel(上)
[源码解析] PyTorch 分布式(3) ----- DataParallel(下)
[源码解析] PyTorch 分布式(4)------分布式应用基础概念
[源码解析] PyTorch分布式(5) ------ DistributedDataParallel 总述&如何使用
[源码解析] PyTorch分布式(6) ---DistributedDataParallel -- 初始化&store
[源码解析] PyTorch 分布式(7) ----- DistributedDataParallel 之进程组
[源码解析] PyTorch 分布式(8) -------- DistributedDataParallel之论文篇
[源码解析] PyTorch 分布式(9) ----- DistributedDataParallel 之初始化
Reducer 的创建代码如下,是在_ddp_init_helper 之中。
调用的 parameters 举例如下, parameters[0] 就是 rank 0 上模型的 parameters,可以看到其只有 [0] 元素有意义,这个 [0] 原始本身包括 20 个元素:
bucket_indices 举例如下:
关于 tensor indices,就是给所有的tensor一个index,从0开始递增,一直到 tensors.size()。假如模型的 parameters 一共有20个张量,则 tensor index 从 0 到 19,分成 6 个buckets,则在这6个buckets之中,每个 tensor index 都是唯一不重复的。
python代码无意义,我们只能看看C++。
于是我们来到了 torch/lib/c10d/reducer.h 和 torch/lib/c10d/reducer.cpp。
Reducer提供了反向传播中梯度同步的核心实现,其定义相当复杂,我们甚至需要去掉一些不重要的成员变量以便展示:
Reducer 的关键成员变量如下。
我们接下来一一分析这些成员变量。
在规约梯度之前将梯度批处理在一起可以降低开销和/或加快完成时间。但是只能对同一设备上相同类型的梯度进行批处理。
桶是梯度的集合,统一设备上相同类型的梯度被放到同一个桶之中。在代码之中,Bucket 就是桶的概念。
在每次向后传播中,将所有参数梯度中的张量复制到桶中,并在AllReduce之后将平均梯度复制回桶中。为了加速复制操作,存储桶始终与参数在同一设备上创建。如果模型跨越多个设备,DDP会考虑设备关联性,以确保同一存储桶中的所有参数都位于同一设备上。AllReduce的顺序也会对结果产生影响,因为它决定了多少通信可以与计算重叠。DDP按model.parameters()的相反顺序启动AllReduce。
为了更好的说明,我们首先要分析一下 BucketReplica 是什么。我们从注释出发看看。
首先,一个桶 Bucket 有多个BucketReplica,每一个模型对应一个BucketReplica。
但是只用了一个 [0] 元素,因为目前不支持单进程多设备模式,所以假定桶里只有一个replica。
再结合前文代码,未来不会支持 SPMD。parameters 就是 [ToyModel] 这个模型列表的参数集合,parameters[0] 就是 ToyModel 的参数。
综合以上我们知道:
DDP 原来是希望像 DP 那样支持 SPMD,所以本进程就需要维护多个 GPU 之上的多个模型副本的参数,即,parameters 就是一个数组,数组中每个元素是一个模型副本的参数。
parameters 被赋值为 <code>Reducer.replicas_</code>,而 <code>Reducer.replicas_</code> 用来赋值给 bucket.replicas。
因为未来不支持Reducer.replicas_,所以只有 parameters[0] 有意义。
所以我们得出结论:
BucketReplica 就是一个模型的待求梯度参数组。replica 对应一个 device (GPU)上的模型副本的参数信息(部分),即,一个 replica 代表了 [1..N] 个需要被规约的梯度,这些梯度拥有同样的 dtype,位于同样的设备上。
事实上,只有 bucket.replicas[0] 有意义,就对应了上面代码中的 [self.module] 之中的部分需求导张量,就是 parameters[0] 。
我们再总结一下 Bucket 的关键:
replicas 成员变量就是 bucket 对应的各个BucketReplica。一个 BucketReplica 代表了 [1..N] 个需要被规约的梯度,这些梯度拥有同样的 dtype,位于同样的设备上。
只有 bucket.replicas[0] 有意义,就对应了本模型的待求梯度参数组之中本bucket对应的张量。
如何赋值?就是使用 Reducer.replicas_ 来赋值,而 replicas_ 就是参数 parameters。我们下面就会介绍。
variable_indices 成员变量用来记录本桶之中有哪些variable 的index。
如何赋值?使用前面介绍的 bucket_indices 进行赋值。
如何使用?intra_bucket_index 是bucket.variable_indices的序号,利用序号得到真正的variable index。后文会依据代码再进行阐释。
最后,Bucket 具体定义如下:
Reducer 的成员变量buckets_ 是关键,这是Reducer 之中所有的桶。
在初始化函数中有如何初始化 buckets_,核心是:
找到本bucket在 bucket_indices 之中的 index。
在 parameters 之中找到 index 对应的张量。
在 BucketReplica 之中配置这些张量,就是本bucket应该规约的张量。
用图例表示如下,这里假设 bucket index 是 1,即第 2 个桶,所以 variable_indices 对应了 bucket_indices 中的相应部分。比如 BucketReplica[0] 里面是 Tensor 4,5,6,而variable_indices就是 Tensor 4,5,6 分别的 index。
下图中的 bucket_indices 是 Reducer 构造函数的参数之一。
如前面讨论的,一个 BucketReplica 代表了 [1..N] 个需要被规约的梯度,这些梯度拥有同样的 dtype,位于同样的设备上。是一个模型待求梯度参数的一部分,具体是哪些,由 bucket 的 variable_indices 决定。
其关键成员变量为:
<code>std::vector<at::Tensor> variables</code> 是构成此bucket副本的variable。我们在这里使用refcounted value,这样我们就可以在完成规约之后,轻松地将bucket内容 unflatten 到参与变量中。
<code>at::Tensor contents</code> :把桶的内容展平的结果,即Flattened (1 dimensional) 之后的结果。
<code>std::vector<at::Tensor> bucket_views_in</code> :提供了从输入角度在 contents 之中查看具体梯度的方法。
<code>std::vector<at::Tensor> bucket_views_out</code> :提供了从输出角度在 contents 之中查看具体梯度的方法。
具体可以参见如下注释:
关于 <code>std::vector<at::Tensor> bucket_views_in</code> 和 <code>std::vector<at::Tensor> bucket_views_out</code> 的进一步说明:
在 PyTorch 之中,视图是指创建一个方便查看的东西,视图与原数据共享内存,它只是将原有的数据进行整理,直接显示其中部分内容或者进行重排序后再显示出来。
每个 view 都将按照如下布局(sizes + strides)创建,这个布局与grad的预期布局相匹配。
bucket_views_in 和 bucket_views_out 这两个变量提供在 contents 之中操作具体梯度的方法,或者说,它们提供了视图(views),该视图可以操作contents 之中每个张量的梯度。用户把这两个变量作为入口点来把每个梯度的数据从 content 之中移入和移出。
我们为<code>bucket_</code>视图保留两种状态的原因是:如果注册了DDP通信钩子(communication hook), <code>bucket_views_out</code> 可以用钩子的 <code>future_work</code>值重新初始化。所以我们需要为<code>bucket_views_in[i].copy_(grad)</code> 保留一个对 replica 原始 contents 的单独视图引用。
<code>bucket_views_in[i].copy_(grad)</code>和 <code>grad.copy_(bucket_views_out[i])</code> 提供了将梯度数据移入/移出contents的方便方法。
另外,以下三个成员变量存储桶的每个flat张量信息,比如offsets存储了各个张量在flat bucket contents中的offset。
BucketReplica 具体定义为:
目前为止,逻辑如下,如前所述,每个bucket只有 replicas[0] 有意义。
部分初始化的代码在 Reducer::initialize_buckets 之中。
initialize_bucket_views 具体代码如下,这里需要对几个 PyTorch 函数进行说明。
as_strided :依据现有tensor以及给定的步长来创建一个视图(类型仍然为tensor),与原数据共享内存,不存储诗句,所以两个view都不是真实的存储,只是视图。
narrow :返回一个新的张量,其是原来张量的缩小版。
initialize_bucket_views 主要逻辑是:
遍历replica的张量,针对每一个张量,依据其是dense还是sparse进行不同处理,最后插入到replica.bucket_views_in之中。
把 replica.bucket_views_out 设置为 replica.bucket_views_in,正常应该是相等的。
如果<code>gradient_as_bucket_view_</code>设置为true,则需要处理两种情况:
当调用 rebuild_buckets 重建 bucket时,initialize_bucket_view 可以在initialize_bucket内调用,如果grad在上一次迭代中已经定义/计算过,则需要将旧的grad复制到新的bucket_view中,并让grad指向新的bucket_view。
initialize_bucket_view 也可以在构建时候在 initialize_bucket 内调用。在构建时间内不会定义 Grad,
在这种情况下,不要让梯度指向bucket_view,因为对于全局未使用的参数,梯度应保持为未定义。
具体代码如下:
具体如下图:
另外,mark_variable_ready_sparse, mark_variable_ready_dense, finalize_backward 都有对 contents 赋值。
以下两个类用来让 autograd hook 函数确定张量对应桶。
VariableIndex 就是确定某个 tensor 在某个桶中的位置。这个对于 autograd hook 有用。对于autograd hook 回调,回调函数所在进程只是知道自己的梯度张量,但是回调函数需要知道这个张量位于哪个replica,以及位于replica之中哪个位置,这样才能进一步规约。
Reducer 等类的实例之中,只有一个 VariableIndex 的成员变量,这个独立成员变量是:
VariableIndex 更多是作为其他成员变量的一部分或者参数存在,比如在 Reducer 之中,gradAccToVariableMap_ 就是使用了 VaribaleIndex。
VariableIndex 定义如下:
在 Reducer 的构造函数中,有如下代码用于autogrid_hook的设定,这是给每个 replica 上的每个张量设置了一个 hook。如果autograd hook 不知道此梯度对应哪个 bucket,就无法告诉 DDP,这个 bucket 整体ready了。
如何找到桶?需要使用下面的 VariableLocator。
VariableLocator 用来在 bucket 之中确定一个varaible。为了找到一个张量位置,我们需要知道在哪个桶,在桶的张量之中的哪个位置。
哪个桶 : <code>bucket_index</code> 是<code>Reducer.buckets_</code>列表的位置,表示 <code>buckets_</code> 之上的一个bucket。
桶副本的哪个位置 : <code>intra_bucket_index</code> 是在 bucket.replica 之中 vector 域的 variable index。
Reducer 的成员变量为:
如何初始化?
问题:variable_locators_[variable_index] 在不同的桶之间,不会重复吗?不会,因为 VariableLocator(bucket_index, intra_bucket_index++) 从定义上看,bucket_index 和 intra_bucket_index 的组合是唯一的。
我们给出一个例子。关于 tensor indices,就是给所有的tensor一个index,从0开始递增,一直到 tensors.size()。假如模型的 parameters 一共有12个张量,则 tensor index 从 0 到 11。假如分成 6 个buckets,则在这6个buckets之中,每个 tensor index 都是唯一不重复的。
这样,对应的 variable_locators_ 是:
如何使用?我们用下面做为例子。
当 autograd hook 调用时候,使用 VariableIndex index 来回调,
autograd_hook 最终调用到 mark_variable_ready_dense,这里进而通过 variable_locators_ 来确定桶,然后进行后续操作。
以下是梯度累积相关类。
grad_accumulators_ 可以认为是一个矩阵,矩阵的每个item就是一个 AccumulateGrad(Node类型),就是用来计算梯度的。目前看来,这里只是一个bookkeeping作用。
具体如下图,variable1 是一个实际的 张量,grad_accumulators_ 中的一个item 就指向 variable1 的 AccumulateGrad。
如何初始化?在 Reducer 构建函数之中有:
grad_accumulator 返回的是 Node,也就是 AccumulateGrad,是一个Node类型,我们取出了检查校验代码。
gradAccToVariableMap_ 的定义如下:
作用是给每个 Node 一个对应的VariableIndex,具体如图,下面就给 variable 1 一个 index 1:
如何初始化?在 Reducer 构造函数中有如下,就是给每个需要求导的 Varaible 一个VariableIndex。
gradAccToVariableMap_ 的使用如下,search_unused_parameters 就是遍历查找 <code>gradAccToVariableMap_</code>,如果某一个accumulator 函数没有在 <code>gradAccToVariableMap_</code> 里面,就说明不用计算梯度。
记录在本张量的梯度就绪之前,该张量的 autograd_hook 应该被调用几次。第一次迭代之后,不再增加,所以这个数值应该就是1或者0。用来设置 unused_parameters_ 和 配置 numGradHooksTriggeredMapPerIteration_。
如何初始化?在构建函数之中有:
第一次迭代之后,后续调用 autogrid_hook 就递增加一。
如何使用?这里会reset。
这里也会进行处理。如果为0,则插入unused_parameters_。
在本张量的梯度就绪之前,该张量的 autograd_hook 还需要被调用几次。如果为0,就说明这个桶应该整体就绪了。
本成员变量是使用 numGradHooksTriggeredMap_ 来重置。
如何使用?在静态图情况下,如果不是第一次迭代(此时刚刚产生梯度),就会把 <code>numGradHooksTriggeredMapPerIteration_[index]</code> 递减,如果为0,就说明该变量就绪,可以进行集合操作梯度规约了。
当新一次迭代时候,会重置这个值,prepare_for_backward 会调用到 reset_bucket_counting。
而且是使用 numGradHooksTriggeredMap_ 来重置。
具体逻辑我们展示一下:
对于 张量 2,就没有使用过,所以 delay_all_reduce 方法 之中直接放入到未使用参数。
对于 张量 1:
numGradHooksTriggeredMap_ 初始化是 0。
第一次迭代之后变成 1。
后向传播时候,调用 prepare_for_backward 和 reset_bucket_counting,把 <code>numGradHooksTriggeredMap_</code>赋值给 <code>numGradHooksTriggeredMapPerIteration_</code>。
autograd_hook 之中会递减,然后如果是 0,就设置此变量为 ready,可以规约了。
每个迭代之中,perIterationReadyParams_ 表示就绪的参数。
就是如果某个variable是就绪状态,就插入到 perIterationReadyParams_。
在反向传播之前,会重置这个变量。
就是遍历perIterationReadyParams_,如果没找到,就返回。
在 rebuild_buckets 方法中会调用 ensure_prior_reduction_finished,里面会调用这两个方法来校验。
以下两个变量用来记录本地使用过的参数,其标示在未启用同步的情况下(no_sync is on),在当前迭代或者 no_sync session 之中,这些参数是否在本地被使用过。
每个模型副本对应map中的一个张量,每个张量是参数数量的一维int32(one-dim int32)张量。
这些张量在autograd_hook中标记,以指示已使用了相应的参数。这些张量会在当前迭代或无同步会话(no_sync session)的后向传播结束时进行allreduce,以计算出全局未使用的参数。
此处可以结合论文看看。
全局未使用参数(Globally Unused Parameters)的梯度在向前和向后过程中应保持不变。检测未使用的参数需要全局信息,因为在一个DDP过程中,一个参数可能在一次操作中不存在,但可能在另一个过程的同一次迭代中参与训练。因此DDP在位图中维护本地未使用的参数信息,并启动额外的AllReduce以收集全局位图。由于位图比张量尺寸小得多,因此模型中的所有参数共享同一位图,而不是创建每桶位图(per-bucket bitmaps)。位图位于CPU上,以避免为每次更新启动专用CUDA内核。但是,某些ProcessGroup后端可能无法在CPU 张量上运行AllReduce。例如,ProcessGroupNCCL仅支持CUDA张量。此外,由于DDP应该与任何定制的ProcessGroup后端一起工作,它不能假设所有后端都支持CPU张量。为了解决这个问题,DDP在同一设备上维护另一个位图作为第一个模型参数,并调用非阻塞拷贝操作(non-blocking copy)将CPU位图移动到设备位图以进行集合通信。
初始化函数如下:
finalize_bucket_dense 和 finalize_backward 都会重置。
autograd_hook 之中如果使用了,就设置为1
在 mark_variable_ready 时候会调用到 all_reduce_local_used_map,如果需要同步,这里进行同步。我们还是翻译一下注释:
DDP 用异步H2D来避免阻塞开销。异步复制和allreduce 会着眼于当前流,因此将正确排序。
关于主机操作的正确顺序也很重要。H2D <code>copy_</code> 是按流排序的,而主机对 <code>local_used_maps_</code> 的更改是按主机排序的。
如果大量积压的cuda流工作将 copy_ 操作推迟到将来,并且如果从现在到finalize_backward 之间没有发生阻塞调用,那么finalize_backward 会在流执行复制之前将主机上使用的本地映射重新归零,在这种情况下,copy_会读取到这些零,而不是我们在这里告诉它读取的值。
将 local_used_maps_[i] 复制到pinned临时内存(固定的缓存分配器应该异步提供)可以避免这种恶劣的、罕见的争用情况。
在希望使用所有参数的情况下,从现在到重新调零,DDP本身不会做任何阻塞工作,因此这种危险情况是真实存在的。
所以,Reducer 采用防御性操作,以确保 local_used_maps_tmp 与local_used_maps_[i] 不同。
我们接下来分析一些计算梯度所涉及到的基本函数和支撑类。
该类用来封装 distributed::autograd::ContextPtr。
其作用就是保持了 autograd hook,也是起到了bookkeeping 作用。
初始化如下:
我们通过 [DDP Communication Hook] 来看看概念。
DDP通信钩子是一种增强功能,它提供了一个钩子,其可用于覆盖DDP来进行跨rank梯度通信,这可用于梯度压缩/GossipGrad等算法。可以使用Python API <code>register_comm_hook</code>来注册钩子函数。
如果未注册DDP通信钩子(DDP communication hook),则reducer只需调用allreduce即可对桶进行规约。如果注册了,则会调用钩子并使用future work handle来处理。如果注册,reducer也会跳过"将梯度除以世界大小(world size)" 这个步骤。这样做的目的是:通信钩子可以完全覆盖我们执行通信的方式,用户可以完全控制如何处理梯度。
<code>PythonCommHook</code>是<code>CommHookInterface</code>的子类,其可以注册一个 Python 钩子。此外,还有一些内置的C++钩子实现,可以通过调用Python API <code>register_builtin_comm_hook</code>来指定。
我们通过 torch/distributed/algorithms/ddp_comm_hooks/default_hooks.py 来看看。
下面的 hook 就是在 all-reduce 前后进行自己的特殊处理。如果使用这个 hook,就使用 ddp_model.register_comm_hook(process_group, fp16_compress_hook)。
mark_variable_ready_dense 函数会调用到 runGradCallbackForVariable。
Reducer的runGradCallbackForVariable如下,其调用 distributed::autograd::ContextPtr.runGradCallbackForVariable 来处理。
我们顺着来到 DistAutogradContext。
它会在累积的梯度之中,在 accumulatedGrads_ 之中找到张量 对应的梯度 grad,然后用传入的回调函数来处理梯度grad,最后把处理后的梯度拷贝回accumulatedGrads_。这样就从 hook获取梯度 开始,到传回规约之后的梯度结束,完成了一个闭环。
DistAutogradContext 的 accumulatedGrads_会记录张量对应的当前梯度。
至此,我们初步介绍了一些基本类,下一章继续介绍(是在是太多了......)。
pytorch分布式系列3——分布式训练时,torch.utils.data.distributed.DistributedSampler做了什么?
pytorch分布式系列1——搞清torch.distributed.launch相关的环境变量
pytorch分布式系列2——DistributedDataParallel是如何做同步的?
pytorch(分布式)数据并行个人实践总结——DataParallel/DistributedDataParallel
Pytorch的nn.DataParallel
https://discuss.pytorch.org/t/dataparallel-imbalanced-memory-usage/22551/20
https://pytorch.org/docs/stable/distributed.html
PyTorch 源码解读之分布式训练了解一下?
实操教程|PyTorch AutoGrad C++层实现
PYTORCH 自动微分(一)
PyTorch如何加速数据并行训练?分布式秘籍大揭秘
pytorch分布式训练(二init_process_group)
https://pytorch.org/tutorials/intermediate/ddp_tutorial.html
https://pytorch.org/docs/master/notes/ddp.html
https://pytorch.org/tutorials/intermediate/dist_tuto.html
PyTorch 源码解读之 DP & DDP:模型并行和分布式训练解析
Pytorch模型中的parameter与buffer