天天看點

OneFlow: 從 Op 到 Job

前言

前面在初始化 Session 的時候,通過 CurJobAddOp 将 Op 加入到計算圖當中。在啟動 Session 之前,需要将這些 Op 組成的計算圖編譯成 JobSet。這篇文章主要分析如何從一個個 op 變成一個 Job,Job 如何在 Complete 函數内進行優化的。

流程分析

上一篇文章中,_CompileJob 将會去調用使用者定義的 Job 函數,将 Op 加入到計算圖中。這個過程執行完成之後,會調用 CurJobBuildAndInferCtx_Complete,生成 Job 或 JobSet(如果使用者定義了多個 Job)。Complete 的時候,會執行一些 Pass,将計算圖進行改寫優化。

# python/oneflow/compatible/single_client/framework/compiler.py: 44
def Compile(session, function_desc, config_proto):
    with InterpretScope(session, function_desc, config_proto):
        _CompileJob(session, function_desc)
        session.StashJob(function_desc.job_func.__name__)
        oneflow._oneflow_internal.CurJobBuildAndInferCtx_Complete()
        session.StashJob(
            function_desc.job_func.__name__,
            function_desc.job_func.__name__ + "_after_complete",
        )
           

CurJobBuildAndInferCtx_Complete

Complete 的時候做了什麼呢?它的輸入是什麼?輸出又是什麼?輸入和輸出都是 job,問題是這個輸入的 job 是從哪裡來的呢?我們可以看到 job 和 mut_job 等方法的調用,這些方法傳回的都是類成員 job_。這個 job_ 是怎麼構造來的呢?

// oneflow/core/job/job_build_and_infer_ctx.cpp: 960
Maybe<void> LazyJobBuildAndInferCtx::Complete() {
  CHECK_GT_OR_RETURN(job().net().op_size(), 0)
      << " Sorry, nn.Graph need at least 1 op in net, but get 0 now.";
  CHECK_NOTNULL(Global<JobDesc>::Get());
  Global<JobDesc>::Delete();
  auto scope = std::make_unique<GlobalJobDescScope>(mut_job()->job_conf(), job_id());
  JobPassCtx job_pass_ctx(GlobalJobDesc());
  auto DoPass = [&](const std::string& pass_name) -> Maybe<void> {
    return JobPass4Name(pass_name)(mut_job(), &job_pass_ctx);
  };
  // ... DoPass
  return Maybe<void>::Ok();
}
           

Job 是哪裡來的

來看看構造函數,和 SetJobConf,這兩個函數都會修改 JobBuildAndInferCtx 内部的 job_ 成員。

// oneflow/core/job/job_build_and_infer_ctx.cpp: 103
JobBuildAndInferCtx::JobBuildAndInferCtx(Job* job, int64_t job_id)
    : job_(job), job_id_(job_id), unique_op_name_index_(0) {
  is_job_conf_frozen_ = false;
  has_job_conf_ = false;
}

Maybe<void> JobBuildAndInferCtx::SetJobConf(const JobConfigProto& job_conf) {
  CHECK_OR_RETURN(!is_job_conf_frozen_) << Error::JobConfFrozenError();
  CHECK_OR_RETURN(!has_job_conf_) << Error::JobConfRepeatedSetError();
  has_job_conf_ = true;
  CHECK_EQ_OR_RETURN(job_->job_conf().job_name(), job_conf.job_name())
      << Error::JobNameNotEqualError() << "job name you set: " << job_conf.job_name()
      << " not equal to origin job name: " << job_->job_conf().job_name();
  job_->mutable_job_conf()->CopyFrom(job_conf);
  CHECK_ISNULL_OR_RETURN(Global<JobDesc>::Get());
  Global<JobDesc>::New(job_conf, job_id_);
  return Maybe<void>::Ok();
}
           
  • 如果你對上一篇還有點印象的話,會記得,在 CurJobAddOp 之前,需要調用 SetJobConf。下面是運作 lenet_model.py 的 JobConf。是以 SetJobConf 會對 JobBuildAndInferCtx 的 job_ 成員進行初始化。
job_name: "train_job"
train_conf {
}
           
  • 設定了 JobConf 之後,就可以進行添加算子,調用 CurJobAddOp,然後調用 AddAndInferConsistentOp,最後到了 AddAndInferOp。AddAndInferOp 特别長,主要是進行一些 SBP 屬性的推理,對于 SBP 我完全是不懂呢,是以先跳過。總之這個 AddAndInferOp 裡面調用了一個重要的方法:AddOpAndUpdateJobParallelViewConf。正如名字暗示的那樣,AddOp!
// oneflow/core/job/job_build_and_infer_ctx.cpp: 546
Maybe<OpAttribute> JobBuildAndInferCtx::AddAndInferConsistentOp(const OperatorConf& op_conf) {
  CHECK_OR_RETURN(op_conf.has_scope_symbol_id());
  const auto& scope = Global<symbol::Storage<Scope>>::Get()->Get(op_conf.scope_symbol_id());
  const auto& parallel_desc = *JUST(scope.GetParallelDesc(op_conf));
  const auto* job_desc = JUST(scope.job_desc());
  return AddAndInferOp(op_conf, parallel_desc.parallel_conf(), job_desc, false);
}

// TODO(): add handle error of same interface op blob between jobs
Maybe<OpAttribute> JobBuildAndInferCtx::AddAndInferOp(const OperatorConf& op_conf,
                                                      const ParallelConf& origin_parallel_conf,
                                                      const JobDesc* job_desc,
                                                      bool is_mirrored_parallel_view) {
  // ...
  AddOpAndUpdateJobParallelViewConf(*new_op_conf, parallel_desc, nd_sbp_sig_conf,
                                    is_mirrored_parallel_view);
  // ...
  return op->GetOpAttributeWithoutOpNameAndLbn();
}
           
  • AddOpAndUpdateJobParallelViewConf 這個方法的最後一行,将 Op 添加到了 job_ 裡面。于是呢,在我們調用 Complete 的時候,我們可以找到這個 Op,找到整個計算圖。
// oneflow/core/job/job_build_and_infer_ctx.cpp: 176
void JobBuildAndInferCtx::AddOpAndUpdateJobParallelViewConf(
    const OperatorConf& operator_conf, const ParallelDesc& parallel_desc,
    const cfg::NdSbpSignature& nd_sbp_signature, bool is_mirrored_parallel_view) const {
  auto* op_name2sbp_sig =
      job_->mutable_job_parallel_view_conf()->mutable_op_name2sbp_signature_conf();
  auto* op_name2nd_sbp_sig =
      job_->mutable_job_parallel_view_conf()->mutable_op_name2nd_sbp_signature_conf();
  if (nd_sbp_signature.bn_in_op2nd_sbp().size() > 0) {
    nd_sbp_signature.ToProto(&(*op_name2nd_sbp_sig)[operator_conf.name()]);
    if (parallel_desc.hierarchy()->NumAxes() == 1) {
      cfg::SbpSignature sbp_signature;
      NdSbpSignatureToSbpSignature(nd_sbp_signature, &sbp_signature);
      sbp_signature.ToProto(&(*op_name2sbp_sig)[operator_conf.name()]);
    }
  }
  auto* op_name2is_mirrored_parallel_view =
      job_->mutable_job_parallel_view_conf()->mutable_op_name2is_mirrored_parallel_view();
  if (is_mirrored_parallel_view) {
    (*op_name2is_mirrored_parallel_view)[operator_conf.name()] = true;
  }
  job_->mutable_net()->add_op()->CopyFrom(operator_conf);
}
           

proto 定義

Job

前面的 job_ 其實就是一個 Proto 的 message,不妨将它的定義找來看一看。一個 Job 由 5 個部分組成,其中 DLNetConf 就是計算圖,JobConfigProto 表示這個 job 的配置(train 或者 predict)。

// oneflow/core/job/job.proto: 29
message Job {
  optional DLNetConf net = 1;
  optional Placement placement = 2;
  required JobConfigProto job_conf = 3;
  optional JobParallelViewConf job_parallel_view_conf = 4;
  optional JobHelperConf helper = 5;
}
           

DLNetConf

DLNetConf 是一個由 OperatorConf 構成的計算圖。

message DLNetConf {
  repeated OperatorConf op = 1;
}
           

OperatorConf

OperatorConf 定義了一個算子,算子的名字,算子的裝置等資訊。

message OperatorConf {
  required string name = 1;
  optional string device_tag = 4 [default = "invalid_device"];
  repeated string ctrl_in_op_name = 7;
  optional int64 scope_symbol_id = 8;
  optional uint32 stream_index_hint = 9;
  optional string pass_tag = 10;
  oneof op_type {
      // ...
  }
}
           

Complete

既然我們知道了 Job 是怎麼來的,它長什麼樣,那麼我們就可以開始嘗試着将它畫出來!每個 Pass 之後就畫一張圖看看。OneFlow 中提供了 Graph 基類,裡面不僅僅有一些常用的圖算法,還有可視化,可以将圖轉為 .dot 檔案。OneFlow 中還提供了一個 OpGraph 類,它接收一個 Job 對象,然後可以調用可視化的方法。

初始計算圖

Complete 之前的計算圖,就是使用者定義的那樣,按照 Job 函數裡面的算子進行構圖,調用 CurJobAddOp 加入到 Job 對象裡面。下面就是在 Complete 開始之前可視化出來的計算圖。文末附有訓練腳本,這次為了友善檢視,選擇了隻有幾個節點的 mlp 識别手寫字型模型。

OneFlow: 從 Op 到 Job

最終計算圖

在 Complete 中的 Pass 中,并不是所有的 Pass 都會改寫計算圖,有的計算圖可能因為沒有配置,是以不執行。目前肉眼可見對計算圖改變最大的 Pass 是:GenerateBackwardAndOptimizerOpConfs。這個 Pass 會生成後向算子和優化器節點。

OneFlow: 從 Op 到 Job

損失函數節點

損失函數節點,接受 Input_1 和 dense2-bias_add 兩個節點的輸出,Input_1 是标簽,dense2-bias_add 是前向傳播計算的結果。用這兩個節點的值就可以計算出損失。在損失函數節點後面,有三個節點,一個是 Return_4,這個節點輸出損失,還有另外兩個節點用來輸出損失函數的梯度,這兩個節點的值我猜想應該是 1。

OneFlow: 從 Op 到 Job

附:mlp 識别手寫字型

from oneflow.compatible import single_client as flow
from oneflow.compatible.single_client import typing as tp
import numpy as np

BATCH_SIZE = 100
flow.enable_eager_execution(False)


def mlp_model(images, labels, train=True):
    # [batch_size, image_sizes] -> [batch_size, pixels]
    # reshape = flow.reshape(images, [images.shape[0], -1])
    reshape = flow.flatten(images, start_dim=1)

    # dense, [batch_size, pixels] -> [batch_size, 500]
    initializer1 = flow.random_uniform_initializer(-1 / 28.0, 1 / 28.0)
    hidden = flow.layers.dense(
        reshape,
        500,
        activation=flow.nn.relu,
        kernel_initializer=initializer1,
        bias_initializer=initializer1,
        name="dense1"
    )

    # dense, [batch_size, 500] -> [batch_size, logits]
    initializer2 = flow.random_uniform_initializer(
        -np.sqrt(1 / 500.0), np.sqrt(1 / 500.0)
    )
    logits = flow.layers.dense(
        hidden,
        10,
        kernel_initializer=initializer2,
        bias_initializer=initializer2,
        name="dense2"
    )

    if train:
        loss = flow.nn.sparse_softmax_cross_entropy_with_logits(labels, logits)
        return loss
    else:
        return logits


@flow.global_function(type="train")
def train_job(
        images: tp.Numpy.Placeholder((BATCH_SIZE, 1, 28, 28), dtype=flow.float),
        labels: tp.Numpy.Placeholder((BATCH_SIZE,), dtype=flow.int32),
) -> tp.Numpy:
    with flow.scope.placement("gpu", "0:0"):
        loss = mlp_model(images, labels)

    flow.optimizer.Adam(
        flow.optimizer.PiecewiseConstantScheduler([], [0.001])
    ).minimize(loss)
    return loss


if __name__ == '__main__':
    (train_images, train_labels), (test_images, test_labels) = flow.data.load_mnist(
        BATCH_SIZE, BATCH_SIZE
    )

    for epoch in range(20):
        for i, (images, labels) in enumerate(zip(train_images, train_labels)):
            loss = train_job(images, labels)
            if i % 20 == 0:
                print("Epoch [{}/{}], Loss: {:.4f}".format(epoch + 1, 20, loss.mean()))
    flow.checkpoint.save("./mlp_model")