參考文章
https://www.cnblogs.com/heimazaifei/p/12435875.html
寫的很好,他的另一個C++多線程
https://www.cnblogs.com/heimazaifei/p/12176724.html
自己的了解
ThreadPool::ThreadPool(int num_threads) {
absl::MutexLock locker(&mutex_);
for (int i = 0; i != num_threads; ++i) {
pool_.emplace_back([this]() { ThreadPool::DoWork(); });
}
}
上面這段代碼在初始化的時候每個線程都執行這函數ThreadPool::DoWork();這個函數也很給力,是個死循環,直到析構才傳回,隻要task_queue有任務,就執行操作。這不很好嗎?
void ThreadPool::DoWork() {
#ifdef __linux__
// This changes the per-thread nice level of the current thread on Linux. We
// do this so that the background work done by the thread pool is not taking
// away CPU resources from more important foreground threads.
CHECK_NE(nice(10), -1);
#endif
const auto predicate = [this]() EXCLUSIVE_LOCKS_REQUIRED(mutex_) {
return !task_queue_.empty() || !running_;
};
for (;;) {
std::shared_ptr<Task> task;
{
absl::MutexLock locker(&mutex_);
mutex_.Await(absl::Condition(&predicate));
if (!task_queue_.empty()) {
task = std::move(task_queue_.front());
task_queue_.pop_front();
} else if (!running_) {
return;
}
}
CHECK(task);
CHECK_EQ(task->GetState(), common::Task::DEPENDENCIES_COMPLETED);
Execute(task.get());
}
}
這麼簡單? 那當然也不符合咱的氣質,這裡Task不是想執行就執行的,他是有狀态和順序的,如下
一個主任務(Task)還有很多任務依賴(dependent_tasks_),task_queue_是可以直接執行的任務了,tasks_not_ready_是暫時不能執行的任務。直到dependent_tasks_都完成了,Task才能執行。
std::deque<std::shared_ptr<Task>> task_queue_ GUARDED_BY(mutex_);
absl::flat_hash_map<Task*, std::shared_ptr<Task>> tasks_not_ready_
我們來看下cartographer是代碼如何實作的
先來看這個,每個可執行的任務執行後,會調用dependent_task->OnDependenyCompleted();,意思就是我我任務完成了,你可以繼續執行下一步任務了(打個比方,啊我一年級讀完了,可以讀二年級的任務了)
void Task::Execute() {
{
absl::MutexLock locker(&mutex_);
CHECK_EQ(state_, DEPENDENCIES_COMPLETED);
state_ = RUNNING;
}
// Execute the work item.
if (work_item_) {
work_item_();
}
absl::MutexLock locker(&mutex_);
state_ = COMPLETED;
for (Task* dependent_task : dependent_tasks_) {
dependent_task->OnDependenyCompleted();
}
}
繼續看下OnDependenyCompleted個函數,如果uncompleted_dependencies_為0,則執行thread_pool_to_notify_->NotifyDependenciesCompleted(this);,這個是幹什麼的呢?
void Task::OnDependenyCompleted() {
absl::MutexLock locker(&mutex_);
CHECK(state_ == NEW || state_ == DISPATCHED);
--uncompleted_dependencies_;
if (uncompleted_dependencies_ == 0 && state_ == DISPATCHED) {
state_ = DEPENDENCIES_COMPLETED;
CHECK(thread_pool_to_notify_);
thread_pool_to_notify_->NotifyDependenciesCompleted(this);
}
}
繼續跟進NotifyDependenciesCompleted,明白了就是把任務添加進來task_queue_.push_back(it->second);
void ThreadPool::NotifyDependenciesCompleted(Task* task) {
absl::MutexLock locker(&mutex_);
auto it = tasks_not_ready_.find(task);
CHECK(it != tasks_not_ready_.end());
task_queue_.push_back(it->second);
tasks_not_ready_.erase(it);
}
好了,我們現在已經明白他的實作原理了。
繼續說一下,cartographer都并行執行了哪些函數,哪些其實并沒有并行,
根據上面介紹,我們找關鍵字thread_pool_->Schedule
第一個線上程池中運作的函數DrainWorkQueue,這個函數不斷處理AddWorkItem增加的任務,直到傳回類型是WorkItem::Result::kRunOptimization
work_queue_ = absl::make_unique<WorkQueue>();
auto task = absl::make_unique<common::Task>();
task->SetWorkItem([this]() { DrainWorkQueue(); });
thread_pool_->Schedule(std::move(task));
第二個函數WhenDone,這個函數其實調用了RunWhenDoneCallback,而RunWhenDoneCallback又調用了HandleWorkQueue,是以運作HandleWorkQueue是另外起一個線程執行的。我感覺一直在一個線程裡運作的寫法也沒問題。
when_done_task_->SetWorkItem([this] { RunWhenDoneCallback(); });
thread_pool_->Schedule(std::move(when_done_task_));
第三個函數在函數MaybeAddConstraint裡
auto constraint_task = absl::make_unique<common::Task>();
constraint_task->SetWorkItem([=]() LOCKS_EXCLUDED(mutex_) {
ComputeConstraint(submap_id, submap, node_id, false, /* match_full_submap */
constant_data, initial_relative_pose, *scan_matcher,
constraint);
});
constraint_task->AddDependency(scan_matcher->creation_task_handle);
auto constraint_task_handle =
thread_pool_->Schedule(std::move(constraint_task));
第四個函數在函數MaybeAddGlobalConstraint裡
auto constraint_task = absl::make_unique<common::Task>();
constraint_task->SetWorkItem([=]() LOCKS_EXCLUDED(mutex_) {
ComputeConstraint(submap_id, submap, node_id, true, /* match_full_submap */
constant_data, transform::Rigid2d::Identity(),
*scan_matcher, constraint);
});
constraint_task->AddDependency(scan_matcher->creation_task_handle);
auto constraint_task_handle =
thread_pool_->Schedule(std::move(constraint_task));
第五個函數在函數NotifyEndOfNode裡
CHECK(finish_node_task_ != nullptr);
finish_node_task_->SetWorkItem([this] {
absl::MutexLock locker(&mutex_);
++num_finished_nodes_;
});
auto finish_node_task_handle =
thread_pool_->Schedule(std::move(finish_node_task_));
第六個函數在函數DispatchScanMatcherConstruction裡
auto scan_matcher_task = absl::make_unique<common::Task>();
scan_matcher_task->SetWorkItem(
[&submap_scan_matcher, &scan_matcher_options]() {
submap_scan_matcher.fast_correlative_scan_matcher =
absl::make_unique<scan_matching::FastCorrelativeScanMatcher2D>(
*submap_scan_matcher.grid, scan_matcher_options);
});
submap_scan_matcher.creation_task_handle =
thread_pool_->Schedule(std::move(scan_matcher_task));
好了這裡看一下work_queue_裡面都AddWorkItem了那些函數
這裡列一下
解釋:為Node添加限制
AddWorkItem([=]() LOCKS_EXCLUDED(mutex_) {
return ComputeConstraintsForNode(node_id, insertion_submaps,
newly_finished_submap);
});
解釋:增加Imu資料
AddWorkItem([=]() LOCKS_EXCLUDED(mutex_) {
absl::MutexLock locker(&mutex_);
if (CanAddWorkItemModifying(trajectory_id)) {
optimization_problem_->AddImuData(trajectory_id, imu_data);
}
return WorkItem::Result::kDoNotRunOptimization;
});
解釋:增加裡程計資料
AddWorkItem([=]() LOCKS_EXCLUDED(mutex_) {
absl::MutexLock locker(&mutex_);
if (CanAddWorkItemModifying(trajectory_id)) {
optimization_problem_->AddOdometryData(trajectory_id, odometry_data);
}
return WorkItem::Result::kDoNotRunOptimization;
});
解釋:增加GPS資料
AddWorkItem([=]() LOCKS_EXCLUDED(mutex_) {
absl::MutexLock locker(&mutex_);
if (CanAddWorkItemModifying(trajectory_id)) {
optimization_problem_->AddFixedFramePoseData(trajectory_id,
fixed_frame_pose_data);
}
return WorkItem::Result::kDoNotRunOptimization;
});
解釋:增加landmark資料
AddWorkItem([=]() LOCKS_EXCLUDED(mutex_) {
absl::MutexLock locker(&mutex_);
if (CanAddWorkItemModifying(trajectory_id)) {
for (const auto& observation : landmark_data.landmark_observations) {
data_.landmark_nodes[observation.id].landmark_observations.emplace_back(
PoseGraphInterface::LandmarkNode::LandmarkObservation{
trajectory_id, landmark_data.time,
observation.landmark_to_tracking_transform,
observation.translation_weight, observation.rotation_weight});
}
}
return WorkItem::Result::kDoNotRunOptimization;
});
解釋:删除路徑
AddWorkItem([this, trajectory_id]() LOCKS_EXCLUDED(mutex_) {
absl::MutexLock locker(&mutex_);
CHECK(data_.trajectories_state.at(trajectory_id).state !=
TrajectoryState::ACTIVE);
CHECK(data_.trajectories_state.at(trajectory_id).state !=
TrajectoryState::DELETED);
CHECK(data_.trajectories_state.at(trajectory_id).deletion_state ==
InternalTrajectoryState::DeletionState::SCHEDULED_FOR_DELETION);
data_.trajectories_state.at(trajectory_id).deletion_state =
InternalTrajectoryState::DeletionState::WAIT_FOR_DELETION;
return WorkItem::Result::kDoNotRunOptimization;
});
解釋:結束路徑
AddWorkItem([this, trajectory_id]() LOCKS_EXCLUDED(mutex_) {
absl::MutexLock locker(&mutex_);
CHECK(!IsTrajectoryFinished(trajectory_id));
data_.trajectories_state[trajectory_id].state = TrajectoryState::FINISHED;
for (const auto& submap : data_.submap_data.trajectory(trajectory_id)) {
data_.submap_data.at(submap.id).state = SubmapState::kFinished;
}
return WorkItem::Result::kRunOptimization;
});
解釋:當機路徑
AddWorkItem([this, trajectory_id]() LOCKS_EXCLUDED(mutex_) {
absl::MutexLock locker(&mutex_);
CHECK(!IsTrajectoryFrozen(trajectory_id));
// Connect multiple frozen trajectories among each other.
// This is required for localization against multiple frozen trajectories
// because we lose inter-trajectory constraints when freezing.
for (const auto& entry : data_.trajectories_state) {
const int other_trajectory_id = entry.first;
if (!IsTrajectoryFrozen(other_trajectory_id)) {
continue;
}
if (data_.trajectory_connectivity_state.TransitivelyConnected(
trajectory_id, other_trajectory_id)) {
// Already connected, nothing to do.
continue;
}
data_.trajectory_connectivity_state.Connect(
trajectory_id, other_trajectory_id, common::FromUniversal(0));
}
data_.trajectories_state[trajectory_id].state = TrajectoryState::FROZEN;
return WorkItem::Result::kDoNotRunOptimization;
});
解釋:從檔案加載子圖資料
AddWorkItem(
[this, submap_id, global_submap_pose_2d]() LOCKS_EXCLUDED(mutex_) {
absl::MutexLock locker(&mutex_);
data_.submap_data.at(submap_id).state = SubmapState::kFinished;
optimization_problem_->InsertSubmap(submap_id, global_submap_pose_2d);
return WorkItem::Result::kDoNotRunOptimization;
});
解釋:從檔案加載Node資料
AddWorkItem([this, node_id, global_pose]() LOCKS_EXCLUDED(mutex_) {
absl::MutexLock locker(&mutex_);
const auto& constant_data =
data_.trajectory_nodes.at(node_id).constant_data;
const auto gravity_alignment_inverse = transform::Rigid3d::Rotation(
constant_data->gravity_alignment.inverse());
optimization_problem_->InsertTrajectoryNode(
node_id,
optimization::NodeSpec2D{
constant_data->time,
transform::Project2D(constant_data->local_pose *
gravity_alignment_inverse),
transform::Project2D(global_pose * gravity_alignment_inverse),
constant_data->gravity_alignment});
return WorkItem::Result::kDoNotRunOptimization;
});
解釋:從檔案設定trajectory_data
AddWorkItem([this, trajectory_id, trajectory_data]()
LOCKS_EXCLUDED(mutex_) {
absl::MutexLock locker(&mutex_);
if (CanAddWorkItemModifying(trajectory_id)) {
optimization_problem_->SetTrajectoryData(
trajectory_id, trajectory_data);
}
return WorkItem::Result::kDoNotRunOptimization;
});
解釋:從加載檔案設定Node和submap之間的關系
AddWorkItem([this, node_id, submap_id]() LOCKS_EXCLUDED(mutex_) {
absl::MutexLock locker(&mutex_);
if (CanAddWorkItemModifying(submap_id.trajectory_id)) {
data_.submap_data.at(submap_id).node_ids.insert(node_id);
}
return WorkItem::Result::kDoNotRunOptimization;
});
解釋:從加載檔案設定Node和submap之阿的限制
AddWorkItem([this, constraints]() LOCKS_EXCLUDED(mutex_) {
absl::MutexLock locker(&mutex_);
for (const auto& constraint : constraints) {
CHECK(data_.trajectory_nodes.Contains(constraint.node_id));
CHECK(data_.submap_data.Contains(constraint.submap_id));
CHECK(data_.trajectory_nodes.at(constraint.node_id).constant_data !=
nullptr);
CHECK(data_.submap_data.at(constraint.submap_id).submap != nullptr);
switch (constraint.tag) {
case Constraint::Tag::INTRA_SUBMAP:
CHECK(data_.submap_data.at(constraint.submap_id)
.node_ids.emplace(constraint.node_id)
.second);
break;
case Constraint::Tag::INTER_SUBMAP:
UpdateTrajectoryConnectivity(constraint);
break;
}
const Constraint::Pose pose = {
constraint.pose.zbar_ij *
transform::Rigid3d::Rotation(
data_.trajectory_nodes.at(constraint.node_id)
.constant_data->gravity_alignment.inverse()),
constraint.pose.translation_weight, constraint.pose.rotation_weight};
data_.constraints.push_back(Constraint{
constraint.submap_id, constraint.node_id, pose, constraint.tag});
}
LOG(INFO) << "Loaded " << constraints.size() << " constraints.";
return WorkItem::Result::kDoNotRunOptimization;
});
解釋:增加路徑修剪器
AddWorkItem([this, trimmer_ptr]() LOCKS_EXCLUDED(mutex_) {
absl::MutexLock locker(&mutex_);
trimmers_.emplace_back(trimmer_ptr);
return WorkItem::Result::kDoNotRunOptimization;
});
解釋:運作全局優化函數
AddWorkItem([this]() LOCKS_EXCLUDED(mutex_) {
absl::MutexLock locker(&mutex_);
optimization_problem_->SetMaxNumIterations(
options_.max_num_final_iterations());
return WorkItem::Result::kRunOptimization;
});
AddWorkItem([this]() LOCKS_EXCLUDED(mutex_) {
absl::MutexLock locker(&mutex_);
optimization_problem_->SetMaxNumIterations(
options_.optimization_problem_options()
.ceres_solver_options()
.max_num_iterations());
return WorkItem::Result::kDoNotRunOptimization;
});
解釋:設定landmark位姿
AddWorkItem([=]() LOCKS_EXCLUDED(mutex_) {
absl::MutexLock locker(&mutex_);
data_.landmark_nodes[landmark_id].global_landmark_pose = global_pose;
data_.landmark_nodes[landmark_id].frozen = frozen;
return WorkItem::Result::kDoNotRunOptimization;
});
好了,讓我也消化一下。