天天看點

cartographer_grpc_server代碼流程分析

map_builder_server_main.cc 還是Run函數第一句

cartographer::cloud::Run(FLAGS_configuration_directory,
                           FLAGS_configuration_basename);
           

繼續跟進,Run函數裡構造了MapBuilder,這個變量可以直接給cartographer_ros子產品使用,而在這裡成為了MapBuilderServer的成員變量。

void Run(const std::string& configuration_directory,
         const std::string& configuration_basename) {
#if USE_PROMETHEUS
  metrics::prometheus::FamilyFactory registry;
  ::cartographer::metrics::RegisterAllMetrics(&registry);
  RegisterMapBuilderServerMetrics(&registry);
  ::prometheus::Exposer exposer("0.0.0.0:9100");
  exposer.RegisterCollectable(registry.GetCollectable());
  LOG(INFO) << "Exposing metrics at http://localhost:9100/metrics";
#endif

  proto::MapBuilderServerOptions map_builder_server_options =
      LoadMapBuilderServerOptions(configuration_directory,
                                  configuration_basename);
  auto map_builder = absl::make_unique<mapping::MapBuilder>(
      map_builder_server_options.map_builder_options());
  std::unique_ptr<MapBuilderServerInterface> map_builder_server =
      CreateMapBuilderServer(map_builder_server_options,
                             std::move(map_builder));
  map_builder_server->Start();
  map_builder_server->WaitForShutdown();
}
           

CreateMapBuilderServer函數跟進

std::unique_ptr<MapBuilderServerInterface> CreateMapBuilderServer(
    const proto::MapBuilderServerOptions& map_builder_server_options,
    std::unique_ptr<mapping::MapBuilderInterface> map_builder) {
  return absl::make_unique<MapBuilderServer>(map_builder_server_options,
                                             std::move(map_builder));
}
           

可以看到這裡我們就建立了MapBuilderServer對象,我們來看下它的構造函數,可以看到主要是設定grpc_server的參數,待詳細考究,比如SetNumEventThreads、SetExecutionContext的含義及作用。然後還使用SetGlobalSlamOptimizationCallback設定了全局優化回調函數。

MapBuilderServer::MapBuilderServer(
    const proto::MapBuilderServerOptions& map_builder_server_options,
    std::unique_ptr<mapping::MapBuilderInterface> map_builder)
    : map_builder_(std::move(map_builder)) {
  async_grpc::Server::Builder server_builder;
  server_builder.SetServerAddress(map_builder_server_options.server_address());
  server_builder.SetNumGrpcThreads(
      map_builder_server_options.num_grpc_threads());
  server_builder.SetNumEventThreads(
      map_builder_server_options.num_event_threads());
  server_builder.SetMaxSendMessageSize(kMaxMessageSize);
  server_builder.SetMaxReceiveMessageSize(kMaxMessageSize);
  if (!map_builder_server_options.uplink_server_address().empty()) {
    local_trajectory_uploader_ = CreateLocalTrajectoryUploader(
        map_builder_server_options.uplink_server_address(),
        map_builder_server_options.upload_batch_size(),
        map_builder_server_options.enable_ssl_encryption(),
        map_builder_server_options.enable_google_auth());
  }
  server_builder.RegisterHandler<handlers::AddTrajectoryHandler>();
  server_builder.RegisterHandler<handlers::AddOdometryDataHandler>();
  server_builder.RegisterHandler<handlers::AddImuDataHandler>();
  server_builder.RegisterHandler<handlers::AddRangefinderDataHandler>();
  server_builder.RegisterHandler<handlers::AddFixedFramePoseDataHandler>();
  server_builder.RegisterHandler<handlers::AddLandmarkDataHandler>();
  server_builder.RegisterHandler<handlers::AddSensorDataBatchHandler>();
  server_builder.RegisterHandler<handlers::FinishTrajectoryHandler>();
  server_builder.RegisterHandler<handlers::DeleteTrajectoryHandler>();
  server_builder.RegisterHandler<handlers::ReceiveGlobalSlamOptimizationsHandler>();
  server_builder.RegisterHandler<handlers::ReceiveLocalSlamResultsHandler>();
  server_builder.RegisterHandler<handlers::GetSubmapHandler>();
  server_builder.RegisterHandler<handlers::GetTrajectoryNodePosesHandler>();
  server_builder.RegisterHandler<handlers::GetTrajectoryStatesHandler>();
  server_builder.RegisterHandler<handlers::GetLandmarkPosesHandler>();
  server_builder.RegisterHandler<handlers::GetAllSubmapPosesHandler>();
  server_builder.RegisterHandler<handlers::GetLocalToGlobalTransformHandler>();
  server_builder.RegisterHandler<handlers::GetConstraintsHandler>();
  server_builder.RegisterHandler<handlers::IsTrajectoryFinishedHandler>();
  server_builder.RegisterHandler<handlers::IsTrajectoryFrozenHandler>();
  server_builder.RegisterHandler<handlers::LoadStateHandler>();
  server_builder.RegisterHandler<handlers::LoadStateFromFileHandler>();
  server_builder.RegisterHandler<handlers::RunFinalOptimizationHandler>();
  server_builder.RegisterHandler<handlers::WriteStateHandler>();
  server_builder.RegisterHandler<handlers::WriteStateToFileHandler>();
  server_builder.RegisterHandler<handlers::SetLandmarkPoseHandler>();
  grpc_server_ = server_builder.Build();
  if (map_builder_server_options.map_builder_options()
          .use_trajectory_builder_2d()) {
    grpc_server_->SetExecutionContext(
        absl::make_unique<MapBuilderContext<mapping::Submap2D>>(this));
  } else if (map_builder_server_options.map_builder_options()
                 .use_trajectory_builder_3d()) {
    grpc_server_->SetExecutionContext(
        absl::make_unique<MapBuilderContext<mapping::Submap3D>>(this));
  } else {
    LOG(FATAL)
        << "Set either use_trajectory_builder_2d or use_trajectory_builder_3d";
  }
  map_builder_->pose_graph()->SetGlobalSlamOptimizationCallback(
      [this](const std::map<int, mapping::SubmapId>& last_optimized_submap_ids,
             const std::map<int, mapping::NodeId>& last_optimized_node_ids) {
        OnGlobalSlamOptimizations(last_optimized_submap_ids,
                                  last_optimized_node_ids);
      });
}
           

然後執行MapBuilderServer::Start()函數

void MapBuilderServer::Start() {
  shutting_down_ = false;
  if (local_trajectory_uploader_) {
    local_trajectory_uploader_->Start();
  }
  StartSlamThread();
  grpc_server_->Start();
}
           

繼續跟進StartSlamThread()函數

void MapBuilderServer::StartSlamThread() {
  CHECK(!slam_thread_);

  // Start the SLAM processing thread.
  slam_thread_ = absl::make_unique<std::thread>(
      [this]() { this->ProcessSensorDataQueue(); });
}
           

就是建立了一個線程執行ProcessSensorDataQueue函數,如下,這個函數不斷把接收的資料傳進來

void MapBuilderServer::ProcessSensorDataQueue() {
  LOG(INFO) << "Starting SLAM thread.";
  while (!shutting_down_) {
    kIncomingDataQueueMetric->Set(incoming_data_queue_.Size());
    std::unique_ptr<MapBuilderContextInterface::Data> sensor_data =
        incoming_data_queue_.PopWithTimeout(kPopTimeout);
    if (sensor_data) {
      grpc_server_->GetContext<MapBuilderContextInterface>()
          ->AddSensorDataToTrajectory(*sensor_data);
    }
  }
}
           

繼續跟進這個函數AddSensorDataToTrajectory():

template <class SubmapType>
void MapBuilderContext<SubmapType>::AddSensorDataToTrajectory(
    const Data& sensor_data) {
  sensor_data.data->AddToTrajectoryBuilder(
      map_builder_server_->map_builder_->GetTrajectoryBuilder(
          sensor_data.trajectory_id));
}
           

可以看到這裡和之前的傳入方式就保持一緻了。

這裡把之前的方式也放在這裡data傳入wrapped_trajectory_builder_

void CollatedTrajectoryBuilder::HandleCollatedSensorData(
    const std::string& sensor_id, std::unique_ptr<sensor::Data> data) {
  auto it = rate_timers_.find(sensor_id);
  if (it == rate_timers_.end()) {
    it = rate_timers_
             .emplace(
                 std::piecewise_construct, std::forward_as_tuple(sensor_id),
                 std::forward_as_tuple(
                     common::FromSeconds(kSensorDataRatesLoggingPeriodSeconds)))
             .first;
  }
  it->second.Pulse(data->GetTime());

  if (std::chrono::steady_clock::now() - last_logging_time_ >
      common::FromSeconds(kSensorDataRatesLoggingPeriodSeconds)) {
    for (const auto& pair : rate_timers_) {
      LOG(INFO) << pair.first << " rate: " << pair.second.DebugString();
    }
    last_logging_time_ = std::chrono::steady_clock::now();
  }

data->AddToTrajectoryBuilder(wrapped_trajectory_builder_.get());
}

           

題外:

在MapBuilderServer的構造函數中我們看到這樣一句話,這是幹什麼用的呢?

grpc_server_->SetExecutionContext(
        absl::make_unique<MapBuilderContext<mapping::Submap2D>>(this));
           

我們看下MapBuilderContext的定義

template <class SubmapType>
class MapBuilderContext : public MapBuilderContextInterface {
 public:
  MapBuilderContext(MapBuilderServer* map_builder_server);
  mapping::MapBuilderInterface& map_builder() override;
  common::BlockingQueue<std::unique_ptr<MapBuilderContextInterface::Data>>&
  sensor_data_queue() override;
  mapping::TrajectoryBuilderInterface::LocalSlamResultCallback
  GetLocalSlamResultCallbackForSubscriptions() override;
  void AddSensorDataToTrajectory(const Data& sensor_data) override;
  MapBuilderContextInterface::LocalSlamSubscriptionId SubscribeLocalSlamResults(
      int trajectory_id, LocalSlamSubscriptionCallback callback) override;
  void UnsubscribeLocalSlamResults(
      const LocalSlamSubscriptionId& subscription_id) override;
  int SubscribeGlobalSlamOptimizations(
      GlobalSlamOptimizationCallback callback) override;
  void UnsubscribeGlobalSlamOptimizations(int subscription_index) override;
  void NotifyFinishTrajectory(int trajectory_id) override;
  LocalTrajectoryUploaderInterface* local_trajectory_uploader() override;
  void EnqueueSensorData(int trajectory_id,
                         std::unique_ptr<sensor::Data> data) override;
  void EnqueueLocalSlamResultData(int trajectory_id,
                                  const std::string& sensor_id,
                                  const mapping::proto::LocalSlamResultData&
                                      local_slam_result_data) override;
  void RegisterClientIdForTrajectory(const std::string& client_id,
                                     int trajectory_id) override;
  bool CheckClientIdForTrajectory(const std::string& client_id,
                                  int trajectory_id) override;

 private:
  MapBuilderServer* map_builder_server_;
  mapping::SubmapController<SubmapType> submap_controller_;
  std::map</*trajectory_id=*/int, /*client_id=*/std::string> client_ids_;
};
           

似乎沒發現什麼,我們再看下MapBuilderContextInterface的定義

class MapBuilderContextInterface : public async_grpc::ExecutionContext {
 public:
  struct LocalSlamResult {
    int trajectory_id;
    common::Time time;
    transform::Rigid3d local_pose;
    std::shared_ptr<const sensor::RangeData> range_data;
    std::unique_ptr<const mapping::TrajectoryBuilderInterface::InsertionResult>
        insertion_result;
  };
  // Calling with 'nullptr' signals subscribers that the subscription has ended,
  // e.g. this happens when the corresponding trajectory was finished and hence
  // no more local SLAM updates will occur.
  // The callback can return 'false' to indicate that the client is not
  // interested in more local SLAM updates and 'MapBuilderServer' will end the
  // subscription.
  using LocalSlamSubscriptionCallback =
      std::function<bool(std::unique_ptr<LocalSlamResult>)>;

  // The callback can return 'false' to indicate that the client is not
  // interested in more global SLAM runs and 'MapBuilderServer' will end the
  // subscription.
  using GlobalSlamOptimizationCallback = std::function<bool(
      const std::map<int /* trajectory_id */, mapping::SubmapId>&,
      const std::map<int /* trajectory_id */, mapping::NodeId>&)>;

  struct Data {
    int trajectory_id;
    std::unique_ptr<sensor::Data> data;
  };
  struct LocalSlamSubscriptionId {
    const int trajectory_id;
    const int subscription_index;
  };
           

我們看到MapBuilderContextInterface是async_grpc::ExecutionContext類型,那麼這個類型資料有什麼用呢?我們繼續跟進ExecutionContext

// Implementations of this class allow RPC handlers to share state among one
// another. Using Server::SetExecutionContext(...) a server-wide
// 'ExecutionContext' can be specified. This 'ExecutionContext' can be retrieved
// by all implementations of 'RpcHandler' by calling
// 'RpcHandler::GetContext<MyContext>()'.
class ExecutionContext {
 public:
  // Automatically locks an ExecutionContext for shared use by RPC handlers.
  // This non-movable, non-copyable class is used to broker access from various
  // RPC handlers to the shared 'ExecutionContext'.
  template <typename ContextType>
  class Synchronized {
   public:
    ContextType* operator->() {
      return static_cast<ContextType*>(execution_context_);
    }
    Synchronized(common::Mutex* lock, ExecutionContext* execution_context)
        : locker_(lock), execution_context_(execution_context) {}
    Synchronized(const Synchronized&) = delete;
    Synchronized(Synchronized&&) = delete;

   private:
    common::MutexLocker locker_;
    ExecutionContext* execution_context_;
  };
  ExecutionContext() = default;
  virtual ~ExecutionContext() = default;
  ExecutionContext(const ExecutionContext&) = delete;
  ExecutionContext& operator=(const ExecutionContext&) = delete;
  common::Mutex* lock() { return &lock_; }

 private:
  common::Mutex lock_;
};
           

翻譯上面的解釋:

此類的實作允許RPC處理程式彼此共享狀态。使用Server::SetExecutionContext(…)可以指定伺服器範圍的“ExecutionContext”。“RpcHandler”的所有實作都可以通過調用“RpcHandler::GetContext()”來檢索此“ExecutionContext”。

我的了解是一種資料變量可以使用SetExecutionContext制定,并在RPC函數中共享,隻要通過GetContext調用就可以傳回變量内容。