作者|鄭建華
更新|趙露陽
通過這篇筆記,希望能初步了解 OneFlow 在 Eager 模式下對裝置的管理方式、裝置執行計算的過程以及如何充分利用裝置計算能力。這裡的裝置主要指類似 CUDA 這樣的并行計算加速裝置。
1
裝置、流相關類型及關系
架構通過流(Stream)向裝置(Device)送出計算任務。一個 Stream 是一個指令序列,可以類比 CUDA Stream(https://docs.nvidia.com/cuda/cuda-c-programming-guide/index.html#streams),或者 CPU Thread 的指令序列。同一個 Stream 中的指令按順序執行;不同 Stream 之間的指令有依賴關系時,需要同步。不同的任務,比如 kernel 計算、host2device、device2host 等都有自己獨立的 Stream,可以并發執行,進而在 Eager 模式下盡可能充分利用裝置的異步并發執行能力。
OneFlow 中Device和Stream相關的部分類結構如下所示:
Device相關類型
oneflow::Device
oneflow::Device是用于表示裝置的基礎類型,例如:建構tensor時 flow.tensor(shape, device="cuda:1")就會在内部構造出這個基礎的Device類型,其中裝置編号為1、裝置類型為CUDA。
oneflow/core/framework/device.h:
class Device final {
public:
...
private:
Device(const std::string& type, int64_t device_id);
Maybe<void> Init();
const std::string type_;
DeviceType enum_type_;
const int64_t device_id_;
const size_t hash_value_;
std::shared_ptr<MemoryCase> mem_case_;
};
oneflow::Device中最重要的兩個成員變量分别是用于表示裝置類型的DeviceType;用于表示裝置編号的device_id_。
DeviceType
DeviceType是一個枚舉類,不同的值代表不同的計算裝置類型,其定義位于 oneflow/core/common/device_type.proto:
enum DeviceType {
kInvalidDevice = 0; // 無效裝置
kCPU = 1; // cpu裝置
kCUDA = 2; // cuda裝置
kMockDevice = 3; // pseudo device for test.
}
目前在oneflow master分支中,主要有kCPU表示cpu裝置;kCUDA表示nvidia cuda裝置;在其他多裝置支援的分支中,這裡還增加了更多的裝置類型。
oneflow::ep::Device
oneflow::Device是oneflow中對裝置的基礎封裝類型,而oneflow::ep::Device則是一個抽象類,屬于oneflow的ep子產品(execution provider),是對裝置行為的封裝,ep子產品為多硬體裝置提供了更高層次的抽象,友善oneflow支援和相容多硬體裝置提供了更高的靈活性和可拓展性。
oneflow::ep::Device不僅提供了表示裝置類型的device_type()方法、表示裝置編号的device_index()方法,還提供了建立/銷毀ep::Stream、建立/銷毀Event、在裝置上申請/釋放記憶體的各種方法。
oneflow/core/ep/include/device.h:
class Device {
public:
OF_DISALLOW_COPY_AND_MOVE(Device);
Device() = default;
virtual ~Device() = default;
virtual void SetAsActiveDevice() = 0;
virtual DeviceType device_type() const = 0;
virtual size_t device_index() const = 0;
virtual DeviceManager* device_manager() const = 0;
virtual Stream* CreateStream() = 0;
virtual void DestroyStream(Stream* stream) = 0;
virtual Event* CreateEvent();
virtual void DestroyEvent(Event* event);
virtual void CreateEvents(Event** events, size_t count) = 0;
virtual void DestroyEvents(Event** events, size_t count) = 0;
virtual Maybe<void> Alloc(const AllocationOptions& options, void** ptr, size_t size) = 0;
virtual void Free(const AllocationOptions& options, void* ptr) = 0;
virtual Maybe<void> AllocPinned(const AllocationOptions& options, void** ptr, size_t size) = 0;
virtual void FreePinned(const AllocationOptions& options, void* ptr) = 0;
virtual bool IsStreamOrderedMemoryAllocationSupported() const;
};
oneflow::ep::Device有如下子類實作:
Stream相關類型
oneflow::Stream和cuda device以及stream的關系類似,oneflow中也存在類似的基礎Stream類型。
oneflow/core/framework/stream.h:
class Stream final {
....
private:
Stream(Symbol<Device> device, StreamType stream_type, size_t thread_uid);
static Maybe<Symbol<Stream>> RawNew(Symbol<Device> device, StreamType stream_type,
size_t thread_uid);
Maybe<void> Init(size_t unique_stream_id);
Symbol<Device> device_;
StreamType stream_type_;
size_t thread_uid_;
size_t unique_stream_id_;
};
可以看見Stream類中的成員變量:
- device_ 表示該Stream對象将在何種裝置上執行
- streamtype_ 表示該Stream的類型,是用于計算的compute stream還是用于資料搬運的host2device、device2host stream等
- threaduid_ 表示負責啟動該Stream的線程id
- unique_streamid_ 表示這個stream自身的unique id
StreamType
和DeviceType分為kCpu和kCuda類似,Stream也有各種類型之分,具體如下:
oneflow/core/common/stream_type.h
enum class StreamType {
kInvalid = 0, // 無效
kCompute, // kernel計算流
kHost2Device, // 資料搬運(host -> device)流
kDevice2Host, // 資料搬運(device -> host)流
kCcl, // 集合通信流
kBarrier, // 線程屏障流
kCriticalSection,// 臨界區流
kLazyJobLauncher,// job啟動流(lazy mode)
kPinnedCompute // pinned memory kernel計算流
};
oneflow::ep::Stream
oneflow中的ep子產品提供了一個更高層次的對Stream的抽象類,除了可以擷取裝置的device()、擷取裝置類型的device_type()方法外,還提供了一系列虛方法如:
- 同步Sync()
- 執行Event事件RecordEvent()
oneflow/core/ep/include/stream.h:
class Stream {
public:
OF_DISALLOW_COPY_AND_MOVE(Stream);
Stream() = default;
virtual ~Stream() = default;
virtual DeviceType device_type() const = 0;
virtual Device* device() const = 0;
virtual Maybe<void> Sync() = 0;
virtual void RecordEvent(Event* event) = 0;
virtual Maybe<void> GetAsyncError() { return Maybe<void>::Ok(); }
virtual Maybe<void> AllocAsync(void** ptr, size_t size) { UNIMPLEMENTED_THEN_RETURN(); }
virtual Maybe<void> FreeAsync(void* ptr) { UNIMPLEMENTED_THEN_RETURN(); }
template<typename T>
Maybe<void> AllocAsync(T** ptr, size_t size) {
return AllocAsync(reinterpret_cast<void**>(ptr), size);
}
virtual Maybe<void> OnExecutionContextSetup() { return Maybe<void>::Ok(); }
virtual Maybe<void> OnExecutionContextTeardown() { return Maybe<void>::Ok(); }
template<typename T>
T* As() {
return static_cast<T*>(this);
}
};
oneflow::ep::Stream有如下子類實作:
oneflow::vm::Stream
oneflow vm(virtual machine)中的oneflow::vm::Stream類型,用于vm内部維護stream極其依賴關系、StreamPolicy、排程線程等。
oneflow/core/vm/stream.h:
class Stream final : public intrusive::Base {
public:
...
private:
...
// fields
ThreadCtx* thread_ctx_;
Symbol<Device> device_;
StreamType stream_type_;
std::shared_ptr<StreamPolicy> stream_policy_;
bool on_scheduler_thread_;
std::unique_ptr<char, std::function<void(char*)>> small_pinned_mem_ptr_;
...
};
StreamPolicy
StreamPolicy是oneflow vm中獨有的概念,提供了一系列虛方法如:
- stream() 擷取oneflow::ep::Stream指針
- mut_allocator() 擷取vm::Allocator指針(用于tensor記憶體管理)
- device_type() 擷取device裝置類型
除此之外,提供了一系列vm相關的指令狀态初始化、查詢、删除等方法。
oneflow/core/vm/stream_policy.h:
class StreamPolicy {
public:
virtual ~StreamPolicy() = default;
virtual ep::Stream* stream() = 0;
virtual vm::Allocator* mut_allocator() = 0;
virtual DeviceType device_type() const = 0;
virtual void InitInstructionStatus(const Stream& stream,
InstructionStatusBuffer* status_buffer) const = 0;
virtual void DeleteInstructionStatus(const Stream& stream,
InstructionStatusBuffer* status_buffer) const = 0;
virtual bool QueryInstructionStatusLaunched(
const Stream& stream, const InstructionStatusBuffer& status_buffer) const = 0;
virtual bool QueryInstructionStatusDone(const Stream& stream,
const InstructionStatusBuffer& status_buffer) const = 0;
virtual bool OnSchedulerThread(StreamType stream_type) const;
virtual bool SupportingTransportInstructions() const = 0;
void RunIf(Instruction* instruction) const;
protected:
StreamPolicy() = default;
private:
virtual void Run(Instruction* instruction) const = 0;
};
StreamPolicy有如下子類實作:
2
Eager Local模式下的Device和Stream推導
下面,梳理一下普通的eager模式(eager local mode)下,算子執行全過程中device和stream相關的推導流程。
2.1 推導Device
首先,對于一個算子(op)來說,要為其設定一個預設的device用于實際計算,這一步在:
Symbol<Device> default_device = JUST(GetDefaultDevice(inputs, ctx))
這裡GetDefaultDevice的邏輯是:
- 1.如果inputs tensor非空,則根據第一個input tensor的device來設定default的device
- 2.如果inputs tensor為空,則優先從OpExprInterpContext中擷取device,若OpExprInterpContext中未設定,則會通過Device::New("cpu");預設給一個cpu device
值得說明的是,在1.種情況時,如果input tensor建立時指定了device為cuda裝置,則這裡推導出的default device同樣為相同的cuda device;如果未顯示指定,則預設還是cpu device。
2.2 推導Stream
oneflow::Stream的推導主要在:
- JUST(user_op_expr.mut_local_tensor_infer_cache()->GetOrInfer(infer_args)));Symbol<Stream> stream = JUST(InferDeviceAndStream(...));
InferDeviceAndStream中,Stream推導的邏輯是會根據user_op_expr是否定義了device_and_stream_infer_fn而有所差別
GetDefaultStreamByDevice的具體實作:
Maybe<Symbol<Stream>> RawGetDefaultStreamByDevice(Symbol<Device> device) {
return Stream::New(device, StreamType::kCompute);
}
可以看見,根據傳入的device、StreamType::kCompute,new了一個oneflow::Stream。
2.3 InstructionsBuilder::Call和vm::Stream推導
在上述device和stream推導完成後,會通過InstructionsBuilder調用Call方法:
JUST(PhysicalRun([&](InstructionsBuilder* builder) -> Maybe<void> {
return builder->Call(kernel, std::move(input_eager_blob_objects),
std::move(output_eager_blob_objects), ctx, result->stream());
}));
Call方法中會通過
- JUST(SoftSyncStream(output_eager_blob_objects, stream));
- JUST(SoftSyncStream(input_eager_blob_objects, stream));
- auto* vm_stream = JUST(Singleton<VirtualMachine>::Get()->GetVmStream(stream));
完成outputs inputs tensor的流同步(SoftSyncStream)過程以及vm::Stream的推導,然後通過構造OpCallInstructionPolicy指令派發至vm執行。
SoftSyncStream的同步這裡省略,具體過程見第4節。
2.3.1 構造 ThreadCtx 對象,啟動執行指令的線程
ThreadCtx 對象指針儲存在 VirtualMachine 的 HashMap 中。每個 DeviceType(CPU或CUDA)對應一個 ThreadCtx 對象;臨界區和 LazyJob 有自己的 ThreadCtx 對象。
首次通路 HashMap 時得到的是零值(空指針),需要調用 CreateThreadCtx 建立對象。實際通過虛拟機指令建立對象,ThreadCtx 對象儲存在 VirtualMachineEngine::thread_ctx_list_ 中。
ThreadCtx 對象構造後,會建立一個 worker 線程、執行 WorkerLoop 方法,并添加到 worker_threads_。是以 worker_threads_ 是與 ThreadCtx 對象一一對應的。
這個線程負責其所歸屬的指令的執行:
- WorkerLoop 在收到通知後,會調用 ThreadCtx::TryReceiveAndRun 處理指令。
- 在這個函數中,将 ThreadCtx 的指令挪到臨時清單、通過 StreamPolicy 執行每個指令。
- ThreadCtx 的指令,是 VirtualMachineEngine 在 DispatchInstruction 時添加進去的。
ThreadCtx建立完成後,将持有vm::Stream對象。oneflow::vm::Stream和oneflow::Stream的數量是一一對應的,vm::Stream 按照<DeviceType, StreamRole>分組存儲在對應的 ThreadCtx 中。
vm::Stream的推導流程細節如下:
2.4 執行OpCall指令和ep::Stream推導
有幾個場景會建立(擷取) ep::Stream 對象。比如 kernel 執行時。
OpCall指令在構造時,指令政策類型是 OpCallInstructionPolicy。虛拟機在 DispatchInstruction 時,無論哪個分支,後續都會調用 EpStreamType::Run,最終通過
例如 GpuL2NormalizeKernel::Compute,最終在其kernel的Compute方法中,會通過ctx->stream()建立(擷取)ep::Stream 對象,launch kernel 執行計算。
2.4.1 擷取/建立ep::Stream
下面,我們重點看一下OpCall指令實際執行時,調用的OpCallInstructionUtil::Compute()方法:
static inline Maybe<void> Compute(OpCallInstructionPolicy* op_call_instruction_policy,
Instruction* instruction) {
Allocator* allocator = instruction->mut_stream()->mut_stream_policy()->mut_allocator();
JUST(AllocateOutputBlobsMemory(op_call_instruction_policy, allocator, instruction));
if (unlikely(op_call_instruction_policy->need_temp_storage())) {
JUST(TryAllocateTempStorage(op_call_instruction_policy, allocator));
}
ep::Stream* stream = instruction->mut_stream()->mut_stream_policy()->stream();
user_op::OpKernelState* state = nullptr;
user_op::OpKernelCache* cache = nullptr;
if (op_call_instruction_policy->user_opkernel()->has_state_or_cache()) {
TryInitOpKernelStateAndCache(op_call_instruction_policy, stream, &state, &cache);
}
OpKernelCompute(op_call_instruction_policy, stream, state, cache);
if (unlikely(op_call_instruction_policy->need_temp_storage())) {
DeallocateTempStorage(op_call_instruction_policy, allocator);
}
return Maybe<void>::Ok();
}
其中會通過ep::Stream* stream = instruction->mut_stream()->mut_stream_policy()->stream();完成ep::Stream的推導,之後在OpKernelCompute()方法中實際完成op/kernel的執行。
這裡->stream()會調用ep_stream_policy_base.h中的:
ep::Stream* stream() override { return GetOrCreateEpStream(); }
這是一個private方法:
private:
ep::Stream* GetOrCreateEpStream() const {
if (unlikely(ep_stream_ == nullptr)) {
ep_stream_ = GetOrCreateEpDevice()->CreateStream();
CHECK(ep_stream_ != nullptr);
}
return ep_stream_;
}
可以看到,如果成員變量ep_stream_非空,則直接傳回;否則,通過 ep_stream_ = GetOrCreateEpDevice()->CreateStream(); 來建立建立ep::Stream。
2.4.2 擷取/建立ep::Device
而這裡的GetOrCreateEpDevice方法如下:
ep::Device* GetOrCreateEpDevice() const {
if (unlikely(ep_device_ == nullptr)) {
ep_device_ = Singleton<ep::DeviceManagerRegistry>::Get()->GetDevice(device_->enum_type(),
device_->device_id());
CHECK(ep_device_);
}
return ep_device_.get();
}
根據oneflow::Device中拿到的device id和device type,去全局單例的ep::DeviceManagerRegistry中取出對應的oneflow::ep::Device
oneflow::vm::StreamPolicy和oneflow::vm::EpStreamPolicy推導
- stream_policy_ = CHECK_JUST(CreateStreamPolicy::Visit(stream_type, device));
- std::shared_ptr<vm::StreamPolicy>(new vm::EpStreamPolicy(device));
3
Eager Global模式下的Device和Stream推導
eager global模式下,device資訊隐藏在placement中,placement不僅包括了device type資訊還包括其tensor具體分布在哪些ranks上的資訊,placement在 C++ 中的對應類型是 ParallelDesc。
是以device以及stream的部分推導過程和eager local模式下有所差別,但OpCall指令執行;device、vm::Stream和ep::Stream的推導過程都和eager local模式下是類似的。
3.1 推導Device
3.1.1 placement 的 parallel_id
oneflow中的placement表示tensor存放的裝置叢集(device group),如:p = flow.placement(type="cuda", ranks=[0, 1, 2, 3])表示tensor分布于1台機器上,cuda device 0、1、2、3四個裝置上;p = flow.placement(type="cuda", ranks=[[0, 1], [2, 3]])則表示tensor分布于2台機器上,host1的device0、1以及host2的device2、3。
在 oneflow 的分布式環境下,各個 host 上需要有相同數量的device,每個程序使用一個device。這樣根據環境變量 RANK 可以得出 machine_id,LOCAL_RANK 就是程序在 制定host 上的 rank序号。
如果 input tensor 的 placement 與目前程序無關,可以省掉很多不必要的計算。通過 placement 的 parallel_id 可以判斷計算任務是否與目前程序相關。
placement 在 C++ 中的對應類型是 ParallelDesc,其中并沒有 parallel_id 字段,這個資訊隐含在其它字段中。
ParallelDesc 在構造時會調用 ClearUp 函數,從中可以看到
- ParallelDesc::parallel_id2machine_id_ 是 placement 分布的 machine。
- ParallelDesc::parallel_id2device_id_ 是 placement 分布的 device_id。
- parallel_id 是上述 2 個數組的索引,一個 parallel_id 對應一個 machine_id:device_id 組合。這樣,根據parallel_id可以查到對應的 machine_id 和 device_id。
- 反過來,根據 machine_id:device_id 也可以從 machine_id2device_id2parallel_id_ 查到 parallel_id。
3.1.2 eager 模式下根據 parallel_id 忽略無關計算任務
在 eager 分布時場景處理計算任務時,會調用GetTensorDevice4CurrentProcessCtx,推導得到輸出tensor的device,以及擷取目前程序的 machine_id、device_id 在 placement 中的 parallel_id 值。
如果目前程序與該 placement 無關,parallel_id 就是空,後續處理時就可以忽略一些計算:
- EagerGlobalTensorImpl::New 中隻需要用 functional::Empty 構造一個 shape 為 0 的空的 tensor。
- GetBoxingOutput計算時,如果parallel_id為空則表示目前rank程序無效,無需計算直接傳回。
- Interpret 可以不給 vm 送出指令、提前傳回。
3.2 推導Stream
在 ConsistentTensorInferCache 中推導 SBP Signature 時,也會同時推導出目前的 tensor 計算任務、在目前程序所用的device。推導時,會先确認所有 inputs 的 placement 是一緻的,都分布在相同的device上。如前所述,如果計算任務與目前程序無關,會提前傳回;而一個程序隻使用一個device。
這裡和eager local模式下stream的推導類似,通過JUST(InferDeviceAndStream(user_op_expr, infer_args))推導出oneflow::Stream對象,StreamRole 是 kCompute。差別在于eager global模式下
3.2.1 unique_stream_id
unique_stream_id 表示 oneflow::Stream 對象的建立次序。所有的 oneflow::Stream 對象都儲存在全局的 StreamMgr::stream2unique_stream_id_ 中。unique_stream_id2stream_symbol_ 可看作是引用類型的副本,unique_stream_id 就是 Stream 對象在這個數組中的索引。與 parallel_id 不同,unique_stream_id 是 Stream 對象在程序内的唯一辨別。
并不是每次都需要加鎖通路 StreamMgr。oneflow::Stream 包含的都是描述性資訊,其引用是以 ThreadLocal 的方式存儲的,可以提升後續讀取的效率。虛拟機在執行指令時,也會用 unique_stream_id 進行邏輯判斷。
4
Eager模式下的Stream同步——SoftSyncStream
設想以下場景:将 CPU 下的 tensor 拷貝到 CUDA 裝置,然後在 CUDA 上再進行 tensor add 的計算。這涉及到兩個流,一個是 Host2Device,一個是 CUDA Compute。這兩個流的計算任務是并發執行的。需要有同步措施,才能保證拷貝完再執行 add 計算。
Eager 模式下,在 InstructionsBuilder::Call 中構造指令時,對 SoftSyncStream的調用會在必要時向指令清單插入同步指令。
SoftSyncStream 中,幾個重要概念:
- tensor在oneflow記憶體中的實際承載者是 eager_blob_object
- last_used_stream 表示一個tensor(blob)上一次使用到的stream,可能是compute stream、h2d stream、d2h stream、集合通信ccl stream等
- 如果 last_used_stream 與目前計算執行的流 stream 相同,則可以忽略,因為相同stream間天然順序執行是以無需同步,否則就需要進行後續的同步處理
SoftSyncStream代碼如下:
Maybe<void> InstructionsBuilder::SoftSyncStream(const vm::EagerBlobObjectList& eager_blob_objects,
Symbol<Stream> stream) {
JUST(ForEachEagerBlobObjectsNeedingSoftSync(
eager_blob_objects, stream,
[&](Symbol<Stream> last_used_stream, auto&& dep_objects) -> Maybe<void> {
return SoftSyncStreamBetween(std::move(dep_objects), last_used_stream, stream);
}));
for (const auto& eager_blob_object : eager_blob_objects) {
eager_blob_object->set_last_used_stream(stream);
}
return Maybe<void>::Ok();
}
主體邏輯是,會在ForEachEagerBlobObjectsNeedingSoftSync方法中周遊每一個tensor對象(eager blob object),對于每一個需要同步的blob運用lambda方法并最終調用SoftSyncStreamBetween完成stream間的同步。
這裡,我們看一下ForEachEagerBlobObjectsNeedingSoftSync的邏輯:
首先if/else的主體業務邏輯是類似的,主要差別在于,當blob的size <= kOpArgsReservedSize時(預設為4)會使用small vector來存放LocalDepObject變量,效率會更快一些(否則會走到else分支,主體邏輯類似,這裡就不看了)。
- const auto& opt_last_used_stream = eager_blob_object->last_used_stream();
- if (unlikely(!opt_last_used_stream.has_value())) { continue; }
這兩句是查詢該tensor(blob)上一次被使用時用到的stream——last_used_stream,如果為空,則直接continue跳過,因為如果此tensor之前并未被任何stream使用,則無需進行stream間的同步操作,因為在目前stream上不會有關于該tensor的其他依賴關系;
if (last_used_stream != stream) {
small_vector<intrusive::shared_ptr<LocalDepObject>, kOpArgsReservedSize> dep_objects{
intrusive::shared_ptr<LocalDepObject>(
JUST(eager_blob_object->compute_local_dep_object()))};
JUST(DoEach(last_used_stream, std::move(dep_objects)));
}
如果last_used_stream!=stream則表示需要在兩個stream間進行同步,則會應用傳入的lambda函數DoEach進行處理,在這裡lambda函數即:
[&](Symbol<Stream> last_used_stream, auto&& dep_objects) -> Maybe<void> {
return SoftSyncStreamBetween(std::move(dep_objects), last_used_stream, stream);
}));
既實際調用的是SoftSyncStreamBetween來完成實際的stream間同步,這裡主要有3個變量:
- dep_objects存儲了tensor間的依賴關系
- last_used_stream則是該tensor上一次使用的stream
- stream該tensor目前使用的stream
SoftSyncStreamBetween的代碼如下:
Maybe<void> InstructionsBuilder::SoftSyncStreamBetween(
small_vector<intrusive::shared_ptr<LocalDepObject>, kOpArgsReservedSize>&& dependences,
Symbol<Stream> from_stream, Symbol<Stream> to_stream) {
CHECK(from_stream != to_stream) << "synchronization is unnecessary";
if (SupportingStreamWait(from_stream, to_stream)) {
JUST(StreamWait(std::move(dependences), from_stream, to_stream));
} else {
JUST(RecordEvent(std::move(dependences), from_stream));
}
return Maybe<void>::Ok();
}
SoftSyncStreamBetween的主要邏輯如下:
- 先額外做了一次check,檢測如果待同步的兩個stream相同,則check會報錯并提示"synchronization is unnecessary"
- 通過SupportingStreamWait判斷from 和 to stream間是否支援stream wait,是則調用StreamWait方法;否則,直接調用RecordEvent方法
- SupportingStreamWait的主要邏輯是,通過stream的device、以及StreamType的Visit方法來判斷。簡單來說,如果from 和 to stream之間是不同的device(譬如cpu stream <-> cuda stream之間的同步),或者from stream的device為cpu,則SupportingStreamWait一定是false;如果是相同的,則繼續通過其他判斷條件進行判斷。
SupportingStreamWait為True
SupportingStreamWait為True時,即from to stream同為Cuda Stream間的同步情況,在這種情況下會走到StreamWait的函數,該函數最終會派發一個StreamWaitEventInstructionPolicy的指令給vm執行,StreamWaitEventInstructionPolicy的執行邏輯主要是兩個cuda event:
- cudaEventRecord
- cudaStreamWaitEvent
- 對于from_stream來說,插入一個cudaEventRecord,用于标志from stream是否完成該stream上的event事件;
- 對于to_stream來說,插入一個cudaStreamWaitEvent等待from stream上的事件完成後,再繼續執行to_stream。
SupportingStreamWait為False
SupportingStreamWait為False時,會直接調用JUST(RecordEvent(std::move(dependences), from_stream)); 其内部實作會從對象池中擷取可複用的cuda event對象并執行event。
這裡有個細節,由于cuda event的建立和銷毀都會引發cuda kernel的launch由異步轉同步,是以基于對象池的cuda event可以避免這個開銷。
實際上最終調用的還是cudaEventRecord,而cudaEventRecord本身隻是起到一個“占位符”的作用,并不能起到(保證該stream上其他kernel全部執行完)的作用,真正能保證stream同步作用的是oneflow vm(vitual machine)控制下的指令間依賴關系/執行順序。
5
CPU 下的并行計算
CpuStream 隻有一個線程。CPU kernel 應該是通過 OpenMP 或者 Intel OneApi 等實作并行計算加速。
參考資料
1.https://github.com/Oneflow-Inc/oneflow/tree/845595e2c0abc3d384ff047e188295afdc41faaa
歡迎 Star、試用 OneFlow 最新版本:
https://github.com/Oneflow-Inc/oneflow/