参考文章
https://www.cnblogs.com/heimazaifei/p/12435875.html
写的很好,他的另一个C++多线程
https://www.cnblogs.com/heimazaifei/p/12176724.html
自己的理解
ThreadPool::ThreadPool(int num_threads) {
absl::MutexLock locker(&mutex_);
for (int i = 0; i != num_threads; ++i) {
pool_.emplace_back([this]() { ThreadPool::DoWork(); });
}
}
上面这段代码在初始化的时候每个线程都执行这函数ThreadPool::DoWork();这个函数也很给力,是个死循环,直到析构才返回,只要task_queue有任务,就执行操作。这不很好吗?
void ThreadPool::DoWork() {
#ifdef __linux__
// This changes the per-thread nice level of the current thread on Linux. We
// do this so that the background work done by the thread pool is not taking
// away CPU resources from more important foreground threads.
CHECK_NE(nice(10), -1);
#endif
const auto predicate = [this]() EXCLUSIVE_LOCKS_REQUIRED(mutex_) {
return !task_queue_.empty() || !running_;
};
for (;;) {
std::shared_ptr<Task> task;
{
absl::MutexLock locker(&mutex_);
mutex_.Await(absl::Condition(&predicate));
if (!task_queue_.empty()) {
task = std::move(task_queue_.front());
task_queue_.pop_front();
} else if (!running_) {
return;
}
}
CHECK(task);
CHECK_EQ(task->GetState(), common::Task::DEPENDENCIES_COMPLETED);
Execute(task.get());
}
}
这么简单? 那当然也不符合咱的气质,这里Task不是想执行就执行的,他是有状态和顺序的,如下
一个主任务(Task)还有很多任务依赖(dependent_tasks_),task_queue_是可以直接执行的任务了,tasks_not_ready_是暂时不能执行的任务。直到dependent_tasks_都完成了,Task才能执行。
std::deque<std::shared_ptr<Task>> task_queue_ GUARDED_BY(mutex_);
absl::flat_hash_map<Task*, std::shared_ptr<Task>> tasks_not_ready_
我们来看下cartographer是代码如何实现的
先来看这个,每个可执行的任务执行后,会调用dependent_task->OnDependenyCompleted();,意思就是我我任务完成了,你可以继续执行下一步任务了(打个比方,啊我一年级读完了,可以读二年级的任务了)
void Task::Execute() {
{
absl::MutexLock locker(&mutex_);
CHECK_EQ(state_, DEPENDENCIES_COMPLETED);
state_ = RUNNING;
}
// Execute the work item.
if (work_item_) {
work_item_();
}
absl::MutexLock locker(&mutex_);
state_ = COMPLETED;
for (Task* dependent_task : dependent_tasks_) {
dependent_task->OnDependenyCompleted();
}
}
继续看下OnDependenyCompleted个函数,如果uncompleted_dependencies_为0,则执行thread_pool_to_notify_->NotifyDependenciesCompleted(this);,这个是干什么的呢?
void Task::OnDependenyCompleted() {
absl::MutexLock locker(&mutex_);
CHECK(state_ == NEW || state_ == DISPATCHED);
--uncompleted_dependencies_;
if (uncompleted_dependencies_ == 0 && state_ == DISPATCHED) {
state_ = DEPENDENCIES_COMPLETED;
CHECK(thread_pool_to_notify_);
thread_pool_to_notify_->NotifyDependenciesCompleted(this);
}
}
继续跟进NotifyDependenciesCompleted,明白了就是把任务添加进来task_queue_.push_back(it->second);
void ThreadPool::NotifyDependenciesCompleted(Task* task) {
absl::MutexLock locker(&mutex_);
auto it = tasks_not_ready_.find(task);
CHECK(it != tasks_not_ready_.end());
task_queue_.push_back(it->second);
tasks_not_ready_.erase(it);
}
好了,我们现在已经明白他的实现原理了。
继续说一下,cartographer都并行执行了哪些函数,哪些其实并没有并行,
根据上面介绍,我们找关键字thread_pool_->Schedule
第一个在线程池中运行的函数DrainWorkQueue,这个函数不断处理AddWorkItem增加的任务,直到返回类型是WorkItem::Result::kRunOptimization
work_queue_ = absl::make_unique<WorkQueue>();
auto task = absl::make_unique<common::Task>();
task->SetWorkItem([this]() { DrainWorkQueue(); });
thread_pool_->Schedule(std::move(task));
第二个函数WhenDone,这个函数其实调用了RunWhenDoneCallback,而RunWhenDoneCallback又调用了HandleWorkQueue,所以运行HandleWorkQueue是另外起一个线程执行的。我感觉一直在一个线程里运行的写法也没问题。
when_done_task_->SetWorkItem([this] { RunWhenDoneCallback(); });
thread_pool_->Schedule(std::move(when_done_task_));
第三个函数在函数MaybeAddConstraint里
auto constraint_task = absl::make_unique<common::Task>();
constraint_task->SetWorkItem([=]() LOCKS_EXCLUDED(mutex_) {
ComputeConstraint(submap_id, submap, node_id, false, /* match_full_submap */
constant_data, initial_relative_pose, *scan_matcher,
constraint);
});
constraint_task->AddDependency(scan_matcher->creation_task_handle);
auto constraint_task_handle =
thread_pool_->Schedule(std::move(constraint_task));
第四个函数在函数MaybeAddGlobalConstraint里
auto constraint_task = absl::make_unique<common::Task>();
constraint_task->SetWorkItem([=]() LOCKS_EXCLUDED(mutex_) {
ComputeConstraint(submap_id, submap, node_id, true, /* match_full_submap */
constant_data, transform::Rigid2d::Identity(),
*scan_matcher, constraint);
});
constraint_task->AddDependency(scan_matcher->creation_task_handle);
auto constraint_task_handle =
thread_pool_->Schedule(std::move(constraint_task));
第五个函数在函数NotifyEndOfNode里
CHECK(finish_node_task_ != nullptr);
finish_node_task_->SetWorkItem([this] {
absl::MutexLock locker(&mutex_);
++num_finished_nodes_;
});
auto finish_node_task_handle =
thread_pool_->Schedule(std::move(finish_node_task_));
第六个函数在函数DispatchScanMatcherConstruction里
auto scan_matcher_task = absl::make_unique<common::Task>();
scan_matcher_task->SetWorkItem(
[&submap_scan_matcher, &scan_matcher_options]() {
submap_scan_matcher.fast_correlative_scan_matcher =
absl::make_unique<scan_matching::FastCorrelativeScanMatcher2D>(
*submap_scan_matcher.grid, scan_matcher_options);
});
submap_scan_matcher.creation_task_handle =
thread_pool_->Schedule(std::move(scan_matcher_task));
好了这里看一下work_queue_里面都AddWorkItem了那些函数
这里列一下
解释:为Node添加约束
AddWorkItem([=]() LOCKS_EXCLUDED(mutex_) {
return ComputeConstraintsForNode(node_id, insertion_submaps,
newly_finished_submap);
});
解释:增加Imu数据
AddWorkItem([=]() LOCKS_EXCLUDED(mutex_) {
absl::MutexLock locker(&mutex_);
if (CanAddWorkItemModifying(trajectory_id)) {
optimization_problem_->AddImuData(trajectory_id, imu_data);
}
return WorkItem::Result::kDoNotRunOptimization;
});
解释:增加里程计数据
AddWorkItem([=]() LOCKS_EXCLUDED(mutex_) {
absl::MutexLock locker(&mutex_);
if (CanAddWorkItemModifying(trajectory_id)) {
optimization_problem_->AddOdometryData(trajectory_id, odometry_data);
}
return WorkItem::Result::kDoNotRunOptimization;
});
解释:增加GPS数据
AddWorkItem([=]() LOCKS_EXCLUDED(mutex_) {
absl::MutexLock locker(&mutex_);
if (CanAddWorkItemModifying(trajectory_id)) {
optimization_problem_->AddFixedFramePoseData(trajectory_id,
fixed_frame_pose_data);
}
return WorkItem::Result::kDoNotRunOptimization;
});
解释:增加landmark数据
AddWorkItem([=]() LOCKS_EXCLUDED(mutex_) {
absl::MutexLock locker(&mutex_);
if (CanAddWorkItemModifying(trajectory_id)) {
for (const auto& observation : landmark_data.landmark_observations) {
data_.landmark_nodes[observation.id].landmark_observations.emplace_back(
PoseGraphInterface::LandmarkNode::LandmarkObservation{
trajectory_id, landmark_data.time,
observation.landmark_to_tracking_transform,
observation.translation_weight, observation.rotation_weight});
}
}
return WorkItem::Result::kDoNotRunOptimization;
});
解释:删除路径
AddWorkItem([this, trajectory_id]() LOCKS_EXCLUDED(mutex_) {
absl::MutexLock locker(&mutex_);
CHECK(data_.trajectories_state.at(trajectory_id).state !=
TrajectoryState::ACTIVE);
CHECK(data_.trajectories_state.at(trajectory_id).state !=
TrajectoryState::DELETED);
CHECK(data_.trajectories_state.at(trajectory_id).deletion_state ==
InternalTrajectoryState::DeletionState::SCHEDULED_FOR_DELETION);
data_.trajectories_state.at(trajectory_id).deletion_state =
InternalTrajectoryState::DeletionState::WAIT_FOR_DELETION;
return WorkItem::Result::kDoNotRunOptimization;
});
解释:结束路径
AddWorkItem([this, trajectory_id]() LOCKS_EXCLUDED(mutex_) {
absl::MutexLock locker(&mutex_);
CHECK(!IsTrajectoryFinished(trajectory_id));
data_.trajectories_state[trajectory_id].state = TrajectoryState::FINISHED;
for (const auto& submap : data_.submap_data.trajectory(trajectory_id)) {
data_.submap_data.at(submap.id).state = SubmapState::kFinished;
}
return WorkItem::Result::kRunOptimization;
});
解释:冻结路径
AddWorkItem([this, trajectory_id]() LOCKS_EXCLUDED(mutex_) {
absl::MutexLock locker(&mutex_);
CHECK(!IsTrajectoryFrozen(trajectory_id));
// Connect multiple frozen trajectories among each other.
// This is required for localization against multiple frozen trajectories
// because we lose inter-trajectory constraints when freezing.
for (const auto& entry : data_.trajectories_state) {
const int other_trajectory_id = entry.first;
if (!IsTrajectoryFrozen(other_trajectory_id)) {
continue;
}
if (data_.trajectory_connectivity_state.TransitivelyConnected(
trajectory_id, other_trajectory_id)) {
// Already connected, nothing to do.
continue;
}
data_.trajectory_connectivity_state.Connect(
trajectory_id, other_trajectory_id, common::FromUniversal(0));
}
data_.trajectories_state[trajectory_id].state = TrajectoryState::FROZEN;
return WorkItem::Result::kDoNotRunOptimization;
});
解释:从文件加载子图数据
AddWorkItem(
[this, submap_id, global_submap_pose_2d]() LOCKS_EXCLUDED(mutex_) {
absl::MutexLock locker(&mutex_);
data_.submap_data.at(submap_id).state = SubmapState::kFinished;
optimization_problem_->InsertSubmap(submap_id, global_submap_pose_2d);
return WorkItem::Result::kDoNotRunOptimization;
});
解释:从文件加载Node数据
AddWorkItem([this, node_id, global_pose]() LOCKS_EXCLUDED(mutex_) {
absl::MutexLock locker(&mutex_);
const auto& constant_data =
data_.trajectory_nodes.at(node_id).constant_data;
const auto gravity_alignment_inverse = transform::Rigid3d::Rotation(
constant_data->gravity_alignment.inverse());
optimization_problem_->InsertTrajectoryNode(
node_id,
optimization::NodeSpec2D{
constant_data->time,
transform::Project2D(constant_data->local_pose *
gravity_alignment_inverse),
transform::Project2D(global_pose * gravity_alignment_inverse),
constant_data->gravity_alignment});
return WorkItem::Result::kDoNotRunOptimization;
});
解释:从文件设置trajectory_data
AddWorkItem([this, trajectory_id, trajectory_data]()
LOCKS_EXCLUDED(mutex_) {
absl::MutexLock locker(&mutex_);
if (CanAddWorkItemModifying(trajectory_id)) {
optimization_problem_->SetTrajectoryData(
trajectory_id, trajectory_data);
}
return WorkItem::Result::kDoNotRunOptimization;
});
解释:从加载文件设置Node和submap之间的关系
AddWorkItem([this, node_id, submap_id]() LOCKS_EXCLUDED(mutex_) {
absl::MutexLock locker(&mutex_);
if (CanAddWorkItemModifying(submap_id.trajectory_id)) {
data_.submap_data.at(submap_id).node_ids.insert(node_id);
}
return WorkItem::Result::kDoNotRunOptimization;
});
解释:从加载文件设置Node和submap之阿的约束
AddWorkItem([this, constraints]() LOCKS_EXCLUDED(mutex_) {
absl::MutexLock locker(&mutex_);
for (const auto& constraint : constraints) {
CHECK(data_.trajectory_nodes.Contains(constraint.node_id));
CHECK(data_.submap_data.Contains(constraint.submap_id));
CHECK(data_.trajectory_nodes.at(constraint.node_id).constant_data !=
nullptr);
CHECK(data_.submap_data.at(constraint.submap_id).submap != nullptr);
switch (constraint.tag) {
case Constraint::Tag::INTRA_SUBMAP:
CHECK(data_.submap_data.at(constraint.submap_id)
.node_ids.emplace(constraint.node_id)
.second);
break;
case Constraint::Tag::INTER_SUBMAP:
UpdateTrajectoryConnectivity(constraint);
break;
}
const Constraint::Pose pose = {
constraint.pose.zbar_ij *
transform::Rigid3d::Rotation(
data_.trajectory_nodes.at(constraint.node_id)
.constant_data->gravity_alignment.inverse()),
constraint.pose.translation_weight, constraint.pose.rotation_weight};
data_.constraints.push_back(Constraint{
constraint.submap_id, constraint.node_id, pose, constraint.tag});
}
LOG(INFO) << "Loaded " << constraints.size() << " constraints.";
return WorkItem::Result::kDoNotRunOptimization;
});
解释:增加路径修剪器
AddWorkItem([this, trimmer_ptr]() LOCKS_EXCLUDED(mutex_) {
absl::MutexLock locker(&mutex_);
trimmers_.emplace_back(trimmer_ptr);
return WorkItem::Result::kDoNotRunOptimization;
});
解释:运行全局优化函数
AddWorkItem([this]() LOCKS_EXCLUDED(mutex_) {
absl::MutexLock locker(&mutex_);
optimization_problem_->SetMaxNumIterations(
options_.max_num_final_iterations());
return WorkItem::Result::kRunOptimization;
});
AddWorkItem([this]() LOCKS_EXCLUDED(mutex_) {
absl::MutexLock locker(&mutex_);
optimization_problem_->SetMaxNumIterations(
options_.optimization_problem_options()
.ceres_solver_options()
.max_num_iterations());
return WorkItem::Result::kDoNotRunOptimization;
});
解释:设置landmark位姿
AddWorkItem([=]() LOCKS_EXCLUDED(mutex_) {
absl::MutexLock locker(&mutex_);
data_.landmark_nodes[landmark_id].global_landmark_pose = global_pose;
data_.landmark_nodes[landmark_id].frozen = frozen;
return WorkItem::Result::kDoNotRunOptimization;
});
好了,让我也消化一下。