天天看點

OneFlow源碼解析:Eager模式下的裝置管理與并發執行

作者:OneFlow
OneFlow源碼解析:Eager模式下的裝置管理與并發執行

作者|鄭建華

更新|趙露陽

通過這篇筆記,希望能初步了解 OneFlow 在 Eager 模式下對裝置的管理方式、裝置執行計算的過程以及如何充分利用裝置計算能力。這裡的裝置主要指類似 CUDA 這樣的并行計算加速裝置。

1

裝置、流相關類型及關系

架構通過流(Stream)向裝置(Device)送出計算任務。一個 Stream 是一個指令序列,可以類比 CUDA Stream(https://docs.nvidia.com/cuda/cuda-c-programming-guide/index.html#streams),或者 CPU Thread 的指令序列。同一個 Stream 中的指令按順序執行;不同 Stream 之間的指令有依賴關系時,需要同步。不同的任務,比如 kernel 計算、host2device、device2host 等都有自己獨立的 Stream,可以并發執行,進而在 Eager 模式下盡可能充分利用裝置的異步并發執行能力。

OneFlow 中Device和Stream相關的部分類結構如下所示:

Device相關類型

oneflow::Device

oneflow::Device是用于表示裝置的基礎類型,例如:建構tensor時 flow.tensor(shape, device="cuda:1")就會在内部構造出這個基礎的Device類型,其中裝置編号為1、裝置類型為CUDA。

oneflow/core/framework/device.h:

class Device final {
 public:
  ...

 private:
  Device(const std::string& type, int64_t device_id);
  Maybe<void> Init();

  const std::string type_;
  DeviceType enum_type_;
  const int64_t device_id_;
  const size_t hash_value_;
  std::shared_ptr<MemoryCase> mem_case_;
};           

oneflow::Device中最重要的兩個成員變量分别是用于表示裝置類型的DeviceType;用于表示裝置編号的device_id_。

DeviceType

DeviceType是一個枚舉類,不同的值代表不同的計算裝置類型,其定義位于 oneflow/core/common/device_type.proto:

enum DeviceType {
  kInvalidDevice = 0;  // 無效裝置
  kCPU = 1;            // cpu裝置
  kCUDA = 2;           // cuda裝置
  kMockDevice = 3;     // pseudo device for test.
}           

目前在oneflow master分支中,主要有kCPU表示cpu裝置;kCUDA表示nvidia cuda裝置;在其他多裝置支援的分支中,這裡還增加了更多的裝置類型。

oneflow::ep::Device

oneflow::Device是oneflow中對裝置的基礎封裝類型,而oneflow::ep::Device則是一個抽象類,屬于oneflow的ep子產品(execution provider),是對裝置行為的封裝,ep子產品為多硬體裝置提供了更高層次的抽象,友善oneflow支援和相容多硬體裝置提供了更高的靈活性和可拓展性。

oneflow::ep::Device不僅提供了表示裝置類型的device_type()方法、表示裝置編号的device_index()方法,還提供了建立/銷毀ep::Stream、建立/銷毀Event、在裝置上申請/釋放記憶體的各種方法。

oneflow/core/ep/include/device.h:

class Device {
 public:
  OF_DISALLOW_COPY_AND_MOVE(Device);
  Device() = default;
  virtual ~Device() = default;

  virtual void SetAsActiveDevice() = 0;

  virtual DeviceType device_type() const = 0;
  virtual size_t device_index() const = 0;
  virtual DeviceManager* device_manager() const = 0;

  virtual Stream* CreateStream() = 0;
  virtual void DestroyStream(Stream* stream) = 0;

  virtual Event* CreateEvent();
  virtual void DestroyEvent(Event* event);
  virtual void CreateEvents(Event** events, size_t count) = 0;
  virtual void DestroyEvents(Event** events, size_t count) = 0;

  virtual Maybe<void> Alloc(const AllocationOptions& options, void** ptr, size_t size) = 0;
  virtual void Free(const AllocationOptions& options, void* ptr) = 0;
  virtual Maybe<void> AllocPinned(const AllocationOptions& options, void** ptr, size_t size) = 0;
  virtual void FreePinned(const AllocationOptions& options, void* ptr) = 0;
  virtual bool IsStreamOrderedMemoryAllocationSupported() const;
};           

oneflow::ep::Device有如下子類實作:

OneFlow源碼解析:Eager模式下的裝置管理與并發執行

Stream相關類型

oneflow::Stream和cuda device以及stream的關系類似,oneflow中也存在類似的基礎Stream類型。

oneflow/core/framework/stream.h:

class Stream final {
 ....
 private:
  Stream(Symbol<Device> device, StreamType stream_type, size_t thread_uid);

  static Maybe<Symbol<Stream>> RawNew(Symbol<Device> device, StreamType stream_type,
                                      size_t thread_uid);

  Maybe<void> Init(size_t unique_stream_id);

  Symbol<Device> device_;
  StreamType stream_type_;
  size_t thread_uid_;
  size_t unique_stream_id_;
};           

可以看見Stream類中的成員變量:

  • device_ 表示該Stream對象将在何種裝置上執行
  • streamtype_ 表示該Stream的類型,是用于計算的compute stream還是用于資料搬運的host2device、device2host stream等
  • threaduid_ 表示負責啟動該Stream的線程id
  • unique_streamid_ 表示這個stream自身的unique id

StreamType

和DeviceType分為kCpu和kCuda類似,Stream也有各種類型之分,具體如下:

oneflow/core/common/stream_type.h

enum class StreamType {
    kInvalid = 0,    // 無效
    kCompute,        // kernel計算流
    kHost2Device,    // 資料搬運(host -> device)流
    kDevice2Host,    // 資料搬運(device -> host)流
    kCcl,            // 集合通信流
    kBarrier,        // 線程屏障流
    kCriticalSection,// 臨界區流
    kLazyJobLauncher,// job啟動流(lazy mode)
    kPinnedCompute   // pinned memory kernel計算流
};           

oneflow::ep::Stream

oneflow中的ep子產品提供了一個更高層次的對Stream的抽象類,除了可以擷取裝置的device()、擷取裝置類型的device_type()方法外,還提供了一系列虛方法如:

  • 同步Sync()
  • 執行Event事件RecordEvent()

oneflow/core/ep/include/stream.h:

class Stream {
 public:
  OF_DISALLOW_COPY_AND_MOVE(Stream);
  Stream() = default;
  virtual ~Stream() = default;

  virtual DeviceType device_type() const = 0;
  virtual Device* device() const = 0;
  virtual Maybe<void> Sync() = 0;
  virtual void RecordEvent(Event* event) = 0;
  virtual Maybe<void> GetAsyncError() { return Maybe<void>::Ok(); }

  virtual Maybe<void> AllocAsync(void** ptr, size_t size) { UNIMPLEMENTED_THEN_RETURN(); }
  virtual Maybe<void> FreeAsync(void* ptr) { UNIMPLEMENTED_THEN_RETURN(); }
  template<typename T>
  Maybe<void> AllocAsync(T** ptr, size_t size) {
    return AllocAsync(reinterpret_cast<void**>(ptr), size);
  }

  virtual Maybe<void> OnExecutionContextSetup() { return Maybe<void>::Ok(); }
  virtual Maybe<void> OnExecutionContextTeardown() { return Maybe<void>::Ok(); }

  template<typename T>
  T* As() {
    return static_cast<T*>(this);
  }
};           

oneflow::ep::Stream有如下子類實作:

OneFlow源碼解析:Eager模式下的裝置管理與并發執行

oneflow::vm::Stream

oneflow vm(virtual machine)中的oneflow::vm::Stream類型,用于vm内部維護stream極其依賴關系、StreamPolicy、排程線程等。

oneflow/core/vm/stream.h:

class Stream final : public intrusive::Base {
 public:
  ...
 private:
  ...
  // fields
  ThreadCtx* thread_ctx_;
  Symbol<Device> device_;
  StreamType stream_type_;
  std::shared_ptr<StreamPolicy> stream_policy_;
  bool on_scheduler_thread_;
  std::unique_ptr<char, std::function<void(char*)>> small_pinned_mem_ptr_;
  ...
};           

StreamPolicy

StreamPolicy是oneflow vm中獨有的概念,提供了一系列虛方法如:

  • stream() 擷取oneflow::ep::Stream指針
  • mut_allocator() 擷取vm::Allocator指針(用于tensor記憶體管理)
  • device_type() 擷取device裝置類型

除此之外,提供了一系列vm相關的指令狀态初始化、查詢、删除等方法。

oneflow/core/vm/stream_policy.h:

class StreamPolicy {
 public:
  virtual ~StreamPolicy() = default;

  virtual ep::Stream* stream() = 0;
  virtual vm::Allocator* mut_allocator() = 0;
  virtual DeviceType device_type() const = 0;

  virtual void InitInstructionStatus(const Stream& stream,
                                     InstructionStatusBuffer* status_buffer) const = 0;
  virtual void DeleteInstructionStatus(const Stream& stream,
                                       InstructionStatusBuffer* status_buffer) const = 0;
  virtual bool QueryInstructionStatusLaunched(
      const Stream& stream, const InstructionStatusBuffer& status_buffer) const = 0;
  virtual bool QueryInstructionStatusDone(const Stream& stream,
                                          const InstructionStatusBuffer& status_buffer) const = 0;
  virtual bool OnSchedulerThread(StreamType stream_type) const;
  virtual bool SupportingTransportInstructions() const = 0;

  void RunIf(Instruction* instruction) const;

 protected:
  StreamPolicy() = default;

 private:
  virtual void Run(Instruction* instruction) const = 0;
};           

StreamPolicy有如下子類實作:

OneFlow源碼解析:Eager模式下的裝置管理與并發執行

2

Eager Local模式下的Device和Stream推導

下面,梳理一下普通的eager模式(eager local mode)下,算子執行全過程中device和stream相關的推導流程。

2.1 推導Device

首先,對于一個算子(op)來說,要為其設定一個預設的device用于實際計算,這一步在:

Symbol<Device> default_device = JUST(GetDefaultDevice(inputs, ctx))

這裡GetDefaultDevice的邏輯是:

  • 1.如果inputs tensor非空,則根據第一個input tensor的device來設定default的device
  • 2.如果inputs tensor為空,則優先從OpExprInterpContext中擷取device,若OpExprInterpContext中未設定,則會通過Device::New("cpu");預設給一個cpu device

值得說明的是,在1.種情況時,如果input tensor建立時指定了device為cuda裝置,則這裡推導出的default device同樣為相同的cuda device;如果未顯示指定,則預設還是cpu device。

2.2 推導Stream

oneflow::Stream的推導主要在:

  • JUST(user_op_expr.mut_local_tensor_infer_cache()->GetOrInfer(infer_args)));Symbol<Stream> stream = JUST(InferDeviceAndStream(...));

InferDeviceAndStream中,Stream推導的邏輯是會根據user_op_expr是否定義了device_and_stream_infer_fn而有所差別

OneFlow源碼解析:Eager模式下的裝置管理與并發執行

GetDefaultStreamByDevice的具體實作:

Maybe<Symbol<Stream>> RawGetDefaultStreamByDevice(Symbol<Device> device) {
  return Stream::New(device, StreamType::kCompute);
}           

可以看見,根據傳入的device、StreamType::kCompute,new了一個oneflow::Stream。

2.3 InstructionsBuilder::Call和vm::Stream推導

在上述device和stream推導完成後,會通過InstructionsBuilder調用Call方法:

JUST(PhysicalRun([&](InstructionsBuilder* builder) -> Maybe<void> {
    return builder->Call(kernel, std::move(input_eager_blob_objects),
                         std::move(output_eager_blob_objects), ctx, result->stream());
}));           

Call方法中會通過

  • JUST(SoftSyncStream(output_eager_blob_objects, stream));
  • JUST(SoftSyncStream(input_eager_blob_objects, stream));
  • auto* vm_stream = JUST(Singleton<VirtualMachine>::Get()->GetVmStream(stream));

完成outputs inputs tensor的流同步(SoftSyncStream)過程以及vm::Stream的推導,然後通過構造OpCallInstructionPolicy指令派發至vm執行。

SoftSyncStream的同步這裡省略,具體過程見第4節。

2.3.1 構造 ThreadCtx 對象,啟動執行指令的線程

ThreadCtx 對象指針儲存在 VirtualMachine 的 HashMap 中。每個 DeviceType(CPU或CUDA)對應一個 ThreadCtx 對象;臨界區和 LazyJob 有自己的 ThreadCtx 對象。

首次通路 HashMap 時得到的是零值(空指針),需要調用 CreateThreadCtx 建立對象。實際通過虛拟機指令建立對象,ThreadCtx 對象儲存在 VirtualMachineEngine::thread_ctx_list_ 中。

ThreadCtx 對象構造後,會建立一個 worker 線程、執行 WorkerLoop 方法,并添加到 worker_threads_。是以 worker_threads_ 是與 ThreadCtx 對象一一對應的。

這個線程負責其所歸屬的指令的執行:

  • WorkerLoop 在收到通知後,會調用 ThreadCtx::TryReceiveAndRun 處理指令。
  • 在這個函數中,将 ThreadCtx 的指令挪到臨時清單、通過 StreamPolicy 執行每個指令。
  • ThreadCtx 的指令,是 VirtualMachineEngine 在 DispatchInstruction 時添加進去的。

ThreadCtx建立完成後,将持有vm::Stream對象。oneflow::vm::Stream和oneflow::Stream的數量是一一對應的,vm::Stream 按照<DeviceType, StreamRole>分組存儲在對應的 ThreadCtx 中。

vm::Stream的推導流程細節如下:

OneFlow源碼解析:Eager模式下的裝置管理與并發執行

2.4 執行OpCall指令和ep::Stream推導

有幾個場景會建立(擷取) ep::Stream 對象。比如 kernel 執行時。

OpCall指令在構造時,指令政策類型是 OpCallInstructionPolicy。虛拟機在 DispatchInstruction 時,無論哪個分支,後續都會調用 EpStreamType::Run,最終通過

OneFlow源碼解析:Eager模式下的裝置管理與并發執行

例如 GpuL2NormalizeKernel::Compute,最終在其kernel的Compute方法中,會通過ctx->stream()建立(擷取)ep::Stream 對象,launch kernel 執行計算。

2.4.1 擷取/建立ep::Stream

下面,我們重點看一下OpCall指令實際執行時,調用的OpCallInstructionUtil::Compute()方法:

static inline Maybe<void> Compute(OpCallInstructionPolicy* op_call_instruction_policy,
                                    Instruction* instruction) {
    Allocator* allocator = instruction->mut_stream()->mut_stream_policy()->mut_allocator();
    JUST(AllocateOutputBlobsMemory(op_call_instruction_policy, allocator, instruction));
    if (unlikely(op_call_instruction_policy->need_temp_storage())) {
      JUST(TryAllocateTempStorage(op_call_instruction_policy, allocator));
    }
    ep::Stream* stream = instruction->mut_stream()->mut_stream_policy()->stream();
    user_op::OpKernelState* state = nullptr;
    user_op::OpKernelCache* cache = nullptr;
    if (op_call_instruction_policy->user_opkernel()->has_state_or_cache()) {
      TryInitOpKernelStateAndCache(op_call_instruction_policy, stream, &state, &cache);
    }
    OpKernelCompute(op_call_instruction_policy, stream, state, cache);
    if (unlikely(op_call_instruction_policy->need_temp_storage())) {
      DeallocateTempStorage(op_call_instruction_policy, allocator);
    }
    return Maybe<void>::Ok();
  }           

其中會通過ep::Stream* stream = instruction->mut_stream()->mut_stream_policy()->stream();完成ep::Stream的推導,之後在OpKernelCompute()方法中實際完成op/kernel的執行。

OneFlow源碼解析:Eager模式下的裝置管理與并發執行

這裡->stream()會調用ep_stream_policy_base.h中的:

ep::Stream* stream() override { return GetOrCreateEpStream(); }

這是一個private方法:

private:
  ep::Stream* GetOrCreateEpStream() const {
    if (unlikely(ep_stream_ == nullptr)) {
      ep_stream_ = GetOrCreateEpDevice()->CreateStream();
      CHECK(ep_stream_ != nullptr);
    }
    return ep_stream_;
  }           

可以看到,如果成員變量ep_stream_非空,則直接傳回;否則,通過 ep_stream_ = GetOrCreateEpDevice()->CreateStream(); 來建立建立ep::Stream。

2.4.2 擷取/建立ep::Device

而這裡的GetOrCreateEpDevice方法如下:

ep::Device* GetOrCreateEpDevice() const {
    if (unlikely(ep_device_ == nullptr)) {
      ep_device_ = Singleton<ep::DeviceManagerRegistry>::Get()->GetDevice(device_->enum_type(),
                                                                          device_->device_id());
      CHECK(ep_device_);
    }
    return ep_device_.get();
  }           

根據oneflow::Device中拿到的device id和device type,去全局單例的ep::DeviceManagerRegistry中取出對應的oneflow::ep::Device

oneflow::vm::StreamPolicy和oneflow::vm::EpStreamPolicy推導

  • stream_policy_ = CHECK_JUST(CreateStreamPolicy::Visit(stream_type, device));
    • std::shared_ptr<vm::StreamPolicy>(new vm::EpStreamPolicy(device));

3

Eager Global模式下的Device和Stream推導

eager global模式下,device資訊隐藏在placement中,placement不僅包括了device type資訊還包括其tensor具體分布在哪些ranks上的資訊,placement在 C++ 中的對應類型是 ParallelDesc。

是以device以及stream的部分推導過程和eager local模式下有所差別,但OpCall指令執行;device、vm::Stream和ep::Stream的推導過程都和eager local模式下是類似的。

3.1 推導Device

3.1.1 placement 的 parallel_id

oneflow中的placement表示tensor存放的裝置叢集(device group),如:p = flow.placement(type="cuda", ranks=[0, 1, 2, 3])表示tensor分布于1台機器上,cuda device 0、1、2、3四個裝置上;p = flow.placement(type="cuda", ranks=[[0, 1], [2, 3]])則表示tensor分布于2台機器上,host1的device0、1以及host2的device2、3。

在 oneflow 的分布式環境下,各個 host 上需要有相同數量的device,每個程序使用一個device。這樣根據環境變量 RANK 可以得出 machine_id,LOCAL_RANK 就是程序在 制定host 上的 rank序号。

如果 input tensor 的 placement 與目前程序無關,可以省掉很多不必要的計算。通過 placement 的 parallel_id 可以判斷計算任務是否與目前程序相關。

placement 在 C++ 中的對應類型是 ParallelDesc,其中并沒有 parallel_id 字段,這個資訊隐含在其它字段中。

ParallelDesc 在構造時會調用 ClearUp 函數,從中可以看到

  • ParallelDesc::parallel_id2machine_id_ 是 placement 分布的 machine。
  • ParallelDesc::parallel_id2device_id_ 是 placement 分布的 device_id。
  • parallel_id 是上述 2 個數組的索引,一個 parallel_id 對應一個 machine_id:device_id 組合。這樣,根據parallel_id可以查到對應的 machine_id 和 device_id。
  • 反過來,根據 machine_id:device_id 也可以從 machine_id2device_id2parallel_id_ 查到 parallel_id。

3.1.2 eager 模式下根據 parallel_id 忽略無關計算任務

在 eager 分布時場景處理計算任務時,會調用GetTensorDevice4CurrentProcessCtx,推導得到輸出tensor的device,以及擷取目前程序的 machine_id、device_id 在 placement 中的 parallel_id 值。

如果目前程序與該 placement 無關,parallel_id 就是空,後續處理時就可以忽略一些計算:

  • EagerGlobalTensorImpl::New 中隻需要用 functional::Empty 構造一個 shape 為 0 的空的 tensor。
  • GetBoxingOutput計算時,如果parallel_id為空則表示目前rank程序無效,無需計算直接傳回。
  • Interpret 可以不給 vm 送出指令、提前傳回。

3.2 推導Stream

在 ConsistentTensorInferCache 中推導 SBP Signature 時,也會同時推導出目前的 tensor 計算任務、在目前程序所用的device。推導時,會先确認所有 inputs 的 placement 是一緻的,都分布在相同的device上。如前所述,如果計算任務與目前程序無關,會提前傳回;而一個程序隻使用一個device。

這裡和eager local模式下stream的推導類似,通過JUST(InferDeviceAndStream(user_op_expr, infer_args))推導出oneflow::Stream對象,StreamRole 是 kCompute。差別在于eager global模式下

3.2.1 unique_stream_id

unique_stream_id 表示 oneflow::Stream 對象的建立次序。所有的 oneflow::Stream 對象都儲存在全局的 StreamMgr::stream2unique_stream_id_ 中。unique_stream_id2stream_symbol_ 可看作是引用類型的副本,unique_stream_id 就是 Stream 對象在這個數組中的索引。與 parallel_id 不同,unique_stream_id 是 Stream 對象在程序内的唯一辨別。

并不是每次都需要加鎖通路 StreamMgr。oneflow::Stream 包含的都是描述性資訊,其引用是以 ThreadLocal 的方式存儲的,可以提升後續讀取的效率。虛拟機在執行指令時,也會用 unique_stream_id 進行邏輯判斷。

4

Eager模式下的Stream同步——SoftSyncStream

設想以下場景:将 CPU 下的 tensor 拷貝到 CUDA 裝置,然後在 CUDA 上再進行 tensor add 的計算。這涉及到兩個流,一個是 Host2Device,一個是 CUDA Compute。這兩個流的計算任務是并發執行的。需要有同步措施,才能保證拷貝完再執行 add 計算。

Eager 模式下,在 InstructionsBuilder::Call 中構造指令時,對 SoftSyncStream的調用會在必要時向指令清單插入同步指令。

SoftSyncStream 中,幾個重要概念:

  • tensor在oneflow記憶體中的實際承載者是 eager_blob_object
  • last_used_stream 表示一個tensor(blob)上一次使用到的stream,可能是compute stream、h2d stream、d2h stream、集合通信ccl stream等
  • 如果 last_used_stream 與目前計算執行的流 stream 相同,則可以忽略,因為相同stream間天然順序執行是以無需同步,否則就需要進行後續的同步處理

SoftSyncStream代碼如下:

Maybe<void> InstructionsBuilder::SoftSyncStream(const vm::EagerBlobObjectList& eager_blob_objects,
                                                Symbol<Stream> stream) {
  JUST(ForEachEagerBlobObjectsNeedingSoftSync(
      eager_blob_objects, stream,
      [&](Symbol<Stream> last_used_stream, auto&& dep_objects) -> Maybe<void> {
        return SoftSyncStreamBetween(std::move(dep_objects), last_used_stream, stream);
      }));
  for (const auto& eager_blob_object : eager_blob_objects) {
    eager_blob_object->set_last_used_stream(stream);
  }
  return Maybe<void>::Ok();
}           

主體邏輯是,會在ForEachEagerBlobObjectsNeedingSoftSync方法中周遊每一個tensor對象(eager blob object),對于每一個需要同步的blob運用lambda方法并最終調用SoftSyncStreamBetween完成stream間的同步。

這裡,我們看一下ForEachEagerBlobObjectsNeedingSoftSync的邏輯:

OneFlow源碼解析:Eager模式下的裝置管理與并發執行

首先if/else的主體業務邏輯是類似的,主要差別在于,當blob的size <= kOpArgsReservedSize時(預設為4)會使用small vector來存放LocalDepObject變量,效率會更快一些(否則會走到else分支,主體邏輯類似,這裡就不看了)。

  • const auto& opt_last_used_stream = eager_blob_object->last_used_stream();
  • if (unlikely(!opt_last_used_stream.has_value())) { continue; }

這兩句是查詢該tensor(blob)上一次被使用時用到的stream——last_used_stream,如果為空,則直接continue跳過,因為如果此tensor之前并未被任何stream使用,則無需進行stream間的同步操作,因為在目前stream上不會有關于該tensor的其他依賴關系;

if (last_used_stream != stream) {
    small_vector<intrusive::shared_ptr<LocalDepObject>, kOpArgsReservedSize> dep_objects{
        intrusive::shared_ptr<LocalDepObject>(
            JUST(eager_blob_object->compute_local_dep_object()))};
    JUST(DoEach(last_used_stream, std::move(dep_objects)));
  }           

如果last_used_stream!=stream則表示需要在兩個stream間進行同步,則會應用傳入的lambda函數DoEach進行處理,在這裡lambda函數即:

[&](Symbol<Stream> last_used_stream, auto&& dep_objects) -> Maybe<void> {
    return SoftSyncStreamBetween(std::move(dep_objects), last_used_stream, stream);
  }));           

既實際調用的是SoftSyncStreamBetween來完成實際的stream間同步,這裡主要有3個變量:

  • dep_objects存儲了tensor間的依賴關系
  • last_used_stream則是該tensor上一次使用的stream
  • stream該tensor目前使用的stream

SoftSyncStreamBetween的代碼如下:

Maybe<void> InstructionsBuilder::SoftSyncStreamBetween(
    small_vector<intrusive::shared_ptr<LocalDepObject>, kOpArgsReservedSize>&& dependences,
    Symbol<Stream> from_stream, Symbol<Stream> to_stream) {
  CHECK(from_stream != to_stream) << "synchronization is unnecessary";
  if (SupportingStreamWait(from_stream, to_stream)) {
    JUST(StreamWait(std::move(dependences), from_stream, to_stream));
  } else {
    JUST(RecordEvent(std::move(dependences), from_stream));
  }
  return Maybe<void>::Ok();
}           

SoftSyncStreamBetween的主要邏輯如下:

  • 先額外做了一次check,檢測如果待同步的兩個stream相同,則check會報錯并提示"synchronization is unnecessary"
  • 通過SupportingStreamWait判斷from 和 to stream間是否支援stream wait,是則調用StreamWait方法;否則,直接調用RecordEvent方法
  • SupportingStreamWait的主要邏輯是,通過stream的device、以及StreamType的Visit方法來判斷。簡單來說,如果from 和 to stream之間是不同的device(譬如cpu stream <-> cuda stream之間的同步),或者from stream的device為cpu,則SupportingStreamWait一定是false;如果是相同的,則繼續通過其他判斷條件進行判斷。

SupportingStreamWait為True

SupportingStreamWait為True時,即from to stream同為Cuda Stream間的同步情況,在這種情況下會走到StreamWait的函數,該函數最終會派發一個StreamWaitEventInstructionPolicy的指令給vm執行,StreamWaitEventInstructionPolicy的執行邏輯主要是兩個cuda event:

  • cudaEventRecord
  • cudaStreamWaitEvent
OneFlow源碼解析:Eager模式下的裝置管理與并發執行
  • 對于from_stream來說,插入一個cudaEventRecord,用于标志from stream是否完成該stream上的event事件;
  • 對于to_stream來說,插入一個cudaStreamWaitEvent等待from stream上的事件完成後,再繼續執行to_stream。

SupportingStreamWait為False

SupportingStreamWait為False時,會直接調用JUST(RecordEvent(std::move(dependences), from_stream)); 其内部實作會從對象池中擷取可複用的cuda event對象并執行event。

這裡有個細節,由于cuda event的建立和銷毀都會引發cuda kernel的launch由異步轉同步,是以基于對象池的cuda event可以避免這個開銷。

實際上最終調用的還是cudaEventRecord,而cudaEventRecord本身隻是起到一個“占位符”的作用,并不能起到(保證該stream上其他kernel全部執行完)的作用,真正能保證stream同步作用的是oneflow vm(vitual machine)控制下的指令間依賴關系/執行順序。

5

CPU 下的并行計算

CpuStream 隻有一個線程。CPU kernel 應該是通過 OpenMP 或者 Intel OneApi 等實作并行計算加速。

參考資料

1.https://github.com/Oneflow-Inc/oneflow/tree/845595e2c0abc3d384ff047e188295afdc41faaa

歡迎 Star、試用 OneFlow 最新版本:

https://github.com/Oneflow-Inc/oneflow/

繼續閱讀