move_base導航配置檔案:
<?xml version="1.0"?>
<launch>
<node pkg="move_base" type="move_base" respawn="false" name="move_base" output="screen" clear_params="true">
<rosparam file="$(find navigation_stage)/nav/params/costmap_common_params.yaml" command="load" ns="global_costmap" />
<rosparam file="$(find navigation_stage)/nav/params/costmap_common_params.yaml" command="load" ns="local_costmap" />
<rosparam file="$(find navigation_stage)/nav/params/global_costmap_params.yaml" command="load" />
<rosparam file="$(find navigation_stage)/nav/params/local_costmap_params.yaml" command="load" />
<rosparam file="$(find navigation_stage)/nav/params/base_global_planner_params.yaml" command="load" />
<rosparam file="$(find navigation_stage)/nav/params/teb_local_planner_params.yaml" command="load" />
<rosparam file="$(find navigation_stage)/nav/params/costmap_converter_params.yaml" command="load" />
<param name="base_global_planner" value="global_planner/GlobalPlanner" />
<param name="base_local_planner" value="teb_local_planner/TebLocalPlannerROS" />
</node>
</launch>
這是navigation的第一篇文章,主要通過分析ROS代碼級實作,了解navigation。本文從move_base入手。
機器人導航主要架構如圖:
Navigation Stack主要組成部分:move_base:
使用者調用movebase是通過傳入帶tf參數的構造函數:
move_base::MoveBase move_base( tf );
以下分析move_base的構造函數:
MoveBase::MoveBase(tf::TransformListener& tf):
tf_(tf),
as_(NULL),
planner_costmap_ros_(NULL), controller_costmap_ros_(NULL),
bgp_loader_("nav_core", "nav_core::BaseGlobalPlanner"),
blp_loader_("nav_core", "nav_core::BaseLocalPlanner"),
recovery_loader_("nav_core", "nav_core::RecoveryBehavior"),
planner_plan_(NULL), latest_plan_(NULL), controller_plan_(NULL),
runPlanner_(false), setup_(false), p_freq_change_(false),
c_freq_change_(false), new_global_plan_(false)
這部分是構造函數的初始化清單,可以看到幾個重要的參數:
planner_costmap_ros_(NULL), controller_costmap_ros_(NULL),
bgp_loader_("nav_core", "nav_core::BaseGlobalPlanner"),
blp_loader_("nav_core", "nav_core::BaseLocalPlanner"),
recovery_loader_("nav_core", "nav_core::RecoveryBehavior")
planner_costmap_ros_是用于全局導航的地圖,controller_costmap_ros_ 是局部導航用的地圖,地圖類型為經過ROS封裝的costmap_2d::Costmap2DROS* 。關于地圖類型的分析會在接下來的文章中進行。
bgp_loader_ 是global planner, blp_loader_ 是local planner。二者的聲明為:
pluginlib::ClassLoader<nav_core::BaseGlobalPlanner> bgp_loader_;
pluginlib::ClassLoader<nav_core::BaseLocalPlanner> blp_loader_;
是屬于一個模闆類ClassLoader,執行個體化為BaseGlobalPlanner或者BaseLocalPlanner。關于pluginlib的分析也有在接下來的文章中進行。
bgp_loader_ 和 blp_loader_ 的作用是為以下類成員提供執行個體化:
boost::shared_ptr<nav_core::BaseLocalPlanner> tc_;
boost::shared_ptr<nav_core::BaseGlobalPlanner> planner_;
//initialize the global planner
try {
planner_ = bgp_loader_.createInstance(global_planner);
planner_->initialize(bgp_loader_.getName(global_planner), planner_costmap_ros_);
}
//create a local planner
try {
tc_ = blp_loader_.createInstance(local_planner);
ROS_INFO("Created local_planner %s", local_planner.c_str());
tc_->initialize(blp_loader_.getName(local_planner), &tf_, controller_costmap_ros_);
}
進入構造函數,第一句:
as_ = new MoveBaseActionServer(ros::NodeHandle(), "move_base", boost::bind(&MoveBase::executeCb, this, _1), false);
as_維護movebase的actionServer狀态機,并且建立了一個executeCb線程。
接下來從private_nh中擷取參數設定值。
設定plan buffer:
planner_plan_ = new std::vector<geometry_msgs::PoseStamped>();
latest_plan_ = new std::vector<geometry_msgs::PoseStamped>();
controller_plan_ = new std::vector<geometry_msgs::PoseStamped>();
注意,這三個plan都是global_plan,最終由controller_plan_ 傳給 local planner:
//Line821
if(!tc_->setPlan(*controller_plan_)){
開啟planner thread:
//set up the planner's thread
planner_thread_ = new boost::thread(boost::bind(&MoveBase::planThread, this));
接下來設定一堆publisher, 包括cmd_vel,current_goal,goal。
然後是機器人的幾何尺寸設定等。
然後執行個體化planner_costmap_ros_,controller_costmap_ros_
planner_costmap_ros_ = new costmap_2d::Costmap2DROS("global_costmap", tf_);
controller_costmap_ros_ = new costmap_2d::Costmap2DROS("local_costmap", tf_);
開啟costmap基于傳感器資料的更新:
// Start actively updating costmaps based on sensor data
planner_costmap_ros_->start();
controller_costmap_ros_->start();
載入recovery behavior,這部分是狀态機的設計問題,可以采用預設的設定,也可以使用者指定:
if (!loadRecoveryBehaviors(private_nh)){
loadDefaultRecoveryBehaviors();
}
然後開啟actionserver: as_->start(); 開啟參數動态配置服務,完了~這就是整個構造函數做的事情。
以下分析各個成員函數
void MoveBase::clearCostmapWindows(double size_x, double size_y)
首先通過planner_costmap_ros_->getRobotPose(global_pose); 擷取機器人在全局地圖的pose,然後以這個點為中心,找到以size_x和size_y 為邊長的矩形的四個頂點,調用planner_costmap_ros_->getCostmap()->setConvexPolygonCost(clear_poly, costmap_2d::FREE_SPACE); 完成對機器人所在區域的clear工作。同樣的操作在controller_costmap_ros_ 上也操作一遍,這樣關于globa costmap 和local costmap都已經在機器人所在的區域完成clear工作。
bool MoveBase::planService(nav_msgs::GetPlan::Request &req, nav_msgs::GetPlan::Response &resp)
req參數包含了起點和目标點資訊,如果使用者沒有指定起點,則将機器人目前的pose作為起點。首先判斷global planner 的costmap是否存在,以及是否能夠得到機器人所在位置;然後調用clearCostmapWindows(2 * clearing_radius_, 2 * clearing_radius_) 完成對機器人區域的clear;最後分兩種方式進行全局的路徑規劃,模式一:對确切的目标進行makeplan(),如果規劃成功,則可以擷取到global_plan數組; 如果不成功,則模式二: 目标在障礙物中,則将目标點設定在指定的公差範圍内進行規劃。,最終将global_plan數組指派給resp.plan.poses。
bool MoveBase::makePlan(const geometry_msgs::PoseStamped& goal, std::vector<geometry_msgs::PoseStamped>& plan)
首先lock 住global costmap,然後判斷global costmap是否存在,以及是否能夠獲得機器人所在位置,如果失敗,則直接傳回失敗。最後調用核心代碼:if(!planner_->makePlan(start, goal, plan) || plan.empty()) 計算出global_plan數組。
geometry_msgs::PoseStamped MoveBase::goalToGlobalFrame(const geometry_msgs::PoseStamped& goal_pose_msg)
這個函數的核心功能是将機器人的目标姿勢從機器人的坐标系轉換到全局的坐标系: tf_.transform(goal_pose_msg, global_pose,global_frame);
void MoveBase::wakePlanner(const ros::TimerEvent& event)
{
// we have slept long enough for rate
planner_cond_.notify_one();
}
這個函數隻有一句代碼:通過boost thread的條件變量planner_cond_ 喚醒線程 planThread。
MoveBase的核心函數是線程planThread:
void MoveBase::planThread()
進入函數首先會将planner_mutex_ 鎖住:
boost::unique_lock<boost::mutex> lock(planner_mutex_);
while(n.ok()){
//check if we should run the planner (the mutex is locked)
while(wait_for_wake || !runPlanner_){
//if we should not be running the planner then suspend this thread
ROS_DEBUG_NAMED("move_base_plan_thread","Planner thread is suspending");
planner_cond_.wait(lock);
wait_for_wake = false;
}
是以線程繼續運作依賴于變量
runPlanner_
,喚醒線程依賴于條件變量
planner_cond_
這兩個變量是後續其他函數需要喚醒并執行線程planThread的關鍵。
然後調用makePlan:
//run planner
planner_plan_->clear();
bool gotPlan = n.ok() && makePlan(temp_goal, *planner_plan_);
如果makePlan成功,則上鎖,然後對planner_plan_ 拷貝,過程如下:
将最新的plan path給latest_plan_,然後将上一次的plan path給planner_plan_,這樣這兩個buffer就儲存了最新的plan 和上一次的plan。
void MoveBase::executeCb(const move_base_msgs::MoveBaseGoalConstPtr& move_base_goal)
首先主要是檢查輸入參數 move_base_goal -> target_pose.pose.orientation的合法性。然後調用geometry_msgs::PoseStamped goal = goalToGlobalFrame(move_base_goal->target_pose) 将goal轉換為在全局地圖下的goal。
然後開啟planner:
boost::unique_lock<boost::mutex> lock(planner_mutex_);
planner_goal_ = goal;
runPlanner_ = true;
planner_cond_.notify_one();
lock.unlock();
然後進入死循環:首先判斷as_是否可搶占的,檢查是否有新的goal,如果有則處理新的goal,判斷new goal的合法性,并開啟planThread 為new goal 生成plan。繼續往下,判斷goal的坐标系和planner_costmap_ros_ 的坐标系是否是一緻的,如果不是,則調用goal = goalToGlobalFrame(goal) 轉換到 planner_costmap_ros_的坐标系下,然後重新開啟planThread。最終,我們拿到了global plan, 接下來調用核心代碼:
bool done = executeCycle(goal, global_plan);
在死循環最後sleep足夠時間,以滿足frequency的要求。如果死循環被退出,則在本函數末尾開啟planThread,似的線程能正常執行到末尾,當線程再次在死循環中運作時,檢查while(n.ok())會失敗,然後線程正常清理退出。
bool MoveBase::executeCycle(geometry_msgs::PoseStamped& goal, std::vector<geometry_msgs::PoseStamped>& global_plan)
進入函數,第一行 boost::recursive_mutex::scoped_lock ecl(configuration_mutex_) 鎖住函數:
處理震蕩問題: 檢查一下我們是否移動到足夠遠的地方來重置振蕩逾時。
//check to see if we've moved far enough to reset our oscillation timeout
if(distance(current_position, oscillation_pose_) >= oscillation_distance_)
{
last_oscillation_reset_ = ros::Time::now();
oscillation_pose_ = current_position;
if(recovery_trigger_ == OSCILLATION_R)
recovery_index_ = 0;
}
然後檢查controller_costmap_ros_ 是否是最新的
//check that the observation buffers for the costmap are current, we don't want to drive blind
if(!controller_costmap_ros_->isCurrent()){
ROS_WARN("[%s]:Sensor data is out of date, we're not going to allow commanding of the base for safety",ros::this_node::getName().c_str());
publishZeroVelocity();
return false;
}
然後檢查是否有新的plan,如果有,則上鎖遞推controller_plan_和 latest_plan_。
然後将global的plan傳遞給tc,如果失敗則關閉planThread并直接退出。
if(!tc_->setPlan(*controller_plan_)){
//ABORT and SHUTDOWN COSTMAPS
ROS_ERROR("Failed to pass global plan to the controller, aborting.");
resetState();
//disable the planner thread
lock.lock();
runPlanner_ = false;
lock.unlock();
as_->setAborted(move_base_msgs::MoveBaseResult(), "Failed to pass global plan to the controller.");
return true;
}
然後進入movebase的控制邏輯的狀态機: {state_, recovery_trigger_} ,這兩個變量的定義:
enum MoveBaseState{PLANNING, CONTROLLING, CLEARING};
enum RecoveryTrigger{PLANNING_R, CONTROLLING_R,OSCILLATION_R};
RecoveryTrigger對應的是在相應狀态下出現失敗。 注意,這個狀态機和planThread是在互動的。
狀态機一旦進入PLANNING狀态,就喚醒planThread線程,在planThread線程中一旦找到plan就将state_設定為CONTROLLING
//if we are in a planning state, then we'll attempt to make a plan
case PLANNING:
{
boost::recursive_mutex::scoped_lock lock(planner_mutex_);
runPlanner_ = true;
planner_cond_.notify_one();
}
ROS_DEBUG_NAMED("move_base","Waiting for plan, in the planning state.");
break;
如果沒有找到plan,則state_ = CLEARING;recovery_trigger_ = PLANNING_R;
如果狀态機在CONTROLLING狀态
首先檢查是否到達目的地
//check to see if we've reached our goal
if(tc_->isGoalReached()){
ROS_DEBUG_NAMED("move_base","Goal reached!");
resetState();
//disable the planner thread
boost::unique_lock<boost::recursive_mutex> lock(planner_mutex_);
runPlanner_ = false;
lock.unlock();
as_->setSucceeded(move_base_msgs::MoveBaseResult(), "Goal reached.");
return true;
}
如果震蕩條件滿足,則:publishZeroVelocity();state_ = CLEARING; recovery_trigger_ = OSCILLATION_R;;然後鎖住local costmap :
boost::unique_lock<costmap_2d::Costmap2D::mutex_t> lock(*(controller_costmap_ros_->getCostmap()->getMutex()));
然後由baseLocalPlanner計算cmd_vel:
成功則可以控制地盤了,如果失敗,檢查是否逾時,如果逾時:publishZeroVelocity();state_ = CLEARING;recovery_trigger_ = CONTROLLING_R; 沒有逾時:state_ = PLANNING;publishZeroVelocity(); 并喚醒planThread線程。
如果狀态機在CLEARING狀态:
case CLEARING:
ROS_DEBUG_NAMED("move_base","In clearing/recovery state");
//we'll invoke whatever recovery behavior we're currently on if they're enabled
if(recovery_behavior_enabled_ && recovery_index_ < recovery_behaviors_.size()){
ROS_DEBUG_NAMED("move_base_recovery","Executing behavior %u of %zu", recovery_index_, recovery_behaviors_.size());
recovery_behaviors_[recovery_index_]->runBehavior();
//we at least want to give the robot some time to stop oscillating after executing the behavior
last_oscillation_reset_ = ros::Time::now();
//we'll check if the recovery behavior actually worked
ROS_DEBUG_NAMED("move_base_recovery","Going back to planning state");
last_valid_plan_ = ros::Time::now();
planning_retries_ = 0;
state_ = PLANNING;
//update the index of the next recovery behavior that we'll try
recovery_index_++;
}
else{
ROS_DEBUG_NAMED("move_base_recovery","All recovery behaviors have failed, locking the planner and disabling it.");
//disable the planner thread
boost::unique_lock<boost::recursive_mutex> lock(planner_mutex_);
runPlanner_ = false;
lock.unlock();
ROS_DEBUG_NAMED("move_base_recovery","Something should abort after this.");
if(recovery_trigger_ == CONTROLLING_R){
ROS_ERROR("Aborting because a valid control could not be found. Even after executing all recovery behaviors");
as_->setAborted(move_base_msgs::MoveBaseResult(), "Failed to find a valid control. Even after executing recovery behaviors.");
}
else if(recovery_trigger_ == PLANNING_R){
ROS_ERROR("Aborting because a valid plan could not be found. Even after executing all recovery behaviors");
as_->setAborted(move_base_msgs::MoveBaseResult(), "Failed to find a valid plan. Even after executing recovery behaviors.");
}
else if(recovery_trigger_ == OSCILLATION_R){
ROS_ERROR("Aborting because the robot appears to be oscillating over and over. Even after executing all recovery behaviors");
as_->setAborted(move_base_msgs::MoveBaseResult(), "Robot is oscillating. Even after executing recovery behaviors.");
}
resetState();
return true;
}
break;
moveBase基本上主要内容就是這些,其他未分析的函數都比較簡單。但是在moveBase的實作中,主要依賴于costmap_2d 以及global planner 和local planner。還有pluginlib的使用,簡化了各種具體實作,是以在接下來的文章中着重分析這幾部分。