在 map_builder_server_main.cc 中,還是從 Run 函數的第一句看起:
cartographer::cloud::Run(FLAGS_configuration_directory,
FLAGS_configuration_basename);
繼續跟進,Run函數裡構造了MapBuilder,這個變量可以直接給cartographer_ros模組使用,而在這裡成為了MapBuilderServer的成員變量。
// Entry point for the map builder server: loads options, constructs the
// MapBuilder and the gRPC server wrapping it, then blocks until shutdown.
void Run(const std::string& configuration_directory,
         const std::string& configuration_basename) {
#if USE_PROMETHEUS
  metrics::prometheus::FamilyFactory registry;
  // BUGFIX: the originals read '®istry' — HTML-entity mojibake of
  // '&registry'. Restore the address-of operator so this compiles.
  ::cartographer::metrics::RegisterAllMetrics(&registry);
  RegisterMapBuilderServerMetrics(&registry);
  ::prometheus::Exposer exposer("0.0.0.0:9100");
  exposer.RegisterCollectable(registry.GetCollectable());
  LOG(INFO) << "Exposing metrics at http://localhost:9100/metrics";
#endif
  // Parse the Lua configuration into server options.
  proto::MapBuilderServerOptions map_builder_server_options =
      LoadMapBuilderServerOptions(configuration_directory,
                                  configuration_basename);
  // The MapBuilder does the actual SLAM work; the server owns it.
  auto map_builder = absl::make_unique<mapping::MapBuilder>(
      map_builder_server_options.map_builder_options());
  std::unique_ptr<MapBuilderServerInterface> map_builder_server =
      CreateMapBuilderServer(map_builder_server_options,
                             std::move(map_builder));
  map_builder_server->Start();
  // Blocks the main thread until the server is asked to shut down.
  map_builder_server->WaitForShutdown();
}
CreateMapBuilderServer函數跟進
// Factory helper: wraps the given map builder in a MapBuilderServer and
// returns it through the MapBuilderServerInterface.
std::unique_ptr<MapBuilderServerInterface> CreateMapBuilderServer(
    const proto::MapBuilderServerOptions& map_builder_server_options,
    std::unique_ptr<mapping::MapBuilderInterface> map_builder) {
  auto server = absl::make_unique<MapBuilderServer>(map_builder_server_options,
                                                    std::move(map_builder));
  return server;
}
可以看到這裡我們就建立了MapBuilderServer對象,我們來看下它的構造函數,可以看到主要是設定grpc_server的參數,待詳細考究,比如SetNumEventThreads、SetExecutionContext的含義及作用。然後還使用SetGlobalSlamOptimizationCallback設定了全局優化回調函數。
// Configures and builds the gRPC server: address/thread-count/message-size
// settings, an optional uplink uploader, all RPC handlers, an execution
// context shared by the handlers (2D or 3D depending on options), and a
// callback that forwards global SLAM optimization results.
MapBuilderServer::MapBuilderServer(
    const proto::MapBuilderServerOptions& map_builder_server_options,
    std::unique_ptr<mapping::MapBuilderInterface> map_builder)
    : map_builder_(std::move(map_builder)) {
  async_grpc::Server::Builder server_builder;
  server_builder.SetServerAddress(map_builder_server_options.server_address());
  server_builder.SetNumGrpcThreads(
      map_builder_server_options.num_grpc_threads());
  server_builder.SetNumEventThreads(
      map_builder_server_options.num_event_threads());
  // Raise both directions to the same cap — presumably to accommodate large
  // payloads such as serialized map state; confirm against kMaxMessageSize.
  server_builder.SetMaxSendMessageSize(kMaxMessageSize);
  server_builder.SetMaxReceiveMessageSize(kMaxMessageSize);
  // An uplink uploader is created only when an uplink address is configured.
  if (!map_builder_server_options.uplink_server_address().empty()) {
    local_trajectory_uploader_ = CreateLocalTrajectoryUploader(
        map_builder_server_options.uplink_server_address(),
        map_builder_server_options.upload_batch_size(),
        map_builder_server_options.enable_ssl_encryption(),
        map_builder_server_options.enable_google_auth());
  }
  // Register every RPC this server answers.
  server_builder.RegisterHandler<handlers::AddTrajectoryHandler>();
  server_builder.RegisterHandler<handlers::AddOdometryDataHandler>();
  server_builder.RegisterHandler<handlers::AddImuDataHandler>();
  server_builder.RegisterHandler<handlers::AddRangefinderDataHandler>();
  server_builder.RegisterHandler<handlers::AddFixedFramePoseDataHandler>();
  server_builder.RegisterHandler<handlers::AddLandmarkDataHandler>();
  server_builder.RegisterHandler<handlers::AddSensorDataBatchHandler>();
  server_builder.RegisterHandler<handlers::FinishTrajectoryHandler>();
  server_builder.RegisterHandler<handlers::DeleteTrajectoryHandler>();
  server_builder.RegisterHandler<handlers::ReceiveGlobalSlamOptimizationsHandler>();
  server_builder.RegisterHandler<handlers::ReceiveLocalSlamResultsHandler>();
  server_builder.RegisterHandler<handlers::GetSubmapHandler>();
  server_builder.RegisterHandler<handlers::GetTrajectoryNodePosesHandler>();
  server_builder.RegisterHandler<handlers::GetTrajectoryStatesHandler>();
  server_builder.RegisterHandler<handlers::GetLandmarkPosesHandler>();
  server_builder.RegisterHandler<handlers::GetAllSubmapPosesHandler>();
  server_builder.RegisterHandler<handlers::GetLocalToGlobalTransformHandler>();
  server_builder.RegisterHandler<handlers::GetConstraintsHandler>();
  server_builder.RegisterHandler<handlers::IsTrajectoryFinishedHandler>();
  server_builder.RegisterHandler<handlers::IsTrajectoryFrozenHandler>();
  server_builder.RegisterHandler<handlers::LoadStateHandler>();
  server_builder.RegisterHandler<handlers::LoadStateFromFileHandler>();
  server_builder.RegisterHandler<handlers::RunFinalOptimizationHandler>();
  server_builder.RegisterHandler<handlers::WriteStateHandler>();
  server_builder.RegisterHandler<handlers::WriteStateToFileHandler>();
  server_builder.RegisterHandler<handlers::SetLandmarkPoseHandler>();
  grpc_server_ = server_builder.Build();
  // The execution context is the state shared by all RPC handlers (retrieved
  // via GetContext<>()); it is templated on the submap type in use.
  if (map_builder_server_options.map_builder_options()
          .use_trajectory_builder_2d()) {
    grpc_server_->SetExecutionContext(
        absl::make_unique<MapBuilderContext<mapping::Submap2D>>(this));
  } else if (map_builder_server_options.map_builder_options()
                 .use_trajectory_builder_3d()) {
    grpc_server_->SetExecutionContext(
        absl::make_unique<MapBuilderContext<mapping::Submap3D>>(this));
  } else {
    // Exactly one of the 2D/3D trajectory builder options must be set.
    LOG(FATAL)
        << "Set either use_trajectory_builder_2d or use_trajectory_builder_3d";
  }
  // Forward pose-graph global optimization results into this server
  // (OnGlobalSlamOptimizations notifies subscribers / the uplink).
  map_builder_->pose_graph()->SetGlobalSlamOptimizationCallback(
      [this](const std::map<int, mapping::SubmapId>& last_optimized_submap_ids,
             const std::map<int, mapping::NodeId>& last_optimized_node_ids) {
        OnGlobalSlamOptimizations(last_optimized_submap_ids,
                                  last_optimized_node_ids);
      });
}
然後執行MapBuilderServer::Start()函數
// Brings the server up. Order matters: clear the shutdown flag, start the
// uplink uploader (if configured) and the SLAM worker thread first, and only
// then start the gRPC server so RPCs arrive once processing is ready.
void MapBuilderServer::Start() {
  shutting_down_ = false;
  if (local_trajectory_uploader_) {
    local_trajectory_uploader_->Start();
  }
  StartSlamThread();
  grpc_server_->Start();
}
繼續跟進StartSlamThread()函數
void MapBuilderServer::StartSlamThread() {
CHECK(!slam_thread_);
// Start the SLAM processing thread.
slam_thread_ = absl::make_unique<std::thread>(
[this]() { this->ProcessSensorDataQueue(); });
}
就是建立了一個線程執行ProcessSensorDataQueue函數,如下,這個函數不斷把接收的資料傳進來
// SLAM worker loop: repeatedly pops sensor data from the incoming queue
// (with a timeout so the shutdown flag is rechecked) and feeds each item to
// the trajectory builder via the shared execution context.
void MapBuilderServer::ProcessSensorDataQueue() {
  LOG(INFO) << "Starting SLAM thread.";
  while (!shutting_down_) {
    // Export the current queue depth as a metric.
    kIncomingDataQueueMetric->Set(incoming_data_queue_.Size());
    auto data = incoming_data_queue_.PopWithTimeout(kPopTimeout);
    if (data == nullptr) {
      continue;  // Timed out with no data; loop to recheck shutting_down_.
    }
    grpc_server_->GetContext<MapBuilderContextInterface>()
        ->AddSensorDataToTrajectory(*data);
  }
}
繼續跟進這個函數AddSensorDataToTrajectory():
// Dispatches one queued sensor data item to the trajectory builder of the
// trajectory it belongs to (same mechanism as the non-gRPC pipeline).
template <class SubmapType>
void MapBuilderContext<SubmapType>::AddSensorDataToTrajectory(
    const Data& sensor_data) {
  auto* const trajectory_builder =
      map_builder_server_->map_builder_->GetTrajectoryBuilder(
          sensor_data.trajectory_id);
  sensor_data.data->AddToTrajectoryBuilder(trajectory_builder);
}
可以看到這裡和之前的傳入方式就保持一緻了。
這裡把之前(cartographer_ros)的傳入方式也貼出來作對比:data同樣被傳入wrapped_trajectory_builder_:
// Receives collated sensor data, updates per-sensor rate statistics
// (periodically logging them), and forwards the data to the wrapped
// trajectory builder.
void CollatedTrajectoryBuilder::HandleCollatedSensorData(
    const std::string& sensor_id, std::unique_ptr<sensor::Data> data) {
  // Lazily create a rate timer the first time this sensor id is seen.
  auto timer_it = rate_timers_.find(sensor_id);
  if (timer_it == rate_timers_.end()) {
    timer_it = rate_timers_
                   .emplace(std::piecewise_construct,
                            std::forward_as_tuple(sensor_id),
                            std::forward_as_tuple(common::FromSeconds(
                                kSensorDataRatesLoggingPeriodSeconds)))
                   .first;
  }
  timer_it->second.Pulse(data->GetTime());

  // Dump the observed per-sensor data rates at most once per logging period.
  if (std::chrono::steady_clock::now() - last_logging_time_ >
      common::FromSeconds(kSensorDataRatesLoggingPeriodSeconds)) {
    for (const auto& entry : rate_timers_) {
      LOG(INFO) << entry.first << " rate: " << entry.second.DebugString();
    }
    last_logging_time_ = std::chrono::steady_clock::now();
  }

  // Hand the data on to the wrapped trajectory builder.
  data->AddToTrajectoryBuilder(wrapped_trajectory_builder_.get());
}
題外:
在MapBuilderServer的構造函數中我們看到這樣一句話,這是幹什麼用的呢?
grpc_server_->SetExecutionContext(
absl::make_unique<MapBuilderContext<mapping::Submap2D>>(this));
我們看下MapBuilderContext的定義
// Server-wide execution context shared by all RPC handlers. Implements
// MapBuilderContextInterface and is templated on the submap type (2D or 3D)
// selected by the server options.
template <class SubmapType>
class MapBuilderContext : public MapBuilderContextInterface {
 public:
  // 'map_builder_server' is the owning server; stored non-owning below.
  MapBuilderContext(MapBuilderServer* map_builder_server);
  mapping::MapBuilderInterface& map_builder() override;
  // Queue of incoming sensor data drained by the SLAM worker thread.
  common::BlockingQueue<std::unique_ptr<MapBuilderContextInterface::Data>>&
      sensor_data_queue() override;
  mapping::TrajectoryBuilderInterface::LocalSlamResultCallback
      GetLocalSlamResultCallbackForSubscriptions() override;
  // Feeds one queued data item into its trajectory builder.
  void AddSensorDataToTrajectory(const Data& sensor_data) override;
  // Subscription management for local SLAM results and global optimizations.
  MapBuilderContextInterface::LocalSlamSubscriptionId SubscribeLocalSlamResults(
      int trajectory_id, LocalSlamSubscriptionCallback callback) override;
  void UnsubscribeLocalSlamResults(
      const LocalSlamSubscriptionId& subscription_id) override;
  int SubscribeGlobalSlamOptimizations(
      GlobalSlamOptimizationCallback callback) override;
  void UnsubscribeGlobalSlamOptimizations(int subscription_index) override;
  void NotifyFinishTrajectory(int trajectory_id) override;
  LocalTrajectoryUploaderInterface* local_trajectory_uploader() override;
  // Enqueues sensor data / local SLAM results for the SLAM thread.
  void EnqueueSensorData(int trajectory_id,
                         std::unique_ptr<sensor::Data> data) override;
  void EnqueueLocalSlamResultData(int trajectory_id,
                                  const std::string& sensor_id,
                                  const mapping::proto::LocalSlamResultData&
                                      local_slam_result_data) override;
  // Client-id bookkeeping per trajectory.
  void RegisterClientIdForTrajectory(const std::string& client_id,
                                     int trajectory_id) override;
  bool CheckClientIdForTrajectory(const std::string& client_id,
                                  int trajectory_id) override;

 private:
  MapBuilderServer* map_builder_server_;  // Not owned.
  mapping::SubmapController<SubmapType> submap_controller_;
  std::map</*trajectory_id=*/int, /*client_id=*/std::string> client_ids_;
};
似乎沒發現什麼,我們再看下MapBuilderContextInterface的定義
class MapBuilderContextInterface : public async_grpc::ExecutionContext {
public:
struct LocalSlamResult {
int trajectory_id;
common::Time time;
transform::Rigid3d local_pose;
std::shared_ptr<const sensor::RangeData> range_data;
std::unique_ptr<const mapping::TrajectoryBuilderInterface::InsertionResult>
insertion_result;
};
// Calling with 'nullptr' signals subscribers that the subscription has ended,
// e.g. this happens when the corresponding trajectory was finished and hence
// no more local SLAM updates will occur.
// The callback can return 'false' to indicate that the client is not
// interested in more local SLAM updates and 'MapBuilderServer' will end the
// subscription.
using LocalSlamSubscriptionCallback =
std::function<bool(std::unique_ptr<LocalSlamResult>)>;
// The callback can return 'false' to indicate that the client is not
// interested in more global SLAM runs and 'MapBuilderServer' will end the
// subscription.
using GlobalSlamOptimizationCallback = std::function<bool(
const std::map<int /* trajectory_id */, mapping::SubmapId>&,
const std::map<int /* trajectory_id */, mapping::NodeId>&)>;
struct Data {
int trajectory_id;
std::unique_ptr<sensor::Data> data;
};
struct LocalSlamSubscriptionId {
const int trajectory_id;
const int subscription_index;
};
我們看到MapBuilderContextInterface是async_grpc::ExecutionContext類型,那麼這個類型資料有什麼用呢?我們繼續跟進ExecutionContext
// Implementations of this class allow RPC handlers to share state among one
// another. Using Server::SetExecutionContext(...) a server-wide
// 'ExecutionContext' can be specified. This 'ExecutionContext' can be retrieved
// by all implementations of 'RpcHandler' by calling
// 'RpcHandler::GetContext<MyContext>()'.
class ExecutionContext {
 public:
  // Automatically locks an ExecutionContext for shared use by RPC handlers.
  // This non-movable, non-copyable class is used to broker access from various
  // RPC handlers to the shared 'ExecutionContext'.
  template <typename ContextType>
  class Synchronized {
   public:
    // Exposes the wrapped context, downcast to the concrete ContextType.
    ContextType* operator->() {
      return static_cast<ContextType*>(execution_context_);
    }
    // Acquires 'lock' for this object's lifetime via 'locker_'.
    Synchronized(common::Mutex* lock, ExecutionContext* execution_context)
        : locker_(lock), execution_context_(execution_context) {}
    Synchronized(const Synchronized&) = delete;
    Synchronized(Synchronized&&) = delete;

   private:
    common::MutexLocker locker_;           // RAII scoped lock.
    ExecutionContext* execution_context_;  // Not owned.
  };
  ExecutionContext() = default;
  virtual ~ExecutionContext() = default;
  ExecutionContext(const ExecutionContext&) = delete;
  ExecutionContext& operator=(const ExecutionContext&) = delete;
  // Mutex guarding shared state; consumed by 'Synchronized'.
  common::Mutex* lock() { return &lock_; }

 private:
  common::Mutex lock_;
};
翻譯上面的解釋:
此類的實作允許RPC處理程式彼此共享狀態。使用Server::SetExecutionContext(…)可以指定伺服器範圍的「ExecutionContext」。「RpcHandler」的所有實作都可以通過調用「RpcHandler::GetContext&lt;MyContext&gt;()」來檢索此「ExecutionContext」。
我的理解是:可以使用SetExecutionContext指定一個共享的資料變量,並在各個RPC處理函數中共享,只要通過GetContext調用就可以取得該變量的內容。